/*
   Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#define DBTUP_C
#define DBTUP_DISK_ALLOC_CPP
#include "Dbtup.hpp"

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page> & ptr)
{
  out << "[ Page: ptr.i: " << ptr.i
      << " [ m_file_no: " << ptr.p->m_file_no
      << " m_page_no: " << ptr.p->m_page_no << "]"
      << " list_index: " << ptr.p->list_index
      << " free_space: " << ptr.p->free_space
      << " uncommitted_used_space: " << ptr.p->uncommitted_used_space
      << " ]";
  return out;
}

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page_request> & ptr)
{
  out << "[ Page_request: ptr.i: " << ptr.i
      << " " << ptr.p->m_key
      << " m_original_estimated_free_space: " << ptr.p->m_original_estimated_free_space
      << " m_list_index: " << ptr.p->m_list_index
      << " m_frag_ptr_i: " << ptr.p->m_frag_ptr_i
      << " m_extent_info_ptr: " << ptr.p->m_extent_info_ptr
      << " m_ref_count: " << ptr.p->m_ref_count
      << " m_uncommitted_used_space: " << ptr.p->m_uncommitted_used_space
      << " ]";

  return out;
}

static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
{
  out << "[ Extent_info: ptr.i " << ptr.i
      << " " << ptr.p->m_key
      << " m_first_page_no: " << ptr.p->m_first_page_no
      << " m_free_space: " << ptr.p->m_free_space
      << " m_free_matrix_pos: " << ptr.p->m_free_matrix_pos
      << " m_free_page_count: [";

  for(Uint32 i = 0; i<Dbtup::EXTENT_SEARCH_MATRIX_COLS; i++)
    out << " " << ptr.p->m_free_page_count[i];
  out << " ] ]";

  return out;
}

#if NOT_YET_FREE_EXTENT
static
inline
bool
check_free(const Dbtup::Extent_info* extP)
{
  Uint32 res = 0;
  for (Uint32 i = 1; i<MAX_FREE_LIST; i++)
    res += extP->m_free_page_count[i];
  return res;
}
#error "Code for deallocating extents when they become empty"
#error "This code is not yet complete"
#endif

#if NOT_YET_UNDO_ALLOC_EXTENT
#error "This is needed for deallocating extents when they become empty"
#error "This code is not yet complete"
#endif

void
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
{
  const Uint32 limit = 512;
  ndbout_c("dirty pages");
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
  {
    printf("  %d : ", i);
    PagePtr ptr;
    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(*pool, alloc.m_dirty_pages[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }
  ndbout_c("page requests");
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
  {
    printf("  %d : ", i);
    Ptr<Page_request> ptr;
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }

  ndbout_c("Extent matrix");
  for(Uint32 i = 0; i<alloc.SZ; i++)
  {
    printf("  %d : ", i);
    Ptr<Extent_info> ptr;
    Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[i]);
    Uint32 c = 0;
    for (list.first(ptr); c < limit && !ptr.isNull(); c++, list.next(ptr))
    {
      ndbout << ptr << " ";
    }
    if (c == limit)
    {
      ndbout << "MAXLIMIT ";
    }
    ndbout_c(" ");
  }

  if (alloc.m_curr_extent_info_ptr_i != RNIL)
  {
    Ptr<Extent_info> ptr;
    c_extent_pool.getPtr(ptr, alloc.m_curr_extent_info_ptr_i);
    ndbout << "current extent: " << ptr << endl;
  }
}

#if defined VM_TRACE || 1
#define ddassert(x) do { if(unlikely(!(x))) { dump_disk_alloc(alloc); ndbrequire(false); } } while(0)
#else
#define ddassert(x)
#endif

Dbtup::Disk_alloc_info::Disk_alloc_info(const Tablerec* tabPtrP,
                                        Uint32 extent_size)
{
  m_extent_size = extent_size;
  m_curr_extent_info_ptr_i = RNIL;
  if (tabPtrP->m_no_of_disk_attributes == 0)
    return;

  Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;

  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
    m_page_free_bits_map[0] = recs_per_page; // 100% free
    m_page_free_bits_map[1] = 1;
    m_page_free_bits_map[2] = 0;
    m_page_free_bits_map[3] = 0;

    Uint32 max= recs_per_page * extent_size;
    for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
    {
      m_total_extent_free_space_thresholds[i] =
        (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
    }
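    /**
     * Editor's illustration (not in the original source): with, say,
     * EXTENT_SEARCH_MATRIX_ROWS == 3 the formula above yields the
     * thresholds { 2*max/3, max/3, 0 }, so an extent is binned into a
     * row according to how much of its total space is still free.
     */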
  }
  else
  {
    abort();
  }
}

Uint32
Dbtup::Disk_alloc_info::find_extent(Uint32 sz) const
{
  /**
   * Find an extent with sufficient space for sz
   * Find the biggest available (with most free space)
   * Return position in matrix
   */
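  /**
   * Editor's note on the scan order below (illustrative, assuming the
   * usual EXTENT_SEARCH_MATRIX_COLS == 4, i.e. mask == 3): for a
   * request with col == 2 the loop probes positions 0,1,2 in row 0,
   * then 4,5,6 in row 1, and so on.  Within each row only columns
   * 0..col are candidates (pages at least as free as the request
   * needs); once column col has been checked, the loop skips to the
   * next row.
   */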
  Uint32 col = calc_page_free_bits(sz);
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
  for(Uint32 i= 0; i<EXTENT_SEARCH_MATRIX_SIZE; i++)
  {
    // Check that it can cater for the request
    if (!m_free_extents[i].isEmpty())
    {
      return i;
    }

    if ((i & mask) >= col)
    {
      i = (i & ~mask) + mask;
    }
  }

  return RNIL;
}

Uint32
Dbtup::Disk_alloc_info::calc_extent_pos(const Extent_info* extP) const
{
  Uint32 free= extP->m_free_space;
  Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;

  Uint32 col= 0, row=0;

  /**
   * Find the correct row based on total free space;
   *   if the free space is zero (or very small), put
   *   the extent absolutely last
   */
  {
    const Uint32 *arr= m_total_extent_free_space_thresholds;
    for(; free < * arr++; row++)
      assert(row < EXTENT_SEARCH_MATRIX_ROWS);
  }

  /**
   * Find the correct column based on the largest available chunk
   */
  {
    const Uint16 *arr= extP->m_free_page_count;
    for(; col < EXTENT_SEARCH_MATRIX_COLS && * arr++ == 0; col++);
  }

  /**
   * NOTE
   *
   *   If the free space on the extent is small or zero,
   *     col will be = EXTENT_SEARCH_MATRIX_COLS
   *     row will be = EXTENT_SEARCH_MATRIX_ROWS
   *   in that case pos will be col * row = the maximum pos
   *   (as fixed by the + 1 in the declaration)
   */
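  /**
   * Worked example (editor's note, assuming mask == 3, i.e. four
   * columns): row == 1 and col == 2 give pos = 1 * 4 + 2 = 6.  A
   * (nearly) full extent runs col and row off the end of the matrix,
   * per the NOTE above, and lands on the last position, which exists
   * thanks to the "+ 1" in the declaration.
   */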
  Uint32 pos= (row * (mask + 1)) + (col & mask);

  assert(pos < EXTENT_SEARCH_MATRIX_SIZE);
  return pos;
}

void
Dbtup::update_extent_pos(Disk_alloc_info& alloc,
                         Ptr<Extent_info> extentPtr,
                         Int32 delta)
{
  if (delta < 0)
  {
    jam();
    Uint32 sub = Uint32(- delta);
    ddassert(extentPtr.p->m_free_space >= sub);
    extentPtr.p->m_free_space -= sub;
  }
  else
  {
    jam();
    extentPtr.p->m_free_space += delta;
    ndbassert(Uint32(delta) <= alloc.calc_page_free_space(0));
  }

#ifdef VM_TRACE
  Uint32 cnt = 0;
  Uint32 sum = 0;
  for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
  {
    cnt += extentPtr.p->m_free_page_count[i];
    sum += extentPtr.p->m_free_page_count[i] * alloc.calc_page_free_space(i);
  }
  if (extentPtr.p->m_free_page_count[0] == cnt)
  {
    ddassert(extentPtr.p->m_free_space == cnt*alloc.m_page_free_bits_map[0]);
  }
  else
  {
    ddassert(extentPtr.p->m_free_space < cnt*alloc.m_page_free_bits_map[0]);
  }
  ddassert(extentPtr.p->m_free_space >= sum);
  ddassert(extentPtr.p->m_free_space <= cnt*alloc.m_page_free_bits_map[0]);
#endif

  Uint32 old = extentPtr.p->m_free_matrix_pos;
  if (old != RNIL)
  {
    Uint32 pos = alloc.calc_extent_pos(extentPtr.p);
    if (old != pos)
    {
      jam();
      Local_extent_info_list old_list(c_extent_pool, alloc.m_free_extents[old]);
      Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
      old_list.remove(extentPtr);
      new_list.add(extentPtr);
      extentPtr.p->m_free_matrix_pos= pos;
    }
  }
  else
  {
    ddassert(alloc.m_curr_extent_info_ptr_i == extentPtr.i);
  }
}

void
Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr,
                          Int32 estimate)
{
  jam();
  /**
   * Link to extent, clear uncommitted_used_space
   */
  pagePtr.p->uncommitted_used_space = 0;
  pagePtr.p->m_restart_seq = globalData.m_restart_seq;

  Extent_info key;
  key.m_key.m_file_no = pagePtr.p->m_file_no;
  key.m_key.m_page_idx = pagePtr.p->m_extent_no;
  Ptr<Extent_info> extentPtr;
  ndbrequire(c_extent_hash.find(extentPtr, key));
  pagePtr.p->m_extent_info_ptr = extentPtr.i;

  Uint32 real_free = pagePtr.p->free_space;
  const bool prealloc = estimate >= 0;
  Uint32 estimated;
  if (prealloc)
  {
    jam();
    /**
     * If this is during prealloc, use the estimate from there
     */
    estimated = (Uint32)estimate;
  }
  else
  {
    jam();
    /**
     * else use the estimate based on the actual free space
     */
    estimated = alloc.calc_page_free_space(alloc.calc_page_free_bits(real_free));
  }

#ifdef VM_TRACE
  {
    Local_key page;
    page.m_file_no = pagePtr.p->m_file_no;
    page.m_page_no = pagePtr.p->m_page_no;

    D("Tablespace_client - restart_setup_page");
    Tablespace_client tsman(0, this, c_tsman,
                            0, 0, 0);
    unsigned uncommitted, committed;
    uncommitted = committed = ~(unsigned)0;
    (void) tsman.get_page_free_bits(&page, &uncommitted, &committed);
    jamEntry();

    ddassert(alloc.calc_page_free_bits(real_free) == committed);
    if (prealloc)
    {
      /**
       * tsman.alloc_page sets the uncommitted bits to MAX_FREE_LIST - 1
       *   to avoid the page being preallocated several times
       */
      ddassert(uncommitted == MAX_FREE_LIST - 1);
    }
    else
    {
      ddassert(committed == uncommitted);
    }
  }
#endif

  ddassert(real_free >= estimated);

  if (real_free != estimated)
  {
    jam();
    Uint32 delta = (real_free-estimated);
    update_extent_pos(alloc, extentPtr, delta);
  }
}

/**
 * - Page free bits -
 * 0 = 00 - free - 100% free
 * 1 = 01 - at least 70% free, 70 = pct_free + 2 * (100 - pct_free) / 3
 * 2 = 10 - at least 40% free, 40 = pct_free + (100 - pct_free) / 3
 * 3 = 11 - full - less than pct_free% free, pct_free = 10%
 */
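
/**
 * Worked example (editor's note): with pct_free = 10 the boundaries
 * above come out as 10 + 2 * (100 - 10) / 3 = 70 and
 * 10 + (100 - 10) / 3 = 40.  A page that is 55% free therefore gets
 * free bits 2 (at least 40% but less than 70% free), and a page with
 * only 8% free is classified as full (bits 3).
 */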

#define DBG_DISK 0

int
Dbtup::disk_page_prealloc(Signal* signal,
                          Ptr<Fragrecord> fragPtr,
                          Local_key* key, Uint32 sz)
{
  int err;
  Uint32 i, ptrI;
  Ptr<Page_request> req;
  Fragrecord* fragPtrP = fragPtr.p;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
  Uint32 idx= alloc.calc_page_free_bits(sz);
  D("Tablespace_client - disk_page_prealloc");
  Tablespace_client tsman(signal, this, c_tsman,
                          fragPtrP->fragTableId,
                          fragPtrP->fragmentId,
                          fragPtrP->m_tablespace_id);

  if (DBG_DISK)
    ndbout << "disk_page_prealloc";

  /**
   * 1) search current dirty pages
   */
  for(i= 0; i <= idx; i++)
  {
    if (!alloc.m_dirty_pages[i].isEmpty())
    {
      ptrI= alloc.m_dirty_pages[i].firstItem;
      Ptr<GlobalPage> gpage;
      m_global_page_pool.getPtr(gpage, ptrI);

      PagePtr tmp;
      tmp.i = gpage.i;
      tmp.p = reinterpret_cast<Page*>(gpage.p);
      disk_page_prealloc_dirty_page(alloc, tmp, i, sz);
      key->m_page_no= tmp.p->m_page_no;
      key->m_file_no= tmp.p->m_file_no;
      if (DBG_DISK)
        ndbout << " found dirty page " << *key << endl;
      jam();
      return 0; // Page in memory
    }
  }

  /**
   * 2) search outstanding page requests
   *   The callback does not need to access the page request again,
   *   as it is not the first request to this page
   */
  for(i= 0; i <= idx; i++)
  {
    if (!alloc.m_page_requests[i].isEmpty())
    {
      ptrI= alloc.m_page_requests[i].firstItem;
      Ptr<Page_request> req;
      c_page_request_pool.getPtr(req, ptrI);

      disk_page_prealloc_transit_page(alloc, req, i, sz);
      * key = req.p->m_key;
      if (DBG_DISK)
        ndbout << " found transit page " << *key << endl;
      jam();
      return 0;
    }
  }

  /**
   * We need to request a page...
   */
  if (!c_page_request_pool.seize(req))
  {
    jam();
    err= 1;
    //XXX set error code
    ndbout_c("no free request");
    return -err;
  }

  req.p->m_ref_count= 1;
  req.p->m_frag_ptr_i= fragPtr.i;
  req.p->m_uncommitted_used_space= sz;

  int pageBits; // received
  Ptr<Extent_info> ext;
  const Uint32 bits= alloc.calc_page_free_bits(sz); // required
  bool found= false;

  /**
   * Do we have a current extent?
   */
  if ((ext.i= alloc.m_curr_extent_info_ptr_i) != RNIL)
  {
    jam();
    c_extent_pool.getPtr(ext);
    if ((pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits)) >= 0)
    {
      jamEntry();
      found= true;
    }
    else
    {
      jamEntry();
      /**
       * The current extent is not in a free list,
       *   and since it couldn't accommodate the request
       *   we put it on the free list
       */
      alloc.m_curr_extent_info_ptr_i = RNIL;
      Uint32 pos= alloc.calc_extent_pos(ext.p);
      ext.p->m_free_matrix_pos = pos;
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
      list.add(ext);
    }
  }

  if (!found)
  {
    Uint32 pos;
    if ((pos= alloc.find_extent(sz)) != RNIL)
    {
      jam();
      Local_extent_info_list list(c_extent_pool, alloc.m_free_extents[pos]);
      list.first(ext);
      list.remove(ext);
    }
    else
    {
      jam();
      /**
       * We need to alloc an extent
       */
#if NOT_YET_UNDO_ALLOC_EXTENT
      Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id;

      err = c_lgman->alloc_log_space(logfile_group_id,
                                     sizeof(Disk_undo::AllocExtent)>>2);
      jamEntry();
      if(unlikely(err))
      {
        return -err;
      }
#endif

      if (!c_extent_pool.seize(ext))
      {
        jam();
        //XXX
        err= 2;
#if NOT_YET_UNDO_ALLOC_EXTENT
        c_lgman->free_log_space(logfile_group_id,
                                sizeof(Disk_undo::AllocExtent)>>2);
#endif
        c_page_request_pool.release(req);
        ndbout_c("no free extent info");
        return -err;
      }

      if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0)
      {
        jamEntry();
#if NOT_YET_UNDO_ALLOC_EXTENT
        c_lgman->free_log_space(logfile_group_id,
                                sizeof(Disk_undo::AllocExtent)>>2);
#endif
        c_extent_pool.release(ext);
        c_page_request_pool.release(req);
        return err;
      }

      int pages= err;
#if NOT_YET_UNDO_ALLOC_EXTENT
      {
        /**
         * Do something here
         */
        {
          Callback cb;
          cb.m_callbackData= ext.i;
          cb.m_callbackFunction =
            safe_cast(&Dbtup::disk_page_alloc_extent_log_buffer_callback);
          Uint32 sz= sizeof(Disk_undo::AllocExtent)>>2;

          Logfile_client lgman(this, c_lgman, logfile_group_id);
          int res= lgman.get_log_buffer(signal, sz, &cb);
          jamEntry();
          switch(res){
          case 0:
            break;
          case -1:
            ndbrequire("NOT YET IMPLEMENTED" == 0);
            break;
          default:
            execute(signal, cb, res);
          }
        }
      }
#endif

#ifdef VM_TRACE
      ndbout << "allocated " << pages << " pages: " << ext.p->m_key
             << " table: " << fragPtr.p->fragTableId
             << " fragment: " << fragPtr.p->fragmentId << endl;
#endif
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
      memset(ext.p->m_free_page_count, 0, sizeof(ext.p->m_free_page_count));
      ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages;
      ext.p->m_free_page_count[0]= pages; // All pages are "free"-est
      ext.p->m_empty_page_no = 0;
      c_extent_hash.add(ext);

      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
      list1.add(ext);
    }

    alloc.m_curr_extent_info_ptr_i= ext.i;
    ext.p->m_free_matrix_pos= RNIL;
    pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
    jamEntry();
    ddassert(pageBits >= 0);
  }

  /**
   * We have a page from an extent
   */
  *key= req.p->m_key= ext.p->m_key;

  if (DBG_DISK)
    ndbout << " allocated page " << *key << endl;

  /**
   * We don't know the exact free space of the page,
   *   but we know what page free bits it has;
   *   compute the free space based on them
   */
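  /**
   * Editor's illustration: for fixed-size pages the constructor above
   * set m_page_free_bits_map[] = { recs_per_page, 1, 0, 0 }, so
   * (assuming calc_page_free_space() reads that map) pageBits == 0
   * means a completely unused page, while pageBits == 1 only
   * guarantees a single free record.  The estimate below is therefore
   * deliberately conservative.
   */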
  Uint32 size= alloc.calc_page_free_space((Uint32)pageBits);

  ddassert(size >= sz);
  req.p->m_original_estimated_free_space = size;

  Uint32 new_size = size - sz;   // Subtract alloc rec
  Uint32 newPageBits= alloc.calc_page_free_bits(new_size);
  if (newPageBits != (Uint32)pageBits)
  {
    jam();
    ddassert(ext.p->m_free_page_count[pageBits] > 0);
    ext.p->m_free_page_count[pageBits]--;
    ext.p->m_free_page_count[newPageBits]++;
  }
  update_extent_pos(alloc, ext, -Int32(sz));

  // And put page request in correct free list
  idx= alloc.calc_page_free_bits(new_size);
  {
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);

    list.add(req);
  }
  req.p->m_list_index= idx;
  req.p->m_extent_info_ptr= ext.i;

  Page_cache_client::Request preq;
  preq.m_page = *key;
  preq.m_callback.m_callbackData= req.i;
  preq.m_callback.m_callbackFunction =
    safe_cast(&Dbtup::disk_page_prealloc_callback);

  int flags= Page_cache_client::ALLOC_REQ;
  if (pageBits == 0)
  {
    jam();

    if (ext.p->m_first_page_no + ext.p->m_empty_page_no == key->m_page_no)
    {
      jam();
      flags |= Page_cache_client::EMPTY_PAGE;
      //ndbout << "EMPTY_PAGE " << ext.p->m_empty_page_no << " " << *key << endl;
      ext.p->m_empty_page_no++;
    }

    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::disk_page_prealloc_initial_callback);
  }

  Page_cache_client pgman(this, c_pgman);
  int res= pgman.get_page(signal, preq, flags);
  m_pgman_ptr = pgman.m_ptr;
  jamEntry();
  switch(res)
  {
  case 0:
    jam();
    break;
  case -1:
    ndbassert(false);
    break;
  default:
    jam();
    execute(signal, preq.m_callback, res); // run callback
  }

  return res;
}

void
Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
                                     PagePtr pagePtr,
                                     Uint32 old_idx, Uint32 sz)
{
  jam();
  ddassert(pagePtr.p->list_index == old_idx);

  Uint32 free= pagePtr.p->free_space;
  Uint32 used= pagePtr.p->uncommitted_used_space + sz;
  Uint32 ext= pagePtr.p->m_extent_info_ptr;

  ddassert(free >= used);
  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  Uint32 new_idx= alloc.calc_page_free_bits(free - used);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
  }

  pagePtr.p->uncommitted_used_space = used;
  update_extent_pos(alloc, extentPtr, -Int32(sz));
}


void
Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
                                       Ptr<Page_request> req,
                                       Uint32 old_idx, Uint32 sz)
{
  jam();
  ddassert(req.p->m_list_index == old_idx);

  Uint32 free= req.p->m_original_estimated_free_space;
  Uint32 used= req.p->m_uncommitted_used_space + sz;
  Uint32 ext= req.p->m_extent_info_ptr;

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  ddassert(free >= used);
  Uint32 new_idx= alloc.calc_page_free_bits(free - used);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_page_request(alloc, extentPtr, req, old_idx, new_idx);
  }

  req.p->m_uncommitted_used_space = used;
  update_extent_pos(alloc, extentPtr, -Int32(sz));
}

void
Dbtup::disk_page_prealloc_callback(Signal* signal,
                                   Uint32 page_request, Uint32 page_id)
{
  jamEntry();
  //ndbout_c("disk_alloc_page_callback id: %d", page_id);

  Ptr<Page_request> req;
  c_page_request_pool.getPtr(req, page_request);

  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= req.p->m_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
  {
    jam();
    D(V(pagePtr.p->m_restart_seq) << V(globalData.m_restart_seq));
    restart_setup_page(alloc, pagePtr, req.p->m_original_estimated_free_space);
  }

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);

  pagePtr.p->uncommitted_used_space += req.p->m_uncommitted_used_space;
  ddassert(pagePtr.p->free_space >= pagePtr.p->uncommitted_used_space);

  Uint32 free = pagePtr.p->free_space - pagePtr.p->uncommitted_used_space;
  Uint32 idx = req.p->m_list_index;
  Uint32 real_idx = alloc.calc_page_free_bits(free);

  if (idx != real_idx)
  {
    jam();
    ddassert(extentPtr.p->m_free_page_count[idx]);
    extentPtr.p->m_free_page_count[idx]--;
    extentPtr.p->m_free_page_count[real_idx]++;
    update_extent_pos(alloc, extentPtr, 0);
  }

  {
    /**
     * add to dirty list
     */
    pagePtr.p->list_index = real_idx;
    ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(* cheat_pool, alloc.m_dirty_pages[real_idx]);
    list.add(pagePtr);
  }

  {
    /**
     * release page request
     */
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);
    list.release(req);
  }
}

void
Dbtup::disk_page_move_dirty_page(Disk_alloc_info& alloc,
                                 Ptr<Extent_info> extentPtr,
                                 Ptr<Page> pagePtr,
                                 Uint32 old_idx,
                                 Uint32 new_idx)
{
  ddassert(extentPtr.p->m_free_page_count[old_idx]);
  extentPtr.p->m_free_page_count[old_idx]--;
  extentPtr.p->m_free_page_count[new_idx]++;

  ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
  LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
  LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
  old_list.remove(pagePtr);
  new_list.add(pagePtr);
  pagePtr.p->list_index = new_idx;
}

void
Dbtup::disk_page_move_page_request(Disk_alloc_info& alloc,
                                   Ptr<Extent_info> extentPtr,
                                   Ptr<Page_request> req,
                                   Uint32 old_idx, Uint32 new_idx)
{
  Page_request_list::Head *lists = alloc.m_page_requests;
  Local_page_request_list old_list(c_page_request_pool, lists[old_idx]);
  Local_page_request_list new_list(c_page_request_pool, lists[new_idx]);
  old_list.remove(req);
  new_list.add(req);

  ddassert(extentPtr.p->m_free_page_count[old_idx]);
  extentPtr.p->m_free_page_count[old_idx]--;
  extentPtr.p->m_free_page_count[new_idx]++;
  req.p->m_list_index= new_idx;
}

void
Dbtup::disk_page_prealloc_initial_callback(Signal* signal,
                                           Uint32 page_request,
                                           Uint32 page_id)
{
  jamEntry();
  //ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
  /**
   * 1) lookup page request
   * 2) lookup page
   * 3) lookup table
   * 4) init page (according to page type)
   * 5) call ordinary callback
   */
  Ptr<Page_request> req;
  c_page_request_pool.getPtr(req, page_request);

  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= req.p->m_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  Ptr<Tablerec> tabPtr;
  tabPtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);

  if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
  {
    convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
  }
  else
  {
    abort();
  }

  pagePtr.p->m_page_no= req.p->m_key.m_page_no;
  pagePtr.p->m_file_no= req.p->m_key.m_file_no;
  pagePtr.p->m_table_id= fragPtr.p->fragTableId;
  pagePtr.p->m_fragment_id = fragPtr.p->fragmentId;
  pagePtr.p->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
  pagePtr.p->m_extent_info_ptr= req.p->m_extent_info_ptr;
  pagePtr.p->m_restart_seq = globalData.m_restart_seq;
  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
  pagePtr.p->list_index = req.p->m_list_index;
  pagePtr.p->uncommitted_used_space = req.p->m_uncommitted_used_space;

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
  Uint32 idx = req.p->m_list_index;

  {
    Uint32 free = pagePtr.p->free_space - pagePtr.p->uncommitted_used_space;
    ddassert(idx == alloc.calc_page_free_bits(free));
    ddassert(pagePtr.p->free_space == req.p->m_original_estimated_free_space);
  }

  {
    /**
     * add to dirty list
     */
    ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(* cheat_pool, alloc.m_dirty_pages[idx]);
    list.add(pagePtr);
  }

  {
    /**
     * release page request
     */
    Local_page_request_list list(c_page_request_pool,
                                 alloc.m_page_requests[idx]);
    list.release(req);
  }
}

void
Dbtup::disk_page_set_dirty(PagePtr pagePtr)
{
  jam();
  Uint32 idx = pagePtr.p->list_index;
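  /**
   * Editor's note: the top bit (0x8000) of list_index serves as a
   * "not on any dirty list" flag; it is set when the page leaves the
   * dirty lists (see disk_page_unmap_callback() below) and cleared
   * when the page is re-linked here.
   */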
  if ((pagePtr.p->m_restart_seq == globalData.m_restart_seq) &&
      ((idx & 0x8000) == 0))
  {
    jam();
    /**
     * Already in dirty list
     */
    return ;
  }

  Local_key key;
  key.m_page_no = pagePtr.p->m_page_no;
  key.m_file_no = pagePtr.p->m_file_no;

  pagePtr.p->nextList = pagePtr.p->prevList = RNIL;

  if (DBG_DISK)
    ndbout << " disk_page_set_dirty " << key << endl;

  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

  Uint32 free = pagePtr.p->free_space;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
  {
    jam();
    D(V(pagePtr.p->m_restart_seq) << V(globalData.m_restart_seq));
    restart_setup_page(alloc, pagePtr, -1);
    ndbassert(free == pagePtr.p->free_space);
    idx = alloc.calc_page_free_bits(free);
    used = 0;
  }
  else
  {
    jam();
    idx &= ~0x8000;
    ddassert(idx == alloc.calc_page_free_bits(free - used));
  }

  ddassert(free >= used);

  D("Tablespace_client - disk_page_set_dirty");
  Tablespace_client tsman(0, this, c_tsman,
                          fragPtr.p->fragTableId,
                          fragPtr.p->fragmentId,
                          fragPtr.p->m_tablespace_id);

  pagePtr.p->list_index = idx;
  ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
  LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
  list.add(pagePtr);

  // Make sure no one will allocate it...
  tsman.unmap_page(&key, MAX_FREE_LIST - 1);
  jamEntry();
}

void
Dbtup::disk_page_unmap_callback(Uint32 when,
                                Uint32 page_id, Uint32 dirty_count)
{
  jamEntry();
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Uint32 type = pagePtr.p->m_page_header.m_page_type;
  if (unlikely((type != File_formats::PT_Tup_fixsize_page &&
                type != File_formats::PT_Tup_varsize_page) ||
               f_undo_done == false))
  {
    jam();
    D("disk_page_unmap_callback" << V(type) << V(f_undo_done));
    return ;
  }

  Uint32 idx = pagePtr.p->list_index;

  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);

  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

  if (when == 0)
  {
    /**
     * Before pageout
     */
    jam();

    if (DBG_DISK)
    {
      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;
      ndbout << "disk_page_unmap_callback(before) " << key
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    ndbassert((idx & 0x8000) == 0);

    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
    LocalDLList<Page> list2(*pool, alloc.m_unmap_pages);
    list.remove(pagePtr);
    list2.add(pagePtr);

    if (dirty_count == 0)
    {
      jam();
      pagePtr.p->list_index = idx | 0x8000;

      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;

      Uint32 free = pagePtr.p->free_space;
      Uint32 used = pagePtr.p->uncommitted_used_space;
      ddassert(free >= used);
      ddassert(alloc.calc_page_free_bits(free - used) == idx);

      D("Tablespace_client - disk_page_unmap_callback");
      Tablespace_client tsman(0, this, c_tsman,
                              fragPtr.p->fragTableId,
                              fragPtr.p->fragmentId,
                              fragPtr.p->m_tablespace_id);

      tsman.unmap_page(&key, idx);
      jamEntry();
    }
  }
  else if (when == 1)
  {
    /**
     * After pageout
     */
    jam();

    Local_key key;
    key.m_page_no = pagePtr.p->m_page_no;
    key.m_file_no = pagePtr.p->m_file_no;
    Uint32 real_free = pagePtr.p->free_space;

    if (DBG_DISK)
    {
      ndbout << "disk_page_unmap_callback(after) " << key
             << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(*pool, alloc.m_unmap_pages);
    list.remove(pagePtr);

    D("Tablespace_client - disk_page_unmap_callback");
    Tablespace_client tsman(0, this, c_tsman,
                            fragPtr.p->fragTableId,
                            fragPtr.p->fragmentId,
                            fragPtr.p->m_tablespace_id);

    if (DBG_DISK && alloc.calc_page_free_bits(real_free) != (idx & ~0x8000))
    {
      ndbout << key
             << " calc: " << alloc.calc_page_free_bits(real_free)
             << " idx: " << (idx & ~0x8000)
             << endl;
    }
    tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
    jamEntry();
  }
}

void
Dbtup::disk_page_alloc(Signal* signal,
                       Tablerec* tabPtrP, Fragrecord* fragPtrP,
                       Local_key* key, PagePtr pagePtr, Uint32 gci)
{
  jam();
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;

  Uint64 lsn;
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    ddassert(pagePtr.p->uncommitted_used_space > 0);
    pagePtr.p->uncommitted_used_space--;
    key->m_page_idx= ((Fix_page*)pagePtr.p)->alloc_record();
    lsn= disk_page_undo_alloc(pagePtr.p, key, 1, gci, logfile_group_id);
  }
  else
  {
    Uint32 sz= key->m_page_idx;
    ddassert(pagePtr.p->uncommitted_used_space >= sz);
    pagePtr.p->uncommitted_used_space -= sz;
    key->m_page_idx= ((Var_page*)pagePtr.p)->
      alloc_record(sz, (Var_page*)ctemp_page, 0);

    lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
  }
}

void
Dbtup::disk_page_free(Signal *signal,
                      Tablerec *tabPtrP, Fragrecord * fragPtrP,
                      Local_key* key, PagePtr pagePtr, Uint32 gci)
{
  jam();
  if (DBG_DISK)
    ndbout << " disk_page_free " << *key << endl;

  Uint32 page_idx= key->m_page_idx;
  Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
  Uint32 old_free= pagePtr.p->free_space;

  Uint32 sz;
  Uint64 lsn;
  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
  {
    sz = 1;
    const Uint32 *src= ((Fix_page*)pagePtr.p)->get_ptr(page_idx, 0);
    ndbassert(* (src + 1) != Tup_fixsize_page::FREE_RECORD);
    lsn= disk_page_undo_free(pagePtr.p, key,
                             src, tabPtrP->m_offsets[DD].m_fix_header_size,
                             gci, logfile_group_id);

    ((Fix_page*)pagePtr.p)->free_record(page_idx);
  }
  else
  {
    const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
    sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
    lsn= disk_page_undo_free(pagePtr.p, key,
                             src, sz,
                             gci, logfile_group_id);

    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
  }

  Uint32 new_free = pagePtr.p->free_space;

  Uint32 ext = pagePtr.p->m_extent_info_ptr;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  Uint32 old_idx = pagePtr.p->list_index;
  ddassert(old_free >= used);
  ddassert(new_free >= used);
  ddassert(new_free >= old_free);
  ddassert((old_idx & 0x8000) == 0);

  Uint32 new_idx = alloc.calc_page_free_bits(new_free - used);
  ddassert(alloc.calc_page_free_bits(old_free - used) == old_idx);

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, ext);

  if (old_idx != new_idx)
  {
    jam();
    disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
  }

  update_extent_pos(alloc, extentPtr, sz);
#if NOT_YET_FREE_EXTENT
  if (check_free(extentPtr.p) == 0)
  {
    ndbout_c("free: extent is free");
  }
#endif
}

void
Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
                                Local_key* key, Uint32 sz)
{
  jam();

  Page_cache_client::Request req;
  req.m_callback.m_callbackData= sz;
  req.m_callback.m_callbackFunction =
    safe_cast(&Dbtup::disk_page_abort_prealloc_callback);

  int flags= Page_cache_client::DIRTY_REQ;
  memcpy(&req.m_page, key, sizeof(Local_key));

  Page_cache_client pgman(this, c_pgman);
  int res= pgman.get_page(signal, req, flags);
  m_pgman_ptr = pgman.m_ptr;
  jamEntry();
  switch(res)
  {
  case 0:
    jam();
    break;
  case -1:
    ndbrequire(false);
    break;
  default:
    jam();
    Ptr<GlobalPage> gpage;
    m_global_page_pool.getPtr(gpage, (Uint32)res);
    PagePtr pagePtr;
    pagePtr.i = gpage.i;
    pagePtr.p = reinterpret_cast<Page*>(gpage.p);

    disk_page_abort_prealloc_callback_1(signal, fragPtrP, pagePtr, sz);
  }
}

void
Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
                                         Uint32 sz, Uint32 page_id)
{
  //ndbout_c("disk_alloc_page_callback id: %d", page_id);
  jamEntry();
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);

  PagePtr pagePtr;
  pagePtr.i = gpage.i;
  pagePtr.p = reinterpret_cast<Page*>(gpage.p);

  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);

  disk_page_abort_prealloc_callback_1(signal, fragPtr.p, pagePtr, sz);
}

void
Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
                                           Fragrecord* fragPtrP,
                                           PagePtr pagePtr,
                                           Uint32 sz)
{
  jam();
  disk_page_set_dirty(pagePtr);

  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;

  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, pagePtr.p->m_extent_info_ptr);

  Uint32 idx = pagePtr.p->list_index & 0x7FFF;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  Uint32 free = pagePtr.p->free_space;

  ddassert(free >= used);
  ddassert(used >= sz);
  ddassert(alloc.calc_page_free_bits(free - used) == idx);

  pagePtr.p->uncommitted_used_space = used - sz;

  Uint32 new_idx = alloc.calc_page_free_bits(free - used + sz);

  if (idx != new_idx)
  {
    jam();
    disk_page_move_dirty_page(alloc, extentPtr, pagePtr, idx, new_idx);
  }

  update_extent_pos(alloc, extentPtr, sz);
#if NOT_YET_FREE_EXTENT
  if (check_free(extentPtr.p) == 0)
  {
    ndbout_c("abort: extent is free");
  }
#endif
}

#if NOT_YET_UNDO_ALLOC_EXTENT
void
Dbtup::disk_page_alloc_extent_log_buffer_callback(Signal* signal,
                                                  Uint32 extentPtrI,
                                                  Uint32 unused)
{
  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, extentPtrI);

  Local_key key = extentPtr.p->m_key;
  Tablespace_client2 tsman(signal, c_tsman, &key);

  Ptr<Tablerec> tabPtr;
  tabPtr.i= tsman.m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);

  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, tsman.m_fragment_id, tabPtr.p);

  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);

  Disk_undo::AllocExtent alloc;
  alloc.m_table = tabPtr.i;
  alloc.m_fragment = tsman.m_fragment_id;
  alloc.m_page_no = key.m_page_no;
  alloc.m_file_no = key.m_file_no;
  alloc.m_type_length = (Disk_undo::UNDO_ALLOC_EXTENT<<16)|(sizeof(alloc)>> 2);

  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };

  Uint64 lsn= lgman.add_entry(c, 1);

  tsman.update_lsn(&key, lsn);
  jamEntry();
}
#endif

Uint64
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
                            Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{
  jam();
  D("Logfile_client - disk_page_undo_alloc");
  Logfile_client lgman(this, c_lgman, logfile_group_id);

  Disk_undo::Alloc alloc;
  alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
  alloc.m_page_no = key->m_page_no;
  alloc.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;

  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };

  Uint64 lsn= lgman.add_entry(c, 1);
  jamEntry();
  Page_cache_client pgman(this, c_pgman);
  pgman.update_lsn(* key, lsn);
  jamEntry();

  return lsn;
}

Uint64
Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
                             const Uint32* src, Uint32 sz,
                             Uint32 gci, Uint32 logfile_group_id)
{
  jam();
  D("Logfile_client - disk_page_undo_update");
  Logfile_client lgman(this, c_lgman, logfile_group_id);

  Disk_undo::Update update;
  update.m_page_no = key->m_page_no;
  update.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
  update.m_gci= gci;

  update.m_type_length=
    (Disk_undo::UNDO_UPDATE << 16) | (sz + (sizeof(update) >> 2) - 1);

  Logfile_client::Change c[3] = {
    { &update, 3 },
    { src, sz },
    { &update.m_type_length, 1 }
  };
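  /**
   * Editor's note: the undo record is thus written as the first three
   * words of the record (page/file identification and GCI), the sz
   * payload words, and the trailing type_length word, 3 + sz + 1 words
   * in total.  This matches the length encoded above (assuming
   * Disk_undo::Update ends with a one-word data member, as the assert
   * below implies); disk_page_undo_free() below uses the same layout.
   */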
1439 
1440   ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
1441 
1442   Uint64 lsn= lgman.add_entry(c, 3);
1443   jamEntry();
1444   Page_cache_client pgman(this, c_pgman);
1445   pgman.update_lsn(* key, lsn);
1446   jamEntry();
1447 
1448   return lsn;
1449 }
1450 
1451 Uint64
disk_page_undo_free(Page * page,const Local_key * key,const Uint32 * src,Uint32 sz,Uint32 gci,Uint32 logfile_group_id)1452 Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
1453 			   const Uint32* src, Uint32 sz,
1454 			   Uint32 gci, Uint32 logfile_group_id)
1455 {
1456   jam();
1457   D("Logfile_client - disk_page_undo_free");
1458   Logfile_client lgman(this, c_lgman, logfile_group_id);
1459 
1460   Disk_undo::Free free;
1461   free.m_page_no = key->m_page_no;
1462   free.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
1463   free.m_gci= gci;
1464 
1465   free.m_type_length=
1466     (Disk_undo::UNDO_FREE << 16) | (sz + (sizeof(free) >> 2) - 1);
1467 
1468   Logfile_client::Change c[3] = {
1469     { &free, 3 },
1470     { src, sz },
1471     { &free.m_type_length, 1 }
1472   };
1473 
1474   ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
1475 
1476   Uint64 lsn= lgman.add_entry(c, 3);
1477   jamEntry();
1478   Page_cache_client pgman(this, c_pgman);
1479   pgman.update_lsn(* key, lsn);
1480   jamEntry();
1481 
1482   return lsn;
1483 }
1484 
1485 #include <signaldata/LgmanContinueB.hpp>
1486 
1487 #define DBG_UNDO 0
1488 
1489 void
disk_restart_undo(Signal * signal,Uint64 lsn,Uint32 type,const Uint32 * ptr,Uint32 len)1490 Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
1491 			 Uint32 type, const Uint32 * ptr, Uint32 len)
1492 {
1493   f_undo_done = false;
1494   f_undo.m_lsn= lsn;
1495   f_undo.m_ptr= ptr;
1496   f_undo.m_len= len;
1497   f_undo.m_type = type;
1498 
1499   Page_cache_client::Request preq;
1500   switch(f_undo.m_type){
1501   case File_formats::Undofile::UNDO_LCP_FIRST:
1502   case File_formats::Undofile::UNDO_LCP:
1503   {
1504     jam();
1505     ndbrequire(len == 3);
1506     Uint32 lcp = ptr[0];
1507     Uint32 tableId = ptr[1] >> 16;
1508     Uint32 fragId = ptr[1] & 0xFFFF;
1509     disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP, lcp);
1510     if (!isNdbMtLqh())
1511       disk_restart_undo_next(signal);
1512 
1513     if (DBG_UNDO)
1514     {
1515       ndbout_c("UNDO LCP %u (%u, %u)", lcp, tableId, fragId);
1516     }
1517     return;
1518   }
1519   case File_formats::Undofile::UNDO_TUP_ALLOC:
1520   {
1521     jam();
1522     Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
1523     preq.m_page.m_page_no = rec->m_page_no;
1524     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1525     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1526     break;
1527   }
1528   case File_formats::Undofile::UNDO_TUP_UPDATE:
1529   {
1530     jam();
1531     Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
1532     preq.m_page.m_page_no = rec->m_page_no;
1533     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1534     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1535     break;
1536   }
1537   case File_formats::Undofile::UNDO_TUP_FREE:
1538   {
1539     jam();
1540     Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
1541     preq.m_page.m_page_no = rec->m_page_no;
1542     preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
1543     preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
1544     break;
1545   }
1546   case File_formats::Undofile::UNDO_TUP_CREATE:
1547     /**
1548      *
1549      */
1550   {
1551     jam();
1552     Disk_undo::Create* rec= (Disk_undo::Create*)ptr;
1553     Ptr<Tablerec> tabPtr;
1554     tabPtr.i= rec->m_table;
1555     ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
1556     for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
1557       if (tabPtr.p->fragrec[i] != RNIL)
1558 	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
1559 			      Fragrecord::UC_CREATE, 0);
1560     if (!isNdbMtLqh())
1561       disk_restart_undo_next(signal);
1562 
1563     if (DBG_UNDO)
1564     {
1565       ndbout_c("UNDO CREATE (%u)", tabPtr.i);
1566     }
1567     return;
1568   }
1569   case File_formats::Undofile::UNDO_TUP_DROP:
1570   {
1571     jam();
1572     Disk_undo::Drop* rec = (Disk_undo::Drop*)ptr;
1573     Ptr<Tablerec> tabPtr;
1574     tabPtr.i= rec->m_table;
1575     ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
1576     for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
1577       if (tabPtr.p->fragrec[i] != RNIL)
1578 	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
1579 			      Fragrecord::UC_CREATE, 0);
1580     if (!isNdbMtLqh())
1581       disk_restart_undo_next(signal);
1582 
1583     if (DBG_UNDO)
1584     {
1585       ndbout_c("UNDO DROP (%u)", tabPtr.i);
1586     }
1587     return;
1588   }
1589   case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
1590     jam();
1591   case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
1592     jam();
1593     disk_restart_undo_next(signal);
1594     return;
1595 
1596   case File_formats::Undofile::UNDO_END:
1597     jam();
1598     f_undo_done = true;
1599     return;
1600   default:
1601     ndbrequire(false);
1602   }
1603 
1604   f_undo.m_key = preq.m_page;
1605   preq.m_callback.m_callbackFunction =
1606     safe_cast(&Dbtup::disk_restart_undo_callback);
1607 
1608   int flags = 0;
1609   Page_cache_client pgman(this, c_pgman);
1610   int res= pgman.get_page(signal, preq, flags);
1611   m_pgman_ptr = pgman.m_ptr;
1612   jamEntry();
1613   switch(res)
1614   {
1615   case 0:
1616     break; // Wait for callback
1617   case -1:
1618     ndbrequire(false);
1619     break;
1620   default:
1621     execute(signal, preq.m_callback, res); // run callback
1622   }
1623 }
1624 
1625 void
disk_restart_undo_next(Signal * signal)1626 Dbtup::disk_restart_undo_next(Signal* signal)
1627 {
1628   signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
1629   sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
1630 }
1631 
1632 void
disk_restart_lcp_id(Uint32 tableId,Uint32 fragId,Uint32 lcpId)1633 Dbtup::disk_restart_lcp_id(Uint32 tableId, Uint32 fragId, Uint32 lcpId)
1634 {
1635   jamEntry();
1636 
1637   if (lcpId == RNIL)
1638   {
1639     disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE, 0);
1640     if (DBG_UNDO)
1641     {
1642       ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
1643     }
1644   }
1645   else
1646   {
1647     disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_SET_LCP, lcpId);
1648     if (DBG_UNDO)
1649     {
1650       ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
1651     }
1652 
1653   }
1654 }
1655 
1656 void
disk_restart_undo_lcp(Uint32 tableId,Uint32 fragId,Uint32 flag,Uint32 lcpId)1657 Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag,
1658 			     Uint32 lcpId)
1659 {
1660   Ptr<Tablerec> tabPtr;
1661   tabPtr.i= tableId;
1662   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
1663 
1664   if (tabPtr.p->tableStatus == DEFINED && tabPtr.p->m_no_of_disk_attributes)
1665   {
1666     jam();
1667     FragrecordPtr fragPtr;
1668     getFragmentrec(fragPtr, fragId, tabPtr.p);
1669     if (!fragPtr.isNull())
1670     {
1671       jam();
1672       switch(flag){
1673       case Fragrecord::UC_CREATE:
1674 	jam();
1675 	fragPtr.p->m_undo_complete |= flag;
1676 	return;
1677       case Fragrecord::UC_LCP:
1678 	jam();
1679 	if (fragPtr.p->m_undo_complete == 0 &&
1680 	    fragPtr.p->m_restore_lcp_id == lcpId)
1681 	{
1682 	  jam();
1683 	  fragPtr.p->m_undo_complete |= flag;
1684 	  if (DBG_UNDO)
1685             ndbout_c("table: %u fragment: %u lcp: %u -> done",
1686 		     tableId, fragId, lcpId);
1687 	}
1688 	return;
1689       case Fragrecord::UC_SET_LCP:
1690       {
1691 	jam();
1692 	if (DBG_UNDO)
1693            ndbout_c("table: %u fragment: %u restore to lcp: %u",
1694 		    tableId, fragId, lcpId);
1695 	ndbrequire(fragPtr.p->m_undo_complete == 0);
1696 	ndbrequire(fragPtr.p->m_restore_lcp_id == RNIL);
1697 	fragPtr.p->m_restore_lcp_id = lcpId;
1698 	return;
1699       }
1700       }
1701       jamLine(flag);
1702       ndbrequire(false);
1703     }
1704   }
1705 }
1706 
1707 void
disk_restart_undo_callback(Signal * signal,Uint32 id,Uint32 page_id)1708 Dbtup::disk_restart_undo_callback(Signal* signal,
1709 				  Uint32 id,
1710 				  Uint32 page_id)
1711 {
1712   jamEntry();
1713   Ptr<GlobalPage> gpage;
1714   m_global_page_pool.getPtr(gpage, page_id);
1715   PagePtr pagePtr;
1716   pagePtr.i = gpage.i;
1717   pagePtr.p = reinterpret_cast<Page*>(gpage.p);
1718 
1719   Apply_undo* undo = &f_undo;
1720 
1721   bool update = false;
1722   if (! (pagePtr.p->list_index & 0x8000) ||
1723       pagePtr.p->nextList != RNIL ||
1724       pagePtr.p->prevList != RNIL)
1725   {
1726     jam();
1727     update = true;
1728     pagePtr.p->list_index |= 0x8000;
1729     pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
1730   }
1731 
1732   Uint32 tableId= pagePtr.p->m_table_id;
1733   Uint32 fragId = pagePtr.p->m_fragment_id;
1734 
1735   if (tableId >= cnoOfTablerec)
1736   {
1737     jam();
1738     if (DBG_UNDO)
1739       ndbout_c("UNDO table> %u", tableId);
1740     disk_restart_undo_next(signal);
1741     return;
1742   }
1743   undo->m_table_ptr.i = tableId;
1744   ptrCheckGuard(undo->m_table_ptr, cnoOfTablerec, tablerec);
1745 
1746   if (! (undo->m_table_ptr.p->tableStatus == DEFINED &&
1747          undo->m_table_ptr.p->m_no_of_disk_attributes))
1748   {
1749     jam();
1750     if (DBG_UNDO)
1751       ndbout_c("UNDO !defined (%u) ", tableId);
1752     disk_restart_undo_next(signal);
1753     return;
1754   }
1755 
1756   getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
1757   if(undo->m_fragment_ptr.isNull())
1758   {
1759     jam();
1760     if (DBG_UNDO)
1761       ndbout_c("UNDO fragment null %u/%u", tableId, fragId);
1762     disk_restart_undo_next(signal);
1763     return;
1764   }
1765 
1766   if (undo->m_fragment_ptr.p->m_undo_complete)
1767   {
1768     jam();
1769     if (DBG_UNDO)
1770       ndbout_c("UNDO undo complete %u/%u", tableId, fragId);
1771     disk_restart_undo_next(signal);
1772     return;
1773   }
1774 
1775   Local_key key = undo->m_key;
1776 //  key.m_page_no = pagePtr.p->m_page_no;
1777 //  key.m_file_no = pagePtr.p->m_file_no;
1778 
1779   Uint64 lsn = 0;
1780   lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
1781   lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
1782 
1783   undo->m_page_ptr = pagePtr;
1784 
  if (undo->m_lsn <= lsn)
  {
    jam();
    if (DBG_UNDO)
    {
      ndbout << "apply: " << undo->m_lsn << " (" << lsn << ") "
             << key << " type: " << undo->m_type << endl;
    }

    update = true;
    if (DBG_UNDO)
      ndbout_c("applying %llu", undo->m_lsn);
    /**
     * Apply undo record
     */
    switch(undo->m_type){
    case File_formats::Undofile::UNDO_TUP_ALLOC:
      jam();
      disk_restart_undo_alloc(undo);
      break;
    case File_formats::Undofile::UNDO_TUP_UPDATE:
      jam();
      disk_restart_undo_update(undo);
      break;
    case File_formats::Undofile::UNDO_TUP_FREE:
      jam();
      disk_restart_undo_free(undo);
      break;
    default:
      ndbrequire(false);
    }

    if (DBG_UNDO)
      ndbout << "disk_restart_undo: " << undo->m_type << " "
             << undo->m_key << endl;

    lsn = undo->m_lsn - 1; // make sure the undo record isn't applied again

    Page_cache_client pgman(this, c_pgman);
    pgman.update_lsn(undo->m_key, lsn);
    jamEntry();

    disk_restart_undo_page_bits(signal, undo);
  }
  else if (DBG_UNDO)
  {
    jam();
    ndbout << "ignore: " << undo->m_lsn << " (" << lsn << ") "
           << key << " type: " << undo->m_type
           << " tab: " << tableId << endl;
  }

  disk_restart_undo_next(signal);
}

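/**
 * Undo an UNDO_TUP_ALLOC record: the tuple was allocated after the
 * point being restored to, so free its record slot on the page again.
 */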
void
Dbtup::disk_restart_undo_alloc(Apply_undo* undo)
{
  ndbassert(undo->m_page_ptr.p->m_file_no == undo->m_key.m_file_no);
  ndbassert(undo->m_page_ptr.p->m_page_no == undo->m_key.m_page_no);
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
  {
    ((Fix_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx);
  }
  else
  {
    ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
  }
}

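/**
 * Undo an UNDO_TUP_UPDATE record: copy the before-image saved in the
 * undo record back over the tuple's disk data.  m_len covers the whole
 * record, so the 4 header words are subtracted to get the data length.
 * Only fixed-size pages are handled; the varsize path is unimplemented.
 */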
void
Dbtup::disk_restart_undo_update(Apply_undo* undo)
{
  Uint32* ptr;
  Uint32 len= undo->m_len - 4;
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
  {
    ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
    ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
  }
  else
  {
    ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
    abort(); // varsize disk attributes are not yet implemented here
  }

  const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
  const Uint32* src= update->m_data;
  memcpy(ptr, src, 4 * len);
}

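/**
 * Undo an UNDO_TUP_FREE record: re-allocate the record slot at its
 * original page index and restore the tuple data saved in the undo
 * record.  Only fixed-size pages are handled here as well.
 */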
void
Dbtup::disk_restart_undo_free(Apply_undo* undo)
{
  Uint32* ptr, idx = undo->m_key.m_page_idx;
  Uint32 len= undo->m_len - 4;
  if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
  {
    ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
    idx= ((Fix_page*)undo->m_page_ptr.p)->alloc_record(idx);
    ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(idx, len);
  }
  else
  {
    abort(); // varsize disk attributes are not yet implemented here
  }

  ndbrequire(idx == undo->m_key.m_page_idx);
  const Disk_undo::Free *free = (const Disk_undo::Free*)undo->m_ptr;
  const Uint32* src= free->m_data;
  memcpy(ptr, src, 4 * len);
}

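/**
 * After an undo record has been applied to a page, recompute the
 * page's free-space bits and report them to TSMAN so that the extent
 * page bits are consistent after the restart.
 */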
void
Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
{
  Fragrecord* fragPtrP = undo->m_fragment_ptr.p;
  Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;

  /**
   * Recompute the page's free bits from its current free space and
   * mark the page as not linked into any page list (the 0x8000 bit).
   */
  Page* pageP = undo->m_page_ptr.p;
  Uint32 free = pageP->free_space;
  Uint32 new_bits = alloc.calc_page_free_bits(free);
  pageP->list_index = 0x8000 | new_bits;

  D("Tablespace_client - disk_restart_undo_page_bits");
  Tablespace_client tsman(signal, this, c_tsman,
                          fragPtrP->fragTableId,
                          fragPtrP->fragmentId,
                          fragPtrP->m_tablespace_id);

  tsman.restart_undo_page_free_bits(&undo->m_key, new_bits);
  jamEntry();
}

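/**
 * Called during restart when an extent belonging to this fragment is
 * recovered from the tablespace.  Re-creates the in-memory Extent_info
 * and makes it the current allocation target.  Returns 0 on success,
 * -1 if the table/fragment no longer exists or is being re-created.
 */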
int
Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
                                 const Local_key* key, Uint32 pages)
{
  TablerecPtr tabPtr;
  FragrecordPtr fragPtr;
  tabPtr.i = tableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
  if (tabPtr.p->tableStatus == DEFINED && tabPtr.p->m_no_of_disk_attributes)
  {
    getFragmentrec(fragPtr, fragId, tabPtr.p);

    if (!fragPtr.isNull())
    {
      jam();

      if (fragPtr.p->m_undo_complete & Fragrecord::UC_CREATE)
      {
        jam();
        return -1;
      }

      Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

      Ptr<Extent_info> ext;
      ndbrequire(c_extent_pool.seize(ext));
#ifdef VM_TRACE
      ndbout << "allocated " << pages << " pages: " << *key
             << " table: " << tabPtr.i << " fragment: " << fragId << endl;
#endif
      ext.p->m_key = *key;
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
      ext.p->m_free_space= 0;
      ext.p->m_empty_page_no = (1 << 16); // We don't know, so assume none
      memset(ext.p->m_free_page_count, 0, sizeof(ext.p->m_free_page_count));

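      /**
       * The new extent becomes the current allocation target.  If there
       * was a previous current extent, link it back into the free-extent
       * matrix at the position given by its free space.
       */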
      if (alloc.m_curr_extent_info_ptr_i != RNIL)
      {
        jam();
        Ptr<Extent_info> old;
        c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
        ndbassert(old.p->m_free_matrix_pos == RNIL);
        Uint32 pos= alloc.calc_extent_pos(old.p);
        Local_extent_info_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
        new_list.add(old);
        old.p->m_free_matrix_pos= pos;
      }

      alloc.m_curr_extent_info_ptr_i = ext.i;
      ext.p->m_free_matrix_pos = RNIL;
      c_extent_hash.add(ext);

      Local_fragment_extent_list list1(c_extent_pool, alloc.m_extent_list);
      list1.add(ext);
      return 0;
    }
  }

  return -1;
}

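/**
 * Called during restart with the recovered free bits of one page in
 * the current extent.  Accumulates them into the extent's free page
 * counts and free-space estimate.
 */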
void
Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
                              const Local_key* key, Uint32 bits)
{
  jam();
  TablerecPtr tabPtr;
  FragrecordPtr fragPtr;
  tabPtr.i = tableId;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
  if (tabPtr.p->tableStatus == DEFINED && tabPtr.p->m_no_of_disk_attributes)
  {
    jam();
    getFragmentrec(fragPtr, fragId, tabPtr.p);
    Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;

    Ptr<Extent_info> ext;
    c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);

    Uint32 size= alloc.calc_page_free_space(bits);

    ext.p->m_free_page_count[bits]++;
    update_extent_pos(alloc, ext, size); // actually only updates free_space
    ndbassert(ext.p->m_free_matrix_pos == RNIL);
  }
}

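/**
 * Report disk space usage for a fragment:
 *   res[0] - bytes allocated: extent count * extent size (in pages)
 *            * NDB_PAGE_SIZE
 *   res[1] - estimated free bytes: free fixed-size record slots times
 *            the fixed record size (4 bytes per word)
 */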
void
Dbtup::disk_page_get_allocated(const Tablerec* tabPtrP,
                               const Fragrecord * fragPtrP,
                               Uint64 res[2])
{
  res[0] = res[1] = 0;
  if (tabPtrP->m_no_of_disk_attributes)
  {
    jam();
    const Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
    Uint64 cnt = 0;
    Uint64 free = 0;

    {
      Disk_alloc_info& tmp = const_cast<Disk_alloc_info&>(alloc);
      Local_fragment_extent_list list(c_extent_pool, tmp.m_extent_list);
      Ptr<Extent_info> extentPtr;
      for (list.first(extentPtr); !extentPtr.isNull(); list.next(extentPtr))
      {
        cnt++;
        free += extentPtr.p->m_free_space;
      }
    }
    res[0] = cnt * alloc.m_extent_size * File_formats::NDB_PAGE_SIZE;
    res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
  }
}