1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 Copyright (C) 2009-2010 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "ma_fulltext.h"
18 #include "ma_rt_index.h"
19 #include "trnman.h"
20 #include "ma_key_recover.h"
21
22 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
23 MARIA_PAGE *page);
24 static int del(MARIA_HA *info, MARIA_KEY *key,
25 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
26 uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
27 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
28 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
29 uchar *keypos);
30 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
31 uchar *keypos, uchar *lastkey, uchar *page_end,
32 my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33
34 /* @breif Remove a row from a MARIA table */
35
maria_delete(MARIA_HA * info,const uchar * record)36 int maria_delete(MARIA_HA *info,const uchar *record)
37 {
38 uint i;
39 uchar *old_key;
40 int save_errno;
41 char lastpos[8];
42 MARIA_SHARE *share= info->s;
43 MARIA_KEYDEF *keyinfo;
44 DBUG_ENTER("maria_delete");
45
46 /* Test if record is in datafile */
47 DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
48 maria_print_error(share, HA_ERR_CRASHED);
49 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
50 DBUG_EXECUTE_IF("my_error_test_undefined_error",
51 maria_print_error(share, INT_MAX);
52 DBUG_RETURN(my_errno= INT_MAX););
53 if (!(info->update & HA_STATE_AKTIV))
54 {
55 DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
56 }
57 if (share->options & HA_OPTION_READ_ONLY_DATA)
58 {
59 DBUG_RETURN(my_errno=EACCES);
60 }
61 if (_ma_readinfo(info,F_WRLCK,1))
62 DBUG_RETURN(my_errno);
63 if ((*share->compare_record)(info,record))
64 goto err; /* Error on read-check */
65
66 if (_ma_mark_file_changed(share))
67 goto err;
68
69 /* Ensure we don't change the autoincrement value */
70 info->last_auto_increment= ~(ulonglong) 0;
71 /* Remove all keys from the index file */
72
73 old_key= info->lastkey_buff2;
74
75 for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76 {
77 if (maria_is_key_active(share->state.key_map, i))
78 {
79 keyinfo->version++;
80 if (keyinfo->flag & HA_FULLTEXT)
81 {
82 if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83 goto err;
84 }
85 else
86 {
87 MARIA_KEY key;
88 if (keyinfo->ck_delete(info,
89 (*keyinfo->make_key)(info, &key, i, old_key,
90 record,
91 info->cur_row.lastpos,
92 info->cur_row.trid)))
93 goto err;
94 }
95 /* The above changed info->lastkey2. Inform maria_rnext_same(). */
96 info->update&= ~HA_STATE_RNEXT_SAME;
97 }
98 }
99
100 if (share->calc_checksum)
101 {
102 /*
103 We can't use the row based checksum as this doesn't have enough
104 precision.
105 */
106 info->cur_row.checksum= (*share->calc_checksum)(info, record);
107 }
108
109 if ((*share->delete_record)(info, record))
110 goto err; /* Remove record from database */
111
112 info->state->checksum-= info->cur_row.checksum;
113 info->state->records--;
114 info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
115 info->row_changes++;
116 share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
117 STATE_NOT_ZEROFILLED);
118 info->state->changed=1;
119
120 mi_sizestore(lastpos, info->cur_row.lastpos);
121 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
122 if (info->invalidator != 0)
123 {
124 DBUG_PRINT("info", ("invalidator... '%s' (delete)",
125 share->open_file_name.str));
126 (*info->invalidator)(share->open_file_name.str);
127 info->invalidator=0;
128 }
129 DBUG_RETURN(0);
130
131 err:
132 save_errno= my_errno;
133 DBUG_ASSERT(save_errno);
134 if (!save_errno)
135 save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
136
137 mi_sizestore(lastpos, info->cur_row.lastpos);
138 (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
139 info->update|=HA_STATE_WRITTEN; /* Buffer changed */
140 if (save_errno != HA_ERR_RECORD_CHANGED)
141 {
142 _ma_set_fatal_error(share, HA_ERR_CRASHED);
143 save_errno= HA_ERR_CRASHED;
144 }
145 DBUG_RETURN(my_errno= save_errno);
146 } /* maria_delete */
147
148
149 /*
150 Remove a key from the btree index
151
152 TODO:
153 Change ma_ck_real_delete() to use another buffer for changed keys instead
154 of key->data. This would allows us to remove the copying of the key here.
155 */
156
_ma_ck_delete(MARIA_HA * info,MARIA_KEY * key)157 my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
158 {
159 MARIA_SHARE *share= info->s;
160 int res;
161 my_bool buff_alloced;
162 LSN lsn= LSN_IMPOSSIBLE;
163 my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
164 uchar *key_buff, *save_key_data;
165 MARIA_KEY org_key;
166 DBUG_ENTER("_ma_ck_delete");
167
168 LINT_INIT_STRUCT(org_key);
169
170 alloc_on_stack(*info->stack_end_ptr, key_buff, buff_alloced,
171 key->keyinfo->max_store_length);
172 if (!key_buff)
173 DBUG_RETURN(1);
174
175 save_key_data= key->data;
176 if (share->now_transactional)
177 {
178 /* Save original value as the key may change */
179 memcpy(key_buff, key->data, key->data_length + key->ref_length);
180 org_key= *key;
181 key->data= key_buff;
182 }
183
184 if ((res= _ma_ck_real_delete(info, key, &new_root)))
185 {
186 /* We have to mark the table crashed before unpin_all_pages() */
187 maria_mark_crashed(info);
188 }
189
190 key->data= save_key_data;
191 if (!res && share->now_transactional)
192 res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
193 else
194 {
195 share->state.key_root[key->keyinfo->key_nr]= new_root;
196 _ma_fast_unlock_key_del(info);
197 }
198 _ma_unpin_all_pages_and_finalize_row(info, lsn);
199
200 stack_alloc_free(key_buff, buff_alloced);
201 DBUG_RETURN(res != 0);
202 } /* _ma_ck_delete */
203
204
_ma_ck_real_delete(register MARIA_HA * info,MARIA_KEY * key,my_off_t * root)205 my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
206 my_off_t *root)
207 {
208 int error;
209 my_bool result= 0, buff_alloced;
210 my_off_t old_root;
211 uchar *root_buff;
212 MARIA_KEYDEF *keyinfo= key->keyinfo;
213 MARIA_PAGE page;
214 DBUG_ENTER("_ma_ck_real_delete");
215
216 if ((old_root=*root) == HA_OFFSET_ERROR)
217 {
218 _ma_set_fatal_error(info->s, HA_ERR_CRASHED);
219 DBUG_RETURN(1);
220 }
221
222 alloc_on_stack(*info->stack_end_ptr, root_buff, buff_alloced,
223 (keyinfo->block_length + keyinfo->max_store_length*2));
224 if (!root_buff)
225 DBUG_RETURN(1);
226
227 DBUG_PRINT("info",("root_page: %lu",
228 (ulong) (old_root / keyinfo->block_length)));
229 if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
230 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
231 {
232 result= 1;
233 goto err;
234 }
235 if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
236 SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
237 SEARCH_SAME),
238 &page)))
239 {
240 if (error < 0)
241 result= 1;
242 else if (error == 2)
243 {
244 DBUG_PRINT("test",("Enlarging of root when deleting"));
245 if (_ma_enlarge_root(info, key, root))
246 result= 1;
247 }
248 else /* error == 1 */
249 {
250 MARIA_SHARE *share= info->s;
251
252 page_mark_changed(info, &page);
253
254 if (page.size <= page.node + share->keypage_header + 1)
255 {
256 DBUG_ASSERT(page.size == page.node + share->keypage_header);
257 if (page.node)
258 *root= _ma_kpos(page.node, root_buff +share->keypage_header +
259 page.node);
260 else
261 *root=HA_OFFSET_ERROR;
262 if (_ma_dispose(info, old_root, 0))
263 result= 1;
264 }
265 else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
266 DFLT_INIT_HITS))
267 result= 1;
268 }
269 }
270 err:
271 stack_alloc_free(root_buff, buff_alloced);
272 DBUG_PRINT("exit",("Return: %d",result));
273 DBUG_RETURN(result);
274 } /* _ma_ck_real_delete */
275
276
277 /**
278 @brief Remove key below key root
279
280 @param key Key to delete. Will contain new key if block was enlarged
281
282 @return
283 @retval 0 ok (anc_page is not changed)
284 @retval 1 If data on page is too small; In this case anc_buff is not saved
285 @retval 2 If data on page is too big
286 @retval -1 On errors
287 */
288
d_search(MARIA_HA * info,MARIA_KEY * key,uint32 comp_flag,MARIA_PAGE * anc_page)289 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
290 MARIA_PAGE *anc_page)
291 {
292 int flag,ret_value,save_flag;
293 uint nod_flag, page_flag;
294 my_bool last_key, buff_alloced= 0, lastkey_alloced;
295 uchar *leaf_buff=0, *keypos, *lastkey;
296 MARIA_KEY_PARAM s_temp;
297 MARIA_SHARE *share= info->s;
298 MARIA_KEYDEF *keyinfo= key->keyinfo;
299 MARIA_PAGE leaf_page;
300 DBUG_ENTER("d_search");
301 DBUG_DUMP("page", anc_page->buff, anc_page->size);
302
303 alloc_on_stack(*info->stack_end_ptr, lastkey, lastkey_alloced,
304 keyinfo->max_store_length);
305 if (!lastkey)
306 DBUG_RETURN(1);
307
308 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
309 &last_key);
310 if (flag == MARIA_FOUND_WRONG_KEY)
311 {
312 DBUG_PRINT("error",("Found wrong key"));
313 goto err;
314 }
315 page_flag= anc_page->flag;
316 nod_flag= anc_page->node;
317
318 if (!flag && (keyinfo->flag & HA_FULLTEXT))
319 {
320 uint off;
321 int subkeys;
322
323 get_key_full_length_rdonly(off, lastkey);
324 subkeys=ft_sintXkorr(lastkey+off);
325 DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
326 comp_flag=SEARCH_SAME;
327 if (subkeys >= 0)
328 {
329 /* normal word, one-level tree structure */
330 if (info->ft1_to_ft2)
331 {
332 /* we're in ft1->ft2 conversion mode. Saving key data */
333 insert_dynamic(info->ft1_to_ft2, (lastkey+off));
334 }
335 else
336 {
337 /* we need exact match only if not in ft1->ft2 conversion mode */
338 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
339 lastkey, &last_key);
340 }
341 /* fall through to normal delete */
342 }
343 else
344 {
345 /* popular word. two-level tree. going down */
346 uint tmp_key_length;
347 my_off_t root;
348 uchar *kpos=keypos;
349 MARIA_KEY tmp_key;
350
351 tmp_key.data= lastkey;
352 tmp_key.keyinfo= keyinfo;
353
354 if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
355 &kpos)))
356 {
357 _ma_set_fatal_error(share, HA_ERR_CRASHED);
358 goto err;
359 }
360 root= _ma_row_pos_from_key(&tmp_key);
361 if (subkeys == -1)
362 {
363 /* the last entry in sub-tree */
364 if (_ma_dispose(info, root, 1))
365 goto err;
366 /* fall through to normal delete */
367 }
368 else
369 {
370 MARIA_KEY word_key;
371 keyinfo=&share->ft2_keyinfo;
372 /* we'll modify key entry 'in vivo' */
373 kpos-=keyinfo->keylength+nod_flag;
374 get_key_full_length_rdonly(off, key->data);
375
376 word_key.data= key->data + off;
377 word_key.keyinfo= &share->ft2_keyinfo;
378 word_key.data_length= HA_FT_WLEN;
379 word_key.ref_length= 0;
380 word_key.flag= 0;
381 ret_value= _ma_ck_real_delete(info, &word_key, &root);
382 _ma_dpointer(share, kpos+HA_FT_WLEN, root);
383 subkeys++;
384 ft_intXstore(kpos, subkeys);
385 if (!ret_value)
386 {
387 page_mark_changed(info, anc_page);
388 ret_value= _ma_write_keypage(anc_page,
389 PAGECACHE_LOCK_LEFT_WRITELOCKED,
390 DFLT_INIT_HITS);
391 }
392 goto end;
393 }
394 }
395 }
396 if (nod_flag)
397 {
398 /* Read left child page */
399 leaf_page.pos= _ma_kpos(nod_flag,keypos);
400
401 alloc_on_stack(*info->stack_end_ptr, leaf_buff, buff_alloced,
402 (keyinfo->block_length + keyinfo->max_store_length*2));
403 if (!leaf_buff)
404 goto err;
405
406 if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
407 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
408 0))
409 goto err;
410 }
411
412 if (flag != 0)
413 {
414 if (!nod_flag)
415 {
416 /* This should newer happend */
417 DBUG_PRINT("error",("Didn't find key"));
418 _ma_set_fatal_error(share, HA_ERR_CRASHED);
419 goto err;
420 }
421 save_flag=0;
422 ret_value= d_search(info, key, comp_flag, &leaf_page);
423 }
424 else
425 { /* Found key */
426 uint tmp;
427 uint anc_buff_length= anc_page->size;
428 uint anc_page_flag= anc_page->flag;
429 my_off_t next_block;
430
431 if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
432 anc_page->buff + anc_buff_length,
433 &next_block, &s_temp)))
434 goto err;
435
436 page_mark_changed(info, anc_page);
437 anc_buff_length-= tmp;
438 anc_page->size= anc_buff_length;
439 page_store_size(share, anc_page);
440
441 /*
442 Log initial changes on pages
443 If there is an underflow, there will be more changes logged to the
444 page
445 */
446 if (share->now_transactional &&
447 _ma_log_delete(anc_page, s_temp.key_pos,
448 s_temp.changed_length, s_temp.move_length,
449 0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
450 goto err;
451
452 if (!nod_flag)
453 { /* On leaf page */
454 if (anc_buff_length <= (info->quick_mode ?
455 MARIA_MIN_KEYBLOCK_LENGTH :
456 (uint) keyinfo->underflow_block_length))
457 {
458 /* Page will be written by caller if we return 1 */
459 ret_value= 1;
460 goto end;
461 }
462 if (_ma_write_keypage(anc_page,
463 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
464 goto err;
465
466 ret_value= 0; /* Return ok */
467 goto end;
468 }
469 save_flag=1; /* Mark that anc_buff is changed */
470 ret_value= del(info, key, anc_page, &leaf_page,
471 keypos, next_block, lastkey);
472 }
473 if (ret_value >0)
474 {
475 save_flag= 2;
476 if (ret_value == 1)
477 ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
478 else
479 {
480 /* This can only happen with variable length keys */
481 MARIA_KEY last_key;
482 DBUG_PRINT("test",("Enlarging of key when deleting"));
483
484 last_key.data= lastkey;
485 last_key.keyinfo= keyinfo;
486 if (!_ma_get_last_key(&last_key, anc_page, keypos))
487 goto err;
488 ret_value= _ma_insert(info, key, anc_page, keypos,
489 last_key.data,
490 (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
491
492 if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
493 DFLT_INIT_HITS))
494 ret_value= -1;
495 }
496 }
497 if (ret_value == 0 && anc_page->size > share->max_index_block_size)
498 {
499 /*
500 parent buffer got too big ; We have to split the page.
501 The | 2 is there to force write of anc page below
502 */
503 save_flag= 3;
504 ret_value= _ma_split_page(info, key, anc_page,
505 share->max_index_block_size,
506 (uchar*) 0, 0, 0, lastkey, 0) | 2;
507 DBUG_ASSERT(anc_page->org_size == anc_page->size);
508 }
509 if (save_flag && ret_value != 1)
510 {
511 page_mark_changed(info, anc_page);
512 if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
513 DFLT_INIT_HITS))
514 ret_value= -1;
515 }
516 else
517 {
518 DBUG_DUMP("page", anc_page->buff, anc_page->size);
519 }
520
521 end:
522 stack_alloc_free(leaf_buff, buff_alloced);
523 stack_alloc_free(lastkey, lastkey_alloced);
524 DBUG_PRINT("exit",("Return: %d",ret_value));
525 DBUG_RETURN(ret_value);
526
527 err:
528 stack_alloc_free(leaf_buff, buff_alloced);
529 stack_alloc_free(lastkey, lastkey_alloced);
530 DBUG_PRINT("exit",("Error: %d",my_errno));
531 DBUG_RETURN (-1);
532 } /* d_search */
533
534
535 /**
536 @brief Remove a key that has a page-reference
537
538 @param info Maria handler
539 @param key Buffer for key to be inserted at upper level
540 @param anc_page Page address for page where deleted key was
541 @param anc_buff Page buffer (nod) where deleted key was
542 @param leaf_page Page address for nod before the deleted key
543 @param leaf_buff Buffer for leaf_page
544 @param leaf_buff_link Pinned page link for leaf_buff
545 @param keypos Pos to where deleted key was on anc_buff
546 @param next_block Page adress for nod after deleted key
547 @param ret_key_buff Key before keypos in anc_buff
548
549 @notes
550 leaf_page must be written to disk if retval > 0
551 anc_page is not updated on disk. Caller should do this
552
553 @return
554 @retval < 0 Error
555 @retval 0 OK. leaf_buff is written to disk
556
557 @retval 1 key contains key to upper level (from balance page)
558 leaf_buff has underflow
559 @retval 2 key contains key to upper level (from split space)
560 */
561
del(MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos,my_off_t next_block,uchar * ret_key_buff)562 static int del(MARIA_HA *info, MARIA_KEY *key,
563 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
564 uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
565 {
566 int ret_value,length;
567 uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
568 uchar *keybuff,*endpos,*next_buff,*key_start, *prev_key;
569 uchar *anc_buff;
570 my_bool buff_alloced= 0, keybuff_alloced;
571 MARIA_KEY_PARAM s_temp;
572 MARIA_KEY tmp_key;
573 MARIA_SHARE *share= info->s;
574 MARIA_KEYDEF *keyinfo= key->keyinfo;
575 MARIA_KEY ret_key;
576 MARIA_PAGE next_page;
577 DBUG_ENTER("del");
578 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
579 (ulong) (leaf_page->pos / share->block_size),
580 keypos));
581 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
582
583 alloc_on_stack(*info->stack_end_ptr, keybuff, keybuff_alloced,
584 keyinfo->max_store_length);
585 if (!keybuff)
586 DBUG_RETURN(1);
587
588 page_flag= leaf_page->flag;
589 leaf_length= leaf_page->size;
590 nod_flag= leaf_page->node;
591
592 endpos= leaf_page->buff + leaf_length;
593 tmp_key.keyinfo= keyinfo;
594 tmp_key.data= keybuff;
595 next_buff= 0;
596
597 if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
598 goto err;
599
600 if (nod_flag)
601 {
602 next_page.pos= _ma_kpos(nod_flag,endpos);
603
604 alloc_on_stack(*info->stack_end_ptr, next_buff, buff_alloced,
605 (keyinfo->block_length + keyinfo->max_store_length*2));
606 if (!next_buff)
607 goto err;
608
609 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
610 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
611 ret_value= -1;
612 else
613 {
614 DBUG_DUMP("next_page", next_page.buff, next_page.size);
615 if ((ret_value= del(info, key, anc_page, &next_page,
616 keypos, next_block, ret_key_buff)) >0)
617 {
618 /* Get new length after key was deleted */
619 endpos= leaf_page->buff+ leaf_page->size;
620 if (ret_value == 1)
621 {
622 /* underflow writes "next_page" to disk */
623 ret_value= underflow(info, keyinfo, leaf_page, &next_page,
624 endpos);
625 if (ret_value < 0)
626 goto err;
627 if (leaf_page->size > share->max_index_block_size)
628 {
629 DBUG_ASSERT(ret_value == 0);
630 ret_value= (_ma_split_page(info, key, leaf_page,
631 share->max_index_block_size,
632 (uchar*) 0, 0, 0,
633 ret_key_buff, 0) | 2);
634 }
635 }
636 else
637 {
638 if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
639 DFLT_INIT_HITS))
640 goto err;
641 DBUG_PRINT("test",("Inserting of key when deleting"));
642 if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
643 goto err;
644 ret_value= _ma_insert(info, key, leaf_page, endpos,
645 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
646 0);
647 }
648 }
649 page_mark_changed(info, leaf_page);
650 /*
651 If ret_value <> 0, then leaf_page underflowed and caller will have
652 to handle underflow and write leaf_page to disk.
653 We can't write it here, as if leaf_page is empty we get an assert
654 in _ma_write_keypage.
655 */
656 if (ret_value == 0 && _ma_write_keypage(leaf_page,
657 PAGECACHE_LOCK_LEFT_WRITELOCKED,
658 DFLT_INIT_HITS))
659 goto err;
660 }
661 stack_alloc_free(next_buff, buff_alloced);
662 stack_alloc_free(keybuff, keybuff_alloced);
663 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
664 DBUG_RETURN(ret_value);
665 }
666
667 /*
668 Remove last key from leaf page
669 Note that leaf_page page may only have had one key (can normally only
670 happen in quick mode), in which ase it will now temporary have 0 keys
671 on it. This will be corrected by the caller as we will return 0.
672 */
673 new_leaf_length= (uint) (key_start - leaf_page->buff);
674 leaf_page->size= new_leaf_length;
675 page_store_size(share, leaf_page);
676
677 if (share->now_transactional &&
678 _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
679 goto err;
680
681 page_mark_changed(info, leaf_page); /* Safety */
682 if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
683 (uint) keyinfo->underflow_block_length))
684 {
685 /* Underflow, leaf_page will be written by caller */
686 ret_value= 1;
687 }
688 else
689 {
690 ret_value= 0;
691 if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
692 DFLT_INIT_HITS))
693 goto err;
694 }
695
696 /* Place last key in ancestor page on deleted key position */
697 a_length= anc_page->size;
698 anc_buff= anc_page->buff;
699 endpos= anc_buff + a_length;
700
701 ret_key.keyinfo= keyinfo;
702 ret_key.data= ret_key_buff;
703
704 prev_key= 0;
705 if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
706 {
707 if (!_ma_get_last_key(&ret_key, anc_page, keypos))
708 goto err;
709 prev_key= ret_key.data;
710 }
711 length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
712 keypos == endpos ? (uchar*) 0 : keypos,
713 prev_key, prev_key,
714 &s_temp);
715 if (length > 0)
716 bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
717 else
718 bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
719 (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
720 key_start= keypos;
721 if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
722 SEARCH_PAGE_KEY_HAS_TRANSID))
723 _ma_mark_page_with_transid(share, anc_page);
724
725 /* Save pointer to next leaf on parent page */
726 if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
727 &keypos))
728 goto err;
729 _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
730 anc_page->size= a_length + length;
731 page_store_size(share, anc_page);
732
733 if (share->now_transactional &&
734 _ma_log_add(anc_page, a_length,
735 key_start, s_temp.changed_length, s_temp.move_length, 1,
736 KEY_OP_DEBUG_LOG_ADD_2))
737 goto err;
738
739 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
740 stack_alloc_free(next_buff, buff_alloced);
741 stack_alloc_free(keybuff, keybuff_alloced);
742 DBUG_RETURN(new_leaf_length <=
743 (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
744 (uint) keyinfo->underflow_block_length));
745
746 err:
747 stack_alloc_free(next_buff, buff_alloced);
748 stack_alloc_free(keybuff, keybuff_alloced);
749 DBUG_RETURN(-1);
750 } /* del */
751
752
753 /**
  @brief Balances adjacent pages if underflow occurs
755
756 @fn underflow()
  @param anc_buff        Ancestor page data
758 @param leaf_page Leaf page (page that underflowed)
759 @param leaf_page_link Pointer to pin information about leaf page
760 @param keypos Position after current key in anc_buff
761
762 @note
763 This function writes redo entries for all changes
764 leaf_page is saved to disk
765 Caller must save anc_buff
766
    For the algorithm to work, we have to ensure for packed keys that
768 key_length + (underflow_length + max_block_length + key_length) / 2
769 <= block_length.
770 From which follows that underflow_length <= block_length - key_length *3
771 For not packed keys we have:
772 (underflow_length + max_block_length + key_length) / 2 <= block_length
773 From which follows that underflow_length < block_length - key_length
774 This is ensured by setting of underflow_block_length.
775
776 @return
777 @retval 0 ok
778 @retval 1 ok, but anc_page did underflow
779 @retval -1 error
780 */
781
underflow(MARIA_HA * info,MARIA_KEYDEF * keyinfo,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos)782 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
783 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
784 uchar *keypos)
785 {
786 int t_length;
787 uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
788 uint next_buff_length, new_buff_length, key_reflength;
789 uint unchanged_leaf_length, new_leaf_length, new_anc_length;
790 uint anc_page_flag, page_flag;
791 uchar *anc_key_buff, *leaf_key_buff;
792 uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
793 uchar *anc_buff, *leaf_buff;
794 uchar *after_key, *anc_end_pos;
795 MARIA_KEY_PARAM key_deleted, key_inserted;
796 MARIA_SHARE *share= info->s;
797 my_bool first_key, buff_alloced;
798 MARIA_KEY tmp_key, anc_key, leaf_key;
799 MARIA_PAGE next_page;
800 DBUG_ENTER("underflow");
801 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
802 (ulong) (leaf_page->pos / share->block_size),
803 keypos));
804 DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
805 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
806
807 alloc_on_stack(*info->stack_end_ptr, anc_key_buff, buff_alloced,
808 keyinfo->max_store_length*2);
809 if (!anc_key_buff)
810 DBUG_RETURN(1);
811
812 leaf_key_buff= anc_key_buff+ keyinfo->max_store_length;
813
814 anc_page_flag= anc_page->flag;
815 anc_buff= anc_page->buff;
816 leaf_buff= leaf_page->buff;
817 info->keyread_buff_used=1;
818 next_keypos=keypos;
819 nod_flag= leaf_page->node;
820 p_length= nod_flag+share->keypage_header;
821 anc_length= anc_page->size;
822 leaf_length= leaf_page->size;
823 key_reflength= share->base.key_reflength;
824 if (share->keyinfo+info->lastinx == keyinfo)
825 info->page_changed=1;
826 first_key= keypos == anc_buff + share->keypage_header + key_reflength;
827
828 tmp_key.data= info->buff;
829 anc_key.data= anc_key_buff;
830 leaf_key.data= leaf_key_buff;
831 tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
832
833 if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
834 first_key)
835 {
836 uint tmp_length;
837 uint next_page_flag;
838 /* Use page right of anc-page */
839 DBUG_PRINT("test",("use right page"));
840
841 /*
842 Calculate position after the current key. Note that keydata itself is
843 not used
844 */
845 if (keyinfo->flag & HA_BINARY_PACK_KEY)
846 {
847 if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
848 goto err;
849 }
850 else
851 {
852 /* Avoid length error check if packed key */
853 tmp_key.data[0]= tmp_key.data[1]= 0;
854 /* Got to end of found key */
855 if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
856 &next_keypos))
857 goto err;
858 }
859 next_page.pos= _ma_kpos(key_reflength, next_keypos);
860 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
861 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
862 goto err;
863 next_buff_length= next_page.size;
864 next_page_flag= next_page.flag;
865 DBUG_DUMP("next", next_page.buff, next_page.size);
866
867 /* find keys to make a big key-page */
868 bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
869 key_reflength);
870
871 if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
872 !_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
873 goto err;
874
875 /* merge pages and put parting key from anc_page between */
876 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
877 t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
878 prev_key, prev_key, &key_inserted);
879 tmp_length= next_buff_length - p_length;
880 endpos= next_page.buff + tmp_length + leaf_length + t_length;
881 /* next_page.buff will always be larger than before !*/
882 bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
883 memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
884 (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
885 buff_length= (uint) (endpos - next_page.buff);
886
887 /* Set page flag from combination of both key pages and parting key */
888 page_flag= next_page_flag | leaf_page->flag;
889 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
890 SEARCH_PAGE_KEY_HAS_TRANSID))
891 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
892
893 next_page.size= buff_length;
894 next_page.flag= page_flag;
895 page_store_info(share, &next_page);
896
897 /* remove key from anc_page */
898 if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
899 anc_key_buff, anc_buff+anc_length,
900 (my_off_t *) 0, &key_deleted)))
901 goto err;
902
903 new_anc_length= anc_length - s_length;
904 anc_page->size= new_anc_length;
905 page_store_size(share, anc_page);
906
907 if (buff_length <= share->max_index_block_size)
908 {
909 /* All keys fitted into one page */
910 page_mark_changed(info, &next_page);
911 if (_ma_dispose(info, next_page.pos, 0))
912 goto err;
913
914 memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
915 leaf_page->size= next_page.size;
916 leaf_page->flag= next_page.flag;
917
918 if (share->now_transactional)
919 {
920 /*
921 Log changes to parent page. Note that this page may have been
922 temporarily bigger than block_size.
923 */
924 if (_ma_log_delete(anc_page, key_deleted.key_pos,
925 key_deleted.changed_length,
926 key_deleted.move_length,
927 anc_length - anc_page->org_size,
928 KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
929 goto err;
930 /*
931 Log changes to leaf page. Data for leaf page is in leaf_buff
932 which contains original leaf_buff, parting key and next_buff
933 */
934 if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
935 goto err;
936 }
937 }
938 else
939 {
940 /*
941 Balancing didn't free a page, so we have to split 'buff' into two
942 pages:
943 - Find key in middle of buffer
944 - Store everything before key in 'leaf_page'
945 - Pack key into anc_page at position of deleted key
946 Note that anc_page may overflow! (is handled by caller)
947 - Store remaining keys in next_page (buff)
948 */
949 MARIA_KEY_PARAM anc_key_inserted;
950
951 anc_end_pos= anc_buff + new_anc_length;
952
953 DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
954 anc_buff, anc_end_pos));
955
956 if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
957 goto err;
958 if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
959 goto err;
960 new_leaf_length= (uint) (half_pos - next_page.buff);
961 memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
962
963 leaf_page->size= new_leaf_length;
964 leaf_page->flag= page_flag;
965 page_store_info(share, leaf_page);
966
967 /* Correct new keypointer to leaf_page */
968 half_pos=after_key;
969 _ma_kpointer(info,
970 leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
971 next_page.pos);
972
973 /* Save key in anc_page */
974 prev_key= (first_key ? (uchar*) 0 : anc_key.data);
975 t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
976 (keypos == anc_end_pos ? (uchar*) 0 :
977 keypos),
978 prev_key, prev_key, &anc_key_inserted);
979 if (t_length >= 0)
980 bmove_upp(anc_end_pos+t_length, anc_end_pos,
981 (uint) (anc_end_pos - keypos));
982 else
983 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
984 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
985 new_anc_length+= t_length;
986 anc_page->size= new_anc_length;
987 page_store_size(share, anc_page);
988
989 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
990 SEARCH_PAGE_KEY_HAS_TRANSID))
991 _ma_mark_page_with_transid(share, anc_page);
992
993 /* Store key first in new page */
994 if (nod_flag)
995 bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
996 (size_t) nod_flag);
997 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
998 goto err;
999 t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1000 (uchar*) 0, (uchar*) 0,
1001 &key_inserted);
1002 /* t_length will always be > 0 for a new page !*/
1003 tmp_length= (uint) ((next_page.buff + buff_length) - half_pos);
1004 bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
1005 (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
1006 new_buff_length= tmp_length + t_length + p_length;
1007 next_page.size= new_buff_length;
1008 page_store_size(share, &next_page);
1009 /* keypage flag is already up to date */
1010
1011 if (share->now_transactional)
1012 {
1013 /*
1014 Log changes to parent page
1015 This has one key deleted from it and one key inserted to it at
1016 keypos
1017
1018 ma_log_add ensures that we don't log changes that is outside of
1019 key block size, as the REDO code can't handle that
1020 */
1021 if (_ma_log_add(anc_page, anc_length, keypos,
1022 anc_key_inserted.move_length +
1023 MY_MAX(anc_key_inserted.changed_length -
1024 anc_key_inserted.move_length,
1025 key_deleted.changed_length),
1026 anc_key_inserted.move_length -
1027 key_deleted.move_length, 1,
1028 KEY_OP_DEBUG_LOG_ADD_3))
1029 goto err;
1030
1031 /*
1032 Log changes to leaf page.
1033 This contains original data with new data added at end
1034 */
1035 DBUG_ASSERT(leaf_length <= new_leaf_length);
1036 if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
1037 goto err;
1038 /*
1039 Log changes to next page
1040
1041 This contains original data with some prefix data deleted and
1042 some compressed data at start possible extended
1043
1044 Data in buff was originally:
1045 org_leaf_buff [leaf_length]
1046 separator_key [buff_key_inserted.move_length]
1047 next_key_changes [buff_key_inserted.changed_length -move_length]
1048 next_page_data [next_buff_length - p_length -
1049 (buff_key_inserted.changed_length -move_length)]
1050
1051 After changes it's now:
1052 unpacked_key [key_inserted.changed_length]
1053 next_suffix [next_buff_length - key_inserted.changed_length]
1054
1055 */
1056 DBUG_ASSERT(new_buff_length <= next_buff_length);
1057 if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1058 (int) (new_buff_length - next_buff_length),
1059 KEY_OP_DEBUG_LOG_PREFIX_1))
1060 goto err;
1061 }
1062 page_mark_changed(info, &next_page);
1063 if (_ma_write_keypage(&next_page,
1064 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1065 goto err;
1066 }
1067
1068 page_mark_changed(info, leaf_page);
1069 if (_ma_write_keypage(leaf_page,
1070 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1071 goto err;
1072 stack_alloc_free(anc_key_buff, buff_alloced);
1073 DBUG_RETURN(new_anc_length <=
1074 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1075 (uint) keyinfo->underflow_block_length)));
1076 }
1077
1078 DBUG_PRINT("test",("use left page"));
1079
1080 keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1081 if (!keypos)
1082 goto err;
1083 next_page.pos= _ma_kpos(key_reflength,keypos);
1084 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1085 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1086 goto err;
1087 buff_length= next_page.size;
1088 endpos= next_page.buff + buff_length;
1089 DBUG_DUMP("prev", next_page.buff, next_page.size);
1090
1091 /* find keys to make a big key-page */
1092 bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1093 key_reflength);
1094 next_keypos=keypos;
1095 if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1096 &next_keypos))
1097 goto err;
1098 if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1099 goto err;
1100
1101 /* merge pages and put parting key from anc_page between */
1102 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1103 t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1104 (leaf_length == p_length ?
1105 (uchar*) 0 : leaf_buff+p_length),
1106 prev_key, prev_key,
1107 &key_inserted);
1108 if (t_length >= 0)
1109 bmove(endpos+t_length, leaf_buff+p_length,
1110 (size_t) (leaf_length-p_length));
1111 else /* We gained space */
1112 bmove(endpos,leaf_buff+((int) p_length-t_length),
1113 (size_t) (leaf_length-p_length+t_length));
1114 (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1115
1116 /* Remember for logging how many bytes of leaf_buff that are not changed */
1117 DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1118 unchanged_leaf_length= (leaf_length - p_length -
1119 (key_inserted.changed_length -
1120 key_inserted.move_length));
1121
1122 new_buff_length= buff_length + leaf_length - p_length + t_length;
1123
1124 #ifdef EXTRA_DEBUG
1125 /* Ensure that unchanged_leaf_length is correct */
1126 DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
1127 leaf_buff + leaf_length - unchanged_leaf_length,
1128 unchanged_leaf_length) == 0);
1129 #endif
1130
1131 page_flag= next_page.flag | leaf_page->flag;
1132 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1133 SEARCH_PAGE_KEY_HAS_TRANSID))
1134 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1135
1136 next_page.size= new_buff_length;
1137 next_page.flag= page_flag;
1138 page_store_info(share, &next_page);
1139
1140 /* remove key from anc_page */
1141 if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1142 anc_key_buff,
1143 anc_buff+anc_length, (my_off_t *) 0,
1144 &key_deleted)))
1145 goto err;
1146
1147 new_anc_length= anc_length - s_length;
1148 anc_page->size= new_anc_length;
1149 page_store_size(share, anc_page);
1150
1151 if (new_buff_length <= share->max_index_block_size)
1152 {
1153 /* All keys fitted into one page */
1154 page_mark_changed(info, leaf_page);
1155 if (_ma_dispose(info, leaf_page->pos, 0))
1156 goto err;
1157
1158 if (share->now_transactional)
1159 {
1160 /*
1161 Log changes to parent page. Note that this page may have been
1162 temporarily bigger than block_size.
1163 */
1164 if (_ma_log_delete(anc_page, key_deleted.key_pos,
1165 key_deleted.changed_length, key_deleted.move_length,
1166 anc_length - anc_page->org_size,
1167 KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1168 goto err;
1169 /*
1170 Log changes to next page. Data for leaf page is in buff
1171 that contains original leaf_buff, parting key and next_buff
1172 */
1173 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1174 goto err;
1175 }
1176 }
1177 else
1178 {
1179 /*
1180 Balancing didn't free a page, so we have to split 'next_page' into two
1181 pages
1182 - Find key in middle of buffer (buff)
1183 - Pack key at half_buff into anc_page at position of deleted key
1184 Note that anc_page may overflow! (is handled by caller)
1185 - Move everything after middlekey to 'leaf_buff'
1186 - Shorten buff at 'endpos'
1187 */
1188 MARIA_KEY_PARAM anc_key_inserted;
1189 size_t tmp_length;
1190
1191 if (keypos == anc_buff + share->keypage_header + key_reflength)
1192 anc_pos= 0; /* First key */
1193 else
1194 {
1195 if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1196 goto err;
1197 anc_pos= anc_key.data;
1198 }
1199 if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1200 goto err;
1201
1202 /* Correct new keypointer to leaf_page */
1203 _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1204 leaf_key.ref_length, leaf_page->pos);
1205
1206 /* Save parting key found by _ma_find_half_pos() in anc_page */
1207 DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1208 DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1209 anc_end_pos= anc_buff + new_anc_length;
1210 t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1211 keypos == anc_end_pos ? (uchar*) 0
1212 : keypos,
1213 anc_pos, anc_pos,
1214 &anc_key_inserted);
1215 if (t_length >= 0)
1216 bmove_upp(anc_end_pos+t_length, anc_end_pos,
1217 (uint) (anc_end_pos-keypos));
1218 else
1219 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1220 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1221 new_anc_length+= t_length;
1222 anc_page->size= new_anc_length;
1223 page_store_size(share, anc_page);
1224
1225 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1226 SEARCH_PAGE_KEY_HAS_TRANSID))
1227 _ma_mark_page_with_transid(share, anc_page);
1228
1229 /* Store first key on new page */
1230 if (nod_flag)
1231 bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1232 (size_t) nod_flag);
1233 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1234 goto err;
1235 DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1236 t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1237 (uchar*) 0, (uchar*) 0, &key_inserted);
1238 /* t_length will always be > 0 for a new page !*/
1239 tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1240 DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length));
1241 bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1242 (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1243 new_leaf_length= (uint)(tmp_length + t_length + p_length);
1244 DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
1245
1246 leaf_page->size= new_leaf_length;
1247 leaf_page->flag= page_flag;
1248 page_store_info(share, leaf_page);
1249
1250 new_buff_length= (uint) (endpos - next_page.buff);
1251 next_page.size= new_buff_length;
1252 page_store_size(share, &next_page);
1253
1254 if (share->now_transactional)
1255 {
1256 /*
1257 Log changes to parent page
1258 This has one key deleted from it and one key inserted to it at
1259 keypos
1260
1261 ma_log_add() ensures that we don't log changes that is outside of
1262 key block size, as the REDO code can't handle that
1263 */
1264 if (_ma_log_add(anc_page, anc_length, keypos,
1265 anc_key_inserted.move_length +
1266 MY_MAX(anc_key_inserted.changed_length -
1267 anc_key_inserted.move_length,
1268 key_deleted.changed_length),
1269 anc_key_inserted.move_length -
1270 key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1271 goto err;
1272
1273 /*
1274 Log changes to leaf page.
1275 This contains original data with new data added first
1276 */
1277 DBUG_ASSERT(leaf_length <= new_leaf_length);
1278 DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1279 if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1280 (int) (new_leaf_length - leaf_length),
1281 KEY_OP_DEBUG_LOG_PREFIX_2))
1282 goto err;
1283 /*
1284 Log changes to next page
1285 This contains original data with some suffix data deleted
1286 */
1287 DBUG_ASSERT(new_buff_length <= buff_length);
1288 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1289 goto err;
1290 }
1291
1292 page_mark_changed(info, leaf_page);
1293 if (_ma_write_keypage(leaf_page,
1294 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1295 goto err;
1296 }
1297 page_mark_changed(info, &next_page);
1298 if (_ma_write_keypage(&next_page,
1299 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1300 goto err;
1301
1302 stack_alloc_free(anc_key_buff, buff_alloced);
1303 DBUG_RETURN(new_anc_length <=
1304 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1305 (uint) keyinfo->underflow_block_length)));
1306
1307 err:
1308 stack_alloc_free(anc_key_buff, buff_alloced);
1309 DBUG_RETURN(-1);
1310 } /* underflow */
1311
1312
/**
  @brief Remove a key from page

  @fn remove_key()
    keyinfo             Key handle
    page_flag           Flags of the page. Passed on to keyinfo->get_key();
                        if KEYPAGE_FLAG_HAS_TRANSID is set the key is never
                        treated as a static length key
    nod_flag            Length of node ptr
    keypos              Where on page key starts
    lastkey             Buffer for storing keys to be removed
    page_end            Pointer to end of page
    next_block          If <> 0 and node-page, this is set to address of
                        next page
    s_temp              Information about what changes were done on the page:
    s_temp.key_pos      Start of key
    s_temp.move_length  Number of bytes removed at keypos
    s_temp.changed_length Number of bytes changed at keypos

  @note
    If the key that follows the removed one is stored packed against it,
    the bytes it depended on are copied from 'lastkey' into the page in front
    of the following key, and fewer bytes are removed.

  @todo
    The current code doesn't handle the case that the next key may be
    packed better against the previous key if there is a case difference

  @return
  @retval 0     error (keyinfo->get_key() failed)
  @retval #     How many bytes were removed
*/

static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
                       uchar *keypos, uchar *lastkey,
                       uchar *page_end, my_off_t *next_block,
                       MARIA_KEY_PARAM *s_temp)
{
  int s_length;
  uchar *start;
  DBUG_ENTER("remove_key");
  DBUG_PRINT("enter", ("keypos:%p page_end: %p",
                       keypos, page_end));

  /* Remember where the removed key starts; keypos is advanced below */
  start= s_temp->key_pos= keypos;
  s_temp->changed_length= 0;
  if (!(keyinfo->flag &
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
         HA_BINARY_PACK_KEY)) &&
      !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
  {
    /* Static length key */
    s_length=(int) (keyinfo->keylength+nod_flag);
    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos+s_length);
  }
  else
  {
    /* Let keypos point at next key */
    MARIA_KEY key;

    /* Calculate length of key */
    key.keyinfo= keyinfo;
    key.data= lastkey;
    if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
      DBUG_RETURN(0);                           /* Error */

    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos);
    s_length=(int) (keypos-start);
    /* If the removed key was last on the page there is no next key to fix */
    if (keypos != page_end)
    {
      if (keyinfo->flag & HA_BINARY_PACK_KEY)
      {
        uchar *old_key= start;
        uint next_length,prev_length,prev_pack_length;

        /* keypos points here on start of next key */
        get_key_length(next_length,keypos);
        get_key_pack_length(prev_length,prev_pack_length,old_key);
        if (next_length > prev_length)
        {
          uint diff= (next_length-prev_length);
          /* We have to copy data from the current key to the next key */
          keypos-= diff + prev_pack_length;
          store_key_length(keypos, prev_length);
          bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
          /* Fewer bytes are removed as the next key grew downwards */
          s_length=(int) (keypos-start);
          s_temp->changed_length= diff + prev_pack_length;
        }
      }
      else
      {
        /* Check if a variable length first key part */
        if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
        {
          /* Next key is packed against the current one */
          uint next_length,prev_length,prev_pack_length,lastkey_length,
            rest_length;
          if (keyinfo->seg[0].length >= 127)
          {
            /* Two byte pack length; high bit is the 'packed' marker */
            if (!(prev_length=mi_uint2korr(start) & 32767))
              goto end;
            next_length=mi_uint2korr(keypos) & 32767;
            keypos+=2;
            prev_pack_length=2;
          }
          else
          {
            /* One byte pack length; high bit is the 'packed' marker */
            if (!(prev_length= *start & 127))
              goto end;                         /* Same key as previous*/
            next_length= *keypos & 127;
            keypos++;
            prev_pack_length=1;
          }
          if (!(*start & 128))
            prev_length=0;                      /* prev key not packed */
          if (keyinfo->seg[0].flag & HA_NULL_PART)
            lastkey++;                          /* Skip null marker */
          get_key_length(lastkey_length,lastkey);
          if (!next_length)                     /* Same key after */
          {
            next_length=lastkey_length;
            rest_length=0;
          }
          else
            get_key_length(rest_length,keypos);

          if (next_length >= prev_length)
          {
            /* Next key is based on deleted key */
            uint pack_length;
            uint diff= (next_length-prev_length);

            /* keypos points to data of next key (after key length) */
            bmove(keypos - diff, lastkey + prev_length, diff);
            rest_length+= diff;
            pack_length= prev_length ? get_pack_length(rest_length): 0;
            keypos-= diff + pack_length + prev_pack_length;
            /* keypos is now the new start of the next key */
            s_length=(int) (keypos-start);
            if (prev_length)                    /* Pack against prev key */
            {
              /* Reuse the pack header of the removed key */
              *keypos++= start[0];
              if (prev_pack_length == 2)
                *keypos++= start[1];
              store_key_length(keypos,rest_length);
            }
            else
            {
              /* Next key is not packed anymore */
              if (keyinfo->seg[0].flag & HA_NULL_PART)
              {
                rest_length++;                  /* Mark not null */
              }
              if (prev_pack_length == 2)
              {
                mi_int2store(keypos,rest_length);
              }
              else
                *keypos= rest_length;
            }
            s_temp->changed_length= diff + pack_length + prev_pack_length;
          }
        }
      }
    }
  }
  end:
  /* Close the gap: move the rest of the page down over the removed bytes */
  bmove(start, start+s_length, (uint) (page_end-start-s_length));
  s_temp->move_length= s_length;
  DBUG_RETURN((uint) s_length);
} /* remove_key */
1477
1478
1479 /****************************************************************************
1480 Logging of redos
1481 ****************************************************************************/
1482
1483 /**
1484 @brief
1485 log entry where some parts are deleted and some things are changed
1486 and some data could be added last.
1487
1488 @fn _ma_log_delete()
1489 @param info Maria handler
1490 @param page Pageaddress for changed page
1491 @param buff Page buffer
1492 @param key_pos Start of change area
1493 @param changed_length How many bytes where changed at key_pos
1494 @param move_length How many bytes where deleted at key_pos
1495 @param append_length Length of data added last
1496 This is taken from end of ma_page->buff
1497
1498 This is mainly used when a key is deleted. The append happens
1499 when we delete a key from a page with data > block_size kept in
1500 memory and we have to add back the data that was stored > block_size
1501 */
1502
_ma_log_delete(MARIA_PAGE * ma_page,const uchar * key_pos,uint changed_length,uint move_length,uint append_length,enum en_key_debug debug_marker)1503 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1504 uint changed_length, uint move_length,
1505 uint append_length __attribute__((unused)),
1506 enum en_key_debug debug_marker __attribute__((unused)))
1507 {
1508 LSN lsn;
1509 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
1510 uchar *log_pos;
1511 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
1512 uint translog_parts, current_size, extra_length;
1513 uint offset= (uint) (key_pos - ma_page->buff);
1514 MARIA_HA *info= ma_page->info;
1515 MARIA_SHARE *share= info->s;
1516 my_off_t page= ma_page->pos / share->block_size;
1517 DBUG_ENTER("_ma_log_delete");
1518 DBUG_PRINT("enter", ("page: %lu offset: %u changed_length: %u move_length: %u append_length: %u page_size: %u",
1519 (ulong) page, offset, changed_length, move_length,
1520 append_length, ma_page->size));
1521 DBUG_ASSERT(share->now_transactional && move_length);
1522 DBUG_ASSERT(offset + changed_length <= ma_page->size);
1523 DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
1524 DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
1525
1526 /* Store address of new root page */
1527 page_store(log_data + FILEID_STORE_SIZE, page);
1528 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1529 current_size= ma_page->org_size;
1530
1531 #ifdef EXTRA_DEBUG_KEY_CHANGES
1532 *log_pos++= KEY_OP_DEBUG;
1533 *log_pos++= debug_marker;
1534
1535 *log_pos++= KEY_OP_DEBUG_2;
1536 int2store(log_pos, ma_page->org_size);
1537 int2store(log_pos+2, ma_page->size);
1538 log_pos+=4;
1539 #endif
1540
1541 /* Store keypage_flag */
1542 *log_pos++= KEY_OP_SET_PAGEFLAG;
1543 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
1544
1545 log_pos[0]= KEY_OP_OFFSET;
1546 int2store(log_pos+1, offset);
1547 log_pos+= 3;
1548 translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
1549 extra_length= 0;
1550
1551 if (changed_length)
1552 {
1553 if (offset + changed_length >= share->max_index_block_size)
1554 {
1555 changed_length= share->max_index_block_size - offset;
1556 move_length= 0; /* Nothing to move */
1557 current_size= share->max_index_block_size;
1558 }
1559
1560 log_pos[0]= KEY_OP_CHANGE;
1561 int2store(log_pos+1, changed_length);
1562 log_pos+= 3;
1563 log_array[translog_parts].str= ma_page->buff + offset;
1564 log_array[translog_parts].length= changed_length;
1565 translog_parts++;
1566
1567 /* We only have to move things after offset+changed_length */
1568 offset+= changed_length;
1569 }
1570
1571 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1572 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1573
1574 if (move_length)
1575 {
1576 uint log_length;
1577 if (offset + move_length < share->max_index_block_size)
1578 {
1579 /*
1580 Move down things that is on page.
1581 page_offset in apply_redo_inxed() will be at original offset
1582 + changed_length.
1583 */
1584 log_pos[0]= KEY_OP_SHIFT;
1585 int2store(log_pos+1, - (int) move_length);
1586 log_length= 3;
1587 current_size-= move_length;
1588 }
1589 else
1590 {
1591 /* Delete to end of page */
1592 uint tmp= current_size - offset;
1593 current_size= offset;
1594 log_pos[0]= KEY_OP_DEL_SUFFIX;
1595 int2store(log_pos+1, tmp);
1596 log_length= 3;
1597 }
1598 log_array[translog_parts].str= log_pos;
1599 log_array[translog_parts].length= log_length;
1600 translog_parts++;
1601 log_pos+= log_length;
1602 extra_length+= log_length;
1603 }
1604
1605 if (current_size != ma_page->size &&
1606 current_size != share->max_index_block_size)
1607 {
1608 /* Append data that didn't fit on the page before */
1609 uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
1610 current_size);
1611 uchar *data= ma_page->buff + current_size;
1612
1613 DBUG_ASSERT(length <= append_length);
1614
1615 log_pos[0]= KEY_OP_ADD_SUFFIX;
1616 int2store(log_pos+1, length);
1617 log_array[translog_parts].str= log_pos;
1618 log_array[translog_parts].length= 3;
1619 log_array[translog_parts + 1].str= data;
1620 log_array[translog_parts + 1].length= length;
1621 log_pos+= 3;
1622 translog_parts+= 2;
1623 current_size+= length;
1624 extra_length+= 3 + length;
1625 }
1626
1627 _ma_log_key_changes(ma_page,
1628 log_array + translog_parts,
1629 log_pos, &extra_length, &translog_parts);
1630 /* Remember new page length for future log entires for same page */
1631 ma_page->org_size= current_size;
1632
1633 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1634 info->trn, info,
1635 (translog_size_t)
1636 log_array[TRANSLOG_INTERNAL_PARTS].length +
1637 changed_length + extra_length, translog_parts,
1638 log_array, log_data, NULL))
1639 DBUG_RETURN(1);
1640
1641 DBUG_RETURN(0);
1642 }
1643
1644
1645 /****************************************************************************
1646 Logging of undos
1647 ****************************************************************************/
1648
_ma_write_undo_key_delete(MARIA_HA * info,const MARIA_KEY * key,my_off_t new_root,LSN * res_lsn)1649 my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1650 my_off_t new_root, LSN *res_lsn)
1651 {
1652 MARIA_SHARE *share= info->s;
1653 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1654 KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1655 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1656 struct st_msg_to_write_hook_for_undo_key msg;
1657 enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1658 uint keynr= key->keyinfo->key_nr;
1659
1660 lsn_store(log_data, info->trn->undo_lsn);
1661 key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1662 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1663
1664 /**
1665 @todo BUG if we had concurrent insert/deletes, reading state's key_root
1666 like this would be unsafe.
1667 */
1668 if (new_root != share->state.key_root[keynr])
1669 {
1670 my_off_t page;
1671 page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1672 new_root / share->block_size);
1673 page_store(log_pos, page);
1674 log_pos+= PAGE_STORE_SIZE;
1675 log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1676 }
1677
1678 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1679 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1680 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1681 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1682 key->ref_length);
1683
1684 msg.root= &share->state.key_root[keynr];
1685 msg.value= new_root;
1686 /*
1687 set autoincrement to 1 if this is an auto_increment key
1688 This is only used if we are now in a rollback of a duplicate key
1689 */
1690 msg.auto_increment= share->base.auto_key == keynr + 1;
1691
1692 return translog_write_record(res_lsn, log_type,
1693 info->trn, info,
1694 (translog_size_t)
1695 (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1696 log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1697 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1698 log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1699 }
1700