1 /* Copyright (C) 2007 Michael Widenius
2    Copyright (c) 2010, 2013, Monty Program Ab.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 /*
18   Bitmap handling (for records in block)
19 
20   The data file starts with a bitmap page, followed by as many data
21   pages as the bitmap can cover. After this there is a new bitmap page
22   and more data pages etc.
23 
24   The bitmap code assumes there is always an active bitmap page and thus
25   that there is at least one bitmap page in the file
26 
27   Structure of bitmap page:
28 
29   Fixed size records (to be implemented later):
30 
31   2 bits are used to indicate:
32 
33   0      Empty
34   1      0-75 % full  (at least room for 2 records)
35   2      75-100 % full (at least room for one record)
36   3      100 % full    (no more room for records)
37 
38   Assuming 8K pages, this will allow us to map:
39   8192 (bytes per page) * 4 (pages mapped per byte) * 8192 (page size)= 256M
40 
41   (For Maria this will be 7*4 * 8192 = 224K smaller because of LSN)
42 
43   Note that for fixed size rows, we can't add more columns without doing
44   a full reorganization of the table. The user can always force a dynamic
45   size row format by specifying ROW_FORMAT=dynamic.
46 
47 
48   Dynamic size records:
49 
50   3 bits are used to indicate				Bytes free in 8K page
51 
52   0      Empty page					8176 (head or tail)
53   1      0-30 % full  (at least room for 3 records)	5724
54   2      30-60 % full (at least room for 2 records)	3271
55   3      60-90 % full (at least room for one record)	818
56   4      100 % full   (no more room for records)	0
57   5      Tail page,  0-40 % full			4906
58   6      Tail page,  40-80 % full			1636
59   7      Full tail page or full blob page		0
60 
61   Assuming 8K pages, this will allow us to map:
62   8192 (bytes per page) * 8 bits/byte / 3 bits/page * 8192 (page size)= 170.7M
63 
  Note that values 1-3 may be adjusted for each individual table based on
  'min record length'.  Tail pages are for overflow data which can be of
  any size and thus don't have to be adjusted for different tables.
  If we add more columns to the table, some of the originally calculated
  'cut off' points may not be optimal, but they shouldn't be 'drastically
  wrong'.
70 
71   When allocating data from the bitmap, we are trying to do it in a
72   'best fit' manner. Blobs and varchar blocks are given out in large
73   continuous extents to allow fast access to these. Before allowing a
74   row to 'flow over' to other blocks, we will compact the page and use
  all space on it. If there are many rows in the page, we will ensure
  there are *LEFT_TO_GROW_ON_SPLIT* bytes left on the page to allow other
  rows to grow.
78 
79   The bitmap format allows us to extend the row file in big chunks, if needed.
80 
81   When calculating the size for a packed row, we will calculate the following
82   things separately:
83   - Row header + null_bits + empty_bits fixed size segments etc.
84   - Size of all char/varchar fields
85   - Size of each blob field
86 
87   The bitmap handler will get all the above information and return
88   either one page or a set of pages to put the different parts.
89 
90   Bitmaps are read on demand in response to insert/delete/update operations.
91   The following bitmap pointers will be cached and stored on disk on close:
92   - Current insert_bitmap;  When inserting new data we will first try to
93     fill this one.
94   - First bitmap which is not completely full.  This is updated when we
95     free data with an update or delete.
96 
97   While flushing out bitmaps, we will cache the status of the bitmap in memory
98   to avoid having to read a bitmap for insert of new data that will not
99   be of any use
100   - Total empty space
101   - Largest number of continuous pages
102 
103   Bitmap ONLY goes to disk in the following scenarios
104   - The file is closed (and we flush all changes to disk)
105   - On checkpoint
106   (Ie: When we do a checkpoint, we have to ensure that all bitmaps are
107   put on disk even if they are not in the page cache).
108   - When explicitly requested (for example on backup or after recovery,
109   to simplify things)
110 
111  The flow of writing a row is that:
112  - Mark the bitmap not flushable (_ma_bitmap_flushable(X, 1))
113  - Lock the bitmap
114  - Decide which data pages we will write to
115  - Mark them full in the bitmap page so that other threads do not try to
116     use the same data pages as us
117  - We unlock the bitmap
118  - Write the data pages
119  - Lock the bitmap
120  - Correct the bitmap page with the true final occupation of the data
121    pages (that is, we marked pages full but when we are done we realize
122    we didn't fill them)
123  - Unlock the bitmap.
124  - Mark the bitmap flushable (_ma_bitmap_flushable(X, -1))
125 */
126 
127 #include "maria_def.h"
128 #include "ma_blockrec.h"
129 
/*
  Bitmap patterns (3 bits per page) that mark a page as completely full;
  see the 'Dynamic size records' table in the file header.
*/
#define FULL_HEAD_PAGE 4
#define FULL_TAIL_PAGE 7

/* Human readable description of each 3-bit pattern, indexed by pattern 0-7 */
const char *bits_to_txt[]=
{
  "empty", "00-30% full", "30-60% full", "60-90% full", "full",
  "tail 00-40 % full", "tail 40-80 % full", "tail/blob full"
};

/*
  When set to 1, write_changed_bitmap() writes the bitmap page unpinned even
  if the bitmap is marked non flushable, and _ma_bitmap_flush_all() does not
  wait for the bitmap to become flushable.
*/
#define WRONG_BITMAP_FLUSH 0 /*define to 1 only for provoking bugs*/

/* Forward declarations of functions local to this file */
static my_bool _ma_read_bitmap_page(MARIA_HA *info,
                                    MARIA_FILE_BITMAP *bitmap,
                                    pgcache_page_no_t page);
static my_bool _ma_bitmap_create_missing(MARIA_HA *info,
                                         MARIA_FILE_BITMAP *bitmap,
                                         pgcache_page_no_t page);
static void _ma_bitmap_unpin_all(MARIA_SHARE *share);
/* _ma_check_bitmap() only exists in debug builds; otherwise it is a no-op */
#ifndef DBUG_OFF
static void _ma_check_bitmap(MARIA_FILE_BITMAP *bitmap);
#else
#define _ma_check_bitmap(A) do { } while(0)
#endif
153 
154 
155 /* Write bitmap page to key cache */
156 
write_changed_bitmap(MARIA_SHARE * share,MARIA_FILE_BITMAP * bitmap)157 static inline my_bool write_changed_bitmap(MARIA_SHARE *share,
158                                            MARIA_FILE_BITMAP *bitmap)
159 {
160   my_bool res;
161   DBUG_ENTER("write_changed_bitmap");
162   DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
163   DBUG_ASSERT(bitmap->file.pre_write_hook != 0);
164   DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
165 
166   /*
167     Mark that a bitmap page has been written to page cache and we have
168     to flush it during checkpoint.
169   */
170   bitmap->changed_not_flushed= 1;
171 
172   if ((bitmap->non_flushable == 0) || WRONG_BITMAP_FLUSH)
173   {
174     res= pagecache_write(share->pagecache,
175                                  &bitmap->file, bitmap->page, 0,
176                                  bitmap->map, PAGECACHE_PLAIN_PAGE,
177                                  PAGECACHE_LOCK_LEFT_UNLOCKED,
178                                  PAGECACHE_PIN_LEFT_UNPINNED,
179                                  PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE);
180     DBUG_ASSERT(!res);
181     DBUG_RETURN(res);
182   }
183   else
184   {
185     /*
186       bitmap->non_flushable means that someone has changed the bitmap,
187       but it's not yet complete so it can't yet be written to disk.
188       In this case we write the changed bitmap to the disk cache,
189       but keep it pinned until the change is completed. The page will
190       be unpinned later by _ma_bitmap_unpin_all() as soon as non_flushable
191       is set back to 0.
192     */
193     MARIA_PINNED_PAGE page_link;
194     DBUG_PRINT("info", ("Writing pinned bitmap page"));
195     res= pagecache_write(share->pagecache,
196                              &bitmap->file, bitmap->page, 0,
197                              bitmap->map, PAGECACHE_PLAIN_PAGE,
198                              PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_PIN,
199                              PAGECACHE_WRITE_DELAY, &page_link.link,
200                              LSN_IMPOSSIBLE);
201     page_link.unlock= PAGECACHE_LOCK_LEFT_UNLOCKED;
202     page_link.changed= 1;
203     push_dynamic(&bitmap->pinned_pages, (const uchar*) (void*) &page_link);
204     DBUG_ASSERT(!res);
205     DBUG_RETURN(res);
206   }
207 }
208 
209 /*
210   Initialize bitmap variables in share
211 
212   SYNOPSIS
213     _ma_bitmap_init()
214     share		Share handler
215     file		Data file handler
216     last_page		Pointer to last page (max_file_size) that needs to be
217 			mapped by the bitmap. This is adjusted to bitmap
218                         alignment.
219 
220   NOTES
221    This is called the first time a file is opened.
222 
223   RETURN
224     0   ok
225     1   error
226 */
227 
_ma_bitmap_init(MARIA_SHARE * share,File file,pgcache_page_no_t * last_page)228 my_bool _ma_bitmap_init(MARIA_SHARE *share, File file,
229                         pgcache_page_no_t *last_page)
230 {
231   uint aligned_bit_blocks;
232   uint max_page_size;
233   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
234   uint size= share->block_size;
235   pgcache_page_no_t first_bitmap_with_space;
236 #ifndef DBUG_OFF
237   /* We want to have a copy of the bitmap to be able to print differences */
238   size*= 2;
239 #endif
240 
241   if (((bitmap->map= (uchar*) my_malloc(size, MYF(MY_WME))) == NULL) ||
242       my_init_dynamic_array(&bitmap->pinned_pages,
243                             sizeof(MARIA_PINNED_PAGE), 1, 1, MYF(0)))
244     return 1;
245 
246   bitmap->share= share;
247   bitmap->block_size= share->block_size;
248   bitmap->file.file= file;
249   _ma_bitmap_set_pagecache_callbacks(&bitmap->file, share);
250 
251   /* Size needs to be aligned on 6 */
252   aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6;
253   bitmap->max_total_size= bitmap->total_size= aligned_bit_blocks * 6;
254   /*
255     In each 6 bytes, we have 6*8/3 = 16 pages covered
256     The +1 is to add the bitmap page, as this doesn't have to be covered
257   */
258   bitmap->pages_covered= aligned_bit_blocks * 16 + 1;
259   bitmap->flush_all_requested= bitmap->waiting_for_flush_all_requested=
260     bitmap->waiting_for_non_flushable= 0;
261   bitmap->non_flushable= 0;
262 
263   /* Update size for bits */
264   /* TODO; Make this dependent of the row size */
265   max_page_size= share->block_size - PAGE_OVERHEAD_SIZE(share) + DIR_ENTRY_SIZE;
266   bitmap->sizes[0]= max_page_size;              /* Empty page */
267   bitmap->sizes[1]= max_page_size - max_page_size * 30 / 100;
268   bitmap->sizes[2]= max_page_size - max_page_size * 60 / 100;
269   bitmap->sizes[3]= max_page_size - max_page_size * 90 / 100;
270   bitmap->sizes[4]= 0;                          /* Full page */
271   bitmap->sizes[5]= max_page_size - max_page_size * 40 / 100;
272   bitmap->sizes[6]= max_page_size - max_page_size * 80 / 100;
273   bitmap->sizes[7]= 0;
274 
275   /*
276     If a record size will fit into the smallest empty page, return first
277    found page in find_head()
278   */
279   if (bitmap->sizes[3] >= share->base.max_pack_length)
280     bitmap->return_first_match= 1;
281 
282   mysql_mutex_init(key_SHARE_BITMAP_lock,
283                    &share->bitmap.bitmap_lock, MY_MUTEX_INIT_SLOW);
284   mysql_cond_init(key_SHARE_BITMAP_cond,
285                   &share->bitmap.bitmap_cond, 0);
286 
287   first_bitmap_with_space= share->state.first_bitmap_with_space;
288   _ma_bitmap_reset_cache(share);
289 
290   /*
291     The bitmap used to map the file are aligned on 6 bytes. We now
292     calculate the max file size that can be used by the bitmap. This
293     is needed to get ma_info() give a true file size so that the user can
294     estimate if there is still space free for records in the file.
295   */
296   {
297     pgcache_page_no_t last_bitmap_page;
298     ulong blocks, bytes;
299 
300     last_bitmap_page= *last_page - *last_page % bitmap->pages_covered;
301     blocks= (ulong) (*last_page - last_bitmap_page);
302     bytes= (blocks * 3) / 8;      /* 3 bit per page / 8 bits per byte */
303     /* Size needs to be aligned on 6 */
304     bytes/= 6;
305     bytes*= 6;
306     bitmap->last_bitmap_page= last_bitmap_page;
307     bitmap->last_total_size= (uint)bytes;
308     *last_page= ((last_bitmap_page + bytes*8/3));
309   }
310 
311   /* Restore first_bitmap_with_space if it's resonable */
312   if (first_bitmap_with_space <= (share->state.state.data_file_length /
313                                   share->block_size))
314     share->state.first_bitmap_with_space= first_bitmap_with_space;
315 
316   return 0;
317 }
318 
319 
320 /*
321   Free data allocated by _ma_bitmap_init
322 
323   SYNOPSIS
324     _ma_bitmap_end()
325     share		Share handler
326 */
327 
_ma_bitmap_end(MARIA_SHARE * share)328 my_bool _ma_bitmap_end(MARIA_SHARE *share)
329 {
330   my_bool res;
331 
332 #ifndef DBUG_OFF
333   if (! share->internal_table)
334     mysql_mutex_assert_owner(&share->close_lock);
335 #endif
336   DBUG_ASSERT(share->bitmap.non_flushable == 0);
337   DBUG_ASSERT(share->bitmap.flush_all_requested == 0);
338   DBUG_ASSERT(share->bitmap.waiting_for_non_flushable == 0 &&
339               share->bitmap.waiting_for_flush_all_requested == 0);
340   DBUG_ASSERT(share->bitmap.pinned_pages.elements == 0);
341 
342   res= _ma_bitmap_flush(share);
343   mysql_mutex_destroy(&share->bitmap.bitmap_lock);
344   mysql_cond_destroy(&share->bitmap.bitmap_cond);
345   delete_dynamic(&share->bitmap.pinned_pages);
346   my_free(share->bitmap.map);
347   share->bitmap.map= 0;
348   /*
349     This is to not get an assert in checkpoint. The bitmap will be flushed
350     at once by  _ma_once_end_block_record() as part of the normal flush
351     of the kfile.
352   */
353   share->bitmap.changed_not_flushed= 0;
354   return res;
355 }
356 
357 /*
358   Ensure that we have incremented open count before we try to read/write
359   a page while we have the bitmap lock.
360   This is needed to ensure that we don't call _ma_mark_file_changed() as
361   part of flushing a page to disk, as this locks share->internal_lock
362   and then mutex lock would happen in the wrong order.
363 */
364 
_ma_bitmap_mark_file_changed(MARIA_SHARE * share,my_bool flush_translog)365 static inline void _ma_bitmap_mark_file_changed(MARIA_SHARE *share,
366                                                 my_bool flush_translog)
367 {
368   /*
369     It's extremely unlikely that the following test is true as it
370     only happens once if the table has changed.
371   */
372   if (unlikely(!share->global_changed &&
373                (share->state.changed & STATE_CHANGED)))
374   {
375     /* purecov: begin inspected */
376     /* unlock mutex as it can't be hold during _ma_mark_file_changed() */
377     mysql_mutex_unlock(&share->bitmap.bitmap_lock);
378 
379     /*
380       We have to flush the translog to ensure we have registered that the
381       table is open.
382     */
383     if (flush_translog && share->now_transactional)
384       (void) translog_flush(share->state.logrec_file_id);
385 
386     _ma_mark_file_changed_now(share);
387     mysql_mutex_lock(&share->bitmap.bitmap_lock);
388     /* purecov: end */
389   }
390 }
391 
392 /*
393   Send updated bitmap to the page cache
394 
395   SYNOPSIS
396     _ma_bitmap_flush()
397     share		Share handler
398 
399   NOTES
400     In the future, _ma_bitmap_flush() will be called to flush changes don't
401     by this thread (ie, checking the changed flag is ok). The reason we
402     check it again in the mutex is that if someone else did a flush at the
403     same time, we don't have to do the write.
404     This is also ok for _ma_scan_init_block_record() which does not want to
405     miss rows: it cares only for committed rows, that is, rows for which there
406     was a commit before our transaction started; as commit and transaction's
407     start are protected by the same LOCK_trn_list mutex, we see memory at
408     least as new as at other transaction's commit time, so if the committed
409     rows caused bitmap->changed to be true, we see it; if we see 0 it really
410     means a flush happened since then. So, it's ok to read without bitmap's
411     mutex.
412 
413   RETURN
414     0    ok
415     1    error
416 */
417 
_ma_bitmap_flush(MARIA_SHARE * share)418 my_bool _ma_bitmap_flush(MARIA_SHARE *share)
419 {
420   my_bool res= 0;
421   DBUG_ENTER("_ma_bitmap_flush");
422   if (share->bitmap.changed)
423   {
424     mysql_mutex_lock(&share->bitmap.bitmap_lock);
425     if (share->bitmap.changed)
426     {
427       /*
428         We have to mark the file changed here, as otherwise the following
429         write to pagecache may force a page out from this file, which would
430         cause _ma_mark_file_changed() to be called with bitmaplock hold!
431       */
432       _ma_bitmap_mark_file_changed(share, 1);
433       res= write_changed_bitmap(share, &share->bitmap);
434       share->bitmap.changed= 0;
435     }
436     mysql_mutex_unlock(&share->bitmap.bitmap_lock);
437   }
438   DBUG_RETURN(res);
439 }
440 
441 
442 /**
443    Dirty-page filtering criteria for bitmap pages
444 
445    @param  type                Page's type
446    @param  pageno              Page's number
447    @param  rec_lsn             Page's rec_lsn
448    @param  arg                 pages_covered of bitmap
449 */
450 
451 static enum pagecache_flush_filter_result
filter_flush_bitmap_pages(enum pagecache_page_type type,pgcache_page_no_t pageno,LSN rec_lsn,void * arg)452 filter_flush_bitmap_pages(enum pagecache_page_type type
453                           __attribute__ ((unused)),
454                           pgcache_page_no_t pageno,
455                           LSN rec_lsn __attribute__ ((unused)),
456                           void *arg)
457 {
458   return ((pageno % (*(ulong*)arg)) == 0);
459 }
460 
461 
/**
   Flushes current bitmap page to the pagecache, and then all bitmap pages
   from pagecache to the file. Used by Checkpoint.

   Returns at once if the bitmap has neither unwritten changes
   (bitmap->changed) nor written-but-not-flushed pages
   (bitmap->changed_not_flushed). Otherwise waits, under bitmap_lock, until
   bitmap->non_flushable is 0 before writing and flushing.

   @param  share               Table's share

   @return Operation status
     @retval 0   ok (or nothing to flush)
     @retval 1   write_changed_bitmap() failed or the page cache flush
                 reported PCFLUSH_PINNED_AND_ERROR
*/

my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
{
  my_bool res= 0;
  uint send_signal= 0;
  MARIA_FILE_BITMAP *bitmap= &share->bitmap;
  DBUG_ENTER("_ma_bitmap_flush_all");

#ifdef EXTRA_DEBUG_BITMAP
  /* Log the bitmap flags to the translog as debug information */
  {
    char buff[160];
    uint len= my_sprintf(buff,
                         (buff, "bitmap_flush:  fd: %d  id: %u  "
                          "changed: %d  changed_not_flushed: %d  "
                          "flush_all_requested: %d",
                          share->bitmap.file.file,
                          share->id,
                          bitmap->changed,
                          bitmap->changed_not_flushed,
                          bitmap->flush_all_requested));
    (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
                                   (uchar*) buff, len);
  }
#endif

  mysql_mutex_lock(&bitmap->bitmap_lock);
  /* Nothing changed and nothing waiting to be flushed: nothing to do */
  if (!bitmap->changed && !bitmap->changed_not_flushed)
  {
    mysql_mutex_unlock(&bitmap->bitmap_lock);
    DBUG_RETURN(0);
  }

  _ma_bitmap_mark_file_changed(share, 0);

  /*
    The following should be true as it was tested above. We have to test
    this again as _ma_bitmap_mark_file_changed() may have temporarily
    released the bitmap mutex.
  */
  if (bitmap->changed || bitmap->changed_not_flushed)
  {
    /*
      Announce the flush request and register as a waiter for the bitmap
      to become flushable; both counters are protected by bitmap_lock.
    */
    bitmap->flush_all_requested++;
    bitmap->waiting_for_non_flushable++;
#if !WRONG_BITMAP_FLUSH
    while (bitmap->non_flushable > 0)
    {
      DBUG_PRINT("info", ("waiting for bitmap to be flushable"));
      mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
    }
#endif
    bitmap->waiting_for_non_flushable--;
#ifdef EXTRA_DEBUG_BITMAP
    {
      char tmp[MAX_BITMAP_INFO_LENGTH];
      _ma_get_bitmap_description(bitmap, bitmap->map, bitmap->page, tmp);
      (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
                                     (uchar*) tmp, strlen(tmp));
    }
#endif

    DBUG_ASSERT(bitmap->flush_all_requested == 1);
    /*
      Bitmap is in a flushable state: its contents in memory are reflected by
      log records (complete REDO-UNDO groups) and all bitmap pages are
      unpinned. We keep the mutex to preserve this situation, and flush to the
      file.
    */
    if (bitmap->changed)
    {
      /* Send the current in-memory bitmap page to the page cache first */
      bitmap->changed= FALSE;
      res= write_changed_bitmap(share, bitmap);
    }
    /*
      We do NOT use FLUSH_KEEP_LAZY because we must be sure that bitmap
      pages have been flushed. That's a condition of correctness of
      Recovery: data pages may have been all flushed, if we write the
      checkpoint record Recovery will start from after their REDOs. If
      bitmap page was not flushed, as the REDOs about it will be skipped, it
      will wrongly not be recovered. If bitmap pages had a rec_lsn it would
      be different.
      There should be no pinned pages as bitmap->non_flushable==0.
    */
    if (flush_pagecache_blocks_with_filter(share->pagecache,
                                           &bitmap->file, FLUSH_KEEP,
                                           filter_flush_bitmap_pages,
                                           &bitmap->pages_covered) &
        PCFLUSH_PINNED_AND_ERROR)
      res= TRUE;
    bitmap->changed_not_flushed= FALSE;
    bitmap->flush_all_requested--;
    /*
      Some well-behaved threads may be waiting for flush_all_requested to
      become false, wake them up.
    */
    DBUG_PRINT("info", ("bitmap flusher waking up others"));
    send_signal= (bitmap->waiting_for_flush_all_requested |
                  bitmap->waiting_for_non_flushable);
  }
  mysql_mutex_unlock(&bitmap->bitmap_lock);
  /* The broadcast is done after the mutex has been released */
  if (send_signal)
    mysql_cond_broadcast(&bitmap->bitmap_cond);
  DBUG_RETURN(res);
}
571 
572 
573 /**
574    @brief Lock bitmap from being used by another thread
575 
576    @fn _ma_bitmap_lock()
577    @param  share               Table's share
578 
579    @notes
580    This is a temporary solution for allowing someone to delete an inserted
581    duplicate-key row while someone else is doing concurrent inserts.
582    This is ok for now as duplicate key errors are not that common.
583 
584    In the future we will add locks for row-pages to ensure two threads doesn't
585    work at the same time on the same page.
586 */
587 
_ma_bitmap_lock(MARIA_SHARE * share)588 void _ma_bitmap_lock(MARIA_SHARE *share)
589 {
590   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
591   DBUG_ENTER("_ma_bitmap_lock");
592 
593   if (!share->now_transactional)
594     DBUG_VOID_RETURN;
595 
596   mysql_mutex_lock(&bitmap->bitmap_lock);
597   bitmap->flush_all_requested++;
598   bitmap->waiting_for_non_flushable++;
599   while (bitmap->non_flushable)
600   {
601     DBUG_PRINT("info", ("waiting for bitmap to be flushable"));
602     mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
603   }
604   bitmap->waiting_for_non_flushable--;
605   /*
606     Ensure that _ma_bitmap_flush_all() and _ma_bitmap_lock() are blocked.
607     ma_bitmap_flushable() is blocked thanks to 'flush_all_requested'.
608   */
609   bitmap->non_flushable= 1;
610   mysql_mutex_unlock(&bitmap->bitmap_lock);
611   DBUG_VOID_RETURN;
612 }
613 
614 /**
615    @brief Unlock bitmap after _ma_bitmap_lock()
616 
617    @fn _ma_bitmap_unlock()
618    @param  share               Table's share
619 */
620 
_ma_bitmap_unlock(MARIA_SHARE * share)621 void _ma_bitmap_unlock(MARIA_SHARE *share)
622 {
623   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
624   uint send_signal;
625   DBUG_ENTER("_ma_bitmap_unlock");
626 
627   if (!share->now_transactional)
628     DBUG_VOID_RETURN;
629   DBUG_ASSERT(bitmap->flush_all_requested > 0 && bitmap->non_flushable == 1);
630 
631   mysql_mutex_lock(&bitmap->bitmap_lock);
632   bitmap->non_flushable= 0;
633   _ma_bitmap_unpin_all(share);
634   send_signal= bitmap->waiting_for_non_flushable;
635   if (!--bitmap->flush_all_requested)
636     send_signal|= bitmap->waiting_for_flush_all_requested;
637   mysql_mutex_unlock(&bitmap->bitmap_lock);
638   if (send_signal)
639     mysql_cond_broadcast(&bitmap->bitmap_cond);
640   DBUG_VOID_RETURN;
641 }
642 
643 
644 /**
645   @brief Unpin all pinned bitmap pages
646 
647   @param  share            Table's share
648 
649   @return Operation status
650     @retval   0   ok
651 
652   @note This unpins pages pinned by other threads.
653 */
654 
_ma_bitmap_unpin_all(MARIA_SHARE * share)655 static void _ma_bitmap_unpin_all(MARIA_SHARE *share)
656 {
657   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
658   MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*)
659                                  dynamic_array_ptr(&bitmap->pinned_pages, 0));
660   MARIA_PINNED_PAGE *pinned_page= page_link + bitmap->pinned_pages.elements;
661   DBUG_ENTER("_ma_bitmap_unpin_all");
662   DBUG_PRINT("info", ("pinned: %u", bitmap->pinned_pages.elements));
663   while (pinned_page-- != page_link)
664     pagecache_unlock_by_link(share->pagecache, pinned_page->link,
665                              pinned_page->unlock, PAGECACHE_UNPIN,
666                              LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, FALSE, TRUE);
667   bitmap->pinned_pages.elements= 0;
668   DBUG_VOID_RETURN;
669 }
670 
671 
672 /*
673   Intialize bitmap in memory to a zero bitmap
674 
675   SYNOPSIS
676     _ma_bitmap_delete_all()
677     share		Share handler
678 
679   NOTES
680     This is called on maria_delete_all_rows (truncate data file).
681 */
682 
_ma_bitmap_delete_all(MARIA_SHARE * share)683 void _ma_bitmap_delete_all(MARIA_SHARE *share)
684 {
685   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
686   DBUG_ENTER("_ma_bitmap_delete_all");
687   if (bitmap->map)                              /* Not in create */
688   {
689     bzero(bitmap->map, bitmap->block_size);
690     bitmap->changed= 1;
691     bitmap->page= 0;
692     bitmap->used_size= bitmap->full_tail_size= bitmap->full_head_size= 0;
693     bitmap->total_size= bitmap->max_total_size;
694   }
695   DBUG_VOID_RETURN;
696 }
697 
698 
699 /**
700    @brief Reset bitmap caches
701 
702    @fn    _ma_bitmap_reset_cache()
703    @param share		Maria share
704 
705    @notes
706    This is called after we have swapped file descriptors and we want
707    bitmap to forget all cached information.
708    It's also called directly after we have opened a file.
709 */
710 
_ma_bitmap_reset_cache(MARIA_SHARE * share)711 void _ma_bitmap_reset_cache(MARIA_SHARE *share)
712 {
713   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
714 
715   if (bitmap->map)                              /* If using bitmap */
716   {
717     /* Forget changes in current bitmap page */
718     bitmap->changed= 0;
719 
720     /*
721       We can't read a page yet, as in some case we don't have an active
722       page cache yet.
723       Pretend we have a dummy, full and not changed bitmap page in memory.
724 
725       We set bitmap->page to a value so that if we use it in
726       move_to_next_bitmap() it will point to page 0.
727       (This can only happen if writing to a bitmap page fails)
728     */
729     bitmap->page= ((pgcache_page_no_t) 0) - bitmap->pages_covered;
730     bitmap->used_size= bitmap->total_size= bitmap->max_total_size;
731     bitmap->full_head_size= bitmap->full_tail_size= bitmap->max_total_size;
732     bfill(bitmap->map, share->block_size, 255);
733 #ifndef DBUG_OFF
734     memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
735 #endif
736 
737     /* Start scanning for free space from start of file */
738     share->state.first_bitmap_with_space = 0;
739   }
740 }
741 
742 
743 /*
744   Return bitmap pattern for the smallest head block that can hold 'size'
745 
746   SYNOPSIS
747     size_to_head_pattern()
748     bitmap      Bitmap
749     size        Requested size
750 
751   RETURN
752     0-3         For a description of the bitmap sizes, see the header
753 */
754 
size_to_head_pattern(MARIA_FILE_BITMAP * bitmap,uint size)755 static uint size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size)
756 {
757   if (size <= bitmap->sizes[3])
758     return 3;
759   if (size <= bitmap->sizes[2])
760     return 2;
761   if (size <= bitmap->sizes[1])
762     return 1;
763   DBUG_ASSERT(size <= bitmap->sizes[0]);
764   return 0;
765 }
766 
767 
768 /*
769   Return bitmap pattern for head block where there is size bytes free
770 
771   SYNOPSIS
772     _ma_free_size_to_head_pattern()
773     bitmap      Bitmap
774     size        Requested size
775 
776   RETURN
777     0-4  (Possible bitmap patterns for head block)
778 */
779 
_ma_free_size_to_head_pattern(MARIA_FILE_BITMAP * bitmap,uint size)780 uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size)
781 {
782   if (size < bitmap->sizes[3])
783     return 4;
784   if (size < bitmap->sizes[2])
785     return 3;
786   if (size < bitmap->sizes[1])
787     return 2;
788   return (size < bitmap->sizes[0]) ? 1 : 0;
789 }
790 
791 
792 /*
793   Return bitmap pattern for the smallest tail block that can hold 'size'
794 
795   SYNOPSIS
796     size_to_tail_pattern()
797     bitmap      Bitmap
798     size        Requested size
799 
800   RETURN
801     0, 5 or 6   For a description of the bitmap sizes, see the header
802 */
803 
size_to_tail_pattern(MARIA_FILE_BITMAP * bitmap,uint size)804 static uint size_to_tail_pattern(MARIA_FILE_BITMAP *bitmap, uint size)
805 {
806   if (size <= bitmap->sizes[6])
807     return 6;
808   if (size <= bitmap->sizes[5])
809     return 5;
810   DBUG_ASSERT(size <= bitmap->sizes[0]);
811   return 0;
812 }
813 
814 
815 /*
816   Return bitmap pattern for tail block where there is size bytes free
817 
818   SYNOPSIS
819     free_size_to_tail_pattern()
820     bitmap      Bitmap
821     size        Requested size
822 
823   RETURN
824     0, 5, 6, 7   For a description of the bitmap sizes, see the header
825 */
826 
free_size_to_tail_pattern(MARIA_FILE_BITMAP * bitmap,uint size)827 static uint free_size_to_tail_pattern(MARIA_FILE_BITMAP *bitmap, uint size)
828 {
829   if (size >= bitmap->sizes[0])
830     return 0;                                   /* Revert to empty page */
831   if (size < bitmap->sizes[6])
832     return 7;
833   if (size < bitmap->sizes[5])
834     return 6;
835   return 5;
836 }
837 
838 
839 /*
840   Return size guranteed to be available on a page
841 
842   SYNOPSIS
843     pattern_to_head_size()
844     bitmap      Bitmap
845     pattern     Pattern (0-7)
846 
847   RETURN
848     0 - block_size
849 */
850 
pattern_to_size(MARIA_FILE_BITMAP * bitmap,uint pattern)851 static inline uint pattern_to_size(MARIA_FILE_BITMAP *bitmap, uint pattern)
852 {
853   DBUG_ASSERT(pattern <= 7);
854   return bitmap->sizes[pattern];
855 }
856 
857 
/*
  Print bitmap changes for debugging

  SYNOPSIS
    _ma_print_bitmap_changes()
    bitmap	Bitmap to print

  IMPLEMENTATION
    Prints all bits that have changed since the last call to
    _ma_print_bitmap_changes().
    This is done by keeping a copy of the previously printed bitmap in
    bitmap->map + bitmap->block_size.
*/
870 
871 #ifndef DBUG_OFF
872 
static void _ma_print_bitmap_changes(MARIA_FILE_BITMAP *bitmap)
{
  /* Current bitmap and the reference copy stored directly after it */
  uchar *current= bitmap->map;
  uchar *previous= bitmap->map + bitmap->block_size;
  uchar *current_end= bitmap->map + bitmap->used_size;
  /* Page number of the first pattern in the current 6 byte group */
  ulong first_page= (ulong) bitmap->page + 1;
  DBUG_ENTER("_ma_print_bitmap_changes");

  DBUG_LOCK_FILE;
  fprintf(DBUG_FILE,"\nBitmap page changes at page: %lu  bitmap: %p\n",
          (ulong) bitmap->page, bitmap->map);

  for ( ; current < current_end ;
       current+= 6, previous+= 6, first_page+= 16)
  {
    /* 6 bytes = 6*8/3= 16 patterns */
    ulonglong new_bits= uint6korr(current);
    ulonglong old_bits= uint6korr(previous);
    uint idx;

    /* Fast path: none of the 16 patterns in this group has changed */
    if (new_bits == old_bits)
      continue;

    for (idx= 0; idx < 16 ; idx++, new_bits>>= 3, old_bits>>= 3)
    {
      uint new_pattern= (uint) (new_bits & 7);
      uint old_pattern= (uint) (old_bits & 7);

      if (new_pattern != old_pattern)
        fprintf(DBUG_FILE, "Page: %8lu  %s -> %s\n", first_page + idx,
                bits_to_txt[old_pattern], bits_to_txt[new_pattern]);
    }
  }
  fputc('\n', DBUG_FILE);
  DBUG_UNLOCK_FILE;

  /* Make the current bitmap the reference for the next call */
  memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
  DBUG_VOID_RETURN;
}
913 
914 
915 /* Print content of bitmap for debugging */
916 
_ma_print_bitmap(MARIA_FILE_BITMAP * bitmap,uchar * data,pgcache_page_no_t page)917 void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
918                       pgcache_page_no_t page)
919 {
920   uchar *pos, *end;
921   char llbuff[22];
922 
923   DBUG_LOCK_FILE;
924   fprintf(DBUG_FILE,"\nDump of bitmap page at %s\n", llstr(page, llbuff));
925 
926   page++;                                       /* Skip bitmap page */
927   for (pos= data, end= pos + bitmap->max_total_size;
928        pos < end ;
929        pos+= 6)
930   {
931     ulonglong bits= uint6korr(pos);    /* 6 bytes = 6*8/3= 16 patterns */
932 
933     /*
934       Test if there is any changes in the next 16 bitmaps (to not have to
935       loop through all bits if we know they are the same)
936     */
937     if (bits)
938     {
939       uint i;
940       for (i= 0; i < 16 ; i++, bits>>= 3)
941       {
942         if (bits & 7)
943           fprintf(DBUG_FILE, "Page: %8s  %s\n", llstr(page+i, llbuff),
944                   bits_to_txt[bits & 7]);
945       }
946     }
947     page+= 16;
948   }
949   fputc('\n', DBUG_FILE);
950   DBUG_UNLOCK_FILE;
951 }
952 
953 #endif /* DBUG_OFF */
954 
955 
956 /*
957   Return content of bitmap as a printable string
958 */
959 
_ma_get_bitmap_description(MARIA_FILE_BITMAP * bitmap,uchar * bitmap_data,pgcache_page_no_t page,char * out)960 void _ma_get_bitmap_description(MARIA_FILE_BITMAP *bitmap,
961                                 uchar *bitmap_data,
962                                 pgcache_page_no_t page,
963                                 char *out)
964 {
965   uchar *pos, *end;
966   uint count=0, dot_printed= 0, len;
967   char buff[80], last[80];
968 
969   page++;
970   last[0]=0;
971   for (pos= bitmap_data, end= pos+ bitmap->used_size ; pos < end ; pos+= 6)
972   {
973     ulonglong bits= uint6korr(pos);    /* 6 bytes = 6*8/3= 16 patterns */
974     uint i;
975 
976     for (i= 0; i < 16 ; i++, bits>>= 3)
977     {
978       if (count > 60)
979       {
980         if (memcmp(buff, last, count))
981         {
982           memcpy(last, buff, count);
983           len= sprintf(out, "%8lu: ", (ulong) page - count);
984           memcpy(out+len, buff, count);
985           out+= len + count + 1;
986           out[-1]= '\n';
987           dot_printed= 0;
988         }
989         else if (!(dot_printed++))
990         {
991           out= strmov(out, "...\n");
992         }
993         count= 0;
994       }
995       buff[count++]= '0' + (uint) (bits & 7);
996       page++;
997     }
998   }
999   len= sprintf(out, "%8lu: ", (ulong) page - count);
1000   memcpy(out+len, buff, count);
1001   out[len + count]= '\n';
1002   out[len + count + 1]= 0;
1003 }
1004 
1005 
1006 /*
1007   Adjust bitmap->total_size to not go over max_data_file_size
1008 */
1009 
adjust_total_size(MARIA_HA * info,pgcache_page_no_t page)1010 static void adjust_total_size(MARIA_HA *info, pgcache_page_no_t page)
1011 {
1012   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1013 
1014   if (page < bitmap->last_bitmap_page)
1015     bitmap->total_size= bitmap->max_total_size;    /* Use all bits in bitmap */
1016   else
1017     bitmap->total_size= bitmap->last_total_size;
1018 }
1019 
1020 /***************************************************************************
1021   Reading & writing bitmap pages
1022 ***************************************************************************/
1023 
1024 /*
1025   Read a given bitmap page
1026 
1027   SYNOPSIS
1028     _ma_read_bitmap_page()
1029     info                Maria handler
1030     bitmap              Bitmap handler
1031     page                Page to read
1032 
1033   NOTE
1034     We don't always have share->bitmap.bitmap_lock here
1035     (when called from_ma_check_bitmap_data() for example).
1036 
1037   RETURN
1038     0  ok
1039     1  error  (Error writing old bitmap or reading bitmap page)
1040 */
1041 
_ma_read_bitmap_page(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page)1042 static my_bool _ma_read_bitmap_page(MARIA_HA *info,
1043                                     MARIA_FILE_BITMAP *bitmap,
1044                                     pgcache_page_no_t page)
1045 {
1046   MARIA_SHARE *share= info->s;
1047   my_bool res;
1048   DBUG_ENTER("_ma_read_bitmap_page");
1049   DBUG_PRINT("enter", ("page: %lld  data_file_length: %lld",
1050                        (longlong) page,
1051                        (longlong) share->state.state.data_file_length));
1052   DBUG_ASSERT(page % bitmap->pages_covered == 0);
1053   DBUG_ASSERT(!bitmap->changed);
1054 
1055   bitmap->page= page;
1056   if ((page + 1) * bitmap->block_size >  share->state.state.data_file_length)
1057   {
1058     /* Inexistent or half-created page */
1059     res= _ma_bitmap_create_missing(info, bitmap, page);
1060     if (!res)
1061       adjust_total_size(info, page);
1062     DBUG_RETURN(res);
1063   }
1064 
1065   adjust_total_size(info, page);
1066   bitmap->full_head_size=  bitmap->full_tail_size= 0;
1067   DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
1068   res= pagecache_read(share->pagecache,
1069                       &bitmap->file, page, 0,
1070                       bitmap->map, PAGECACHE_PLAIN_PAGE,
1071                       PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL;
1072 
1073   if (!res)
1074   {
1075     /* Calculate used_size */
1076     const uchar *data, *end=  bitmap->map;
1077     for (data= bitmap->map + bitmap->total_size; --data >= end && *data == 0; )
1078     {}
1079     bitmap->used_size= (uint) ((data + 1) - end);
1080     DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
1081   }
1082   /*
1083     We can't check maria_bitmap_marker here as if the bitmap page
1084     previously had a true checksum and the user switched mode to not checksum
1085     this may have any value, except maria_normal_page_marker.
1086 
1087     Using maria_normal_page_marker gives us a protection against bugs
1088     when running without any checksums.
1089   */
1090 
1091 #ifndef DBUG_OFF
1092   if (!res)
1093   {
1094     memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
1095     _ma_check_bitmap(bitmap);
1096   }
1097 #endif
1098   DBUG_RETURN(res);
1099 }
1100 
1101 
1102 /*
1103   Change to another bitmap page
1104 
1105   SYNOPSIS
1106   _ma_change_bitmap_page()
1107     info                Maria handler
1108     bitmap              Bitmap handler
1109     page                Bitmap page to read
1110 
1111   NOTES
1112    If old bitmap was changed, write it out before reading new one
1113    We return empty bitmap if page is outside of file size
1114 
1115   RETURN
1116     0  ok
1117     1  error  (Error writing old bitmap or reading bitmap page)
1118 */
1119 
_ma_change_bitmap_page(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page)1120 static my_bool _ma_change_bitmap_page(MARIA_HA *info,
1121                                       MARIA_FILE_BITMAP *bitmap,
1122                                       pgcache_page_no_t page)
1123 {
1124   DBUG_ENTER("_ma_change_bitmap_page");
1125 
1126   _ma_check_bitmap(bitmap);
1127 
1128   /*
1129     We have to mark the file changed here, as otherwise the following
1130     read/write to pagecache may force a page out from this file, which would
1131     cause _ma_mark_file_changed() to be called with bitmaplock hold!
1132   */
1133   _ma_bitmap_mark_file_changed(info->s, 1);
1134 
1135   if (bitmap->changed)
1136   {
1137     if (write_changed_bitmap(info->s, bitmap))
1138       DBUG_RETURN(1);
1139     bitmap->changed= 0;
1140   }
1141   DBUG_RETURN(_ma_read_bitmap_page(info, bitmap, page));
1142 }
1143 
1144 
1145 /*
1146   Read next suitable bitmap
1147 
1148   SYNOPSIS
1149     move_to_next_bitmap()
1150     bitmap              Bitmap handle
1151 
1152   NOTES
1153     The found bitmap may be full, so calling function may need to call this
1154     repeatedly until it finds enough space.
1155 
1156   TODO
1157     Add cache of bitmaps to not read something that is not usable
1158 
1159   RETURN
1160     0  ok
1161     1  error (either couldn't save old bitmap or read new one)
1162 */
1163 
move_to_next_bitmap(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap)1164 static my_bool move_to_next_bitmap(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap)
1165 {
1166   pgcache_page_no_t page= bitmap->page;
1167   MARIA_STATE_INFO *state= &info->s->state;
1168   DBUG_ENTER("move_to_next_bitmap");
1169 
1170   if (state->first_bitmap_with_space != ~(pgcache_page_no_t) 0 &&
1171       state->first_bitmap_with_space != page)
1172   {
1173     page= state->first_bitmap_with_space;
1174     state->first_bitmap_with_space= ~(pgcache_page_no_t) 0;
1175     DBUG_ASSERT(page % bitmap->pages_covered == 0);
1176   }
1177   else
1178   {
1179     page+= bitmap->pages_covered;
1180     DBUG_ASSERT(page % bitmap->pages_covered == 0);
1181   }
1182   DBUG_RETURN(_ma_change_bitmap_page(info, bitmap, page));
1183 }
1184 
1185 
1186 /****************************************************************************
1187  Allocate data in bitmaps
1188 ****************************************************************************/
1189 
1190 /*
1191   Store data in 'block' and mark the place used in the bitmap
1192 
1193   SYNOPSIS
1194     fill_block()
1195     bitmap		Bitmap handle
1196     block		Store data about what we found
1197     best_data		Pointer to best 6 uchar aligned area in bitmap->map
1198     best_pos		Which bit in *best_data the area starts
1199                         0 = first bit pattern, 1 second bit pattern etc
1200     best_bits		The original value of the bits at best_pos
1201     fill_pattern	Bitmap pattern to store in best_data[best_pos]
1202 
1203    NOTES
1204     We mark all pages to be 'TAIL's, which means that
1205     block->page_count is really a row position inside the page.
1206 */
1207 
fill_block(MARIA_FILE_BITMAP * bitmap,MARIA_BITMAP_BLOCK * block,uchar * best_data,uint best_pos,uint best_bits,uint fill_pattern)1208 static void fill_block(MARIA_FILE_BITMAP *bitmap,
1209                        MARIA_BITMAP_BLOCK *block,
1210                        uchar *best_data, uint best_pos, uint best_bits,
1211                        uint fill_pattern)
1212 {
1213   uint page, offset, tmp;
1214   uchar *data;
1215   DBUG_ENTER("fill_block");
1216 
1217   /* For each 6 bytes we have 6*8/3= 16 patterns */
1218   page= ((uint) (best_data - bitmap->map)) / 6 * 16 + best_pos;
1219   DBUG_ASSERT(page + 1 < bitmap->pages_covered);
1220   block->page= bitmap->page + 1 + page;
1221   block->page_count= TAIL_PAGE_COUNT_MARKER;
1222   block->empty_space= pattern_to_size(bitmap, best_bits);
1223   block->sub_blocks= 0;
1224   block->org_bitmap_value= best_bits;
1225   block->used= BLOCKUSED_TAIL; /* See _ma_bitmap_release_unused() */
1226 
1227   /*
1228     Mark place used by reading/writing 2 bytes at a time to handle
1229     bitmaps in overlapping bytes
1230   */
1231   best_pos*= 3;
1232   data= best_data+ best_pos / 8;
1233   offset= best_pos & 7;
1234   tmp= uint2korr(data);
1235 
1236   /* we turn off the 3 bits and replace them with fill_pattern */
1237   tmp= (tmp & ~(7 << offset)) | (fill_pattern << offset);
1238   int2store(data, tmp);
1239   bitmap->changed= 1;
1240   DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
1241   DBUG_VOID_RETURN;
1242 }
1243 
1244 
1245 /*
1246   Allocate data for head block
1247 
1248   SYNOPSIS
1249    allocate_head()
1250    bitmap       bitmap
1251    size         Size of data region we need to store
1252    block        Store found information here
1253 
1254    IMPLEMENTATION
1255      Find the best-fit page to put a region of 'size'
1256      This is defined as the first page of the set of pages
1257      with the smallest free space that can hold 'size'.
1258 
1259    NOTES
1260      Updates bitmap->full_head_size while scanning data
1261 
1262    RETURN
1263     0   ok    (block is updated)
1264     1   error (no space in bitmap; block is not touched)
1265 */
1266 
1267 
allocate_head(MARIA_FILE_BITMAP * bitmap,uint size,MARIA_BITMAP_BLOCK * block)1268 static my_bool allocate_head(MARIA_FILE_BITMAP *bitmap, uint size,
1269                              MARIA_BITMAP_BLOCK *block)
1270 {
1271   uint min_bits= size_to_head_pattern(bitmap, size);
1272   uchar *data, *end;
1273   uchar *best_data= 0;
1274   uint best_bits= (uint) -1, UNINIT_VAR(best_pos);
1275   my_bool first_pattern= 0; /* if doing insert_order */
1276   my_bool first_found= 1;
1277   MARIA_SHARE *share= bitmap->share;
1278   my_bool insert_order=
1279       MY_TEST(share->base.extra_options & MA_EXTRA_OPTIONS_INSERT_ORDER);
1280   DBUG_ENTER("allocate_head");
1281 
1282   DBUG_ASSERT(size <= FULL_PAGE_SIZE(share));
1283 
1284   end= bitmap->map + bitmap->used_size;
1285   if (insert_order && bitmap->page == share->last_insert_bitmap)
1286   {
1287     uint last_insert_page= share->last_insert_page;
1288     uint byte= 6 * (last_insert_page / 16);
1289     first_pattern= last_insert_page % 16;
1290     data= bitmap->map+byte;
1291     first_found= 0;                         /* Don't update full_head_size */
1292     DBUG_ASSERT(data <= end);
1293   }
1294   else
1295     data= bitmap->map + (bitmap->full_head_size/6)*6;
1296 
1297   for (; data < end; data+= 6, first_pattern= 0)
1298   {
1299     ulonglong bits= uint6korr(data);    /* 6 bytes = 6*8/3= 16 patterns */
1300     uint i;
1301 
1302     /*
1303       Skip common patterns
1304       We can skip empty pages (if we already found a match) or
1305       anything matching the following pattern as this will be either
1306       a full page or a tail page
1307     */
1308     if ((!bits && best_data) ||
1309         ((bits & 04444444444444444LL) == 04444444444444444LL))
1310       continue;
1311 
1312     for (i= first_pattern, bits >>= (3 * first_pattern); i < 16 ;
1313 	 i++, bits >>= 3)
1314     {
1315       uint pattern= (uint) (bits & 7);
1316 
1317       if (pattern <= 3)                       /* Room for more data */
1318       {
1319         if (first_found)
1320         {
1321           first_found= 0;
1322           bitmap->full_head_size= (uint)(data - bitmap->map);
1323         }
1324       }
1325       if (pattern <= min_bits)
1326       {
1327         /* There is enough space here, check if we have found better */
1328         if ((int) pattern > (int) best_bits)
1329         {
1330           /*
1331             There is more than enough space here and it's better than what
1332             we have found so far. Remember it, as we will choose it if we
1333             don't find anything in this bitmap page.
1334           */
1335           best_bits= pattern;
1336           best_data= data;
1337           best_pos= i;
1338           if (pattern == min_bits || bitmap->return_first_match)
1339             goto found;                         /* Best possible match */
1340         }
1341       }
1342     }
1343   }
1344   if (!best_data)                               /* Found no place */
1345   {
1346     if (data >= bitmap->map + bitmap->total_size)
1347       DBUG_RETURN(1);                           /* No space in bitmap */
1348     DBUG_ASSERT(uint6korr(data) == 0);
1349     /* Allocate data at end of bitmap */
1350     bitmap->used_size= (uint) (data - bitmap->map) + 6;
1351     best_data= data;
1352     best_pos= best_bits= 0;
1353   }
1354   else
1355   {
1356     /*
1357       This is not stricly needed as used_size should be alligned on 6,
1358        but for easier debugging lets try to keep it more accurate
1359     */
1360     uint position= (uint)  (best_data - bitmap->map) + 6;
1361     set_if_bigger(bitmap->used_size, position);
1362   }
1363   DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
1364 
1365 found:
1366   if (insert_order)
1367   {
1368     share->last_insert_page=
1369         ((uint) (best_data - bitmap->map)) / 6 * 16 + best_pos;
1370     share->last_insert_bitmap= bitmap->page;
1371   }
1372   fill_block(bitmap, block, best_data, best_pos, best_bits, FULL_HEAD_PAGE);
1373   DBUG_RETURN(0);
1374 }
1375 
1376 
1377 /*
1378   Allocate data for tail block
1379 
1380   SYNOPSIS
1381    allocate_tail()
1382    bitmap       bitmap
1383    size         Size of block we need to find
1384    block        Store found information here
1385 
1386   RETURN
1387    0    ok      (block is updated)
1388    1    error   (no space in bitmap; block is not touched)
1389 */
1390 
1391 
allocate_tail(MARIA_FILE_BITMAP * bitmap,uint size,MARIA_BITMAP_BLOCK * block)1392 static my_bool allocate_tail(MARIA_FILE_BITMAP *bitmap, uint size,
1393                              MARIA_BITMAP_BLOCK *block)
1394 {
1395   uint min_bits= size_to_tail_pattern(bitmap, size);
1396   uchar *data, *end, *best_data= 0;
1397   my_bool first_found= 1;
1398   uint best_bits= (uint) -1, UNINIT_VAR(best_pos);
1399   DBUG_ENTER("allocate_tail");
1400   DBUG_PRINT("enter", ("size: %u", size));
1401 
1402   data= bitmap->map + (bitmap->full_tail_size/6)*6;
1403   end=  bitmap->map + bitmap->used_size;
1404 
1405   /*
1406     We have to add DIR_ENTRY_SIZE here as this is not part of the data size
1407     See call to allocate_tail() in find_tail().
1408   */
1409   DBUG_ASSERT(size <= MAX_TAIL_SIZE(bitmap->block_size) + DIR_ENTRY_SIZE);
1410 
1411   for (; data < end; data += 6)
1412   {
1413     ulonglong bits= uint6korr(data);    /* 6 bytes = 6*8/3= 16 patterns */
1414     uint i;
1415 
1416     /*
1417       Skip common patterns
1418       We can skip empty pages (if we already found a match) or
1419       the following patterns: 1-4 (head pages, not suitable for tail) or
1420       7 (full tail page). See 'Dynamic size records' comment at start of file.
1421 
1422       At the moment we only skip full head and tail pages (ie, all bits are
1423       set) as this is easy to detect with one simple test and is a
1424       quite common case if we have blobs.
1425     */
1426 
1427     if ((!bits && best_data) || bits == 0xffffffffffffLL ||
1428         bits == 04444444444444444LL)
1429       continue;
1430     for (i= 0; i < 16; i++, bits >>= 3)
1431     {
1432       uint pattern= (uint) (bits & 7);
1433 
1434       if (pattern == 0 ||
1435           (pattern > FULL_HEAD_PAGE && pattern < FULL_TAIL_PAGE))
1436       {
1437         /* There is room for tail data */
1438         if (first_found)
1439         {
1440           first_found= 0;
1441           bitmap->full_tail_size= (uint)(data - bitmap->map);
1442         }
1443       }
1444 
1445       if (pattern <= min_bits && (!pattern || pattern > FULL_HEAD_PAGE))
1446       {
1447         if ((int) pattern > (int) best_bits)
1448         {
1449           best_bits= pattern;
1450           best_data= data;
1451           best_pos= i;
1452           if (pattern == min_bits)
1453             goto found;                         /* Can't be better */
1454         }
1455       }
1456     }
1457   }
1458   if (!best_data)
1459   {
1460     if (data >= bitmap->map + bitmap->total_size)
1461       DBUG_RETURN(1);
1462     DBUG_ASSERT(uint6korr(data) == 0);
1463     /* Allocate data at end of bitmap */
1464     best_data= data;
1465     bitmap->used_size= (uint) (data - bitmap->map) + 6;
1466     DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
1467     best_pos= best_bits= 0;
1468   }
1469 
1470 found:
1471   fill_block(bitmap, block, best_data, best_pos, best_bits, FULL_TAIL_PAGE);
1472   DBUG_RETURN(0);
1473 }
1474 
1475 
1476 /*
1477   Allocate data for full blocks
1478 
1479   SYNOPSIS
1480    allocate_full_pages()
1481    bitmap       bitmap
1482    pages_needed Total size in pages (bitmap->total_size) we would like to have
1483    block        Store found information here
1484    full_page    1 if we are not allowed to split extent
1485 
1486   IMPLEMENTATION
1487     We will return the smallest area >= size.  If there is no such
1488     block, we will return the biggest area that satisfies
1489     area_size >= MY_MIN(BLOB_SEGMENT_MIN_SIZE*full_page_size, size)
1490 
1491     To speed up searches, we will only consider areas that has at least 16 free
1492     pages starting on an even boundary.  When finding such an area, we will
1493     extend it with all previous and following free pages.  This will ensure
1494     we don't get holes between areas
1495 
1496   RETURN
1497    #            Blocks used
1498    0            error   (no space in bitmap; block is not touched)
1499 */
1500 
allocate_full_pages(MARIA_FILE_BITMAP * bitmap,ulong pages_needed,MARIA_BITMAP_BLOCK * block,my_bool full_page)1501 static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap,
1502                                  ulong pages_needed,
1503                                  MARIA_BITMAP_BLOCK *block, my_bool full_page)
1504 {
1505   uchar *data, *data_end, *page_end;
1506   uchar *best_data= 0;
1507   uint min_size;
1508   uint best_area_size, UNINIT_VAR(best_prefix_area_size);
1509   uint page, size;
1510   ulonglong UNINIT_VAR(best_prefix_bits);
1511   DBUG_ENTER("allocate_full_pages");
1512   DBUG_PRINT("enter", ("pages_needed: %lu", pages_needed));
1513 
1514   min_size= pages_needed;
1515   if (!full_page && min_size > BLOB_SEGMENT_MIN_SIZE)
1516     min_size= BLOB_SEGMENT_MIN_SIZE;
1517   best_area_size= ~(uint) 0;
1518 
1519   data= bitmap->map + (bitmap->full_head_size/6)*6;
1520   data_end= bitmap->map + bitmap->used_size;
1521   page_end= bitmap->map + bitmap->total_size;
1522 
1523   for (; data < page_end; data+= 6)
1524   {
1525     ulonglong bits= uint6korr(data);    /* 6 bytes = 6*8/3= 16 patterns */
1526     uchar *data_start;
1527     ulonglong prefix_bits= 0;
1528     uint area_size, prefix_area_size, suffix_area_size;
1529 
1530     /* Find area with at least 16 free pages */
1531     if (bits)
1532       continue;
1533     data_start= data;
1534     /* Find size of area */
1535     for (data+=6 ; data < data_end ; data+= 6)
1536     {
1537       if ((bits= uint6korr(data)))
1538         break;
1539     }
1540     /*
1541       Check if we are end of bitmap. In this case we know that
1542       the rest of the bitmap is usable
1543     */
1544     if (data >= data_end)
1545       data= page_end;
1546     area_size= (uint) (data - data_start) / 6 * 16;
1547     if (area_size >= best_area_size)
1548       continue;
1549     prefix_area_size= suffix_area_size= 0;
1550     if (!bits)
1551     {
1552       /*
1553         End of page; All the rest of the bits on page are part of area
1554         This is needed because bitmap->used_size only covers the set bits
1555         in the bitmap.
1556       */
1557       area_size+= (uint) (page_end - data) / 6 * 16;
1558       if (area_size >= best_area_size)
1559         break;
1560       data= page_end;
1561     }
1562     else
1563     {
1564       /* Add bits at end of page */
1565       for (; !(bits & 7); bits >>= 3)
1566         suffix_area_size++;
1567       area_size+= suffix_area_size;
1568     }
1569     if (data_start != bitmap->map)
1570     {
1571       /* Add bits before page */
1572       bits= prefix_bits= uint6korr(data_start - 6);
1573       DBUG_ASSERT(bits != 0);
1574       /* 111 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 */
1575       if (!(bits & 07000000000000000LL))
1576       {
1577         data_start-= 6;
1578         do
1579         {
1580           prefix_area_size++;
1581           bits<<= 3;
1582         } while (!(bits & 07000000000000000LL));
1583         area_size+= prefix_area_size;
1584         /* Calculate offset to page from data_start */
1585         prefix_area_size= 16 - prefix_area_size;
1586       }
1587     }
1588     if (area_size >= min_size && area_size <= best_area_size)
1589     {
1590       best_data= data_start;
1591       best_area_size= area_size;
1592       best_prefix_bits= prefix_bits;
1593       best_prefix_area_size= prefix_area_size;
1594 
1595       /* Prefer to put data in biggest possible area */
1596       if (area_size <= pages_needed)
1597         min_size= area_size;
1598       else
1599         min_size= pages_needed;
1600     }
1601   }
1602   if (!best_data)
1603     DBUG_RETURN(0);                             /* No room on page */
1604 
1605   /*
1606     Now allocate MY_MIN(pages_needed, area_size), starting from
1607     best_start + best_prefix_area_size
1608   */
1609   if (best_area_size > pages_needed)
1610     best_area_size= pages_needed;
1611 
1612   /* For each 6 bytes we have 6*8/3= 16 patterns */
1613   page= ((uint) (best_data - bitmap->map) * 8) / 3 + best_prefix_area_size;
1614   block->page= bitmap->page + 1 + page;
1615   block->page_count= best_area_size;
1616   block->empty_space= 0;
1617   block->sub_blocks= 0;
1618   block->org_bitmap_value= 0;
1619   block->used= 0;
1620   DBUG_ASSERT(page + best_area_size < bitmap->pages_covered);
1621   DBUG_PRINT("info", ("page: %lu  page_count: %u",
1622                       (ulong) block->page, block->page_count));
1623 
1624   if (best_prefix_area_size)
1625   {
1626     ulonglong tmp;
1627     /* Convert offset back to bits */
1628     best_prefix_area_size= 16 - best_prefix_area_size;
1629     if (best_area_size < best_prefix_area_size)
1630     {
1631       tmp= (1LL << best_area_size*3) - 1;
1632       best_area_size= best_prefix_area_size;    /* for easy end test */
1633     }
1634     else
1635       tmp= (1LL << best_prefix_area_size*3) - 1;
1636     tmp<<= (16 - best_prefix_area_size) * 3;
1637     DBUG_ASSERT((best_prefix_bits & tmp) == 0);
1638     best_prefix_bits|= tmp;
1639     int6store(best_data, best_prefix_bits);
1640     if (!(best_area_size-= best_prefix_area_size))
1641       goto end;
1642     best_data+= 6;
1643   }
1644   best_area_size*= 3;                       /* Bits to set */
1645   size= best_area_size/8;                   /* Bytes to set */
1646   bfill(best_data, size, 255);
1647   best_data+= size;
1648   if ((best_area_size-= size * 8))
1649   {
1650     /* fill last uchar */
1651     *best_data|= (uchar) ((1 << best_area_size) -1);
1652     best_data++;
1653   }
1654   if (data_end < best_data)
1655   {
1656     bitmap->used_size= (uint) (best_data - bitmap->map);
1657     DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
1658   }
1659 end:
1660   bitmap->changed= 1;
1661   DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
1662   DBUG_RETURN(block->page_count);
1663 }
1664 
1665 
1666 /****************************************************************************
1667   Find right bitmaps where to store data
1668 ****************************************************************************/
1669 
1670 /*
1671   Find right bitmap and position for head block
1672 
1673   SYNOPSIS
1674     find_head()
1675     info		Maria handler
1676     length	        Size of data region we need store
1677     position		Position in bitmap_blocks where to store the
1678 			information for the head block.
1679 
1680   RETURN
1681     0  ok
1682     1  error
1683 */
1684 
find_head(MARIA_HA * info,uint length,uint position)1685 static my_bool find_head(MARIA_HA *info, uint length, uint position)
1686 {
1687   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1688   MARIA_BITMAP_BLOCK *block;
1689   /*
1690     There is always place for the head block in bitmap_blocks as these are
1691     preallocated at _ma_init_block_record().
1692   */
1693   block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *);
1694 
1695   if (info->s->base.extra_options & MA_EXTRA_OPTIONS_INSERT_ORDER)
1696   {
1697     if (bitmap->page != info->s->last_insert_bitmap &&
1698         _ma_change_bitmap_page(info, bitmap,
1699                                info->s->last_insert_bitmap))
1700       return 1;
1701     /* Don't allocate any blocks from earlier pages */
1702     info->s->state.first_bitmap_with_space= info->s->last_insert_bitmap;
1703   }
1704 
1705   /*
1706     We need to have DIRENTRY_SIZE here to take into account that we may
1707     need an extra directory entry for the row
1708   */
1709   while (allocate_head(bitmap, length + DIR_ENTRY_SIZE, block))
1710     if (move_to_next_bitmap(info, bitmap))
1711       return 1;
1712   return 0;
1713 }
1714 
1715 
1716 /*
1717   Find right bitmap and position for tail
1718 
1719   SYNOPSIS
1720     find_tail()
1721     info		Maria handler
1722     length	        Size of data region we need store
1723     position		Position in bitmap_blocks where to store the
1724 			information for the head block.
1725 
1726   RETURN
1727     0  ok
1728     1  error
1729 */
1730 
find_tail(MARIA_HA * info,uint length,uint position)1731 static my_bool find_tail(MARIA_HA *info, uint length, uint position)
1732 {
1733   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1734   MARIA_BITMAP_BLOCK *block;
1735   DBUG_ENTER("find_tail");
1736   DBUG_ASSERT(length <= info->s->block_size - PAGE_OVERHEAD_SIZE(info->s));
1737 
1738   /* Needed, as there is no error checking in dynamic_element */
1739   if (allocate_dynamic(&info->bitmap_blocks, position))
1740     DBUG_RETURN(1);
1741   block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *);
1742 
1743   /*
1744     We have to add DIR_ENTRY_SIZE to ensure we have space for the tail and
1745     it's directroy entry on the page
1746   */
1747   while (allocate_tail(bitmap, length + DIR_ENTRY_SIZE, block))
1748     if (move_to_next_bitmap(info, bitmap))
1749       DBUG_RETURN(1);
1750   DBUG_RETURN(0);
1751 }
1752 
1753 
1754 /*
1755   Find right bitmap and position for full blocks in one extent
1756 
1757   SYNOPSIS
1758     find_mid()
1759     info		Maria handler.
1760     pages	        How many pages to allocate.
1761     position		Position in bitmap_blocks where to store the
1762 			information for the head block.
1763   NOTES
1764     This is used to allocate the main extent after the 'head' block
1765     (Ie, the middle part of the head-middle-tail entry)
1766 
1767   RETURN
1768     0  ok
1769     1  error
1770 */
1771 
find_mid(MARIA_HA * info,ulong pages,uint position)1772 static my_bool find_mid(MARIA_HA *info, ulong pages, uint position)
1773 {
1774   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1775   MARIA_BITMAP_BLOCK *block;
1776   block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *);
1777 
1778   while (!allocate_full_pages(bitmap, pages, block, 1))
1779   {
1780     if (move_to_next_bitmap(info, bitmap))
1781       return 1;
1782   }
1783   return 0;
1784 }
1785 
1786 
1787 /*
1788   Find right bitmap and position for putting a blob
1789 
1790   SYNOPSIS
1791     find_blob()
1792     info		Maria handler.
1793     length		Length of the blob
1794 
1795   NOTES
1796     The extents are stored last in info->bitmap_blocks
1797 
1798   IMPLEMENTATION
1799     Allocate all full pages for the block + optionally one tail
1800 
1801   RETURN
1802     0  ok
1803     1  error
1804 */
1805 
find_blob(MARIA_HA * info,ulong length)1806 static my_bool find_blob(MARIA_HA *info, ulong length)
1807 {
1808   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1809   uint full_page_size= FULL_PAGE_SIZE(info->s);
1810   ulong pages;
1811   uint rest_length, used;
1812   uint UNINIT_VAR(first_block_pos);
1813   MARIA_BITMAP_BLOCK *first_block= 0;
1814   DBUG_ENTER("find_blob");
1815   DBUG_PRINT("enter", ("length: %lu", length));
1816 
1817   pages= length / full_page_size;
1818   rest_length= (uint) (length - pages * full_page_size);
1819   if (rest_length >= MAX_TAIL_SIZE(info->s->block_size))
1820   {
1821     pages++;
1822     rest_length= 0;
1823   }
1824 
1825   first_block_pos= info->bitmap_blocks.elements;
1826   if (pages)
1827   {
1828     MARIA_BITMAP_BLOCK *block;
1829     if (allocate_dynamic(&info->bitmap_blocks,
1830                          info->bitmap_blocks.elements +
1831                          pages / BLOB_SEGMENT_MIN_SIZE + 2))
1832       DBUG_RETURN(1);
1833     block= dynamic_element(&info->bitmap_blocks, info->bitmap_blocks.elements,
1834                            MARIA_BITMAP_BLOCK*);
1835     do
1836     {
1837       /*
1838         We use 0x3fff here as the two upmost bits are reserved for
1839         TAIL_BIT and START_EXTENT_BIT
1840       */
1841       used= allocate_full_pages(bitmap,
1842                                 (pages >= 0x3fff ? 0x3fff : (uint) pages),
1843                                 block, 0);
1844       if (!used)
1845       {
1846         if (move_to_next_bitmap(info, bitmap))
1847           DBUG_RETURN(1);
1848       }
1849       else
1850       {
1851         pages-= used;
1852         info->bitmap_blocks.elements++;
1853         block++;
1854       }
1855     } while (pages != 0);
1856   }
1857   if (rest_length && find_tail(info, rest_length,
1858                                info->bitmap_blocks.elements++))
1859     DBUG_RETURN(1);
1860   first_block= dynamic_element(&info->bitmap_blocks, first_block_pos,
1861                                MARIA_BITMAP_BLOCK*);
1862   first_block->sub_blocks= info->bitmap_blocks.elements - first_block_pos;
1863   DBUG_RETURN(0);
1864 }
1865 
1866 
1867 /*
1868   Find pages to put ALL blobs
1869 
1870   SYNOPSIS
1871   allocate_blobs()
1872   info		Maria handler
1873   row		Information of what is in the row (from calc_record_size())
1874 
1875   RETURN
1876    0    ok
1877    1    error
1878 */
1879 
allocate_blobs(MARIA_HA * info,MARIA_ROW * row)1880 static my_bool allocate_blobs(MARIA_HA *info, MARIA_ROW *row)
1881 {
1882   ulong *length, *end;
1883   uint elements;
1884   /*
1885     Reserve size for:
1886     head block
1887     one extent
1888     tail block
1889   */
1890   elements= info->bitmap_blocks.elements;
1891   for (length= row->blob_lengths, end= length + info->s->base.blobs;
1892        length < end; length++)
1893   {
1894     if (*length && find_blob(info, *length))
1895       return 1;
1896   }
1897   row->extents_count= (info->bitmap_blocks.elements - elements);
1898   return 0;
1899 }
1900 
1901 
1902 /*
1903   Reserve the current head page
1904 
1905   SYNOPSIS
1906     use_head()
1907     info		Maria handler
1908     page		Page number to update
1909 			(Note that caller guarantees this is in the active
1910                         bitmap)
1911     size		How much free space is left on the page
1912     block_position	In which info->bitmap_block we have the
1913 			information about the head block.
1914 
1915   NOTES
1916     This is used on update where we are updating an existing head page
1917 */
1918 
use_head(MARIA_HA * info,pgcache_page_no_t page,uint size,uint block_position)1919 static void use_head(MARIA_HA *info, pgcache_page_no_t page, uint size,
1920                      uint block_position)
1921 {
1922   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
1923   MARIA_BITMAP_BLOCK *block;
1924   uchar *data;
1925   uint offset, tmp, offset_page;
1926   DBUG_ENTER("use_head");
1927 
1928   DBUG_ASSERT(page % bitmap->pages_covered);
1929 
1930   block= dynamic_element(&info->bitmap_blocks, block_position,
1931                          MARIA_BITMAP_BLOCK*);
1932   block->page= page;
1933   block->page_count= 1 + TAIL_BIT;
1934   block->empty_space= size;
1935   block->used= BLOCKUSED_TAIL;
1936 
1937   /*
1938     Mark place used by reading/writing 2 bytes at a time to handle
1939     bitmaps in overlapping bytes
1940   */
1941   offset_page= (uint) (page - bitmap->page - 1) * 3;
1942   offset= offset_page & 7;
1943   data= bitmap->map + offset_page / 8;
1944   tmp= uint2korr(data);
1945   block->org_bitmap_value= (tmp >> offset) & 7;
1946   tmp= (tmp & ~(7 << offset)) | (FULL_HEAD_PAGE << offset);
1947   int2store(data, tmp);
1948   bitmap->changed= 1;
1949   DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
1950   DBUG_VOID_RETURN;
1951 }
1952 
1953 
1954 /*
1955   Find out where to split the row (ie, what goes in head, middle, tail etc)
1956 
1957   SYNOPSIS
1958     find_where_to_split_row()
1959     share           Maria share
1960     row		    Information of what is in the row (from calc_record_size())
1961     extents         Max number of extents we have to store in header
1962     split_size	    Free size on the page (The head length must be less
1963                     than this)
1964 
1965   RETURN
1966     row_length for the head block.
1967 */
1968 
find_where_to_split_row(MARIA_SHARE * share,MARIA_ROW * row,uint extents,uint split_size)1969 static uint find_where_to_split_row(MARIA_SHARE *share, MARIA_ROW *row,
1970                                     uint extents, uint split_size)
1971 {
1972   uint *lengths, *lengths_end;
1973   /*
1974     Ensure we have the minimum required space on head page:
1975     - Header + length of field lengths (row->min_length)
1976     - Number of extents
1977     - One extent
1978   */
1979   uint row_length= (row->min_length +
1980                     size_to_store_key_length(extents) +
1981                     ROW_EXTENT_SIZE);
1982   DBUG_ASSERT(row_length <= split_size);
1983 
1984   /*
1985     Store first in all_field_lengths the different parts that are written
1986     to the row. This needs to be in same order as in
1987     ma_block_rec.c::write_block_record()
1988   */
1989   row->null_field_lengths[-3]= extents * ROW_EXTENT_SIZE;
1990   row->null_field_lengths[-2]= share->base.fixed_not_null_fields_length;
1991   row->null_field_lengths[-1]= row->field_lengths_length;
1992   for (lengths= row->null_field_lengths - EXTRA_LENGTH_FIELDS,
1993        lengths_end= (lengths + share->base.fields - share->base.blobs +
1994                      EXTRA_LENGTH_FIELDS); lengths < lengths_end; lengths++)
1995   {
1996     if (row_length + *lengths > split_size)
1997       break;
1998     row_length+= *lengths;
1999   }
2000   return row_length;
2001 }
2002 
2003 
2004 /*
2005   Find where to write the middle parts of the row and the tail
2006 
2007   SYNOPSIS
2008     write_rest_of_head()
2009     info	Maria handler
2010     position    Position in bitmap_blocks. Is 0 for rows that needs
2011                 full blocks (ie, has a head, middle part and optional tail)
2012    rest_length  How much left of the head block to write.
2013 
2014   RETURN
2015     0  ok
2016     1  error
2017 */
2018 
write_rest_of_head(MARIA_HA * info,uint position,ulong rest_length)2019 static my_bool write_rest_of_head(MARIA_HA *info, uint position,
2020                                   ulong rest_length)
2021 {
2022   MARIA_SHARE *share= info->s;
2023   uint full_page_size= FULL_PAGE_SIZE(share);
2024   MARIA_BITMAP_BLOCK *block;
2025   DBUG_ENTER("write_rest_of_head");
2026   DBUG_PRINT("enter", ("position: %u  rest_length: %lu", position,
2027                        rest_length));
2028 
2029   if (position == 0)
2030   {
2031     /* Write out full pages */
2032     uint pages= rest_length / full_page_size;
2033 
2034     rest_length%= full_page_size;
2035     if (rest_length >= MAX_TAIL_SIZE(share->block_size))
2036     {
2037       /* Put tail on a full page */
2038       pages++;
2039       rest_length= 0;
2040     }
2041     if (find_mid(info, pages, 1))
2042       DBUG_RETURN(1);
2043     /*
2044       Insert empty block after full pages, to allow write_block_record() to
2045       split segment into used + free page
2046     */
2047     block= dynamic_element(&info->bitmap_blocks, 2, MARIA_BITMAP_BLOCK*);
2048     block->page_count= 0;
2049     block->used= 0;
2050   }
2051   if (rest_length)
2052   {
2053     if (find_tail(info, rest_length, ELEMENTS_RESERVED_FOR_MAIN_PART - 1))
2054       DBUG_RETURN(1);
2055   }
2056   else
2057   {
2058     /* Empty tail block */
2059     block= dynamic_element(&info->bitmap_blocks,
2060                            ELEMENTS_RESERVED_FOR_MAIN_PART - 1,
2061                            MARIA_BITMAP_BLOCK *);
2062     block->page_count= 0;
2063     block->used= 0;
2064   }
2065   DBUG_RETURN(0);
2066 }
2067 
2068 
2069 /*
2070   Find where to store one row
2071 
2072   SYNPOSIS
2073     _ma_bitmap_find_place()
2074     info                  Maria handler
2075     row                   Information about row to write
2076     blocks                Store data about allocated places here
2077 
2078   RETURN
2079     0  ok
2080        row->space_on_head_page contains minimum number of bytes we
2081        expect to put on the head page.
2082     1  error
2083        my_errno is set to error
2084 */
2085 
_ma_bitmap_find_place(MARIA_HA * info,MARIA_ROW * row,MARIA_BITMAP_BLOCKS * blocks)2086 my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
2087                               MARIA_BITMAP_BLOCKS *blocks)
2088 {
2089   MARIA_SHARE *share= info->s;
2090   my_bool res= 1;
2091   uint full_page_size, position, max_page_size;
2092   uint head_length, row_length, rest_length, extents_length;
2093   DBUG_ENTER("_ma_bitmap_find_place");
2094 
2095   blocks->count= 0;
2096   blocks->tail_page_skipped= blocks->page_skipped= 0;
2097   row->extents_count= 0;
2098 
2099   /*
2100     Reserve place for the following blocks:
2101      - Head block
2102      - Full page block
2103      - Marker block to allow write_block_record() to split full page blocks
2104        into full and free part
2105      - Tail block
2106   */
2107 
2108   info->bitmap_blocks.elements= ELEMENTS_RESERVED_FOR_MAIN_PART;
2109   max_page_size= (share->block_size - PAGE_OVERHEAD_SIZE(share));
2110 
2111   mysql_mutex_lock(&share->bitmap.bitmap_lock);
2112 
2113   if (row->total_length <= max_page_size)
2114   {
2115     /* Row fits in one page */
2116     position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
2117     if (find_head(info, (uint) row->total_length, position))
2118       goto abort;
2119     row->space_on_head_page= row->total_length;
2120     goto end;
2121   }
2122 
2123   /*
2124     First allocate all blobs so that we can find out the needed size for
2125     the main block.
2126   */
2127   if (row->blob_length && allocate_blobs(info, row))
2128     goto abort;
2129 
2130   extents_length= row->extents_count * ROW_EXTENT_SIZE;
2131   /*
2132     The + 3 is reserved for storing the number of segments in the row header.
2133   */
2134   if ((head_length= (row->head_length + extents_length + 3)) <=
2135       max_page_size)
2136   {
2137     /* Main row part fits into one page */
2138     position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
2139     if (find_head(info, head_length, position))
2140       goto abort;
2141     row->space_on_head_page= head_length;
2142     goto end;
2143   }
2144 
2145   /* Allocate enough space */
2146   head_length+= ELEMENTS_RESERVED_FOR_MAIN_PART * ROW_EXTENT_SIZE;
2147 
2148   /* The first segment size is stored in 'row_length' */
2149   row_length= find_where_to_split_row(share, row, row->extents_count +
2150                                       ELEMENTS_RESERVED_FOR_MAIN_PART-1,
2151                                       max_page_size);
2152 
2153   full_page_size= MAX_TAIL_SIZE(share->block_size);
2154   position= 0;
2155   rest_length= head_length - row_length;
2156   if (rest_length <= full_page_size)
2157     position= ELEMENTS_RESERVED_FOR_MAIN_PART -2;    /* Only head and tail */
2158   if (find_head(info, row_length, position))
2159     goto abort;
2160   row->space_on_head_page= row_length;
2161 
2162   if (write_rest_of_head(info, position, rest_length))
2163     goto abort;
2164 
2165 end:
2166   blocks->block= dynamic_element(&info->bitmap_blocks, position,
2167                                  MARIA_BITMAP_BLOCK*);
2168   blocks->block->sub_blocks= ELEMENTS_RESERVED_FOR_MAIN_PART - position;
2169   /* First block's page_count is for all blocks */
2170   blocks->count= info->bitmap_blocks.elements - position;
2171   res= 0;
2172 
2173 abort:
2174   mysql_mutex_unlock(&share->bitmap.bitmap_lock);
2175   DBUG_RETURN(res);
2176 }
2177 
2178 
2179 /*
2180   Find where to put row on update (when head page is already defined)
2181 
2182   SYNPOSIS
2183     _ma_bitmap_find_new_place()
2184     info                  Maria handler
2185     row                   Information about row to write
2186     page                  On which page original row was stored
2187     free_size             Free size on head page
2188     blocks                Store data about allocated places here
2189 
2190   NOTES
2191    This function is only called when the new row can't fit in the space of
2192    the old row in the head page.
2193 
2194    This is essently same as _ma_bitmap_find_place() except that
2195    we don't call find_head() to search in bitmaps where to put the page.
2196 
2197   RETURN
2198     0  ok
2199     1  error
2200 */
2201 
_ma_bitmap_find_new_place(MARIA_HA * info,MARIA_ROW * row,pgcache_page_no_t page,uint free_size,MARIA_BITMAP_BLOCKS * blocks)2202 my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *row,
2203                                   pgcache_page_no_t page, uint free_size,
2204                                   MARIA_BITMAP_BLOCKS *blocks)
2205 {
2206   MARIA_SHARE *share= info->s;
2207   my_bool res= 1;
2208   uint position;
2209   uint head_length, row_length, rest_length, extents_length;
2210   ulonglong bitmap_page;
2211   DBUG_ENTER("_ma_bitmap_find_new_place");
2212 
2213   blocks->count= 0;
2214   blocks->tail_page_skipped= blocks->page_skipped= 0;
2215   row->extents_count= 0;
2216   info->bitmap_blocks.elements= ELEMENTS_RESERVED_FOR_MAIN_PART;
2217 
2218   mysql_mutex_lock(&share->bitmap.bitmap_lock);
2219 
2220   /*
2221     First allocate all blobs (so that we can find out the needed size for
2222     the main block.
2223   */
2224   if (row->blob_length && allocate_blobs(info, row))
2225     goto abort;
2226 
2227   /* Switch bitmap to current head page */
2228   bitmap_page= page - page % share->bitmap.pages_covered;
2229 
2230   if (share->bitmap.page != bitmap_page &&
2231       _ma_change_bitmap_page(info, &share->bitmap, bitmap_page))
2232     goto abort;
2233 
2234   extents_length= row->extents_count * ROW_EXTENT_SIZE;
2235   if ((head_length= (row->head_length + extents_length + 3)) <= free_size)
2236   {
2237     /* Main row part fits into one page */
2238     position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
2239     use_head(info, page, head_length, position);
2240     row->space_on_head_page= head_length;
2241     goto end;
2242   }
2243 
2244   /* Allocate enough space */
2245   head_length+= ELEMENTS_RESERVED_FOR_MAIN_PART * ROW_EXTENT_SIZE;
2246 
2247   /*
2248     The first segment size is stored in 'row_length'
2249     We have to add ELEMENTS_RESERVED_FOR_MAIN_PART here as the extent
2250     information may be up to this size when the header splits.
2251   */
2252   row_length= find_where_to_split_row(share, row, row->extents_count +
2253                                       ELEMENTS_RESERVED_FOR_MAIN_PART-1,
2254                                       free_size);
2255 
2256   position= 0;
2257   rest_length= head_length - row_length;
2258   if (rest_length <= MAX_TAIL_SIZE(share->block_size))
2259     position= ELEMENTS_RESERVED_FOR_MAIN_PART -2;    /* Only head and tail */
2260   use_head(info, page, row_length, position);
2261   row->space_on_head_page= row_length;
2262 
2263   if (write_rest_of_head(info, position, rest_length))
2264     goto abort;
2265 
2266 end:
2267   blocks->block= dynamic_element(&info->bitmap_blocks, position,
2268                                  MARIA_BITMAP_BLOCK*);
2269   blocks->block->sub_blocks= ELEMENTS_RESERVED_FOR_MAIN_PART - position;
2270   /* First block's page_count is for all blocks */
2271   blocks->count= info->bitmap_blocks.elements - position;
2272   res= 0;
2273 
2274 abort:
2275   mysql_mutex_unlock(&share->bitmap.bitmap_lock);
2276   DBUG_RETURN(res);
2277 }
2278 
2279 
2280 /****************************************************************************
2281   Clear and reset bits
2282 ****************************************************************************/
2283 
/*
  Set fill pattern for a page

  SYNOPSIS
    set_page_bits()
    info		Maria handler
    bitmap		Bitmap handler
    page		Address of page (page number)
    fill_pattern	Pattern (not size) for page. Must be <= 7

  NOTES
    The page does not have to be part of the active bitmap; the bitmap
    that covers the page is made active if needed.

    If the page already has the requested pattern, nothing is changed
    (bitmap->changed is not set either).

  RETURN
    0  ok
    1  error (could not change to the bitmap page covering 'page')
*/

static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
                             pgcache_page_no_t page, uint fill_pattern)
{
  pgcache_page_no_t bitmap_page;
  uint offset_page, offset, tmp, org_tmp, used_offset;
  uchar *data;
  DBUG_ENTER("set_page_bits");
  DBUG_ASSERT(fill_pattern <= 7);

  /* Page number of the bitmap page that covers 'page' */
  bitmap_page= page - page % bitmap->pages_covered;
  if (bitmap_page != bitmap->page &&
      _ma_change_bitmap_page(info, bitmap, bitmap_page))
    DBUG_RETURN(1);

  /* Find page number from start of bitmap */
  offset_page= (uint) (page - bitmap->page - 1);

  /*
    Each page uses 3 bits, so a pattern may straddle a byte boundary.
    Read 2 bytes to be able to handle patterns in overlapping bytes.
  */
  offset_page*= 3;                              /* Bit position in map */
  offset= offset_page & 7;                      /* Bit position in byte */
  data= bitmap->map + offset_page / 8;
  org_tmp= tmp= uint2korr(data);
  tmp= (tmp & ~(7 << offset)) | (fill_pattern << offset);
  if (tmp == org_tmp)
    DBUG_RETURN(0);                             /* No changes */

  /*
    Take care to not write bytes outside of bitmap.
    fill_pattern is 3 bits, so we need to write two bytes
    if bit position we write to is > (8-3)
   */
  if (offset > 5)
    int2store(data, tmp);
  else
    data[0]= tmp;

  /*
    Reset full_head_size or full_tail_size if we are releasing data before
    it. Increase used_size if we are allocating data.
  */
  used_offset= (uint) (data - bitmap->map);
  if (fill_pattern < 4)
    set_if_smaller(bitmap->full_head_size, used_offset);
  if (fill_pattern == 0 || (fill_pattern > 4 && fill_pattern < 7))
    set_if_smaller(bitmap->full_tail_size, used_offset);
  if (fill_pattern != 0)
  {
    /* Calculate the offset just after the last changed byte */
    used_offset+= offset > 5 ? 2 : 1;
    set_if_bigger(bitmap->used_size, used_offset);
  }

  _ma_check_bitmap(bitmap);
  bitmap->changed= 1;
  DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
  /* Page is not marked as full; this bitmap has (some) space again */
  if (fill_pattern != FULL_HEAD_PAGE && fill_pattern != FULL_TAIL_PAGE)
    set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
  /*
    Note that if the condition above is false (page is full), and all pages of
    this bitmap are now full, and that bitmap page was
    first_bitmap_with_space, we don't modify first_bitmap_with_space, indeed
    its value still tells us where to start our search for a bitmap with space
    (which is for sure after this full one).
    That does mean that first_bitmap_with_space is only a lower bound.
  */
  DBUG_RETURN(0);
}
2371 
2372 
2373 /*
2374   Get bitmap pattern for a given page
2375 
2376   SYNOPSIS
2377     bitmap_get_page_bits()
2378     info	Maria handler
2379     bitmap	Bitmap handler
2380     page	Page number
2381 
2382   RETURN
2383     0-7		Bitmap pattern
2384     ~0		Error (couldn't read page)
2385 */
2386 
bitmap_get_page_bits(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page)2387 static uint bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
2388                                  pgcache_page_no_t page)
2389 {
2390   pgcache_page_no_t bitmap_page;
2391   uint offset_page, offset, tmp;
2392   uchar *data;
2393   DBUG_ENTER("_ma_bitmap_get_page_bits");
2394 
2395   bitmap_page= page - page % bitmap->pages_covered;
2396   if (bitmap_page != bitmap->page &&
2397       _ma_change_bitmap_page(info, bitmap, bitmap_page))
2398     DBUG_RETURN(~ (uint) 0);
2399 
2400   /* Find page number from start of bitmap */
2401   offset_page= (uint) (page - bitmap->page - 1);
2402   /*
2403     Mark place used by reading/writing 2 bytes at a time to handle
2404     bitmaps in overlapping bytes
2405   */
2406   offset_page*= 3;
2407   offset= offset_page & 7;
2408   data= bitmap->map + offset_page / 8;
2409   tmp= uint2korr(data);
2410   DBUG_RETURN((tmp >> offset) & 7);
2411 }
2412 
2413 
2414 /* As above, but take a lock while getting the data */
2415 
_ma_bitmap_get_page_bits(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page)2416 uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
2417                               pgcache_page_no_t page)
2418 {
2419   uint tmp;
2420   mysql_mutex_lock(&bitmap->bitmap_lock);
2421   tmp= bitmap_get_page_bits(info, bitmap, page);
2422   mysql_mutex_unlock(&bitmap->bitmap_lock);
2423   return tmp;
2424 }
2425 
2426 
2427 /*
2428   Mark all pages in a region as free
2429 
2430   SYNOPSIS
2431     _ma_bitmap_reset_full_page_bits()
2432     info                Maria handler
2433     bitmap              Bitmap handler
2434     page                Start page
2435     page_count          Number of pages
2436 
2437   NOTES
2438     We assume that all pages in region is covered by same bitmap
2439     One must have a lock on info->s->bitmap.bitmap_lock
2440 
2441   RETURN
2442     0  ok
2443     1  Error (when reading bitmap)
2444 */
2445 
_ma_bitmap_reset_full_page_bits(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page,uint page_count)2446 my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
2447                                         MARIA_FILE_BITMAP *bitmap,
2448                                         pgcache_page_no_t page,
2449                                         uint page_count)
2450 {
2451   ulonglong bitmap_page;
2452   uint offset, bit_start, bit_count, tmp, byte_offset;
2453   uchar *data;
2454   DBUG_ENTER("_ma_bitmap_reset_full_page_bits");
2455   DBUG_PRINT("enter", ("page: %lu  page_count: %u", (ulong) page, page_count));
2456   mysql_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
2457 
2458   bitmap_page= page - page % bitmap->pages_covered;
2459   DBUG_ASSERT(page != bitmap_page);
2460 
2461   if (bitmap_page != bitmap->page &&
2462       _ma_change_bitmap_page(info, bitmap, bitmap_page))
2463     DBUG_RETURN(1);
2464 
2465   /* Find page number from start of bitmap */
2466   offset= (uint) (page - bitmap->page - 1);
2467 
2468   /* Clear bits from 'page * 3' -> '(page + page_count) * 3' */
2469   bit_start= offset * 3;
2470   bit_count= page_count * 3;
2471 
2472   byte_offset= bit_start/8;
2473   data= bitmap->map + byte_offset;
2474   offset= bit_start & 7;
2475 
2476   tmp= (255 << offset);                         /* Bits to keep */
2477   if (bit_count + offset < 8)
2478   {
2479     /* Only clear bits between 'offset' and 'offset+bit_count-1' */
2480     tmp^= (255 << (offset + bit_count));
2481   }
2482   *data&= ~tmp;
2483 
2484   set_if_smaller(bitmap->full_head_size, byte_offset);
2485   set_if_smaller(bitmap->full_tail_size, byte_offset);
2486 
2487   if ((int) (bit_count-= (8 - offset)) > 0)
2488   {
2489     uint fill;
2490     data++;
2491     /*
2492       -1 is here to avoid one 'if' statement and to let the following code
2493       handle the last byte
2494     */
2495     if ((fill= (bit_count - 1) / 8))
2496     {
2497       bzero(data, fill);
2498       data+= fill;
2499     }
2500     bit_count-= fill * 8;                       /* Bits left to clear */
2501     tmp= (1 << bit_count) - 1;
2502     *data&= ~tmp;
2503   }
2504   set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
2505   bitmap->changed= 1;
2506   DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
2507   DBUG_RETURN(0);
2508 }
2509 
2510 
/*
  Set all pages in a region as used

  SYNOPSIS
    _ma_bitmap_set_full_page_bits()
    info                Maria handler
    bitmap              Bitmap handler
    page                Start page
    page_count          Number of pages

  NOTES
    We assume that all pages in region is covered by same bitmap
    One must have a lock on info->s->bitmap.bitmap_lock

    All 3 bits are set (pattern 7) for every page in the region.

  RETURN
    0  ok
    1  Error (when reading bitmap, or if the region starts on the bitmap
       page or extends past the pages covered by the bitmap)
*/

my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
                                      MARIA_FILE_BITMAP *bitmap,
                                      pgcache_page_no_t page, uint page_count)
{
  ulonglong bitmap_page;
  uint offset, bit_start, bit_count, tmp;
  uchar *data;
  DBUG_ENTER("_ma_bitmap_set_full_page_bits");
  DBUG_PRINT("enter", ("page: %lu  page_count: %u", (ulong) page, page_count));
  mysql_mutex_assert_owner(&info->s->bitmap.bitmap_lock);

  /* Page number of the bitmap page that covers 'page' */
  bitmap_page= page - page % bitmap->pages_covered;
  if (page == bitmap_page ||
      page + page_count > bitmap_page + bitmap->pages_covered)
  {
    DBUG_ASSERT(0);                             /* Wrong in data */
    DBUG_RETURN(1);
  }

  if (bitmap_page != bitmap->page &&
      _ma_change_bitmap_page(info, bitmap, bitmap_page))
    DBUG_RETURN(1);

  /* Find page number from start of bitmap */
  offset= (uint) (page - bitmap->page - 1);

  /* Set bits from 'page * 3' -> '(page + page_count) * 3' */
  bit_start= offset * 3;
  bit_count= page_count * 3;

  data= bitmap->map + bit_start / 8;
  offset= bit_start & 7;

  /* Bits to set in first byte; the bits below 'offset' are kept */
  tmp= (255 << offset);
  if (bit_count + offset < 8)
  {
    /* Only set bits between 'offset' and 'offset+bit_count-1' */
    tmp^= (255 << (offset + bit_count));
  }
  *data|= tmp;

  /* bit_count becomes the number of bits left after the first byte */
  if ((int) (bit_count-= (8 - offset)) > 0)
  {
    uint fill;
    data++;
    /*
      -1 is here to avoid one 'if' statement and to let the following code
      handle the last byte
    */
    if ((fill= (bit_count - 1) / 8))
    {
      bfill(data, fill, 255);
      data+= fill;
    }
    bit_count-= fill * 8;                       /* Bits left to set */
    tmp= (1 << bit_count) - 1;
    *data|= tmp;
  }
  /* 'data' points at the last changed byte */
  set_if_bigger(bitmap->used_size, (uint) (data - bitmap->map) + 1);
  _ma_check_bitmap(bitmap);
  bitmap->changed= 1;
  DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
  DBUG_RETURN(0);
}
2594 
2595 
/**
   @brief
   Make a transition of MARIA_FILE_BITMAP::non_flushable.
   If the bitmap becomes flushable, which requires that REDO-UNDO has been
   logged and all bitmap pages touched by the thread have a correct
   allocation, it unpins all bitmap pages, and if _ma_bitmap_flush_all() is
   waiting (in practice it is a checkpoint), it wakes it up.
   If the bitmap becomes or stays unflushable, the function merely records it
   unless a concurrent _ma_bitmap_flush_all() is happening, in which case the
   function first waits for the flush to be done.

   @note
   this sets info->non_flushable_state to 1 if we have incremented
   bitmap->non_flushable and not yet decremented it.

   @note
   Does nothing if the table is not transactional
   (share->now_transactional is not set).

   @param  info                Maria handler; the bitmap that is changed is
                               the one in its share (info->s->bitmap)
   @param  non_flushable_inc   Increment of MARIA_FILE_BITMAP::non_flushable
                               (-1 or +1).
*/

void _ma_bitmap_flushable(MARIA_HA *info, int non_flushable_inc)
{
  MARIA_SHARE *share= info->s;
  MARIA_FILE_BITMAP *bitmap;
  DBUG_ENTER("_ma_bitmap_flushable");

  /*
    Non-transactional tables are never automatically flushed and need no
    protection
  */
  if (!share->now_transactional)
    DBUG_VOID_RETURN;

  bitmap= &share->bitmap;
  mysql_mutex_lock(&bitmap->bitmap_lock);

  if (non_flushable_inc == -1)
  {
    /* This handler gives back the increment it did earlier */
    DBUG_ASSERT((int) bitmap->non_flushable > 0);
    DBUG_ASSERT(info->non_flushable_state == 1);
    if (--bitmap->non_flushable == 0)
    {
      /*
        We unlock and unpin pages locked and pinned by other threads. It does
        not seem to be an issue as all bitmap changes are serialized with
        the bitmap's mutex.
      */
      _ma_bitmap_unpin_all(share);
      if (unlikely(bitmap->waiting_for_non_flushable))
      {
        DBUG_PRINT("info", ("bitmap flushable waking up flusher"));
        mysql_cond_broadcast(&bitmap->bitmap_cond);
      }
    }
    DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
    mysql_mutex_unlock(&bitmap->bitmap_lock);
    info->non_flushable_state= 0;
    DBUG_VOID_RETURN;
  }
  DBUG_ASSERT(non_flushable_inc == 1);
  DBUG_ASSERT(info->non_flushable_state == 0);

  /* Count ourselves as a waiter for as long as we are in the loop below */
  bitmap->waiting_for_flush_all_requested++;
  while (unlikely(bitmap->flush_all_requested))
  {
    /*
      Some other thread is waiting for the bitmap to become
      flushable. Not the moment to make the bitmap unflushable or more
      unflushable; let's rather back off and wait. If we didn't do this, with
      multiple writers, there may always be one thread causing the bitmap to
      be unflushable and _ma_bitmap_flush_all() would wait for long.
      There should not be a deadlock because if our thread increased
      non_flushable (and thus _ma_bitmap_flush_all() is waiting for at least
      our thread), it is not going to increase it more so is not going to come
      here.
    */
    DBUG_PRINT("info", ("waiting for bitmap flusher"));
    mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
  }
  bitmap->waiting_for_flush_all_requested--;
  bitmap->non_flushable++;
  DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
  mysql_mutex_unlock(&bitmap->bitmap_lock);
  info->non_flushable_state= 1;
  DBUG_VOID_RETURN;
}
2682 
2683 
2684 /*
2685   Correct bitmap pages to reflect the true allocation
2686 
2687   SYNOPSIS
2688     _ma_bitmap_release_unused()
2689     info                Maria handle
2690     blocks              Bitmap blocks
2691 
2692   IMPLEMENTATION
2693     If block->used & BLOCKUSED_TAIL is set:
2694        If block->used & BLOCKUSED_USED is set, then the bits for the
2695        corresponding page is set according to block->empty_space
2696        If block->used & BLOCKUSED_USED is not set, then the bits for
2697        the corresponding page is set to org_bitmap_value;
2698 
2699     If block->used & BLOCKUSED_TAIL is not set:
2700        if block->used is not set, the bits for the corresponding page are
2701        cleared
2702 
2703   For the first block (head block) the logic is same as for a tail block
2704 
2705   Note that we may have 'filler blocks' that are used to split a block
2706   in half; These can be recognized by that they have page_count == 0.
2707 
2708   This code also reverse the effect of ma_bitmap_flushable(.., 1);
2709 
2710   RETURN
2711     0  ok
2712     1  error (Couldn't write or read bitmap page)
2713 */
2714 
_ma_bitmap_release_unused(MARIA_HA * info,MARIA_BITMAP_BLOCKS * blocks)2715 my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
2716 {
2717   MARIA_BITMAP_BLOCK *block= blocks->block, *end= block + blocks->count;
2718   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
2719   uint bits, current_bitmap_value;
2720   DBUG_ENTER("_ma_bitmap_release_unused");
2721 
2722   /*
2723     We can skip FULL_HEAD_PAGE (4) as the page was marked as 'full'
2724     when we allocated space in the page
2725   */
2726   current_bitmap_value= FULL_HEAD_PAGE;
2727 
2728   mysql_mutex_lock(&bitmap->bitmap_lock);
2729 
2730   /* First handle head block */
2731   if (block->used & BLOCKUSED_USED)
2732   {
2733     DBUG_PRINT("info", ("head page: %lu  empty_space: %u",
2734                         (ulong) block->page, block->empty_space));
2735     bits= _ma_free_size_to_head_pattern(bitmap, block->empty_space);
2736     if (block->used & BLOCKUSED_USE_ORG_BITMAP)
2737       current_bitmap_value= block->org_bitmap_value;
2738   }
2739   else
2740     bits= block->org_bitmap_value;
2741   if (bits != current_bitmap_value)
2742   {
2743     if (set_page_bits(info, bitmap, block->page, bits))
2744       goto err;
2745   }
2746   else
2747   {
2748     DBUG_ASSERT(current_bitmap_value ==
2749                 bitmap_get_page_bits(info, bitmap, block->page));
2750   }
2751 
2752   /* Handle all full pages and tail pages (for head page and blob) */
2753   for (block++; block < end; block++)
2754   {
2755     uint page_count;
2756     if (!block->page_count)
2757       continue;                               /* Skip 'filler blocks' */
2758 
2759     page_count= block->page_count;
2760     if (block->used & BLOCKUSED_TAIL)
2761     {
2762       current_bitmap_value= FULL_TAIL_PAGE;
2763       /* The bitmap page is only one page */
2764       page_count= 1;
2765       if (block->used & BLOCKUSED_USED)
2766       {
2767         DBUG_PRINT("info", ("tail page: %lu  empty_space: %u",
2768                             (ulong) block->page, block->empty_space));
2769         bits= free_size_to_tail_pattern(bitmap, block->empty_space);
2770         if (block->used & BLOCKUSED_USE_ORG_BITMAP)
2771           current_bitmap_value= block->org_bitmap_value;
2772       }
2773       else
2774         bits= block->org_bitmap_value;
2775 
2776       /*
2777         The page has all bits set; The following test is an optimization
2778         to not set the bits to the same value as before.
2779       */
2780       DBUG_ASSERT(current_bitmap_value ==
2781                   bitmap_get_page_bits(info, bitmap, block->page));
2782 
2783       if (bits != current_bitmap_value)
2784       {
2785         if (set_page_bits(info, bitmap, block->page, bits))
2786           goto err;
2787       }
2788     }
2789     else if (!(block->used & BLOCKUSED_USED) &&
2790              _ma_bitmap_reset_full_page_bits(info, bitmap,
2791                                              block->page, page_count))
2792       goto err;
2793   }
2794 
2795   /* This duplicates ma_bitmap_flushable(-1) except it already has mutex */
2796   if (info->non_flushable_state)
2797   {
2798     DBUG_ASSERT(((int) (bitmap->non_flushable)) > 0);
2799     info->non_flushable_state= 0;
2800     if (--bitmap->non_flushable == 0)
2801     {
2802       _ma_bitmap_unpin_all(info->s);
2803       if (unlikely(bitmap->waiting_for_non_flushable))
2804       {
2805         DBUG_PRINT("info", ("bitmap flushable waking up flusher"));
2806         mysql_cond_broadcast(&bitmap->bitmap_cond);
2807       }
2808     }
2809   }
2810   DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
2811 
2812   mysql_mutex_unlock(&bitmap->bitmap_lock);
2813   DBUG_RETURN(0);
2814 
2815 err:
2816   mysql_mutex_unlock(&bitmap->bitmap_lock);
2817   DBUG_RETURN(1);
2818 }
2819 
2820 
2821 /*
2822   Free full pages from bitmap and pagecache
2823 
2824   SYNOPSIS
2825     _ma_bitmap_free_full_pages()
2826     info                Maria handle
2827     extents             Extents (as stored on disk)
2828     count               Number of extents
2829 
2830   IMPLEMENTATION
2831     Mark all full pages (not tails) from extents as free, both in bitmap
2832     and page cache.
2833 
2834   RETURN
2835     0  ok
2836     1  error (Couldn't write or read bitmap page)
2837 */
2838 
_ma_bitmap_free_full_pages(MARIA_HA * info,const uchar * extents,uint count)2839 my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
2840                                    uint count)
2841 {
2842   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
2843   my_bool res;
2844   DBUG_ENTER("_ma_bitmap_free_full_pages");
2845 
2846   for (; count--; extents+= ROW_EXTENT_SIZE)
2847   {
2848     pgcache_page_no_t page=  uint5korr(extents);
2849     uint page_count= (uint2korr(extents + ROW_EXTENT_PAGE_SIZE) &
2850                       ~START_EXTENT_BIT);
2851     if (!(page_count & TAIL_BIT))
2852     {
2853       if (page == 0 && page_count == 0)
2854         continue;                               /* Not used extent */
2855       if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page,
2856                                  page_count, PAGECACHE_LOCK_WRITE, 1))
2857         DBUG_RETURN(1);
2858       mysql_mutex_lock(&bitmap->bitmap_lock);
2859       res= _ma_bitmap_reset_full_page_bits(info, bitmap, page, page_count);
2860       mysql_mutex_unlock(&bitmap->bitmap_lock);
2861       if (res)
2862         DBUG_RETURN(1);
2863     }
2864   }
2865   DBUG_RETURN(0);
2866 }
2867 
2868 
2869 /*
2870   Mark in the bitmap how much free space there is on a page
2871 
2872   SYNOPSIS
2873    _ma_bitmap_set()
2874    info		Maria handler
2875    page		Adress to page
2876    head		1 if page is a head page, 0 if tail page
2877    empty_space	How much empty space there is on page
2878 
2879   RETURN
2880     0  ok
2881     1  error
2882 */
2883 
_ma_bitmap_set(MARIA_HA * info,pgcache_page_no_t page,my_bool head,uint empty_space)2884 my_bool _ma_bitmap_set(MARIA_HA *info, pgcache_page_no_t page, my_bool head,
2885                        uint empty_space)
2886 {
2887   MARIA_FILE_BITMAP *bitmap= &info->s->bitmap;
2888   uint bits;
2889   my_bool res;
2890   DBUG_ENTER("_ma_bitmap_set");
2891   DBUG_PRINT("enter", ("page: %lu  head: %d  empty_space: %u",
2892                        (ulong) page, head, empty_space));
2893 
2894   mysql_mutex_lock(&info->s->bitmap.bitmap_lock);
2895   bits= (head ?
2896          _ma_free_size_to_head_pattern(bitmap, empty_space) :
2897          free_size_to_tail_pattern(bitmap, empty_space));
2898   res= set_page_bits(info, bitmap, page, bits);
2899   mysql_mutex_unlock(&info->s->bitmap.bitmap_lock);
2900   DBUG_RETURN(res);
2901 }
2902 
2903 
2904 /*
2905   Check that bitmap pattern is correct for a page
2906 
2907   NOTES
2908     Used in maria_chk
2909 
2910   SYNOPSIS
2911     _ma_check_bitmap_data()
2912     info	    Maria handler
2913     page_type	    What kind of page this is
2914     page	    Adress to page
2915     empty_space     Empty space on page
2916     bitmap_pattern  Bitmap pattern for page (from bitmap)
2917 
2918   RETURN
2919     0  ok
2920     1  error
2921 */
2922 
_ma_check_bitmap_data(MARIA_HA * info,enum en_page_type page_type,uint empty_space,uint bitmap_pattern)2923 my_bool _ma_check_bitmap_data(MARIA_HA *info, enum en_page_type page_type,
2924                               uint empty_space, uint bitmap_pattern)
2925 {
2926   uint bits;
2927   switch (page_type) {
2928   case UNALLOCATED_PAGE:
2929   case MAX_PAGE_TYPE:
2930     bits= 0;
2931     break;
2932   case HEAD_PAGE:
2933     bits= _ma_free_size_to_head_pattern(&info->s->bitmap, empty_space);
2934     break;
2935   case TAIL_PAGE:
2936     bits= free_size_to_tail_pattern(&info->s->bitmap, empty_space);
2937     break;
2938   case BLOB_PAGE:
2939     bits= FULL_TAIL_PAGE;
2940     break;
2941   default:
2942     bits= 0; /* to satisfy compiler */
2943     DBUG_ASSERT(0);
2944   }
2945   return (bitmap_pattern != bits);
2946 }
2947 
2948 /**
2949    Check that bitmap looks correct
2950 
2951    - All data before full_head_size and full_tail_size are allocated
2952    - There is no allocated data after used_size
2953      All of the above need to be correct only according to 6 byte
2954      alignment as all loops reads 6 bytes at a time and we check both
2955      start and end position according to the current 6 byte position.
2956 */
2957 
2958 #ifndef DBUG_OFF
_ma_check_bitmap(MARIA_FILE_BITMAP * bitmap)2959 static void _ma_check_bitmap(MARIA_FILE_BITMAP *bitmap)
2960 {
2961   uchar *data= bitmap->map;
2962   uchar *end=  bitmap->map + bitmap->total_size;
2963   uchar *full_head_end=0, *full_tail_end=0, *first_empty= bitmap->map;
2964 
2965   for (; data < end; data+= 6)
2966   {
2967     ulonglong bits= uint6korr(data);    /* 6 bytes = 6*8/3= 16 patterns */
2968     uint i;
2969 
2970     if (bits == 04444444444444444LL || bits == 0xffffffffffffLL)
2971     {
2972       first_empty= data + 6;
2973       continue;                                 /* block fully used */
2974     }
2975     if (bits == 0)
2976     {
2977       if (!full_head_end)
2978         full_head_end= data;
2979       if (!full_tail_end)
2980         full_tail_end= data;
2981       continue;
2982     }
2983 
2984     first_empty= data + 6;
2985     if (!full_head_end || !full_tail_end)
2986     {
2987       for (i= 0, bits >>= 0; i < 16 ; i++, bits >>= 3)
2988       {
2989         uint pattern= (uint) (bits & 7);
2990         if (pattern == FULL_HEAD_PAGE || pattern == FULL_TAIL_PAGE)
2991           continue;
2992 
2993         if (pattern < 4 && !full_head_end)
2994           full_head_end= data;
2995         if ((pattern == 0 || (pattern > 4 && pattern < 7)) && !full_tail_end)
2996           full_tail_end= data;
2997       }
2998     }
2999   }
3000   if (!full_head_end)
3001     full_head_end= data;
3002   if (!full_tail_end)
3003     full_tail_end= data;
3004 
3005   /* used_size must point after the last byte that had some data) */
3006   DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
3007   DBUG_ASSERT((bitmap->map + (bitmap->used_size+5)/6*6) >= first_empty);
3008   /* full_xxxx_size can't point after the first block that has free data */
3009   DBUG_ASSERT((bitmap->map + (bitmap->full_head_size/6*6)) <= full_head_end);
3010   DBUG_ASSERT((bitmap->map + (bitmap->full_tail_size/6*6)) <= full_tail_end);
3011 }
3012 #endif
3013 
3014 
3015 /*
3016   Check if the page type matches the one that we have in the bitmap
3017 
3018   SYNOPSIS
3019     _ma_check_if_right_bitmap_type()
3020     info	    Maria handler
3021     page_type	    What kind of page this is
3022     page	    Adress to page
3023     bitmap_pattern  Store here the pattern that was in the bitmap for the
3024 		    page. This is always updated.
3025 
3026   NOTES
3027     Used in maria_chk
3028 
3029   RETURN
3030     0  ok
3031     1  error
3032 */
3033 
_ma_check_if_right_bitmap_type(MARIA_HA * info,enum en_page_type page_type,pgcache_page_no_t page,uint * bitmap_pattern)3034 my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
3035                                        enum en_page_type page_type,
3036                                        pgcache_page_no_t page,
3037                                        uint *bitmap_pattern)
3038 {
3039   if ((*bitmap_pattern= _ma_bitmap_get_page_bits(info, &info->s->bitmap,
3040                                                  page)) > 7)
3041     return 1;                                   /* Couldn't read page */
3042   switch (page_type) {
3043   case HEAD_PAGE:
3044     return *bitmap_pattern < 1 || *bitmap_pattern > 4;
3045   case TAIL_PAGE:
3046     return *bitmap_pattern < 5;
3047   case BLOB_PAGE:
3048     return *bitmap_pattern != 7;
3049   default:
3050     break;
3051   }
3052   DBUG_ASSERT(0);
3053   return 1;
3054 }
3055 
3056 
3057 /**
3058    @brief create the first bitmap page of a freshly created data file
3059 
3060    @param  share           table's share
3061 
3062    @return Operation status
3063      @retval 0      OK
3064      @retval !=0    Error
3065 */
3066 
_ma_bitmap_create_first(MARIA_SHARE * share)3067 int _ma_bitmap_create_first(MARIA_SHARE *share)
3068 {
3069   uint block_size= share->bitmap.block_size;
3070   File file= share->bitmap.file.file;
3071   uchar marker[CRC_SIZE];
3072 
3073   /*
3074     Next write operation of the page will write correct CRC
3075     if it is needed
3076   */
3077   int4store(marker, MARIA_NO_CRC_BITMAP_PAGE);
3078 
3079   if (mysql_file_chsize(file, block_size - sizeof(marker),
3080                         0, MYF(MY_WME)) ||
3081       my_pwrite(file, marker, sizeof(marker),
3082                 block_size - sizeof(marker),
3083                 MYF(MY_NABP | MY_WME)))
3084     return 1;
3085   share->state.state.data_file_length= block_size;
3086   _ma_bitmap_delete_all(share);
3087   return 0;
3088 }
3089 
3090 
3091 /**
3092   @brief Pagecache callback to get the TRANSLOG_ADDRESS to flush up to, when a
3093   bitmap page needs to be flushed.
3094 
3095   @param page            Page's content
3096   @param page_no         Page's number (<offset>/<page length>)
3097   @param data_ptr        Callback data pointer (pointer to MARIA_SHARE)
3098 
3099   @retval TRANSLOG_ADDRESS to flush up to.
3100 */
3101 
3102 static my_bool
flush_log_for_bitmap(PAGECACHE_IO_HOOK_ARGS * args)3103 flush_log_for_bitmap(PAGECACHE_IO_HOOK_ARGS *args __attribute__ ((unused)))
3104 {
3105 #ifdef DBUG_ASSERT_EXISTS
3106   const MARIA_SHARE *share= (MARIA_SHARE*)args->data;
3107 #endif
3108   DBUG_ENTER("flush_log_for_bitmap");
3109   DBUG_ASSERT(share->now_transactional);
3110   /*
3111     WAL imposes that UNDOs reach disk before bitmap is flushed. We don't know
3112     the LSN of the last UNDO about this bitmap page, so we flush whole log.
3113   */
3114   DBUG_RETURN(translog_flush(translog_get_horizon()));
3115 }
3116 
3117 
3118 /**
3119    @brief Set callbacks for bitmap pages
3120 
3121    @note
3122    We don't use pagecache_file_init here, as we want to keep the
3123    code readable
3124 */
3125 
_ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE * file,MARIA_SHARE * share)3126 void _ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE *file,
3127                                         MARIA_SHARE *share)
3128 {
3129   pagecache_file_set_null_hooks(file);
3130   file->callback_data= (uchar*) share;
3131   file->flush_log_callback= maria_flush_log_for_page_none;
3132   file->post_write_hook= maria_page_write_failure;
3133 
3134   if (share->temporary)
3135   {
3136     file->post_read_hook=  &maria_page_crc_check_none;
3137     file->pre_write_hook= &maria_page_filler_set_none;
3138   }
3139   else
3140   {
3141     file->post_read_hook=  &maria_page_crc_check_bitmap;
3142     if (share->options & HA_OPTION_PAGE_CHECKSUM)
3143       file->pre_write_hook= &maria_page_crc_set_normal;
3144     else
3145       file->pre_write_hook= &maria_page_filler_set_bitmap;
3146     if (share->now_transactional)
3147       file->flush_log_callback= flush_log_for_bitmap;
3148   }
3149 }
3150 
3151 
3152 /**
3153   Extends data file with zeroes and creates new bitmap pages into page cache.
3154 
3155   Writes all bitmap pages in [from, to].
3156 
3157   Non-bitmap pages of zeroes are correct as they are marked empty in
3158   bitmaps. Bitmap pages will not be zeroes: they will get their CRC fixed when
3159   flushed. And if there is a crash before flush (so they are zeroes at
3160   restart), a REDO will re-create them in page cache.
3161 */
3162 
3163 static my_bool
_ma_bitmap_create_missing_into_pagecache(MARIA_SHARE * share,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t from,pgcache_page_no_t to,uchar * zeroes)3164 _ma_bitmap_create_missing_into_pagecache(MARIA_SHARE *share,
3165                                          MARIA_FILE_BITMAP *bitmap,
3166                                          pgcache_page_no_t from,
3167                                          pgcache_page_no_t to,
3168                                          uchar *zeroes)
3169 {
3170   pgcache_page_no_t i;
3171   /*
3172     We do not use my_chsize() because there can be a race between when it
3173     reads the physical size and when it writes (assume data_file_length is 10,
3174     physical length is 8 and two data pages are in cache, and here we do a
3175     my_chsize: my_chsize sees physical length is 8, then the two data pages go
3176     to disk then my_chsize writes from page 8 and so overwrites the two data
3177     pages, wrongly).
3178     We instead rely on the filesystem filling gaps with zeroes.
3179   */
3180   for (i= from; i <= to; i+= bitmap->pages_covered)
3181   {
3182     /**
3183       No need to keep them pinned, they are new so flushable.
3184       @todo but we may want to keep them pinned, as an optimization: if they
3185       are not pinned they may go to disk before the data pages go (so, the
3186       physical pages would be in non-ascending "sparse" order on disk), or the
3187       filesystem may fill gaps with zeroes physically which is a waste of
3188       time.
3189     */
3190     if (pagecache_write(share->pagecache,
3191                         &bitmap->file, i, 0,
3192                         zeroes, PAGECACHE_PLAIN_PAGE,
3193                         PAGECACHE_LOCK_LEFT_UNLOCKED,
3194                         PAGECACHE_PIN_LEFT_UNPINNED,
3195                         PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE))
3196       goto err;
3197   }
3198   /*
3199     Data pages after data_file_length are full of zeroes but that is allowed
3200     as they are marked empty in the bitmap.
3201   */
3202   return FALSE;
3203 err:
3204   return TRUE;
3205 }
3206 
3207 
3208 /**
3209  Creates missing bitmaps when we extend the data file.
3210 
3211  At run-time, when we need a new bitmap page we come here; and only one bitmap
3212  page at a time is created.
3213 
3214  In some recovery cases we insert at a large offset in the data file, way
3215  beyond state.data_file_length, so can need to create more than one bitmap
3216  page in one go. Known case is:
3217  Start a transaction in Maria;
3218  delete last row of very large table (with delete_row)
3219  do a bulk insert
3220  crash
3221  Then UNDO_BULK_INSERT will truncate table files, and
3222  UNDO_ROW_DELETE will want to put the row back to its original position,
3223  extending the data file a lot: bitmap page*s* in the hole must be created,
3224  or he table would look corrupted.
3225 
3226  We need to log REDOs for bitmap creation, consider: we apply a REDO for a
3227  data page, which creates the first data page covered by a new bitmap
3228  not yet created. If the data page is flushed but the bitmap page is not and
3229  there is a crash, re-execution of the REDO will complain about the zeroed
3230  bitmap page (see it as corruption). Thus a REDO is needed to re-create the
3231  bitmap.
3232 
3233  @param  info              Maria handler
3234  @param  bitmap            Bitmap handler
3235  @param  page              Last bitmap page to create
3236 
3237  @note When this function is called this must be true:
3238  ((page + 1) * bitmap->block_size > info->s->state.state.data_file_length)
3239 
3240 */
3241 
_ma_bitmap_create_missing(MARIA_HA * info,MARIA_FILE_BITMAP * bitmap,pgcache_page_no_t page)3242 static my_bool _ma_bitmap_create_missing(MARIA_HA *info,
3243                                          MARIA_FILE_BITMAP *bitmap,
3244                                          pgcache_page_no_t page)
3245 {
3246   MARIA_SHARE *share= info->s;
3247   uint block_size= bitmap->block_size;
3248   pgcache_page_no_t from, to;
3249   my_off_t data_file_length= share->state.state.data_file_length;
3250   DBUG_ENTER("_ma_bitmap_create_missing");
3251   DBUG_PRINT("enter", ("page: %lld", (longlong) page));
3252 
3253   /* First (in offset order) bitmap page to create */
3254   if (data_file_length < block_size)
3255     goto err; /* corrupted, should have first bitmap page */
3256   if (page * block_size >= share->base.max_data_file_length)
3257   {
3258     my_errno= HA_ERR_RECORD_FILE_FULL;
3259     goto err;
3260   }
3261 
3262   from= (data_file_length / block_size - 1) / bitmap->pages_covered + 1;
3263   from*= bitmap->pages_covered;
3264   /*
3265     page>=from because:
3266     (page + 1) * bs > dfl, and page == k * pc so:
3267     (k * pc + 1) * bs > dfl; k * pc + 1 > dfl / bs; k * pc > dfl / bs - 1
3268     k > (dfl / bs - 1) / pc; k >= (dfl / bs - 1) / pc + 1
3269     k * pc >= ((dfl / bs - 1) / pc + 1) * pc == from.
3270   */
3271   DBUG_ASSERT(page >= from);
3272 
3273   if (share->now_transactional)
3274   {
3275     LSN lsn;
3276     uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2];
3277     LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
3278     page_store(log_data + FILEID_STORE_SIZE, from);
3279     page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
3280     log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
3281     log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
3282     /*
3283       We don't use info->trn so that this REDO is always executed even though
3284       the UNDO does not reach disk due to crash. This is also consistent with
3285       the fact that the new bitmap pages are not pinned.
3286     */
3287     if (translog_write_record(&lsn, LOGREC_REDO_BITMAP_NEW_PAGE,
3288                               &dummy_transaction_object, info,
3289                               (translog_size_t)sizeof(log_data),
3290                               TRANSLOG_INTERNAL_PARTS + 1, log_array,
3291                               log_data, NULL))
3292       goto err;
3293     /*
3294       No need to flush the log: the bitmap pages we are going to create will
3295       flush it when they go to disk.
3296     */
3297   }
3298 
3299   /*
3300     Last bitmap page. It has special creation: will go to the page cache
3301     only later as we are going to modify it very soon.
3302   */
3303   bzero(bitmap->map, bitmap->block_size);
3304   bitmap->used_size= bitmap->full_head_size= bitmap->full_tail_size= 0;
3305   bitmap->changed=1;
3306 #ifndef DBUG_OFF
3307   /*
3308     Make a copy of the page to be able to print out bitmap changes during
3309     debugging
3310   */
3311   memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
3312 #endif
3313 
3314   /* Last bitmap page to create before 'page' */
3315   DBUG_ASSERT(page >= bitmap->pages_covered);
3316   to= page - bitmap->pages_covered;
3317   /*
3318     In run-time situations, from>=to is always false, i.e. we always create
3319     one bitmap at a time ('page').
3320   */
3321   if ((from <= to) &&
3322       _ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to,
3323                                                bitmap->map))
3324     goto err;
3325 
3326   share->state.state.data_file_length= (page + 1) * bitmap->block_size;
3327 
3328  DBUG_RETURN(FALSE);
3329 err:
3330  DBUG_RETURN(TRUE);
3331 }
3332 
3333 
_ma_apply_redo_bitmap_new_page(MARIA_HA * info,LSN lsn,const uchar * header)3334 my_bool _ma_apply_redo_bitmap_new_page(MARIA_HA *info,
3335                                        LSN lsn __attribute__ ((unused)),
3336                                        const uchar *header)
3337 {
3338   MARIA_SHARE *share= info->s;
3339   MARIA_FILE_BITMAP *bitmap= &share->bitmap;
3340   my_bool error;
3341   pgcache_page_no_t from, to, min_from;
3342   DBUG_ENTER("_ma_apply_redo_bitmap_new_page");
3343 
3344   from= page_korr(header);
3345   to=   page_korr(header + PAGE_STORE_SIZE);
3346   DBUG_PRINT("info", ("from: %lu to: %lu", (ulong)from, (ulong)to));
3347   if ((from > to) ||
3348       (from % bitmap->pages_covered) != 0 ||
3349       (to % bitmap->pages_covered) != 0)
3350   {
3351     error= TRUE; /* corrupted log record */
3352     goto err;
3353   }
3354 
3355   min_from= (share->state.state.data_file_length / bitmap->block_size - 1) /
3356     bitmap->pages_covered + 1;
3357   min_from*= bitmap->pages_covered;
3358   if (from < min_from)
3359   {
3360     DBUG_PRINT("info", ("overwrite bitmap pages from %lu", (ulong)min_from));
3361     /*
3362       We have to overwrite. It could be that there was a bitmap page in
3363       memory, covering a data page which went to disk, then crash: the
3364       bitmap page is now full of zeros and is ==min_from, we have to overwrite
3365       it with correct checksum.
3366     */
3367   }
3368   share->state.changed|= STATE_CHANGED;
3369   bzero(info->buff, bitmap->block_size);
3370   if (!(error=
3371         _ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to,
3372                                                  info->buff)))
3373     share->state.state.data_file_length= (to + 1) * bitmap->block_size;
3374 
3375 err:
3376   DBUG_RETURN(error);
3377 }
3378