1 /*
2    Copyright (c) 2005, 2014, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUP_C
26 #define DBTUP_VAR_ALLOC_CPP
27 #include "Dbtup.hpp"
28 
29 #define JAM_FILE_ID 405
30 
31 
init_list_sizes(void)32 void Dbtup::init_list_sizes(void)
33 {
34   c_min_list_size[0]= 200;
35   c_max_list_size[0]= 499;
36 
37   c_min_list_size[1]= 500;
38   c_max_list_size[1]= 999;
39 
40   c_min_list_size[2]= 1000;
41   c_max_list_size[2]= 4079;
42 
43   c_min_list_size[3]= 4080;
44   c_max_list_size[3]= 8159;
45 
46   c_min_list_size[4]= 0;
47   c_max_list_size[4]= 199;
48 }
49 
50 /*
51   Allocator for variable sized segments
52   Part of the external interface for variable sized segments
53 
54   This method is used to allocate and free variable sized tuples and
55   parts of tuples. This part can be used to implement variable sized
56   attributes without wasting memory. It can be used to support small
57   BLOB's attached to the record. It can also be used to support adding
58   and dropping attributes without the need to copy the entire table.
59 
60   SYNOPSIS
61     fragPtr         A pointer to the fragment description
62     tabPtr          A pointer to the table description
63     alloc_size       Size of the allocated record
64     signal           The signal object to be used if a signal needs to
65                      be sent
66   RETURN VALUES
67     Returns true if allocation was successful otherwise false
68 
69     page_offset      Page offset of allocated record
70     page_index       Page index of allocated record
71     page_ptr         The i and p value of the page where the record was
72                      allocated
73 */
alloc_var_rec(Uint32 * err,Fragrecord * fragPtr,Tablerec * tabPtr,Uint32 alloc_size,Local_key * key,Uint32 * out_frag_page_id)74 Uint32* Dbtup::alloc_var_rec(Uint32 * err,
75                              Fragrecord* fragPtr,
76 			     Tablerec* tabPtr,
77 			     Uint32 alloc_size,
78 			     Local_key* key,
79 			     Uint32 * out_frag_page_id)
80 {
81   /**
82    * TODO alloc fix+var part
83    */
84   Uint32 *ptr = alloc_fix_rec(jamBuffer(), err, fragPtr, tabPtr, key,
85                               out_frag_page_id);
86   if (unlikely(ptr == 0))
87   {
88     return 0;
89   }
90 
91   Local_key varref;
92   Tuple_header* tuple = (Tuple_header*)ptr;
93   Var_part_ref* dst = tuple->get_var_part_ref_ptr(tabPtr);
94   if (alloc_size)
95   {
96     if (likely(alloc_var_part(err, fragPtr, tabPtr, alloc_size, &varref) != 0))
97     {
98       dst->assign(&varref);
99       return ptr;
100     }
101   }
102   else
103   {
104     varref.m_page_no = RNIL;
105     dst->assign(&varref);
106     return ptr;
107   }
108 
109   PagePtr pagePtr;
110   c_page_pool.getPtr(pagePtr, key->m_page_no);
111   free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
112   return 0;
113 }
114 
115 Uint32*
alloc_var_part(Uint32 * err,Fragrecord * fragPtr,Tablerec * tabPtr,Uint32 alloc_size,Local_key * key)116 Dbtup::alloc_var_part(Uint32 * err,
117                       Fragrecord* fragPtr,
118 		      Tablerec* tabPtr,
119 		      Uint32 alloc_size,
120 		      Local_key* key)
121 {
122   PagePtr pagePtr;
123   pagePtr.i= get_alloc_page(fragPtr, (alloc_size + 1));
124   if (pagePtr.i == RNIL) {
125     jam();
126     if ((pagePtr.i= get_empty_var_page(fragPtr)) == RNIL) {
127       jam();
128       * err = ZMEM_NOMEM_ERROR;
129       return 0;
130     }
131     c_page_pool.getPtr(pagePtr);
132     ((Var_page*)pagePtr.p)->init();
133     fragPtr->m_varWordsFree += ((Var_page*)pagePtr.p)->free_space;
134     pagePtr.p->list_index = MAX_FREE_LIST - 1;
135     LocalDLList<Page> list(c_page_pool,
136 			   fragPtr->free_var_page_array[MAX_FREE_LIST-1]);
137     list.addFirst(pagePtr);
138   } else {
139     c_page_pool.getPtr(pagePtr);
140     jam();
141   }
142   /*
143     First we remove the current free space on this page from fragment total.
144     Then we calculate a new free space value for the page. Finally we call
145     update_free_page_list() which adds this new value to the fragment total.
146   */
147   ndbassert(fragPtr->m_varWordsFree >= ((Var_page*)pagePtr.p)->free_space);
148   fragPtr->m_varWordsFree -= ((Var_page*)pagePtr.p)->free_space;
149   Uint32 idx= ((Var_page*)pagePtr.p)
150     ->alloc_record(alloc_size, (Var_page*)ctemp_page, Var_page::CHAIN);
151 
152   fragPtr->m_varElemCount++;
153   key->m_page_no = pagePtr.i;
154   key->m_page_idx = idx;
155 
156   update_free_page_list(fragPtr, pagePtr);
157   return ((Var_page*)pagePtr.p)->get_ptr(idx);
158 }
159 
160 /*
161   free_var_part is used to free the variable length storage associated
162   with the passed local key.
163   It is not assumed that there is a corresponding fixed-length part.
164   // TODO : Any need for tabPtr?
165 */
free_var_part(Fragrecord * fragPtr,Tablerec * tabPtr,Local_key * key)166 void Dbtup::free_var_part(Fragrecord* fragPtr,
167                           Tablerec* tabPtr,
168                           Local_key* key)
169 {
170   Ptr<Page> pagePtr;
171   if (key->m_page_no != RNIL)
172   {
173     c_page_pool.getPtr(pagePtr, key->m_page_no);
174     ndbassert(fragPtr->m_varWordsFree >= ((Var_page*)pagePtr.p)->free_space);
175     fragPtr->m_varWordsFree -= ((Var_page*)pagePtr.p)->free_space;
176     ((Var_page*)pagePtr.p)->free_record(key->m_page_idx, Var_page::CHAIN);
177     ndbassert(fragPtr->m_varElemCount > 0);
178     fragPtr->m_varElemCount--;
179 
180     ndbassert(pagePtr.p->free_space <= Var_page::DATA_WORDS);
181     if (pagePtr.p->free_space == Var_page::DATA_WORDS - 1)
182     {
183       jam();
184       Uint32 idx = pagePtr.p->list_index;
185       LocalDLList<Page> list(c_page_pool, fragPtr->free_var_page_array[idx]);
186       list.remove(pagePtr);
187       returnCommonArea(pagePtr.i, 1);
188       fragPtr->noOfVarPages --;
189     } else {
190       jam();
191       // Adds the new free space value for the page to the fragment total.
192       update_free_page_list(fragPtr, pagePtr);
193     }
194     ndbassert(fragPtr->verifyVarSpace());
195   }
196   return;
197 }
198 
199 /*
200   Deallocator for variable sized segments
201   Part of the external interface for variable sized segments
202 
203   SYNOPSIS
204     fragPtr         A pointer to the fragment description
205     tabPtr          A pointer to the table description
206     signal           The signal object to be used if a signal needs to
207                      be sent
208     page_ptr         A reference to the page of the variable sized
209                      segment
210     free_page_index  Page index on page of variable sized segment
211                      which is freed
212   RETURN VALUES
213     Returns true if deallocation was successful otherwise false
214 */
free_var_rec(Fragrecord * fragPtr,Tablerec * tabPtr,Local_key * key,Ptr<Page> pagePtr)215 void Dbtup::free_var_rec(Fragrecord* fragPtr,
216 			 Tablerec* tabPtr,
217 			 Local_key* key,
218 			 Ptr<Page> pagePtr)
219 {
220   /**
221    * TODO free fix + var part
222    */
223   Uint32 *ptr = ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);
224   Tuple_header* tuple = (Tuple_header*)ptr;
225 
226   Local_key ref;
227   Var_part_ref * varref = tuple->get_var_part_ref_ptr(tabPtr);
228   varref->copyout(&ref);
229 
230   free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
231 
232   if (ref.m_page_no != RNIL)
233   {
234     jam();
235     c_page_pool.getPtr(pagePtr, ref.m_page_no);
236     free_var_part(fragPtr, pagePtr, ref.m_page_idx);
237   }
238   return;
239 }
240 
241 void
free_var_part(Fragrecord * fragPtr,PagePtr pagePtr,Uint32 page_idx)242 Dbtup::free_var_part(Fragrecord* fragPtr, PagePtr pagePtr, Uint32 page_idx)
243 {
244   ndbassert(fragPtr->m_varWordsFree >= ((Var_page*)pagePtr.p)->free_space);
245   fragPtr->m_varWordsFree -= ((Var_page*)pagePtr.p)->free_space;
246   ((Var_page*)pagePtr.p)->free_record(page_idx, Var_page::CHAIN);
247   ndbassert(fragPtr->m_varElemCount > 0);
248   fragPtr->m_varElemCount--;
249 
250   ndbassert(pagePtr.p->free_space <= Var_page::DATA_WORDS);
251   if (pagePtr.p->free_space == Var_page::DATA_WORDS - 1)
252   {
253     jam();
254     Uint32 idx = pagePtr.p->list_index;
255     LocalDLList<Page> list(c_page_pool, fragPtr->free_var_page_array[idx]);
256     list.remove(pagePtr);
257     returnCommonArea(pagePtr.i, 1);
258     fragPtr->noOfVarPages --;
259   }
260   else
261   {
262     jam();
263     // Adds the new free space value for the page to the fragment total.
264     update_free_page_list(fragPtr, pagePtr);
265   }
266   ndbassert(fragPtr->verifyVarSpace());
267 }
268 
269 Uint32 *
realloc_var_part(Uint32 * err,Fragrecord * fragPtr,Tablerec * tabPtr,PagePtr pagePtr,Var_part_ref * refptr,Uint32 oldsz,Uint32 newsz)270 Dbtup::realloc_var_part(Uint32 * err,
271                         Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
272 			Var_part_ref* refptr, Uint32 oldsz, Uint32 newsz)
273 {
274   Uint32 add = newsz - oldsz;
275   Uint32 *new_var_ptr;
276   Var_page* pageP = (Var_page*)pagePtr.p;
277   Local_key oldref;
278   refptr->copyout(&oldref);
279 
280   ndbassert(newsz);
281   ndbassert(add);
282 
283   if (oldsz && pageP->free_space >= add)
284   {
285     jam();
286     new_var_ptr= pageP->get_ptr(oldref.m_page_idx);
287     if(!pageP->is_space_behind_entry(oldref.m_page_idx, add))
288     {
289       if(0) printf("extra reorg");
290       jam();
291       /**
292        * In this case we need to reorganise the page to fit. To ensure we
293        * don't complicate matters we make a little trick here where we
294        * fool the reorg_page to avoid copying the entry at hand and copy
295        * that separately at the end. This means we need to copy it out of
296        * the page before reorg_page to save the entry contents.
297        */
298       Uint32* copyBuffer= cinBuffer;
299       memcpy(copyBuffer, new_var_ptr, 4*oldsz);
300       pageP->set_entry_len(oldref.m_page_idx, 0);
301       pageP->free_space += oldsz;
302       fragPtr->m_varWordsFree += oldsz;
303       pageP->reorg((Var_page*)ctemp_page);
304       new_var_ptr= pageP->get_free_space_ptr();
305       memcpy(new_var_ptr, copyBuffer, 4*oldsz);
306       pageP->set_entry_offset(oldref.m_page_idx, pageP->insert_pos);
307       add += oldsz;
308     }
309     ndbassert(fragPtr->m_varWordsFree >= pageP->free_space);
310     fragPtr->m_varWordsFree -= pageP->free_space;
311 
312     pageP->grow_entry(oldref.m_page_idx, add);
313     // Adds the new free space value for the page to the fragment total.
314     update_free_page_list(fragPtr, pagePtr);
315   }
316   else
317   {
318     jam();
319     Local_key newref;
320     new_var_ptr = alloc_var_part(err, fragPtr, tabPtr, newsz, &newref);
321     if (unlikely(new_var_ptr == 0))
322       return NULL;
323 
324     if (oldsz)
325     {
326       jam();
327       Uint32 *src = pageP->get_ptr(oldref.m_page_idx);
328       ndbassert(oldref.m_page_no != newref.m_page_no);
329       ndbassert(pageP->get_entry_len(oldref.m_page_idx) == oldsz);
330       memcpy(new_var_ptr, src, 4*oldsz);
331       free_var_part(fragPtr, pagePtr, oldref.m_page_idx);
332     }
333 
334     refptr->assign(&newref);
335   }
336 
337   return new_var_ptr;
338 }
339 
340 void
move_var_part(Fragrecord * fragPtr,Tablerec * tabPtr,PagePtr pagePtr,Var_part_ref * refptr,Uint32 size)341 Dbtup::move_var_part(Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
342                      Var_part_ref* refptr, Uint32 size)
343 {
344   jam();
345 
346   ndbassert(size);
347   Var_page* pageP = (Var_page*)pagePtr.p;
348   Local_key oldref;
349   refptr->copyout(&oldref);
350 
351   /**
352    * to find destination page index of free list
353    */
354   Uint32 new_index = calculate_free_list_impl(size);
355 
356   /**
357    * do not move tuple from big-free-size page list
358    * to small-free-size page list
359    */
360   if (new_index > pageP->list_index)
361   {
362     jam();
363     return;
364   }
365 
366   PagePtr new_pagePtr;
367   new_pagePtr.i = get_alloc_page(fragPtr, size + 1);
368 
369   if (new_pagePtr.i == RNIL)
370   {
371     jam();
372     return;
373   }
374 
375   /**
376    * do not move varpart if new var part page is same as old
377    */
378   if (new_pagePtr.i == pagePtr.i)
379   {
380     jam();
381     return;
382   }
383 
384   c_page_pool.getPtr(new_pagePtr);
385 
386   ndbassert(fragPtr->m_varWordsFree >= ((Var_page*)new_pagePtr.p)->free_space);
387   fragPtr->m_varWordsFree -= ((Var_page*)new_pagePtr.p)->free_space;
388 
389   Uint32 idx= ((Var_page*)new_pagePtr.p)
390     ->alloc_record(size,(Var_page*)ctemp_page, Var_page::CHAIN);
391 
392   /**
393    * update new page into new free list after alloc_record
394    */
395   update_free_page_list(fragPtr, new_pagePtr);
396 
397   Uint32 *dst = ((Var_page*)new_pagePtr.p)->get_ptr(idx);
398   const Uint32 *src = pageP->get_ptr(oldref.m_page_idx);
399 
400   /**
401    * copy old varpart to new position
402    */
403   memcpy(dst, src, 4*size);
404 
405   fragPtr->m_varElemCount++;
406   /**
407    * remove old var part of tuple (and decrement m_varElemCount).
408    */
409   free_var_part(fragPtr, pagePtr, oldref.m_page_idx);
410 
411   /**
412    * update var part ref of fix part tuple to newref
413    */
414   Local_key newref;
415   newref.m_page_no = new_pagePtr.i;
416   newref.m_page_idx = idx;
417   refptr->assign(&newref);
418 }
419 
420 /* ------------------------------------------------------------------------ */
421 // Get a page from one of free lists. If the desired free list is empty we
422 // try with the next until we have tried all possible lists.
423 /* ------------------------------------------------------------------------ */
424 Uint32
get_alloc_page(Fragrecord * fragPtr,Uint32 alloc_size)425 Dbtup::get_alloc_page(Fragrecord* fragPtr, Uint32 alloc_size)
426 {
427   Uint32 i, start_index, loop= 0;
428   PagePtr pagePtr;
429 
430   start_index= calculate_free_list_impl(alloc_size);
431   if (start_index == (MAX_FREE_LIST - 1))
432   {
433     jam();
434   }
435   else
436   {
437     jam();
438     ndbrequire(start_index < (MAX_FREE_LIST - 1));
439     start_index++;
440   }
441   for (i= start_index; i < MAX_FREE_LIST; i++)
442   {
443     jam();
444     if (!fragPtr->free_var_page_array[i].isEmpty())
445     {
446       jam();
447       return fragPtr->free_var_page_array[i].getFirst();
448     }
449   }
450   ndbrequire(start_index > 0);
451   i= start_index - 1;
452   LocalDLList<Page> list(c_page_pool, fragPtr->free_var_page_array[i]);
453   for(list.first(pagePtr); !pagePtr.isNull() && loop < 16; )
454   {
455     jam();
456     if (pagePtr.p->free_space >= alloc_size) {
457       jam();
458       return pagePtr.i;
459     }
460     loop++;
461     list.next(pagePtr);
462   }
463   return RNIL;
464 }
465 
466 Uint32
get_empty_var_page(Fragrecord * fragPtr)467 Dbtup::get_empty_var_page(Fragrecord* fragPtr)
468 {
469   PagePtr ptr;
470   Uint32 cnt;
471   allocConsPages(jamBuffer(), 1, cnt, ptr.i);
472   fragPtr->noOfVarPages+= cnt;
473   if (unlikely(cnt == 0))
474   {
475     return RNIL;
476   }
477 
478   c_page_pool.getPtr(ptr);
479   ptr.p->physical_page_id = ptr.i;
480   ptr.p->page_state = ~0;
481   ptr.p->nextList = RNIL;
482   ptr.p->prevList = RNIL;
483   ptr.p->frag_page_id = RNIL;
484 
485   return ptr.i;
486 }
487 
488 /* ------------------------------------------------------------------------ */
489 // Check if the page needs to go to a new free page list.
490 /* ------------------------------------------------------------------------ */
update_free_page_list(Fragrecord * fragPtr,Ptr<Page> pagePtr)491 void Dbtup::update_free_page_list(Fragrecord* fragPtr,
492                                   Ptr<Page> pagePtr)
493 {
494   Uint32 free_space, list_index;
495   free_space= pagePtr.p->free_space;
496   list_index= pagePtr.p->list_index;
497   fragPtr->m_varWordsFree+= free_space;
498   ndbassert(fragPtr->verifyVarSpace());
499 
500   if ((free_space < c_min_list_size[list_index]) ||
501       (free_space > c_max_list_size[list_index])) {
502     Uint32 new_list_index= calculate_free_list_impl(free_space);
503 
504     {
505       /**
506        * Remove from free list
507        */
508       LocalDLList<Page>
509         list(c_page_pool, fragPtr->free_var_page_array[list_index]);
510       list.remove(pagePtr);
511     }
512     if (free_space < c_min_list_size[new_list_index])
513     {
514       /*
515 	We have not sufficient amount of free space to put it into any
516 	free list. Thus the page will not be available for new inserts.
517 	This can only happen for the free list with least guaranteed
518 	free space.
519 
520         Put in on MAX_FREE_LIST-list (i.e full pages)
521       */
522       jam();
523       ndbrequire(new_list_index == 0);
524       new_list_index = MAX_FREE_LIST;
525     }
526 
527     {
528       LocalDLList<Page> list(c_page_pool,
529                              fragPtr->free_var_page_array[new_list_index]);
530       list.addFirst(pagePtr);
531       pagePtr.p->list_index = new_list_index;
532     }
533   }
534 }
535 
536 /* ------------------------------------------------------------------------ */
537 // Given size of free space, calculate the free list to put it into
538 /* ------------------------------------------------------------------------ */
calculate_free_list_impl(Uint32 free_space_size) const539 Uint32 Dbtup::calculate_free_list_impl(Uint32 free_space_size) const
540 {
541   Uint32 i;
542   for (i = 0; i < MAX_FREE_LIST; i++) {
543     jam();
544     if (free_space_size <= c_max_list_size[i]) {
545       jam();
546       return i;
547     }
548   }
549   ndbrequire(false);
550   return 0;
551 }
552 
calculate_used_var_words(Fragrecord * fragPtr)553 Uint64 Dbtup::calculate_used_var_words(Fragrecord* fragPtr)
554 {
555   /* Loop over all VarSize pages in this fragment, summing
556    * their used space
557    */
558   Uint64 totalUsed= 0;
559   for (Uint32 freeList= 0; freeList <= MAX_FREE_LIST; freeList++)
560   {
561     LocalDLList<Page> list(c_page_pool,
562                            fragPtr->free_var_page_array[freeList]);
563     Ptr<Page> pagePtr;
564 
565     if (list.first(pagePtr))
566     {
567       do
568       {
569         totalUsed+= (Tup_varsize_page::DATA_WORDS - pagePtr.p->free_space);
570       } while (list.next(pagePtr));
571     };
572   };
573 
574   return totalUsed;
575 }
576 
577 Uint32*
alloc_var_rowid(Uint32 * err,Fragrecord * fragPtr,Tablerec * tabPtr,Uint32 alloc_size,Local_key * key,Uint32 * out_frag_page_id)578 Dbtup::alloc_var_rowid(Uint32 * err,
579                        Fragrecord* fragPtr,
580 		       Tablerec* tabPtr,
581 		       Uint32 alloc_size,
582 		       Local_key* key,
583 		       Uint32 * out_frag_page_id)
584 {
585   Uint32 *ptr = alloc_fix_rowid(err, fragPtr, tabPtr, key, out_frag_page_id);
586   if (unlikely(ptr == 0))
587   {
588     return 0;
589   }
590 
591   Local_key varref;
592   Tuple_header* tuple = (Tuple_header*)ptr;
593   Var_part_ref* dst = (Var_part_ref*)tuple->get_var_part_ref_ptr(tabPtr);
594 
595   if (alloc_size)
596   {
597     if (likely(alloc_var_part(err, fragPtr, tabPtr, alloc_size, &varref) != 0))
598     {
599       dst->assign(&varref);
600       return ptr;
601     }
602   }
603   else
604   {
605     varref.m_page_no = RNIL;
606     dst->assign(&varref);
607     return ptr;
608   }
609 
610   PagePtr pagePtr;
611   c_page_pool.getPtr(pagePtr, key->m_page_no);
612   free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
613   return 0;
614 }
615