1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB 2 Copyright (C) 2009-2010 Monty Program Ab backend_termion_should_only_write_diffs() -> Result<(), Box<dyn std::error::Error>>3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ 16 17 #include "ma_fulltext.h" 18 #include "ma_rt_index.h" 19 #include "trnman.h" 20 #include "ma_key_recover.h" 21 22 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag, 23 MARIA_PAGE *page); 24 static int del(MARIA_HA *info, MARIA_KEY *key, 25 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page, 26 uchar *keypos, my_off_t next_block, uchar *ret_key_buff); 27 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo, 28 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page, 29 uchar *keypos); 30 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag, 31 uchar *keypos, uchar *lastkey, uchar *page_end, 32 my_off_t *next_block, MARIA_KEY_PARAM *s_temp); 33 34 /* @breif Remove a row from a MARIA table */ 35 36 int maria_delete(MARIA_HA *info,const uchar *record) 37 { 38 uint i; 39 uchar *old_key; 40 int save_errno; 41 char lastpos[8]; 42 MARIA_SHARE *share= info->s; 43 MARIA_KEYDEF *keyinfo; 44 DBUG_ENTER("maria_delete"); 45 46 /* Test if record is in datafile */ 47 DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage", 48 maria_print_error(share, HA_ERR_CRASHED); 49 DBUG_RETURN(my_errno= HA_ERR_CRASHED);); 50 DBUG_EXECUTE_IF("my_error_test_undefined_error", 51 maria_print_error(share, INT_MAX); 52 DBUG_RETURN(my_errno= INT_MAX);); 53 if (!(info->update & HA_STATE_AKTIV)) 54 { 55 DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */ 56 } 57 if (share->options & HA_OPTION_READ_ONLY_DATA) 58 { 59 DBUG_RETURN(my_errno=EACCES); 60 } 61 if (_ma_readinfo(info,F_WRLCK,1)) 62 DBUG_RETURN(my_errno); 63 if ((*share->compare_record)(info,record)) 64 goto err; /* Error on read-check */ 65 66 if (_ma_mark_file_changed(share)) 67 goto err; 68 69 /* Ensure we don't change the autoincrement value */ 70 info->last_auto_increment= ~(ulonglong) 0; 71 /* Remove all keys from the index file */ 72 73 old_key= info->lastkey_buff2; 74 75 for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++) 76 { 77 if (maria_is_key_active(share->state.key_map, i)) 78 { 79 keyinfo->version++; 80 if (keyinfo->flag & HA_FULLTEXT) 81 { 82 if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos)) 83 goto err; 84 } 85 else 86 { 87 MARIA_KEY key; 88 if (keyinfo->ck_delete(info, 89 (*keyinfo->make_key)(info, &key, i, old_key, 90 record, 91 info->cur_row.lastpos, 92 info->cur_row.trid))) 93 goto err; 94 } 95 /* The above changed info->lastkey2. Inform maria_rnext_same(). */ 96 info->update&= ~HA_STATE_RNEXT_SAME; 97 } 98 } 99 100 if (share->calc_checksum) 101 { 102 /* 103 We can't use the row based checksum as this doesn't have enough 104 precision. 105 */ 106 info->cur_row.checksum= (*share->calc_checksum)(info, record); 107 } 108 109 if ((*share->delete_record)(info, record)) 110 goto err; /* Remove record from database */ 111 112 info->state->checksum-= info->cur_row.checksum; 113 info->state->records--; 114 info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED; 115 info->row_changes++; 116 share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE | 117 STATE_NOT_ZEROFILLED); 118 info->state->changed=1; 119 120 mi_sizestore(lastpos, info->cur_row.lastpos); 121 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); 122 if (info->invalidator != 0) 123 { 124 DBUG_PRINT("info", ("invalidator... '%s' (delete)", 125 share->open_file_name.str)); 126 (*info->invalidator)(share->open_file_name.str); 127 info->invalidator=0; 128 } 129 DBUG_RETURN(0); 130 131 err: 132 save_errno= my_errno; 133 DBUG_ASSERT(save_errno); 134 if (!save_errno) 135 save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */ 136 137 mi_sizestore(lastpos, info->cur_row.lastpos); 138 (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE); 139 info->update|=HA_STATE_WRITTEN; /* Buffer changed */ 140 if (save_errno != HA_ERR_RECORD_CHANGED) 141 { 142 _ma_set_fatal_error(share, HA_ERR_CRASHED); 143 save_errno= HA_ERR_CRASHED; 144 } 145 DBUG_RETURN(my_errno= save_errno); 146 } /* maria_delete */ 147 148 149 /* 150 Remove a key from the btree index 151 152 TODO: 153 Change ma_ck_real_delete() to use another buffer for changed keys instead 154 of key->data. This would allows us to remove the copying of the key here. 155 */ 156 157 my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key) 158 { 159 MARIA_SHARE *share= info->s; 160 int res; 161 LSN lsn= LSN_IMPOSSIBLE; 162 my_off_t new_root= share->state.key_root[key->keyinfo->key_nr]; 163 uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data; 164 MARIA_KEY org_key; 165 DBUG_ENTER("_ma_ck_delete"); 166 167 LINT_INIT_STRUCT(org_key); 168 169 save_key_data= key->data; 170 if (share->now_transactional) 171 { 172 /* Save original value as the key may change */ 173 memcpy(key_buff, key->data, key->data_length + key->ref_length); 174 org_key= *key; 175 key->data= key_buff; 176 } 177 178 if ((res= _ma_ck_real_delete(info, key, &new_root))) 179 { 180 /* We have to mark the table crashed before unpin_all_pages() */ 181 maria_mark_crashed(info); 182 } 183 184 key->data= save_key_data; 185 if (!res && share->now_transactional) 186 res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn); 187 else 188 { 189 share->state.key_root[key->keyinfo->key_nr]= new_root; 190 _ma_fast_unlock_key_del(info); 191 } 192 _ma_unpin_all_pages_and_finalize_row(info, lsn); 193 DBUG_RETURN(res != 0); 194 } /* _ma_ck_delete */ 195 196 197 my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key, 198 my_off_t *root) 199 { 200 int error; 201 my_bool result= 0; 202 my_off_t old_root; 203 uchar *root_buff; 204 MARIA_KEYDEF *keyinfo= key->keyinfo; 205 MARIA_PAGE page; 206 DBUG_ENTER("_ma_ck_real_delete"); 207 208 if ((old_root=*root) == HA_OFFSET_ERROR) 209 { 210 _ma_set_fatal_error(info->s, HA_ERR_CRASHED); 211 DBUG_RETURN(1); 212 } 213 if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+ 214 MARIA_MAX_KEY_BUFF*2))) 215 { 216 DBUG_PRINT("error",("Couldn't allocate memory")); 217 my_errno=ENOMEM; 218 DBUG_RETURN(1); 219 } 220 DBUG_PRINT("info",("root_page: %lu", 221 (ulong) (old_root / keyinfo->block_length))); 222 if (_ma_fetch_keypage(&page, info, keyinfo, old_root, 223 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0)) 224 { 225 result= 1; 226 goto err; 227 } 228 if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ? 229 SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT: 230 SEARCH_SAME), 231 &page))) 232 { 233 if (error < 0) 234 result= 1; 235 else if (error == 2) 236 { 237 DBUG_PRINT("test",("Enlarging of root when deleting")); 238 if (_ma_enlarge_root(info, key, root)) 239 result= 1; 240 } 241 else /* error == 1 */ 242 { 243 MARIA_SHARE *share= info->s; 244 245 page_mark_changed(info, &page); 246 247 if (page.size <= page.node + share->keypage_header + 1) 248 { 249 DBUG_ASSERT(page.size == page.node + share->keypage_header); 250 if (page.node) 251 *root= _ma_kpos(page.node, root_buff +share->keypage_header + 252 page.node); 253 else 254 *root=HA_OFFSET_ERROR; 255 if (_ma_dispose(info, old_root, 0)) 256 result= 1; 257 } 258 else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED, 259 DFLT_INIT_HITS)) 260 result= 1; 261 } 262 } 263 err: 264 my_afree(root_buff); 265 DBUG_PRINT("exit",("Return: %d",result)); 266 DBUG_RETURN(result); 267 } /* _ma_ck_real_delete */ 268 269 270 /** 271 @brief Remove key below key root 272 273 @param key Key to delete. Will contain new key if block was enlarged 274 275 @return 276 @retval 0 ok (anc_page is not changed) 277 @retval 1 If data on page is too small; In this case anc_buff is not saved 278 @retval 2 If data on page is too big 279 @retval -1 On errors 280 */ 281 282 static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag, 283 MARIA_PAGE *anc_page) 284 { 285 int flag,ret_value,save_flag; 286 uint nod_flag, page_flag; 287 my_bool last_key; 288 uchar *leaf_buff,*keypos; 289 uchar lastkey[MARIA_MAX_KEY_BUFF]; 290 MARIA_KEY_PARAM s_temp; 291 MARIA_SHARE *share= info->s; 292 MARIA_KEYDEF *keyinfo= key->keyinfo; 293 MARIA_PAGE leaf_page; 294 DBUG_ENTER("d_search"); 295 DBUG_DUMP("page", anc_page->buff, anc_page->size); 296 297 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey, 298 &last_key); 299 if (flag == MARIA_FOUND_WRONG_KEY) 300 { 301 DBUG_PRINT("error",("Found wrong key")); 302 DBUG_RETURN(-1); 303 } 304 page_flag= anc_page->flag; 305 nod_flag= anc_page->node; 306 307 if (!flag && (keyinfo->flag & HA_FULLTEXT)) 308 { 309 uint off; 310 int subkeys; 311 312 get_key_full_length_rdonly(off, lastkey); 313 subkeys=ft_sintXkorr(lastkey+off); 314 DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0); 315 comp_flag=SEARCH_SAME; 316 if (subkeys >= 0) 317 { 318 /* normal word, one-level tree structure */ 319 if (info->ft1_to_ft2) 320 { 321 /* we're in ft1->ft2 conversion mode. Saving key data */ 322 insert_dynamic(info->ft1_to_ft2, (lastkey+off)); 323 } 324 else 325 { 326 /* we need exact match only if not in ft1->ft2 conversion mode */ 327 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, 328 lastkey, &last_key); 329 } 330 /* fall through to normal delete */ 331 } 332 else 333 { 334 /* popular word. two-level tree. going down */ 335 uint tmp_key_length; 336 my_off_t root; 337 uchar *kpos=keypos; 338 MARIA_KEY tmp_key; 339 340 tmp_key.data= lastkey; 341 tmp_key.keyinfo= keyinfo; 342 343 if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, 344 &kpos))) 345 { 346 _ma_set_fatal_error(share, HA_ERR_CRASHED); 347 DBUG_RETURN(-1); 348 } 349 root= _ma_row_pos_from_key(&tmp_key); 350 if (subkeys == -1) 351 { 352 /* the last entry in sub-tree */ 353 if (_ma_dispose(info, root, 1)) 354 DBUG_RETURN(-1); 355 /* fall through to normal delete */ 356 } 357 else 358 { 359 MARIA_KEY word_key; 360 keyinfo=&share->ft2_keyinfo; 361 /* we'll modify key entry 'in vivo' */ 362 kpos-=keyinfo->keylength+nod_flag; 363 get_key_full_length_rdonly(off, key->data); 364 365 word_key.data= key->data + off; 366 word_key.keyinfo= &share->ft2_keyinfo; 367 word_key.data_length= HA_FT_WLEN; 368 word_key.ref_length= 0; 369 word_key.flag= 0; 370 ret_value= _ma_ck_real_delete(info, &word_key, &root); 371 _ma_dpointer(share, kpos+HA_FT_WLEN, root); 372 subkeys++; 373 ft_intXstore(kpos, subkeys); 374 if (!ret_value) 375 { 376 page_mark_changed(info, anc_page); 377 ret_value= _ma_write_keypage(anc_page, 378 PAGECACHE_LOCK_LEFT_WRITELOCKED, 379 DFLT_INIT_HITS); 380 } 381 DBUG_PRINT("exit",("Return: %d",ret_value)); 382 DBUG_RETURN(ret_value); 383 } 384 } 385 } 386 leaf_buff=0; 387 if (nod_flag) 388 { 389 /* Read left child page */ 390 leaf_page.pos= _ma_kpos(nod_flag,keypos); 391 if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+ 392 MARIA_MAX_KEY_BUFF*2))) 393 { 394 DBUG_PRINT("error", ("Couldn't allocate memory")); 395 my_errno=ENOMEM; 396 DBUG_RETURN(-1); 397 } 398 if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos, 399 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff, 400 0)) 401 goto err; 402 } 403 404 if (flag != 0) 405 { 406 if (!nod_flag) 407 { 408 /* This should newer happend */ 409 DBUG_PRINT("error",("Didn't find key")); 410 _ma_set_fatal_error(share, HA_ERR_CRASHED); 411 goto err; 412 } 413 save_flag=0; 414 ret_value= d_search(info, key, comp_flag, &leaf_page); 415 } 416 else 417 { /* Found key */ 418 uint tmp; 419 uint anc_buff_length= anc_page->size; 420 uint anc_page_flag= anc_page->flag; 421 my_off_t next_block; 422 423 if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey, 424 anc_page->buff + anc_buff_length, 425 &next_block, &s_temp))) 426 goto err; 427 428 page_mark_changed(info, anc_page); 429 anc_buff_length-= tmp; 430 anc_page->size= anc_buff_length; 431 page_store_size(share, anc_page); 432 433 /* 434 Log initial changes on pages 435 If there is an underflow, there will be more changes logged to the 436 page 437 */ 438 if (share->now_transactional && 439 _ma_log_delete(anc_page, s_temp.key_pos, 440 s_temp.changed_length, s_temp.move_length, 441 0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1)) 442 DBUG_RETURN(-1); 443 444 if (!nod_flag) 445 { /* On leaf page */ 446 if (anc_buff_length <= (info->quick_mode ? 447 MARIA_MIN_KEYBLOCK_LENGTH : 448 (uint) keyinfo->underflow_block_length)) 449 { 450 /* Page will be written by caller if we return 1 */ 451 DBUG_RETURN(1); 452 } 453 if (_ma_write_keypage(anc_page, 454 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) 455 DBUG_RETURN(-1); 456 DBUG_RETURN(0); 457 } 458 save_flag=1; /* Mark that anc_buff is changed */ 459 ret_value= del(info, key, anc_page, &leaf_page, 460 keypos, next_block, lastkey); 461 } 462 if (ret_value >0) 463 { 464 save_flag= 2; 465 if (ret_value == 1) 466 ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos); 467 else 468 { 469 /* This can only happen with variable length keys */ 470 MARIA_KEY last_key; 471 DBUG_PRINT("test",("Enlarging of key when deleting")); 472 473 last_key.data= lastkey; 474 last_key.keyinfo= keyinfo; 475 if (!_ma_get_last_key(&last_key, anc_page, keypos)) 476 goto err; 477 ret_value= _ma_insert(info, key, anc_page, keypos, 478 last_key.data, 479 (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0); 480 481 if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED, 482 DFLT_INIT_HITS)) 483 ret_value= -1; 484 } 485 } 486 if (ret_value == 0 && anc_page->size > share->max_index_block_size) 487 { 488 /* 489 parent buffer got too big ; We have to split the page. 490 The | 2 is there to force write of anc page below 491 */ 492 save_flag= 3; 493 ret_value= _ma_split_page(info, key, anc_page, 494 share->max_index_block_size, 495 (uchar*) 0, 0, 0, lastkey, 0) | 2; 496 DBUG_ASSERT(anc_page->org_size == anc_page->size); 497 } 498 if (save_flag && ret_value != 1) 499 { 500 page_mark_changed(info, anc_page); 501 if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED, 502 DFLT_INIT_HITS)) 503 ret_value= -1; 504 } 505 else 506 { 507 DBUG_DUMP("page", anc_page->buff, anc_page->size); 508 } 509 my_afree(leaf_buff); 510 DBUG_PRINT("exit",("Return: %d",ret_value)); 511 DBUG_RETURN(ret_value); 512 513 err: 514 my_afree(leaf_buff); 515 DBUG_PRINT("exit",("Error: %d",my_errno)); 516 DBUG_RETURN (-1); 517 } /* d_search */ 518 519 520 /** 521 @brief Remove a key that has a page-reference 522 523 @param info Maria handler 524 @param key Buffer for key to be inserted at upper level 525 @param anc_page Page address for page where deleted key was 526 @param anc_buff Page buffer (nod) where deleted key was 527 @param leaf_page Page address for nod before the deleted key 528 @param leaf_buff Buffer for leaf_page 529 @param leaf_buff_link Pinned page link for leaf_buff 530 @param keypos Pos to where deleted key was on anc_buff 531 @param next_block Page adress for nod after deleted key 532 @param ret_key_buff Key before keypos in anc_buff 533 534 @notes 535 leaf_page must be written to disk if retval > 0 536 anc_page is not updated on disk. Caller should do this 537 538 @return 539 @retval < 0 Error 540 @retval 0 OK. leaf_buff is written to disk 541 542 @retval 1 key contains key to upper level (from balance page) 543 leaf_buff has underflow 544 @retval 2 key contains key to upper level (from split space) 545 */ 546 547 static int del(MARIA_HA *info, MARIA_KEY *key, 548 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page, 549 uchar *keypos, my_off_t next_block, uchar *ret_key_buff) 550 { 551 int ret_value,length; 552 uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length; 553 uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key; 554 uchar *anc_buff; 555 MARIA_KEY_PARAM s_temp; 556 MARIA_KEY tmp_key; 557 MARIA_SHARE *share= info->s; 558 MARIA_KEYDEF *keyinfo= key->keyinfo; 559 MARIA_KEY ret_key; 560 MARIA_PAGE next_page; 561 DBUG_ENTER("del"); 562 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p", 563 (ulong) (leaf_page->pos / share->block_size), 564 keypos)); 565 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size); 566 567 page_flag= leaf_page->flag; 568 leaf_length= leaf_page->size; 569 nod_flag= leaf_page->node; 570 571 endpos= leaf_page->buff + leaf_length; 572 tmp_key.keyinfo= keyinfo; 573 tmp_key.data= keybuff; 574 next_buff= 0; 575 576 if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos))) 577 DBUG_RETURN(-1); 578 579 if (nod_flag) 580 { 581 next_page.pos= _ma_kpos(nod_flag,endpos); 582 if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+ 583 MARIA_MAX_KEY_BUFF*2))) 584 DBUG_RETURN(-1); 585 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos, 586 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0)) 587 ret_value= -1; 588 else 589 { 590 DBUG_DUMP("next_page", next_page.buff, next_page.size); 591 if ((ret_value= del(info, key, anc_page, &next_page, 592 keypos, next_block, ret_key_buff)) >0) 593 { 594 /* Get new length after key was deleted */ 595 endpos= leaf_page->buff+ leaf_page->size; 596 if (ret_value == 1) 597 { 598 /* underflow writes "next_page" to disk */ 599 ret_value= underflow(info, keyinfo, leaf_page, &next_page, 600 endpos); 601 if (ret_value < 0) 602 goto err; 603 if (leaf_page->size > share->max_index_block_size) 604 { 605 DBUG_ASSERT(ret_value == 0); 606 ret_value= (_ma_split_page(info, key, leaf_page, 607 share->max_index_block_size, 608 (uchar*) 0, 0, 0, 609 ret_key_buff, 0) | 2); 610 } 611 } 612 else 613 { 614 if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED, 615 DFLT_INIT_HITS)) 616 goto err; 617 DBUG_PRINT("test",("Inserting of key when deleting")); 618 if (!_ma_get_last_key(&tmp_key, leaf_page, endpos)) 619 goto err; 620 ret_value= _ma_insert(info, key, leaf_page, endpos, 621 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0, 622 0); 623 } 624 } 625 page_mark_changed(info, leaf_page); 626 /* 627 If ret_value <> 0, then leaf_page underflowed and caller will have 628 to handle underflow and write leaf_page to disk. 629 We can't write it here, as if leaf_page is empty we get an assert 630 in _ma_write_keypage. 631 */ 632 if (ret_value == 0 && _ma_write_keypage(leaf_page, 633 PAGECACHE_LOCK_LEFT_WRITELOCKED, 634 DFLT_INIT_HITS)) 635 goto err; 636 } 637 my_afree(next_buff); 638 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size); 639 DBUG_RETURN(ret_value); 640 } 641 642 /* 643 Remove last key from leaf page 644 Note that leaf_page page may only have had one key (can normally only 645 happen in quick mode), in which ase it will now temporary have 0 keys 646 on it. This will be corrected by the caller as we will return 0. 647 */ 648 new_leaf_length= (uint) (key_start - leaf_page->buff); 649 leaf_page->size= new_leaf_length; 650 page_store_size(share, leaf_page); 651 652 if (share->now_transactional && 653 _ma_log_suffix(leaf_page, leaf_length, new_leaf_length)) 654 goto err; 655 656 page_mark_changed(info, leaf_page); /* Safety */ 657 if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH : 658 (uint) keyinfo->underflow_block_length)) 659 { 660 /* Underflow, leaf_page will be written by caller */ 661 ret_value= 1; 662 } 663 else 664 { 665 ret_value= 0; 666 if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED, 667 DFLT_INIT_HITS)) 668 goto err; 669 } 670 671 /* Place last key in ancestor page on deleted key position */ 672 a_length= anc_page->size; 673 anc_buff= anc_page->buff; 674 endpos= anc_buff + a_length; 675 676 ret_key.keyinfo= keyinfo; 677 ret_key.data= ret_key_buff; 678 679 prev_key= 0; 680 if (keypos != anc_buff+share->keypage_header + share->base.key_reflength) 681 { 682 if (!_ma_get_last_key(&ret_key, anc_page, keypos)) 683 goto err; 684 prev_key= ret_key.data; 685 } 686 length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength, 687 keypos == endpos ? (uchar*) 0 : keypos, 688 prev_key, prev_key, 689 &s_temp); 690 if (length > 0) 691 bmove_upp(endpos+length,endpos,(uint) (endpos-keypos)); 692 else 693 bmove(keypos,keypos-length, (int) (endpos-keypos)+length); 694 (*keyinfo->store_key)(keyinfo,keypos,&s_temp); 695 key_start= keypos; 696 if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID | 697 SEARCH_PAGE_KEY_HAS_TRANSID)) 698 _ma_mark_page_with_transid(share, anc_page); 699 700 /* Save pointer to next leaf on parent page */ 701 if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength, 702 &keypos)) 703 goto err; 704 _ma_kpointer(info,keypos - share->base.key_reflength,next_block); 705 anc_page->size= a_length + length; 706 page_store_size(share, anc_page); 707 708 if (share->now_transactional && 709 _ma_log_add(anc_page, a_length, 710 key_start, s_temp.changed_length, s_temp.move_length, 1, 711 KEY_OP_DEBUG_LOG_ADD_2)) 712 goto err; 713 714 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size); 715 DBUG_RETURN(new_leaf_length <= 716 (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH : 717 (uint) keyinfo->underflow_block_length)); 718 err: 719 if (next_buff) 720 my_afree(next_buff); 721 722 DBUG_RETURN(-1); 723 } /* del */ 724 725 726 /** 727 @brief Balances adjacent pages if underflow occours 728 729 @fn underflow() 730 @param anc_buff Anchestor page data 731 @param leaf_page Leaf page (page that underflowed) 732 @param leaf_page_link Pointer to pin information about leaf page 733 @param keypos Position after current key in anc_buff 734 735 @note 736 This function writes redo entries for all changes 737 leaf_page is saved to disk 738 Caller must save anc_buff 739 740 For the algoritm to work, we have to ensure for packed keys that 741 key_length + (underflow_length + max_block_length + key_length) / 2 742 <= block_length. 743 From which follows that underflow_length <= block_length - key_length *3 744 For not packed keys we have: 745 (underflow_length + max_block_length + key_length) / 2 <= block_length 746 From which follows that underflow_length < block_length - key_length 747 This is ensured by setting of underflow_block_length. 748 749 @return 750 @retval 0 ok 751 @retval 1 ok, but anc_page did underflow 752 @retval -1 error 753 */ 754 755 static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo, 756 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page, 757 uchar *keypos) 758 { 759 int t_length; 760 uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag; 761 uint next_buff_length, new_buff_length, key_reflength; 762 uint unchanged_leaf_length, new_leaf_length, new_anc_length; 763 uint anc_page_flag, page_flag; 764 uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF]; 765 uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key; 766 uchar *anc_buff, *leaf_buff; 767 uchar *after_key, *anc_end_pos; 768 MARIA_KEY_PARAM key_deleted, key_inserted; 769 MARIA_SHARE *share= info->s; 770 my_bool first_key; 771 MARIA_KEY tmp_key, anc_key, leaf_key; 772 MARIA_PAGE next_page; 773 DBUG_ENTER("underflow"); 774 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p", 775 (ulong) (leaf_page->pos / share->block_size), 776 keypos)); 777 DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size); 778 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size); 779 780 anc_page_flag= anc_page->flag; 781 anc_buff= anc_page->buff; 782 leaf_buff= leaf_page->buff; 783 info->keyread_buff_used=1; 784 next_keypos=keypos; 785 nod_flag= leaf_page->node; 786 p_length= nod_flag+share->keypage_header; 787 anc_length= anc_page->size; 788 leaf_length= leaf_page->size; 789 key_reflength= share->base.key_reflength; 790 if (share->keyinfo+info->lastinx == keyinfo) 791 info->page_changed=1; 792 first_key= keypos == anc_buff + share->keypage_header + key_reflength; 793 794 tmp_key.data= info->buff; 795 anc_key.data= anc_key_buff; 796 leaf_key.data= leaf_key_buff; 797 tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo; 798 799 if ((keypos < anc_buff + anc_length && (info->state->records & 1)) || 800 first_key) 801 { 802 uint tmp_length; 803 uint next_page_flag; 804 /* Use page right of anc-page */ 805 DBUG_PRINT("test",("use right page")); 806 807 /* 808 Calculate position after the current key. Note that keydata itself is 809 not used 810 */ 811 if (keyinfo->flag & HA_BINARY_PACK_KEY) 812 { 813 if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos))) 814 goto err; 815 } 816 else 817 { 818 /* Avoid length error check if packed key */ 819 tmp_key.data[0]= tmp_key.data[1]= 0; 820 /* Got to end of found key */ 821 if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength, 822 &next_keypos)) 823 goto err; 824 } 825 next_page.pos= _ma_kpos(key_reflength, next_keypos); 826 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos, 827 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0)) 828 goto err; 829 next_buff_length= next_page.size; 830 next_page_flag= next_page.flag; 831 DBUG_DUMP("next", next_page.buff, next_page.size); 832 833 /* find keys to make a big key-page */ 834 bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header, 835 key_reflength); 836 837 if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) || 838 !_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length)) 839 goto err; 840 841 /* merge pages and put parting key from anc_page between */ 842 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data); 843 t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length, 844 prev_key, prev_key, &key_inserted); 845 tmp_length= next_buff_length - p_length; 846 endpos= next_page.buff + tmp_length + leaf_length + t_length; 847 /* next_page.buff will always be larger than before !*/ 848 bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length); 849 memcpy(next_page.buff, leaf_buff,(size_t) leaf_length); 850 (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted); 851 buff_length= (uint) (endpos - next_page.buff); 852 853 /* Set page flag from combination of both key pages and parting key */ 854 page_flag= next_page_flag | leaf_page->flag; 855 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID | 856 SEARCH_PAGE_KEY_HAS_TRANSID)) 857 page_flag|= KEYPAGE_FLAG_HAS_TRANSID; 858 859 next_page.size= buff_length; 860 next_page.flag= page_flag; 861 page_store_info(share, &next_page); 862 863 /* remove key from anc_page */ 864 if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos, 865 anc_key_buff, anc_buff+anc_length, 866 (my_off_t *) 0, &key_deleted))) 867 goto err; 868 869 new_anc_length= anc_length - s_length; 870 anc_page->size= new_anc_length; 871 page_store_size(share, anc_page); 872 873 if (buff_length <= share->max_index_block_size) 874 { 875 /* All keys fitted into one page */ 876 page_mark_changed(info, &next_page); 877 if (_ma_dispose(info, next_page.pos, 0)) 878 goto err; 879 880 memcpy(leaf_buff, next_page.buff, (size_t) buff_length); 881 leaf_page->size= next_page.size; 882 leaf_page->flag= next_page.flag; 883 884 if (share->now_transactional) 885 { 886 /* 887 Log changes to parent page. Note that this page may have been 888 temporarily bigger than block_size. 889 */ 890 if (_ma_log_delete(anc_page, key_deleted.key_pos, 891 key_deleted.changed_length, 892 key_deleted.move_length, 893 anc_length - anc_page->org_size, 894 KEY_OP_DEBUG_LOG_DEL_CHANGE_2)) 895 goto err; 896 /* 897 Log changes to leaf page. Data for leaf page is in leaf_buff 898 which contains original leaf_buff, parting key and next_buff 899 */ 900 if (_ma_log_suffix(leaf_page, leaf_length, buff_length)) 901 goto err; 902 } 903 } 904 else 905 { 906 /* 907 Balancing didn't free a page, so we have to split 'buff' into two 908 pages: 909 - Find key in middle of buffer 910 - Store everything before key in 'leaf_page' 911 - Pack key into anc_page at position of deleted key 912 Note that anc_page may overflow! (is handled by caller) 913 - Store remaining keys in next_page (buff) 914 */ 915 MARIA_KEY_PARAM anc_key_inserted; 916 917 anc_end_pos= anc_buff + new_anc_length; 918 919 DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p", 920 anc_buff, anc_end_pos)); 921 922 if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos)) 923 goto err; 924 if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key))) 925 goto err; 926 new_leaf_length= (uint) (half_pos - next_page.buff); 927 memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length); 928 929 leaf_page->size= new_leaf_length; 930 leaf_page->flag= page_flag; 931 page_store_info(share, leaf_page); 932 933 /* Correct new keypointer to leaf_page */ 934 half_pos=after_key; 935 _ma_kpointer(info, 936 leaf_key.data + leaf_key.data_length + leaf_key.ref_length, 937 next_page.pos); 938 939 /* Save key in anc_page */ 940 prev_key= (first_key ? (uchar*) 0 : anc_key.data); 941 t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength, 942 (keypos == anc_end_pos ? (uchar*) 0 : 943 keypos), 944 prev_key, prev_key, &anc_key_inserted); 945 if (t_length >= 0) 946 bmove_upp(anc_end_pos+t_length, anc_end_pos, 947 (uint) (anc_end_pos - keypos)); 948 else 949 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length); 950 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted); 951 new_anc_length+= t_length; 952 anc_page->size= new_anc_length; 953 page_store_size(share, anc_page); 954 955 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID | 956 SEARCH_PAGE_KEY_HAS_TRANSID)) 957 _ma_mark_page_with_transid(share, anc_page); 958 959 /* Store key first in new page */ 960 if (nod_flag) 961 bmove(next_page.buff + share->keypage_header, half_pos-nod_flag, 962 (size_t) nod_flag); 963 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos)) 964 goto err; 965 t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0, 966 (uchar*) 0, (uchar*) 0, 967 &key_inserted); 968 /* t_length will always be > 0 for a new page !*/ 969 tmp_length= (uint) ((next_page.buff + buff_length) - half_pos); 970 bmove(next_page.buff + p_length + t_length, half_pos, tmp_length); 971 (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted); 972 new_buff_length= tmp_length + t_length + p_length; 973 next_page.size= new_buff_length; 974 page_store_size(share, &next_page); 975 /* keypage flag is already up to date */ 976 977 if (share->now_transactional) 978 { 979 /* 980 Log changes to parent page 981 This has one key deleted from it and one key inserted to it at 982 keypos 983 984 ma_log_add ensures that we don't log changes that is outside of 985 key block size, as the REDO code can't handle that 986 */ 987 if (_ma_log_add(anc_page, anc_length, keypos, 988 anc_key_inserted.move_length + 989 MY_MAX(anc_key_inserted.changed_length - 990 anc_key_inserted.move_length, 991 key_deleted.changed_length), 992 anc_key_inserted.move_length - 993 key_deleted.move_length, 1, 994 KEY_OP_DEBUG_LOG_ADD_3)) 995 goto err; 996 997 /* 998 Log changes to leaf page. 999 This contains original data with new data added at end 1000 */ 1001 DBUG_ASSERT(leaf_length <= new_leaf_length); 1002 if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length)) 1003 goto err; 1004 /* 1005 Log changes to next page 1006 1007 This contains original data with some prefix data deleted and 1008 some compressed data at start possible extended 1009 1010 Data in buff was originally: 1011 org_leaf_buff [leaf_length] 1012 separator_key [buff_key_inserted.move_length] 1013 next_key_changes [buff_key_inserted.changed_length -move_length] 1014 next_page_data [next_buff_length - p_length - 1015 (buff_key_inserted.changed_length -move_length)] 1016 1017 After changes it's now: 1018 unpacked_key [key_inserted.changed_length] 1019 next_suffix [next_buff_length - key_inserted.changed_length] 1020 1021 */ 1022 DBUG_ASSERT(new_buff_length <= next_buff_length); 1023 if (_ma_log_prefix(&next_page, key_inserted.changed_length, 1024 (int) (new_buff_length - next_buff_length), 1025 KEY_OP_DEBUG_LOG_PREFIX_1)) 1026 goto err; 1027 } 1028 page_mark_changed(info, &next_page); 1029 if (_ma_write_keypage(&next_page, 1030 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) 1031 goto err; 1032 } 1033 1034 page_mark_changed(info, leaf_page); 1035 if (_ma_write_keypage(leaf_page, 1036 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) 1037 goto err; 1038 DBUG_RETURN(new_anc_length <= 1039 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH : 1040 (uint) keyinfo->underflow_block_length))); 1041 } 1042 1043 DBUG_PRINT("test",("use left page")); 1044 1045 keypos= _ma_get_last_key(&anc_key, anc_page, keypos); 1046 if (!keypos) 1047 goto err; 1048 next_page.pos= _ma_kpos(key_reflength,keypos); 1049 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos, 1050 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0)) 1051 goto err; 1052 buff_length= next_page.size; 1053 endpos= next_page.buff + buff_length; 1054 DBUG_DUMP("prev", next_page.buff, next_page.size); 1055 1056 /* find keys to make a big key-page */ 1057 bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header, 1058 key_reflength); 1059 next_keypos=keypos; 1060 if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength, 1061 &next_keypos)) 1062 goto err; 1063 if (!_ma_get_last_key(&leaf_key, &next_page, endpos)) 1064 goto err; 1065 1066 /* merge pages and put parting key from anc_page between */ 1067 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data); 1068 t_length=(*keyinfo->pack_key)(&anc_key, nod_flag, 1069 (leaf_length == p_length ? 1070 (uchar*) 0 : leaf_buff+p_length), 1071 prev_key, prev_key, 1072 &key_inserted); 1073 if (t_length >= 0) 1074 bmove(endpos+t_length, leaf_buff+p_length, 1075 (size_t) (leaf_length-p_length)); 1076 else /* We gained space */ 1077 bmove(endpos,leaf_buff+((int) p_length-t_length), 1078 (size_t) (leaf_length-p_length+t_length)); 1079 (*keyinfo->store_key)(keyinfo,endpos, &key_inserted); 1080 1081 /* Remember for logging how many bytes of leaf_buff that are not changed */ 1082 DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length); 1083 unchanged_leaf_length= (leaf_length - p_length - 1084 (key_inserted.changed_length - 1085 key_inserted.move_length)); 1086 1087 new_buff_length= buff_length + leaf_length - p_length + t_length; 1088 1089 #ifdef EXTRA_DEBUG 1090 /* Ensure that unchanged_leaf_length is correct */ 1091 DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length, 1092 leaf_buff + leaf_length - unchanged_leaf_length, 1093 unchanged_leaf_length) == 0); 1094 #endif 1095 1096 page_flag= next_page.flag | leaf_page->flag; 1097 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID | 1098 SEARCH_PAGE_KEY_HAS_TRANSID)) 1099 page_flag|= KEYPAGE_FLAG_HAS_TRANSID; 1100 1101 next_page.size= new_buff_length; 1102 next_page.flag= page_flag; 1103 page_store_info(share, &next_page); 1104 1105 /* remove key from anc_page */ 1106 if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos, 1107 anc_key_buff, 1108 anc_buff+anc_length, (my_off_t *) 0, 1109 &key_deleted))) 1110 goto err; 1111 1112 new_anc_length= anc_length - s_length; 1113 anc_page->size= new_anc_length; 1114 page_store_size(share, anc_page); 1115 1116 if (new_buff_length <= share->max_index_block_size) 1117 { 1118 /* All keys fitted into one page */ 1119 page_mark_changed(info, leaf_page); 1120 if (_ma_dispose(info, leaf_page->pos, 0)) 1121 goto err; 1122 1123 if (share->now_transactional) 1124 { 1125 /* 1126 Log changes to parent page. Note that this page may have been 1127 temporarily bigger than block_size. 1128 */ 1129 if (_ma_log_delete(anc_page, key_deleted.key_pos, 1130 key_deleted.changed_length, key_deleted.move_length, 1131 anc_length - anc_page->org_size, 1132 KEY_OP_DEBUG_LOG_DEL_CHANGE_3)) 1133 goto err; 1134 /* 1135 Log changes to next page. Data for leaf page is in buff 1136 that contains original leaf_buff, parting key and next_buff 1137 */ 1138 if (_ma_log_suffix(&next_page, buff_length, new_buff_length)) 1139 goto err; 1140 } 1141 } 1142 else 1143 { 1144 /* 1145 Balancing didn't free a page, so we have to split 'next_page' into two 1146 pages 1147 - Find key in middle of buffer (buff) 1148 - Pack key at half_buff into anc_page at position of deleted key 1149 Note that anc_page may overflow! (is handled by caller) 1150 - Move everything after middlekey to 'leaf_buff' 1151 - Shorten buff at 'endpos' 1152 */ 1153 MARIA_KEY_PARAM anc_key_inserted; 1154 size_t tmp_length; 1155 1156 if (keypos == anc_buff + share->keypage_header + key_reflength) 1157 anc_pos= 0; /* First key */ 1158 else 1159 { 1160 if (!_ma_get_last_key(&anc_key, anc_page, keypos)) 1161 goto err; 1162 anc_pos= anc_key.data; 1163 } 1164 if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos))) 1165 goto err; 1166 1167 /* Correct new keypointer to leaf_page */ 1168 _ma_kpointer(info,leaf_key.data + leaf_key.data_length + 1169 leaf_key.ref_length, leaf_page->pos); 1170 1171 /* Save parting key found by _ma_find_half_pos() in anc_page */ 1172 DBUG_DUMP("anc_buff", anc_buff, new_anc_length); 1173 DBUG_DUMP_KEY("key_to_anc", &leaf_key); 1174 anc_end_pos= anc_buff + new_anc_length; 1175 t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength, 1176 keypos == anc_end_pos ? (uchar*) 0 1177 : keypos, 1178 anc_pos, anc_pos, 1179 &anc_key_inserted); 1180 if (t_length >= 0) 1181 bmove_upp(anc_end_pos+t_length, anc_end_pos, 1182 (uint) (anc_end_pos-keypos)); 1183 else 1184 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length); 1185 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted); 1186 new_anc_length+= t_length; 1187 anc_page->size= new_anc_length; 1188 page_store_size(share, anc_page); 1189 1190 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID | 1191 SEARCH_PAGE_KEY_HAS_TRANSID)) 1192 _ma_mark_page_with_transid(share, anc_page); 1193 1194 /* Store first key on new page */ 1195 if (nod_flag) 1196 bmove(leaf_buff + share->keypage_header, half_pos-nod_flag, 1197 (size_t) nod_flag); 1198 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos)) 1199 goto err; 1200 DBUG_DUMP_KEY("key_to_leaf", &leaf_key); 1201 t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0, 1202 (uchar*) 0, (uchar*) 0, &key_inserted); 1203 /* t_length will always be > 0 for a new page !*/ 1204 tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos); 1205 DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length)); 1206 bmove(leaf_buff+p_length+t_length, half_pos, tmp_length); 1207 (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted); 1208 new_leaf_length= (uint)(tmp_length + t_length + p_length); 1209 DBUG_ASSERT(new_leaf_length <= share->max_index_block_size); 1210 1211 leaf_page->size= new_leaf_length; 1212 leaf_page->flag= page_flag; 1213 page_store_info(share, leaf_page); 1214 1215 new_buff_length= (uint) (endpos - next_page.buff); 1216 next_page.size= new_buff_length; 1217 page_store_size(share, &next_page); 1218 1219 if (share->now_transactional) 1220 { 1221 /* 1222 Log changes to parent page 1223 This has one key deleted from it and one key inserted to it at 1224 keypos 1225 1226 ma_log_add() ensures that we don't log changes that is outside of 1227 key block size, as the REDO code can't handle that 1228 */ 1229 if (_ma_log_add(anc_page, anc_length, keypos, 1230 anc_key_inserted.move_length + 1231 MY_MAX(anc_key_inserted.changed_length - 1232 anc_key_inserted.move_length, 1233 key_deleted.changed_length), 1234 anc_key_inserted.move_length - 1235 key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4)) 1236 goto err; 1237 1238 /* 1239 Log changes to leaf page. 1240 This contains original data with new data added first 1241 */ 1242 DBUG_ASSERT(leaf_length <= new_leaf_length); 1243 DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length); 1244 if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length, 1245 (int) (new_leaf_length - leaf_length), 1246 KEY_OP_DEBUG_LOG_PREFIX_2)) 1247 goto err; 1248 /* 1249 Log changes to next page 1250 This contains original data with some suffix data deleted 1251 */ 1252 DBUG_ASSERT(new_buff_length <= buff_length); 1253 if (_ma_log_suffix(&next_page, buff_length, new_buff_length)) 1254 goto err; 1255 } 1256 1257 page_mark_changed(info, leaf_page); 1258 if (_ma_write_keypage(leaf_page, 1259 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) 1260 goto err; 1261 } 1262 page_mark_changed(info, &next_page); 1263 if (_ma_write_keypage(&next_page, 1264 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) 1265 goto err; 1266 1267 DBUG_RETURN(new_anc_length <= 1268 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH : 1269 (uint) keyinfo->underflow_block_length))); 1270 1271 err: 1272 DBUG_RETURN(-1); 1273 } /* underflow */ 1274 1275 1276 /** 1277 @brief Remove a key from page 1278 1279 @fn remove_key() 1280 keyinfo Key handle 1281 nod_flag Length of node ptr 1282 keypos Where on page key starts 1283 lastkey Buffer for storing keys to be removed 1284 page_end Pointer to end of page 1285 next_block If <> 0 and node-page, this is set to address of 1286 next page 1287 s_temp Information about what changes was done one the page: 1288 s_temp.key_pos Start of key 1289 s_temp.move_length Number of bytes removed at keypos 1290 s_temp.changed_length Number of bytes changed at keypos 1291 1292 @todo 1293 The current code doesn't handle the case that the next key may be 1294 packed better against the previous key if there is a case difference 1295 1296 @return 1297 @retval 0 error 1298 @retval # How many chars was removed 1299 */ 1300 1301 static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag, 1302 uchar *keypos, uchar *lastkey, 1303 uchar *page_end, my_off_t *next_block, 1304 MARIA_KEY_PARAM *s_temp) 1305 { 1306 int s_length; 1307 uchar *start; 1308 DBUG_ENTER("remove_key"); 1309 DBUG_PRINT("enter", ("keypos:%p page_end: %p", 1310 keypos, page_end)); 1311 1312 start= s_temp->key_pos= keypos; 1313 s_temp->changed_length= 0; 1314 if (!(keyinfo->flag & 1315 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY | 1316 HA_BINARY_PACK_KEY)) && 1317 !(page_flag & KEYPAGE_FLAG_HAS_TRANSID)) 1318 { 1319 /* Static length key */ 1320 s_length=(int) (keyinfo->keylength+nod_flag); 1321 if (next_block && nod_flag) 1322 *next_block= _ma_kpos(nod_flag,keypos+s_length); 1323 } 1324 else 1325 { 1326 /* Let keypos point at next key */ 1327 MARIA_KEY key; 1328 1329 /* Calculate length of key */ 1330 key.keyinfo= keyinfo; 1331 key.data= lastkey; 1332 if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos)) 1333 DBUG_RETURN(0); /* Error */ 1334 1335 if (next_block && nod_flag) 1336 *next_block= _ma_kpos(nod_flag,keypos); 1337 s_length=(int) (keypos-start); 1338 if (keypos != page_end) 1339 { 1340 if (keyinfo->flag & HA_BINARY_PACK_KEY) 1341 { 1342 uchar *old_key= start; 1343 uint next_length,prev_length,prev_pack_length; 1344 1345 /* keypos points here on start of next key */ 1346 get_key_length(next_length,keypos); 1347 get_key_pack_length(prev_length,prev_pack_length,old_key); 1348 if (next_length > prev_length) 1349 { 1350 uint diff= (next_length-prev_length); 1351 /* We have to copy data from the current key to the next key */ 1352 keypos-= diff + prev_pack_length; 1353 store_key_length(keypos, prev_length); 1354 bmove(keypos + prev_pack_length, lastkey + prev_length, diff); 1355 s_length=(int) (keypos-start); 1356 s_temp->changed_length= diff + prev_pack_length; 1357 } 1358 } 1359 else 1360 { 1361 /* Check if a variable length first key part */ 1362 if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128) 1363 { 1364 /* Next key is packed against the current one */ 1365 uint next_length,prev_length,prev_pack_length,lastkey_length, 1366 rest_length; 1367 if (keyinfo->seg[0].length >= 127) 1368 { 1369 if (!(prev_length=mi_uint2korr(start) & 32767)) 1370 goto end; 1371 next_length=mi_uint2korr(keypos) & 32767; 1372 keypos+=2; 1373 prev_pack_length=2; 1374 } 1375 else 1376 { 1377 if (!(prev_length= *start & 127)) 1378 goto end; /* Same key as previous*/ 1379 next_length= *keypos & 127; 1380 keypos++; 1381 prev_pack_length=1; 1382 } 1383 if (!(*start & 128)) 1384 prev_length=0; /* prev key not packed */ 1385 if (keyinfo->seg[0].flag & HA_NULL_PART) 1386 lastkey++; /* Skip null marker */ 1387 get_key_length(lastkey_length,lastkey); 1388 if (!next_length) /* Same key after */ 1389 { 1390 next_length=lastkey_length; 1391 rest_length=0; 1392 } 1393 else 1394 get_key_length(rest_length,keypos); 1395 1396 if (next_length >= prev_length) 1397 { 1398 /* Next key is based on deleted key */ 1399 uint pack_length; 1400 uint diff= (next_length-prev_length); 1401 1402 /* keypos points to data of next key (after key length) */ 1403 bmove(keypos - diff, lastkey + prev_length, diff); 1404 rest_length+= diff; 1405 pack_length= prev_length ? get_pack_length(rest_length): 0; 1406 keypos-= diff + pack_length + prev_pack_length; 1407 s_length=(int) (keypos-start); 1408 if (prev_length) /* Pack against prev key */ 1409 { 1410 *keypos++= start[0]; 1411 if (prev_pack_length == 2) 1412 *keypos++= start[1]; 1413 store_key_length(keypos,rest_length); 1414 } 1415 else 1416 { 1417 /* Next key is not packed anymore */ 1418 if (keyinfo->seg[0].flag & HA_NULL_PART) 1419 { 1420 rest_length++; /* Mark not null */ 1421 } 1422 if (prev_pack_length == 2) 1423 { 1424 mi_int2store(keypos,rest_length); 1425 } 1426 else 1427 *keypos= rest_length; 1428 } 1429 s_temp->changed_length= diff + pack_length + prev_pack_length; 1430 } 1431 } 1432 } 1433 } 1434 } 1435 end: 1436 bmove(start, start+s_length, (uint) (page_end-start-s_length)); 1437 s_temp->move_length= s_length; 1438 DBUG_RETURN((uint) s_length); 1439 } /* remove_key */ 1440 1441 1442 /**************************************************************************** 1443 Logging of redos 1444 ****************************************************************************/ 1445 1446 /** 1447 @brief 1448 log entry where some parts are deleted and some things are changed 1449 and some data could be added last. 1450 1451 @fn _ma_log_delete() 1452 @param info Maria handler 1453 @param page Pageaddress for changed page 1454 @param buff Page buffer 1455 @param key_pos Start of change area 1456 @param changed_length How many bytes where changed at key_pos 1457 @param move_length How many bytes where deleted at key_pos 1458 @param append_length Length of data added last 1459 This is taken from end of ma_page->buff 1460 1461 This is mainly used when a key is deleted. The append happens 1462 when we delete a key from a page with data > block_size kept in 1463 memory and we have to add back the data that was stored > block_size 1464 */ 1465 1466 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos, 1467 uint changed_length, uint move_length, 1468 uint append_length __attribute__((unused)), 1469 enum en_key_debug debug_marker __attribute__((unused))) 1470 { 1471 LSN lsn; 1472 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7]; 1473 uchar *log_pos; 1474 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7]; 1475 uint translog_parts, current_size, extra_length; 1476 uint offset= (uint) (key_pos - ma_page->buff); 1477 MARIA_HA *info= ma_page->info; 1478 MARIA_SHARE *share= info->s; 1479 my_off_t page= ma_page->pos / share->block_size; 1480 DBUG_ENTER("_ma_log_delete"); 1481 DBUG_PRINT("enter", ("page: %lu offset: %u changed_length: %u move_length: %u append_length: %u page_size: %u", 1482 (ulong) page, offset, changed_length, move_length, 1483 append_length, ma_page->size)); 1484 DBUG_ASSERT(share->now_transactional && move_length); 1485 DBUG_ASSERT(offset + changed_length <= ma_page->size); 1486 DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size); 1487 DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header); 1488 1489 /* Store address of new root page */ 1490 page_store(log_data + FILEID_STORE_SIZE, page); 1491 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE; 1492 current_size= ma_page->org_size; 1493 1494 #ifdef EXTRA_DEBUG_KEY_CHANGES 1495 *log_pos++= KEY_OP_DEBUG; 1496 *log_pos++= debug_marker; 1497 1498 *log_pos++= KEY_OP_DEBUG_2; 1499 int2store(log_pos, ma_page->org_size); 1500 int2store(log_pos+2, ma_page->size); 1501 log_pos+=4; 1502 #endif 1503 1504 /* Store keypage_flag */ 1505 *log_pos++= KEY_OP_SET_PAGEFLAG; 1506 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff); 1507 1508 log_pos[0]= KEY_OP_OFFSET; 1509 int2store(log_pos+1, offset); 1510 log_pos+= 3; 1511 translog_parts= TRANSLOG_INTERNAL_PARTS + 1; 1512 extra_length= 0; 1513 1514 if (changed_length) 1515 { 1516 if (offset + changed_length >= share->max_index_block_size) 1517 { 1518 changed_length= share->max_index_block_size - offset; 1519 move_length= 0; /* Nothing to move */ 1520 current_size= share->max_index_block_size; 1521 } 1522 1523 log_pos[0]= KEY_OP_CHANGE; 1524 int2store(log_pos+1, changed_length); 1525 log_pos+= 3; 1526 log_array[translog_parts].str= ma_page->buff + offset; 1527 log_array[translog_parts].length= changed_length; 1528 translog_parts++; 1529 1530 /* We only have to move things after offset+changed_length */ 1531 offset+= changed_length; 1532 } 1533 1534 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; 1535 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data); 1536 1537 if (move_length) 1538 { 1539 uint log_length; 1540 if (offset + move_length < share->max_index_block_size) 1541 { 1542 /* 1543 Move down things that is on page. 1544 page_offset in apply_redo_inxed() will be at original offset 1545 + changed_length. 1546 */ 1547 log_pos[0]= KEY_OP_SHIFT; 1548 int2store(log_pos+1, - (int) move_length); 1549 log_length= 3; 1550 current_size-= move_length; 1551 } 1552 else 1553 { 1554 /* Delete to end of page */ 1555 uint tmp= current_size - offset; 1556 current_size= offset; 1557 log_pos[0]= KEY_OP_DEL_SUFFIX; 1558 int2store(log_pos+1, tmp); 1559 log_length= 3; 1560 } 1561 log_array[translog_parts].str= log_pos; 1562 log_array[translog_parts].length= log_length; 1563 translog_parts++; 1564 log_pos+= log_length; 1565 extra_length+= log_length; 1566 } 1567 1568 if (current_size != ma_page->size && 1569 current_size != share->max_index_block_size) 1570 { 1571 /* Append data that didn't fit on the page before */ 1572 uint length= (MY_MIN(ma_page->size, share->max_index_block_size) - 1573 current_size); 1574 uchar *data= ma_page->buff + current_size; 1575 1576 DBUG_ASSERT(length <= append_length); 1577 1578 log_pos[0]= KEY_OP_ADD_SUFFIX; 1579 int2store(log_pos+1, length); 1580 log_array[translog_parts].str= log_pos; 1581 log_array[translog_parts].length= 3; 1582 log_array[translog_parts + 1].str= data; 1583 log_array[translog_parts + 1].length= length; 1584 log_pos+= 3; 1585 translog_parts+= 2; 1586 current_size+= length; 1587 extra_length+= 3 + length; 1588 } 1589 1590 _ma_log_key_changes(ma_page, 1591 log_array + translog_parts, 1592 log_pos, &extra_length, &translog_parts); 1593 /* Remember new page length for future log entires for same page */ 1594 ma_page->org_size= current_size; 1595 1596 if (translog_write_record(&lsn, LOGREC_REDO_INDEX, 1597 info->trn, info, 1598 (translog_size_t) 1599 log_array[TRANSLOG_INTERNAL_PARTS].length + 1600 changed_length + extra_length, translog_parts, 1601 log_array, log_data, NULL)) 1602 DBUG_RETURN(1); 1603 1604 DBUG_RETURN(0); 1605 } 1606 1607 1608 /**************************************************************************** 1609 Logging of undos 1610 ****************************************************************************/ 1611 1612 my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key, 1613 my_off_t new_root, LSN *res_lsn) 1614 { 1615 MARIA_SHARE *share= info->s; 1616 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + 1617 KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos; 1618 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; 1619 struct st_msg_to_write_hook_for_undo_key msg; 1620 enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE; 1621 uint keynr= key->keyinfo->key_nr; 1622 1623 lsn_store(log_data, info->trn->undo_lsn); 1624 key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr); 1625 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE; 1626 1627 /** 1628 @todo BUG if we had concurrent insert/deletes, reading state's key_root 1629 like this would be unsafe. 1630 */ 1631 if (new_root != share->state.key_root[keynr]) 1632 { 1633 my_off_t page; 1634 page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO : 1635 new_root / share->block_size); 1636 page_store(log_pos, page); 1637 log_pos+= PAGE_STORE_SIZE; 1638 log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT; 1639 } 1640 1641 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; 1642 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data); 1643 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data; 1644 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length + 1645 key->ref_length); 1646 1647 msg.root= &share->state.key_root[keynr]; 1648 msg.value= new_root; 1649 /* 1650 set autoincrement to 1 if this is an auto_increment key 1651 This is only used if we are now in a rollback of a duplicate key 1652 */ 1653 msg.auto_increment= share->base.auto_key == keynr + 1; 1654 1655 return translog_write_record(res_lsn, log_type, 1656 info->trn, info, 1657 (translog_size_t) 1658 (log_array[TRANSLOG_INTERNAL_PARTS + 0].length + 1659 log_array[TRANSLOG_INTERNAL_PARTS + 1].length), 1660 TRANSLOG_INTERNAL_PARTS + 2, log_array, 1661 log_data + LSN_STORE_SIZE, &msg) ? -1 : 0; 1662 } 1663