1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 Copyright (C) 2009-2010 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "ma_fulltext.h"
18 #include "ma_rt_index.h"
19 #include "trnman.h"
20 #include "ma_key_recover.h"
21
22 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
23 MARIA_PAGE *page);
24 static int del(MARIA_HA *info, MARIA_KEY *key,
25 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
26 uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
27 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
28 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
29 uchar *keypos);
30 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
31 uchar *keypos, uchar *lastkey, uchar *page_end,
32 my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33
34 /* @breif Remove a row from a MARIA table */
35
maria_delete(MARIA_HA * info,const uchar * record)36 int maria_delete(MARIA_HA *info,const uchar *record)
37 {
38 uint i;
39 uchar *old_key;
40 int save_errno;
41 char lastpos[8];
42 MARIA_SHARE *share= info->s;
43 MARIA_KEYDEF *keyinfo;
44 DBUG_ENTER("maria_delete");
45
46 /* Test if record is in datafile */
47 DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
48 maria_print_error(share, HA_ERR_CRASHED);
49 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
50 DBUG_EXECUTE_IF("my_error_test_undefined_error",
51 maria_print_error(share, INT_MAX);
52 DBUG_RETURN(my_errno= INT_MAX););
53 if (!(info->update & HA_STATE_AKTIV))
54 {
55 DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
56 }
57 if (share->options & HA_OPTION_READ_ONLY_DATA)
58 {
59 DBUG_RETURN(my_errno=EACCES);
60 }
61 if (_ma_readinfo(info,F_WRLCK,1))
62 DBUG_RETURN(my_errno);
63 if ((*share->compare_record)(info,record))
64 goto err; /* Error on read-check */
65
66 if (_ma_mark_file_changed(share))
67 goto err;
68
69 /* Ensure we don't change the autoincrement value */
70 info->last_auto_increment= ~(ulonglong) 0;
71 /* Remove all keys from the index file */
72
73 old_key= info->lastkey_buff2;
74
75 for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76 {
77 if (maria_is_key_active(share->state.key_map, i))
78 {
79 keyinfo->version++;
80 if (keyinfo->flag & HA_FULLTEXT)
81 {
82 if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83 goto err;
84 }
85 else
86 {
87 MARIA_KEY key;
88 if (keyinfo->ck_delete(info,
89 (*keyinfo->make_key)(info, &key, i, old_key,
90 record,
91 info->cur_row.lastpos,
92 info->cur_row.trid)))
93 goto err;
94 }
95 /* The above changed info->lastkey2. Inform maria_rnext_same(). */
96 info->update&= ~HA_STATE_RNEXT_SAME;
97 }
98 }
99
100 if (share->calc_checksum)
101 {
102 /*
103 We can't use the row based checksum as this doesn't have enough
104 precision.
105 */
106 info->cur_row.checksum= (*share->calc_checksum)(info, record);
107 }
108
109 if ((*share->delete_record)(info, record))
110 goto err; /* Remove record from database */
111
112 info->state->checksum-= info->cur_row.checksum;
113 info->state->records--;
114 info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
115 info->row_changes++;
116 share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
117 STATE_NOT_ZEROFILLED);
118 info->state->changed=1;
119
120 mi_sizestore(lastpos, info->cur_row.lastpos);
121 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
122 if (info->invalidator != 0)
123 {
124 DBUG_PRINT("info", ("invalidator... '%s' (delete)",
125 share->open_file_name.str));
126 (*info->invalidator)(share->open_file_name.str);
127 info->invalidator=0;
128 }
129 DBUG_RETURN(0);
130
131 err:
132 save_errno= my_errno;
133 DBUG_ASSERT(save_errno);
134 if (!save_errno)
135 save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
136
137 mi_sizestore(lastpos, info->cur_row.lastpos);
138 (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
139 info->update|=HA_STATE_WRITTEN; /* Buffer changed */
140 if (save_errno != HA_ERR_RECORD_CHANGED)
141 {
142 _ma_set_fatal_error(share, HA_ERR_CRASHED);
143 save_errno= HA_ERR_CRASHED;
144 }
145 DBUG_RETURN(my_errno= save_errno);
146 } /* maria_delete */
147
148
149 /*
150 Remove a key from the btree index
151
152 TODO:
153 Change ma_ck_real_delete() to use another buffer for changed keys instead
154 of key->data. This would allows us to remove the copying of the key here.
155 */
156
_ma_ck_delete(MARIA_HA * info,MARIA_KEY * key)157 my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
158 {
159 MARIA_SHARE *share= info->s;
160 int res;
161 LSN lsn= LSN_IMPOSSIBLE;
162 my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
163 uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
164 MARIA_KEY org_key;
165 DBUG_ENTER("_ma_ck_delete");
166
167 LINT_INIT_STRUCT(org_key);
168
169 save_key_data= key->data;
170 if (share->now_transactional)
171 {
172 /* Save original value as the key may change */
173 memcpy(key_buff, key->data, key->data_length + key->ref_length);
174 org_key= *key;
175 key->data= key_buff;
176 }
177
178 if ((res= _ma_ck_real_delete(info, key, &new_root)))
179 {
180 /* We have to mark the table crashed before unpin_all_pages() */
181 maria_mark_crashed(info);
182 }
183
184 key->data= save_key_data;
185 if (!res && share->now_transactional)
186 res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
187 else
188 {
189 share->state.key_root[key->keyinfo->key_nr]= new_root;
190 _ma_fast_unlock_key_del(info);
191 }
192 _ma_unpin_all_pages_and_finalize_row(info, lsn);
193 DBUG_RETURN(res != 0);
194 } /* _ma_ck_delete */
195
196
_ma_ck_real_delete(register MARIA_HA * info,MARIA_KEY * key,my_off_t * root)197 my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
198 my_off_t *root)
199 {
200 int error;
201 my_bool result= 0;
202 my_off_t old_root;
203 uchar *root_buff;
204 MARIA_KEYDEF *keyinfo= key->keyinfo;
205 MARIA_PAGE page;
206 DBUG_ENTER("_ma_ck_real_delete");
207
208 if ((old_root=*root) == HA_OFFSET_ERROR)
209 {
210 _ma_set_fatal_error(info->s, HA_ERR_CRASHED);
211 DBUG_RETURN(1);
212 }
213 if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
214 MARIA_MAX_KEY_BUFF*2)))
215 {
216 DBUG_PRINT("error",("Couldn't allocate memory"));
217 my_errno=ENOMEM;
218 DBUG_RETURN(1);
219 }
220 DBUG_PRINT("info",("root_page: %lu",
221 (ulong) (old_root / keyinfo->block_length)));
222 if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
223 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
224 {
225 result= 1;
226 goto err;
227 }
228 if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
229 SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
230 SEARCH_SAME),
231 &page)))
232 {
233 if (error < 0)
234 result= 1;
235 else if (error == 2)
236 {
237 DBUG_PRINT("test",("Enlarging of root when deleting"));
238 if (_ma_enlarge_root(info, key, root))
239 result= 1;
240 }
241 else /* error == 1 */
242 {
243 MARIA_SHARE *share= info->s;
244
245 page_mark_changed(info, &page);
246
247 if (page.size <= page.node + share->keypage_header + 1)
248 {
249 DBUG_ASSERT(page.size == page.node + share->keypage_header);
250 if (page.node)
251 *root= _ma_kpos(page.node, root_buff +share->keypage_header +
252 page.node);
253 else
254 *root=HA_OFFSET_ERROR;
255 if (_ma_dispose(info, old_root, 0))
256 result= 1;
257 }
258 else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
259 DFLT_INIT_HITS))
260 result= 1;
261 }
262 }
263 err:
264 my_afree(root_buff);
265 DBUG_PRINT("exit",("Return: %d",result));
266 DBUG_RETURN(result);
267 } /* _ma_ck_real_delete */
268
269
270 /**
271 @brief Remove key below key root
272
273 @param key Key to delete. Will contain new key if block was enlarged
274
275 @return
276 @retval 0 ok (anc_page is not changed)
277 @retval 1 If data on page is too small; In this case anc_buff is not saved
278 @retval 2 If data on page is too big
279 @retval -1 On errors
280 */
281
d_search(MARIA_HA * info,MARIA_KEY * key,uint32 comp_flag,MARIA_PAGE * anc_page)282 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
283 MARIA_PAGE *anc_page)
284 {
285 int flag,ret_value,save_flag;
286 uint nod_flag, page_flag;
287 my_bool last_key;
288 uchar *leaf_buff,*keypos;
289 uchar lastkey[MARIA_MAX_KEY_BUFF];
290 MARIA_KEY_PARAM s_temp;
291 MARIA_SHARE *share= info->s;
292 MARIA_KEYDEF *keyinfo= key->keyinfo;
293 MARIA_PAGE leaf_page;
294 DBUG_ENTER("d_search");
295 DBUG_DUMP("page", anc_page->buff, anc_page->size);
296
297 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
298 &last_key);
299 if (flag == MARIA_FOUND_WRONG_KEY)
300 {
301 DBUG_PRINT("error",("Found wrong key"));
302 DBUG_RETURN(-1);
303 }
304 page_flag= anc_page->flag;
305 nod_flag= anc_page->node;
306
307 if (!flag && (keyinfo->flag & HA_FULLTEXT))
308 {
309 uint off;
310 int subkeys;
311
312 get_key_full_length_rdonly(off, lastkey);
313 subkeys=ft_sintXkorr(lastkey+off);
314 DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
315 comp_flag=SEARCH_SAME;
316 if (subkeys >= 0)
317 {
318 /* normal word, one-level tree structure */
319 if (info->ft1_to_ft2)
320 {
321 /* we're in ft1->ft2 conversion mode. Saving key data */
322 insert_dynamic(info->ft1_to_ft2, (lastkey+off));
323 }
324 else
325 {
326 /* we need exact match only if not in ft1->ft2 conversion mode */
327 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
328 lastkey, &last_key);
329 }
330 /* fall through to normal delete */
331 }
332 else
333 {
334 /* popular word. two-level tree. going down */
335 uint tmp_key_length;
336 my_off_t root;
337 uchar *kpos=keypos;
338 MARIA_KEY tmp_key;
339
340 tmp_key.data= lastkey;
341 tmp_key.keyinfo= keyinfo;
342
343 if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
344 &kpos)))
345 {
346 _ma_set_fatal_error(share, HA_ERR_CRASHED);
347 DBUG_RETURN(-1);
348 }
349 root= _ma_row_pos_from_key(&tmp_key);
350 if (subkeys == -1)
351 {
352 /* the last entry in sub-tree */
353 if (_ma_dispose(info, root, 1))
354 DBUG_RETURN(-1);
355 /* fall through to normal delete */
356 }
357 else
358 {
359 MARIA_KEY word_key;
360 keyinfo=&share->ft2_keyinfo;
361 /* we'll modify key entry 'in vivo' */
362 kpos-=keyinfo->keylength+nod_flag;
363 get_key_full_length_rdonly(off, key->data);
364
365 word_key.data= key->data + off;
366 word_key.keyinfo= &share->ft2_keyinfo;
367 word_key.data_length= HA_FT_WLEN;
368 word_key.ref_length= 0;
369 word_key.flag= 0;
370 ret_value= _ma_ck_real_delete(info, &word_key, &root);
371 _ma_dpointer(share, kpos+HA_FT_WLEN, root);
372 subkeys++;
373 ft_intXstore(kpos, subkeys);
374 if (!ret_value)
375 {
376 page_mark_changed(info, anc_page);
377 ret_value= _ma_write_keypage(anc_page,
378 PAGECACHE_LOCK_LEFT_WRITELOCKED,
379 DFLT_INIT_HITS);
380 }
381 DBUG_PRINT("exit",("Return: %d",ret_value));
382 DBUG_RETURN(ret_value);
383 }
384 }
385 }
386 leaf_buff=0;
387 if (nod_flag)
388 {
389 /* Read left child page */
390 leaf_page.pos= _ma_kpos(nod_flag,keypos);
391 if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
392 MARIA_MAX_KEY_BUFF*2)))
393 {
394 DBUG_PRINT("error", ("Couldn't allocate memory"));
395 my_errno=ENOMEM;
396 DBUG_RETURN(-1);
397 }
398 if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
399 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
400 0))
401 goto err;
402 }
403
404 if (flag != 0)
405 {
406 if (!nod_flag)
407 {
408 /* This should newer happend */
409 DBUG_PRINT("error",("Didn't find key"));
410 _ma_set_fatal_error(share, HA_ERR_CRASHED);
411 goto err;
412 }
413 save_flag=0;
414 ret_value= d_search(info, key, comp_flag, &leaf_page);
415 }
416 else
417 { /* Found key */
418 uint tmp;
419 uint anc_buff_length= anc_page->size;
420 uint anc_page_flag= anc_page->flag;
421 my_off_t next_block;
422
423 if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
424 anc_page->buff + anc_buff_length,
425 &next_block, &s_temp)))
426 goto err;
427
428 page_mark_changed(info, anc_page);
429 anc_buff_length-= tmp;
430 anc_page->size= anc_buff_length;
431 page_store_size(share, anc_page);
432
433 /*
434 Log initial changes on pages
435 If there is an underflow, there will be more changes logged to the
436 page
437 */
438 if (share->now_transactional &&
439 _ma_log_delete(anc_page, s_temp.key_pos,
440 s_temp.changed_length, s_temp.move_length,
441 0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
442 DBUG_RETURN(-1);
443
444 if (!nod_flag)
445 { /* On leaf page */
446 if (anc_buff_length <= (info->quick_mode ?
447 MARIA_MIN_KEYBLOCK_LENGTH :
448 (uint) keyinfo->underflow_block_length))
449 {
450 /* Page will be written by caller if we return 1 */
451 DBUG_RETURN(1);
452 }
453 if (_ma_write_keypage(anc_page,
454 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
455 DBUG_RETURN(-1);
456 DBUG_RETURN(0);
457 }
458 save_flag=1; /* Mark that anc_buff is changed */
459 ret_value= del(info, key, anc_page, &leaf_page,
460 keypos, next_block, lastkey);
461 }
462 if (ret_value >0)
463 {
464 save_flag= 2;
465 if (ret_value == 1)
466 ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
467 else
468 {
469 /* This can only happen with variable length keys */
470 MARIA_KEY last_key;
471 DBUG_PRINT("test",("Enlarging of key when deleting"));
472
473 last_key.data= lastkey;
474 last_key.keyinfo= keyinfo;
475 if (!_ma_get_last_key(&last_key, anc_page, keypos))
476 goto err;
477 ret_value= _ma_insert(info, key, anc_page, keypos,
478 last_key.data,
479 (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
480
481 if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
482 DFLT_INIT_HITS))
483 ret_value= -1;
484 }
485 }
486 if (ret_value == 0 && anc_page->size > share->max_index_block_size)
487 {
488 /*
489 parent buffer got too big ; We have to split the page.
490 The | 2 is there to force write of anc page below
491 */
492 save_flag= 3;
493 ret_value= _ma_split_page(info, key, anc_page,
494 share->max_index_block_size,
495 (uchar*) 0, 0, 0, lastkey, 0) | 2;
496 DBUG_ASSERT(anc_page->org_size == anc_page->size);
497 }
498 if (save_flag && ret_value != 1)
499 {
500 page_mark_changed(info, anc_page);
501 if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
502 DFLT_INIT_HITS))
503 ret_value= -1;
504 }
505 else
506 {
507 DBUG_DUMP("page", anc_page->buff, anc_page->size);
508 }
509 my_afree(leaf_buff);
510 DBUG_PRINT("exit",("Return: %d",ret_value));
511 DBUG_RETURN(ret_value);
512
513 err:
514 my_afree(leaf_buff);
515 DBUG_PRINT("exit",("Error: %d",my_errno));
516 DBUG_RETURN (-1);
517 } /* d_search */
518
519
520 /**
521 @brief Remove a key that has a page-reference
522
523 @param info Maria handler
524 @param key Buffer for key to be inserted at upper level
525 @param anc_page Page address for page where deleted key was
526 @param anc_buff Page buffer (nod) where deleted key was
527 @param leaf_page Page address for nod before the deleted key
528 @param leaf_buff Buffer for leaf_page
529 @param leaf_buff_link Pinned page link for leaf_buff
530 @param keypos Pos to where deleted key was on anc_buff
531 @param next_block Page adress for nod after deleted key
532 @param ret_key_buff Key before keypos in anc_buff
533
534 @notes
535 leaf_page must be written to disk if retval > 0
536 anc_page is not updated on disk. Caller should do this
537
538 @return
539 @retval < 0 Error
540 @retval 0 OK. leaf_buff is written to disk
541
542 @retval 1 key contains key to upper level (from balance page)
543 leaf_buff has underflow
544 @retval 2 key contains key to upper level (from split space)
545 */
546
del(MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos,my_off_t next_block,uchar * ret_key_buff)547 static int del(MARIA_HA *info, MARIA_KEY *key,
548 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
549 uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
550 {
551 int ret_value,length;
552 uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
553 uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
554 uchar *anc_buff;
555 MARIA_KEY_PARAM s_temp;
556 MARIA_KEY tmp_key;
557 MARIA_SHARE *share= info->s;
558 MARIA_KEYDEF *keyinfo= key->keyinfo;
559 MARIA_KEY ret_key;
560 MARIA_PAGE next_page;
561 DBUG_ENTER("del");
562 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
563 (ulong) (leaf_page->pos / share->block_size),
564 keypos));
565 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
566
567 page_flag= leaf_page->flag;
568 leaf_length= leaf_page->size;
569 nod_flag= leaf_page->node;
570
571 endpos= leaf_page->buff + leaf_length;
572 tmp_key.keyinfo= keyinfo;
573 tmp_key.data= keybuff;
574 next_buff= 0;
575
576 if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
577 DBUG_RETURN(-1);
578
579 if (nod_flag)
580 {
581 next_page.pos= _ma_kpos(nod_flag,endpos);
582 if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
583 MARIA_MAX_KEY_BUFF*2)))
584 DBUG_RETURN(-1);
585 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
586 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
587 ret_value= -1;
588 else
589 {
590 DBUG_DUMP("next_page", next_page.buff, next_page.size);
591 if ((ret_value= del(info, key, anc_page, &next_page,
592 keypos, next_block, ret_key_buff)) >0)
593 {
594 /* Get new length after key was deleted */
595 endpos= leaf_page->buff+ leaf_page->size;
596 if (ret_value == 1)
597 {
598 /* underflow writes "next_page" to disk */
599 ret_value= underflow(info, keyinfo, leaf_page, &next_page,
600 endpos);
601 if (ret_value < 0)
602 goto err;
603 if (leaf_page->size > share->max_index_block_size)
604 {
605 DBUG_ASSERT(ret_value == 0);
606 ret_value= (_ma_split_page(info, key, leaf_page,
607 share->max_index_block_size,
608 (uchar*) 0, 0, 0,
609 ret_key_buff, 0) | 2);
610 }
611 }
612 else
613 {
614 if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
615 DFLT_INIT_HITS))
616 goto err;
617 DBUG_PRINT("test",("Inserting of key when deleting"));
618 if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
619 goto err;
620 ret_value= _ma_insert(info, key, leaf_page, endpos,
621 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
622 0);
623 }
624 }
625 page_mark_changed(info, leaf_page);
626 /*
627 If ret_value <> 0, then leaf_page underflowed and caller will have
628 to handle underflow and write leaf_page to disk.
629 We can't write it here, as if leaf_page is empty we get an assert
630 in _ma_write_keypage.
631 */
632 if (ret_value == 0 && _ma_write_keypage(leaf_page,
633 PAGECACHE_LOCK_LEFT_WRITELOCKED,
634 DFLT_INIT_HITS))
635 goto err;
636 }
637 my_afree(next_buff);
638 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
639 DBUG_RETURN(ret_value);
640 }
641
642 /*
643 Remove last key from leaf page
644 Note that leaf_page page may only have had one key (can normally only
645 happen in quick mode), in which ase it will now temporary have 0 keys
646 on it. This will be corrected by the caller as we will return 0.
647 */
648 new_leaf_length= (uint) (key_start - leaf_page->buff);
649 leaf_page->size= new_leaf_length;
650 page_store_size(share, leaf_page);
651
652 if (share->now_transactional &&
653 _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
654 goto err;
655
656 page_mark_changed(info, leaf_page); /* Safety */
657 if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
658 (uint) keyinfo->underflow_block_length))
659 {
660 /* Underflow, leaf_page will be written by caller */
661 ret_value= 1;
662 }
663 else
664 {
665 ret_value= 0;
666 if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
667 DFLT_INIT_HITS))
668 goto err;
669 }
670
671 /* Place last key in ancestor page on deleted key position */
672 a_length= anc_page->size;
673 anc_buff= anc_page->buff;
674 endpos= anc_buff + a_length;
675
676 ret_key.keyinfo= keyinfo;
677 ret_key.data= ret_key_buff;
678
679 prev_key= 0;
680 if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
681 {
682 if (!_ma_get_last_key(&ret_key, anc_page, keypos))
683 goto err;
684 prev_key= ret_key.data;
685 }
686 length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
687 keypos == endpos ? (uchar*) 0 : keypos,
688 prev_key, prev_key,
689 &s_temp);
690 if (length > 0)
691 bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
692 else
693 bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
694 (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
695 key_start= keypos;
696 if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
697 SEARCH_PAGE_KEY_HAS_TRANSID))
698 _ma_mark_page_with_transid(share, anc_page);
699
700 /* Save pointer to next leaf on parent page */
701 if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
702 &keypos))
703 goto err;
704 _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
705 anc_page->size= a_length + length;
706 page_store_size(share, anc_page);
707
708 if (share->now_transactional &&
709 _ma_log_add(anc_page, a_length,
710 key_start, s_temp.changed_length, s_temp.move_length, 1,
711 KEY_OP_DEBUG_LOG_ADD_2))
712 goto err;
713
714 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
715 DBUG_RETURN(new_leaf_length <=
716 (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
717 (uint) keyinfo->underflow_block_length));
718 err:
719 if (next_buff)
720 my_afree(next_buff);
721
722 DBUG_RETURN(-1);
723 } /* del */
724
725
726 /**
727 @brief Balances adjacent pages if underflow occours
728
729 @fn underflow()
730 @param anc_buff Anchestor page data
731 @param leaf_page Leaf page (page that underflowed)
732 @param leaf_page_link Pointer to pin information about leaf page
733 @param keypos Position after current key in anc_buff
734
735 @note
736 This function writes redo entries for all changes
737 leaf_page is saved to disk
738 Caller must save anc_buff
739
740 For the algoritm to work, we have to ensure for packed keys that
741 key_length + (underflow_length + max_block_length + key_length) / 2
742 <= block_length.
743 From which follows that underflow_length <= block_length - key_length *3
744 For not packed keys we have:
745 (underflow_length + max_block_length + key_length) / 2 <= block_length
746 From which follows that underflow_length < block_length - key_length
747 This is ensured by setting of underflow_block_length.
748
749 @return
750 @retval 0 ok
751 @retval 1 ok, but anc_page did underflow
752 @retval -1 error
753 */
754
underflow(MARIA_HA * info,MARIA_KEYDEF * keyinfo,MARIA_PAGE * anc_page,MARIA_PAGE * leaf_page,uchar * keypos)755 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
756 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
757 uchar *keypos)
758 {
759 int t_length;
760 uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
761 uint next_buff_length, new_buff_length, key_reflength;
762 uint unchanged_leaf_length, new_leaf_length, new_anc_length;
763 uint anc_page_flag, page_flag;
764 uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
765 uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
766 uchar *anc_buff, *leaf_buff;
767 uchar *after_key, *anc_end_pos;
768 MARIA_KEY_PARAM key_deleted, key_inserted;
769 MARIA_SHARE *share= info->s;
770 my_bool first_key;
771 MARIA_KEY tmp_key, anc_key, leaf_key;
772 MARIA_PAGE next_page;
773 DBUG_ENTER("underflow");
774 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
775 (ulong) (leaf_page->pos / share->block_size),
776 keypos));
777 DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
778 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
779
780 anc_page_flag= anc_page->flag;
781 anc_buff= anc_page->buff;
782 leaf_buff= leaf_page->buff;
783 info->keyread_buff_used=1;
784 next_keypos=keypos;
785 nod_flag= leaf_page->node;
786 p_length= nod_flag+share->keypage_header;
787 anc_length= anc_page->size;
788 leaf_length= leaf_page->size;
789 key_reflength= share->base.key_reflength;
790 if (share->keyinfo+info->lastinx == keyinfo)
791 info->page_changed=1;
792 first_key= keypos == anc_buff + share->keypage_header + key_reflength;
793
794 tmp_key.data= info->buff;
795 anc_key.data= anc_key_buff;
796 leaf_key.data= leaf_key_buff;
797 tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
798
799 if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
800 first_key)
801 {
802 uint tmp_length;
803 uint next_page_flag;
804 /* Use page right of anc-page */
805 DBUG_PRINT("test",("use right page"));
806
807 /*
808 Calculate position after the current key. Note that keydata itself is
809 not used
810 */
811 if (keyinfo->flag & HA_BINARY_PACK_KEY)
812 {
813 if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
814 goto err;
815 }
816 else
817 {
818 /* Avoid length error check if packed key */
819 tmp_key.data[0]= tmp_key.data[1]= 0;
820 /* Got to end of found key */
821 if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
822 &next_keypos))
823 goto err;
824 }
825 next_page.pos= _ma_kpos(key_reflength, next_keypos);
826 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
827 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
828 goto err;
829 next_buff_length= next_page.size;
830 next_page_flag= next_page.flag;
831 DBUG_DUMP("next", next_page.buff, next_page.size);
832
833 /* find keys to make a big key-page */
834 bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
835 key_reflength);
836
837 if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
838 !_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
839 goto err;
840
841 /* merge pages and put parting key from anc_page between */
842 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
843 t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
844 prev_key, prev_key, &key_inserted);
845 tmp_length= next_buff_length - p_length;
846 endpos= next_page.buff + tmp_length + leaf_length + t_length;
847 /* next_page.buff will always be larger than before !*/
848 bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
849 memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
850 (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
851 buff_length= (uint) (endpos - next_page.buff);
852
853 /* Set page flag from combination of both key pages and parting key */
854 page_flag= next_page_flag | leaf_page->flag;
855 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
856 SEARCH_PAGE_KEY_HAS_TRANSID))
857 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
858
859 next_page.size= buff_length;
860 next_page.flag= page_flag;
861 page_store_info(share, &next_page);
862
863 /* remove key from anc_page */
864 if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
865 anc_key_buff, anc_buff+anc_length,
866 (my_off_t *) 0, &key_deleted)))
867 goto err;
868
869 new_anc_length= anc_length - s_length;
870 anc_page->size= new_anc_length;
871 page_store_size(share, anc_page);
872
873 if (buff_length <= share->max_index_block_size)
874 {
875 /* All keys fitted into one page */
876 page_mark_changed(info, &next_page);
877 if (_ma_dispose(info, next_page.pos, 0))
878 goto err;
879
880 memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
881 leaf_page->size= next_page.size;
882 leaf_page->flag= next_page.flag;
883
884 if (share->now_transactional)
885 {
886 /*
887 Log changes to parent page. Note that this page may have been
888 temporarily bigger than block_size.
889 */
890 if (_ma_log_delete(anc_page, key_deleted.key_pos,
891 key_deleted.changed_length,
892 key_deleted.move_length,
893 anc_length - anc_page->org_size,
894 KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
895 goto err;
896 /*
897 Log changes to leaf page. Data for leaf page is in leaf_buff
898 which contains original leaf_buff, parting key and next_buff
899 */
900 if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
901 goto err;
902 }
903 }
904 else
905 {
906 /*
907 Balancing didn't free a page, so we have to split 'buff' into two
908 pages:
909 - Find key in middle of buffer
910 - Store everything before key in 'leaf_page'
911 - Pack key into anc_page at position of deleted key
912 Note that anc_page may overflow! (is handled by caller)
913 - Store remaining keys in next_page (buff)
914 */
915 MARIA_KEY_PARAM anc_key_inserted;
916
917 anc_end_pos= anc_buff + new_anc_length;
918
919 DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
920 anc_buff, anc_end_pos));
921
922 if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
923 goto err;
924 if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
925 goto err;
926 new_leaf_length= (uint) (half_pos - next_page.buff);
927 memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
928
929 leaf_page->size= new_leaf_length;
930 leaf_page->flag= page_flag;
931 page_store_info(share, leaf_page);
932
933 /* Correct new keypointer to leaf_page */
934 half_pos=after_key;
935 _ma_kpointer(info,
936 leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
937 next_page.pos);
938
939 /* Save key in anc_page */
940 prev_key= (first_key ? (uchar*) 0 : anc_key.data);
941 t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
942 (keypos == anc_end_pos ? (uchar*) 0 :
943 keypos),
944 prev_key, prev_key, &anc_key_inserted);
945 if (t_length >= 0)
946 bmove_upp(anc_end_pos+t_length, anc_end_pos,
947 (uint) (anc_end_pos - keypos));
948 else
949 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
950 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
951 new_anc_length+= t_length;
952 anc_page->size= new_anc_length;
953 page_store_size(share, anc_page);
954
955 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
956 SEARCH_PAGE_KEY_HAS_TRANSID))
957 _ma_mark_page_with_transid(share, anc_page);
958
959 /* Store key first in new page */
960 if (nod_flag)
961 bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
962 (size_t) nod_flag);
963 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
964 goto err;
965 t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
966 (uchar*) 0, (uchar*) 0,
967 &key_inserted);
968 /* t_length will always be > 0 for a new page !*/
969 tmp_length= (uint) ((next_page.buff + buff_length) - half_pos);
970 bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
971 (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
972 new_buff_length= tmp_length + t_length + p_length;
973 next_page.size= new_buff_length;
974 page_store_size(share, &next_page);
975 /* keypage flag is already up to date */
976
977 if (share->now_transactional)
978 {
979 /*
980 Log changes to parent page
981 This has one key deleted from it and one key inserted to it at
982 keypos
983
984 ma_log_add ensures that we don't log changes that is outside of
985 key block size, as the REDO code can't handle that
986 */
987 if (_ma_log_add(anc_page, anc_length, keypos,
988 anc_key_inserted.move_length +
989 MY_MAX(anc_key_inserted.changed_length -
990 anc_key_inserted.move_length,
991 key_deleted.changed_length),
992 anc_key_inserted.move_length -
993 key_deleted.move_length, 1,
994 KEY_OP_DEBUG_LOG_ADD_3))
995 goto err;
996
997 /*
998 Log changes to leaf page.
999 This contains original data with new data added at end
1000 */
1001 DBUG_ASSERT(leaf_length <= new_leaf_length);
1002 if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
1003 goto err;
1004 /*
1005 Log changes to next page
1006
1007 This contains original data with some prefix data deleted and
1008 some compressed data at start possible extended
1009
1010 Data in buff was originally:
1011 org_leaf_buff [leaf_length]
1012 separator_key [buff_key_inserted.move_length]
1013 next_key_changes [buff_key_inserted.changed_length -move_length]
1014 next_page_data [next_buff_length - p_length -
1015 (buff_key_inserted.changed_length -move_length)]
1016
1017 After changes it's now:
1018 unpacked_key [key_inserted.changed_length]
1019 next_suffix [next_buff_length - key_inserted.changed_length]
1020
1021 */
1022 DBUG_ASSERT(new_buff_length <= next_buff_length);
1023 if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1024 (int) (new_buff_length - next_buff_length),
1025 KEY_OP_DEBUG_LOG_PREFIX_1))
1026 goto err;
1027 }
1028 page_mark_changed(info, &next_page);
1029 if (_ma_write_keypage(&next_page,
1030 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1031 goto err;
1032 }
1033
1034 page_mark_changed(info, leaf_page);
1035 if (_ma_write_keypage(leaf_page,
1036 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1037 goto err;
1038 DBUG_RETURN(new_anc_length <=
1039 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1040 (uint) keyinfo->underflow_block_length)));
1041 }
1042
1043 DBUG_PRINT("test",("use left page"));
1044
1045 keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1046 if (!keypos)
1047 goto err;
1048 next_page.pos= _ma_kpos(key_reflength,keypos);
1049 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1050 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1051 goto err;
1052 buff_length= next_page.size;
1053 endpos= next_page.buff + buff_length;
1054 DBUG_DUMP("prev", next_page.buff, next_page.size);
1055
1056 /* find keys to make a big key-page */
1057 bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1058 key_reflength);
1059 next_keypos=keypos;
1060 if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1061 &next_keypos))
1062 goto err;
1063 if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1064 goto err;
1065
1066 /* merge pages and put parting key from anc_page between */
1067 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1068 t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1069 (leaf_length == p_length ?
1070 (uchar*) 0 : leaf_buff+p_length),
1071 prev_key, prev_key,
1072 &key_inserted);
1073 if (t_length >= 0)
1074 bmove(endpos+t_length, leaf_buff+p_length,
1075 (size_t) (leaf_length-p_length));
1076 else /* We gained space */
1077 bmove(endpos,leaf_buff+((int) p_length-t_length),
1078 (size_t) (leaf_length-p_length+t_length));
1079 (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1080
1081 /* Remember for logging how many bytes of leaf_buff that are not changed */
1082 DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1083 unchanged_leaf_length= (leaf_length - p_length -
1084 (key_inserted.changed_length -
1085 key_inserted.move_length));
1086
1087 new_buff_length= buff_length + leaf_length - p_length + t_length;
1088
1089 #ifdef EXTRA_DEBUG
1090 /* Ensure that unchanged_leaf_length is correct */
1091 DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
1092 leaf_buff + leaf_length - unchanged_leaf_length,
1093 unchanged_leaf_length) == 0);
1094 #endif
1095
1096 page_flag= next_page.flag | leaf_page->flag;
1097 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1098 SEARCH_PAGE_KEY_HAS_TRANSID))
1099 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1100
1101 next_page.size= new_buff_length;
1102 next_page.flag= page_flag;
1103 page_store_info(share, &next_page);
1104
1105 /* remove key from anc_page */
1106 if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1107 anc_key_buff,
1108 anc_buff+anc_length, (my_off_t *) 0,
1109 &key_deleted)))
1110 goto err;
1111
1112 new_anc_length= anc_length - s_length;
1113 anc_page->size= new_anc_length;
1114 page_store_size(share, anc_page);
1115
1116 if (new_buff_length <= share->max_index_block_size)
1117 {
1118 /* All keys fitted into one page */
1119 page_mark_changed(info, leaf_page);
1120 if (_ma_dispose(info, leaf_page->pos, 0))
1121 goto err;
1122
1123 if (share->now_transactional)
1124 {
1125 /*
1126 Log changes to parent page. Note that this page may have been
1127 temporarily bigger than block_size.
1128 */
1129 if (_ma_log_delete(anc_page, key_deleted.key_pos,
1130 key_deleted.changed_length, key_deleted.move_length,
1131 anc_length - anc_page->org_size,
1132 KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1133 goto err;
1134 /*
1135 Log changes to next page. Data for leaf page is in buff
1136 that contains original leaf_buff, parting key and next_buff
1137 */
1138 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1139 goto err;
1140 }
1141 }
1142 else
1143 {
1144 /*
1145 Balancing didn't free a page, so we have to split 'next_page' into two
1146 pages
1147 - Find key in middle of buffer (buff)
1148 - Pack key at half_buff into anc_page at position of deleted key
1149 Note that anc_page may overflow! (is handled by caller)
1150 - Move everything after middlekey to 'leaf_buff'
1151 - Shorten buff at 'endpos'
1152 */
1153 MARIA_KEY_PARAM anc_key_inserted;
1154 size_t tmp_length;
1155
1156 if (keypos == anc_buff + share->keypage_header + key_reflength)
1157 anc_pos= 0; /* First key */
1158 else
1159 {
1160 if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1161 goto err;
1162 anc_pos= anc_key.data;
1163 }
1164 if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1165 goto err;
1166
1167 /* Correct new keypointer to leaf_page */
1168 _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1169 leaf_key.ref_length, leaf_page->pos);
1170
1171 /* Save parting key found by _ma_find_half_pos() in anc_page */
1172 DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1173 DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1174 anc_end_pos= anc_buff + new_anc_length;
1175 t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1176 keypos == anc_end_pos ? (uchar*) 0
1177 : keypos,
1178 anc_pos, anc_pos,
1179 &anc_key_inserted);
1180 if (t_length >= 0)
1181 bmove_upp(anc_end_pos+t_length, anc_end_pos,
1182 (uint) (anc_end_pos-keypos));
1183 else
1184 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1185 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1186 new_anc_length+= t_length;
1187 anc_page->size= new_anc_length;
1188 page_store_size(share, anc_page);
1189
1190 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1191 SEARCH_PAGE_KEY_HAS_TRANSID))
1192 _ma_mark_page_with_transid(share, anc_page);
1193
1194 /* Store first key on new page */
1195 if (nod_flag)
1196 bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1197 (size_t) nod_flag);
1198 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1199 goto err;
1200 DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1201 t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1202 (uchar*) 0, (uchar*) 0, &key_inserted);
1203 /* t_length will always be > 0 for a new page !*/
1204 tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1205 DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length));
1206 bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1207 (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1208 new_leaf_length= (uint)(tmp_length + t_length + p_length);
1209 DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
1210
1211 leaf_page->size= new_leaf_length;
1212 leaf_page->flag= page_flag;
1213 page_store_info(share, leaf_page);
1214
1215 new_buff_length= (uint) (endpos - next_page.buff);
1216 next_page.size= new_buff_length;
1217 page_store_size(share, &next_page);
1218
1219 if (share->now_transactional)
1220 {
1221 /*
1222 Log changes to parent page
1223 This has one key deleted from it and one key inserted to it at
1224 keypos
1225
1226 ma_log_add() ensures that we don't log changes that is outside of
1227 key block size, as the REDO code can't handle that
1228 */
1229 if (_ma_log_add(anc_page, anc_length, keypos,
1230 anc_key_inserted.move_length +
1231 MY_MAX(anc_key_inserted.changed_length -
1232 anc_key_inserted.move_length,
1233 key_deleted.changed_length),
1234 anc_key_inserted.move_length -
1235 key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1236 goto err;
1237
1238 /*
1239 Log changes to leaf page.
1240 This contains original data with new data added first
1241 */
1242 DBUG_ASSERT(leaf_length <= new_leaf_length);
1243 DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1244 if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1245 (int) (new_leaf_length - leaf_length),
1246 KEY_OP_DEBUG_LOG_PREFIX_2))
1247 goto err;
1248 /*
1249 Log changes to next page
1250 This contains original data with some suffix data deleted
1251 */
1252 DBUG_ASSERT(new_buff_length <= buff_length);
1253 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1254 goto err;
1255 }
1256
1257 page_mark_changed(info, leaf_page);
1258 if (_ma_write_keypage(leaf_page,
1259 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1260 goto err;
1261 }
1262 page_mark_changed(info, &next_page);
1263 if (_ma_write_keypage(&next_page,
1264 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1265 goto err;
1266
1267 DBUG_RETURN(new_anc_length <=
1268 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1269 (uint) keyinfo->underflow_block_length)));
1270
1271 err:
1272 DBUG_RETURN(-1);
1273 } /* underflow */
1274
1275
1276 /**
1277 @brief Remove a key from page
1278
1279 @fn remove_key()
1280 keyinfo Key handle
1281 nod_flag Length of node ptr
1282 keypos Where on page key starts
1283 lastkey Buffer for storing keys to be removed
1284 page_end Pointer to end of page
1285 next_block If <> 0 and node-page, this is set to address of
1286 next page
1287 s_temp Information about what changes was done one the page:
1288 s_temp.key_pos Start of key
1289 s_temp.move_length Number of bytes removed at keypos
1290 s_temp.changed_length Number of bytes changed at keypos
1291
1292 @todo
1293 The current code doesn't handle the case that the next key may be
1294 packed better against the previous key if there is a case difference
1295
1296 @return
1297 @retval 0 error
1298 @retval # How many chars was removed
1299 */
1300
remove_key(MARIA_KEYDEF * keyinfo,uint page_flag,uint nod_flag,uchar * keypos,uchar * lastkey,uchar * page_end,my_off_t * next_block,MARIA_KEY_PARAM * s_temp)1301 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1302 uchar *keypos, uchar *lastkey,
1303 uchar *page_end, my_off_t *next_block,
1304 MARIA_KEY_PARAM *s_temp)
1305 {
1306 int s_length;
1307 uchar *start;
1308 DBUG_ENTER("remove_key");
1309 DBUG_PRINT("enter", ("keypos:%p page_end: %p",
1310 keypos, page_end));
1311
1312 start= s_temp->key_pos= keypos;
1313 s_temp->changed_length= 0;
1314 if (!(keyinfo->flag &
1315 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1316 HA_BINARY_PACK_KEY)) &&
1317 !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1318 {
1319 /* Static length key */
1320 s_length=(int) (keyinfo->keylength+nod_flag);
1321 if (next_block && nod_flag)
1322 *next_block= _ma_kpos(nod_flag,keypos+s_length);
1323 }
1324 else
1325 {
1326 /* Let keypos point at next key */
1327 MARIA_KEY key;
1328
1329 /* Calculate length of key */
1330 key.keyinfo= keyinfo;
1331 key.data= lastkey;
1332 if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1333 DBUG_RETURN(0); /* Error */
1334
1335 if (next_block && nod_flag)
1336 *next_block= _ma_kpos(nod_flag,keypos);
1337 s_length=(int) (keypos-start);
1338 if (keypos != page_end)
1339 {
1340 if (keyinfo->flag & HA_BINARY_PACK_KEY)
1341 {
1342 uchar *old_key= start;
1343 uint next_length,prev_length,prev_pack_length;
1344
1345 /* keypos points here on start of next key */
1346 get_key_length(next_length,keypos);
1347 get_key_pack_length(prev_length,prev_pack_length,old_key);
1348 if (next_length > prev_length)
1349 {
1350 uint diff= (next_length-prev_length);
1351 /* We have to copy data from the current key to the next key */
1352 keypos-= diff + prev_pack_length;
1353 store_key_length(keypos, prev_length);
1354 bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1355 s_length=(int) (keypos-start);
1356 s_temp->changed_length= diff + prev_pack_length;
1357 }
1358 }
1359 else
1360 {
1361 /* Check if a variable length first key part */
1362 if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
1363 {
1364 /* Next key is packed against the current one */
1365 uint next_length,prev_length,prev_pack_length,lastkey_length,
1366 rest_length;
1367 if (keyinfo->seg[0].length >= 127)
1368 {
1369 if (!(prev_length=mi_uint2korr(start) & 32767))
1370 goto end;
1371 next_length=mi_uint2korr(keypos) & 32767;
1372 keypos+=2;
1373 prev_pack_length=2;
1374 }
1375 else
1376 {
1377 if (!(prev_length= *start & 127))
1378 goto end; /* Same key as previous*/
1379 next_length= *keypos & 127;
1380 keypos++;
1381 prev_pack_length=1;
1382 }
1383 if (!(*start & 128))
1384 prev_length=0; /* prev key not packed */
1385 if (keyinfo->seg[0].flag & HA_NULL_PART)
1386 lastkey++; /* Skip null marker */
1387 get_key_length(lastkey_length,lastkey);
1388 if (!next_length) /* Same key after */
1389 {
1390 next_length=lastkey_length;
1391 rest_length=0;
1392 }
1393 else
1394 get_key_length(rest_length,keypos);
1395
1396 if (next_length >= prev_length)
1397 {
1398 /* Next key is based on deleted key */
1399 uint pack_length;
1400 uint diff= (next_length-prev_length);
1401
1402 /* keypos points to data of next key (after key length) */
1403 bmove(keypos - diff, lastkey + prev_length, diff);
1404 rest_length+= diff;
1405 pack_length= prev_length ? get_pack_length(rest_length): 0;
1406 keypos-= diff + pack_length + prev_pack_length;
1407 s_length=(int) (keypos-start);
1408 if (prev_length) /* Pack against prev key */
1409 {
1410 *keypos++= start[0];
1411 if (prev_pack_length == 2)
1412 *keypos++= start[1];
1413 store_key_length(keypos,rest_length);
1414 }
1415 else
1416 {
1417 /* Next key is not packed anymore */
1418 if (keyinfo->seg[0].flag & HA_NULL_PART)
1419 {
1420 rest_length++; /* Mark not null */
1421 }
1422 if (prev_pack_length == 2)
1423 {
1424 mi_int2store(keypos,rest_length);
1425 }
1426 else
1427 *keypos= rest_length;
1428 }
1429 s_temp->changed_length= diff + pack_length + prev_pack_length;
1430 }
1431 }
1432 }
1433 }
1434 }
1435 end:
1436 bmove(start, start+s_length, (uint) (page_end-start-s_length));
1437 s_temp->move_length= s_length;
1438 DBUG_RETURN((uint) s_length);
1439 } /* remove_key */
1440
1441
1442 /****************************************************************************
1443 Logging of redos
1444 ****************************************************************************/
1445
1446 /**
1447 @brief
1448 log entry where some parts are deleted and some things are changed
1449 and some data could be added last.
1450
1451 @fn _ma_log_delete()
1452 @param info Maria handler
1453 @param page Pageaddress for changed page
1454 @param buff Page buffer
1455 @param key_pos Start of change area
1456 @param changed_length How many bytes where changed at key_pos
1457 @param move_length How many bytes where deleted at key_pos
1458 @param append_length Length of data added last
1459 This is taken from end of ma_page->buff
1460
1461 This is mainly used when a key is deleted. The append happens
1462 when we delete a key from a page with data > block_size kept in
1463 memory and we have to add back the data that was stored > block_size
1464 */
1465
_ma_log_delete(MARIA_PAGE * ma_page,const uchar * key_pos,uint changed_length,uint move_length,uint append_length,enum en_key_debug debug_marker)1466 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1467 uint changed_length, uint move_length,
1468 uint append_length __attribute__((unused)),
1469 enum en_key_debug debug_marker __attribute__((unused)))
1470 {
1471 LSN lsn;
1472 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
1473 uchar *log_pos;
1474 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
1475 uint translog_parts, current_size, extra_length;
1476 uint offset= (uint) (key_pos - ma_page->buff);
1477 MARIA_HA *info= ma_page->info;
1478 MARIA_SHARE *share= info->s;
1479 my_off_t page= ma_page->pos / share->block_size;
1480 DBUG_ENTER("_ma_log_delete");
1481 DBUG_PRINT("enter", ("page: %lu offset: %u changed_length: %u move_length: %u append_length: %u page_size: %u",
1482 (ulong) page, offset, changed_length, move_length,
1483 append_length, ma_page->size));
1484 DBUG_ASSERT(share->now_transactional && move_length);
1485 DBUG_ASSERT(offset + changed_length <= ma_page->size);
1486 DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
1487 DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
1488
1489 /* Store address of new root page */
1490 page_store(log_data + FILEID_STORE_SIZE, page);
1491 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1492 current_size= ma_page->org_size;
1493
1494 #ifdef EXTRA_DEBUG_KEY_CHANGES
1495 *log_pos++= KEY_OP_DEBUG;
1496 *log_pos++= debug_marker;
1497
1498 *log_pos++= KEY_OP_DEBUG_2;
1499 int2store(log_pos, ma_page->org_size);
1500 int2store(log_pos+2, ma_page->size);
1501 log_pos+=4;
1502 #endif
1503
1504 /* Store keypage_flag */
1505 *log_pos++= KEY_OP_SET_PAGEFLAG;
1506 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
1507
1508 log_pos[0]= KEY_OP_OFFSET;
1509 int2store(log_pos+1, offset);
1510 log_pos+= 3;
1511 translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
1512 extra_length= 0;
1513
1514 if (changed_length)
1515 {
1516 if (offset + changed_length >= share->max_index_block_size)
1517 {
1518 changed_length= share->max_index_block_size - offset;
1519 move_length= 0; /* Nothing to move */
1520 current_size= share->max_index_block_size;
1521 }
1522
1523 log_pos[0]= KEY_OP_CHANGE;
1524 int2store(log_pos+1, changed_length);
1525 log_pos+= 3;
1526 log_array[translog_parts].str= ma_page->buff + offset;
1527 log_array[translog_parts].length= changed_length;
1528 translog_parts++;
1529
1530 /* We only have to move things after offset+changed_length */
1531 offset+= changed_length;
1532 }
1533
1534 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1535 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1536
1537 if (move_length)
1538 {
1539 uint log_length;
1540 if (offset + move_length < share->max_index_block_size)
1541 {
1542 /*
1543 Move down things that is on page.
1544 page_offset in apply_redo_inxed() will be at original offset
1545 + changed_length.
1546 */
1547 log_pos[0]= KEY_OP_SHIFT;
1548 int2store(log_pos+1, - (int) move_length);
1549 log_length= 3;
1550 current_size-= move_length;
1551 }
1552 else
1553 {
1554 /* Delete to end of page */
1555 uint tmp= current_size - offset;
1556 current_size= offset;
1557 log_pos[0]= KEY_OP_DEL_SUFFIX;
1558 int2store(log_pos+1, tmp);
1559 log_length= 3;
1560 }
1561 log_array[translog_parts].str= log_pos;
1562 log_array[translog_parts].length= log_length;
1563 translog_parts++;
1564 log_pos+= log_length;
1565 extra_length+= log_length;
1566 }
1567
1568 if (current_size != ma_page->size &&
1569 current_size != share->max_index_block_size)
1570 {
1571 /* Append data that didn't fit on the page before */
1572 uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
1573 current_size);
1574 uchar *data= ma_page->buff + current_size;
1575
1576 DBUG_ASSERT(length <= append_length);
1577
1578 log_pos[0]= KEY_OP_ADD_SUFFIX;
1579 int2store(log_pos+1, length);
1580 log_array[translog_parts].str= log_pos;
1581 log_array[translog_parts].length= 3;
1582 log_array[translog_parts + 1].str= data;
1583 log_array[translog_parts + 1].length= length;
1584 log_pos+= 3;
1585 translog_parts+= 2;
1586 current_size+= length;
1587 extra_length+= 3 + length;
1588 }
1589
1590 _ma_log_key_changes(ma_page,
1591 log_array + translog_parts,
1592 log_pos, &extra_length, &translog_parts);
1593 /* Remember new page length for future log entires for same page */
1594 ma_page->org_size= current_size;
1595
1596 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1597 info->trn, info,
1598 (translog_size_t)
1599 log_array[TRANSLOG_INTERNAL_PARTS].length +
1600 changed_length + extra_length, translog_parts,
1601 log_array, log_data, NULL))
1602 DBUG_RETURN(1);
1603
1604 DBUG_RETURN(0);
1605 }
1606
1607
1608 /****************************************************************************
1609 Logging of undos
1610 ****************************************************************************/
1611
_ma_write_undo_key_delete(MARIA_HA * info,const MARIA_KEY * key,my_off_t new_root,LSN * res_lsn)1612 my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1613 my_off_t new_root, LSN *res_lsn)
1614 {
1615 MARIA_SHARE *share= info->s;
1616 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1617 KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1618 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1619 struct st_msg_to_write_hook_for_undo_key msg;
1620 enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1621 uint keynr= key->keyinfo->key_nr;
1622
1623 lsn_store(log_data, info->trn->undo_lsn);
1624 key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1625 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1626
1627 /**
1628 @todo BUG if we had concurrent insert/deletes, reading state's key_root
1629 like this would be unsafe.
1630 */
1631 if (new_root != share->state.key_root[keynr])
1632 {
1633 my_off_t page;
1634 page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1635 new_root / share->block_size);
1636 page_store(log_pos, page);
1637 log_pos+= PAGE_STORE_SIZE;
1638 log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1639 }
1640
1641 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1642 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1643 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1644 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1645 key->ref_length);
1646
1647 msg.root= &share->state.key_root[keynr];
1648 msg.value= new_root;
1649 /*
1650 set autoincrement to 1 if this is an auto_increment key
1651 This is only used if we are now in a rollback of a duplicate key
1652 */
1653 msg.auto_increment= share->base.auto_key == keynr + 1;
1654
1655 return translog_write_record(res_lsn, log_type,
1656 info->trn, info,
1657 (translog_size_t)
1658 (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1659 log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1660 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1661 log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1662 }
1663