1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2    Copyright (C) 2009-2010 Monty Program Ab
backend_termion_should_only_write_diffs() -> Result<(), Box<dyn std::error::Error>>3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #include "ma_fulltext.h"
18 #include "ma_rt_index.h"
19 #include "trnman.h"
20 #include "ma_key_recover.h"
21 
22 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
23                     MARIA_PAGE *page);
24 static int del(MARIA_HA *info, MARIA_KEY *key,
25                MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
26 	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
27 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
28 		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
29 		     uchar *keypos);
30 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
31                        uchar *keypos, uchar *lastkey, uchar *page_end,
32 		       my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33 
34 /* @breif Remove a row from a MARIA table */
35 
36 int maria_delete(MARIA_HA *info,const uchar *record)
37 {
38   uint i;
39   uchar *old_key;
40   int save_errno;
41   char lastpos[8];
42   MARIA_SHARE *share= info->s;
43   MARIA_KEYDEF *keyinfo;
44   DBUG_ENTER("maria_delete");
45 
46   /* Test if record is in datafile */
47   DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
48                   maria_print_error(share, HA_ERR_CRASHED);
49                   DBUG_RETURN(my_errno= HA_ERR_CRASHED););
50   DBUG_EXECUTE_IF("my_error_test_undefined_error",
51                   maria_print_error(share, INT_MAX);
52                   DBUG_RETURN(my_errno= INT_MAX););
53   if (!(info->update & HA_STATE_AKTIV))
54   {
55     DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND);	/* No database read */
56   }
57   if (share->options & HA_OPTION_READ_ONLY_DATA)
58   {
59     DBUG_RETURN(my_errno=EACCES);
60   }
61   if (_ma_readinfo(info,F_WRLCK,1))
62     DBUG_RETURN(my_errno);
63   if ((*share->compare_record)(info,record))
64     goto err;				/* Error on read-check */
65 
66   if (_ma_mark_file_changed(share))
67     goto err;
68 
69   /* Ensure we don't change the autoincrement value */
70   info->last_auto_increment= ~(ulonglong) 0;
71   /* Remove all keys from the index file */
72 
73   old_key= info->lastkey_buff2;
74 
75   for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76   {
77     if (maria_is_key_active(share->state.key_map, i))
78     {
79       keyinfo->version++;
80       if (keyinfo->flag & HA_FULLTEXT)
81       {
82         if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83           goto err;
84       }
85       else
86       {
87         MARIA_KEY key;
88         if (keyinfo->ck_delete(info,
89                                (*keyinfo->make_key)(info, &key, i, old_key,
90                                                     record,
91                                                     info->cur_row.lastpos,
92                                                     info->cur_row.trid)))
93           goto err;
94       }
95       /* The above changed info->lastkey2. Inform maria_rnext_same(). */
96       info->update&= ~HA_STATE_RNEXT_SAME;
97     }
98   }
99 
100   if (share->calc_checksum)
101   {
102     /*
103       We can't use the row based checksum as this doesn't have enough
104       precision.
105     */
106     info->cur_row.checksum= (*share->calc_checksum)(info, record);
107   }
108 
109   if ((*share->delete_record)(info, record))
110     goto err;				/* Remove record from database */
111 
112   info->state->checksum-= info->cur_row.checksum;
113   info->state->records--;
114   info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
115   info->row_changes++;
116   share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
117                           STATE_NOT_ZEROFILLED);
118   info->state->changed=1;
119 
120   mi_sizestore(lastpos, info->cur_row.lastpos);
121   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
122   if (info->invalidator != 0)
123   {
124     DBUG_PRINT("info", ("invalidator... '%s' (delete)",
125                         share->open_file_name.str));
126     (*info->invalidator)(share->open_file_name.str);
127     info->invalidator=0;
128   }
129   DBUG_RETURN(0);
130 
131 err:
132   save_errno= my_errno;
133   DBUG_ASSERT(save_errno);
134   if (!save_errno)
135     save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */
136 
137   mi_sizestore(lastpos, info->cur_row.lastpos);
138   (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
139   info->update|=HA_STATE_WRITTEN;	/* Buffer changed */
140   if (save_errno != HA_ERR_RECORD_CHANGED)
141   {
142     _ma_set_fatal_error(share, HA_ERR_CRASHED);
143     save_errno= HA_ERR_CRASHED;
144   }
145   DBUG_RETURN(my_errno= save_errno);
146 } /* maria_delete */
147 
148 
149 /*
150   Remove a key from the btree index
151 
152   TODO:
153    Change ma_ck_real_delete() to use another buffer for changed keys instead
154    of key->data. This would allows us to remove the copying of the key here.
155 */
156 
157 my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
158 {
159   MARIA_SHARE *share= info->s;
160   int res;
161   LSN lsn= LSN_IMPOSSIBLE;
162   my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
163   uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
164   MARIA_KEY org_key;
165   DBUG_ENTER("_ma_ck_delete");
166 
167   LINT_INIT_STRUCT(org_key);
168 
169   save_key_data= key->data;
170   if (share->now_transactional)
171   {
172     /* Save original value as the key may change */
173     memcpy(key_buff, key->data, key->data_length + key->ref_length);
174     org_key= *key;
175     key->data= key_buff;
176   }
177 
178   if ((res= _ma_ck_real_delete(info, key, &new_root)))
179   {
180     /* We have to mark the table crashed before unpin_all_pages() */
181     maria_mark_crashed(info);
182   }
183 
184   key->data= save_key_data;
185   if (!res && share->now_transactional)
186     res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
187   else
188   {
189     share->state.key_root[key->keyinfo->key_nr]= new_root;
190     _ma_fast_unlock_key_del(info);
191   }
192   _ma_unpin_all_pages_and_finalize_row(info, lsn);
193   DBUG_RETURN(res != 0);
194 } /* _ma_ck_delete */
195 
196 
197 my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
198                            my_off_t *root)
199 {
200   int error;
201   my_bool result= 0;
202   my_off_t old_root;
203   uchar *root_buff;
204   MARIA_KEYDEF *keyinfo= key->keyinfo;
205   MARIA_PAGE page;
206   DBUG_ENTER("_ma_ck_real_delete");
207 
208   if ((old_root=*root) == HA_OFFSET_ERROR)
209   {
210     _ma_set_fatal_error(info->s, HA_ERR_CRASHED);
211     DBUG_RETURN(1);
212   }
213   if (!(root_buff= (uchar*)  my_alloca((uint) keyinfo->block_length+
214                                        MARIA_MAX_KEY_BUFF*2)))
215   {
216     DBUG_PRINT("error",("Couldn't allocate memory"));
217     my_errno=ENOMEM;
218     DBUG_RETURN(1);
219   }
220   DBUG_PRINT("info",("root_page: %lu",
221                      (ulong) (old_root / keyinfo->block_length)));
222   if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
223                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
224   {
225     result= 1;
226     goto err;
227   }
228   if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
229                                    SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
230                                    SEARCH_SAME),
231                        &page)))
232   {
233     if (error < 0)
234       result= 1;
235     else if (error == 2)
236     {
237       DBUG_PRINT("test",("Enlarging of root when deleting"));
238       if (_ma_enlarge_root(info, key, root))
239         result= 1;
240     }
241     else /* error == 1 */
242     {
243       MARIA_SHARE *share= info->s;
244 
245       page_mark_changed(info, &page);
246 
247       if (page.size <= page.node + share->keypage_header + 1)
248       {
249         DBUG_ASSERT(page.size == page.node + share->keypage_header);
250 	if (page.node)
251 	  *root= _ma_kpos(page.node, root_buff +share->keypage_header +
252                           page.node);
253 	else
254 	  *root=HA_OFFSET_ERROR;
255 	if (_ma_dispose(info, old_root, 0))
256 	  result= 1;
257       }
258       else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
259                                  DFLT_INIT_HITS))
260         result= 1;
261     }
262   }
263 err:
264   my_afree(root_buff);
265   DBUG_PRINT("exit",("Return: %d",result));
266   DBUG_RETURN(result);
267 } /* _ma_ck_real_delete */
268 
269 
270 /**
271    @brief Remove key below key root
272 
273    @param key  Key to delete.  Will contain new key if block was enlarged
274 
275    @return
276    @retval 0   ok (anc_page is not changed)
277    @retval 1   If data on page is too small; In this case anc_buff is not saved
278    @retval 2   If data on page is too big
279    @retval -1  On errors
280 */
281 
282 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
283                     MARIA_PAGE *anc_page)
284 {
285   int flag,ret_value,save_flag;
286   uint nod_flag, page_flag;
287   my_bool last_key;
288   uchar *leaf_buff,*keypos;
289   uchar lastkey[MARIA_MAX_KEY_BUFF];
290   MARIA_KEY_PARAM s_temp;
291   MARIA_SHARE *share= info->s;
292   MARIA_KEYDEF *keyinfo= key->keyinfo;
293   MARIA_PAGE leaf_page;
294   DBUG_ENTER("d_search");
295   DBUG_DUMP("page", anc_page->buff, anc_page->size);
296 
297   flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
298                               &last_key);
299   if (flag == MARIA_FOUND_WRONG_KEY)
300   {
301     DBUG_PRINT("error",("Found wrong key"));
302     DBUG_RETURN(-1);
303   }
304   page_flag= anc_page->flag;
305   nod_flag=  anc_page->node;
306 
307   if (!flag && (keyinfo->flag & HA_FULLTEXT))
308   {
309     uint off;
310     int  subkeys;
311 
312     get_key_full_length_rdonly(off, lastkey);
313     subkeys=ft_sintXkorr(lastkey+off);
314     DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
315     comp_flag=SEARCH_SAME;
316     if (subkeys >= 0)
317     {
318       /* normal word, one-level tree structure */
319       if (info->ft1_to_ft2)
320       {
321         /* we're in ft1->ft2 conversion mode. Saving key data */
322         insert_dynamic(info->ft1_to_ft2, (lastkey+off));
323       }
324       else
325       {
326         /* we need exact match only if not in ft1->ft2 conversion mode */
327         flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
328                                     lastkey, &last_key);
329       }
330       /* fall through to normal delete */
331     }
332     else
333     {
334       /* popular word. two-level tree. going down */
335       uint tmp_key_length;
336       my_off_t root;
337       uchar *kpos=keypos;
338       MARIA_KEY tmp_key;
339 
340       tmp_key.data=    lastkey;
341       tmp_key.keyinfo= keyinfo;
342 
343       if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
344                                                &kpos)))
345       {
346         _ma_set_fatal_error(share, HA_ERR_CRASHED);
347         DBUG_RETURN(-1);
348       }
349       root= _ma_row_pos_from_key(&tmp_key);
350       if (subkeys == -1)
351       {
352         /* the last entry in sub-tree */
353         if (_ma_dispose(info, root, 1))
354           DBUG_RETURN(-1);
355         /* fall through to normal delete */
356       }
357       else
358       {
359         MARIA_KEY word_key;
360         keyinfo=&share->ft2_keyinfo;
361         /* we'll modify key entry 'in vivo' */
362         kpos-=keyinfo->keylength+nod_flag;
363         get_key_full_length_rdonly(off, key->data);
364 
365         word_key.data=        key->data + off;
366         word_key.keyinfo=     &share->ft2_keyinfo;
367         word_key.data_length= HA_FT_WLEN;
368         word_key.ref_length= 0;
369         word_key.flag= 0;
370         ret_value= _ma_ck_real_delete(info, &word_key, &root);
371         _ma_dpointer(share, kpos+HA_FT_WLEN, root);
372         subkeys++;
373         ft_intXstore(kpos, subkeys);
374         if (!ret_value)
375         {
376           page_mark_changed(info, anc_page);
377           ret_value= _ma_write_keypage(anc_page,
378                                        PAGECACHE_LOCK_LEFT_WRITELOCKED,
379                                        DFLT_INIT_HITS);
380         }
381         DBUG_PRINT("exit",("Return: %d",ret_value));
382         DBUG_RETURN(ret_value);
383       }
384     }
385   }
386   leaf_buff=0;
387   if (nod_flag)
388   {
389     /* Read left child page */
390     leaf_page.pos= _ma_kpos(nod_flag,keypos);
391     if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
392                                        MARIA_MAX_KEY_BUFF*2)))
393     {
394       DBUG_PRINT("error", ("Couldn't allocate memory"));
395       my_errno=ENOMEM;
396       DBUG_RETURN(-1);
397     }
398     if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
399                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
400                           0))
401       goto err;
402   }
403 
404   if (flag != 0)
405   {
406     if (!nod_flag)
407     {
408       /* This should newer happend */
409       DBUG_PRINT("error",("Didn't find key"));
410       _ma_set_fatal_error(share, HA_ERR_CRASHED);
411       goto err;
412     }
413     save_flag=0;
414     ret_value= d_search(info, key, comp_flag, &leaf_page);
415   }
416   else
417   {						/* Found key */
418     uint tmp;
419     uint anc_buff_length= anc_page->size;
420     uint anc_page_flag=   anc_page->flag;
421     my_off_t next_block;
422 
423     if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
424                           anc_page->buff + anc_buff_length,
425                           &next_block, &s_temp)))
426       goto err;
427 
428     page_mark_changed(info, anc_page);
429     anc_buff_length-= tmp;
430     anc_page->size= anc_buff_length;
431     page_store_size(share, anc_page);
432 
433     /*
434       Log initial changes on pages
435       If there is an underflow, there will be more changes logged to the
436       page
437     */
438     if (share->now_transactional &&
439         _ma_log_delete(anc_page, s_temp.key_pos,
440                        s_temp.changed_length, s_temp.move_length,
441                        0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
442       DBUG_RETURN(-1);
443 
444     if (!nod_flag)
445     {						/* On leaf page */
446       if (anc_buff_length <= (info->quick_mode ?
447                               MARIA_MIN_KEYBLOCK_LENGTH :
448                               (uint) keyinfo->underflow_block_length))
449       {
450         /* Page will be written by caller if we return 1 */
451         DBUG_RETURN(1);
452       }
453       if (_ma_write_keypage(anc_page,
454                             PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
455 	DBUG_RETURN(-1);
456       DBUG_RETURN(0);
457     }
458     save_flag=1;                         /* Mark that anc_buff is changed */
459     ret_value= del(info, key, anc_page, &leaf_page,
460                    keypos, next_block, lastkey);
461   }
462   if (ret_value >0)
463   {
464     save_flag= 2;
465     if (ret_value == 1)
466       ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
467     else
468     {
469       /* This can only happen with variable length keys */
470       MARIA_KEY last_key;
471       DBUG_PRINT("test",("Enlarging of key when deleting"));
472 
473       last_key.data=    lastkey;
474       last_key.keyinfo= keyinfo;
475       if (!_ma_get_last_key(&last_key, anc_page, keypos))
476 	goto err;
477       ret_value= _ma_insert(info, key, anc_page, keypos,
478                             last_key.data,
479                             (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
480 
481       if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
482                             DFLT_INIT_HITS))
483         ret_value= -1;
484     }
485   }
486   if (ret_value == 0 && anc_page->size > share->max_index_block_size)
487   {
488     /*
489       parent buffer got too big ; We have to split the page.
490       The | 2 is there to force write of anc page below
491     */
492     save_flag= 3;
493     ret_value= _ma_split_page(info, key, anc_page,
494                               share->max_index_block_size,
495                               (uchar*) 0, 0, 0, lastkey, 0) | 2;
496     DBUG_ASSERT(anc_page->org_size == anc_page->size);
497   }
498   if (save_flag && ret_value != 1)
499   {
500     page_mark_changed(info, anc_page);
501     if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
502                           DFLT_INIT_HITS))
503       ret_value= -1;
504   }
505   else
506   {
507     DBUG_DUMP("page", anc_page->buff, anc_page->size);
508   }
509   my_afree(leaf_buff);
510   DBUG_PRINT("exit",("Return: %d",ret_value));
511   DBUG_RETURN(ret_value);
512 
513 err:
514   my_afree(leaf_buff);
515   DBUG_PRINT("exit",("Error: %d",my_errno));
516   DBUG_RETURN (-1);
517 } /* d_search */
518 
519 
520 /**
521    @brief Remove a key that has a page-reference
522 
523    @param info		 Maria handler
524    @param key		 Buffer for key to be inserted at upper level
525    @param anc_page	 Page address for page where deleted key was
526    @param anc_buff       Page buffer (nod) where deleted key was
527    @param leaf_page      Page address for nod before the deleted key
528    @param leaf_buff      Buffer for leaf_page
529    @param leaf_buff_link Pinned page link for leaf_buff
530    @param keypos         Pos to where deleted key was on anc_buff
531    @param next_block	 Page adress for nod after deleted key
532    @param ret_key_buff	 Key before keypos in anc_buff
533 
534    @notes
535       leaf_page must be written to disk if retval > 0
536       anc_page  is not updated on disk. Caller should do this
537 
538    @return
539    @retval < 0   Error
540    @retval 0     OK.    leaf_buff is written to disk
541 
542    @retval 1     key contains key to upper level (from balance page)
543                  leaf_buff has underflow
544    @retval 2     key contains key to upper level (from split space)
545 */
546 
547 static int del(MARIA_HA *info, MARIA_KEY *key,
548                MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
549 	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
550 {
551   int ret_value,length;
552   uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
553   uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
554   uchar *anc_buff;
555   MARIA_KEY_PARAM s_temp;
556   MARIA_KEY tmp_key;
557   MARIA_SHARE *share= info->s;
558   MARIA_KEYDEF *keyinfo= key->keyinfo;
559   MARIA_KEY ret_key;
560   MARIA_PAGE next_page;
561   DBUG_ENTER("del");
562   DBUG_PRINT("enter",("leaf_page: %lu  keypos: %p",
563                       (ulong) (leaf_page->pos / share->block_size),
564 		      keypos));
565   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
566 
567   page_flag=   leaf_page->flag;
568   leaf_length= leaf_page->size;
569   nod_flag=    leaf_page->node;
570 
571   endpos= leaf_page->buff + leaf_length;
572   tmp_key.keyinfo= keyinfo;
573   tmp_key.data=    keybuff;
574   next_buff= 0;
575 
576   if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
577     DBUG_RETURN(-1);
578 
579   if (nod_flag)
580   {
581     next_page.pos= _ma_kpos(nod_flag,endpos);
582     if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
583 					MARIA_MAX_KEY_BUFF*2)))
584       DBUG_RETURN(-1);
585     if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
586                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
587       ret_value= -1;
588     else
589     {
590       DBUG_DUMP("next_page", next_page.buff, next_page.size);
591       if ((ret_value= del(info, key, anc_page, &next_page,
592                           keypos, next_block, ret_key_buff)) >0)
593       {
594         /* Get new length after key was deleted */
595 	endpos= leaf_page->buff+ leaf_page->size;
596 	if (ret_value == 1)
597 	{
598           /* underflow writes "next_page" to disk */
599 	  ret_value= underflow(info, keyinfo, leaf_page, &next_page,
600                                endpos);
601           if (ret_value < 0)
602             goto err;
603 	  if (leaf_page->size > share->max_index_block_size)
604 	  {
605             DBUG_ASSERT(ret_value == 0);
606 	    ret_value= (_ma_split_page(info, key, leaf_page,
607                                        share->max_index_block_size,
608                                        (uchar*) 0, 0, 0,
609                                        ret_key_buff, 0) | 2);
610 	  }
611 	}
612 	else
613 	{
614           if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
615                                 DFLT_INIT_HITS))
616             goto err;
617 	  DBUG_PRINT("test",("Inserting of key when deleting"));
618 	  if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
619 	    goto err;
620 	  ret_value= _ma_insert(info, key, leaf_page, endpos,
621                                 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
622                                 0);
623 	}
624       }
625       page_mark_changed(info, leaf_page);
626       /*
627         If ret_value <> 0, then leaf_page underflowed and caller will have
628         to handle underflow and write leaf_page to disk.
629         We can't write it here, as if leaf_page is empty we get an assert
630         in _ma_write_keypage.
631       */
632       if (ret_value == 0 && _ma_write_keypage(leaf_page,
633                                               PAGECACHE_LOCK_LEFT_WRITELOCKED,
634                                               DFLT_INIT_HITS))
635 	goto err;
636     }
637     my_afree(next_buff);
638     DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
639     DBUG_RETURN(ret_value);
640   }
641 
642   /*
643     Remove last key from leaf page
644     Note that leaf_page page may only have had one key (can normally only
645     happen in quick mode), in which ase it will now temporary have 0 keys
646     on it. This will be corrected by the caller as we will return 0.
647   */
648   new_leaf_length= (uint) (key_start - leaf_page->buff);
649   leaf_page->size= new_leaf_length;
650   page_store_size(share, leaf_page);
651 
652   if (share->now_transactional &&
653       _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
654     goto err;
655 
656   page_mark_changed(info, leaf_page);           /* Safety */
657   if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
658                           (uint) keyinfo->underflow_block_length))
659   {
660     /* Underflow, leaf_page will be written by caller */
661     ret_value= 1;
662   }
663   else
664   {
665     ret_value= 0;
666     if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
667                           DFLT_INIT_HITS))
668       goto err;
669   }
670 
671   /* Place last key in ancestor page on deleted key position */
672   a_length= anc_page->size;
673   anc_buff= anc_page->buff;
674   endpos=   anc_buff + a_length;
675 
676   ret_key.keyinfo= keyinfo;
677   ret_key.data=    ret_key_buff;
678 
679   prev_key= 0;
680   if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
681   {
682     if (!_ma_get_last_key(&ret_key, anc_page, keypos))
683       goto err;
684     prev_key= ret_key.data;
685   }
686   length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
687                                keypos == endpos ? (uchar*) 0 : keypos,
688                                prev_key, prev_key,
689                                &s_temp);
690   if (length > 0)
691     bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
692   else
693     bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
694   (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
695   key_start= keypos;
696   if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
697                       SEARCH_PAGE_KEY_HAS_TRANSID))
698     _ma_mark_page_with_transid(share, anc_page);
699 
700   /* Save pointer to next leaf on parent page */
701   if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
702                            &keypos))
703     goto err;
704   _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
705   anc_page->size= a_length + length;
706   page_store_size(share, anc_page);
707 
708   if (share->now_transactional &&
709       _ma_log_add(anc_page, a_length,
710                   key_start, s_temp.changed_length, s_temp.move_length, 1,
711                   KEY_OP_DEBUG_LOG_ADD_2))
712     goto err;
713 
714   DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
715   DBUG_RETURN(new_leaf_length <=
716               (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
717                (uint) keyinfo->underflow_block_length));
718 err:
719   if (next_buff)
720     my_afree(next_buff);
721 
722   DBUG_RETURN(-1);
723 } /* del */
724 
725 
726 /**
727    @brief Balances adjacent pages if underflow occours
728 
729    @fn    underflow()
730    @param anc_buff        Anchestor page data
731    @param leaf_page       Leaf page (page that underflowed)
732    @param leaf_page_link  Pointer to pin information about leaf page
733    @param keypos          Position after current key in anc_buff
734 
735    @note
736      This function writes redo entries for all changes
737      leaf_page is saved to disk
738      Caller must save anc_buff
739 
740      For the algoritm to work, we have to ensure for packed keys that
741      key_length + (underflow_length + max_block_length + key_length) / 2
742      <= block_length.
743      From which follows that underflow_length <= block_length - key_length *3
744      For not packed keys we have:
745      (underflow_length + max_block_length + key_length) / 2 <= block_length
746      From which follows that underflow_length < block_length - key_length
747      This is ensured by setting of underflow_block_length.
748 
749    @return
750    @retval  0  ok
751    @retval  1  ok, but anc_page did underflow
752    @retval -1  error
753  */
754 
755 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
756 		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
757 		     uchar *keypos)
758 {
759   int t_length;
760   uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
761   uint next_buff_length, new_buff_length, key_reflength;
762   uint unchanged_leaf_length, new_leaf_length, new_anc_length;
763   uint anc_page_flag, page_flag;
764   uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
765   uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
766   uchar *anc_buff, *leaf_buff;
767   uchar *after_key, *anc_end_pos;
768   MARIA_KEY_PARAM key_deleted, key_inserted;
769   MARIA_SHARE *share= info->s;
770   my_bool first_key;
771   MARIA_KEY tmp_key, anc_key, leaf_key;
772   MARIA_PAGE next_page;
773   DBUG_ENTER("underflow");
774   DBUG_PRINT("enter",("leaf_page: %lu  keypos: %p",
775                       (ulong) (leaf_page->pos / share->block_size),
776 		      keypos));
777   DBUG_DUMP("anc_buff", anc_page->buff,  anc_page->size);
778   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
779 
780   anc_page_flag= anc_page->flag;
781   anc_buff= anc_page->buff;
782   leaf_buff= leaf_page->buff;
783   info->keyread_buff_used=1;
784   next_keypos=keypos;
785   nod_flag= leaf_page->node;
786   p_length= nod_flag+share->keypage_header;
787   anc_length= anc_page->size;
788   leaf_length= leaf_page->size;
789   key_reflength= share->base.key_reflength;
790   if (share->keyinfo+info->lastinx == keyinfo)
791     info->page_changed=1;
792   first_key= keypos == anc_buff + share->keypage_header + key_reflength;
793 
794   tmp_key.data=  info->buff;
795   anc_key.data=  anc_key_buff;
796   leaf_key.data= leaf_key_buff;
797   tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
798 
799   if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
800       first_key)
801   {
802     uint tmp_length;
803     uint next_page_flag;
804     /* Use page right of anc-page */
805     DBUG_PRINT("test",("use right page"));
806 
807     /*
808       Calculate position after the current key. Note that keydata itself is
809       not used
810     */
811     if (keyinfo->flag & HA_BINARY_PACK_KEY)
812     {
813       if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
814 	goto err;
815     }
816     else
817     {
818       /* Avoid length error check if packed key */
819       tmp_key.data[0]= tmp_key.data[1]= 0;
820       /* Got to end of found key */
821       if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
822                                &next_keypos))
823         goto err;
824     }
825     next_page.pos= _ma_kpos(key_reflength, next_keypos);
826     if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
827                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
828       goto err;
829     next_buff_length= next_page.size;
830     next_page_flag=   next_page.flag;
831     DBUG_DUMP("next", next_page.buff, next_page.size);
832 
833     /* find keys to make a big key-page */
834     bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
835           key_reflength);
836 
837     if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
838 	!_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
839       goto err;
840 
841     /* merge pages and put parting key from anc_page between */
842     prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
843     t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
844                                    prev_key, prev_key, &key_inserted);
845     tmp_length= next_buff_length - p_length;
846     endpos= next_page.buff + tmp_length + leaf_length + t_length;
847     /* next_page.buff will always be larger than before !*/
848     bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
849     memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
850     (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
851     buff_length= (uint) (endpos - next_page.buff);
852 
853     /* Set page flag from combination of both key pages and parting key */
854     page_flag= next_page_flag | leaf_page->flag;
855     if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
856                         SEARCH_PAGE_KEY_HAS_TRANSID))
857       page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
858 
859     next_page.size= buff_length;
860     next_page.flag= page_flag;
861     page_store_info(share, &next_page);
862 
863     /* remove key from anc_page */
864     if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
865                               anc_key_buff, anc_buff+anc_length,
866                               (my_off_t *) 0, &key_deleted)))
867       goto err;
868 
869     new_anc_length= anc_length - s_length;
870     anc_page->size= new_anc_length;
871     page_store_size(share, anc_page);
872 
873     if (buff_length <= share->max_index_block_size)
874     {
875       /* All keys fitted into one page */
876       page_mark_changed(info, &next_page);
877       if (_ma_dispose(info, next_page.pos, 0))
878        goto err;
879 
880       memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
881       leaf_page->size= next_page.size;
882       leaf_page->flag= next_page.flag;
883 
884       if (share->now_transactional)
885       {
886         /*
887           Log changes to parent page. Note that this page may have been
888           temporarily bigger than block_size.
889          */
890         if (_ma_log_delete(anc_page, key_deleted.key_pos,
891                            key_deleted.changed_length,
892                            key_deleted.move_length,
893                            anc_length - anc_page->org_size,
894                            KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
895           goto err;
896         /*
897           Log changes to leaf page. Data for leaf page is in leaf_buff
898           which contains original leaf_buff, parting key and next_buff
899         */
900         if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
901           goto err;
902       }
903     }
904     else
905     {
906       /*
907         Balancing didn't free a page, so we have to split 'buff' into two
908         pages:
909         - Find key in middle of buffer
910         - Store everything before key in 'leaf_page'
911         - Pack key into anc_page at position of deleted key
912           Note that anc_page may overflow! (is handled by caller)
913         - Store remaining keys in next_page (buff)
914       */
915       MARIA_KEY_PARAM anc_key_inserted;
916 
917       anc_end_pos= anc_buff + new_anc_length;
918 
919       DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
920                          anc_buff, anc_end_pos));
921 
922       if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
923 	goto err;
924       if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
925 	goto err;
926       new_leaf_length= (uint) (half_pos - next_page.buff);
927       memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
928 
929       leaf_page->size= new_leaf_length;
930       leaf_page->flag= page_flag;
931       page_store_info(share, leaf_page);
932 
933       /* Correct new keypointer to leaf_page */
934       half_pos=after_key;
935       _ma_kpointer(info,
936                    leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
937                    next_page.pos);
938 
939       /* Save key in anc_page */
940       prev_key= (first_key  ? (uchar*) 0 : anc_key.data);
941       t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
942                                      (keypos == anc_end_pos ? (uchar*) 0 :
943                                       keypos),
944                                      prev_key, prev_key, &anc_key_inserted);
945       if (t_length >= 0)
946 	bmove_upp(anc_end_pos+t_length, anc_end_pos,
947                   (uint) (anc_end_pos - keypos));
948       else
949 	bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
950       (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
951       new_anc_length+= t_length;
952       anc_page->size= new_anc_length;
953       page_store_size(share, anc_page);
954 
955       if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
956                            SEARCH_PAGE_KEY_HAS_TRANSID))
957         _ma_mark_page_with_transid(share, anc_page);
958 
959       /* Store key first in new page */
960       if (nod_flag)
961 	bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
962               (size_t) nod_flag);
963       if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
964 	goto err;
965       t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
966 					  (uchar*) 0, (uchar*) 0,
967 					  &key_inserted);
968       /* t_length will always be > 0 for a new page !*/
969       tmp_length= (uint) ((next_page.buff + buff_length) - half_pos);
970       bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
971       (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
972       new_buff_length= tmp_length + t_length + p_length;
973       next_page.size= new_buff_length;
974       page_store_size(share, &next_page);
975       /* keypage flag is already up to date */
976 
977       if (share->now_transactional)
978       {
979         /*
980           Log changes to parent page
981           This has one key deleted from it and one key inserted to it at
982           keypos
983 
984           ma_log_add ensures that we don't log changes that is outside of
985           key block size, as the REDO code can't handle that
986         */
987         if (_ma_log_add(anc_page, anc_length, keypos,
988                         anc_key_inserted.move_length +
989                         MY_MAX(anc_key_inserted.changed_length -
990                             anc_key_inserted.move_length,
991                             key_deleted.changed_length),
992                         anc_key_inserted.move_length -
993                         key_deleted.move_length, 1,
994                         KEY_OP_DEBUG_LOG_ADD_3))
995           goto err;
996 
997         /*
998           Log changes to leaf page.
999           This contains original data with new data added at end
1000         */
1001         DBUG_ASSERT(leaf_length <= new_leaf_length);
1002         if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
1003           goto err;
1004         /*
1005           Log changes to next page
1006 
1007           This contains original data with some prefix data deleted and
1008           some compressed data at start possible extended
1009 
1010           Data in buff was originally:
1011           org_leaf_buff     [leaf_length]
1012           separator_key     [buff_key_inserted.move_length]
1013           next_key_changes  [buff_key_inserted.changed_length -move_length]
1014           next_page_data    [next_buff_length - p_length -
1015                             (buff_key_inserted.changed_length -move_length)]
1016 
1017           After changes it's now:
1018           unpacked_key      [key_inserted.changed_length]
1019           next_suffix       [next_buff_length - key_inserted.changed_length]
1020 
1021         */
1022         DBUG_ASSERT(new_buff_length <= next_buff_length);
1023         if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1024                            (int) (new_buff_length - next_buff_length),
1025                            KEY_OP_DEBUG_LOG_PREFIX_1))
1026           goto err;
1027       }
1028       page_mark_changed(info, &next_page);
1029       if (_ma_write_keypage(&next_page,
1030                             PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1031 	goto err;
1032     }
1033 
1034     page_mark_changed(info, leaf_page);
1035     if (_ma_write_keypage(leaf_page,
1036                           PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1037       goto err;
1038     DBUG_RETURN(new_anc_length <=
1039                 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1040                   (uint) keyinfo->underflow_block_length)));
1041   }
1042 
1043   DBUG_PRINT("test",("use left page"));
1044 
1045   keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1046   if (!keypos)
1047     goto err;
1048   next_page.pos= _ma_kpos(key_reflength,keypos);
1049   if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1050                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1051     goto err;
1052   buff_length= next_page.size;
1053   endpos= next_page.buff + buff_length;
1054   DBUG_DUMP("prev", next_page.buff, next_page.size);
1055 
1056   /* find keys to make a big key-page */
1057   bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1058         key_reflength);
1059   next_keypos=keypos;
1060   if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1061                            &next_keypos))
1062     goto err;
1063   if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1064     goto err;
1065 
1066   /* merge pages and put parting key from anc_page between */
1067   prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1068   t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1069 				(leaf_length == p_length ?
1070                                  (uchar*) 0 : leaf_buff+p_length),
1071 				prev_key, prev_key,
1072 				&key_inserted);
1073   if (t_length >= 0)
1074     bmove(endpos+t_length, leaf_buff+p_length,
1075           (size_t) (leaf_length-p_length));
1076   else						/* We gained space */
1077     bmove(endpos,leaf_buff+((int) p_length-t_length),
1078 	  (size_t) (leaf_length-p_length+t_length));
1079   (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1080 
1081   /* Remember for logging how many bytes of leaf_buff that are not changed */
1082   DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1083   unchanged_leaf_length= (leaf_length - p_length -
1084                           (key_inserted.changed_length -
1085                            key_inserted.move_length));
1086 
1087   new_buff_length= buff_length + leaf_length - p_length + t_length;
1088 
1089 #ifdef EXTRA_DEBUG
1090   /* Ensure that unchanged_leaf_length is correct */
1091   DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
1092                    leaf_buff + leaf_length - unchanged_leaf_length,
1093                    unchanged_leaf_length) == 0);
1094 #endif
1095 
1096   page_flag= next_page.flag | leaf_page->flag;
1097   if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1098                        SEARCH_PAGE_KEY_HAS_TRANSID))
1099     page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1100 
1101   next_page.size= new_buff_length;
1102   next_page.flag= page_flag;
1103   page_store_info(share, &next_page);
1104 
1105   /* remove key from anc_page */
1106   if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1107                              anc_key_buff,
1108                              anc_buff+anc_length, (my_off_t *) 0,
1109                              &key_deleted)))
1110     goto err;
1111 
1112   new_anc_length= anc_length - s_length;
1113   anc_page->size= new_anc_length;
1114   page_store_size(share, anc_page);
1115 
1116   if (new_buff_length <= share->max_index_block_size)
1117   {
1118     /* All keys fitted into one page */
1119     page_mark_changed(info, leaf_page);
1120     if (_ma_dispose(info, leaf_page->pos, 0))
1121       goto err;
1122 
1123     if (share->now_transactional)
1124     {
1125       /*
1126         Log changes to parent page. Note that this page may have been
1127         temporarily bigger than block_size.
1128       */
1129       if (_ma_log_delete(anc_page, key_deleted.key_pos,
1130                          key_deleted.changed_length, key_deleted.move_length,
1131                          anc_length - anc_page->org_size,
1132                          KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1133         goto err;
1134       /*
1135         Log changes to next page. Data for leaf page is in buff
1136         that contains original leaf_buff, parting key and next_buff
1137       */
1138       if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1139         goto err;
1140     }
1141   }
1142   else
1143   {
1144     /*
1145       Balancing didn't free a page, so we have to split 'next_page' into two
1146       pages
1147       - Find key in middle of buffer (buff)
1148       - Pack key at half_buff into anc_page at position of deleted key
1149         Note that anc_page may overflow! (is handled by caller)
1150       - Move everything after middlekey to 'leaf_buff'
1151       - Shorten buff at 'endpos'
1152     */
1153     MARIA_KEY_PARAM anc_key_inserted;
1154     size_t tmp_length;
1155 
1156     if (keypos == anc_buff + share->keypage_header + key_reflength)
1157       anc_pos= 0;				/* First key */
1158     else
1159     {
1160       if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1161         goto err;
1162       anc_pos= anc_key.data;
1163     }
1164     if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1165       goto err;
1166 
1167     /* Correct new keypointer to leaf_page */
1168     _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1169                  leaf_key.ref_length, leaf_page->pos);
1170 
1171     /* Save parting key found by _ma_find_half_pos() in anc_page */
1172     DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1173     DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1174     anc_end_pos= anc_buff + new_anc_length;
1175     t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1176 				  keypos == anc_end_pos ? (uchar*) 0
1177 				  : keypos,
1178 				  anc_pos, anc_pos,
1179 				  &anc_key_inserted);
1180     if (t_length >= 0)
1181       bmove_upp(anc_end_pos+t_length, anc_end_pos,
1182                 (uint) (anc_end_pos-keypos));
1183     else
1184       bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1185     (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1186     new_anc_length+= t_length;
1187     anc_page->size= new_anc_length;
1188     page_store_size(share, anc_page);
1189 
1190     if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1191                          SEARCH_PAGE_KEY_HAS_TRANSID))
1192       _ma_mark_page_with_transid(share, anc_page);
1193 
1194     /* Store first key on new page */
1195     if (nod_flag)
1196       bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1197             (size_t) nod_flag);
1198     if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1199       goto err;
1200     DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1201     t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1202 				  (uchar*) 0, (uchar*) 0, &key_inserted);
1203     /* t_length will always be > 0 for a new page !*/
1204     tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1205     DBUG_PRINT("info",("t_length: %d  length: %d",t_length, (int) tmp_length));
1206     bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1207     (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1208     new_leaf_length= (uint)(tmp_length + t_length + p_length);
1209     DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
1210 
1211     leaf_page->size= new_leaf_length;
1212     leaf_page->flag= page_flag;
1213     page_store_info(share, leaf_page);
1214 
1215     new_buff_length= (uint) (endpos - next_page.buff);
1216     next_page.size= new_buff_length;
1217     page_store_size(share, &next_page);
1218 
1219     if (share->now_transactional)
1220     {
1221       /*
1222         Log changes to parent page
1223         This has one key deleted from it and one key inserted to it at
1224         keypos
1225 
1226         ma_log_add() ensures that we don't log changes that is outside of
1227         key block size, as the REDO code can't handle that
1228       */
1229       if (_ma_log_add(anc_page, anc_length, keypos,
1230                       anc_key_inserted.move_length +
1231                       MY_MAX(anc_key_inserted.changed_length -
1232                           anc_key_inserted.move_length,
1233                           key_deleted.changed_length),
1234                       anc_key_inserted.move_length -
1235                       key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1236         goto err;
1237 
1238       /*
1239         Log changes to leaf page.
1240         This contains original data with new data added first
1241       */
1242       DBUG_ASSERT(leaf_length <= new_leaf_length);
1243       DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1244       if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1245                          (int) (new_leaf_length - leaf_length),
1246                          KEY_OP_DEBUG_LOG_PREFIX_2))
1247         goto err;
1248       /*
1249         Log changes to next page
1250         This contains original data with some suffix data deleted
1251       */
1252       DBUG_ASSERT(new_buff_length <= buff_length);
1253       if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1254         goto err;
1255     }
1256 
1257     page_mark_changed(info, leaf_page);
1258     if (_ma_write_keypage(leaf_page,
1259                           PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1260       goto err;
1261   }
1262   page_mark_changed(info, &next_page);
1263   if (_ma_write_keypage(&next_page,
1264                         PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1265     goto err;
1266 
1267   DBUG_RETURN(new_anc_length <=
1268               ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1269                 (uint) keyinfo->underflow_block_length)));
1270 
1271 err:
1272   DBUG_RETURN(-1);
1273 } /* underflow */
1274 
1275 
1276 /**
1277   @brief Remove a key from page
1278 
1279   @fn remove_key()
1280     keyinfo	          Key handle
1281     nod_flag              Length of node ptr
1282     keypos	          Where on page key starts
1283     lastkey	          Buffer for storing keys to be removed
1284     page_end	          Pointer to end of page
1285     next_block	          If <> 0 and node-page, this is set to address of
1286     		          next page
1287     s_temp	          Information about what changes was done one the page:
1288     s_temp.key_pos        Start of key
1289     s_temp.move_length    Number of bytes removed at keypos
1290     s_temp.changed_length Number of bytes changed at keypos
1291 
1292   @todo
1293     The current code doesn't handle the case that the next key may be
1294     packed better against the previous key if there is a case difference
1295 
1296   @return
1297   @retval 0  error
1298   @retval #  How many chars was removed
1299 */
1300 
1301 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1302 		       uchar *keypos, uchar *lastkey,
1303 		       uchar *page_end, my_off_t *next_block,
1304                        MARIA_KEY_PARAM *s_temp)
1305 {
1306   int s_length;
1307   uchar *start;
1308   DBUG_ENTER("remove_key");
1309   DBUG_PRINT("enter", ("keypos:%p page_end: %p",
1310                        keypos, page_end));
1311 
1312   start= s_temp->key_pos= keypos;
1313   s_temp->changed_length= 0;
1314   if (!(keyinfo->flag &
1315 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1316 	 HA_BINARY_PACK_KEY)) &&
1317       !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1318   {
1319     /* Static length key */
1320     s_length=(int) (keyinfo->keylength+nod_flag);
1321     if (next_block && nod_flag)
1322       *next_block= _ma_kpos(nod_flag,keypos+s_length);
1323   }
1324   else
1325   {
1326     /* Let keypos point at next key */
1327     MARIA_KEY key;
1328 
1329     /* Calculate length of key */
1330     key.keyinfo= keyinfo;
1331     key.data=    lastkey;
1332     if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1333       DBUG_RETURN(0);				/* Error */
1334 
1335     if (next_block && nod_flag)
1336       *next_block= _ma_kpos(nod_flag,keypos);
1337     s_length=(int) (keypos-start);
1338     if (keypos != page_end)
1339     {
1340       if (keyinfo->flag & HA_BINARY_PACK_KEY)
1341       {
1342 	uchar *old_key= start;
1343 	uint next_length,prev_length,prev_pack_length;
1344 
1345         /* keypos points here on start of next key */
1346 	get_key_length(next_length,keypos);
1347 	get_key_pack_length(prev_length,prev_pack_length,old_key);
1348 	if (next_length > prev_length)
1349 	{
1350           uint diff= (next_length-prev_length);
1351 	  /* We have to copy data from the current key to the next key */
1352 	  keypos-= diff + prev_pack_length;
1353 	  store_key_length(keypos, prev_length);
1354           bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1355 	  s_length=(int) (keypos-start);
1356           s_temp->changed_length= diff + prev_pack_length;
1357 	}
1358       }
1359       else
1360       {
1361 	/* Check if a variable length first key part */
1362 	if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
1363 	{
1364 	  /* Next key is packed against the current one */
1365 	  uint next_length,prev_length,prev_pack_length,lastkey_length,
1366 	    rest_length;
1367 	  if (keyinfo->seg[0].length >= 127)
1368 	  {
1369 	    if (!(prev_length=mi_uint2korr(start) & 32767))
1370 	      goto end;
1371 	    next_length=mi_uint2korr(keypos) & 32767;
1372 	    keypos+=2;
1373 	    prev_pack_length=2;
1374 	  }
1375 	  else
1376 	  {
1377 	    if (!(prev_length= *start & 127))
1378 	      goto end;				/* Same key as previous*/
1379 	    next_length= *keypos & 127;
1380 	    keypos++;
1381 	    prev_pack_length=1;
1382 	  }
1383 	  if (!(*start & 128))
1384 	    prev_length=0;			/* prev key not packed */
1385 	  if (keyinfo->seg[0].flag & HA_NULL_PART)
1386 	    lastkey++;				/* Skip null marker */
1387 	  get_key_length(lastkey_length,lastkey);
1388 	  if (!next_length)			/* Same key after */
1389 	  {
1390 	    next_length=lastkey_length;
1391 	    rest_length=0;
1392 	  }
1393 	  else
1394 	    get_key_length(rest_length,keypos);
1395 
1396 	  if (next_length >= prev_length)
1397 	  {
1398             /* Next key is based on deleted key */
1399             uint pack_length;
1400             uint diff= (next_length-prev_length);
1401 
1402             /* keypos points to data of next key (after key length) */
1403 	    bmove(keypos - diff, lastkey + prev_length, diff);
1404 	    rest_length+= diff;
1405 	    pack_length= prev_length ? get_pack_length(rest_length): 0;
1406 	    keypos-= diff + pack_length + prev_pack_length;
1407 	    s_length=(int) (keypos-start);
1408 	    if (prev_length)			/* Pack against prev key */
1409 	    {
1410 	      *keypos++= start[0];
1411 	      if (prev_pack_length == 2)
1412 		*keypos++= start[1];
1413 	      store_key_length(keypos,rest_length);
1414 	    }
1415 	    else
1416 	    {
1417 	      /* Next key is not packed anymore */
1418 	      if (keyinfo->seg[0].flag & HA_NULL_PART)
1419 	      {
1420 		rest_length++;			/* Mark not null */
1421 	      }
1422 	      if (prev_pack_length == 2)
1423 	      {
1424 		mi_int2store(keypos,rest_length);
1425 	      }
1426 	      else
1427 		*keypos= rest_length;
1428 	    }
1429             s_temp->changed_length= diff + pack_length + prev_pack_length;
1430 	  }
1431 	}
1432       }
1433     }
1434   }
1435   end:
1436   bmove(start, start+s_length, (uint) (page_end-start-s_length));
1437   s_temp->move_length= s_length;
1438   DBUG_RETURN((uint) s_length);
1439 } /* remove_key */
1440 
1441 
1442 /****************************************************************************
1443   Logging of redos
1444 ****************************************************************************/
1445 
1446 /**
1447    @brief
1448    log entry where some parts are deleted and some things are changed
1449    and some data could be added last.
1450 
1451    @fn _ma_log_delete()
1452    @param info		  Maria handler
1453    @param page	          Pageaddress for changed page
1454    @param buff		  Page buffer
1455    @param key_pos         Start of change area
1456    @param changed_length  How many bytes where changed at key_pos
1457    @param move_length     How many bytes where deleted at key_pos
1458    @param append_length	  Length of data added last
1459 		          This is taken from end of ma_page->buff
1460 
1461    This is mainly used when a key is deleted. The append happens
1462    when we delete a key from a page with data > block_size kept in
1463    memory and we have to add back the data that was stored > block_size
1464 */
1465 
1466 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1467                        uint changed_length, uint move_length,
1468                        uint append_length __attribute__((unused)),
1469                        enum en_key_debug debug_marker __attribute__((unused)))
1470 {
1471   LSN lsn;
1472   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
1473   uchar *log_pos;
1474   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
1475   uint translog_parts, current_size, extra_length;
1476   uint offset= (uint) (key_pos - ma_page->buff);
1477   MARIA_HA *info= ma_page->info;
1478   MARIA_SHARE *share= info->s;
1479   my_off_t page= ma_page->pos / share->block_size;
1480   DBUG_ENTER("_ma_log_delete");
1481   DBUG_PRINT("enter", ("page: %lu  offset: %u  changed_length: %u  move_length: %u  append_length: %u  page_size: %u",
1482                        (ulong) page, offset, changed_length, move_length,
1483                        append_length, ma_page->size));
1484   DBUG_ASSERT(share->now_transactional && move_length);
1485   DBUG_ASSERT(offset + changed_length <= ma_page->size);
1486   DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
1487   DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
1488 
1489   /* Store address of new root page */
1490   page_store(log_data + FILEID_STORE_SIZE, page);
1491   log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1492   current_size= ma_page->org_size;
1493 
1494 #ifdef EXTRA_DEBUG_KEY_CHANGES
1495   *log_pos++= KEY_OP_DEBUG;
1496   *log_pos++= debug_marker;
1497 
1498   *log_pos++= KEY_OP_DEBUG_2;
1499   int2store(log_pos,   ma_page->org_size);
1500   int2store(log_pos+2, ma_page->size);
1501   log_pos+=4;
1502 #endif
1503 
1504   /* Store keypage_flag */
1505   *log_pos++= KEY_OP_SET_PAGEFLAG;
1506   *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
1507 
1508   log_pos[0]= KEY_OP_OFFSET;
1509   int2store(log_pos+1, offset);
1510   log_pos+= 3;
1511   translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
1512   extra_length= 0;
1513 
1514   if (changed_length)
1515   {
1516     if (offset + changed_length >= share->max_index_block_size)
1517     {
1518       changed_length= share->max_index_block_size - offset;
1519       move_length= 0;                           /* Nothing to move */
1520       current_size= share->max_index_block_size;
1521     }
1522 
1523     log_pos[0]= KEY_OP_CHANGE;
1524     int2store(log_pos+1, changed_length);
1525     log_pos+= 3;
1526     log_array[translog_parts].str=    ma_page->buff + offset;
1527     log_array[translog_parts].length= changed_length;
1528     translog_parts++;
1529 
1530     /* We only have to move things after offset+changed_length */
1531     offset+= changed_length;
1532   }
1533 
1534   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1535   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1536 
1537   if (move_length)
1538   {
1539     uint log_length;
1540     if (offset + move_length < share->max_index_block_size)
1541     {
1542       /*
1543         Move down things that is on page.
1544         page_offset in apply_redo_inxed() will be at original offset
1545         + changed_length.
1546       */
1547       log_pos[0]= KEY_OP_SHIFT;
1548       int2store(log_pos+1, - (int) move_length);
1549       log_length= 3;
1550       current_size-= move_length;
1551     }
1552     else
1553     {
1554       /* Delete to end of page */
1555       uint tmp= current_size - offset;
1556       current_size= offset;
1557       log_pos[0]= KEY_OP_DEL_SUFFIX;
1558       int2store(log_pos+1, tmp);
1559       log_length= 3;
1560     }
1561     log_array[translog_parts].str=    log_pos;
1562     log_array[translog_parts].length= log_length;
1563     translog_parts++;
1564     log_pos+= log_length;
1565     extra_length+= log_length;
1566   }
1567 
1568   if (current_size != ma_page->size &&
1569       current_size != share->max_index_block_size)
1570   {
1571     /* Append data that didn't fit on the page before */
1572     uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
1573                   current_size);
1574     uchar *data= ma_page->buff + current_size;
1575 
1576     DBUG_ASSERT(length <= append_length);
1577 
1578     log_pos[0]= KEY_OP_ADD_SUFFIX;
1579     int2store(log_pos+1, length);
1580     log_array[translog_parts].str=        log_pos;
1581     log_array[translog_parts].length=     3;
1582     log_array[translog_parts + 1].str=    data;
1583     log_array[translog_parts + 1].length= length;
1584     log_pos+= 3;
1585     translog_parts+= 2;
1586     current_size+= length;
1587     extra_length+= 3 + length;
1588   }
1589 
1590   _ma_log_key_changes(ma_page,
1591                       log_array + translog_parts,
1592                       log_pos, &extra_length, &translog_parts);
1593   /* Remember new page length for future log entires for same page */
1594   ma_page->org_size= current_size;
1595 
1596   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1597                             info->trn, info,
1598                             (translog_size_t)
1599                             log_array[TRANSLOG_INTERNAL_PARTS].length +
1600                             changed_length + extra_length, translog_parts,
1601                             log_array, log_data, NULL))
1602     DBUG_RETURN(1);
1603 
1604   DBUG_RETURN(0);
1605 }
1606 
1607 
1608 /****************************************************************************
1609   Logging of undos
1610 ****************************************************************************/
1611 
1612 my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1613                                   my_off_t new_root, LSN *res_lsn)
1614 {
1615   MARIA_SHARE *share= info->s;
1616   uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1617                  KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1618   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1619   struct st_msg_to_write_hook_for_undo_key msg;
1620   enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1621   uint keynr= key->keyinfo->key_nr;
1622 
1623   lsn_store(log_data, info->trn->undo_lsn);
1624   key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1625   log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1626 
1627   /**
1628     @todo BUG if we had concurrent insert/deletes, reading state's key_root
1629     like this would be unsafe.
1630   */
1631   if (new_root != share->state.key_root[keynr])
1632   {
1633     my_off_t page;
1634     page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1635            new_root / share->block_size);
1636     page_store(log_pos, page);
1637     log_pos+= PAGE_STORE_SIZE;
1638     log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1639   }
1640 
1641   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1642   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1643   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key->data;
1644   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1645                                                   key->ref_length);
1646 
1647   msg.root= &share->state.key_root[keynr];
1648   msg.value= new_root;
1649   /*
1650     set autoincrement to 1 if this is an auto_increment key
1651     This is only used if we are now in a rollback of a duplicate key
1652   */
1653   msg.auto_increment= share->base.auto_key == keynr + 1;
1654 
1655   return translog_write_record(res_lsn, log_type,
1656                                info->trn, info,
1657                                (translog_size_t)
1658                                (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1659                                 log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1660                                TRANSLOG_INTERNAL_PARTS + 2, log_array,
1661                                log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1662 }
1663