1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2    Copyright (C) 2009-2010 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #include "ma_fulltext.h"
18 #include "ma_rt_index.h"
19 #include "trnman.h"
20 #include "ma_key_recover.h"
21 
/*
  Forward declarations of the file-local helpers used when removing a key
  from an index tree.
*/

/* Remove 'key' from the tree below 'page'; returns 0, 1, 2 or -1 */
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
                    MARIA_PAGE *page);
/* Remove a key that has a page reference (key found on a node page) */
static int del(MARIA_HA *info, MARIA_KEY *key,
               MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
/* Balance leaf_page against a neighbour page after it has underflowed */
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
		     uchar *keypos);
/*
  Callers subtract the returned value from the page length and treat a
  return of 0 as an error.
*/
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
                       uchar *keypos, uchar *lastkey, uchar *page_end,
		       my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33 
34 /* @breif Remove a row from a MARIA table */
35 
maria_delete(MARIA_HA * info,const uchar * record)36 int maria_delete(MARIA_HA *info,const uchar *record)
37 {
38   uint i;
39   uchar *old_key;
40   int save_errno;
41   char lastpos[8];
42   MARIA_SHARE *share= info->s;
43   MARIA_KEYDEF *keyinfo;
44   DBUG_ENTER("maria_delete");
45 
46   /* Test if record is in datafile */
47   DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
48                   maria_print_error(share, HA_ERR_CRASHED);
49                   DBUG_RETURN(my_errno= HA_ERR_CRASHED););
50   DBUG_EXECUTE_IF("my_error_test_undefined_error",
51                   maria_print_error(share, INT_MAX);
52                   DBUG_RETURN(my_errno= INT_MAX););
53   if (!(info->update & HA_STATE_AKTIV))
54   {
55     DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND);	/* No database read */
56   }
57   if (share->options & HA_OPTION_READ_ONLY_DATA)
58   {
59     DBUG_RETURN(my_errno=EACCES);
60   }
61   if (_ma_readinfo(info,F_WRLCK,1))
62     DBUG_RETURN(my_errno);
63   if ((*share->compare_record)(info,record))
64     goto err;				/* Error on read-check */
65 
66   if (_ma_mark_file_changed(share))
67     goto err;
68 
69   /* Ensure we don't change the autoincrement value */
70   info->last_auto_increment= ~(ulonglong) 0;
71   /* Remove all keys from the index file */
72 
73   old_key= info->lastkey_buff2;
74 
75   for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76   {
77     if (maria_is_key_active(share->state.key_map, i))
78     {
79       keyinfo->version++;
80       if (keyinfo->flag & HA_FULLTEXT)
81       {
82         if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83           goto err;
84       }
85       else
86       {
87         MARIA_KEY key;
88         if (keyinfo->ck_delete(info,
89                                (*keyinfo->make_key)(info, &key, i, old_key,
90                                                     record,
91                                                     info->cur_row.lastpos,
92                                                     info->cur_row.trid)))
93           goto err;
94       }
95       /* The above changed info->lastkey2. Inform maria_rnext_same(). */
96       info->update&= ~HA_STATE_RNEXT_SAME;
97     }
98   }
99 
100   if (share->calc_checksum)
101   {
102     /*
103       We can't use the row based checksum as this doesn't have enough
104       precision.
105     */
106     info->cur_row.checksum= (*share->calc_checksum)(info, record);
107   }
108 
109   if ((*share->delete_record)(info, record))
110     goto err;				/* Remove record from database */
111 
112   info->state->checksum-= info->cur_row.checksum;
113   info->state->records--;
114   info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
115   info->row_changes++;
116   share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
117                           STATE_NOT_ZEROFILLED);
118   info->state->changed=1;
119 
120   mi_sizestore(lastpos, info->cur_row.lastpos);
121   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
122   if (info->invalidator != 0)
123   {
124     DBUG_PRINT("info", ("invalidator... '%s' (delete)",
125                         share->open_file_name.str));
126     (*info->invalidator)(share->open_file_name.str);
127     info->invalidator=0;
128   }
129   DBUG_RETURN(0);
130 
131 err:
132   save_errno= my_errno;
133   DBUG_ASSERT(save_errno);
134   if (!save_errno)
135     save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */
136 
137   mi_sizestore(lastpos, info->cur_row.lastpos);
138   (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
139   info->update|=HA_STATE_WRITTEN;	/* Buffer changed */
140   if (save_errno != HA_ERR_RECORD_CHANGED)
141   {
142     _ma_set_fatal_error(share, HA_ERR_CRASHED);
143     save_errno= HA_ERR_CRASHED;
144   }
145   DBUG_RETURN(my_errno= save_errno);
146 } /* maria_delete */
147 
148 
149 /*
150   Remove a key from the btree index
151 
152   TODO:
153    Change ma_ck_real_delete() to use another buffer for changed keys instead
154    of key->data. This would allows us to remove the copying of the key here.
155 */
156 
_ma_ck_delete(MARIA_HA * info,MARIA_KEY * key)157 my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
158 {
159   MARIA_SHARE *share= info->s;
160   int res;
161   my_bool buff_alloced;
162   LSN lsn= LSN_IMPOSSIBLE;
163   my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
164   uchar *key_buff, *save_key_data;
165   MARIA_KEY org_key;
166   DBUG_ENTER("_ma_ck_delete");
167 
168   LINT_INIT_STRUCT(org_key);
169 
170   alloc_on_stack(*info->stack_end_ptr, key_buff, buff_alloced,
171                  key->keyinfo->max_store_length);
172   if (!key_buff)
173     DBUG_RETURN(1);
174 
175   save_key_data= key->data;
176   if (share->now_transactional)
177   {
178     /* Save original value as the key may change */
179     memcpy(key_buff, key->data, key->data_length + key->ref_length);
180     org_key= *key;
181     key->data= key_buff;
182   }
183 
184   if ((res= _ma_ck_real_delete(info, key, &new_root)))
185   {
186     /* We have to mark the table crashed before unpin_all_pages() */
187     maria_mark_crashed(info);
188   }
189 
190   key->data= save_key_data;
191   if (!res && share->now_transactional)
192     res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
193   else
194   {
195     share->state.key_root[key->keyinfo->key_nr]= new_root;
196     _ma_fast_unlock_key_del(info);
197   }
198   _ma_unpin_all_pages_and_finalize_row(info, lsn);
199 
200   stack_alloc_free(key_buff, buff_alloced);
201   DBUG_RETURN(res != 0);
202 } /* _ma_ck_delete */
203 
204 
_ma_ck_real_delete(register MARIA_HA * info,MARIA_KEY * key,my_off_t * root)205 my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
206                            my_off_t *root)
207 {
208   int error;
209   my_bool result= 0, buff_alloced;
210   my_off_t old_root;
211   uchar *root_buff;
212   MARIA_KEYDEF *keyinfo= key->keyinfo;
213   MARIA_PAGE page;
214   DBUG_ENTER("_ma_ck_real_delete");
215 
216   if ((old_root=*root) == HA_OFFSET_ERROR)
217   {
218     _ma_set_fatal_error(info->s, HA_ERR_CRASHED);
219     DBUG_RETURN(1);
220   }
221 
222   alloc_on_stack(*info->stack_end_ptr, root_buff, buff_alloced,
223                  (keyinfo->block_length + keyinfo->max_store_length*2));
224   if (!root_buff)
225     DBUG_RETURN(1);
226 
227   DBUG_PRINT("info",("root_page: %lu",
228                      (ulong) (old_root / keyinfo->block_length)));
229   if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
230                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
231   {
232     result= 1;
233     goto err;
234   }
235   if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
236                                    SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
237                                    SEARCH_SAME),
238                        &page)))
239   {
240     if (error < 0)
241       result= 1;
242     else if (error == 2)
243     {
244       DBUG_PRINT("test",("Enlarging of root when deleting"));
245       if (_ma_enlarge_root(info, key, root))
246         result= 1;
247     }
248     else /* error == 1 */
249     {
250       MARIA_SHARE *share= info->s;
251 
252       page_mark_changed(info, &page);
253 
254       if (page.size <= page.node + share->keypage_header + 1)
255       {
256         DBUG_ASSERT(page.size == page.node + share->keypage_header);
257 	if (page.node)
258 	  *root= _ma_kpos(page.node, root_buff +share->keypage_header +
259                           page.node);
260 	else
261 	  *root=HA_OFFSET_ERROR;
262 	if (_ma_dispose(info, old_root, 0))
263 	  result= 1;
264       }
265       else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
266                                  DFLT_INIT_HITS))
267         result= 1;
268     }
269   }
270 err:
271   stack_alloc_free(root_buff, buff_alloced);
272   DBUG_PRINT("exit",("Return: %d",result));
273   DBUG_RETURN(result);
274 } /* _ma_ck_real_delete */
275 
276 
277 /**
278    @brief Remove key below key root
279 
280    @param key  Key to delete.  Will contain new key if block was enlarged
281 
282    @return
283    @retval 0   ok (anc_page is not changed)
284    @retval 1   If data on page is too small; In this case anc_buff is not saved
285    @retval 2   If data on page is too big
286    @retval -1  On errors
287 */
288 
d_search(MARIA_HA * info,MARIA_KEY * key,uint32 comp_flag,MARIA_PAGE * anc_page)289 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
290                     MARIA_PAGE *anc_page)
291 {
292   int flag,ret_value,save_flag;
293   uint nod_flag, page_flag;
294   my_bool last_key, buff_alloced= 0, lastkey_alloced;
295   uchar *leaf_buff=0, *keypos, *lastkey;
296   MARIA_KEY_PARAM s_temp;
297   MARIA_SHARE *share= info->s;
298   MARIA_KEYDEF *keyinfo= key->keyinfo;
299   MARIA_PAGE leaf_page;
300   DBUG_ENTER("d_search");
301   DBUG_DUMP("page", anc_page->buff, anc_page->size);
302 
303   alloc_on_stack(*info->stack_end_ptr, lastkey, lastkey_alloced,
304                  keyinfo->max_store_length);
305   if (!lastkey)
306     DBUG_RETURN(1);
307 
308   flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
309                               &last_key);
310   if (flag == MARIA_FOUND_WRONG_KEY)
311   {
312     DBUG_PRINT("error",("Found wrong key"));
313     goto err;
314   }
315   page_flag= anc_page->flag;
316   nod_flag=  anc_page->node;
317 
318   if (!flag && (keyinfo->flag & HA_FULLTEXT))
319   {
320     uint off;
321     int  subkeys;
322 
323     get_key_full_length_rdonly(off, lastkey);
324     subkeys=ft_sintXkorr(lastkey+off);
325     DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
326     comp_flag=SEARCH_SAME;
327     if (subkeys >= 0)
328     {
329       /* normal word, one-level tree structure */
330       if (info->ft1_to_ft2)
331       {
332         /* we're in ft1->ft2 conversion mode. Saving key data */
333         insert_dynamic(info->ft1_to_ft2, (lastkey+off));
334       }
335       else
336       {
337         /* we need exact match only if not in ft1->ft2 conversion mode */
338         flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
339                                     lastkey, &last_key);
340       }
341       /* fall through to normal delete */
342     }
343     else
344     {
345       /* popular word. two-level tree. going down */
346       uint tmp_key_length;
347       my_off_t root;
348       uchar *kpos=keypos;
349       MARIA_KEY tmp_key;
350 
351       tmp_key.data=    lastkey;
352       tmp_key.keyinfo= keyinfo;
353 
354       if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
355                                                &kpos)))
356       {
357         _ma_set_fatal_error(share, HA_ERR_CRASHED);
358         goto err;
359       }
360       root= _ma_row_pos_from_key(&tmp_key);
361       if (subkeys == -1)
362       {
363         /* the last entry in sub-tree */
364         if (_ma_dispose(info, root, 1))
365           goto err;
366         /* fall through to normal delete */
367       }
368       else
369       {
370         MARIA_KEY word_key;
371         keyinfo=&share->ft2_keyinfo;
372         /* we'll modify key entry 'in vivo' */
373         kpos-=keyinfo->keylength+nod_flag;
374         get_key_full_length_rdonly(off, key->data);
375 
376         word_key.data=        key->data + off;
377         word_key.keyinfo=     &share->ft2_keyinfo;
378         word_key.data_length= HA_FT_WLEN;
379         word_key.ref_length= 0;
380         word_key.flag= 0;
381         ret_value= _ma_ck_real_delete(info, &word_key, &root);
382         _ma_dpointer(share, kpos+HA_FT_WLEN, root);
383         subkeys++;
384         ft_intXstore(kpos, subkeys);
385         if (!ret_value)
386         {
387           page_mark_changed(info, anc_page);
388           ret_value= _ma_write_keypage(anc_page,
389                                        PAGECACHE_LOCK_LEFT_WRITELOCKED,
390                                        DFLT_INIT_HITS);
391         }
392         goto end;
393       }
394     }
395   }
396   if (nod_flag)
397   {
398     /* Read left child page */
399     leaf_page.pos= _ma_kpos(nod_flag,keypos);
400 
401     alloc_on_stack(*info->stack_end_ptr, leaf_buff, buff_alloced,
402                    (keyinfo->block_length + keyinfo->max_store_length*2));
403     if (!leaf_buff)
404       goto err;
405 
406     if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
407                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
408                           0))
409       goto err;
410   }
411 
412   if (flag != 0)
413   {
414     if (!nod_flag)
415     {
416       /* This should newer happend */
417       DBUG_PRINT("error",("Didn't find key"));
418       _ma_set_fatal_error(share, HA_ERR_CRASHED);
419       goto err;
420     }
421     save_flag=0;
422     ret_value= d_search(info, key, comp_flag, &leaf_page);
423   }
424   else
425   {						/* Found key */
426     uint tmp;
427     uint anc_buff_length= anc_page->size;
428     uint anc_page_flag=   anc_page->flag;
429     my_off_t next_block;
430 
431     if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
432                           anc_page->buff + anc_buff_length,
433                           &next_block, &s_temp)))
434       goto err;
435 
436     page_mark_changed(info, anc_page);
437     anc_buff_length-= tmp;
438     anc_page->size= anc_buff_length;
439     page_store_size(share, anc_page);
440 
441     /*
442       Log initial changes on pages
443       If there is an underflow, there will be more changes logged to the
444       page
445     */
446     if (share->now_transactional &&
447         _ma_log_delete(anc_page, s_temp.key_pos,
448                        s_temp.changed_length, s_temp.move_length,
449                        0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
450       goto err;
451 
452     if (!nod_flag)
453     {						/* On leaf page */
454       if (anc_buff_length <= (info->quick_mode ?
455                               MARIA_MIN_KEYBLOCK_LENGTH :
456                               (uint) keyinfo->underflow_block_length))
457       {
458         /* Page will be written by caller if we return 1 */
459         ret_value= 1;
460         goto end;
461       }
462       if (_ma_write_keypage(anc_page,
463                             PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
464 	goto err;
465 
466       ret_value= 0;                             /* Return ok */
467       goto end;
468     }
469     save_flag=1;                         /* Mark that anc_buff is changed */
470     ret_value= del(info, key, anc_page, &leaf_page,
471                    keypos, next_block, lastkey);
472   }
473   if (ret_value >0)
474   {
475     save_flag= 2;
476     if (ret_value == 1)
477       ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
478     else
479     {
480       /* This can only happen with variable length keys */
481       MARIA_KEY last_key;
482       DBUG_PRINT("test",("Enlarging of key when deleting"));
483 
484       last_key.data=    lastkey;
485       last_key.keyinfo= keyinfo;
486       if (!_ma_get_last_key(&last_key, anc_page, keypos))
487 	goto err;
488       ret_value= _ma_insert(info, key, anc_page, keypos,
489                             last_key.data,
490                             (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
491 
492       if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
493                             DFLT_INIT_HITS))
494         ret_value= -1;
495     }
496   }
497   if (ret_value == 0 && anc_page->size > share->max_index_block_size)
498   {
499     /*
500       parent buffer got too big ; We have to split the page.
501       The | 2 is there to force write of anc page below
502     */
503     save_flag= 3;
504     ret_value= _ma_split_page(info, key, anc_page,
505                               share->max_index_block_size,
506                               (uchar*) 0, 0, 0, lastkey, 0) | 2;
507     DBUG_ASSERT(anc_page->org_size == anc_page->size);
508   }
509   if (save_flag && ret_value != 1)
510   {
511     page_mark_changed(info, anc_page);
512     if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
513                           DFLT_INIT_HITS))
514       ret_value= -1;
515   }
516   else
517   {
518     DBUG_DUMP("page", anc_page->buff, anc_page->size);
519   }
520 
521 end:
522   stack_alloc_free(leaf_buff, buff_alloced);
523   stack_alloc_free(lastkey, lastkey_alloced);
524   DBUG_PRINT("exit",("Return: %d",ret_value));
525   DBUG_RETURN(ret_value);
526 
527 err:
528   stack_alloc_free(leaf_buff, buff_alloced);
529   stack_alloc_free(lastkey, lastkey_alloced);
530   DBUG_PRINT("exit",("Error: %d",my_errno));
531   DBUG_RETURN (-1);
532 } /* d_search */
533 
534 
535 /**
536    @brief Remove a key that has a page-reference
537 
538    @param info		 Maria handler
539    @param key		 Buffer for key to be inserted at upper level
540    @param anc_page	 Page address for page where deleted key was
541    @param anc_buff       Page buffer (nod) where deleted key was
542    @param leaf_page      Page address for nod before the deleted key
543    @param leaf_buff      Buffer for leaf_page
544    @param leaf_buff_link Pinned page link for leaf_buff
545    @param keypos         Pos to where deleted key was on anc_buff
546    @param next_block	 Page adress for nod after deleted key
547    @param ret_key_buff	 Key before keypos in anc_buff
548 
549    @notes
550       leaf_page must be written to disk if retval > 0
551       anc_page  is not updated on disk. Caller should do this
552 
553    @return
554    @retval < 0   Error
555    @retval 0     OK.    leaf_buff is written to disk
556 
557    @retval 1     key contains key to upper level (from balance page)
558                  leaf_buff has underflow
559    @retval 2     key contains key to upper level (from split space)
560 */
561 
del(MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos,my_off_t next_block,uchar * ret_key_buff)562 static int del(MARIA_HA *info, MARIA_KEY *key,
563                MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
564 	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
565 {
566   int ret_value,length;
567   uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
568   uchar *keybuff,*endpos,*next_buff,*key_start, *prev_key;
569   uchar *anc_buff;
570   my_bool buff_alloced= 0, keybuff_alloced;
571   MARIA_KEY_PARAM s_temp;
572   MARIA_KEY tmp_key;
573   MARIA_SHARE *share= info->s;
574   MARIA_KEYDEF *keyinfo= key->keyinfo;
575   MARIA_KEY ret_key;
576   MARIA_PAGE next_page;
577   DBUG_ENTER("del");
578   DBUG_PRINT("enter",("leaf_page: %lu  keypos: %p",
579                       (ulong) (leaf_page->pos / share->block_size),
580 		      keypos));
581   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
582 
583   alloc_on_stack(*info->stack_end_ptr, keybuff, keybuff_alloced,
584                  keyinfo->max_store_length);
585   if (!keybuff)
586     DBUG_RETURN(1);
587 
588   page_flag=   leaf_page->flag;
589   leaf_length= leaf_page->size;
590   nod_flag=    leaf_page->node;
591 
592   endpos= leaf_page->buff + leaf_length;
593   tmp_key.keyinfo= keyinfo;
594   tmp_key.data=    keybuff;
595   next_buff= 0;
596 
597   if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
598     goto err;
599 
600   if (nod_flag)
601   {
602     next_page.pos= _ma_kpos(nod_flag,endpos);
603 
604     alloc_on_stack(*info->stack_end_ptr, next_buff, buff_alloced,
605                    (keyinfo->block_length + keyinfo->max_store_length*2));
606     if (!next_buff)
607       goto err;
608 
609     if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
610                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
611       ret_value= -1;
612     else
613     {
614       DBUG_DUMP("next_page", next_page.buff, next_page.size);
615       if ((ret_value= del(info, key, anc_page, &next_page,
616                           keypos, next_block, ret_key_buff)) >0)
617       {
618         /* Get new length after key was deleted */
619 	endpos= leaf_page->buff+ leaf_page->size;
620 	if (ret_value == 1)
621 	{
622           /* underflow writes "next_page" to disk */
623 	  ret_value= underflow(info, keyinfo, leaf_page, &next_page,
624                                endpos);
625           if (ret_value < 0)
626             goto err;
627 	  if (leaf_page->size > share->max_index_block_size)
628 	  {
629             DBUG_ASSERT(ret_value == 0);
630 	    ret_value= (_ma_split_page(info, key, leaf_page,
631                                        share->max_index_block_size,
632                                        (uchar*) 0, 0, 0,
633                                        ret_key_buff, 0) | 2);
634 	  }
635 	}
636 	else
637 	{
638           if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
639                                 DFLT_INIT_HITS))
640             goto err;
641 	  DBUG_PRINT("test",("Inserting of key when deleting"));
642 	  if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
643 	    goto err;
644 	  ret_value= _ma_insert(info, key, leaf_page, endpos,
645                                 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
646                                 0);
647 	}
648       }
649       page_mark_changed(info, leaf_page);
650       /*
651         If ret_value <> 0, then leaf_page underflowed and caller will have
652         to handle underflow and write leaf_page to disk.
653         We can't write it here, as if leaf_page is empty we get an assert
654         in _ma_write_keypage.
655       */
656       if (ret_value == 0 && _ma_write_keypage(leaf_page,
657                                               PAGECACHE_LOCK_LEFT_WRITELOCKED,
658                                               DFLT_INIT_HITS))
659 	goto err;
660     }
661     stack_alloc_free(next_buff, buff_alloced);
662     stack_alloc_free(keybuff, keybuff_alloced);
663     DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
664     DBUG_RETURN(ret_value);
665   }
666 
667   /*
668     Remove last key from leaf page
669     Note that leaf_page page may only have had one key (can normally only
670     happen in quick mode), in which ase it will now temporary have 0 keys
671     on it. This will be corrected by the caller as we will return 0.
672   */
673   new_leaf_length= (uint) (key_start - leaf_page->buff);
674   leaf_page->size= new_leaf_length;
675   page_store_size(share, leaf_page);
676 
677   if (share->now_transactional &&
678       _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
679     goto err;
680 
681   page_mark_changed(info, leaf_page);           /* Safety */
682   if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
683                           (uint) keyinfo->underflow_block_length))
684   {
685     /* Underflow, leaf_page will be written by caller */
686     ret_value= 1;
687   }
688   else
689   {
690     ret_value= 0;
691     if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
692                           DFLT_INIT_HITS))
693       goto err;
694   }
695 
696   /* Place last key in ancestor page on deleted key position */
697   a_length= anc_page->size;
698   anc_buff= anc_page->buff;
699   endpos=   anc_buff + a_length;
700 
701   ret_key.keyinfo= keyinfo;
702   ret_key.data=    ret_key_buff;
703 
704   prev_key= 0;
705   if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
706   {
707     if (!_ma_get_last_key(&ret_key, anc_page, keypos))
708       goto err;
709     prev_key= ret_key.data;
710   }
711   length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
712                                keypos == endpos ? (uchar*) 0 : keypos,
713                                prev_key, prev_key,
714                                &s_temp);
715   if (length > 0)
716     bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
717   else
718     bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
719   (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
720   key_start= keypos;
721   if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
722                       SEARCH_PAGE_KEY_HAS_TRANSID))
723     _ma_mark_page_with_transid(share, anc_page);
724 
725   /* Save pointer to next leaf on parent page */
726   if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
727                            &keypos))
728     goto err;
729   _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
730   anc_page->size= a_length + length;
731   page_store_size(share, anc_page);
732 
733   if (share->now_transactional &&
734       _ma_log_add(anc_page, a_length,
735                   key_start, s_temp.changed_length, s_temp.move_length, 1,
736                   KEY_OP_DEBUG_LOG_ADD_2))
737     goto err;
738 
739   DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
740   stack_alloc_free(next_buff, buff_alloced);
741   stack_alloc_free(keybuff, keybuff_alloced);
742   DBUG_RETURN(new_leaf_length <=
743               (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
744                (uint) keyinfo->underflow_block_length));
745 
746 err:
747   stack_alloc_free(next_buff, buff_alloced);
748   stack_alloc_free(keybuff, keybuff_alloced);
749   DBUG_RETURN(-1);
750 } /* del */
751 
752 
753 /**
   @brief Balances adjacent pages if underflow occurs
755 
756    @fn    underflow()
   @param anc_buff        Ancestor page data
758    @param leaf_page       Leaf page (page that underflowed)
759    @param leaf_page_link  Pointer to pin information about leaf page
760    @param keypos          Position after current key in anc_buff
761 
762    @note
763      This function writes redo entries for all changes
764      leaf_page is saved to disk
765      Caller must save anc_buff
766 
     For the algorithm to work, we have to ensure for packed keys that
768      key_length + (underflow_length + max_block_length + key_length) / 2
769      <= block_length.
770      From which follows that underflow_length <= block_length - key_length *3
771      For not packed keys we have:
772      (underflow_length + max_block_length + key_length) / 2 <= block_length
773      From which follows that underflow_length < block_length - key_length
774      This is ensured by setting of underflow_block_length.
775 
776    @return
777    @retval  0  ok
778    @retval  1  ok, but anc_page did underflow
779    @retval -1  error
780  */
781 
underflow(MARIA_HA * info,MARIA_KEYDEF * keyinfo,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos)782 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
783 		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
784 		     uchar *keypos)
785 {
786   int t_length;
787   uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
788   uint next_buff_length, new_buff_length, key_reflength;
789   uint unchanged_leaf_length, new_leaf_length, new_anc_length;
790   uint anc_page_flag, page_flag;
791   uchar *anc_key_buff, *leaf_key_buff;
792   uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
793   uchar *anc_buff, *leaf_buff;
794   uchar *after_key, *anc_end_pos;
795   MARIA_KEY_PARAM key_deleted, key_inserted;
796   MARIA_SHARE *share= info->s;
797   my_bool first_key, buff_alloced;
798   MARIA_KEY tmp_key, anc_key, leaf_key;
799   MARIA_PAGE next_page;
800   DBUG_ENTER("underflow");
801   DBUG_PRINT("enter",("leaf_page: %lu  keypos: %p",
802                       (ulong) (leaf_page->pos / share->block_size),
803 		      keypos));
804   DBUG_DUMP("anc_buff", anc_page->buff,  anc_page->size);
805   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
806 
807   alloc_on_stack(*info->stack_end_ptr, anc_key_buff, buff_alloced,
808                  keyinfo->max_store_length*2);
809   if (!anc_key_buff)
810     DBUG_RETURN(1);
811 
812   leaf_key_buff= anc_key_buff+ keyinfo->max_store_length;
813 
814   anc_page_flag= anc_page->flag;
815   anc_buff= anc_page->buff;
816   leaf_buff= leaf_page->buff;
817   info->keyread_buff_used=1;
818   next_keypos=keypos;
819   nod_flag= leaf_page->node;
820   p_length= nod_flag+share->keypage_header;
821   anc_length= anc_page->size;
822   leaf_length= leaf_page->size;
823   key_reflength= share->base.key_reflength;
824   if (share->keyinfo+info->lastinx == keyinfo)
825     info->page_changed=1;
826   first_key= keypos == anc_buff + share->keypage_header + key_reflength;
827 
828   tmp_key.data=  info->buff;
829   anc_key.data=  anc_key_buff;
830   leaf_key.data= leaf_key_buff;
831   tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
832 
833   if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
834       first_key)
835   {
836     uint tmp_length;
837     uint next_page_flag;
838     /* Use page right of anc-page */
839     DBUG_PRINT("test",("use right page"));
840 
841     /*
842       Calculate position after the current key. Note that keydata itself is
843       not used
844     */
845     if (keyinfo->flag & HA_BINARY_PACK_KEY)
846     {
847       if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
848 	goto err;
849     }
850     else
851     {
852       /* Avoid length error check if packed key */
853       tmp_key.data[0]= tmp_key.data[1]= 0;
854       /* Got to end of found key */
855       if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
856                                &next_keypos))
857         goto err;
858     }
859     next_page.pos= _ma_kpos(key_reflength, next_keypos);
860     if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
861                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
862       goto err;
863     next_buff_length= next_page.size;
864     next_page_flag=   next_page.flag;
865     DBUG_DUMP("next", next_page.buff, next_page.size);
866 
867     /* find keys to make a big key-page */
868     bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
869           key_reflength);
870 
871     if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
872 	!_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
873       goto err;
874 
875     /* merge pages and put parting key from anc_page between */
876     prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
877     t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
878                                    prev_key, prev_key, &key_inserted);
879     tmp_length= next_buff_length - p_length;
880     endpos= next_page.buff + tmp_length + leaf_length + t_length;
881     /* next_page.buff will always be larger than before !*/
882     bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
883     memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
884     (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
885     buff_length= (uint) (endpos - next_page.buff);
886 
887     /* Set page flag from combination of both key pages and parting key */
888     page_flag= next_page_flag | leaf_page->flag;
889     if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
890                         SEARCH_PAGE_KEY_HAS_TRANSID))
891       page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
892 
893     next_page.size= buff_length;
894     next_page.flag= page_flag;
895     page_store_info(share, &next_page);
896 
897     /* remove key from anc_page */
898     if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
899                               anc_key_buff, anc_buff+anc_length,
900                               (my_off_t *) 0, &key_deleted)))
901       goto err;
902 
903     new_anc_length= anc_length - s_length;
904     anc_page->size= new_anc_length;
905     page_store_size(share, anc_page);
906 
907     if (buff_length <= share->max_index_block_size)
908     {
909       /* All keys fitted into one page */
910       page_mark_changed(info, &next_page);
911       if (_ma_dispose(info, next_page.pos, 0))
912        goto err;
913 
914       memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
915       leaf_page->size= next_page.size;
916       leaf_page->flag= next_page.flag;
917 
918       if (share->now_transactional)
919       {
920         /*
921           Log changes to parent page. Note that this page may have been
922           temporarily bigger than block_size.
923          */
924         if (_ma_log_delete(anc_page, key_deleted.key_pos,
925                            key_deleted.changed_length,
926                            key_deleted.move_length,
927                            anc_length - anc_page->org_size,
928                            KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
929           goto err;
930         /*
931           Log changes to leaf page. Data for leaf page is in leaf_buff
932           which contains original leaf_buff, parting key and next_buff
933         */
934         if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
935           goto err;
936       }
937     }
938     else
939     {
940       /*
941         Balancing didn't free a page, so we have to split 'buff' into two
942         pages:
943         - Find key in middle of buffer
944         - Store everything before key in 'leaf_page'
945         - Pack key into anc_page at position of deleted key
946           Note that anc_page may overflow! (is handled by caller)
947         - Store remaining keys in next_page (buff)
948       */
949       MARIA_KEY_PARAM anc_key_inserted;
950 
951       anc_end_pos= anc_buff + new_anc_length;
952 
953       DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
954                          anc_buff, anc_end_pos));
955 
956       if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
957 	goto err;
958       if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
959 	goto err;
960       new_leaf_length= (uint) (half_pos - next_page.buff);
961       memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
962 
963       leaf_page->size= new_leaf_length;
964       leaf_page->flag= page_flag;
965       page_store_info(share, leaf_page);
966 
967       /* Correct new keypointer to leaf_page */
968       half_pos=after_key;
969       _ma_kpointer(info,
970                    leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
971                    next_page.pos);
972 
973       /* Save key in anc_page */
974       prev_key= (first_key  ? (uchar*) 0 : anc_key.data);
975       t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
976                                      (keypos == anc_end_pos ? (uchar*) 0 :
977                                       keypos),
978                                      prev_key, prev_key, &anc_key_inserted);
979       if (t_length >= 0)
980 	bmove_upp(anc_end_pos+t_length, anc_end_pos,
981                   (uint) (anc_end_pos - keypos));
982       else
983 	bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
984       (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
985       new_anc_length+= t_length;
986       anc_page->size= new_anc_length;
987       page_store_size(share, anc_page);
988 
989       if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
990                            SEARCH_PAGE_KEY_HAS_TRANSID))
991         _ma_mark_page_with_transid(share, anc_page);
992 
993       /* Store key first in new page */
994       if (nod_flag)
995 	bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
996               (size_t) nod_flag);
997       if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
998 	goto err;
999       t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1000 					  (uchar*) 0, (uchar*) 0,
1001 					  &key_inserted);
1002       /* t_length will always be > 0 for a new page !*/
1003       tmp_length= (uint) ((next_page.buff + buff_length) - half_pos);
1004       bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
1005       (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
1006       new_buff_length= tmp_length + t_length + p_length;
1007       next_page.size= new_buff_length;
1008       page_store_size(share, &next_page);
1009       /* keypage flag is already up to date */
1010 
1011       if (share->now_transactional)
1012       {
1013         /*
1014           Log changes to parent page
1015           This has one key deleted from it and one key inserted to it at
1016           keypos
1017 
1018           ma_log_add ensures that we don't log changes that is outside of
1019           key block size, as the REDO code can't handle that
1020         */
1021         if (_ma_log_add(anc_page, anc_length, keypos,
1022                         anc_key_inserted.move_length +
1023                         MY_MAX(anc_key_inserted.changed_length -
1024                             anc_key_inserted.move_length,
1025                             key_deleted.changed_length),
1026                         anc_key_inserted.move_length -
1027                         key_deleted.move_length, 1,
1028                         KEY_OP_DEBUG_LOG_ADD_3))
1029           goto err;
1030 
1031         /*
1032           Log changes to leaf page.
1033           This contains original data with new data added at end
1034         */
1035         DBUG_ASSERT(leaf_length <= new_leaf_length);
1036         if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
1037           goto err;
1038         /*
1039           Log changes to next page
1040 
1041           This contains original data with some prefix data deleted and
1042           some compressed data at start possible extended
1043 
1044           Data in buff was originally:
1045           org_leaf_buff     [leaf_length]
1046           separator_key     [buff_key_inserted.move_length]
1047           next_key_changes  [buff_key_inserted.changed_length -move_length]
1048           next_page_data    [next_buff_length - p_length -
1049                             (buff_key_inserted.changed_length -move_length)]
1050 
1051           After changes it's now:
1052           unpacked_key      [key_inserted.changed_length]
1053           next_suffix       [next_buff_length - key_inserted.changed_length]
1054 
1055         */
1056         DBUG_ASSERT(new_buff_length <= next_buff_length);
1057         if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1058                            (int) (new_buff_length - next_buff_length),
1059                            KEY_OP_DEBUG_LOG_PREFIX_1))
1060           goto err;
1061       }
1062       page_mark_changed(info, &next_page);
1063       if (_ma_write_keypage(&next_page,
1064                             PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1065 	goto err;
1066     }
1067 
1068     page_mark_changed(info, leaf_page);
1069     if (_ma_write_keypage(leaf_page,
1070                           PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1071       goto err;
1072     stack_alloc_free(anc_key_buff, buff_alloced);
1073     DBUG_RETURN(new_anc_length <=
1074                 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1075                   (uint) keyinfo->underflow_block_length)));
1076   }
1077 
1078   DBUG_PRINT("test",("use left page"));
1079 
1080   keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1081   if (!keypos)
1082     goto err;
1083   next_page.pos= _ma_kpos(key_reflength,keypos);
1084   if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1085                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1086     goto err;
1087   buff_length= next_page.size;
1088   endpos= next_page.buff + buff_length;
1089   DBUG_DUMP("prev", next_page.buff, next_page.size);
1090 
1091   /* find keys to make a big key-page */
1092   bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1093         key_reflength);
1094   next_keypos=keypos;
1095   if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1096                            &next_keypos))
1097     goto err;
1098   if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1099     goto err;
1100 
1101   /* merge pages and put parting key from anc_page between */
1102   prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1103   t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1104 				(leaf_length == p_length ?
1105                                  (uchar*) 0 : leaf_buff+p_length),
1106 				prev_key, prev_key,
1107 				&key_inserted);
1108   if (t_length >= 0)
1109     bmove(endpos+t_length, leaf_buff+p_length,
1110           (size_t) (leaf_length-p_length));
1111   else						/* We gained space */
1112     bmove(endpos,leaf_buff+((int) p_length-t_length),
1113 	  (size_t) (leaf_length-p_length+t_length));
1114   (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1115 
1116   /* Remember for logging how many bytes of leaf_buff that are not changed */
1117   DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1118   unchanged_leaf_length= (leaf_length - p_length -
1119                           (key_inserted.changed_length -
1120                            key_inserted.move_length));
1121 
1122   new_buff_length= buff_length + leaf_length - p_length + t_length;
1123 
1124 #ifdef EXTRA_DEBUG
1125   /* Ensure that unchanged_leaf_length is correct */
1126   DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
1127                    leaf_buff + leaf_length - unchanged_leaf_length,
1128                    unchanged_leaf_length) == 0);
1129 #endif
1130 
1131   page_flag= next_page.flag | leaf_page->flag;
1132   if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1133                        SEARCH_PAGE_KEY_HAS_TRANSID))
1134     page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1135 
1136   next_page.size= new_buff_length;
1137   next_page.flag= page_flag;
1138   page_store_info(share, &next_page);
1139 
1140   /* remove key from anc_page */
1141   if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1142                              anc_key_buff,
1143                              anc_buff+anc_length, (my_off_t *) 0,
1144                              &key_deleted)))
1145     goto err;
1146 
1147   new_anc_length= anc_length - s_length;
1148   anc_page->size= new_anc_length;
1149   page_store_size(share, anc_page);
1150 
1151   if (new_buff_length <= share->max_index_block_size)
1152   {
1153     /* All keys fitted into one page */
1154     page_mark_changed(info, leaf_page);
1155     if (_ma_dispose(info, leaf_page->pos, 0))
1156       goto err;
1157 
1158     if (share->now_transactional)
1159     {
1160       /*
1161         Log changes to parent page. Note that this page may have been
1162         temporarily bigger than block_size.
1163       */
1164       if (_ma_log_delete(anc_page, key_deleted.key_pos,
1165                          key_deleted.changed_length, key_deleted.move_length,
1166                          anc_length - anc_page->org_size,
1167                          KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1168         goto err;
1169       /*
1170         Log changes to next page. Data for leaf page is in buff
1171         that contains original leaf_buff, parting key and next_buff
1172       */
1173       if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1174         goto err;
1175     }
1176   }
1177   else
1178   {
1179     /*
1180       Balancing didn't free a page, so we have to split 'next_page' into two
1181       pages
1182       - Find key in middle of buffer (buff)
1183       - Pack key at half_buff into anc_page at position of deleted key
1184         Note that anc_page may overflow! (is handled by caller)
1185       - Move everything after middlekey to 'leaf_buff'
1186       - Shorten buff at 'endpos'
1187     */
1188     MARIA_KEY_PARAM anc_key_inserted;
1189     size_t tmp_length;
1190 
1191     if (keypos == anc_buff + share->keypage_header + key_reflength)
1192       anc_pos= 0;				/* First key */
1193     else
1194     {
1195       if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1196         goto err;
1197       anc_pos= anc_key.data;
1198     }
1199     if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1200       goto err;
1201 
1202     /* Correct new keypointer to leaf_page */
1203     _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1204                  leaf_key.ref_length, leaf_page->pos);
1205 
1206     /* Save parting key found by _ma_find_half_pos() in anc_page */
1207     DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1208     DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1209     anc_end_pos= anc_buff + new_anc_length;
1210     t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1211 				  keypos == anc_end_pos ? (uchar*) 0
1212 				  : keypos,
1213 				  anc_pos, anc_pos,
1214 				  &anc_key_inserted);
1215     if (t_length >= 0)
1216       bmove_upp(anc_end_pos+t_length, anc_end_pos,
1217                 (uint) (anc_end_pos-keypos));
1218     else
1219       bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1220     (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1221     new_anc_length+= t_length;
1222     anc_page->size= new_anc_length;
1223     page_store_size(share, anc_page);
1224 
1225     if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1226                          SEARCH_PAGE_KEY_HAS_TRANSID))
1227       _ma_mark_page_with_transid(share, anc_page);
1228 
1229     /* Store first key on new page */
1230     if (nod_flag)
1231       bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1232             (size_t) nod_flag);
1233     if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1234       goto err;
1235     DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1236     t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1237 				  (uchar*) 0, (uchar*) 0, &key_inserted);
1238     /* t_length will always be > 0 for a new page !*/
1239     tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1240     DBUG_PRINT("info",("t_length: %d  length: %d",t_length, (int) tmp_length));
1241     bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1242     (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1243     new_leaf_length= (uint)(tmp_length + t_length + p_length);
1244     DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
1245 
1246     leaf_page->size= new_leaf_length;
1247     leaf_page->flag= page_flag;
1248     page_store_info(share, leaf_page);
1249 
1250     new_buff_length= (uint) (endpos - next_page.buff);
1251     next_page.size= new_buff_length;
1252     page_store_size(share, &next_page);
1253 
1254     if (share->now_transactional)
1255     {
1256       /*
1257         Log changes to parent page
1258         This has one key deleted from it and one key inserted to it at
1259         keypos
1260 
1261         ma_log_add() ensures that we don't log changes that is outside of
1262         key block size, as the REDO code can't handle that
1263       */
1264       if (_ma_log_add(anc_page, anc_length, keypos,
1265                       anc_key_inserted.move_length +
1266                       MY_MAX(anc_key_inserted.changed_length -
1267                           anc_key_inserted.move_length,
1268                           key_deleted.changed_length),
1269                       anc_key_inserted.move_length -
1270                       key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1271         goto err;
1272 
1273       /*
1274         Log changes to leaf page.
1275         This contains original data with new data added first
1276       */
1277       DBUG_ASSERT(leaf_length <= new_leaf_length);
1278       DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1279       if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1280                          (int) (new_leaf_length - leaf_length),
1281                          KEY_OP_DEBUG_LOG_PREFIX_2))
1282         goto err;
1283       /*
1284         Log changes to next page
1285         This contains original data with some suffix data deleted
1286       */
1287       DBUG_ASSERT(new_buff_length <= buff_length);
1288       if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1289         goto err;
1290     }
1291 
1292     page_mark_changed(info, leaf_page);
1293     if (_ma_write_keypage(leaf_page,
1294                           PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1295       goto err;
1296   }
1297   page_mark_changed(info, &next_page);
1298   if (_ma_write_keypage(&next_page,
1299                         PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1300     goto err;
1301 
1302   stack_alloc_free(anc_key_buff, buff_alloced);
1303   DBUG_RETURN(new_anc_length <=
1304               ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1305                 (uint) keyinfo->underflow_block_length)));
1306 
1307 err:
1308   stack_alloc_free(anc_key_buff, buff_alloced);
1309   DBUG_RETURN(-1);
1310 } /* underflow */
1311 
1312 
/**
  @brief Remove a key from page

  @fn remove_key()
    keyinfo	          Key handle
    page_flag             Flags of the key page (tested for
                          KEYPAGE_FLAG_HAS_TRANSID and passed on to get_key)
    nod_flag              Length of node ptr
    keypos	          Where on page key starts
    lastkey	          Buffer for storing keys to be removed
    page_end	          Pointer to end of page
    next_block	          If <> 0 and node-page, this is set to address of
    		          next page
    s_temp	          Information about what changes were done on the page:
    s_temp.key_pos        Start of key
    s_temp.move_length    Number of bytes removed at keypos
    s_temp.changed_length Number of bytes changed at keypos

  @todo
    The current code doesn't handle the case that the next key may be
    packed better against the previous key if there is a case difference

  @return
  @retval 0  error
  @retval #  How many chars were removed
*/
1337 
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
		       uchar *keypos, uchar *lastkey,
		       uchar *page_end, my_off_t *next_block,
                       MARIA_KEY_PARAM *s_temp)
{
  int s_length;                         /* Number of bytes to cut at 'start' */
  uchar *start;                         /* Original keypos (start of key) */
  DBUG_ENTER("remove_key");
  DBUG_PRINT("enter", ("keypos:%p page_end: %p",
                       keypos, page_end));

  /*
    Remove the key that starts at keypos from the page by moving the data
    between the end of the key and page_end down over it (done at 'end:').
    For packed keys the header of the following key may first have to be
    rewritten, as that key can be stored relative to the key being removed.
    s_temp is filled in with the position and the changed/moved lengths.
  */
  start= s_temp->key_pos= keypos;
  s_temp->changed_length= 0;
  if (!(keyinfo->flag &
	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
	 HA_BINARY_PACK_KEY)) &&
      !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
  {
    /*
      Static length key: no pack/varlength flag on the key and no transid
      flag on the page, so the length is known without reading the key.
    */
    s_length=(int) (keyinfo->keylength+nod_flag);
    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos+s_length);
  }
  else
  {
    /* Let keypos point at next key */
    MARIA_KEY key;

    /*
      Calculate length of key.
      get_key() is given lastkey as data buffer and &keypos; the code
      below relies on keypos pointing at the start of the next key after
      the call.
    */
    key.keyinfo= keyinfo;
    key.data=    lastkey;
    if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
      DBUG_RETURN(0);				/* Error */

    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos);
    s_length=(int) (keypos-start);
    /* If the removed key was last on the page there is no next key to fix */
    if (keypos != page_end)
    {
      if (keyinfo->flag & HA_BINARY_PACK_KEY)
      {
	uchar *old_key= start;
	uint next_length,prev_length,prev_pack_length;

        /*
          keypos points here on start of next key.
          NOTE(review): next_length/prev_length look like the number of
          leading bytes each key shares with its predecessor - confirm
          against the binary pack key format.
        */
	get_key_length(next_length,keypos);
	get_key_pack_length(prev_length,prev_pack_length,old_key);
	if (next_length > prev_length)
	{
          uint diff= (next_length-prev_length);
	  /*
            We have to copy data from the current key to the next key:
            the next key gets a new length header (prev_length) followed
            by 'diff' bytes taken from the unpacked removed key in lastkey.
            The rewritten key starts 'diff + prev_pack_length' bytes before
            the old data start, so fewer bytes are cut from the page.
          */
	  keypos-= diff + prev_pack_length;
	  store_key_length(keypos, prev_length);
          bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
	  s_length=(int) (keypos-start);
          s_temp->changed_length= diff + prev_pack_length;
	}
      }
      else
      {
	/* Check if a variable length first key part */
	if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
	{
	  /* Next key is packed against the current one */
	  uint next_length,prev_length,prev_pack_length,lastkey_length,
	    rest_length;
	  /*
            The pack header is 2 bytes (value masked with 32767) when the
            first segment length is >= 127, otherwise 1 byte (masked with
            127). A zero value for the removed key means nothing has to be
            fixed in the next key, so we go directly to the final move.
          */
	  if (keyinfo->seg[0].length >= 127)
	  {
	    if (!(prev_length=mi_uint2korr(start) & 32767))
	      goto end;
	    next_length=mi_uint2korr(keypos) & 32767;
	    keypos+=2;
	    prev_pack_length=2;
	  }
	  else
	  {
	    if (!(prev_length= *start & 127))
	      goto end;				/* Same key as previous*/
	    next_length= *keypos & 127;
	    keypos++;
	    prev_pack_length=1;
	  }
	  if (!(*start & 128))
	    prev_length=0;			/* prev key not packed */
	  if (keyinfo->seg[0].flag & HA_NULL_PART)
	    lastkey++;				/* Skip null marker */
	  get_key_length(lastkey_length,lastkey);
	  if (!next_length)			/* Same key after */
	  {
	    next_length=lastkey_length;
	    rest_length=0;
	  }
	  else
	    get_key_length(rest_length,keypos);

	  if (next_length >= prev_length)
	  {
            /* Next key is based on deleted key */
            uint pack_length;
            uint diff= (next_length-prev_length);

            /*
              keypos points to data of next key (after key length).
              Put the 'diff' bytes that the next key took from the removed
              key directly in front of its data, then step keypos back to
              where the rewritten header of the next key has to start.
            */
	    bmove(keypos - diff, lastkey + prev_length, diff);
	    rest_length+= diff;
	    pack_length= prev_length ? get_pack_length(rest_length): 0;
	    keypos-= diff + pack_length + prev_pack_length;
	    s_length=(int) (keypos-start);
	    if (prev_length)			/* Pack against prev key */
	    {
              /* Reuse the pack header bytes of the removed key */
	      *keypos++= start[0];
	      if (prev_pack_length == 2)
		*keypos++= start[1];
	      store_key_length(keypos,rest_length);
	    }
	    else
	    {
	      /* Next key is not packed anymore */
	      if (keyinfo->seg[0].flag & HA_NULL_PART)
	      {
		rest_length++;			/* Mark not null */
	      }
	      if (prev_pack_length == 2)
	      {
		mi_int2store(keypos,rest_length);
	      }
	      else
		*keypos= rest_length;
	    }
            s_temp->changed_length= diff + pack_length + prev_pack_length;
	  }
	}
      }
    }
  }
  end:
  /* Close the gap: move everything after the removed bytes down to start */
  bmove(start, start+s_length, (uint) (page_end-start-s_length));
  s_temp->move_length= s_length;
  DBUG_RETURN((uint) s_length);
} /* remove_key */
1477 
1478 
1479 /****************************************************************************
1480   Logging of redos
1481 ****************************************************************************/
1482 
/**
   @brief
   log entry where some parts are deleted and some things are changed
   and some data could be added last.

   @fn _ma_log_delete()
   @param ma_page	  Changed page; the Maria handler, the page address
                          and the page buffer are taken from this
   @param key_pos         Start of change area
   @param changed_length  How many bytes were changed at key_pos
   @param move_length     How many bytes were deleted at key_pos
   @param append_length	  Length of data added last
		          This is taken from end of ma_page->buff
   @param debug_marker    Marker stored in the log entry when compiled with
                          EXTRA_DEBUG_KEY_CHANGES

   This is mainly used when a key is deleted. The append happens
   when we delete a key from a page with data > block_size kept in
   memory and we have to add back the data that was stored > block_size
*/
1502 
_ma_log_delete(MARIA_PAGE * ma_page,const uchar * key_pos,uint changed_length,uint move_length,uint append_length,enum en_key_debug debug_marker)1503 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1504                        uint changed_length, uint move_length,
1505                        uint append_length __attribute__((unused)),
1506                        enum en_key_debug debug_marker __attribute__((unused)))
1507 {
1508   LSN lsn;
1509   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
1510   uchar *log_pos;
1511   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
1512   uint translog_parts, current_size, extra_length;
1513   uint offset= (uint) (key_pos - ma_page->buff);
1514   MARIA_HA *info= ma_page->info;
1515   MARIA_SHARE *share= info->s;
1516   my_off_t page= ma_page->pos / share->block_size;
1517   DBUG_ENTER("_ma_log_delete");
1518   DBUG_PRINT("enter", ("page: %lu  offset: %u  changed_length: %u  move_length: %u  append_length: %u  page_size: %u",
1519                        (ulong) page, offset, changed_length, move_length,
1520                        append_length, ma_page->size));
1521   DBUG_ASSERT(share->now_transactional && move_length);
1522   DBUG_ASSERT(offset + changed_length <= ma_page->size);
1523   DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
1524   DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
1525 
1526   /* Store address of new root page */
1527   page_store(log_data + FILEID_STORE_SIZE, page);
1528   log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1529   current_size= ma_page->org_size;
1530 
1531 #ifdef EXTRA_DEBUG_KEY_CHANGES
1532   *log_pos++= KEY_OP_DEBUG;
1533   *log_pos++= debug_marker;
1534 
1535   *log_pos++= KEY_OP_DEBUG_2;
1536   int2store(log_pos,   ma_page->org_size);
1537   int2store(log_pos+2, ma_page->size);
1538   log_pos+=4;
1539 #endif
1540 
1541   /* Store keypage_flag */
1542   *log_pos++= KEY_OP_SET_PAGEFLAG;
1543   *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
1544 
1545   log_pos[0]= KEY_OP_OFFSET;
1546   int2store(log_pos+1, offset);
1547   log_pos+= 3;
1548   translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
1549   extra_length= 0;
1550 
1551   if (changed_length)
1552   {
1553     if (offset + changed_length >= share->max_index_block_size)
1554     {
1555       changed_length= share->max_index_block_size - offset;
1556       move_length= 0;                           /* Nothing to move */
1557       current_size= share->max_index_block_size;
1558     }
1559 
1560     log_pos[0]= KEY_OP_CHANGE;
1561     int2store(log_pos+1, changed_length);
1562     log_pos+= 3;
1563     log_array[translog_parts].str=    ma_page->buff + offset;
1564     log_array[translog_parts].length= changed_length;
1565     translog_parts++;
1566 
1567     /* We only have to move things after offset+changed_length */
1568     offset+= changed_length;
1569   }
1570 
1571   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1572   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1573 
1574   if (move_length)
1575   {
1576     uint log_length;
1577     if (offset + move_length < share->max_index_block_size)
1578     {
1579       /*
1580         Move down things that is on page.
1581         page_offset in apply_redo_inxed() will be at original offset
1582         + changed_length.
1583       */
1584       log_pos[0]= KEY_OP_SHIFT;
1585       int2store(log_pos+1, - (int) move_length);
1586       log_length= 3;
1587       current_size-= move_length;
1588     }
1589     else
1590     {
1591       /* Delete to end of page */
1592       uint tmp= current_size - offset;
1593       current_size= offset;
1594       log_pos[0]= KEY_OP_DEL_SUFFIX;
1595       int2store(log_pos+1, tmp);
1596       log_length= 3;
1597     }
1598     log_array[translog_parts].str=    log_pos;
1599     log_array[translog_parts].length= log_length;
1600     translog_parts++;
1601     log_pos+= log_length;
1602     extra_length+= log_length;
1603   }
1604 
1605   if (current_size != ma_page->size &&
1606       current_size != share->max_index_block_size)
1607   {
1608     /* Append data that didn't fit on the page before */
1609     uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
1610                   current_size);
1611     uchar *data= ma_page->buff + current_size;
1612 
1613     DBUG_ASSERT(length <= append_length);
1614 
1615     log_pos[0]= KEY_OP_ADD_SUFFIX;
1616     int2store(log_pos+1, length);
1617     log_array[translog_parts].str=        log_pos;
1618     log_array[translog_parts].length=     3;
1619     log_array[translog_parts + 1].str=    data;
1620     log_array[translog_parts + 1].length= length;
1621     log_pos+= 3;
1622     translog_parts+= 2;
1623     current_size+= length;
1624     extra_length+= 3 + length;
1625   }
1626 
1627   _ma_log_key_changes(ma_page,
1628                       log_array + translog_parts,
1629                       log_pos, &extra_length, &translog_parts);
1630   /* Remember new page length for future log entires for same page */
1631   ma_page->org_size= current_size;
1632 
1633   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1634                             info->trn, info,
1635                             (translog_size_t)
1636                             log_array[TRANSLOG_INTERNAL_PARTS].length +
1637                             changed_length + extra_length, translog_parts,
1638                             log_array, log_data, NULL))
1639     DBUG_RETURN(1);
1640 
1641   DBUG_RETURN(0);
1642 }
1643 
1644 
1645 /****************************************************************************
1646   Logging of undos
1647 ****************************************************************************/
1648 
_ma_write_undo_key_delete(MARIA_HA * info,const MARIA_KEY * key,my_off_t new_root,LSN * res_lsn)1649 my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1650                                   my_off_t new_root, LSN *res_lsn)
1651 {
1652   MARIA_SHARE *share= info->s;
1653   uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1654                  KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1655   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1656   struct st_msg_to_write_hook_for_undo_key msg;
1657   enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1658   uint keynr= key->keyinfo->key_nr;
1659 
1660   lsn_store(log_data, info->trn->undo_lsn);
1661   key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1662   log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1663 
1664   /**
1665     @todo BUG if we had concurrent insert/deletes, reading state's key_root
1666     like this would be unsafe.
1667   */
1668   if (new_root != share->state.key_root[keynr])
1669   {
1670     my_off_t page;
1671     page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1672            new_root / share->block_size);
1673     page_store(log_pos, page);
1674     log_pos+= PAGE_STORE_SIZE;
1675     log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1676   }
1677 
1678   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1679   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1680   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key->data;
1681   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1682                                                   key->ref_length);
1683 
1684   msg.root= &share->state.key_root[keynr];
1685   msg.value= new_root;
1686   /*
1687     set autoincrement to 1 if this is an auto_increment key
1688     This is only used if we are now in a rollback of a duplicate key
1689   */
1690   msg.auto_increment= share->base.auto_key == keynr + 1;
1691 
1692   return translog_write_record(res_lsn, log_type,
1693                                info->trn, info,
1694                                (translog_size_t)
1695                                (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1696                                 log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1697                                TRANSLOG_INTERNAL_PARTS + 2, log_array,
1698                                log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1699 }
1700