/*
   Copyright (c) 2005, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#define DBTUP_C
#define DBTUP_DISK_ALLOC_CPP
#include "Dbtup.hpp"
#include <signaldata/LgmanContinueB.hpp>
#include "../dblqh/Dblqh.hpp"

#define JAM_FILE_ID 426

#if (defined(VM_TRACE) || defined(ERROR_INSERT))
//#define DEBUG_LCP 1
//#define DEBUG_PGMAN 1
//#define DEBUG_EXTENT_BITS 1
//#define DEBUG_EXTENT_BITS_HASH 1
//#define DEBUG_UNDO 1
#endif

#ifdef DEBUG_LCP
#define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP(arglist) do { } while (0)
#endif

#ifdef DEBUG_PGMAN
#define DEB_PGMAN(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_PGMAN(arglist) do { } while (0)
#endif

#ifdef DEBUG_EXTENT_BITS
#define DEB_EXTENT_BITS(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_EXTENT_BITS(arglist) do { } while (0)
#endif

#ifdef DEBUG_EXTENT_BITS_HASH
#define DEB_EXTENT_BITS_HASH(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_EXTENT_BITS_HASH(arglist) do { } while (0)
#endif

#ifdef DEBUG_UNDO
#define DEB_UNDO(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_UNDO(arglist) do { } while (0)
#endif
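
/**
 * The DEB_* macros paste their argument list directly after
 * g_eventLogger->info, so every call site needs double parentheses.
 * A minimal usage sketch (format string and arguments illustrative
 * only):
 *
 *   DEB_LCP(("(%u)unmap page: page(%u,%u)",
 *            instance(), file_no, page_no));
 *
 * With the corresponding DEBUG_* define disabled, the call expands to
 * an empty do/while and the arguments are not evaluated.
 */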

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page> & ptr)
{
  out << "[ Page: ptr.i: " << ptr.i
      << " ["
      << " m_page_lsn_hi: " << ptr.p->m_page_header.m_page_lsn_hi
      << " m_page_lsn_lo: " << ptr.p->m_page_header.m_page_lsn_lo
      << " m_page_type: " << ptr.p->m_page_header.m_page_type
      << " m_file_no: " << ptr.p->m_file_no
      << " m_page_no: " << ptr.p->m_page_no
      << " m_table_id: " << ptr.p->m_table_id
      << " m_fragment_id: " << ptr.p->m_fragment_id
      << " m_extent_no: " << ptr.p->m_extent_no
      << " m_extent_info_ptr: " << ptr.p->m_extent_info_ptr
      << " m_restart_seq: " << ptr.p->m_restart_seq
      << "]"
      << " list_index: " << ptr.p->list_index
      << " free_space: " << ptr.p->free_space
      << " uncommitted_used_space: " << ptr.p->uncommitted_used_space
      << " ]";
  return out;
}

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page_request> & ptr)
{
  out << "[ Page_request: ptr.i: " << ptr.i
      << " " << ptr.p->m_key
      << " m_original_estimated_free_space: " << ptr.p->m_original_estimated_free_space
      << " m_list_index: " << ptr.p->m_list_index
      << " m_frag_ptr_i: " << ptr.p->m_frag_ptr_i
      << " m_extent_info_ptr: " << ptr.p->m_extent_info_ptr
      << " m_ref_count: " << ptr.p->m_ref_count
      << " m_uncommitted_used_space: " << ptr.p->m_uncommitted_used_space
      << " ]";

  return out;
}

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
{
  out << "[ Extent_info: ptr.i " << ptr.i
      << " " << ptr.p->m_key
      << " m_first_page_no: " << ptr.p->m_first_page_no
      << " m_empty_page_no: " << ptr.p->m_empty_page_no
      << " m_key: ["
      << " m_file_no=" << ptr.p->m_key.m_file_no
      << " m_page_no=" << ptr.p->m_key.m_page_no
      << " m_page_idx=" << ptr.p->m_key.m_page_idx
      << " ]"
      << " m_free_space: " << ptr.p->m_free_space
      << " m_free_matrix_pos: " << ptr.p->m_free_matrix_pos
      << " m_free_page_count: [";

  for(Uint32 i = 0; i<Dbtup::EXTENT_SEARCH_MATRIX_COLS; i++)
    out << " " << ptr.p->m_free_page_count[i];
  out << " ] ]";

  return out;
}

void
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
{
  const Uint32 limit = 512;
  ndbout_c("dirty pages");
  for(Uint32 i = 0; i < EXTENT_SEARCH_MATRIX_COLS; i++)
  {
    printf("  %u : ", i);
    PagePtr ptr;
    Page_pool *pool= (Page_pool*)&m_global_page_pool;
    Local_Page_list list(*pool, alloc.m_dirty_pages[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }
  ndbout_c("page requests");
  for(Uint32 i = 0; i < EXTENT_SEARCH_MATRIX_COLS; i++)
  {
    printf("  %u : ", i);
    Ptr<Page_request> ptr;
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }

  ndbout_c("Extent matrix");
  for(Uint32 i = 0; i<alloc.SZ; i++)
  {
    printf("  %u : ", i);
    Ptr<Extent_info> ptr;
    Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }

  if (alloc.m_curr_extent_info_ptr_i != RNIL)
  {
    Ptr<Extent_info> ptr;
    c_extent_pool.getPtr(ptr, alloc.m_curr_extent_info_ptr_i);
    ndbout << "current extent: " << ptr << endl;
  }
}

#define ddrequire(x) do { if(unlikely(!(x))) { dump_disk_alloc(alloc); ndbabort(); } } while(0)
#if defined(VM_TRACE) || defined(ERROR_INSERT)
#define ddassert(x) do { if(unlikely(!(x))) { dump_disk_alloc(alloc); ndbabort(); } } while(0)
#else
#define ddassert(x)
#endif

Dbtup::Disk_alloc_info::Disk_alloc_info(const Tablerec* tabPtrP,
                                        Uint32 extent_size)
{
  m_extent_size = extent_size;
  m_curr_extent_info_ptr_i = RNIL;
  if (tabPtrP->m_no_of_disk_attributes == 0)
    return;

  Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;

  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
    m_page_free_bits_map[0] = recs_per_page; // 100% free
    m_page_free_bits_map[1] = 1;
    m_page_free_bits_map[2] = 0;
    m_page_free_bits_map[3] = 0;

    Uint32 max= recs_per_page * extent_size;
    for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
    {
      m_total_extent_free_space_thresholds[i] =
        (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
    }
  }
  else
  {
    abort();
  }
}
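
/**
 * A worked example of the constructor's arithmetic, with illustrative
 * numbers only (the real values depend on the table's disk columns and
 * the tablespace extent size): assume m_fix_header_size = 100 words and
 * Tup_fixsize_page::DATA_WORDS = 8000 words. Then min_size = 400,
 * recs_per_page = 32000/400 = 80 records, and with extent_size = 16
 * pages max = 80 * 16 = 1280 records per extent. Assuming
 * EXTENT_SEARCH_MATRIX_ROWS = 2, the thresholds become
 * { 1*1280/2, 0 } = { 640, 0 }: extents with at least 640 free records
 * sort into row 0, all others into row 1.
 */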

Uint32
Dbtup::Disk_alloc_info::find_extent(Uint32 sz) const
{
  /**
   * Find an extent with sufficient space for sz.
   * Find the biggest available (with most free space).
   * Return position in matrix.
   */
  Uint32 col = calc_page_free_bits(sz);
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
  for(Uint32 i= 0; i<EXTENT_SEARCH_MATRIX_SIZE; i++)
  {
    // Check that it can cater for request
    if (!m_free_extents[i].isEmpty())
    {
      return i;
    }

    if ((i & mask) >= col)
    {
      i = (i & ~mask) + mask;
    }
  }

  return RNIL;
}
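
/**
 * Illustration of the search order (assuming the usual 2 x 4 matrix,
 * i.e. EXTENT_SEARCH_MATRIX_ROWS = 2, EXTENT_SEARCH_MATRIX_COLS = 4):
 * the matrix is stored row-major, rows ordered by total free space and
 * columns by the best page free bits in the extent. For a request with
 * calc_page_free_bits(sz) = 1 the loop probes positions
 *
 *   0, 1   (row 0, columns 0..1)
 *   4, 5   (row 1, columns 0..1)
 *
 * because once (i & mask) reaches col the rest of the row is skipped;
 * columns above col only hold pages too full to cater for the request.
 */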

Uint32
Dbtup::Disk_alloc_info::calc_extent_pos(const Extent_info* extP) const
{
  Uint32 free= extP->m_free_space;
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;

  Uint32 col= 0, row=0;

  /**
   * Find correct row based on total free space,
   *   if zero (or very small free space) put
   *   absolutely last
   */
  {
    const Uint32 *arr= m_total_extent_free_space_thresholds;
    for(; free < * arr++; row++)
      assert(row < EXTENT_SEARCH_MATRIX_ROWS);
  }

  /**
   * Find correct col based on largest available chunk
   */
  {
    const Uint16 *arr= extP->m_free_page_count;
    for(; col < EXTENT_SEARCH_MATRIX_COLS && * arr++ == 0; col++);
  }

  /**
   * NOTE
   *
   *   If free space on extent is small or zero,
   *     col will be = EXTENT_SEARCH_MATRIX_COLS
   *     row will be = EXTENT_SEARCH_MATRIX_ROWS
   *   in that case pos will be col * row = max pos
   *   (as fixed by + 1 in declaration)
   */
  Uint32 pos= (row * (mask + 1)) + (col & mask);

  assert(pos < EXTENT_SEARCH_MATRIX_SIZE);
  return pos;
}
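
/**
 * A worked example with the illustrative numbers from above
 * (thresholds { 640, 0 }, 2 x 4 matrix): an extent with
 * m_free_space = 200 stops the threshold walk at 200 < 640, so
 * row = 1. If its m_free_page_count = { 0, 3, 5, 8 }, the first
 * non-zero column gives col = 1, so the extent sorts into
 * pos = 1 * 4 + 1 = 5. An extent with plenty of free space
 * (say 700 >= 640) and at least one completely free page sorts into
 * pos = 0, the first slot probed by find_extent.
 */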

void
Dbtup::update_extent_pos(EmulatedJamBuffer* jamBuf,
                         Disk_alloc_info& alloc,
                         Ptr<Extent_info> extentPtr,
                         Int32 delta)
{
  if (delta < 0)
  {
    thrjam(jamBuf);
    Uint32 sub = Uint32(- delta);
    ddrequire(extentPtr.p->m_free_space >= sub);
    extentPtr.p->m_free_space -= sub;
  }
  else
  {
    thrjam(jamBuf);
    extentPtr.p->m_free_space += delta;
    ndbassert(Uint32(delta) <= alloc.calc_page_free_space(0));
  }

#if defined(VM_TRACE) || defined(ERROR_INSERT)
  Uint32 cnt = 0;
  Uint32 sum = 0;
  for(Uint32 i = 0; i < EXTENT_SEARCH_MATRIX_COLS; i++)
  {
    cnt += extentPtr.p->m_free_page_count[i];
    sum += extentPtr.p->m_free_page_count[i] * alloc.calc_page_free_space(i);
  }
  if (extentPtr.p->m_free_page_count[0] == cnt)
  {
    ddrequire(extentPtr.p->m_free_space == cnt*alloc.m_page_free_bits_map[0]);
  }
  else
  {
    ddrequire(extentPtr.p->m_free_space < cnt*alloc.m_page_free_bits_map[0]);
  }
  ddrequire(extentPtr.p->m_free_space >= sum);
  ddrequire(extentPtr.p->m_free_space <= cnt*alloc.m_page_free_bits_map[0]);
#endif

  Uint32 old = extentPtr.p->m_free_matrix_pos;
  if (old != RNIL)
  {
    thrjam(jamBuf);
    Uint32 pos = alloc.calc_extent_pos(extentPtr.p);
    if (old != pos)
    {
      thrjam(jamBuf);
      Local_extent_info_list old_list(c_extent_pool, alloc.m_free_extents[old]);
      Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
      old_list.remove(extentPtr);
      new_list.addFirst(extentPtr);
      extentPtr.p->m_free_matrix_pos= pos;
    }
  }
  else
  {
    ddrequire(alloc.m_curr_extent_info_ptr_i == extentPtr.i);
  }
}
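
/**
 * Sign convention for delta, as used by the callers in this file:
 * allocation paths subtract the space they reserve, free paths add it
 * back, e.g.
 *
 *   update_extent_pos(jamBuffer(), alloc, extentPtr, -Int32(sz)); // alloc
 *   update_extent_pos(jamBuffer(), alloc, extentPtr, sz);         // free
 *
 * A delta of 0 (used when only m_free_page_count has changed) still
 * re-sorts the extent into the correct m_free_extents list.
 */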

void
Dbtup::restart_setup_page(Ptr<Fragrecord> fragPtr,
                          Disk_alloc_info& alloc,
                          PagePtr pagePtr,
                          Int32 estimate)
{
  jam();
  /**
   * Link to extent, clear uncommitted_used_space
   */
  pagePtr.p->uncommitted_used_space = 0;

  Extent_info key;
  key.m_key.m_file_no = pagePtr.p->m_file_no;
  key.m_key.m_page_idx = pagePtr.p->m_extent_no;
  Ptr<Extent_info> extentPtr;
  if (!c_extent_hash.find(extentPtr, key))
  {
    g_eventLogger->info("(%u)Crash on page(%u,%u) in tab(%u,%u),"
                        " extent page: %u"
                        " restart_seq(%u,%u)",
                        instance(),
                        pagePtr.p->m_file_no,
                        pagePtr.p->m_page_no,
                        fragPtr.p->fragTableId,
                        fragPtr.p->fragmentId,
                        pagePtr.p->m_extent_no,
                        pagePtr.p->m_restart_seq,
                        globalData.m_restart_seq);
    ndbabort();
  }
  DEB_EXTENT_BITS(("(%u)restart_setup_page(%u,%u) in tab(%u,%u),"
                   " extent page: %u.%u"
                   " restart_seq(%u,%u)",
                   instance(),
                   pagePtr.p->m_file_no,
                   pagePtr.p->m_page_no,
                   fragPtr.p->fragTableId,
                   fragPtr.p->fragmentId,
                   pagePtr.p->m_extent_no,
                   extentPtr.i,
                   pagePtr.p->m_restart_seq,
                   globalData.m_restart_seq));

  pagePtr.p->m_restart_seq = globalData.m_restart_seq;
  pagePtr.p->m_extent_info_ptr = extentPtr.i;

  Uint32 real_free = pagePtr.p->free_space;
  const bool prealloc = estimate >= 0;
  Uint32 estimated;
  if (prealloc)
  {
    jam();
    /**
     * If this is during prealloc, use the estimate from there
     */
    estimated = (Uint32)estimate;
    Uint32 page_estimated =
      alloc.calc_page_free_space(alloc.calc_page_free_bits(real_free));
    if (page_estimated != estimated && real_free == 0)
    {
      jam();
      /**
       * The page claims it is full, but the extent bits say that it
       * isn't full. This can occur if the tablespace is using the v1
       * page format. The page must then come from an old dropped table,
       * and thus we can safely overwrite it.
       */
      g_eventLogger->info("(%u)tab(%u,%u), page(%u,%u):%u"
                          ", inconsistency between extent and page, most"
                          " likely due to using v1 pages, we assume page"
                          " comes from dropped table and is really empty",
                          instance(),
                          fragPtr.p->fragTableId,
                          fragPtr.p->fragmentId,
                          pagePtr.p->m_file_no,
                          pagePtr.p->m_page_no,
                          pagePtr.i);
      ndbassert(false); // Crash in debug for analysis
      Ptr<Tablerec> tabPtr;
      tabPtr.i= fragPtr.p->fragTableId;
      ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
      convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
      estimated = alloc.calc_page_free_space(
        alloc.calc_page_free_bits(real_free));
    }
  }
  else
  {
    jam();
    /**
     * else use the estimate based on the actual free space
     */
    estimated = alloc.calc_page_free_space(alloc.calc_page_free_bits(real_free));
  }

#if defined(VM_TRACE) || defined(ERROR_INSERT)
  {
    Local_key page;
    page.m_file_no = pagePtr.p->m_file_no;
    page.m_page_no = pagePtr.p->m_page_no;

    D("Tablespace_client - restart_setup_page");
    Tablespace_client tsman(0, this, c_tsman,
                            0, 0, 0, 0);
    unsigned uncommitted, committed;
    uncommitted = committed = ~(unsigned)0;
    (void) tsman.get_page_free_bits(&page, &uncommitted, &committed);
    jamEntry();

    if (alloc.calc_page_free_bits(real_free) != committed)
    {
      Uint64 page_lsn = 0;
      page_lsn += pagePtr.p->m_page_header.m_page_lsn_hi;
      page_lsn <<= 32;
      page_lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
      g_eventLogger->info("(%u)page(%u,%u):%u, calc_free_bits: %u,"
                          " committed: %u, uncommitted: %u, free_space: %u"
                          ", page_lsn: %llu",
                          instance(),
                          page.m_file_no,
                          page.m_page_no,
                          pagePtr.i,
                          alloc.calc_page_free_bits(real_free),
                          committed,
                          uncommitted,
                          real_free,
                          page_lsn);
    }
    ddassert(alloc.calc_page_free_bits(real_free) == committed);
    if (prealloc)
    {
      /**
       * tsman.alloc_page sets the uncommitted bits to
       * EXTENT_SEARCH_MATRIX_COLS - 1 to avoid the page being
       * preallocated several times
       */
      ddassert(uncommitted == EXTENT_SEARCH_MATRIX_COLS - 1);
    }
    else
    {
      ddassert(committed == uncommitted);
    }
  }
#endif

  ddrequire(real_free >= estimated);

  if (real_free != estimated)
  {
    jam();
    Uint32 delta = (real_free-estimated);
    update_extent_pos(jamBuffer(), alloc, extentPtr, delta);
  }
}

/**
 * - Page free bits -
 * 0 = 00 - free - 100% free
 * 1 = 01 - at least one row free
 * 2 = 10 - full
 * 3 = 11 - full
 *
 * sz is always 1 when coming here, so calc_page_free_bits will
 * always return 1 here. This will change with the implementation of
 * var-sized disk attributes.
 */
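
/**
 * How the bits relate to the map set up in the Disk_alloc_info
 * constructor, using the illustrative recs_per_page = 80 from above:
 * m_page_free_bits_map = { 80, 1, 0, 0 }, so a page with all 80
 * records free maps to bits 0, a page with 1..79 free records maps to
 * bits 1, and a completely full page maps to one of the two "full"
 * codes. calc_page_free_space() is the inverse lookup: it returns the
 * free record count guaranteed by a given bits value.
 */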

#define DBG_DISK 0

int
Dbtup::disk_page_prealloc(Signal* signal,
                          Ptr<Fragrecord> fragPtr,
                          Local_key* key, Uint32 sz)
{
  int err;
  Uint32 i, ptrI;
  Ptr<Page_request> req;
  Fragrecord* fragPtrP = fragPtr.p;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
  Uint32 idx= alloc.calc_page_free_bits(sz);
  D("Tablespace_client - disk_page_prealloc");

  /**
   * 1) search current dirty pages
   * First check for empty pages and then search for non-full pages.
   */
  for(i= 0; i <= idx; i++)
  {
    if (!alloc.m_dirty_pages[i].isEmpty())
    {
      jam();
      jamLine(i);
      ptrI= alloc.m_dirty_pages[i].getFirst();
      Ptr<GlobalPage> gpage;
      m_global_page_pool.getPtr(gpage, ptrI);

      PagePtr tmp;
      tmp.i = gpage.i;
      tmp.p = reinterpret_cast<Page*>(gpage.p);
      disk_page_prealloc_dirty_page(alloc, tmp, i, sz, fragPtrP);
      key->m_page_no= tmp.p->m_page_no;
      key->m_file_no= tmp.p->m_file_no;
      jam();
      return 0; // Page in memory
    }
  }

  /**
   * Search outstanding page requests.
   *   The callback does not need to access the page request again,
   *   as it's not the first request to this page.
   */
  for(i= 0; i <= idx; i++)
  {
    if (!alloc.m_page_requests[i].isEmpty())
    {
      jam();
      jamLine(i);
      ptrI= alloc.m_page_requests[i].getFirst();
      Ptr<Page_request> req;
      c_page_request_pool.getPtr(req, ptrI);

      disk_page_prealloc_transit_page(alloc, req, i, sz);
      * key = req.p->m_key;
      jam();
      return 0;
    }
  }

  /**
   * We need to request a page...
   */
  if (!c_page_request_pool.seize(req))
  {
    jam();
    err= 1605;
    return -err;
  }

  req.p->m_ref_count= 1;
  req.p->m_frag_ptr_i= fragPtr.i;
  req.p->m_uncommitted_used_space= sz;

  int pageBits = 0; // received
  Ptr<Extent_info> ext;
  const Uint32 bits = alloc.calc_page_free_bits(sz); // required
  bool found= false;

  /**
   * Do we have a current extent?
   */
  if ((ext.i = alloc.m_curr_extent_info_ptr_i) != RNIL)
  {
    jam();
    {
      Tablespace_client tsman(signal, this, c_tsman,
                    fragPtrP->fragTableId,
                    fragPtrP->fragmentId,
                    c_lqh->getCreateSchemaVersion(fragPtrP->fragTableId),
                    fragPtrP->m_tablespace_id);
      c_extent_pool.getPtr(ext);
      pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
    }
    if (pageBits >= 0)
    {
      jamEntry();
      jamLine(pageBits);
      found= true;
    }
    else
    {
      jamEntry();
      /**
       * The current extent is not in a free list,
       *   and since it couldn't accommodate the request
       *   we put it on the free list per state (so also
       *   a full page is in one of the m_free_extents
       *   lists).
       */
      alloc.m_curr_extent_info_ptr_i = RNIL;
      Uint32 pos= alloc.calc_extent_pos(ext.p);
      ext.p->m_free_matrix_pos = pos;
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
      list.addFirst(ext);
    }
  }

  if (!found)
  {
    Uint32 pos;
    if ((pos= alloc.find_extent(sz)) != RNIL)
    {
      jam();
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
      list.first(ext);
      list.remove(ext);
    }
    else
    {
      jam();
      /**
       * We need to alloc an extent
       */
      if (!c_extent_pool.seize(ext))
      {
        jam();
        err= 1606;
        c_page_request_pool.release(req);
        return -err;
      }
      {
        Tablespace_client tsman(signal, this, c_tsman,
                      fragPtrP->fragTableId,
                      fragPtrP->fragmentId,
                      c_lqh->getCreateSchemaVersion(fragPtrP->fragTableId),
                      fragPtrP->m_tablespace_id);
        err= tsman.alloc_extent(&ext.p->m_key);
      }
      if (err < 0)
      {
        jamEntry();
        c_extent_pool.release(ext);
        c_page_request_pool.release(req);
        return err;
      }

      int pages= err;

#ifdef VM_TRACE
      ndbout << "allocated " << pages << " pages: " << ext.p->m_key
             << " table: " << fragPtr.p->fragTableId
             << " fragment: " << fragPtr.p->fragmentId << endl;
#endif
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
      memset(ext.p->m_free_page_count, 0, sizeof(ext.p->m_free_page_count));
      ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages;
      ext.p->m_free_page_count[0]= pages; // All pages are "free"-est
      ext.p->m_empty_page_no = 0;

      DEB_EXTENT_BITS_HASH((
               "(%u)new:extent .i=%u in tab(%u,%u),"
               " page(%u,%u)->%u,"
               " empty_page: %u",
                instance(),
                ext.i,
                fragPtr.p->fragTableId,
                fragPtr.p->fragmentId,
                ext.p->m_key.m_file_no,
                ext.p->m_first_page_no,
                ext.p->m_first_page_no + (pages - 1),
                ext.p->m_empty_page_no));

      c_extent_hash.add(ext);

      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
      list1.addFirst(ext);
    }
    jam();
    alloc.m_curr_extent_info_ptr_i= ext.i;
    ext.p->m_free_matrix_pos= RNIL;
    {
      Tablespace_client tsman(signal, this, c_tsman,
                    fragPtrP->fragTableId,
                    fragPtrP->fragmentId,
                    c_lqh->getCreateSchemaVersion(fragPtrP->fragTableId),
                    fragPtrP->m_tablespace_id);
      pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
    }
    jamEntry();
    ddrequire(pageBits >= 0);
  }

  /**
   * We have a page from an extent
   */
  *key= req.p->m_key= ext.p->m_key;

  /**
   * We don't know the exact free space of the page,
   *   but we know its page free bits, so compute the
   *   free space estimate based on them.
   */
  Uint32 size= alloc.calc_page_free_space((Uint32)pageBits);

  ddrequire(size >= sz);
  req.p->m_original_estimated_free_space = size;

  Uint32 new_size = size - sz;   // Subtract alloc rec
  Uint32 newPageBits= alloc.calc_page_free_bits(new_size);
  ndbrequire(newPageBits != (Uint32)pageBits);
  {
    jam();
    /**
     * We should always enter this path. When the new page was empty
     * before coming here, it will go from the empty state to either
     * the non-full or the full state. If we come here with a page that
     * was non-full before, it will enter the full state. We will
     * possibly return it to the non-full list when the real page has
     * been read and we know the exact fullness level.
     */
    DEB_EXTENT_BITS(("(%u)alloc page, extent(%u), pageBits: %u,"
                     " newPageBits: %u, free_page_count(%u,%u)",
                     instance(),
                     ext.p->m_key.m_page_idx,
                     pageBits,
                     newPageBits,
                     ext.p->m_free_page_count[pageBits],
                     ext.p->m_free_page_count[newPageBits]));
    ddrequire(ext.p->m_free_page_count[pageBits] > 0);
    ext.p->m_free_page_count[pageBits]--;
    ext.p->m_free_page_count[newPageBits]++;
  }
  update_extent_pos(jamBuffer(), alloc, ext, -Int32(sz));

  // And put page request in correct free list
  idx= alloc.calc_page_free_bits(new_size);
  jamLine(idx);
  {
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);

    list.addLast(req);
  }
  req.p->m_list_index= idx;
  req.p->m_extent_info_ptr= ext.i;

  Page_cache_client::Request preq;
  preq.m_page = *key;
  preq.m_table_id = fragPtr.p->fragTableId;
  preq.m_fragment_id = fragPtr.p->fragmentId;
  preq.m_callback.m_callbackData= req.i;
  preq.m_callback.m_callbackFunction =
    safe_cast(&Dbtup::disk_page_prealloc_callback);

  int flags= Page_cache_client::ALLOC_REQ;
  if (pageBits == 0)
  {
    jam();
    flags |= Page_cache_client::EMPTY_PAGE;
    if (ext.p->m_first_page_no + ext.p->m_empty_page_no == key->m_page_no)
    {
      jam();
      ext.p->m_empty_page_no++;
      DEB_EXTENT_BITS(("(%u)extent(%u) new page in tab(%u,%u), first_page(%u,%u)"
                       " empty_page: %u",
                instance(),
                ext.p->m_key.m_page_idx,
                fragPtr.p->fragTableId,
                fragPtr.p->fragmentId,
                key->m_file_no,
                key->m_page_no,
                ext.p->m_empty_page_no));
    }
    else
    {
      DEB_EXTENT_BITS(("(%u)extent(%u) new page in tab(%u,%u), page(%u,%u)",
                instance(),
                ext.p->m_key.m_page_idx,
                fragPtr.p->fragTableId,
                fragPtr.p->fragmentId,
                key->m_file_no,
                key->m_page_no));
    }
    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::disk_page_prealloc_initial_callback);
  }

  Page_cache_client pgman(this, c_pgman);
  int res= pgman.get_page(signal, preq, flags);
  jamEntry();
  switch(res)
  {
  case 0:
    jam();
    break;
  case -1:
    return -1604;
  case -1518:
    return -res;
  default:
    ndbrequire(res > 0);
    jam();
    execute(signal, preq.m_callback, res); // run callback
  }

  return res;
}
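
/**
 * Return-value sketch of disk_page_prealloc for callers (summarising
 * the paths above): 0 means the request was satisfied from an already
 * dirty page or a pending page request and *key is already final;
 * a negative value is an error code (e.g. -1605 when out of page
 * requests, -1606 when out of extent records, -1604 when the page
 * cache request fails); a positive value is the page cache's immediate
 * result, in which case the prealloc callback has already been
 * executed above. A hypothetical call site:
 *
 *   Local_key key;
 *   int res = disk_page_prealloc(signal, fragPtr, &key, 1);
 *   if (res < 0)
 *     return res; // propagate error code
 */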

void
Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
                                     PagePtr pagePtr,
                                     Uint32 old_idx,
                                     Uint32 sz,
                                     Fragrecord *fragPtrP)
{
  jam();
  jamLine(pagePtr.i);
  ddrequire(pagePtr.p->list_index == old_idx);

  Uint32 free= pagePtr.p->free_space;
  Uint32 used= pagePtr.p->uncommitted_used_space + sz;
  Uint32 ext= pagePtr.p->m_extent_info_ptr;

  ddrequire(free >= used);
  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  Uint32 new_idx= alloc.calc_page_free_bits(free - used);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_dirty_page(alloc,
                              extentPtr,
                              pagePtr,
                              old_idx,
                              new_idx,
                              fragPtrP);
  }

  pagePtr.p->uncommitted_used_space = used;
  update_extent_pos(jamBuffer(), alloc, extentPtr, -Int32(sz));
}


void
Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
                                       Ptr<Page_request> req,
                                       Uint32 old_idx, Uint32 sz)
{
  jam();
  ddrequire(req.p->m_list_index == old_idx);

  Uint32 free= req.p->m_original_estimated_free_space;
  Uint32 used= req.p->m_uncommitted_used_space + sz;
  Uint32 ext= req.p->m_extent_info_ptr;

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  ddrequire(free >= used);
  Uint32 new_idx= alloc.calc_page_free_bits(free - used);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_page_request(alloc, extentPtr, req, old_idx, new_idx);
  }

  req.p->m_uncommitted_used_space = used;
  update_extent_pos(jamBuffer(), alloc, extentPtr, -Int32(sz));
}

void
Dbtup::disk_page_prealloc_callback(Signal* signal,
                                   Uint32 page_request, Uint32 page_id)
{
  jamEntry();

  Ptr<Page_request> req;
  c_page_request_pool.getPtr(req, page_request);

  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= req.p->m_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

  Local_key key = req.p->m_key;
  if (key.m_file_no != pagePtr.p->m_file_no ||
      key.m_page_no != pagePtr.p->m_page_no ||
      fragPtr.p->fragTableId != pagePtr.p->m_table_id ||
      fragPtr.p->fragmentId != pagePtr.p->m_fragment_id ||
      pagePtr.p->m_restart_seq == 0)
  {
    jam();
    /**
     * At this point we are reading what should be an initialised page,
     * and thus file_no, page_no, table id and fragment id should be
     * correct. If not, crash and provide details.
     */
    g_eventLogger->info("(%u)key(%u,%u), page(%u,%u), restart_seq(%u,%u)"
                        "key_tab(%u,%u), page_tab(%u,%u)",
                        instance(),
                        key.m_file_no,
                        key.m_page_no,
                        pagePtr.p->m_file_no,
                        pagePtr.p->m_page_no,
                        globalData.m_restart_seq,
                        pagePtr.p->m_restart_seq,
                        fragPtr.p->fragTableId,
                        fragPtr.p->fragmentId,
                        pagePtr.p->m_table_id,
                        pagePtr.p->m_fragment_id);
    ndbabort();
  }
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
  {
    jam();
    D(V(pagePtr.p->m_restart_seq) << V(globalData.m_restart_seq));
    restart_setup_page(fragPtr,
                       alloc,
                       pagePtr,
                       req.p->m_original_estimated_free_space);
  }

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);

  pagePtr.p->uncommitted_used_space += req.p->m_uncommitted_used_space;
  ddrequire(pagePtr.p->free_space >= pagePtr.p->uncommitted_used_space);

  Uint32 free = pagePtr.p->free_space - pagePtr.p->uncommitted_used_space;
  Uint32 idx = req.p->m_list_index;
  Uint32 real_idx = alloc.calc_page_free_bits(free);

  if (idx != real_idx)
  {
    jam();

    DEB_EXTENT_BITS((
      "(%u)extent(%u) page(%u,%u):%u u_u_s: %u, free:%u idx:%u, new_idx:%u"
      ", free_page_count(%u,%u)",
      instance(),
      extentPtr.p->m_key.m_page_idx,
      pagePtr.p->m_file_no,
      pagePtr.p->m_page_no,
      pagePtr.i,
      pagePtr.p->uncommitted_used_space,
      free,
      idx,
      real_idx,
      extentPtr.p->m_free_page_count[idx],
      extentPtr.p->m_free_page_count[real_idx]));

    ddrequire(extentPtr.p->m_free_page_count[idx] > 0);
    extentPtr.p->m_free_page_count[idx]--;
    extentPtr.p->m_free_page_count[real_idx]++;
    update_extent_pos(jamBuffer(), alloc, extentPtr, 0);
  }
  {
    /**
     * add to dirty list
     */
    pagePtr.p->list_index = real_idx;
    Page_pool *cheat_pool= (Page_pool*)&m_global_page_pool;
    Local_Page_list list(* cheat_pool, alloc.m_dirty_pages[real_idx]);
    list.addFirst(pagePtr);
  }

  {
    /**
     * release page request
     */
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);
    list.release(req);
  }
}

void
Dbtup::disk_page_move_dirty_page(Disk_alloc_info& alloc,
                                 Ptr<Extent_info> extentPtr,
                                 Ptr<Page> pagePtr,
                                 Uint32 old_idx,
                                 Uint32 new_idx,
                                 Fragrecord *fragPtrP)
{
  DEB_EXTENT_BITS(("(%u)dpmdp:extent(%u) page(%u,%u):%u, old_idx: %u,"
                   " new_idx: %u, free_page_count(%u,%u)",
                   instance(),
                   extentPtr.p->m_key.m_page_idx,
                   pagePtr.p->m_file_no,
                   pagePtr.p->m_page_no,
                   pagePtr.i,
                   old_idx,
                   new_idx,
                   extentPtr.p->m_free_page_count[old_idx],
                   extentPtr.p->m_free_page_count[new_idx]));

  ddrequire(extentPtr.p->m_free_page_count[old_idx] > 0);
  extentPtr.p->m_free_page_count[old_idx]--;
  extentPtr.p->m_free_page_count[new_idx]++;

  jam();
  Page_pool *pool= (Page_pool*)&m_global_page_pool;
  Local_Page_list new_list(*pool, alloc.m_dirty_pages[new_idx]);
  Local_Page_list old_list(*pool, alloc.m_dirty_pages[old_idx]);
  old_list.remove(pagePtr);
  new_list.addFirst(pagePtr);

  pagePtr.p->list_index = new_idx;
}

void
Dbtup::disk_page_move_page_request(Disk_alloc_info& alloc,
                                   Ptr<Extent_info> extentPtr,
                                   Ptr<Page_request> req,
                                   Uint32 old_idx, Uint32 new_idx)
{
  jam();
  Page_request_list::Head *lists = alloc.m_page_requests;
  Local_page_request_list old_list(c_page_request_pool, lists[old_idx]);
  Local_page_request_list new_list(c_page_request_pool, lists[new_idx]);
  old_list.remove(req);
  new_list.addLast(req);

  DEB_EXTENT_BITS(("(%u)dpmpqr:extent(%u) page(%u,%u), old_idx: %u new_idx: %u"
                   ", free_page_count(%u,%u)",
                   instance(),
                   extentPtr.p->m_key.m_page_idx,
                   req.p->m_key.m_file_no,
                   req.p->m_key.m_page_no,
                   old_idx,
                   new_idx,
                   extentPtr.p->m_free_page_count[old_idx],
                   extentPtr.p->m_free_page_count[new_idx]));

  ddrequire(extentPtr.p->m_free_page_count[old_idx] > 0);
  extentPtr.p->m_free_page_count[old_idx]--;
  extentPtr.p->m_free_page_count[new_idx]++;
  req.p->m_list_index= new_idx;
}

/**
 * We have read in a page which is at the moment empty. It is possible
 * that the information on this page is garbage, since this could be our
 * first access to this page. It could even have belonged to another
 * table that was deleted before getting here. So we need to initialise
 * the page header at this point in time.
 */
void
Dbtup::disk_page_prealloc_initial_callback(Signal* signal,
                                           Uint32 page_request,
                                           Uint32 page_id)
{
  jamEntry();
  /**
   * 1) lookup page request
   * 2) lookup page
   * 3) lookup table
   * 4) init page (according to page type)
   * 5) call ordinary callback
   */
  Ptr<Page_request> req;
  c_page_request_pool.getPtr(req, page_request);

  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= req.p->m_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  Ptr<Tablerec> tabPtr;
  tabPtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);

  ndbrequire(tabPtr.p->m_attributes[DD].m_no_of_varsize == 0);

  /**
   * We can come here even when the page has already been initialised.
   *
   * Unfortunately there is no sure way of discovering whether we are
   * reusing an already used disk page. The extent information isn't
   * synchronised with the disk page itself. So it is perfectly possible
   * to allocate an extent, write a page in it, and then restart, after
   * which recovery processing decides that the extent is no longer part
   * of this fragment. A new extent can then be allocated, and it can be
   * any extent, so the same fragment can even allocate the same extent
   * once more after the restart.
   *
   * So we simply go ahead and write this new page as an initial page.
   * There are plenty of other safeguards in the disk page and
   * checkpointing algorithms against wrong use of disk pages.
   */

  /**
   * Ensure that all unset header variables are set to 0.
   */
  memset((char*)pagePtr.p, 0, Page::HEADER_WORDS * 4);

  convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);

  pagePtr.p->m_page_no= req.p->m_key.m_page_no;
  pagePtr.p->m_file_no= req.p->m_key.m_file_no;
  pagePtr.p->m_table_id= fragPtr.p->fragTableId;
  pagePtr.p->m_ndb_version = htonl(NDB_DISK_V2);
  pagePtr.p->m_create_table_version =
    c_lqh->getCreateSchemaVersion(fragPtr.p->fragTableId);
  pagePtr.p->m_fragment_id = fragPtr.p->fragmentId;
  pagePtr.p->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
  pagePtr.p->m_extent_info_ptr= req.p->m_extent_info_ptr;
  pagePtr.p->m_restart_seq = globalData.m_restart_seq;
  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
  pagePtr.p->list_index = req.p->m_list_index;
  pagePtr.p->uncommitted_used_space = req.p->m_uncommitted_used_space;

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
  Uint32 idx = req.p->m_list_index;

#if defined(VM_TRACE) || defined(ERROR_INSERT)
  {
    Uint32 free = pagePtr.p->free_space - pagePtr.p->uncommitted_used_space;
    ddrequire(idx == alloc.calc_page_free_bits(free));
    ddrequire(pagePtr.p->free_space == req.p->m_original_estimated_free_space);
  }
#endif

  {
    /**
     * add to dirty list
     */
    Page_pool *cheat_pool= (Page_pool*)&m_global_page_pool;
    Local_Page_list list(* cheat_pool, alloc.m_dirty_pages[idx]);
    list.addFirst(pagePtr);
  }

  {
    /**
     * release page request
     */
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);
    list.release(req);
  }
}
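
/**
 * A timeline sketch of the reuse scenario described above (fragment
 * and extent identities are illustrative only):
 *
 *   1. Fragment F allocates extent E and writes page P in it.
 *   2. The node restarts; recovery decides E is not part of F.
 *   3. F later allocates a fresh extent, which may again be E.
 *   4. disk_page_prealloc_initial_callback now sees P with stale but
 *      plausible header contents, and must therefore re-initialise the
 *      header unconditionally.
 */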

void
Dbtup::disk_page_set_dirty(PagePtr pagePtr)
{
  jam();
  Uint32 idx = pagePtr.p->list_index;
  if ((pagePtr.p->m_restart_seq == globalData.m_restart_seq) &&
      ((idx & 0x8000) == 0))
  {
    jam();
    /**
     * Already in dirty list
     */
    return;
  }

  Local_key key;
  key.m_page_no = pagePtr.p->m_page_no;
  key.m_file_no = pagePtr.p->m_file_no;

  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;

  if (DBG_DISK)
    ndbout << " disk_page_set_dirty " << key << endl;

  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

  Uint32 free = pagePtr.p->free_space;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
  {
    jam();
    D(V(pagePtr.p->m_restart_seq) << V(globalData.m_restart_seq));
    restart_setup_page(fragPtr, alloc, pagePtr, -1);
    ndbrequire(free == pagePtr.p->free_space);
    free = pagePtr.p->free_space;
    idx = alloc.calc_page_free_bits(free);
    used = 0;
  }
  else
  {
    jam();
    idx &= ~0x8000;
    DEB_EXTENT_BITS(("((%u)Reset list_index bit 0x8000 on page(%u,%u):%u"
                     ", idx = %u",
                     instance(),
                     pagePtr.p->m_file_no,
                     pagePtr.p->m_page_no,
                     pagePtr.i,
                     idx));
    ddrequire(idx == alloc.calc_page_free_bits(free - used));
  }

  ddrequire(free >= used);

  D("Tablespace_client - disk_page_set_dirty");
  Tablespace_client tsman(0, this, c_tsman,
                        fragPtr.p->fragTableId,
                        fragPtr.p->fragmentId,
                        c_lqh->getCreateSchemaVersion(fragPtr.p->fragTableId),
                        fragPtr.p->m_tablespace_id);

  pagePtr.p->list_index = idx;
  Page_pool *pool= (Page_pool*)&m_global_page_pool;
  Local_Page_list list(*pool, alloc.m_dirty_pages[idx]);
  list.addFirst(pagePtr);

  // Make sure no one will allocate it...
  tsman.unmap_page(&key, EXTENT_SEARCH_MATRIX_COLS - 1);
  jamEntry();
}
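
/**
 * The 0x8000 bit in list_index encodes "not on any dirty list": it is
 * set in disk_page_unmap_callback when the last dirty reference is
 * dropped before pageout, and cleared again above when the page becomes
 * dirty anew. A sketch of the encoding:
 *
 *   Uint32 idx = pagePtr.p->list_index;
 *   bool on_dirty_list = (idx & 0x8000) == 0;
 *   Uint32 free_bits   = idx & 0x7FFF;  // position in m_dirty_pages
 */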

void
Dbtup::disk_page_unmap_callback(Uint32 when,
                                Uint32 page_id, Uint32 dirty_count)
{
  jamEntry();
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Uint32 type = pagePtr.p->m_page_header.m_page_type;
  if (unlikely((type != File_formats::PT_Tup_fixsize_page &&
                type != File_formats::PT_Tup_varsize_page) ||
               f_undo_done == false))
  {
    jam();
    D("disk_page_unmap_callback" << V(type) << V(f_undo_done));
    return;
  }

  Uint32 idx = pagePtr.p->list_index;

  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);

  DEB_LCP(("(%u)unmap page: tab(%u,%u), page(%u,%u):%u",
           instance(),
           pagePtr.p->m_table_id,
           pagePtr.p->m_fragment_id,
           pagePtr.p->m_file_no,
           pagePtr.p->m_page_no,
           pagePtr.i));

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

  if (when == 0)
  {
    /**
     * Before pageout
     */
    jam();

    if (DBG_DISK)
    {
      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;
      ndbout << "disk_page_unmap_callback(before) " << key
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    ndbassert((idx & 0x8000) == 0);

    Page_pool *pool= (Page_pool*)&m_global_page_pool;
    Local_Page_list list(*pool, alloc.m_dirty_pages[idx]);
    Local_Page_list list2(*pool, alloc.m_unmap_pages);
    list.remove(pagePtr);
    list2.addFirst(pagePtr);

    if (dirty_count == 0)
    {
      jam();
      pagePtr.p->list_index = idx | 0x8000;
      DEB_EXTENT_BITS(("(%u)Set list_index bit 0x8000 on page(%u,%u)"
                       " when unmap",
                       instance(),
                       pagePtr.p->m_file_no,
                       pagePtr.p->m_page_no));

      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;

      Uint32 free = pagePtr.p->free_space;
      Uint32 used = pagePtr.p->uncommitted_used_space;
      ddrequire(free >= used);
      ddrequire(alloc.calc_page_free_bits(free - used) == idx);

      D("Tablespace_client - disk_page_unmap_callback");
      Tablespace_client tsman(0, this, c_tsman,
                    fragPtr.p->fragTableId,
                    fragPtr.p->fragmentId,
                    c_lqh->getCreateSchemaVersion(fragPtr.p->fragTableId),
                    fragPtr.p->m_tablespace_id);

      tsman.unmap_page(&key, idx);
      jamEntry();
    }
  }
  else if (when == 1)
  {
    /**
     * After pageout
     */
    jam();

    Local_key key;
    key.m_page_no = pagePtr.p->m_page_no;
    key.m_file_no = pagePtr.p->m_file_no;
    Uint32 real_free = pagePtr.p->free_space;

    if (DBG_DISK)
    {
      ndbout << "disk_page_unmap_callback(after) " << key
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    Page_pool *pool= (Page_pool*)&m_global_page_pool;
    Local_Page_list list(*pool, alloc.m_unmap_pages);
    list.remove(pagePtr);

    D("Tablespace_client - disk_page_unmap_callback");
    Tablespace_client tsman(0, this, c_tsman,
                   fragPtr.p->fragTableId,
                   fragPtr.p->fragmentId,
                   c_lqh->getCreateSchemaVersion(fragPtr.p->fragTableId),
                   fragPtr.p->m_tablespace_id);

    if (DBG_DISK && alloc.calc_page_free_bits(real_free) != (idx & ~0x8000))
    {
      ndbout << key
             << " calc: " << alloc.calc_page_free_bits(real_free)
             << " idx: " << (idx & ~0x8000)
             << endl;
    }
    DEB_EXTENT_BITS(("(%u)tab(%u,%u), page(%u,%u):%u real_free: %u, new_bits: %u",
                    instance(),
                    fragPtr.p->fragTableId,
                    fragPtr.p->fragmentId,
                    pagePtr.p->m_file_no,
                    pagePtr.p->m_page_no,
                    pagePtr.i,
                    real_free,
                    alloc.calc_page_free_bits(real_free)));

    tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
    jamEntry();
  }
}
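
/**
 * The two-phase protocol above keeps the extent bits consistent with
 * what is on disk: before pageout (when == 0) the page leaves its dirty
 * list, and if no dirty references remain its free bits are written
 * back via tsman.unmap_page(); after pageout (when == 1) the committed
 * bits are refreshed from the page's real free space via
 * tsman.update_page_free_bits(). A rough trace for one page, with
 * illustrative arguments:
 *
 *   disk_page_unmap_callback(0, page_id, 0);  // dirty -> unmap list
 *   // ... pageout performed by the page cache ...
 *   disk_page_unmap_callback(1, page_id, 0);  // bits := calc(free_space)
 */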

void
Dbtup::disk_page_alloc(Signal* signal,
                       Tablerec* tabPtrP,
                       Fragrecord* fragPtrP,
                       Local_key* key,
                       PagePtr pagePtr,
                       Uint32 gci,
                       const Local_key *row_id,
                       Uint32 alloc_size)
{
  jam();
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;

  Uint64 lsn;
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    jam();
    DEB_PGMAN((
      "(%u)disk_page_alloc: tab(%u,%u):%u,page(%u,%u).%u.%u,gci: %u,"
      "row_id(%u,%u)",
                instance(),
                pagePtr.p->m_table_id,
                pagePtr.p->m_fragment_id,
                pagePtr.p->m_create_table_version,
                key->m_file_no,
                key->m_page_no,
                key->m_page_idx,
                pagePtr.i,
                gci,
                row_id->m_page_no,
                row_id->m_page_idx));
    ddrequire(pagePtr.p->uncommitted_used_space > 0);
    pagePtr.p->uncommitted_used_space--;
    key->m_page_idx= ((Fix_page*)pagePtr.p)->alloc_record();
    jamLine(Uint16(key->m_page_idx));
    lsn= disk_page_undo_alloc(signal,
                              pagePtr.p,
                              key,
                              1,
                              gci,
                              logfile_group_id,
                              alloc_size);
    DEB_PGMAN(("(%u)page(%u,%u).%u, lsn=%llu",
               instance(),
               key->m_file_no,
               key->m_page_no,
               key->m_page_idx,
               lsn));
  }
  else
  {
    jam();
    Uint32 sz= key->m_page_idx;
    ddrequire(pagePtr.p->uncommitted_used_space >= sz);
    pagePtr.p->uncommitted_used_space -= sz;
    key->m_page_idx= ((Var_page*)pagePtr.p)->
      alloc_record(sz, (Var_page*)ctemp_page, 0);

    lsn= disk_page_undo_alloc(signal,
                              pagePtr.p,
                              key,
                              sz,
                              gci,
                              logfile_group_id,
                              alloc_size);
  }
}

void
Dbtup::disk_page_free(Signal *signal,
                      Tablerec *tabPtrP,
                      Fragrecord * fragPtrP,
                      Local_key* key,
                      PagePtr pagePtr,
                      Uint32 gci,
                      const Local_key *row_id,
                      Uint32 alloc_size)
{
  jam();
  if (DBG_DISK)
    ndbout << " disk_page_free " << *key << endl;

  Uint32 page_idx= key->m_page_idx;
  jamLine(Uint16(key->m_page_idx));
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
  Uint32 old_free= pagePtr.p->free_space;

  Uint32 sz;
  Uint64 lsn;
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    sz = 1;
    const Uint32 *src= ((Fix_page*)pagePtr.p)->get_ptr(page_idx, 0);
    if (((*(src + 1)) & Tup_fixsize_page::FREE_RECORD) ==
               Tup_fixsize_page::FREE_RECORD)
    {
      g_eventLogger->info(
        "(%u)disk_page_free crash:tab(%u,%u):%u,page(%u,%u).%u.%u"
        ",gci:%u,row(%u,%u)",
                 instance(),
                 fragPtrP->fragTableId,
                 fragPtrP->fragmentId,
                 pagePtr.p->m_create_table_version,
                 pagePtr.p->m_file_no,
                 pagePtr.p->m_page_no,
                 page_idx,
                 pagePtr.i,
                 gci,
                 row_id->m_page_no,
                 row_id->m_page_idx);
      ndbrequire(((*(src + 1)) & Tup_fixsize_page::FREE_RECORD) !=
                 Tup_fixsize_page::FREE_RECORD);
    }
    lsn= disk_page_undo_free(signal,
                             pagePtr.p,
                             key,
                             src,
                             tabPtrP->m_offsets[DD].m_fix_header_size,
                             gci,
                             logfile_group_id,
                             alloc_size);

    DEB_PGMAN((
      "(%u)disk_page_free:tab(%u,%u):%u,page(%u,%u).%u.%u,gci:%u,row(%u,%u)"
      ", lsn=%llu",
               instance(),
               fragPtrP->fragTableId,
               fragPtrP->fragmentId,
               pagePtr.p->m_create_table_version,
               pagePtr.p->m_file_no,
               pagePtr.p->m_page_no,
               page_idx,
               pagePtr.i,
               gci,
               row_id->m_page_no,
               row_id->m_page_idx,
               lsn));

    ((Fix_page*)pagePtr.p)->free_record(page_idx);
  }
  else
  {
    jam();
    const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
    sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
    lsn= disk_page_undo_free(signal,
                             pagePtr.p,
                             key,
                             src,
                             sz,
                             gci,
                             logfile_group_id,
                             alloc_size);

    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
  }

  Uint32 new_free = pagePtr.p->free_space;

  Uint32 ext = pagePtr.p->m_extent_info_ptr;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  Uint32 old_idx = pagePtr.p->list_index;
  ddrequire(old_free >= used);
  ddrequire(new_free >= used);
  ddrequire(new_free >= old_free);
  ddrequire((old_idx & 0x8000) == 0);

  Uint32 new_idx = alloc.calc_page_free_bits(new_free - used);
  ddrequire(alloc.calc_page_free_bits(old_free - used) == old_idx);

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_dirty_page(alloc,
                              extentPtr,
                              pagePtr,
                              old_idx,
                              new_idx,
                              fragPtrP);
  }

  update_extent_pos(jamBuffer(), alloc, extentPtr, sz);
}
1624 
1625 void
1626 Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
1627 				Local_key* key, Uint32 sz)
1628 {
1629   jam();
1630 
1631   Page_cache_client::Request req;
1632   req.m_callback.m_callbackData= sz;
1633   req.m_callback.m_callbackFunction =
1634     safe_cast(&Dbtup::disk_page_abort_prealloc_callback);
1635 
1636   int flags= Page_cache_client::ABORT_REQ;
1637   memcpy(&req.m_page, key, sizeof(Local_key));
1638   req.m_table_id = fragPtrP->fragTableId;
1639   req.m_fragment_id = fragPtrP->fragmentId;
1640 
1641   Page_cache_client pgman(this, c_pgman);
1642   int res= pgman.get_page(signal, req, flags);
1643   jamEntry();
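  /*
   * get_page returns 0 when the page is not yet available (the callback
   * registered above will run once the page has been read), -1 on error,
   * and a positive value (a global page id) when the page is already
   * available in the page cache.
   */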
1644   switch(res)
1645   {
1646   case 0:
1647     jam();
1648     break;
1649   case -1:
1650     ndbabort();
1651   default:
1652     jam();
1653     ndbrequire(res > 0);
1654     Ptr<GlobalPage> gpage;
1655     m_global_page_pool.getPtr(gpage, (Uint32)res);
1656     PagePtr pagePtr;
1657     pagePtr.i = gpage.i;
1658     pagePtr.p = reinterpret_cast<Page*>(gpage.p);
1659 
1660     disk_page_abort_prealloc_callback_1(signal, fragPtrP, pagePtr, sz);
1661   }
1662 }
1663 
1664 void
1665 Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
1666 					 Uint32 sz, Uint32 page_id)
1667 {
1668   jamEntry();
1669   Ptr<GlobalPage> gpage;
1670   m_global_page_pool.getPtr(gpage, page_id);
1671 
1672   PagePtr pagePtr;
1673   pagePtr.i = gpage.i;
1674   pagePtr.p = reinterpret_cast<Page*>(gpage.p);
1675 
1676   Ptr<Tablerec> tabPtr;
1677   tabPtr.i= pagePtr.p->m_table_id;
1678   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
1679 
1680   Ptr<Fragrecord> fragPtr;
1681   getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
1682 
1683   disk_page_abort_prealloc_callback_1(signal, fragPtr.p, pagePtr, sz);
1684 }
1685 
1686 void
1687 Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
1688 					   Fragrecord* fragPtrP,
1689 					   PagePtr pagePtr,
1690 					   Uint32 sz)
1691 {
1692   jam();
1693   disk_page_set_dirty(pagePtr);
1694 
1695   Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
1696 
1697   Ptr<Extent_info> extentPtr;
1698   c_extent_pool.getPtr(extentPtr, pagePtr.p->m_extent_info_ptr);
1699 
1700   Uint32 idx = pagePtr.p->list_index & 0x7FFF;
1701   Uint32 used = pagePtr.p->uncommitted_used_space;
1702   Uint32 free = pagePtr.p->free_space;
1703 
1704   ddrequire(free >= used);
1705   ddrequire(used >= sz);
1706   ddrequire(alloc.calc_page_free_bits(free - used) == idx);
1707 
1708   pagePtr.p->uncommitted_used_space = used - sz;
1709 
1710   Uint32 new_idx = alloc.calc_page_free_bits(free - used + sz);
1711 
1712   if (idx != new_idx)
1713   {
1714     jam();
1715     disk_page_move_dirty_page(alloc,
1716                               extentPtr,
1717                               pagePtr,
1718                               idx,
1719                               new_idx,
1720                               fragPtrP);
1721   }
1722 
1723   update_extent_pos(jamBuffer(), alloc, extentPtr, sz);
1724 }
1725 
1726 Uint64
1727 Dbtup::disk_page_undo_alloc(Signal *signal,
1728                             Page* page,
1729                             const Local_key* key,
1730 			    Uint32 sz,
1731                             Uint32 gci,
1732                             Uint32 logfile_group_id,
1733                             Uint32 alloc_size)
1734 {
1735   jam();
1736   Disk_undo::Alloc alloc;
1737   alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
1738   alloc.m_page_no = key->m_page_no;
1739   alloc.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
1740 
1741   Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
1742 
1743   Uint64 lsn;
1744   {
1745     D("Logfile_client - disk_page_undo_alloc");
1746     Logfile_client lgman(this, c_lgman, logfile_group_id);
1747     lsn= lgman.add_entry_simple(c, 1, alloc_size);
1748   }
1749   jamEntry();
1750   {
1751     Page_cache_client pgman(this, c_pgman);
1752     pgman.update_lsn(signal, * key, lsn);
1753   }
1754   jamEntry();
1755 
1756   return lsn;
1757 }
1758 
1759 Uint64
1760 Dbtup::disk_page_undo_update(Signal *signal,
1761                              Page* page,
1762                              const Local_key* key,
1763 			     const Uint32* src,
1764                              Uint32 sz,
1765 			     Uint32 gci,
1766                              Uint32 logfile_group_id,
1767                              Uint32 alloc_size)
1768 {
1769   jam();
1770 
1771   Disk_undo::Update update;
1772   update.m_page_no = key->m_page_no;
1773   update.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
1774   update.m_gci= gci;
1775 
1776   update.m_type_length=
1777     (Disk_undo::UNDO_UPDATE << 16) | (sz + (sizeof(update) >> 2) - 1);
1778 
1779   Logfile_client::Change c[3] = {
1780     { &update, 3 },
1781     { src, sz },
1782     { &update.m_type_length, 1 }
1783   };
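  /*
   * The entry is logged as three chunks: the 3 leading header words of
   * Disk_undo::Update, the sz payload words, and a trailing copy of
   * m_type_length. The trailing length word is what allows the UNDO log
   * to be parsed backwards during restart. The assert below checks that
   * this layout agrees with sizeof(update): with Disk_undo::Update being
   * 5 words, both sides equal 4 * (4 + sz) bytes.
   */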
1784 
1785   ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
1786 
1787   Uint64 lsn;
1788   {
1789     D("Logfile_client - disk_page_undo_update");
1790     Logfile_client lgman(this, c_lgman, logfile_group_id);
1791     lsn= lgman.add_entry_complex(c, 3, true, alloc_size);
1792   }
1793   jamEntry();
1794   {
1795     Page_cache_client pgman(this, c_pgman);
1796     pgman.update_lsn(signal, * key, lsn);
1797   }
1798   jamEntry();
1799 
1800   return lsn;
1801 }
1802 
1803 Uint64
1804 Dbtup::disk_page_undo_free(Signal *signal,
1805                            Page* page,
1806                            const Local_key* key,
1807 			   const Uint32* src,
1808                            Uint32 sz,
1809 			   Uint32 gci,
1810                            Uint32 logfile_group_id,
1811                            Uint32 alloc_size)
1812 {
1813   jam();
1814 
1815   Disk_undo::Free free;
1816   free.m_page_no = key->m_page_no;
1817   free.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
1818   free.m_gci= gci;
1819 
1820   free.m_type_length=
1821     (Disk_undo::UNDO_FREE << 16) | (sz + (sizeof(free) >> 2) - 1);
1822 
1823   Logfile_client::Change c[3] = {
1824     { &free, 3 },
1825     { src, sz },
1826     { &free.m_type_length, 1 }
1827   };
1828 
1829   ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
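  /* Same three-chunk layout and size check as for UNDO_UPDATE above. */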
1830 
1831   Uint64 lsn;
1832   {
1833     D("Logfile_client - disk_page_undo_free");
1834     Logfile_client lgman(this, c_lgman, logfile_group_id);
1835     lsn= lgman.add_entry_complex(c, 3, false, alloc_size);
1836   }
1837   jamEntry();
1838   {
1839     Page_cache_client pgman(this, c_pgman);
1840     pgman.update_lsn(signal, * key, lsn);
1841   }
1842   jamEntry();
1843   return lsn;
1844 }
1845 
1846 #define DBG_UNDO 0
1847 
1848 void
1849 Dbtup::verify_undo_log_execution()
1850 {
1851   ndbrequire(!f_undo.m_in_intermediate_log_record);
1852 }
1853 
1854 /**
1855  * Preface:
1856  * With parallel undo log application, many undo records can be sent to the
1857  * LDM threads without waiting for the LDM threads to finish applying them.
1858  *
1859  * Before applying a log record, we must fetch the page (get_page) and
1860  * sometimes, if the page is not available immediately, we have to wait for it
1861  * before the log record can be applied. Waiting is done by periodically
1862  * checking if the page is available (do_busy_loop()).
1863  * However, between the checks, a subsequent log record belonging to the same
1864  * page might get processed. This is because multiple log records are sent from
1865  * LGMAN to the LDM threads continuously without waiting for the LDM threads to
1866  * finish applying them. (WL #8478)
1867  * This subsequent log record will try to get the page as well and might
1868  * succeed. This will result in unordered application of the undo records.
1869  *
1870  * The solution for this is to order the undo records belonging to a page.
1871  *
1872  * Algorithm for ordering record types which require disk page requests
1873  * (UNDO_TUP_ALLOC, UNDO_TUP_UPDATE, UNDO_TUP_FIRST_UPDATE_PART,
1874  * UNDO_TUP_UPDATE_PART, UNDO_TUP_FREE, UNDO_TUP_FREE_PART):
1875  *
1876  * c_pending_undo_page_hash holds all pages (of type Pending_undo_page) that
1877  * have requests pending. Each Pending_undo_page has a list of pending undo
1878  * records (of type Apply_undo) for that page.
1879  *
1880  * First, the page to which the current undo record belongs is looked up
1881  * in the hash table (c_pending_undo_page_hash).
1882  * If it exists, the current undo record is added to the list of pending undo
1883  * records of the page.
1884  *
1885  * If the page isn't present in the hash table, it means there are no pending
1886  * requests for that page and the page is requested from PGMAN.
1887  * If the page is not available at the moment, it is added to the hash table
1888  * and the current undo record being processed is added to the pending list of
1889  * the page.
1890  * When the page is available immediately, the callback which applies the
1891  * undo records (disk_restart_undo_callback()) is executed.
1892  */
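/*
 * A sketch of the resulting flow in disk_restart_undo() below
 * (simplified, names as in the code):
 *
 *   Pending_undo_page key(file_no, page_no);
 *   if (c_pending_undo_page_hash.find(page, key)) {
 *     // a get_page request is already outstanding for this page:
 *     // queue the record behind the earlier ones to keep LSN order
 *     add copy of f_undo to the page's Apply_undo list;
 *   } else if (pgman.get_page(...) == 0) {
 *     // page not immediately available: register the page in the
 *     // hash and queue the record; the callback drains the whole list
 *     c_pending_undo_page_hash.add(page);
 *     add copy of f_undo to the page's Apply_undo list;
 *   } else {
 *     // page available at once: apply f_undo directly via the callback
 *   }
 */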
1893 void
1894 Dbtup::disk_restart_undo(Signal* signal,
1895                          Uint64 lsn,
1896 			 Uint32 type,
1897                          const Uint32 * ptr,
1898                          Uint32 len)
1899 {
1900   f_undo_done = false;
1901   f_undo.m_lsn= lsn;
1902   f_undo.m_ptr= ptr;
1903   f_undo.m_len= len;
1904   f_undo.m_type = type;
1905 
1906   Page_cache_client::Request preq;
1907   switch(f_undo.m_type){
1908   case File_formats::Undofile::UNDO_LOCAL_LCP_FIRST:
1909   case File_formats::Undofile::UNDO_LOCAL_LCP:
1910   case File_formats::Undofile::UNDO_LCP_FIRST:
1911   case File_formats::Undofile::UNDO_LCP:
1912   {
1913     /**
1914      * Searching for the end of UNDO log execution is only done in
1915      * lgman.cpp, so here we assume that we are supposed to continue
1916      * executing the UNDO log; no end-of-log checks are made here.
1917      */
1918     jam();
1919     Uint32 lcpId;
1920     Uint32 localLcpId;
1921     Uint32 tableId;
1922     Uint32 fragId;
1923     if (f_undo.m_type == File_formats::Undofile::UNDO_LOCAL_LCP ||
1924         f_undo.m_type == File_formats::Undofile::UNDO_LOCAL_LCP_FIRST)
1925     {
1926       jam();
1927       ndbrequire(len == 4);
1928       lcpId = ptr[0];
1929       localLcpId = ptr[1];
1930       tableId = ptr[2] >> 16;
1931       fragId = ptr[2] & 0xFFFF;
1932     }
1933     else
1934     {
1935       jam();
1936       ndbrequire(len == 3);
1937       lcpId = ptr[0];
1938       localLcpId = 0;
1939       tableId = ptr[1] >> 16;
1940       fragId = ptr[1] & 0xFFFF;
1941     }
1942     if (tableId != 0)
1943     {
1944       jam();
1945       disk_restart_undo_lcp(tableId,
1946                             fragId,
1947                             Fragrecord::UC_LCP,
1948                             lcpId,
1949                             localLcpId);
1950     }
1951     if (!isNdbMtLqh())
1952       disk_restart_undo_next(signal);
1953 
1954     DEB_UNDO(("(%u)UNDO LCP [%u,%u] tab(%u,%u)",
1955               instance(),
1956               lcpId,
1957               localLcpId,
1958               tableId,
1959               fragId));
1960     return;
1961   }
1962   case File_formats::Undofile::UNDO_TUP_ALLOC:
1963   {
1964     jam();
1965     Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
1966     preq.m_page.m_page_no = rec->m_page_no;
1967     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1968     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1969     f_undo.m_offset = 0;
1970     break;
1971   }
1972   case File_formats::Undofile::UNDO_TUP_UPDATE:
1973   {
1974     jam();
1975     Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
1976     preq.m_page.m_page_no = rec->m_page_no;
1977     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1978     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1979     f_undo.m_offset = 0;
1980     break;
1981   }
1982   case File_formats::Undofile::UNDO_TUP_UPDATE_PART:
1983   {
1984     jam();
1985     Disk_undo::UpdatePart* rec= (Disk_undo::UpdatePart*)ptr;
1986     preq.m_page.m_page_no = rec->m_page_no;
1987     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1988     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1989     f_undo.m_offset = rec->m_offset;
1990     break;
1991   }
1992   case File_formats::Undofile::UNDO_TUP_FIRST_UPDATE_PART:
1993   {
1994     jam();
1995     Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
1996     preq.m_page.m_page_no = rec->m_page_no;
1997     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1998     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1999     f_undo.m_offset = 0;
2000     break;
2001   }
2002   case File_formats::Undofile::UNDO_TUP_FREE:
2003   {
2004     jam();
2005     Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
2006     preq.m_page.m_page_no = rec->m_page_no;
2007     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
2008     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
2009     f_undo.m_offset = 0;
2010     break;
2011   }
2012   case File_formats::Undofile::UNDO_TUP_FREE_PART:
2013   {
2014     jam();
2015     Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
2016     preq.m_page.m_page_no = rec->m_page_no;
2017     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
2018     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
2019     f_undo.m_offset = 0;
2020     break;
2021   }
2022   case File_formats::Undofile::UNDO_TUP_DROP:
2023   {
2024     jam();
2025     Disk_undo::Drop* rec = (Disk_undo::Drop*)ptr;
2026     Ptr<Tablerec> tabPtr;
2027     /**
2028      * We could come here in a number of situations:
2029      * 1) It could be a record that belongs to a table that we are not
2030      *    restoring; in this case we won't find the table in the search
2031      *    below.
2032      * 2) It could belong to a table we are restoring, but this is a
2033      *    drop of a previous incarnation of this table. Definitely no
2034      *    more log records should be executed for this table.
2035      *
2036      * Coming here after we reached the end of the fragment LCP should
2037      * not happen, so we insert an ndbrequire to guard against it.
2038      */
2039     tabPtr.i= rec->m_table;
2040     if (tabPtr.i < cnoOfTablerec)
2041     {
2042       jam();
2043       ptrAss(tabPtr, tablerec);
2044       DEB_UNDO(("(%u)UNDO_TUP_DROP: lsn: %llu, tab: %u",
2045                instance(),
2046                lsn,
2047                tabPtr.i));
2048       for(Uint32 i = 0; i<NDB_ARRAY_SIZE(tabPtr.p->fragrec); i++)
2049       {
2050         jam();
2051         if (tabPtr.p->fragrec[i] != RNIL)
2052         {
2053           jam();
2054           jamLine(Uint16(tabPtr.p->fragid[i]));
2055           disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
2056                                 Fragrecord::UC_DROP, 0, 0);
2057         }
2058       }
2059     }
2060     if (!isNdbMtLqh())
2061       disk_restart_undo_next(signal);
2062     return;
2063   }
2064   case File_formats::Undofile::UNDO_END:
2065     jam();
2066     f_undo_done = true;
2067     ndbrequire(c_pending_undo_page_hash.getCount() == 0);
2068     return;
2069   default:
2070     ndbabort();
2071   }
2072 
2073   f_undo.m_key = preq.m_page;
2074   preq.m_table_id = (~0); /* Special code for table id for UNDO_REQ */
2075   preq.m_fragment_id = 0;
2076   preq.m_callback.m_callbackFunction =
2077     safe_cast(&Dbtup::disk_restart_undo_callback);
2078 
2079   Ptr<Pending_undo_page> cur_undo_record_page;
2080   cur_undo_record_page.i = RNIL;
2081 
2082   if (isNdbMtLqh())
2083   {
2084     jam();
2085     Pending_undo_page key(preq.m_page.m_file_no, preq.m_page.m_page_no);
2086 
2087     if (c_pending_undo_page_hash.find(cur_undo_record_page, key))
2088     {
2089       jam();
2090       /**
2091        *  Page of the current undo record being processed already has a pending
2092        *  request.
2093        */
2094       Ptr<Apply_undo> cur_undo_record;
2095       ndbrequire(c_apply_undo_pool.seize(cur_undo_record));
2096 
2097       f_undo.m_magic = cur_undo_record.p->m_magic;
2098       *(cur_undo_record.p) = f_undo;
2099 
2100       LocalApply_undo_list undoList(c_apply_undo_pool,
2101                                     cur_undo_record_page.p->m_apply_undo_head);
2102       // add to Apply_undo list of the page it belongs to
2103       undoList.addLast(cur_undo_record);
2104       DEB_UNDO(("LDM(%u) WAIT Page:%u File:%u count:%u lsn:%llu",
2105           instance(), preq.m_page.m_page_no, preq.m_page.m_file_no,
2106           undoList.getCount(), f_undo.m_lsn));
2107       ndbrequire(undoList.getCount() <= MAX_PENDING_UNDO_RECORDS);
2108       return;
2109     }
2110 
2111     // page doesn't have any pending request
2112     // allocate for cur_undo_record_page from pool
2113     ndbrequire(c_pending_undo_page_pool.seize(cur_undo_record_page));
2114     preq.m_callback.m_callbackData = cur_undo_record_page.i;
2115   }
2116 
2117   int flags = Page_cache_client::UNDO_REQ;
2118   Page_cache_client pgman(this, c_pgman);
2119   int res= pgman.get_page(signal, preq, flags);
2120 
2121   jamEntry();
2122 
2123   switch(res)
2124   {
2125   case 0:
2126     jam();
2127     m_immediate_flag = false;
2128 
2129     if (isNdbMtLqh())
2130     {
2131       //initialize page, add to hash table
2132       new(cur_undo_record_page.p)
2133           Pending_undo_page(preq.m_page.m_file_no, preq.m_page.m_page_no);
2134       c_pending_undo_page_hash.add(cur_undo_record_page);
2135 
2136       //add undo record to list
2137       Ptr<Apply_undo> cur_undo_record;
2138       ndbrequire(c_apply_undo_pool.seize(cur_undo_record));
2139 
2140       f_undo.m_magic = cur_undo_record.p->m_magic;
2141       *(cur_undo_record.p) = f_undo;
2142 
2143       LocalApply_undo_list undoList(c_apply_undo_pool,
2144                                     cur_undo_record_page.p->m_apply_undo_head);
2145       undoList.addLast(cur_undo_record);
2146       DEB_UNDO(("LDM(%u) FIRST WAIT Page:%u File:%u count:%u lsn:%llu",
2147           instance(), preq.m_page.m_page_no, preq.m_page.m_file_no,
2148           undoList.getCount(), f_undo.m_lsn));
2149 
2150     }
2151     break; // Wait for callback
2152   case -1:
2153     ndbabort();
2154   default:
2155     ndbrequire(res > 0);
2156     DEB_UNDO(("LDM(%u) DIRECT_EXECUTE Page:%u lsn:%llu",
2157                         instance(),
2158                         preq.m_page.m_page_no,
2159                         f_undo.m_lsn));
2160     if (isNdbMtLqh())
2161     {
2162       jam();
2163       c_pending_undo_page_pool.release(cur_undo_record_page);
2164       // no page stored in hash, so i = RNIL
2165       preq.m_callback.m_callbackData = RNIL;
2166     }
2167     jam();
2168     /**
2169      * The m_immediate_flag variable stays false except for the time
2170      * from this call to execute until we reach the callback, where it
2171      * is read and immediately reset to false. Essentially it is a
2172      * parameter to the callback which is hard to pass through the
2173      * callback interface.
2174      */
2175     m_immediate_flag = true;
2176     execute(signal, preq.m_callback, res); // run callback
2177   }
2178 }
2179 
2180 void
2181 Dbtup::disk_restart_undo_next(Signal* signal, Uint32 applied, Uint32 count_pending)
2182 {
2183   signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
2184   /* Flag indicating whether UNDO log was applied. */
2185   signal->theData[1] = applied;
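  /* Number of undo records consumed by this execution (1 in the direct
     case, the length of the drained per-page list in the pending case). */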
2186   signal->theData[2] = count_pending;
2187   sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 3, JBB);
2188 }
2189 
2190 /**
2191  * This method is called before the UNDO log execution. It is called with
2192  * lcpId == RNIL when no LCP exists. It is called with the lcpId to restore
2193  * the fragment with when called with a value other than RNIL.
2194  */
2195 void
2196 Dbtup::disk_restart_lcp_id(Uint32 tableId,
2197                            Uint32 fragId,
2198                            Uint32 lcpId,
2199                            Uint32 localLcpId)
2200 {
2201   /**
2202    * disk_restart_lcp_id is called from DBLQH when the restore of a
2203    * fragment is completed. At this time we know exactly which
2204    * lcpId this fragment should use in its restore.
2205    * If no LCP is used to restore then lcpId is RNIL.
2206    */
2207   if (lcpId == RNIL)
2208   {
2209     jam();
2210     disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_NO_LCP, 0, 0);
2211     DEB_UNDO(("(%u)mark_no_lcp tab(%u,%u), UC_NO_LCP",
2212               instance(),
2213               tableId,
2214               fragId));
2215   }
2216   else
2217   {
2218     jam();
2219     disk_restart_undo_lcp(tableId,
2220                           fragId,
2221                           Fragrecord::UC_SET_LCP,
2222                           lcpId,
2223                           localLcpId);
2224     DEB_UNDO(("(%u)mark_no_lcp tab(%u,%u), UC_SET_LCP",
2225               instance(),
2226               tableId,
2227               fragId));
2228   }
2229 }
2230 
2231 void
2232 Dbtup::disk_restart_undo_lcp(Uint32 tableId,
2233                              Uint32 fragId,
2234                              Uint32 flag,
2235 			     Uint32 lcpId,
2236                              Uint32 localLcpId)
2237 {
2238   Ptr<Tablerec> tabPtr;
2239   tabPtr.i= tableId;
2240   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
2241 
2242   if (tabPtr.p->tableStatus == DEFINED && tabPtr.p->m_no_of_disk_attributes)
2243   {
2244     jam();
2245     FragrecordPtr fragPtr;
2246     getFragmentrec(fragPtr, fragId, tabPtr.p);
2247     if (!fragPtr.isNull())
2248     {
2249       jam();
2250       DEB_UNDO(("(%u)tab(%u,%u), lcp(%u,%u), flag: %u",
2251                 instance(),
2252                 tableId,
2253                 fragId,
2254                 lcpId,
2255                 localLcpId,
2256                 flag));
2257       switch(flag){
2258       case Fragrecord::UC_DROP:
2259       {
2260         jam();
2261         /**
2262          * In this case we have decided to restore this table. If a
2263          * drop record is seen it must refer to a previous incarnation
2264          * of the table. Given that UNDO_TUP_CREATE isn't logged we can
2265          * encounter this at times. Going further backwards we should
2266          * ignore log records for this table since they belong to the
2267          * old incarnation.
2268          */
2269 	fragPtr.p->m_undo_complete = Fragrecord::UC_CREATE;
2270         return;
2271       }
2272       case Fragrecord::UC_CREATE:
2273       {
2274         /**
2275          * We have reached the point in the undo log where the table
2276          * was created. This record is not always inserted, but once it
2277          * has been seen we don't perform any more UNDO operations.
2278          */
2279         jam();
2280 	fragPtr.p->m_undo_complete = Fragrecord::UC_CREATE;
2281 	return;
2282       }
2283       case Fragrecord::UC_NO_LCP:
2284       {
2285         jam();
2286         /**
2287          * We are restoring a table that had no LCPs connected to it.
2288          * We need to run the UNDO log for this table all the way back
2289          * to the table creation. We don't track table creations in the
2290          * UNDO log, so we have to execute the UNDO log back to the
2291          * LCP before it was created.
2292          */
2293 	fragPtr.p->m_undo_complete = Fragrecord::UC_NO_LCP;
2294         return;
2295       }
2296       case Fragrecord::UC_LCP:
2297 	jam();
2298         if (fragPtr.p->m_undo_complete == 0 &&
2299             fragPtr.p->m_restore_lcp_id == lcpId &&
2300             fragPtr.p->m_restore_local_lcp_id == localLcpId)
2301 	{
2302 	  jam();
2303           /**
2304            * We have reached the LCP UNDO log record; this indicates that the
2305            * fragment is now rolled back to where it should be.
2306            * We might still need to execute UNDO log records to synchronize the
2307            * page information with the extent bits.
2308            */
2309 	  fragPtr.p->m_undo_complete = flag;
2310           DEB_UNDO(("(%u)tab(%u,%u) lcp(%u,%u) -> done",
2311                      instance(),
2312                      tableId,
2313                      fragId,
2314                      lcpId,
2315                      localLcpId));
2316 	}
2317 	return;
2318       case Fragrecord::UC_SET_LCP:
2319       {
2320 	jam();
2321         /**
2322          * Used before UNDO log execution starts to set
2323          * m_restore_lcp_id for the fragment.
2324          */
2325         DEB_UNDO(("(%u)table(%u,%u) restore to lcp(%u,%u)",
2326                   instance(),
2327                   tableId,
2328                   fragId,
2329                   lcpId,
2330                   localLcpId));
2331 	ndbrequire(fragPtr.p->m_undo_complete == 0);
2332 	ndbrequire(fragPtr.p->m_restore_lcp_id == RNIL);
2333 	fragPtr.p->m_restore_lcp_id = lcpId;
2334         fragPtr.p->m_restore_local_lcp_id = localLcpId;
2335 	return;
2336       }
2337       }
2338       jamLine(flag);
2339       ndbabort();
2340     }
2341   }
2342 }
2343 
2344 void
2345 Dbtup::release_undo_record(Ptr<Apply_undo>& undo_record, bool pending)
2346 {
2347   if (pending)
2348   {
2349     jam();
2350     c_apply_undo_pool.release(undo_record);
2351   }
2352 }
2353 
2354 /**
2355  * Algorithm for applying undo records:
2356  *
2357  * The page_i passed is searched in the hashmap. If it is present,
2358  * it means there are pending undo records for the page, and they are processed
2359  * one by one from the list.
2360  * If it isn't present, the current undo record being processed in this signal
2361  * execution is the one which should be applied (f_undo).
2362  */
2363 void
2364 Dbtup::disk_restart_undo_callback(Signal* signal,
2365 				  Uint32 page_i,
2366 				  Uint32 page_id)
2367 {
2368   jamEntry();
2369   Ptr<GlobalPage> gpage;
2370   m_global_page_pool.getPtr(gpage, page_id);
2371   PagePtr pagePtr;
2372   pagePtr.i = gpage.i;
2373   pagePtr.p = reinterpret_cast<Page*>(gpage.p);
2374   bool immediate_flag = m_immediate_flag;
2375   m_immediate_flag = false;
2376   Pending_undo_page* pendingPage = NULL;
2377   Apply_undo* undo = &f_undo;
2378   Uint32 count_pending = 1;
2379 
2380   bool pending = false;
2381 
2382   if (isNdbMtLqh())
2383   {
2384     jam();
2385     pending = (page_i != RNIL);
2386 
2387     if (pending)
2388     {
2389       jam();
2390       pendingPage = c_pending_undo_page_hash.getPtr(page_i);
2391       // page has outstanding undo records
2392       LocalApply_undo_list undoList(c_apply_undo_pool,
2393                                     pendingPage->m_apply_undo_head);
2394       count_pending = undoList.getCount();
2395       DEB_UNDO(("LDM(%u) EXECUTE LIST CALLBACK Page:%u count:%u",
2396                               instance(),
2397                               pendingPage->m_page_no,
2398                               count_pending));
2399     }
2400     else
2401     {
2402       DEB_UNDO(("LDM(%u) PAGE_NOT_FOUND_HASH", instance()));
2403     }
2404   }
2405 
2406   /**
2407    * Before we apply the UNDO record we need to discover which table
2408    * the page belongs to. For most pages this is listed in the page
2409    * header. However we cannot trust the page header since we could
2410    * come here with an UNDO log record for a page that has never
2411    * been written to disk after table creation. Worse, the page could
2412    * even be listed as belonging to a different table and thus we
2413    * would create a mess here.
2414    *
2415    * To get the true identity of the page we will look up the table
2416    * in tsman, from this we will get the table id and fragment id
2417    * of the extent and this will also be the table id and fragment
2418    * id of the page we're dealing with here.
2419    *
2420    * Two things could happen here. We could come here with a page
2421    * that belongs to table RNIL; this means that both the page
2422    * and its extent were allocated after the start of the LCP.
2423    * In this case we don't need to do anything: the extent isn't
2424    * allocated to any table and thus should remain a free extent,
2425    * so it doesn't make sense to write anything to the page at
2426    * all.
2427    *
2428    * Another variant is that the page belongs to a table which
2429    * isn't part of the restart, this can happen if the table
2430    * was dropped just before the crash.
2431    * Also in this case there is no need to do anything.
2432    *
2433    * Finally if we find that it belongs to an existing table, then
2434    * we will use this table id and fragment id here.
2435    *
2436    * The next question is whether the page has been initialised
2437    * yet. We need to check 3 header variables for this:
2438    * table id, fragment id and table version.
2439    * Table id and fragment id aren't enough; the page could have
2440    * belonged to a table with the same table id and fragment id, but
2441    * it cannot at the same time also have the same table version.
2442    *
2443    * Actually older versions didn't set the table version in the pages.
2444    * So it isn't possible here to be fully certain that the page belongs
2445    * to the correct table.
2446    *
2447    * A simple optimisation here is that this only needs to be done for
2448    * pages that miss in the page cache. If they are already in the page
2449    * cache then we can use the table id and fragment id as found in the
2450    * page header.
2451    *
2452    * For all pages that are changed or read into the page cache we will
2453    * also synchronize the extent bits with the page information.
2454    */
2455 
2456   if (! (pagePtr.p->list_index & 0x8000) ||
2457       pagePtr.p->nextList != RNIL ||
2458       pagePtr.p->prevList != RNIL)
2459   {
2460     jam();
2461     pagePtr.p->list_index |= 0x8000;
2462     pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
2463 #ifdef DEBUG_EXTENT_BITS
2464     Uint64 lsn = 0;
2465     lsn += pagePtr.p->m_page_header.m_page_lsn_hi;
2466     lsn <<= 32;
2467     lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
2468     DEB_EXTENT_BITS(("(%u)Set list_index bit 0x8000 on page(%u,%u)"
2469                      " when undo, page_lsn = %llu, key(%u,%u).%u"
2470                      ", undo_lsn: %llu",
2471                      instance(),
2472                      pagePtr.p->m_file_no,
2473                      pagePtr.p->m_page_no,
2474                      lsn,
2475                      undo->m_key.m_file_no,
2476                      undo->m_key.m_page_no,
2477                      undo->m_key.m_page_idx,
2478                      undo->m_lsn));
2479 #endif
2480   }
2481 
2482   Uint32 tableId= pagePtr.p->m_table_id;
2483   Uint32 fragId = pagePtr.p->m_fragment_id;
2484   Uint32 applied = 0;
2485 
2486   if (!pending) // direct execute, page not present in hash table.
2487   {
2488     ndbrequire(count_pending == 1);
2489   }
2490 
2491   for (Uint32 i = 1; i <= count_pending; i++)
2492   {
2493     Ptr<Apply_undo> pending_undo;
2494     if (pending)
2495     {
2496       jam();
2497       // Remove, process and release all Apply_undo records from the list.
2498       LocalApply_undo_list undoList(c_apply_undo_pool,
2499                                     pendingPage->m_apply_undo_head);
2500       undoList.removeFirst(pending_undo);
2501       undo = pending_undo.p;
2502     }
2503 
2504     /**
2505      * Ensure that the Page entry in PGMAN has the correct table id
2506      * fragment id set if it will be used in a future LCP.
2507      */
2508     Page_cache_client::Request preq;
2509     preq.m_page.m_file_no = undo->m_key.m_file_no;
2510     preq.m_page.m_page_no = undo->m_key.m_page_no;
2511     preq.m_table_id = tableId;
2512     preq.m_fragment_id = fragId;
2513     Page_cache_client pgman(this, c_pgman);
2514     ndbrequire(pgman.init_page_entry(preq));
2515 
2516     // process the undo record/s
2517     if (tableId >= cnoOfTablerec)
2518     {
2519       jam();
2520       DEB_UNDO(("(%u)UNDO table> %u, page(%u,%u).%u",
2521                instance(),
2522                tableId,
2523                undo->m_key.m_file_no,
2524                undo->m_key.m_page_no,
2525                undo->m_key.m_page_idx));
2526       release_undo_record(pending_undo, pending);
2527       continue;
2528     }
2529 
2530     undo->m_table_ptr.i = tableId;
2531     ptrCheckGuard(undo->m_table_ptr, cnoOfTablerec, tablerec);
2532 
2533     if (! (undo->m_table_ptr.p->tableStatus == DEFINED &&
2534            undo->m_table_ptr.p->m_no_of_disk_attributes))
2535     {
2536       jam();
2537       DEB_UNDO(("(%u)UNDO !defined (%u) on page(%u,%u).%u",
2538                 instance(),
2539                 tableId,
2540                 undo->m_key.m_file_no,
2541                 undo->m_key.m_page_no,
2542                 undo->m_key.m_page_idx));
2543       release_undo_record(pending_undo, pending);
2544       continue;
2545     }
2546 
2547     Uint32 create_table_version = pagePtr.p->m_create_table_version;
2548     Uint32 page_version = pagePtr.p->m_ndb_version;
2549 
2550     ndbrequire(page_version >= NDB_DISK_V2);
2551     if (create_table_version !=
2552           c_lqh->getCreateSchemaVersion(tableId))
2553     {
2554       jam();
2555       DEB_UNDO(("UNDO fragment null %u/%u, old,new=(%u,%u), page(%u,%u).%u",
2556                  tableId,
2557                  fragId,
2558                  create_table_version,
2559                  c_lqh->getCreateSchemaVersion(tableId),
2560                  undo->m_key.m_file_no,
2561                  undo->m_key.m_page_no,
2562                  undo->m_key.m_page_idx));
2563       release_undo_record(pending_undo, pending);
2564       continue;
2565     }
2566 
2567     getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
2568     if (undo->m_fragment_ptr.isNull())
2569     {
2570       jam();
2571       DEB_UNDO(("(%u)UNDO fragment null tab(%u,%u), page(%u,%u).%u",
2572                 instance(),
2573                 tableId,
2574                 fragId,
2575                 undo->m_key.m_file_no,
2576                 undo->m_key.m_page_no,
2577                 undo->m_key.m_page_idx));
2578       release_undo_record(pending_undo, pending);
2579       continue;
2580     }
2581 
2582     Uint64 lsn = 0;
2583     applied = 0;
2584     lsn += pagePtr.p->m_page_header.m_page_lsn_hi;
2585     lsn <<= 32;
2586     lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
2587 
2588     undo->m_page_ptr = pagePtr;
2589 
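    /*
     * Standard undo rule: apply the record only if the page LSN shows
     * that the logged change actually reached this page
     * (undo->m_lsn <= page lsn) and the fragment hasn't already been
     * rolled back to its restore point.
     */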
2590     if (undo->m_lsn <= lsn &&
2591         !undo->m_fragment_ptr.p->m_undo_complete)
2592     {
2593       jam();
2594 
2595       applied = applied | 1;
2596       /**
2597        * Apply undo record
2598        */
2599       switch(undo->m_type){
2600       case File_formats::Undofile::UNDO_TUP_ALLOC:
2601       {
2602         jam();
2603         disk_restart_undo_alloc(undo);
2604         break;
2605       }
2606       case File_formats::Undofile::UNDO_TUP_UPDATE:
2607       {
2608         jam();
2609         disk_restart_undo_update(undo);
2610         break;
2611       }
2612       case File_formats::Undofile::UNDO_TUP_FIRST_UPDATE_PART:
2613       {
2614         jam();
2615         undo->m_in_intermediate_log_record = false;
2616         disk_restart_undo_update_first_part(undo);
2617         break;
2618       }
2619       case File_formats::Undofile::UNDO_TUP_UPDATE_PART:
2620       {
2621         jam();
2622         undo->m_in_intermediate_log_record = true;
2623         disk_restart_undo_update_part(undo);
2624         break;
2625       }
2626       case File_formats::Undofile::UNDO_TUP_FREE:
2627       {
2628         jam();
2629         disk_restart_undo_free(undo, true);
2630         break;
2631       }
2632       case File_formats::Undofile::UNDO_TUP_FREE_PART:
2633       {
2634         jam();
2635         undo->m_in_intermediate_log_record = false;
2636         disk_restart_undo_free(undo, false);
2637         break;
2638       }
2639       default:
2640         ndbabort();
2641       }
2642 
2643       if (undo->m_type != File_formats::Undofile::UNDO_TUP_UPDATE_PART)
2644       {
2645         jam();
2646         lsn = undo->m_lsn - 1; // make sure undo isn't run again...
2647         Page_cache_client pgman(this, c_pgman);
2648         pgman.update_lsn(signal, undo->m_key, lsn);
2649         jamEntry();
2650         disk_restart_undo_page_bits(signal, undo);
2651       }
2652     }
2653     else
2654     {
2655       jam();
2656       if (!immediate_flag &&
2657           undo->m_fragment_ptr.p->m_undo_complete != Fragrecord::UC_CREATE)
2658       {
2659         jam();
2660         /**
2661          * See Lemma 1 and Lemma 2 in analysis of extent page
2662          * synchronisation at restart.
2663          *
2664         * We don't need to call this function when the immediate
2665         * flag is set since we already applied the first UNDO log
2666         * record on the page; there is no need to update
2667         * the page bits, and the first log record has ensured
2668         * that the extent information is already allocated
2669         * properly.
2670          *
2671         * Also we don't go back past the point where a table was
2672         * dropped or created since we are then in territory belonging
2673         * to an old incarnation of the table and we need not handle
2674         * those log records.
2675          */
2676         DEB_UNDO(("(%u)disk_restart_undo_page_bits: page_lsn: %llu"
2677                   ", undo_lsn: %llu, page(%u,%u).%u",
2678                   instance(),
2679                   lsn,
2680                   undo->m_lsn,
2681                   undo->m_key.m_file_no,
2682                   undo->m_key.m_page_no,
2683                   undo->m_key.m_page_idx));
2684         disk_restart_undo_page_bits(signal, undo);
2685       }
2686       else
2687       {
2688         DEB_UNDO(("(%u)UNDO ignored: page_lsn: %llu"
2689                   ", undo_lsn: %llu, page(%u,%u).%u",
2690                   instance(),
2691                   lsn,
2692                   undo->m_lsn,
2693                   undo->m_key.m_file_no,
2694                   undo->m_key.m_page_no,
2695                   undo->m_key.m_page_idx));
2696       }
2697     }
2698 
2699     release_undo_record(pending_undo, pending);
2700   }
2701 
2702   ndbassert(count_pending != 0);
2703   if (isNdbMtLqh() && pending)
2704   {
2705     jam();
2706     LocalApply_undo_list undoList(c_apply_undo_pool,
2707                                   pendingPage->m_apply_undo_head);
2708     DEB_UNDO(("LDM(%u) Page:%u CheckCount:%u Applied:%u", instance(),
2709         pendingPage->m_page_no, undoList.getCount(), count_pending));
2710     ndbrequire(undoList.getCount() == 0);
2711     c_pending_undo_page_hash.remove(page_i);
2712     Ptr<Pending_undo_page> rel;
2713     rel.p = pendingPage;
2714     rel.i = page_i;
2715     c_pending_undo_page_pool.release(rel);
2716   }
2717   disk_restart_undo_next(signal, applied, count_pending);
2718 }
2719 
2720 void
2721 Dbtup::disk_restart_undo_alloc(Apply_undo* undo)
2722 {
2723 #ifdef DEBUG_UNDO
2724   Uint64 lsn = 0;
2725   lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_hi;
2726   lsn <<= 32;
2727   lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_lo;
2728   DEB_UNDO(("(%u)applying %lld UNDO_TUP_ALLOC on page(%u,%u).%u"
2729             ", page_lsn: %llu",
2730             instance(),
2731             undo->m_lsn,
2732             undo->m_key.m_file_no,
2733             undo->m_key.m_page_no,
2734             undo->m_key.m_page_idx,
2735             lsn));
2736 #endif
2737   ndbassert(undo->m_page_ptr.p->m_file_no == undo->m_key.m_file_no);
2738   ndbassert(undo->m_page_ptr.p->m_page_no == undo->m_key.m_page_no);
2739   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
2740   {
2741     ((Fix_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx);
2742   }
2743   else
2744   {
2745     ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
2746   }
2747 }
2748 
2749 void
2750 Dbtup::disk_restart_undo_update(Apply_undo* undo)
2751 {
2752   Uint32* ptr;
2753   Uint32 len= undo->m_len - 4;
2754 #ifdef DEBUG_UNDO
2755   Uint64 lsn = 0;
2756   lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_hi;
2757   lsn <<= 32;
2758   lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_lo;
2759   DEB_UNDO(("(%u)applying %lld UNDO_TUP_UPDATE on page(%u,%u).%u,"
2760             " page_lsn: %llu",
2761             instance(),
2762             undo->m_lsn,
2763             undo->m_key.m_file_no,
2764             undo->m_key.m_page_no,
2765             undo->m_key.m_page_idx,
2766             lsn));
2767 #endif
2768   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
2769   {
2770     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
2771     ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
2772   }
2773   else
2774   {
2775     ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
2776     abort();
2777   }
2778 
2779   const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
2780   const Uint32* src= update->m_data;
2781   memcpy(ptr, src, 4 * len);
2782 }
2783 
2784 void
2785 Dbtup::disk_restart_undo_update_first_part(Apply_undo* undo)
2786 {
2787   Uint32* ptr;
2788   Uint32 len= undo->m_len - 4;
2789 
2790   DEB_UNDO(("(%u)applying %lld UNDO_TUP_FIRST_UPDATE_PART"
2791             " on page(%u,%u).%u[%u]",
2792             instance(),
2793             undo->m_lsn,
2794             undo->m_key.m_file_no,
2795             undo->m_key.m_page_no,
2796             undo->m_key.m_page_idx,
2797             undo->m_offset));
2798 
2799   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
2800   {
2801     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
2802     ndbrequire(len < undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
2803   }
2804   else
2805   {
2806     ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
2807     abort();
2808   }
2809 
2810   const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
2811   const Uint32* src= update->m_data;
2812   memcpy(ptr, src, 4 * len);
2813 }
2814 
2815 void
2816 Dbtup::disk_restart_undo_update_part(Apply_undo* undo)
2817 {
2818   Uint32* ptr;
2819   Uint32 len= undo->m_len - 5;
2820 
2821   DEB_UNDO(("(%u)applying %lld UNDO_TUP_UPDATE_PART on page(%u,%u).%u[%u]",
2822             instance(),
2823             undo->m_lsn,
2824             undo->m_key.m_file_no,
2825             undo->m_key.m_page_no,
2826             undo->m_key.m_page_idx,
2827             undo->m_offset));
2828 
2829   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
2830   {
2831     Uint32 fix_header_size = undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size;
2832     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
2833     Uint32 offset = undo->m_offset;
2834     ndbrequire((len + offset) <= fix_header_size);
2835     ptr = &ptr[offset];
2836   }
2837   else
2838   {
2839     ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
2840     abort();
2841   }
2842 
2843   const Disk_undo::UpdatePart *update = (const Disk_undo::UpdatePart*)undo->m_ptr;
2844   const Uint32* src= update->m_data;
2845   memcpy(ptr, src, 4 * len);
2846 }
2847 
2848 void
2849 Dbtup::disk_restart_undo_free(Apply_undo* undo, bool full_free)
2850 {
2851   Uint32* ptr, idx = undo->m_key.m_page_idx;
2852   Uint32 len= undo->m_len - 4;
2853 #ifdef DEBUG_UNDO
2854   {
2855     Uint64 lsn = 0;
2856     lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_hi;
2857     lsn <<= 32;
2858     lsn += undo->m_page_ptr.p->m_page_header.m_page_lsn_lo;
2859     const char *free = "UNDO_TUP_FREE";
2860     const char *free_part = "UNDO_TUP_FREE_PART";
2861     DEB_UNDO(("(%u)applying %lld %s on page(%u,%u).%u, page_lsn:"
2862               " %llu idx:%u",
2863               instance(),
2864               undo->m_lsn,
2865               full_free ? free : free_part,
2866               undo->m_key.m_file_no,
2867               undo->m_key.m_page_no,
2868               undo->m_key.m_page_idx,
2869               lsn,
2870               idx));
2871   }
2872 #endif
2873   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
2874   {
2875     idx= ((Fix_page*)undo->m_page_ptr.p)->alloc_record(idx);
2876     Uint32 fix_header_size = undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size;
2877     if (full_free)
2878     {
2879       ndbrequire(len == fix_header_size);
2880     }
2881     else
2882     {
2883       ndbrequire(len < fix_header_size);
2884     }
2885     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(idx, fix_header_size);
2886   }
2887   else
2888   {
2889     abort();
2890   }
2891 
2892   if (idx != undo->m_key.m_page_idx)
2893   {
2894     Uint64 lsn = undo->m_lsn;
2895     jam();
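    /* Record the 64-bit LSN of the offending record in the jam trace as
       four 16-bit chunks before aborting. */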
2896     jamLine(lsn & 0xFFFF);
2897     jamLine((lsn >> 16) & 0xFFFF);
2898     jamLine((lsn >> 32) & 0xFFFF);
2899     jamLine((lsn >> 48) & 0xFFFF);
2900     ndbabort();
2901   }
2902   const Disk_undo::Free *free = (const Disk_undo::Free*)undo->m_ptr;
2903   const Uint32* src= free->m_data;
2904   memcpy(ptr, src, 4 * len);
2905 }
2906 
2907 void
2908 Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
2909 {
2910   Fragrecord* fragPtrP = undo->m_fragment_ptr.p;
2911   Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
2912 
2913   /**
2914    * Set alloc.m_curr_extent_info_ptr_i to point to this extent
2915    * (and move the old extent into the free matrix).
2916    */
2917   Page* pageP = undo->m_page_ptr.p;
2918   Uint32 free = pageP->free_space;
2919   Uint32 new_bits = alloc.calc_page_free_bits(free);
2920   pageP->list_index = 0x8000 | new_bits;
2921 
2922   D("Tablespace_client - disk_restart_undo_page_bits");
2923   Tablespace_client tsman(signal, this, c_tsman,
2924 			  fragPtrP->fragTableId,
2925 	 		  fragPtrP->fragmentId,
2926                           c_lqh->getCreateSchemaVersion(fragPtrP->fragTableId),
2927 			  fragPtrP->m_tablespace_id);
2928 
2929   DEB_EXTENT_BITS(("(%u)tab(%u,%u), page(%u,%u):%u new_bits: %u,"
2930                    " free_space: %u, page_tab(%u,%u).%u",
2931                   instance(),
2932                   fragPtrP->fragTableId,
2933                   fragPtrP->fragmentId,
2934                   pageP->m_file_no,
2935                   pageP->m_page_no,
2936                   undo->m_page_ptr.i,
2937                   new_bits,
2938                   free,
2939                   pageP->m_table_id,
2940                   pageP->m_fragment_id,
2941                   pageP->m_create_table_version));
2942 
2943   tsman.restart_undo_page_free_bits(&undo->m_key, new_bits);
2944   jamEntry();
2945 }
2946 
2947 /**
2948  * disk_restart_alloc_extent is called during scan of extent
2949  * headers in TSMAN. It ensures that we build the extent data
2950  * structures that ensures that we select the proper extent for
2951  * new records.
2952  *
2953  * The data to build is to start with the Extent_info struct.
2954  * m_free_space
2955  * ------------
2956  * This variable contains the number of free records available
2957  * in the extent. When creating a new extent it is initialised
2958  * to the number of pages in the extent times the number of
2959  * records per page. Each prealloc will decrease the number by
2960  * one and each free will increase it by one (as will an abort
2961  * of a prealloc).
2962  * At restarts we don't know the number, so it is first set to
2963  * 0 and then set according to the page bits in the extent
2964  * information stored on disk by TSMAN.
2965  * The page bits on disk have the following meaning:
2966  * 0: The page is free, no records stored there
2967  * 1: The page is not free and not full, at least one record
2968  *    is stored in the page.
2969  * 2: The page is full
2970  * 3: The page is full
2971  *
2972  * For free pages we add the number of records per page, for
2973  * "half full" pages we add one free record to the count.
2974  * This means that this number is a lower bound on the actual
2975  * number of free records in the extent.
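 *
 * A small worked example (hypothetical numbers): for an extent of 8
 * pages with 32 fixed-size records per page whose page bits on disk
 * are 0,0,1,1,2,3,0,1 we would compute
 *   m_free_space = 3 * 32 + 3 * 1 + 2 * 0 = 99,
 * while the true count can be higher, since each "half full" page may
 * hold up to 31 free records.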
2976  * Each time we use a page we will check the m_restart_seq variable on
2977  * the page (not checked during UNDO log execution since the variables
2978  * are not initialised at that time). If it isn't set to the
2979  * current m_restart_seq it means that the page is not yet fully
2980  * known. In this case we will call restart_setup_page, which will
2981  * update the free space information correctly for the page and will
2982  * also update the extent position (explained below).
2983  *
2984  * m_free_page_count
2985  * -----------------
2986  * For each state above we have a count of how many pages of that
2987  * type we have. When initialised we place all pages in the free bucket.
2988  * At restart we set all counters to 0, next we check each page in the
2989  * call to disk_restart_page_bits, this is called immediately after
2990  * the call to disk_restart_alloc_extent for each page in the extent.
2991  *
2992  * m_empty_page_no
2993  * ---------------
2994  * This is only used the first time we create the extent. It is never
2995  * used after a node restart. It makes sure that we allocate free
2996  * pages from the beginning of the extent to the end of the extent.
2997  * The variable isn't strictly necessary since allocation works
2998  * fairly well even after a restart.
2999  *
3000  * m_first_page_no
3001  * ---------------
3002  * This is the page number of the first page in the extent. This is the
3003  * page id in the data file, so page id 3 is the 3rd 32kByte page in the
3004  * data file.
3005  *
3006  * m_key
3007  * -----
3008  * This represents the information about the extent page and extent number.
3009  * m_key.m_file_no is the file number of the extent
3010  * m_key.m_page_no is the page number of the first page in the extent
3011  * m_key.m_page_idx is the extent number, can be used to find the exact place
3012  *   of the extent information on the page
3013  *
3014  * nextHash, prevHash
3015  * ------------------
3016  * Each extent is placed in a hash table, c_extent_hash. The key to this
3017  * hash table is m_key above; the m_page_no is not part of the key. So
3018  * a key with m_file_no set to file number and m_page_idx set to
3019  * extent number will find the appropriate extent.
3020  *
3021  * nextPool
3022  * --------
3023  * Used for linking free extent records in the c_extent_pool.
3024  * When allocated it is used to keep things in the m_extent_list.
3025  *
3026  * nextList, prevList
3027  * ------------------
3028  * Used to store the extent information in one of the 20 lists
3029  * in m_free_extents in the Disk_alloc_info struct as part of
3030  * the fragment.
3031  * The general idea about this matrix is explained in the
3032  * paper "Recovery in MySQL Cluster 5.1" presented at
3033  * VLDB 2005.
3034  *
3035  * m_free_matrix_pos
3036  * -----------------
3037  * This specifies which of the 20 lists the extent is currently
3038  * stored in. If set to RNIL then it is the extent referred to
3039  * from the m_curr_extent_info_ptr_i in the Disk_alloc_info
3040  * struct of the fragment. This indicates the current extent
3041  * used to insert data into.
3042  *
3043  * The data structures in Disk_alloc_info is referring to extent
3044  * information.
3045  *
3046  * Disk_alloc_info data variables (part of fragment)
3047  * -------------------------------------------------
3048  *
3049  * m_extent_size
3050  * -------------
3051  * Size of the extents used by this fragment
3052  *
3053  * m_curr_extent_info_ptr_i
3054  * ------------------------
3055  * Pointing to the current extent used for inserts, RNIL if
3056  * no current one.
3057  *
3058  * m_free_extents
3059  * --------------
3060  * List of extents as arranged in a matrix, there are 20
3061  * entries in a 5,4 matrix.
3062  *
3063  * The row information is the free level.
3064  * Row 0 is at least 80% free
3065  * Row 1 is at least 60% free
3066  * Row 2 is at least 40% free
3067  * Row 3 is at least 20% free
3068  * Row 4 is at least 0% free
3069  *
3070  * The column is based on the states described above. So if any
3071  * page in the extent is fully free it will be in column 0.
3072  * If at least one page in the extent is in the "half full" state
3073  * it will be in column 1, if any page is in full state 2 it will
3074  * be in column 2, and otherwise it will be in column 3.
3075  * The search starts in Row 0 and goes through the columns, then
3076  * moves on to Row 1 and so forth.
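 *
 * A worked example (hypothetical): an extent that is 65% free and
 * contains at least one completely free page is stored in Row 1
 * (at least 60% free), Column 0. An extent that is 25% free with no
 * free or "half full" pages but at least one page in state 2 is
 * stored in Row 3 (at least 20% free), Column 2.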
3077  *
3078  * m_total_extent_free_space_thresholds
3079  * ------------------------------------
3080  * This variable is static after creating the fragment. It
3081  * provides the levels on number of records for 80% level,
3082  * 60% level and so forth.
3083  *
3084  * m_page_free_bits_map
3085  * --------------------
3086  * This is also static information after creation of fragment.
3087  * It describes the number of free records in a page when in
3088  * states 0 through 3.
3089  * In state 0 it is set to the number of records per page.
3090  * State 1 is set to 1.
3091  * States 2 and 3 are set to 0.
3092  *
3093  * m_extent_list
3094  * -------------
3095  * This list is used for disk scans. In this case we need to know all
3096  * disk pages and these are found by scanning all extents one by one.
3097  * New extents are added first, so new pages added during scan are not
3098  * seen by the scan. Disk scans are currently only used for backups.
3099  *
3100  * m_dirty_pages
3101  * -------------
3102  * This is one list per state. When allocating a new page for insert we
3103  * search for a page in the free (state 0) and "half full" (state 1)
3104  * lists. If any page is in these lists we're done with our search
3105  * for a page to insert into. This happens in disk_page_prealloc.
3106  * If a page is found in dirty pages we immediately update the
3107  * extent position of the page, we also move the page to another
3108  * list in m_dirty_pages if state changed due to insert, finally
3109  * we also update m_free_page_count above on the extent if state
3110  * changed.
 *
 * If the prealloc is aborted we remove the record from the page
 * and update the same structures again if necessary.
 *
 * When the page arrives from disk we also check whether there is a
 * need to change m_free_page_count and the extent position. A page
 * only arrives from disk after disk_page_prealloc if we were unable
 * to find a page among the ones already in memory that could fit the
 * new row. Here it is also placed in the proper m_dirty_pages list.
 * It is a new page at this point not currently in any list since it
 * comes from disk. It could actually still come from the page cache.
 * This can happen when a page has been read and is then used for
 * writing. We don't use any knowledge of what pages have been read
 * when selecting which page to write.
 *
 * There are also some important variables on each page that are used
 * for page allocation.
 *
 * m_unmap_pages
 * -------------
 * Whenever a data page (not an extent page) is to be flushed to disk,
 * PGMAN will inform DBTUP about this. It informs it before the flush
 * and also when the flush is completed.
 *
 * Before the flush we move the page away from the m_dirty_pages list
 * and into the m_unmap_pages list. If the dirty count is down to 0
 * we also set list_index bit 0x8000 to indicate that the page is not
 * in a dirty page list. We also set the uncommitted bits in the
 * extent information before we flush it to disk.
 *
 * After the flush we remove the page from the unmap pages list.
 * We also update the extent information if necessary, and if it
 * has changed we set the page to be dirty in PGMAN.
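 *
 * In outline (hedged pseudocode; the hook names are illustrative,
 * the real entry points are the PGMAN callbacks into DBTUP):
 *
 *   before_flush(page):
 *     remove page from m_dirty_pages[state]
 *     add page to m_unmap_pages
 *     page->list_index |= 0x8000;   // not in a dirty list anymore
 *     set uncommitted bits in the extent information
 *
 *   after_flush(page):
 *     remove page from m_unmap_pages
 *     update extent information; if changed, mark extent page dirty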
 *
 * m_page_requests
 * ---------------
 * This is a set of lists, one list for each state as described above.
 * Pages in these lists are in transit from disk to memory to be
 * made dirty. Thus they are suitable to be used if no dirty pages are
 * available in memory. When we use those pages we also move them
 * to the proper list to ensure that they are no longer used when
 * already full.
 *
 * list_index
 * ----------
 * This represents the state of the page from above (0 free, 1 "half
 * full", 2 and 3 full). Also, if 0x8000 is set the page isn't in the
 * m_dirty_pages list.
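 *
 * For example, under the convention above (a sketch, not actual code):
 *
 *   Uint32 state = pagePtr.p->list_index & ~0x8000;  // 0..3
 *   bool in_dirty_list = (pagePtr.p->list_index & 0x8000) == 0;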
 *
 * free_space
 * ----------
 * This is a count of the number of free records on the page. It is
 * updated by calls to free_record and alloc_record in tuppage.cpp.
 *
 * disk_page_prealloc
 * ------------------
 * This function is called to allocate a record for use in an insert
 * of a disk record. It returns the page id and page index of the row
 * to be used.
 * The page isn't necessarily available in memory when returned from
 * this function. It is however guaranteed to at least be in transit
 * from disk. So the caller can safely call get_page on this page and
 * know that when it arrives it will be ready for consumption. The
 * callbacks are executed in order, so this means that
 * disk_page_prealloc_callback is called before the callback used by
 * the caller to actually perform the insert action.
 */
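
/**
 * A hedged sketch of the caller's flow around disk_page_prealloc; the
 * exact signatures and the insert callback wiring are assumptions for
 * illustration, only the ordering guarantee comes from the text above:
 *
 *   Local_key key;
 *   if (disk_page_prealloc(signal, fragPtr, tabPtr, &key, sz) < 0)
 *     return;                       // e.g. out of disk space
 *   // key identifies the row's page; the page is dirty in memory or
 *   // in transit from disk, so get_page with a callback is safe and
 *   // disk_page_prealloc_callback runs before the insert callback.
 */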
int
Dbtup::disk_restart_alloc_extent(EmulatedJamBuffer* jamBuf,
                                 Uint32 tableId,
                                 Uint32 fragId,
                                 Uint32 create_table_version,
                                 const Local_key* key,
                                 Uint32 pages)
{
  /**
   * This function is called from TSMAN in rep thread. Must not use any
   * block variables other than extent information.
   */
  TablerecPtr tabPtr;
  FragrecordPtr fragPtr;
  tabPtr.i = tableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
  Uint32 current_create_table_version = c_lqh->getCreateSchemaVersion(tableId);
  DEB_EXTENT_BITS(("(%u)disk_restart_alloc_extent: tab(%u,%u):%u,"
                   " current version: %u",
                   instance(),
                   tableId,
                   fragId,
                   create_table_version,
                   current_create_table_version));

  if (tabPtr.p->tableStatus == DEFINED &&
      tabPtr.p->m_no_of_disk_attributes &&
      (current_create_table_version == create_table_version ||
       create_table_version == 0))
  {
    thrjam(jamBuf);
    getFragmentrec(fragPtr, fragId, tabPtr.p);

    if (!fragPtr.isNull())
    {
      thrjam(jamBuf);

      Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

      Ptr<Extent_info> ext;
      ndbrequire(c_extent_pool.seize(ext));
#ifdef VM_TRACE
      ndbout << "allocated " << pages << " pages: " << *key
             << " table: " << tabPtr.i << " fragment: " << fragId << endl;
#endif
      ext.p->m_key = *key;
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
      ext.p->m_free_space= 0;
      ext.p->m_empty_page_no = (1 << 16); // We don't know, so assume none
      DEB_EXTENT_BITS_HASH((
                "(%u)restart:extent(%u).%u in tab(%u,%u),"
                " first_page(%u,%u)",
                instance(),
                ext.p->m_key.m_page_idx,
                ext.i,
                fragPtr.p->fragTableId,
                fragPtr.p->fragmentId,
                ext.p->m_key.m_file_no,
                ext.p->m_first_page_no));
      memset(ext.p->m_free_page_count, 0, sizeof(ext.p->m_free_page_count));

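      /**
       * If there was already a current extent used for inserts, move
       * it back into the free-extent matrix at its calculated
       * position before making the newly restored extent the current
       * one.
       */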
      if (alloc.m_curr_extent_info_ptr_i != RNIL)
      {
        thrjam(jamBuf);
        Ptr<Extent_info> old;
        c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
        ndbassert(old.p->m_free_matrix_pos == RNIL);
        Uint32 pos= alloc.calc_extent_pos(old.p);
        Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
        new_list.addFirst(old);
        old.p->m_free_matrix_pos= pos;
      }

      alloc.m_curr_extent_info_ptr_i = ext.i;
      ext.p->m_free_matrix_pos = RNIL;
      c_extent_hash.add(ext);

      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
      list1.addFirst(ext);
      return 0;
    }
  }
  thrjam(jamBuf);
  return -1;
}

/**
 * This function is called from TSMAN during the scan of extent headers.
 * It is vital that the LDM thread is not doing any activity
 * regarding this information at the same time. This only happens
 * in a very specific part of the restart. It is vital to ensure that
 * one only uses stack variables and no block variables. The only
 * block variables allowed are those that we update here, that is
 * the extent information of a fragment; this must not be manipulated
 * at the same time by LDM thread activity, which is safeguarded by
 * the restart phase serialisation.
 */
void
Dbtup::disk_restart_page_bits(EmulatedJamBuffer* jamBuf,
                              Uint32 tableId,
                              Uint32 fragId,
                              Uint32 create_table_version,
                              const Local_key* key,
                              Uint32 bits)
{
  thrjam(jamBuf);
  TablerecPtr tabPtr;
  FragrecordPtr fragPtr;
  Uint32 current_create_table_version = c_lqh->getCreateSchemaVersion(tableId);
  tabPtr.i = tableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
  if (tabPtr.p->tableStatus == DEFINED &&
      tabPtr.p->m_no_of_disk_attributes &&
      (current_create_table_version == create_table_version ||
       create_table_version == 0))
  {
    thrjam(jamBuf);
    getFragmentrec(fragPtr, fragId, tabPtr.p);
    Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

    Ptr<Extent_info> ext;
    c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);

    Uint32 size= alloc.calc_page_free_space(bits);

    ext.p->m_free_page_count[bits]++;
    DEB_EXTENT_BITS(("(%u)disk_restart_page_bits:extent(%u), tab(%u,%u),"
                     " page(%u,%u), bits: %u, new_count: %u",
                     instance(),
                     ext.p->m_key.m_page_idx,
                     tableId,
                     fragId,
                     key->m_file_no,
                     key->m_page_no,
                     bits,
                     ext.p->m_free_page_count[bits]));

    // actually only to update free_space
    update_extent_pos(jamBuf, alloc, ext, size);
    ndbassert(ext.p->m_free_matrix_pos == RNIL);
    DEB_EXTENT_BITS(("(%u)disk_restart_page_bits in tab(%u,%u):%u,"
                     " page(%u,%u), bits: %u, ext.i: %u,"
                     " extent_no: %u",
                     instance(),
                     tableId,
                     fragId,
                     create_table_version,
                     key->m_file_no,
                     key->m_page_no,
                     bits,
                     ext.i,
                     key->m_page_idx));
  }
}

void
Dbtup::disk_page_get_allocated(const Tablerec* tabPtrP,
                               const Fragrecord* fragPtrP,
                               Uint64 res[2])
{
  res[0] = res[1] = 0;
  if (tabPtrP->m_no_of_disk_attributes)
  {
    jam();
    const Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
    Uint64 cnt = 0;
    Uint64 free = 0;

    {
      Disk_alloc_info& tmp = const_cast<Disk_alloc_info&>(alloc);
      Local_fragment_extent_list list(c_extent_pool, tmp.m_extent_list);
      Ptr<Extent_info> extentPtr;
      for (list.first(extentPtr); !extentPtr.isNull(); list.next(extentPtr))
      {
        cnt++;
        free += extentPtr.p->m_free_space;
      }
    }
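    /**
     * res[0]: bytes allocated on disk for this fragment, i.e. the
     * number of extents times the extent size in pages times the
     * page size in bytes. res[1]: an estimate of the free bytes,
     * i.e. free record slots times the fixed record size in bytes
     * (m_fix_header_size is in 32-bit words, hence the factor 4).
     */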
    res[0] = cnt * alloc.m_extent_size * File_formats::NDB_PAGE_SIZE;
    res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
  }
}