1 /* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 Copyright (C) 2008-2009 Sun Microsystems, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 /* Write a row to a MARIA table */
18
19 #include "ma_fulltext.h"
20 #include "ma_rt_index.h"
21 #include "trnman.h"
22 #include "ma_key_recover.h"
23 #include "ma_blockrec.h"
24
25 /* Functions declared in this file */
26
/*
  Recursive B-tree descent that stores 'key'; defined further down and
  entered from _ma_ck_real_write_btree().
*/
static int w_search(MARIA_HA *info, uint32 comp_flag,
                    MARIA_KEY *key, my_off_t page,
                    MARIA_PAGE *father_page, uchar *father_keypos,
                    my_bool insert_last);
/*
  Called from _ma_insert() when a page is full, a father page exists and
  the other conditions tested there hold.
*/
static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
                            MARIA_KEY *key, MARIA_PAGE *curr_page,
                            MARIA_PAGE *father_page,
                            uchar *father_key_pos, MARIA_KEY_PARAM *s_temp);
/* Used by _ma_split_page() to find the split point when insert_last_key is set */
static uchar *_ma_find_last_pos(MARIA_KEY *int_key,
                                MARIA_PAGE *page, uchar **after_key);
/* Used by _ma_ck_write() when a bulk insert tree is initialized for the key */
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
/* Used by _ma_ck_write() in all other cases */
static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
/* Writes the key and, for transactional tables, the undo record for it */
static my_bool _ma_ck_write_btree_with_log(MARIA_HA *, MARIA_KEY *, my_off_t *,
                                           uint32);
/*
  Logging helpers for index page changes.
  NOTE(review): their definitions and callers are outside the part of the
  file reviewed here.
*/
static my_bool _ma_log_split(MARIA_PAGE *page, uint org_length,
                             uint new_length,
                             const uchar *key_pos,
                             uint key_length, int move_length,
                             enum en_key_op prefix_or_suffix,
                             const uchar *data, uint data_length,
                             uint changed_length);
static my_bool _ma_log_del_prefix(MARIA_PAGE *page,
                                  uint org_length, uint new_length,
                                  const uchar *key_pos, uint key_length,
                                  int move_length);
static my_bool _ma_log_key_middle(MARIA_PAGE *page,
                                  uint new_length,
                                  uint data_added_first,
                                  uint data_changed_first,
                                  uint data_deleted_last,
                                  const uchar *key_pos,
                                  uint key_length, int move_length);
59
60 /*
61 @brief Default handler for returing position to new row
62
63 @note
64 This is only called for non transactional tables and not for block format
65 which is why we use info->state here.
66 */
67
_ma_write_init_default(MARIA_HA * info,const uchar * record)68 MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info,
69 const uchar *record
70 __attribute__((unused)))
71 {
72 return ((info->s->state.dellink != HA_OFFSET_ERROR &&
73 !info->append_insert_at_end) ?
74 info->s->state.dellink :
75 info->state->data_file_length);
76 }
77
_ma_write_abort_default(MARIA_HA * info)78 my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused)))
79 {
80 return 0;
81 }
82
83
/*
  Write new record to a table

  SYNOPSIS
    maria_write()
    info      Open table handler
    record    Row to write

  DESCRIPTION
    Checks that the table may be written to, verifies the unique
    constraints, reserves/writes the row through share->write_record_init,
    inserts one key per active index and finally calls share->write_record.
    If a key insert or the row write fails, the keys inserted so far are
    deleted again in reverse order and share->write_record_abort is called.

  RETURN
    0       ok
    #       error; the same value is stored in my_errno
*/

int maria_write(MARIA_HA *info, const uchar *record)
{
  MARIA_SHARE *share= info->s;
  uint i;
  int save_errno;
  /* oldpos: row position on entry; put back into cur_row.lastpos on success */
  MARIA_RECORD_POS filepos, oldpos= info->cur_row.lastpos;
  uchar *buff;
  my_bool lock_tree= share->lock_key_trees;
  my_bool fatal_error;
  MARIA_KEYDEF *keyinfo;
  DBUG_ENTER("maria_write");
  DBUG_PRINT("enter",("index_file: %d data_file: %d",
                      share->kfile.file, info->dfile.file));

  /* Debug hook: pretend that the table is crashed */
  DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
                  maria_print_error(info->s, HA_ERR_CRASHED);
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
  if (share->options & HA_OPTION_READ_ONLY_DATA)
  {
    DBUG_RETURN(my_errno=EACCES);
  }
  if (_ma_readinfo(info,F_WRLCK,1))
    DBUG_RETURN(my_errno);

  if ((share->state.changed & STATE_DATA_FILE_FULL) ||
      (share->base.reloc == (ha_rows) 1 &&
       share->base.records == (ha_rows) 1 &&
       share->state.state.records == (ha_rows) 1))
  {                                             /* System file */
    my_errno=HA_ERR_RECORD_FILE_FULL;
    goto err2;
  }
  if (share->state.state.key_file_length >= share->base.margin_key_file_length)
  {
    my_errno=HA_ERR_INDEX_FILE_FULL;
    goto err2;
  }
  if (_ma_mark_file_changed(share))
    goto err2;

  /* Calculate and check all unique constraints */

  if (share->state.header.uniques)
  {
    for (i=0 ; i < share->state.header.uniques ; i++)
    {
      MARIA_UNIQUEDEF *def= share->uniqueinfo + i;
      ha_checksum unique_hash= _ma_unique_hash(share->uniqueinfo+i,record);
      if (maria_is_key_active(share->state.key_map, def->key))
      {
        if (_ma_check_unique(info, def, record,
                             unique_hash, HA_OFFSET_ERROR))
          goto err2;
      }
      else
        /* Key not active: only store the hash in the record */
        maria_unique_store(record+ share->keyinfo[def->key].seg->start,
                           unique_hash);
    }
  }

  /* Ensure we don't try to restore auto_increment if it doesn't change */
  info->last_auto_increment= ~(ulonglong) 0;

  if ((info->opt_flag & OPT_NO_ROWS))
    filepos= HA_OFFSET_ERROR;
  else
  {
    /*
      This may either calculate the position for the record, or write the
      record and return the record id
    */
    if ((filepos= (*share->write_record_init)(info, record)) ==
        HA_OFFSET_ERROR)
      goto err2;
  }

  /* Write all keys to indextree */
  buff= info->lastkey_buff2;
  for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
  {
    MARIA_KEY int_key;
    if (maria_is_key_active(share->state.key_map, i))
    {
      /* No root lock is taken when the key goes into a bulk insert tree */
      my_bool local_lock_tree= (lock_tree &&
                                !(info->bulk_insert &&
                                  is_tree_inited(&info->bulk_insert[i])));
      if (local_lock_tree)
      {
        mysql_rwlock_wrlock(&keyinfo->root_lock);
        keyinfo->version++;
      }
      if (keyinfo->flag & HA_FULLTEXT )
      {
        if (_ma_ft_add(info,i, buff,record,filepos))
        {
          if (local_lock_tree)
            mysql_rwlock_unlock(&keyinfo->root_lock);
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
          goto err;
        }
      }
      else
      {
        /*
          Each iteration builds the key and tries to insert it. The loop
          body is only entered on failure; it either jumps to err or waits
          for the blocking transaction and then retries the insert.
        */
        while (keyinfo->ck_insert(info,
                                  (*keyinfo->make_key)(info, &int_key, i,
                                                       buff, record, filepos,
                                                       info->trn->trid)))
        {
          TRN *blocker;
          DBUG_PRINT("error",("Got error: %d on write",my_errno));
          /*
            explicit check to filter out temp tables, they aren't
            transactional and don't have a proper TRN so the code
            below doesn't work for them.
            Also, filter out non-thread maria use, and table modified in
            the same transaction.
            At last, filter out non-dup-unique errors.
          */
          if (!local_lock_tree)
            goto err;
          if (info->dup_key_trid == info->trn->trid ||
              my_errno != HA_ERR_FOUND_DUPP_KEY)
          {
            mysql_rwlock_unlock(&keyinfo->root_lock);
            goto err;
          }
          /* Different TrIDs: table must be transactional */
          DBUG_ASSERT(share->base.born_transactional);
          /*
            If transactions are disabled, and dup_key_trid is different from
            our TrID, it must be ALTER TABLE with dup_key_trid==0 (no
            transaction). ALTER TABLE does have MARIA_HA::TRN not dummy but
            puts TrID=0 in rows/keys.
          */
          DBUG_ASSERT(share->now_transactional ||
                      (info->dup_key_trid == 0));
          blocker= trnman_trid_to_trn(info->trn, info->dup_key_trid);
          /*
            if blocker TRN was not found, it means that the conflicting
            transaction was committed long time ago. It could not be
            aborted, as it would have to wait on the key tree lock
            to remove the conflicting key it has inserted.
          */
          if (!blocker || blocker->commit_trid != ~(TrID)0)
          { /* committed */
            /* A found blocker has its state_lock held here; release it */
            if (blocker)
              mysql_mutex_unlock(& blocker->state_lock);
            mysql_rwlock_unlock(&keyinfo->root_lock);
            goto err;
          }
          /* Release the root lock before waiting for the blocker */
          mysql_rwlock_unlock(&keyinfo->root_lock);
          {
            /* running. now we wait */
            WT_RESOURCE_ID rc;
            int res;
            PSI_stage_info old_stage_info;

            rc.type= &ma_rc_dup_unique;
            /* TODO savepoint id when we'll have them */
            rc.value= (intptr)blocker;
            res= wt_thd_will_wait_for(info->trn->wt, blocker->wt, & rc);
            if (res != WT_OK)
            {
              mysql_mutex_unlock(& blocker->state_lock);
              my_errno= HA_ERR_LOCK_DEADLOCK;
              goto err;
            }
            proc_info_hook(0, &stage_waiting_for_a_resource, &old_stage_info,
                           __func__, __FILE__, __LINE__);
            res= wt_thd_cond_timedwait(info->trn->wt, & blocker->state_lock);
            proc_info_hook(0, &old_stage_info, 0, __func__, __FILE__, __LINE__);

            mysql_mutex_unlock(& blocker->state_lock);
            if (res != WT_OK)
            {
              my_errno= res == WT_TIMEOUT ? HA_ERR_LOCK_WAIT_TIMEOUT
                                          : HA_ERR_LOCK_DEADLOCK;
              goto err;
            }
          }
          /* Wait succeeded: take the root lock again and retry the insert */
          mysql_rwlock_wrlock(&keyinfo->root_lock);
#ifndef MARIA_CANNOT_ROLLBACK
          keyinfo->version++;
#endif
        }
      }

      /* The above changed info->lastkey2. Inform maria_rnext_same(). */
      info->update&= ~HA_STATE_RNEXT_SAME;

      if (local_lock_tree)
        mysql_rwlock_unlock(&keyinfo->root_lock);
    }
  }
  if (share->calc_write_checksum)
    info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
  if (filepos != HA_OFFSET_ERROR)
  {
    /* Here i == share->base.keys, so a failure rolls back every key */
    if ((*share->write_record)(info,record))
      goto err;
    info->state->checksum+= info->cur_row.checksum;
  }
  if (!share->now_transactional)
  {
    if (share->base.auto_key != 0)
    {
      const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
      const uchar *key= record + keyseg->start;
      set_if_bigger(share->state.auto_increment,
                    ma_retrieve_auto_increment(key, keyseg->type));
    }
  }
  info->state->records++;
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
                 HA_STATE_ROW_CHANGED);
  info->row_changes++;
  share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
  info->state->changed= 1;

  /* Put back the row position the handler had when we were called */
  info->cur_row.lastpos= oldpos;
  _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
  if (info->invalidator != 0)
  {
    DBUG_PRINT("info", ("invalidator... '%s' (update)",
                        share->open_file_name.str));
    (*info->invalidator)(share->open_file_name.str);
    info->invalidator=0;
  }

  /*
    Update status of the table. We need to do so after each row write
    for the log tables, as we want the new row to become visible to
    other threads as soon as possible. We don't lock mutex here
    (as it is required by pthread memory visibility rules) as (1) it's
    not critical to use outdated share->is_log_table value (2) locking
    mutex here for every write is too expensive.
  */
  if (share->is_log_table)
    _ma_update_status((void*) info);

  DBUG_RETURN(0);

err:
  /*
    A key insert or the row write failed. 'i' is the number of the key that
    failed, or share->base.keys if all keys were inserted.
  */
  save_errno= my_errno;
  fatal_error= 0;
  if (my_errno == HA_ERR_FOUND_DUPP_KEY ||
      my_errno == HA_ERR_RECORD_FILE_FULL ||
      my_errno == HA_ERR_LOCK_DEADLOCK ||
      my_errno == HA_ERR_LOCK_WAIT_TIMEOUT ||
      my_errno == HA_ERR_NULL_IN_SPATIAL ||
      my_errno == HA_ERR_OUT_OF_MEM)
  {
    info->errkey= i < share->base.keys ? (int) i : -1;
    /*
      We delete keys in the reverse order of insertion. This is the order that
      a rollback would do and is important for CLR_ENDs generated by
      _ma_ft|ck_delete() and write_record_abort() to work (with any other
      order they would cause wrong jumps in the chain).
    */
    while ( i-- > 0)
    {
      if (maria_is_key_active(share->state.key_map, i))
      {
        my_bool local_lock_tree= (lock_tree &&
                                  !(info->bulk_insert &&
                                    is_tree_inited(&info->bulk_insert[i])));
        keyinfo= share->keyinfo + i;
        if (local_lock_tree)
          mysql_rwlock_wrlock(&keyinfo->root_lock);
        /**
          @todo RECOVERY BUG
          The key deletes below should generate CLR_ENDs
        */
        if (keyinfo->flag & HA_FULLTEXT)
        {
          if (_ma_ft_del(info,i,buff,record,filepos))
          {
            fatal_error= 1;
            if (local_lock_tree)
              mysql_rwlock_unlock(&keyinfo->root_lock);
            break;
          }
        }
        else
        {
          MARIA_KEY key;
          if (keyinfo->ck_delete(info,
                                 (*keyinfo->make_key)(info, &key, i, buff,
                                                      record,
                                                      filepos,
                                                      info->trn->trid)))
          {
            fatal_error= 1;
            if (local_lock_tree)
              mysql_rwlock_unlock(&keyinfo->root_lock);
            break;
          }
        }
        if (local_lock_tree)
          mysql_rwlock_unlock(&keyinfo->root_lock);
      }
    }
  }
  else
    /* Any other error: no key rollback is attempted, table marked crashed */
    fatal_error= 1;

  if (filepos != HA_OFFSET_ERROR)
  {
    if ((*share->write_record_abort)(info))
      fatal_error= 1;
  }

  if (info->bulk_insert)
  {
    uint j;
    for (j=0 ; j < share->base.keys ; j++)
      maria_flush_bulk_insert(info, j);
  }

  if (fatal_error)
  {
    maria_print_error(info->s, HA_ERR_CRASHED);
    maria_mark_crashed(info);
  }

  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
  my_errno=save_errno;
err2:
  /* Errors detected before any key was written arrive here directly */
  save_errno=my_errno;
  DBUG_ASSERT(save_errno);
  if (!save_errno)
    save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */
  DBUG_PRINT("error", ("got error: %d", save_errno));
  _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
  DBUG_RETURN(my_errno=save_errno);
} /* maria_write */
422
423
424 /*
425 Write one key to btree
426
427 TODO
428 Remove this function and have bulk insert change keyinfo->ck_insert
429 to point to the right function
430 */
431
_ma_ck_write(MARIA_HA * info,MARIA_KEY * key)432 my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key)
433 {
434 DBUG_ENTER("_ma_ck_write");
435
436 if (info->bulk_insert &&
437 is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr]))
438 {
439 DBUG_RETURN(_ma_ck_write_tree(info, key));
440 }
441 DBUG_RETURN(_ma_ck_write_btree(info, key));
442 } /* _ma_ck_write */
443
444
445 /**********************************************************************
446 Insert key into btree (normal case)
447 **********************************************************************/
448
_ma_ck_write_btree(MARIA_HA * info,MARIA_KEY * key)449 static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key)
450 {
451 my_bool error;
452 MARIA_KEYDEF *keyinfo= key->keyinfo;
453 my_off_t *root= &info->s->state.key_root[keyinfo->key_nr];
454 DBUG_ENTER("_ma_ck_write_btree");
455
456 error= _ma_ck_write_btree_with_log(info, key, root,
457 keyinfo->write_comp_flag | key->flag);
458 if (info->ft1_to_ft2)
459 {
460 if (!error)
461 error= _ma_ft_convert_to_ft2(info, key);
462 delete_dynamic(info->ft1_to_ft2);
463 my_free(info->ft1_to_ft2);
464 info->ft1_to_ft2=0;
465 }
466 DBUG_RETURN(error);
467 } /* _ma_ck_write_btree */
468
469
470 /**
471 @brief Write a key to the b-tree
472
473 @retval 1 error
474 @retval 0 ok
475 */
476
_ma_ck_write_btree_with_log(MARIA_HA * info,MARIA_KEY * key,my_off_t * root,uint32 comp_flag)477 static my_bool _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
478 my_off_t *root, uint32 comp_flag)
479 {
480 MARIA_SHARE *share= info->s;
481 LSN lsn= LSN_IMPOSSIBLE;
482 int error;
483 my_off_t new_root= *root;
484 uchar key_buff[MARIA_MAX_KEY_BUFF];
485 MARIA_KEY org_key; /* Set/used when now_transactional=TRUE */
486 my_bool transactional= share->now_transactional;
487 DBUG_ENTER("_ma_ck_write_btree_with_log");
488
489 LINT_INIT_STRUCT(org_key);
490
491 if (transactional)
492 {
493 /* Save original value as the key may change */
494 org_key= *key;
495 memcpy(key_buff, key->data, key->data_length + key->ref_length);
496 }
497
498 error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag);
499 if (!error && transactional)
500 {
501 /* Log the original value */
502 *key= org_key;
503 key->data= key_buff;
504 error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn);
505 }
506 else
507 {
508 *root= new_root;
509 _ma_fast_unlock_key_del(info);
510 }
511 _ma_unpin_all_pages_and_finalize_row(info, lsn);
512
513 DBUG_RETURN(error != 0);
514 } /* _ma_ck_write_btree_with_log */
515
516
517 /**
518 @brief Write a key to the b-tree
519
520 @retval 1 error
521 @retval 0 ok
522 */
523
_ma_ck_real_write_btree(MARIA_HA * info,MARIA_KEY * key,my_off_t * root,uint32 comp_flag)524 my_bool _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
525 uint32 comp_flag)
526 {
527 int error;
528 DBUG_ENTER("_ma_ck_real_write_btree");
529
530 /* key_length parameter is used only if comp_flag is SEARCH_FIND */
531 if (*root == HA_OFFSET_ERROR ||
532 (error= w_search(info, comp_flag, key, *root, (MARIA_PAGE *) 0,
533 (uchar*) 0, 1)) > 0)
534 error= _ma_enlarge_root(info, key, root);
535 DBUG_RETURN(error != 0);
536 } /* _ma_ck_real_write_btree */
537
538
539 /**
540 @brief Make a new root with key as only pointer
541
542 @retval 1 error
543 @retval 0 ok
544 */
545
_ma_enlarge_root(MARIA_HA * info,MARIA_KEY * key,my_off_t * root)546 my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
547 {
548 uint t_length, nod_flag;
549 MARIA_KEY_PARAM s_temp;
550 MARIA_SHARE *share= info->s;
551 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
552 MARIA_KEYDEF *keyinfo= key->keyinfo;
553 MARIA_PAGE page;
554 my_bool res= 0;
555 DBUG_ENTER("_ma_enlarge_root");
556
557 page.info= info;
558 page.keyinfo= keyinfo;
559 page.buff= info->buff;
560 page.flag= 0;
561
562 nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
563 /* Store pointer to prev page if nod */
564 _ma_kpointer(info, page.buff + share->keypage_header, *root);
565 t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
566 (uchar*) 0, (uchar*) 0, &s_temp);
567 page.size= share->keypage_header + t_length + nod_flag;
568
569 bzero(page.buff, share->keypage_header);
570 _ma_store_keynr(share, page.buff, keyinfo->key_nr);
571 if (nod_flag)
572 page.flag|= KEYPAGE_FLAG_ISNOD;
573 if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
574 page.flag|= KEYPAGE_FLAG_HAS_TRANSID;
575 (*keyinfo->store_key)(keyinfo, page.buff + share->keypage_header +
576 nod_flag, &s_temp);
577
578 /* Mark that info->buff was used */
579 info->keyread_buff_used= info->page_changed= 1;
580 if ((page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
581 HA_OFFSET_ERROR)
582 DBUG_RETURN(1);
583 *root= page.pos;
584
585 page_store_info(share, &page);
586
587 /*
588 Clear unitialized part of page to avoid valgrind/purify warnings
589 and to get a clean page that is easier to compress and compare with
590 pages generated with redo
591 */
592 bzero(page.buff + page.size, share->block_size - page.size);
593
594 if (share->now_transactional && _ma_log_new(&page, 1))
595 res= 1;
596
597 if (_ma_write_keypage(&page, page_link->write_lock,
598 PAGECACHE_PRIORITY_HIGH))
599 res= 1;
600
601 DBUG_RETURN(res);
602 } /* _ma_enlarge_root */
603
604
605 /*
606 Search after a position for a key and store it there
607
608 TODO:
609 Change this to use pagecache directly instead of creating a copy
610 of the page. To do this, we must however change write-key-on-page
611 algorithm to not overwrite the buffer but instead store any overflow
612 key in a separate buffer.
613
614 @return
615 @retval -1 error
616 @retval 0 ok
617 @retval > 0 Key should be stored in higher tree
618 */
619
w_search(register MARIA_HA * info,uint32 comp_flag,MARIA_KEY * key,my_off_t page_pos,MARIA_PAGE * father_page,uchar * father_keypos,my_bool insert_last)620 static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
621 my_off_t page_pos,
622 MARIA_PAGE *father_page, uchar *father_keypos,
623 my_bool insert_last)
624 {
625 int error,flag;
626 uchar *temp_buff,*keypos,*keybuff;
627 my_bool was_last_key, buff_alloced;
628 my_off_t next_page, dup_key_pos;
629 MARIA_SHARE *share= info->s;
630 MARIA_KEYDEF *keyinfo= key->keyinfo;
631 MARIA_PAGE page;
632 DBUG_ENTER("w_search");
633 DBUG_PRINT("enter", ("page: %lu", (ulong) (page_pos/keyinfo->block_length)));
634
635 alloc_on_stack(*info->stack_end_ptr, temp_buff, buff_alloced,
636 (keyinfo->block_length + keyinfo->max_store_length*3));
637 if (!temp_buff)
638 DBUG_RETURN(1);
639
640 keybuff= temp_buff + (keyinfo->block_length + keyinfo->max_store_length*2);
641
642 if (_ma_fetch_keypage(&page, info, keyinfo, page_pos, PAGECACHE_LOCK_WRITE,
643 DFLT_INIT_HITS, temp_buff, 0))
644 goto err;
645
646 flag= (*keyinfo->bin_search)(key, &page, comp_flag, &keypos,
647 keybuff, &was_last_key);
648 if (flag == 0)
649 {
650 MARIA_KEY tmp_key;
651 /* get position to record with duplicated key */
652
653 tmp_key.keyinfo= keyinfo;
654 tmp_key.data= keybuff;
655
656 if ((*keyinfo->get_key)(&tmp_key, page.flag, page.node, &keypos))
657 dup_key_pos= _ma_row_pos_from_key(&tmp_key);
658 else
659 dup_key_pos= HA_OFFSET_ERROR;
660
661 if (keyinfo->flag & HA_FULLTEXT)
662 {
663 uint off;
664 int subkeys;
665
666 get_key_full_length_rdonly(off, keybuff);
667 subkeys=ft_sintXkorr(keybuff+off);
668 comp_flag=SEARCH_SAME;
669 if (subkeys >= 0)
670 {
671 /* normal word, one-level tree structure */
672 flag=(*keyinfo->bin_search)(key, &page, comp_flag,
673 &keypos, keybuff, &was_last_key);
674 }
675 else
676 {
677 /* popular word. two-level tree. going down */
678 my_off_t root= dup_key_pos;
679 MARIA_KEY subkey;
680 get_key_full_length_rdonly(off, key->data);
681 subkey.keyinfo= keyinfo= &share->ft2_keyinfo;
682 subkey.data= key->data + off;
683 subkey.data_length= key->data_length - off;
684 subkey.ref_length= key->ref_length;
685 subkey.flag= key->flag;
686
687 /* we'll modify key entry 'in vivo' */
688 keypos-= keyinfo->keylength + page.node;
689 error= _ma_ck_real_write_btree(info, &subkey, &root, comp_flag);
690 _ma_dpointer(share, keypos+HA_FT_WLEN, root);
691 subkeys--; /* should there be underflow protection ? */
692 DBUG_ASSERT(subkeys < 0);
693 ft_intXstore(keypos, subkeys);
694 if (!error)
695 {
696 page_mark_changed(info, &page);
697 if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
698 DFLT_INIT_HITS))
699 goto err;
700 }
701 stack_alloc_free(temp_buff, buff_alloced);
702 DBUG_RETURN(error);
703 }
704 }
705 else /* not HA_FULLTEXT, normal HA_NOSAME key */
706 {
707 /*
708 TODO
709 When the index will support true versioning - with multiple
710 identical values in the UNIQUE index, invisible to each other -
711 the following should be changed to "continue inserting keys, at the
712 end (of the row or statement) wait". We need to wait on *all*
713 unique conflicts at once, not one-at-a-time, because we need to
714 know all blockers in advance, otherwise we'll have incomplete wait-for
715 graph.
716 */
717 /*
718 transaction that has inserted the conflicting key may be in progress.
719 the caller will wait for it to be committed or aborted.
720 */
721 info->dup_key_trid= _ma_trid_from_key(&tmp_key);
722 info->dup_key_pos= dup_key_pos;
723 my_errno= HA_ERR_FOUND_DUPP_KEY;
724 DBUG_PRINT("warning",
725 ("Duplicate key. dup_key_trid: %lu pos %lu visible: %d",
726 (ulong) info->dup_key_trid,
727 (ulong) info->dup_key_pos,
728 info->trn ? trnman_can_read_from(info->trn,
729 info->dup_key_trid) : 2));
730 goto err;
731 }
732 }
733 if (flag == MARIA_FOUND_WRONG_KEY)
734 goto err;
735 if (!was_last_key)
736 insert_last=0;
737 next_page= _ma_kpos(page.node, keypos);
738 if (next_page == HA_OFFSET_ERROR ||
739 (error= w_search(info, comp_flag, key, next_page,
740 &page, keypos, insert_last)) > 0)
741 {
742 error= _ma_insert(info, key, &page, keypos, keybuff,
743 father_page, father_keypos, insert_last);
744 if (error < 0)
745 goto err;
746 page_mark_changed(info, &page);
747 if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
748 DFLT_INIT_HITS))
749 goto err;
750 }
751 stack_alloc_free(temp_buff, buff_alloced);
752 DBUG_RETURN(error);
753 err:
754 stack_alloc_free(temp_buff, buff_alloced);
755 DBUG_PRINT("exit",("Error: %d",my_errno));
756 DBUG_RETURN(-1);
757 } /* w_search */
758
759
760 /*
761 Insert new key.
762
763 SYNOPSIS
764 _ma_insert()
765 info Open table information.
766 keyinfo Key definition information.
767 key New key
768 anc_page Key page (beginning)
769 key_pos Position in key page where to insert.
770 key_buff Copy of previous key if keys where packed.
771 father_page position of parent key page in file.
772 father_key_pos position in parent key page for balancing.
773 insert_last If to append at end of page.
774
775 DESCRIPTION
776 Insert new key at right of key_pos.
777 Note that caller must save anc_buff
778
779 This function writes log records for all changed pages
780 (Including anc_buff and father page)
781
782 RETURN
783 < 0 Error.
784 0 OK
785 1 If key contains key to upper level (from balance page)
786 2 If key contains key to upper level (from split space)
787 */
788
_ma_insert(register MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * anc_page,uchar * key_pos,uchar * key_buff,MARIA_PAGE * father_page,uchar * father_key_pos,my_bool insert_last)789 int _ma_insert(register MARIA_HA *info, MARIA_KEY *key,
790 MARIA_PAGE *anc_page, uchar *key_pos, uchar *key_buff,
791 MARIA_PAGE *father_page, uchar *father_key_pos,
792 my_bool insert_last)
793 {
794 uint a_length, nod_flag, org_anc_length;
795 int t_length;
796 uchar *endpos, *prev_key, *anc_buff;
797 MARIA_KEY_PARAM s_temp;
798 MARIA_SHARE *share= info->s;
799 MARIA_KEYDEF *keyinfo= key->keyinfo;
800 DBUG_ENTER("_ma_insert");
801 DBUG_PRINT("enter",("key_pos:%p", key_pos));
802 DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key););
803
804 /*
805 Note that anc_page->size can be bigger then block_size in case of
806 delete key that caused increase of page length
807 */
808 org_anc_length= a_length= anc_page->size;
809 nod_flag= anc_page->node;
810
811 anc_buff= anc_page->buff;
812 endpos= anc_buff+ a_length;
813 prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ?
814 (uchar*) 0 : key_buff);
815 t_length= (*keyinfo->pack_key)(key, nod_flag,
816 (key_pos == endpos ? (uchar*) 0 : key_pos),
817 prev_key, prev_key, &s_temp);
818 #ifndef DBUG_OFF
819 if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
820 {
821 DBUG_DUMP("prev_key", prev_key, _ma_keylength(keyinfo,prev_key));
822 }
823 if (keyinfo->flag & HA_PACK_KEY)
824 {
825 DBUG_PRINT("test",("t_length: %d ref_len: %d",
826 t_length,s_temp.ref_length));
827 DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: %p",
828 s_temp.n_ref_length, s_temp.n_length, s_temp.key));
829 }
830 #endif
831 if (t_length > 0)
832 {
833 if (t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
834 {
835 _ma_set_fatal_error(share, HA_ERR_CRASHED);
836 DBUG_RETURN(-1);
837 }
838 bmove_upp(endpos+t_length, endpos, (uint) (endpos-key_pos));
839 }
840 else
841 {
842 if (-t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
843 {
844 _ma_set_fatal_error(share, HA_ERR_CRASHED);
845 DBUG_RETURN(-1);
846 }
847 bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
848 }
849 (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
850 a_length+=t_length;
851
852 if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
853 _ma_mark_page_with_transid(share, anc_page);
854
855 anc_page->size= a_length;
856 page_store_size(share, anc_page);
857
858 /*
859 Check if the new key fits totally into the the page
860 (anc_buff is big enough to contain a full page + one key)
861 */
862 if (a_length <= share->max_index_block_size)
863 {
864 if (share->max_index_block_size - a_length < 32 &&
865 (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
866 share->base.key_reflength <= share->rec_reflength &&
867 share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
868 {
869 /*
870 Normal word. One-level tree. Page is almost full.
871 Let's consider converting.
872 We'll compare 'key' and the first key at anc_buff
873 */
874 const uchar *a= key->data;
875 const uchar *b= anc_buff + share->keypage_header + nod_flag;
876 uint alen, blen, ft2len= share->ft2_keyinfo.keylength;
877 /* the very first key on the page is always unpacked */
878 DBUG_ASSERT((*b & 128) == 0);
879 #if HA_FT_MAXLEN >= 127
880 blen= mi_uint2korr(b); b+=2;
881 When you enable this code, as part of the MyISAM->Maria merge of
882 ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
883 restore ft2 functionality, fix bugs.
884 Then this will enable two-level fulltext index, which is not totally
885 recoverable yet.
886 So remove this text and inform Guilhem so that he fixes the issue.
887 #else
888 blen= *b++;
889 #endif
890 get_key_length(alen,a);
891 DBUG_ASSERT(info->ft1_to_ft2==0);
892 if (alen == blen &&
893 ha_compare_text(keyinfo->seg->charset, a, alen,
894 b, blen, 0) == 0)
895 {
896 /* Yup. converting */
897 info->ft1_to_ft2=(DYNAMIC_ARRAY *)
898 my_malloc(PSI_INSTRUMENT_ME, sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
899 my_init_dynamic_array(PSI_INSTRUMENT_ME, info->ft1_to_ft2, ft2len, 300,
900 50, MYF(0));
901
902 /*
903 Now, adding all keys from the page to dynarray
904 if the page is a leaf (if not keys will be deleted later)
905 */
906 if (!nod_flag)
907 {
908 /*
909 Let's leave the first key on the page, though, because
910 we cannot easily dispatch an empty page here
911 */
912 b+=blen+ft2len+2;
913 for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
914 insert_dynamic(info->ft1_to_ft2, b);
915
916 /* fixing the page's length - it contains only one key now */
917 anc_page->size= share->keypage_header + blen + ft2len + 2;
918 page_store_size(share, anc_page);
919 }
920 /* the rest will be done when we're back from recursion */
921 }
922 }
923 else
924 {
925 if (share->now_transactional &&
926 _ma_log_add(anc_page, org_anc_length,
927 key_pos, s_temp.changed_length, t_length, 1,
928 KEY_OP_DEBUG_LOG_ADD_1))
929 DBUG_RETURN(-1);
930 }
931 DBUG_RETURN(0); /* There is room on page */
932 }
933 /* Page is full */
934 if (nod_flag)
935 insert_last=0;
936 /*
937 TODO:
938 Remove 'born_transactional' here.
939 The only reason for having it here is that the current
940 _ma_balance_page_ can't handle variable length keys.
941 */
942 if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
943 father_page && !insert_last && !info->quick_mode &&
944 !info->s->base.born_transactional)
945 {
946 s_temp.key_pos= key_pos;
947 page_mark_changed(info, father_page);
948 DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_page,
949 father_page, father_key_pos,
950 &s_temp));
951 }
952 DBUG_RETURN(_ma_split_page(info, key, anc_page,
953 MY_MIN(org_anc_length,
954 info->s->max_index_block_size),
955 key_pos, s_temp.changed_length, t_length,
956 key_buff, insert_last));
957 } /* _ma_insert */
958
959
960 /**
961 @brief split a full page in two and assign emerging item to key
962
963 @fn _ma_split_page()
964 info Maria handler
965 keyinfo Key handler
966 key Buffer for middle key
967 split_page Page that should be split
968 org_split_length Original length of split_page before key was inserted
969 inserted_key_pos Address in buffer where key was inserted
970 changed_length Number of bytes changed at 'inserted_key_pos'
971 move_length Number of bytes buffer was moved when key was inserted
972 key_buff Key buffer to use for temporary storage of key
    insert_last_key    Set if we are inserting the key on the rightmost key page

  @note
    split_page is not stored on disk (caller has to do this)
977
978 @return
979 @retval 2 ok (Middle key up from _ma_insert())
980 @retval -1 error
981 */
982
_ma_split_page(MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * split_page,uint org_split_length,uchar * inserted_key_pos,uint changed_length,int move_length,uchar * key_buff,my_bool insert_last_key)983 int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, MARIA_PAGE *split_page,
984 uint org_split_length,
985 uchar *inserted_key_pos, uint changed_length,
986 int move_length,
987 uchar *key_buff, my_bool insert_last_key)
988 {
989 uint keynr;
990 uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
991 uint page_length, split_length, page_flag;
992 uchar *key_pos, *pos, *UNINIT_VAR(after_key);
993 MARIA_KEY_PARAM s_temp;
994 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
995 MARIA_SHARE *share= info->s;
996 MARIA_KEYDEF *keyinfo= key->keyinfo;
997 MARIA_KEY tmp_key;
998 MARIA_PAGE new_page;
999 int res;
1000 DBUG_ENTER("_ma_split_page");
1001
1002 DBUG_DUMP("buff", split_page->buff, split_page->size);
1003
1004 info->page_changed=1; /* Info->buff is used */
1005 info->keyread_buff_used=1;
1006 page_flag= split_page->flag;
1007 nod_flag= split_page->node;
1008 key_ref_length= share->keypage_header + nod_flag;
1009
1010 new_page.info= info;
1011 new_page.buff= info->buff;
1012 new_page.keyinfo= keyinfo;
1013
1014 tmp_key.data= key_buff;
1015 tmp_key.keyinfo= keyinfo;
1016 if (insert_last_key)
1017 key_pos= _ma_find_last_pos(&tmp_key, split_page, &after_key);
1018 else
1019 key_pos= _ma_find_half_pos(&tmp_key, split_page, &after_key);
1020 if (!key_pos)
1021 DBUG_RETURN(-1);
1022
1023 key_length= tmp_key.data_length + tmp_key.ref_length;
1024 split_length= (uint) (key_pos - split_page->buff);
1025 a_length= split_page->size;
1026 split_page->size= split_length;
1027 page_store_size(share, split_page);
1028
1029 key_pos=after_key;
1030 if (nod_flag)
1031 {
1032 DBUG_PRINT("test",("Splitting nod"));
1033 pos=key_pos-nod_flag;
1034 memcpy(new_page.buff + share->keypage_header, pos, (size_t) nod_flag);
1035 }
1036
1037 /* Move middle item to key and pointer to new page */
1038 if ((new_page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
1039 HA_OFFSET_ERROR)
1040 DBUG_RETURN(-1);
1041
1042 _ma_copy_key(key, &tmp_key);
1043 _ma_kpointer(info, key->data + key_length, new_page.pos);
1044
1045 /* Store new page */
1046 if (!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &key_pos))
1047 DBUG_RETURN(-1);
1048
1049 t_length=(*keyinfo->pack_key)(&tmp_key, nod_flag, (uchar *) 0,
1050 (uchar*) 0, (uchar*) 0, &s_temp);
1051 length=(uint) ((split_page->buff + a_length) - key_pos);
1052 memcpy(new_page.buff + key_ref_length + t_length, key_pos,
1053 (size_t) length);
1054 (*keyinfo->store_key)(keyinfo,new_page.buff+key_ref_length,&s_temp);
1055 page_length= length + t_length + key_ref_length;
1056
1057 bzero(new_page.buff, share->keypage_header);
1058 /* Copy KEYFLAG_FLAG_ISNODE and KEYPAGE_FLAG_HAS_TRANSID from parent page */
1059 new_page.flag= page_flag;
1060 new_page.size= page_length;
1061 page_store_info(share, &new_page);
1062
1063 /* Copy key number */
1064 keynr= _ma_get_keynr(share, split_page->buff);
1065 _ma_store_keynr(share, new_page.buff, keynr);
1066
1067 res= 2; /* Middle key up */
1068 if (share->now_transactional && _ma_log_new(&new_page, 0))
1069 res= -1;
1070
1071 /*
1072 Clear unitialized part of page to avoid valgrind/purify warnings
1073 and to get a clean page that is easier to compress and compare with
1074 pages generated with redo
1075 */
1076 bzero(new_page.buff + page_length, share->block_size - page_length);
1077
1078 if (_ma_write_keypage(&new_page, page_link->write_lock,
1079 DFLT_INIT_HITS))
1080 res= -1;
1081
1082 /* Save changes to split pages */
1083 if (share->now_transactional &&
1084 _ma_log_split(split_page, org_split_length, split_length,
1085 inserted_key_pos, changed_length, move_length,
1086 KEY_OP_NONE, (uchar*) 0, 0, 0))
1087 res= -1;
1088
1089 DBUG_DUMP_KEY("middle_key", key);
1090 DBUG_RETURN(res);
1091 } /* _ma_split_page */
1092
1093
1094 /*
1095 Calculate how to much to move to split a page in two
1096
1097 Returns pointer to start of key.
1098 key will contain the key.
1099 after_key will contain the position to where the next key starts
1100 */
1101
_ma_find_half_pos(MARIA_KEY * key,MARIA_PAGE * ma_page,uchar ** after_key)1102 uchar *_ma_find_half_pos(MARIA_KEY *key, MARIA_PAGE *ma_page,
1103 uchar **after_key)
1104 {
1105 uint keys, length, key_ref_length, page_flag, nod_flag;
1106 uchar *page, *end, *lastpos;
1107 MARIA_HA *info= ma_page->info;
1108 MARIA_SHARE *share= info->s;
1109 MARIA_KEYDEF *keyinfo= key->keyinfo;
1110 DBUG_ENTER("_ma_find_half_pos");
1111
1112 nod_flag= ma_page->node;
1113 key_ref_length= share->keypage_header + nod_flag;
1114 page_flag= ma_page->flag;
1115 length= ma_page->size - key_ref_length;
1116 page= ma_page->buff+ key_ref_length; /* Point to first key */
1117
1118 if (!(keyinfo->flag &
1119 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1120 HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1121 {
1122 key_ref_length= keyinfo->keylength+nod_flag;
1123 key->data_length= keyinfo->keylength - info->s->rec_reflength;
1124 key->ref_length= info->s->rec_reflength;
1125 key->flag= 0;
1126 keys=length/(key_ref_length*2);
1127 end=page+keys*key_ref_length;
1128 *after_key=end+key_ref_length;
1129 memcpy(key->data, end, key_ref_length);
1130 DBUG_RETURN(end);
1131 }
1132
1133 end=page+length/2-key_ref_length; /* This is aprox. half */
1134 key->data[0]= 0; /* Safety */
1135 do
1136 {
1137 lastpos=page;
1138 if (!(length= (*keyinfo->get_key)(key, page_flag, nod_flag, &page)))
1139 DBUG_RETURN(0);
1140 } while (page < end);
1141 *after_key= page;
1142 DBUG_PRINT("exit",("returns: %p page: %p half: %p",
1143 lastpos, page, end));
1144 DBUG_RETURN(lastpos);
1145 } /* _ma_find_half_pos */
1146
1147
1148 /**
1149 Find second to last key on leaf page
1150
1151 @notes
1152 Used to split buffer at last key. In this case the next to last
1153 key will be moved to parent page and last key will be on it's own page.
1154
1155 @TODO
1156 Add one argument for 'last key value' to get_key so that one can
1157 do the loop without having to copy the found key the whole time
1158
1159 @return
1160 @retval Pointer to the start of the key before the last key
1161 @retval int_key will contain the last key
1162 */
1163
_ma_find_last_pos(MARIA_KEY * int_key,MARIA_PAGE * ma_page,uchar ** after_key)1164 static uchar *_ma_find_last_pos(MARIA_KEY *int_key, MARIA_PAGE *ma_page,
1165 uchar **after_key)
1166 {
1167 uint keys, length, key_ref_length, page_flag;
1168 uchar *page, *end, *lastpos, *prevpos;
1169 uchar key_buff[MARIA_MAX_KEY_BUFF];
1170 MARIA_HA *info= ma_page->info;
1171 MARIA_SHARE *share= info->s;
1172 MARIA_KEYDEF *keyinfo= int_key->keyinfo;
1173 MARIA_KEY tmp_key;
1174 DBUG_ENTER("_ma_find_last_pos");
1175
1176 key_ref_length= share->keypage_header;
1177 page_flag= ma_page->flag;
1178 length= ma_page->size - key_ref_length;
1179 page= ma_page->buff + key_ref_length;
1180
1181 if (!(keyinfo->flag &
1182 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1183 HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1184 {
1185 keys= length / keyinfo->keylength - 2;
1186 length= keyinfo->keylength;
1187 int_key->data_length= length - info->s->rec_reflength;
1188 int_key->ref_length= info->s->rec_reflength;
1189 int_key->flag= 0;
1190 end=page+keys*length;
1191 *after_key=end+length;
1192 memcpy(int_key->data, end, length);
1193 DBUG_RETURN(end);
1194 }
1195
1196 end=page+length-key_ref_length;
1197 lastpos=page;
1198 tmp_key.data= key_buff;
1199 tmp_key.keyinfo= int_key->keyinfo;
1200 key_buff[0]= 0; /* Safety */
1201
1202 /* We know that there are at least 2 keys on the page */
1203
1204 if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1205 {
1206 _ma_set_fatal_error(share, HA_ERR_CRASHED);
1207 DBUG_RETURN(0);
1208 }
1209
1210 do
1211 {
1212 prevpos=lastpos; lastpos=page;
1213 int_key->data_length= tmp_key.data_length;
1214 int_key->ref_length= tmp_key.ref_length;
1215 int_key->flag= tmp_key.flag;
1216 memcpy(int_key->data, key_buff, length); /* previous key */
1217 if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1218 {
1219 _ma_set_fatal_error(share, HA_ERR_CRASHED);
1220 DBUG_RETURN(0);
1221 }
1222 } while (page < end);
1223
1224 *after_key=lastpos;
1225 DBUG_PRINT("exit",("returns: %p page: %p end: %p",
1226 prevpos,page,end));
1227 DBUG_RETURN(prevpos);
1228 } /* _ma_find_last_pos */
1229
1230
/**
  @brief Balance page with static size keys with page on right/left

  @param info            Maria handler; info->buff is used for the sibling
                         page and, on a 3-way split, also for the extra page
  @param keyinfo         Key definition (keylength is used as the key size)
  @param key             Middle key will be stored here
  @param curr_page       Page to balance (NOTE(review): per the comments
                         below it already contains the newly inserted key)
  @param father_page     Parent page of curr_page
  @param father_key_pos  Position in father_page->buff used to find the
                         parting key and the sibling page
  @param s_temp          Key parameters; key_pos, changed_length and
                         move_length are used for the log records

  @notes
    Father_buff will always be changed
    Caller must handle saving of curr_buff

  @return
  @retval  0   Balance was done (father buff is saved)
  @retval  1   Middle key up    (father buff is not saved)
  @retval  -1  Error
*/

static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
                            MARIA_KEY *key, MARIA_PAGE *curr_page,
                            MARIA_PAGE *father_page,
                            uchar *father_key_pos, MARIA_KEY_PARAM *s_temp)
{
  MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link;
  MARIA_SHARE *share= info->s;
  my_bool right, buff_alloced;
  uint k_length,father_length,father_keylength,nod_flag,curr_keylength;
  uint right_length,left_length,new_right_length,new_left_length,extra_length;
  uint keys, tmp_length, extra_buff_length;
  uchar *pos, *extra_buff, *parting_key;
  uchar *tmp_part_key;
  MARIA_PAGE next_page, extra_page, *left_page, *right_page;
  DBUG_ENTER("_ma_balance_page");

  /* Temporary buffer for one parting key; released on every exit path */
  alloc_on_stack(*info->stack_end_ptr, tmp_part_key, buff_alloced,
                 keyinfo->max_store_length);
  if (!tmp_part_key)
    DBUG_RETURN(-1);

  k_length= keyinfo->keylength;
  father_length= father_page->size;
  father_keylength= k_length + share->base.key_reflength;
  nod_flag= curr_page->node;
  curr_keylength= k_length+nod_flag;
  info->page_changed=1;

  /*
    Choose which sibling to balance with. The right one is used if
    father_key_pos is not at the end of the father page and the record
    count is odd, or if father_key_pos is the first key of the father page
  */
  if ((father_key_pos != father_page->buff+father_length &&
       (info->state->records & 1)) ||
      father_key_pos == father_page->buff+ share->keypage_header +
      share->base.key_reflength)
  {
    right=1;
    next_page.pos= _ma_kpos(share->base.key_reflength,
                            father_key_pos+father_keylength);
    left_page= curr_page;
    right_page= &next_page;
    DBUG_PRINT("info", ("use right page: %lu",
                        (ulong) (next_page.pos / keyinfo->block_length)));
  }
  else
  {
    right=0;
    father_key_pos-=father_keylength;
    next_page.pos= _ma_kpos(share->base.key_reflength,father_key_pos);
    left_page= &next_page;
    right_page= curr_page;
    DBUG_PRINT("info", ("use left page: %lu",
                        (ulong) (next_page.pos / keyinfo->block_length)));
  }                                     /* father_key_pos ptr to parting key */

  /* Read the sibling page into info->buff under a write lock */
  if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
                        PAGECACHE_LOCK_WRITE,
                        DFLT_INIT_HITS, info->buff, 0))
    goto err;
  page_mark_changed(info, &next_page);
  DBUG_DUMP("next", next_page.buff, next_page.size);

  /* Test if there is room to share keys */
  left_length= left_page->size;
  right_length= right_page->size;
  /* Number of keys on the two pages together */
  keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/
         curr_keylength);

  if ((right ? right_length : left_length) + curr_keylength <=
      share->max_index_block_size)
  {
    /* Enough space to hold all keys in the two buffers ; Balance buffers */
    new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
    new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)*
                                                       curr_keylength);
    left_page->size= new_left_length;
    page_store_size(share, left_page);
    right_page->size= new_right_length;
    page_store_size(share, right_page);

    DBUG_PRINT("info", ("left_length: %u -> %u  right_length: %u -> %u",
                        left_length, new_left_length,
                        right_length, new_right_length));
    if (left_length < new_left_length)
    {
      uint length;
      DBUG_PRINT("info", ("move keys to end of buff"));

      /* Move keys right_page -> left_page */
      /* The old parting key from the father goes first ... */
      pos= left_page->buff+left_length;
      memcpy(pos,father_key_pos, (size_t) k_length);
      /* ... followed by the first keys of the right page */
      memcpy(pos+k_length, right_page->buff + share->keypage_header,
             (size_t) (length=new_left_length - left_length - k_length));
      /* The next key on the right page becomes the new parting key */
      pos= right_page->buff + share->keypage_header + length;
      memcpy(father_key_pos, pos, (size_t) k_length);
      /* Close the gap at the start of the right page */
      bmove(right_page->buff + share->keypage_header,
            pos + k_length, new_right_length - share->keypage_header);

      if (share->now_transactional)
      {
        if (right)
        {
          /*
            Log changes to page on left
            The original page is on the left and stored in left_page->buff
            We have on the page the newly inserted key and data
            from buff added last on the page
          */
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_left_length,
                            s_temp->key_pos, s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_ADD_SUFFIX,
                            curr_page->buff + left_length,
                            new_left_length - left_length,
                            new_left_length - left_length+ k_length))
            goto err;
          /*
            Log changes to page on right
            This contains the original data with some keys deleted from
            start of page
          */
          if (_ma_log_prefix(&next_page, 0,
                             ((int) new_right_length - (int) right_length),
                             KEY_OP_DEBUG_LOG_PREFIX_3))
            goto err;
        }
        else
        {
          /*
            Log changes to page on right (the original page) which is in buff
            Data is removed from start of page
            The inserted key may be in buff or moved to curr_buff
          */
          if (_ma_log_del_prefix(curr_page,
                                 right_length - s_temp->changed_length,
                                 new_right_length,
                                 s_temp->key_pos, s_temp->changed_length,
                                 s_temp->move_length))
            goto err;
          /*
            Log changes to page on left, which has new data added last
          */
          if (_ma_log_suffix(&next_page, left_length, new_left_length))
            goto err;
        }
      }
    }
    else
    {
      uint length;
      DBUG_PRINT("info", ("move keys to start of right_page"));

      /* Make room at the start of the right page */
      bmove_upp(right_page->buff + new_right_length,
                right_page->buff + right_length,
                right_length - share->keypage_header);
      length= new_right_length -right_length - k_length;
      /* The old parting key goes just before the original right page keys */
      memcpy(right_page->buff + share->keypage_header + length, father_key_pos,
             (size_t) k_length);
      /* First key cut off from the left page becomes the new parting key */
      pos= left_page->buff + new_left_length;
      memcpy(father_key_pos, pos, (size_t) k_length);
      /* The remaining cut off keys go first on the right page */
      memcpy(right_page->buff + share->keypage_header, pos+k_length,
             (size_t) length);

      if (share->now_transactional)
      {
        if (right)
        {
          /*
            Log changes to page on left
            The original page is on the left and stored in curr_buff
            The page is shortened from end and the key may be on the page
          */
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_left_length,
                            s_temp->key_pos, s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_NONE, (uchar*) 0, 0, 0))
            goto err;
          /*
            Log changes to page on right
            This contains the original data, with some data from cur_buff
            added first
          */
          if (_ma_log_prefix(&next_page,
                             (uint) (new_right_length - right_length),
                             (int) (new_right_length - right_length),
                             KEY_OP_DEBUG_LOG_PREFIX_4))
            goto err;
        }
        else
        {
          /*
            Log changes to page on right (the original page) which is in buff
            We have on the page the newly inserted key and data
            from buff added first on the page
          */
          uint diff_length= new_right_length - right_length;
          /*
            NOTE(review): left_length is passed as the original length even
            though curr_page is the right page in this branch - confirm
          */
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_right_length,
                            s_temp->key_pos + diff_length,
                            s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_ADD_PREFIX,
                            curr_page->buff + share->keypage_header,
                            diff_length, diff_length + k_length))
            goto err;
          /*
            Log changes to page on left, which is shortened from end
          */
          if (_ma_log_suffix(&next_page, left_length, new_left_length))
            goto err;
        }
      }
    }

    /* Log changes to father (one level up) page */

    if (share->now_transactional &&
        _ma_log_change(father_page, father_key_pos, k_length,
                       KEY_OP_DEBUG_FATHER_CHANGED_1))
      goto err;

    /*
      next_page_link->changed is marked as true above and fathers
      page_link->changed is marked as true in caller
    */
    if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                          DFLT_INIT_HITS) ||
        _ma_write_keypage(father_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
      goto err;
    stack_alloc_free(tmp_part_key, buff_alloced);
    DBUG_RETURN(0);
  }

  /* left_page and right_page are full, lets split and make new nod */

  /* The extra page is built in the second part of info->buff */
  extra_buff= info->buff+share->base.max_key_block_length;
  new_left_length= new_right_length= (share->keypage_header + nod_flag +
                                      (keys+1) / 3 * curr_keylength);
  extra_page.info=    info;
  extra_page.keyinfo= keyinfo;
  extra_page.buff=    extra_buff;

  /*
    5 is the minimum number of keys we can have here. This comes from
    the fact that each full page can store at least 2 keys and in this case
    we have a 'split' key, ie 2+2+1 = 5
  */
  if (keys == 5)                                /* Too few keys to balance */
    new_left_length-=curr_keylength;
  /* Length of the key data that does not fit and goes to the extra page */
  extra_length= (nod_flag + left_length + right_length -
                 new_left_length - new_right_length - curr_keylength);
  extra_buff_length= extra_length + share->keypage_header;
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
                     left_length, right_length,
                     new_left_length, new_right_length,
                     extra_length));

  left_page->size= new_left_length;
  page_store_size(share, left_page);
  right_page->size= new_right_length;
  page_store_size(share, right_page);

  bzero(extra_buff, share->keypage_header);
  extra_page.flag= nod_flag ? KEYPAGE_FLAG_ISNOD : 0;
  extra_page.size= extra_buff_length;
  page_store_info(share, &extra_page);

  /* Copy key number */
  _ma_store_keynr(share, extra_buff, keyinfo->key_nr);

  /* move first largest keys to new page  */
  pos= right_page->buff + right_length-extra_length;
  memcpy(extra_buff + share->keypage_header, pos, extra_length);
  /* Zero old data from buffer */
  bzero(extra_buff + extra_buff_length,
        share->block_size - extra_buff_length);

  /* Save new parting key between buff and extra_buff */
  memcpy(tmp_part_key, pos-k_length,k_length);
  /* Make place for new keys */
  bmove_upp(right_page->buff + new_right_length, pos - k_length,
            right_length - extra_length - k_length - share->keypage_header);
  /* Copy keys from left page */
  pos= left_page->buff + new_left_length;
  memcpy(right_page->buff + share->keypage_header, pos + k_length,
         (size_t) (tmp_length= left_length - new_left_length - k_length));
  /* Copy old parting key */
  parting_key= right_page->buff + share->keypage_header + tmp_length;
  memcpy(parting_key, father_key_pos, (size_t) k_length);

  /* Move new parting keys up to caller */
  memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length);
  memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length);

  if ((extra_page.pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link))
      == HA_OFFSET_ERROR)
    goto err;
  /* Store the address of the new page after the key given to the caller */
  _ma_kpointer(info,key->data+k_length, extra_page.pos);
  /* This is safe as long we are using not keys with transid */
  key->data_length= k_length - info->s->rec_reflength;
  key->ref_length= info->s->rec_reflength;

  if (right)
  {
    /*
      Page order according to key values:
      original_page (curr_page = left_page), next_page (buff), extra_buff

      Move page positions so that we store data in extra_page where
      next_page was and next_page will be stored at the new position
    */
    swap_variables(my_off_t, extra_page.pos, next_page.pos);
  }

  if (share->now_transactional)
  {
    if (right)
    {
      /*
        left_page is shortened,
        right_page is getting new keys at start and shortened from end.
        extra_page is new page

        Note that extra_page (largest key parts) will be stored at the
        place of the original 'right' page (next_page) and right page
        will be stored at the new page position

        This makes the log entries smaller as right_page contains all
        data to generate the data extra_buff
      */

      /*
        Log changes to page on left (page shortened page at end)
      */
      if (_ma_log_split(curr_page,
                        left_length - s_temp->move_length, new_left_length,
                        s_temp->key_pos, s_temp->changed_length,
                        s_temp->move_length,
                        KEY_OP_NONE, (uchar*) 0, 0, 0))
        goto err;
      /*
        Log changes to right page (stored at next page)
        This contains the last 'extra_buff' from 'buff'
      */
      if (_ma_log_prefix(&extra_page,
                         0, (int) (extra_buff_length - right_length),
                         KEY_OP_DEBUG_LOG_PREFIX_5))
        goto err;

      /*
        Log changes to middle page, which is stored at the new page
        position
      */
      if (_ma_log_new(&next_page, 0))
        goto err;
    }
    else
    {
      /*
        Log changes to page on right (the original page) which is in buff
        This contains the original data, with some data from curr_buff
        added first and shortened at end
      */
      int data_added_first= left_length - new_left_length;
      if (_ma_log_key_middle(right_page,
                             new_right_length,
                             data_added_first,
                             data_added_first,
                             extra_length,
                             s_temp->key_pos,
                             s_temp->changed_length,
                             s_temp->move_length))
        goto err;

      /* Log changes to page on left, which is shortened from end */
      if (_ma_log_suffix(left_page, left_length, new_left_length))
        goto err;

      /* Log change to rightmost (new) page */
      if (_ma_log_new(&extra_page, 0))
        goto err;
    }

    /* Log changes to father (one level up) page */
    if (share->now_transactional &&
        _ma_log_change(father_page, father_key_pos, k_length,
                       KEY_OP_DEBUG_FATHER_CHANGED_2))
      goto err;
  }

  /*
    Write the sibling and the extra page. The page that got the newly
    allocated position is written with the lock mode from new_page_link;
    the other one is left write locked
  */
  if (_ma_write_keypage(&next_page,
                        (right ? new_page_link->write_lock :
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
                        DFLT_INIT_HITS) ||
      _ma_write_keypage(&extra_page,
                        (!right ? new_page_link->write_lock :
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
                        DFLT_INIT_HITS))
    goto err;

  stack_alloc_free(tmp_part_key, buff_alloced);
  DBUG_RETURN(1);                               /* Middle key up */

err:
  stack_alloc_free(tmp_part_key, buff_alloced);
  DBUG_RETURN(-1);
} /* _ma_balance_page */
1656
1657
1658 /**********************************************************************
1659 * Bulk insert code *
1660 **********************************************************************/
1661
/*
  Per-key argument for the bulk insert trees. One element is set up for
  each active key in maria_init_bulk_insert() and passed to init_tree()
  as the custom argument; keys_compare() and keys_free() read it.
*/
typedef struct {
  MARIA_HA *info;                       /* Handler the tree belongs to */
  uint keynr;                           /* Index into share->keyinfo[] */
} bulk_insert_param;
1666
1667
_ma_ck_write_tree(register MARIA_HA * info,MARIA_KEY * key)1668 static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key)
1669 {
1670 my_bool error;
1671 uint keynr= key->keyinfo->key_nr;
1672 DBUG_ENTER("_ma_ck_write_tree");
1673
1674 /* Store ref_length as this is always constant */
1675 info->bulk_insert_ref_length= key->ref_length;
1676 error= tree_insert(&info->bulk_insert[keynr], key->data,
1677 key->data_length + key->ref_length,
1678 info->bulk_insert[keynr].custom_arg) == 0;
1679 DBUG_RETURN(error);
1680 } /* _ma_ck_write_tree */
1681
1682
1683 /* typeof(_ma_keys_compare)=qsort_cmp2 */
1684
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)1685 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
1686 {
1687 uint not_used[2];
1688 return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
1689 key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
1690 not_used);
1691 }
1692
1693
/*
  Element free callback for the bulk insert trees

  key_arg    Key stored in the tree element (used for free_free only)
  mode       free_init: called before elements are freed; takes the write
                        lock on the key root (if share->lock_key_trees) and
                        increments the key version
             free_free: called for one element; writes the key to the
                        on disk B-tree
             free_end:  called after elements are freed; releases the lock
  param_arg  bulk_insert_param for the tree

  Returns 0 on success and 1 if the key could not be written to the
  B-tree. The error is returned (instead of being silently dropped) so
  that the delete_tree() result checked in maria_end_bulk_insert() can
  report a failed index write.
*/

static int keys_free(void* key_arg, TREE_FREE mode, void *param_arg)
{
  /*
    Probably I can use info->lastkey here, but I'm not sure,
    and to be safe I'd better use local lastkey.
  */
  bulk_insert_param *param= (bulk_insert_param*)param_arg;
  MARIA_SHARE *share= param->info->s;
  uchar lastkey[MARIA_MAX_KEY_BUFF], *key= (uchar*)key_arg;
  uint keylen;
  MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr;
  MARIA_KEY tmp_key;

  switch (mode) {
  case free_init:
    if (share->lock_key_trees)
    {
      mysql_rwlock_wrlock(&keyinfo->root_lock);
      keyinfo->version++;
    }
    return 0;
  case free_free:
    /* Note: keylen doesn't contain transid lengths */
    keylen= _ma_keylength(keyinfo, key);
    tmp_key.data= lastkey;
    tmp_key.keyinfo= keyinfo;
    tmp_key.data_length= keylen - share->rec_reflength;
    tmp_key.ref_length= param->info->bulk_insert_ref_length;
    /* A ref_length different from rec_reflength means a transid is stored */
    tmp_key.flag= (param->info->bulk_insert_ref_length ==
                   share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID);
    /*
      We have to copy key as ma_ck_write_btree may need the buffer for
      copying middle key up if tree is growing
    */
    memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length);
    /* Propagate a failed write to the caller of the tree functions */
    return _ma_ck_write_btree(param->info, &tmp_key) ? 1 : 0;
  case free_end:
    if (share->lock_key_trees)
      mysql_rwlock_unlock(&keyinfo->root_lock);
    return 0;
  }
  return 0;
}
1738
1739
/*
  Set up bulk insert trees for the keys that can use them

  info        Maria handler; must not already have bulk insert active
  cache_size  Memory available for all trees together
  rows        Estimated number of rows to insert, or 0 if unknown

  A tree is created for every active key that is neither unique
  (HA_NOSAME) nor the auto increment key. Nothing is set up if no key
  qualifies or if cache_size is too small to give each tree
  MARIA_MIN_SIZE_BULK_INSERT_TREE bytes.

  Returns 0 on success (also when bulk insert is not used) and
  HA_ERR_OUT_OF_MEM if the tree array could not be allocated.
*/

int maria_init_bulk_insert(MARIA_HA *info, size_t cache_size, ha_rows rows)
{
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *key=share->keyinfo;
  bulk_insert_param *params;
  uint i, num_keys, total_keylength;
  ulonglong key_map;
  DBUG_ENTER("maria_init_bulk_insert");
  DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));

  DBUG_ASSERT(!info->bulk_insert &&
              (!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT));

  /* Collect the keys that will get a tree and their total element size */
  maria_clear_all_keys_active(key_map);
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
  {
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
        maria_is_key_active(share->state.key_map, i))
    {
      num_keys++;
      maria_set_key_active(key_map, i);
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
    }
  }

  if (num_keys==0 ||
      num_keys * (size_t) MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size)
    DBUG_RETURN(0);

  /*
    From here on cache_size is a per-tree multiplier that is scaled by
    key[i].maxlength in the init_tree() calls below
  */
  if (rows && rows*total_keylength < cache_size)
    cache_size= (size_t)rows;
  else
    cache_size/=total_keylength*16;

  /*
    One TREE per key (also for keys without a tree, so that the array can
    be indexed by key number), followed by one parameter block for each
    tree that is used
  */
  info->bulk_insert=(TREE *)
    my_malloc(PSI_INSTRUMENT_ME, (sizeof(TREE)*share->base.keys+
                                  sizeof(bulk_insert_param)*num_keys),MYF(0));

  if (!info->bulk_insert)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
  for (i=0 ; i < share->base.keys ; i++)
  {
    if (maria_is_key_active(key_map, i))
    {
      params->info=info;
      params->keynr=i;
      /* Only allocate a 16'th of the buffer at a time */
      init_tree(&info->bulk_insert[i],
                cache_size * key[i].maxlength,
                cache_size * key[i].maxlength, 0,
                (qsort_cmp2) keys_compare, keys_free, (void *)params++, MYF(0));
    }
    else
      info->bulk_insert[i].root=0;      /* Mark tree as not initialized */
  }

  DBUG_RETURN(0);
}
1800
/*
  Reset the bulk insert tree of key number 'inx', if bulk insert is
  active and that tree has been initialized
*/

void maria_flush_bulk_insert(MARIA_HA *info, uint inx)
{
  TREE *tree;

  if (!info->bulk_insert)
    return;

  tree= &info->bulk_insert[inx];
  if (is_tree_inited(tree))
    reset_tree(tree);
}
1809
1810
/*
  End bulk insert: delete all bulk insert trees and free the tree array

  info   Maria handler
  abort  Passed on to delete_tree(); forced to 1 for the remaining trees
         once one delete_tree() call has returned an error

  Returns 0 on success, otherwise the first error from delete_tree().
*/

int maria_end_bulk_insert(MARIA_HA *info, my_bool abort)
{
  int result= 0;
  uint keynr;
  DBUG_ENTER("maria_end_bulk_insert");

  if (!info->bulk_insert)
    DBUG_RETURN(result);

  for (keynr= 0; keynr < info->s->base.keys; keynr++)
  {
    TREE *tree= &info->bulk_insert[keynr];
    int error;

    if (!is_tree_inited(tree))
      continue;

    /* When the table is being deleted the element free function is reset */
    if (info->s->deleting)
      reset_free_element(tree);

    error= delete_tree(tree, abort);
    if (error)
    {
      if (!result)
        result= error;
      abort= 1;
    }
  }

  my_free(info->bulk_insert);
  info->bulk_insert= 0;
  DBUG_RETURN(result);
}
1837
1838
1839 /****************************************************************************
1840 Dedicated functions that generate log entries
1841 ****************************************************************************/
1842
1843
/*
  Write a LOGREC_UNDO_KEY_INSERT record for an inserted key

  info      Maria handler
  key       Key that was inserted
  root      Passed to the write hook as msg.root
  new_root  Passed to the write hook as msg.value
  res_lsn   Receives the LSN from translog_write_record()

  The record consists of a header (previous undo LSN, file id space and
  key number) followed by the key. For the auto increment key the auto
  increment value is extracted from the key and handed to the hook.

  Returns 0 on success and -1 if translog_write_record() failed.
*/

int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key,
                              my_off_t *root, my_off_t new_root, LSN *res_lsn)
{
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  uchar header[LSN_STORE_SIZE + FILEID_STORE_SIZE +
               KEY_NR_STORE_SIZE];
  LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 2];
  LEX_CUSTRING *header_part= parts + TRANSLOG_INTERNAL_PARTS;
  LEX_CUSTRING *key_part= header_part + 1;
  struct st_msg_to_write_hook_for_undo_key hook_arg;
  uint full_key_length;

  /* Save if we need to write a clr record */
  lsn_store(header, info->trn->undo_lsn);
  key_nr_store(header + LSN_STORE_SIZE + FILEID_STORE_SIZE,
               keyinfo->key_nr);
  full_key_length= key->data_length + key->ref_length;

  header_part->str= header;
  header_part->length= sizeof(header);
  key_part->str= key->data;
  key_part->length= full_key_length;

  hook_arg.root= root;
  hook_arg.value= new_root;
  hook_arg.auto_increment= 0;

  if (share->base.auto_key == ((uint) keyinfo->key_nr + 1))
  {
    const HA_KEYSEG *keyseg= keyinfo->seg;
    const uchar *value= key->data;
    uchar reversed[MARIA_MAX_KEY_BUFF];

    if (keyseg->flag & HA_SWAP_KEY)
    {
      /* We put key from log record to "data record" packing format... */
      const uchar *from= key->data;
      const uchar *from_end= key->data + keyseg->length;
      uchar *to= reversed + keyseg->length;
      do
      {
        *--to= *from++;
      } while (from != from_end);
      value= to;
    }
    /* ... so that we can read it with: */
    hook_arg.auto_increment=
      ma_retrieve_auto_increment(value, keyseg->type);
    /* and write_hook_for_undo_key_insert() will pick this. */
  }

  if (translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
                            info->trn, info,
                            (translog_size_t)
                            header_part->length +
                            full_key_length,
                            TRANSLOG_INTERNAL_PARTS + 2, parts,
                            header + LSN_STORE_SIZE, &hook_arg))
    return -1;
  return 0;
}
1899
1900
1901 /**
1902 @brief Log creation of new page
1903
1904 @note
1905 We don't have to store the page_length into the log entry as we can
1906 calculate this from the length of the log entry
1907
1908 @retval 1 error
1909 @retval 0 ok
1910 */
1911
_ma_log_new(MARIA_PAGE * ma_page,my_bool root_page)1912 my_bool _ma_log_new(MARIA_PAGE *ma_page, my_bool root_page)
1913 {
1914 LSN lsn;
1915 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
1916 +1];
1917 uint page_length;
1918 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1919 MARIA_HA *info= ma_page->info;
1920 MARIA_SHARE *share= info->s;
1921 my_off_t page= ma_page->pos / share->block_size;
1922 DBUG_ENTER("_ma_log_new");
1923 DBUG_PRINT("enter", ("page: %lu", (ulong) page));
1924
1925 DBUG_ASSERT(share->now_transactional);
1926
1927 /* Store address of new root page */
1928 page_store(log_data + FILEID_STORE_SIZE, page);
1929
1930 /* Store link to next unused page */
1931 if (info->key_del_used == 2)
1932 page= 0; /* key_del not changed */
1933 else
1934 page= ((share->key_del_current == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1935 share->key_del_current / share->block_size);
1936
1937 page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
1938 key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2,
1939 ma_page->keyinfo->key_nr);
1940 log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]=
1941 (uchar) root_page;
1942
1943 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1944 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1945
1946 page_length= ma_page->size - LSN_STORE_SIZE;
1947 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + LSN_STORE_SIZE;
1948 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
1949
1950 /* Remember new page length for future log entires for same page */
1951 ma_page->org_size= ma_page->size;
1952
1953 if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
1954 info->trn, info,
1955 (translog_size_t)
1956 (sizeof(log_data) + page_length),
1957 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1958 log_data, NULL))
1959 DBUG_RETURN(1);
1960 DBUG_RETURN(0);
1961 }
1962
1963
1964 /**
1965 @brief
1966 Log when some part of the key page changes
1967 */
1968
_ma_log_change(MARIA_PAGE * ma_page,const uchar * key_pos,uint length,enum en_key_debug debug_marker)1969 my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
1970 enum en_key_debug debug_marker __attribute__((unused)))
1971 {
1972 LSN lsn;
1973 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos;
1974 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
1975 uint offset= (uint) (key_pos - ma_page->buff), translog_parts;
1976 MARIA_HA *info= ma_page->info;
1977 my_off_t page= ma_page->pos / info->s->block_size;
1978 DBUG_ENTER("_ma_log_change");
1979 DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length));
1980
1981 DBUG_ASSERT(info->s->now_transactional);
1982 DBUG_ASSERT(offset + length <= ma_page->size);
1983 DBUG_ASSERT(ma_page->org_size == ma_page->size);
1984
1985 /* Store address of new root page */
1986 page= ma_page->pos / info->s->block_size;
1987 page_store(log_data + FILEID_STORE_SIZE, page);
1988 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1989
1990 #ifdef EXTRA_DEBUG_KEY_CHANGES
1991 (*log_pos++)= KEY_OP_DEBUG;
1992 (*log_pos++)= debug_marker;
1993 #endif
1994
1995 log_pos[0]= KEY_OP_OFFSET;
1996 int2store(log_pos+1, offset);
1997 log_pos[3]= KEY_OP_CHANGE;
1998 int2store(log_pos+4, length);
1999 log_pos+= 6;
2000
2001 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2002 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (log_pos - log_data);
2003 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2004 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
2005 translog_parts= 2;
2006
2007 _ma_log_key_changes(ma_page,
2008 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2009 log_pos, &length, &translog_parts);
2010
2011 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
2012 info->trn, info,
2013 (translog_size_t) (log_pos - log_data) + length,
2014 TRANSLOG_INTERNAL_PARTS + translog_parts,
2015 log_array, log_data, NULL))
2016 DBUG_RETURN(1);
2017 DBUG_RETURN(0);
2018 }
2019
2020
2021 /**
2022 @brief Write log entry for page splitting
2023
2024 @fn _ma_log_split()
2025 @param
2026 ma_page Page that is changed
2027 org_length Original length of page. Can be bigger than block_size
2028 for block that overflowed
2029 new_length New length of page
2030 key_pos Where key is inserted on page (may be 0 if no key)
2031 key_length Number of bytes changed at key_pos
2032 move_length Number of bytes moved at key_pos to make room for key
2033 prefix_or_suffix KEY_OP_NONE Ignored
2034 KEY_OP_ADD_PREFIX Add data to start of page
2035 KEY_OP_ADD_SUFFIX Add data to end of page
2036 data What data was added
2037 data_length Number of bytes added first or last
2038 changed_length Number of bytes changed first or last.
2039
2040 @note
2041 Write log entry for page that has got a key added to the page under
2042 one and only one of the following senarios:
2043 - Page is shortened from end
2044 - Data is added to end of page
2045 - Data added at front of page
2046 */
2047
_ma_log_split(MARIA_PAGE * ma_page,uint org_length,uint new_length,const uchar * key_pos,uint key_length,int move_length,enum en_key_op prefix_or_suffix,const uchar * data,uint data_length,uint changed_length)2048 static my_bool _ma_log_split(MARIA_PAGE *ma_page,
2049 uint org_length, uint new_length,
2050 const uchar *key_pos, uint key_length,
2051 int move_length, enum en_key_op prefix_or_suffix,
2052 const uchar *data, uint data_length,
2053 uint changed_length)
2054 {
2055 LSN lsn;
2056 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7];
2057 uchar *log_pos;
2058 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
2059 uint offset= (uint) (key_pos - ma_page->buff);
2060 uint translog_parts, extra_length;
2061 MARIA_HA *info= ma_page->info;
2062 my_off_t page= ma_page->pos / info->s->block_size;
2063 DBUG_ENTER("_ma_log_split");
2064 DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
2065 (ulong) page, org_length, new_length));
2066
2067 DBUG_ASSERT(changed_length >= data_length);
2068 DBUG_ASSERT(org_length <= info->s->max_index_block_size);
2069 DBUG_ASSERT(new_length == ma_page->size);
2070 DBUG_ASSERT(org_length == ma_page->org_size);
2071
2072 log_pos= log_data + FILEID_STORE_SIZE;
2073 page_store(log_pos, page);
2074 log_pos+= PAGE_STORE_SIZE;
2075
2076 #ifdef EXTRA_DEBUG_KEY_CHANGES
2077 (*log_pos++)= KEY_OP_DEBUG;
2078 (*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT;
2079 #endif
2080
2081 /* Store keypage_flag */
2082 *log_pos++= KEY_OP_SET_PAGEFLAG;
2083 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2084
2085 if (new_length <= offset || !key_pos)
2086 {
2087 /*
2088 Page was split before inserted key. Write redo entry where
2089 we just cut current page at page_length
2090 */
2091 uint length_offset= org_length - new_length;
2092 log_pos[0]= KEY_OP_DEL_SUFFIX;
2093 int2store(log_pos+1, length_offset);
2094 log_pos+= 3;
2095 translog_parts= 1;
2096 extra_length= 0;
2097 DBUG_ASSERT(data_length == 0);
2098 }
2099 else
2100 {
2101 /* Key was added to page which was split after the inserted key */
2102 uint max_key_length;
2103
2104 /*
2105 Handle case when split happened directly after the newly inserted key.
2106 */
2107 max_key_length= new_length - offset;
2108 extra_length= MY_MIN(key_length, max_key_length);
2109 if (offset + move_length > new_length)
2110 {
2111 /* This is true when move_length includes changes for next packed key */
2112 move_length= new_length - offset;
2113 }
2114
2115 if ((int) new_length < (int) (org_length + move_length + data_length))
2116 {
2117 /* Shorten page */
2118 uint diff= org_length + move_length + data_length - new_length;
2119 log_pos[0]= KEY_OP_DEL_SUFFIX;
2120 int2store(log_pos + 1, diff);
2121 log_pos+= 3;
2122 DBUG_ASSERT(data_length == 0); /* Page is shortened */
2123 DBUG_ASSERT(offset <= org_length - diff);
2124 }
2125 else
2126 {
2127 DBUG_ASSERT(new_length == org_length + move_length + data_length);
2128 DBUG_ASSERT(offset <= org_length);
2129 }
2130
2131 log_pos[0]= KEY_OP_OFFSET;
2132 int2store(log_pos+1, offset);
2133 log_pos+= 3;
2134
2135 if (move_length)
2136 {
2137 log_pos[0]= KEY_OP_SHIFT;
2138 int2store(log_pos+1, move_length);
2139 log_pos+= 3;
2140 }
2141
2142 log_pos[0]= KEY_OP_CHANGE;
2143 int2store(log_pos+1, extra_length);
2144 log_pos+= 3;
2145
2146 /* Point to original inserted key data */
2147 if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2148 key_pos+= data_length;
2149
2150 translog_parts= 2;
2151 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2152 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length;
2153 }
2154
2155 if (data_length)
2156 {
2157 /* Add prefix or suffix */
2158 log_pos[0]= prefix_or_suffix;
2159 int2store(log_pos+1, data_length);
2160 log_pos+= 3;
2161 if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2162 {
2163 int2store(log_pos+1, changed_length);
2164 log_pos+= 2;
2165 data_length= changed_length;
2166 }
2167 log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= data;
2168 log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length;
2169 translog_parts++;
2170 extra_length+= data_length;
2171 }
2172
2173 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2174 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2175 log_data);
2176
2177 _ma_log_key_changes(ma_page,
2178 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2179 log_pos, &extra_length, &translog_parts);
2180 /* Remember new page length for future log entires for same page */
2181 ma_page->org_size= ma_page->size;
2182
2183 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2184 info->trn, info,
2185 (translog_size_t)
2186 log_array[TRANSLOG_INTERNAL_PARTS +
2187 0].length + extra_length,
2188 TRANSLOG_INTERNAL_PARTS + translog_parts,
2189 log_array, log_data, NULL));
2190 }
2191
2192
2193 /**
2194 @brief
2195 Write log entry for page that has got a key added to the page
2196 and page is shortened from start of page
2197
2198 @fn _ma_log_del_prefix()
2199 @param info Maria handler
2200 @param page Page number
2201 @param buff Page buffer
2202 @param org_length Length of buffer when read
2203 @param new_length Final length
2204 @param key_pos Where on page buffer key was added. This is position
2205 before prefix was removed
2206 @param key_length How many bytes was changed at 'key_pos'
2207 @param move_length How many bytes was moved up when key was added
2208
2209 @return
2210 @retval 0 ok
2211 @retval 1 error
2212 */
2213
_ma_log_del_prefix(MARIA_PAGE * ma_page,uint org_length,uint new_length,const uchar * key_pos,uint key_length,int move_length)2214 static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
2215 uint org_length, uint new_length,
2216 const uchar *key_pos, uint key_length,
2217 int move_length)
2218 {
2219 LSN lsn;
2220 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7];
2221 uchar *log_pos;
2222 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
2223 uint offset= (uint) (key_pos - ma_page->buff);
2224 uint diff_length= org_length + move_length - new_length;
2225 uint translog_parts, extra_length;
2226 MARIA_HA *info= ma_page->info;
2227 my_off_t page= ma_page->pos / info->s->block_size;
2228 DBUG_ENTER("_ma_log_del_prefix");
2229 DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
2230 (ulong) page, org_length, new_length));
2231
2232 DBUG_ASSERT((int) diff_length > 0);
2233 DBUG_ASSERT(ma_page->org_size == org_length);
2234 DBUG_ASSERT(ma_page->size == new_length);
2235
2236 log_pos= log_data + FILEID_STORE_SIZE;
2237 page_store(log_pos, page);
2238 log_pos+= PAGE_STORE_SIZE;
2239
2240 translog_parts= 1;
2241 extra_length= 0;
2242
2243 #ifdef EXTRA_DEBUG_KEY_CHANGES
2244 *log_pos++= KEY_OP_DEBUG;
2245 *log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX;
2246 #endif
2247
2248 /* Store keypage_flag */
2249 *log_pos++= KEY_OP_SET_PAGEFLAG;
2250 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2251
2252 if (offset < diff_length + info->s->keypage_header)
2253 {
2254 /*
2255 Key is not anymore on page. Move data down, but take into account that
2256 the original page had grown with 'move_length bytes'
2257 */
2258 DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header);
2259
2260 log_pos[0]= KEY_OP_DEL_PREFIX;
2261 int2store(log_pos+1, diff_length - move_length);
2262 log_pos+= 3;
2263 }
2264 else
2265 {
2266 /*
2267 Correct position to key, as data before key has been delete and key
2268 has thus been moved down
2269 */
2270 offset-= diff_length;
2271 key_pos-= diff_length;
2272
2273 /* Move data down */
2274 log_pos[0]= KEY_OP_DEL_PREFIX;
2275 int2store(log_pos+1, diff_length);
2276 log_pos+= 3;
2277
2278 log_pos[0]= KEY_OP_OFFSET;
2279 int2store(log_pos+1, offset);
2280 log_pos+= 3;
2281
2282 if (move_length)
2283 {
2284 log_pos[0]= KEY_OP_SHIFT;
2285 int2store(log_pos+1, move_length);
2286 log_pos+= 3;
2287 }
2288 log_pos[0]= KEY_OP_CHANGE;
2289 int2store(log_pos+1, key_length);
2290 log_pos+= 3;
2291 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2292 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
2293 translog_parts= 2;
2294 extra_length= key_length;
2295 }
2296 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2297 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2298 log_data);
2299 _ma_log_key_changes(ma_page,
2300 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2301 log_pos, &extra_length, &translog_parts);
2302 /* Remember new page length for future log entires for same page */
2303 ma_page->org_size= ma_page->size;
2304
2305 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2306 info->trn, info,
2307 (translog_size_t)
2308 log_array[TRANSLOG_INTERNAL_PARTS +
2309 0].length + extra_length,
2310 TRANSLOG_INTERNAL_PARTS + translog_parts,
2311 log_array, log_data, NULL));
2312 }
2313
2314
2315 /**
2316 @brief
2317 Write log entry for page that has got data added first and
2318 data deleted last. Old changed key may be part of page
2319 */
2320
_ma_log_key_middle(MARIA_PAGE * ma_page,uint new_length,uint data_added_first,uint data_changed_first,uint data_deleted_last,const uchar * key_pos,uint key_length,int move_length)2321 static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
2322 uint new_length,
2323 uint data_added_first,
2324 uint data_changed_first,
2325 uint data_deleted_last,
2326 const uchar *key_pos,
2327 uint key_length, int move_length)
2328 {
2329 LSN lsn;
2330 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7];
2331 uchar *log_pos;
2332 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
2333 uint key_offset;
2334 uint translog_parts, extra_length;
2335 MARIA_HA *info= ma_page->info;
2336 my_off_t page= ma_page->pos / info->s->block_size;
2337 DBUG_ENTER("_ma_log_key_middle");
2338 DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2339
2340 DBUG_ASSERT(ma_page->size == new_length);
2341
2342 /* new place of key after changes */
2343 key_pos+= data_added_first;
2344 key_offset= (uint) (key_pos - ma_page->buff);
2345 if (key_offset < new_length)
2346 {
2347 /* key is on page; Calculate how much of the key is there */
2348 uint max_key_length= new_length - key_offset;
2349 if (max_key_length < key_length)
2350 {
2351 /* Key is last on page */
2352 key_length= max_key_length;
2353 move_length= 0;
2354 }
2355 /*
2356 Take into account that new data was added as part of original key
2357 that also needs to be removed from page
2358 */
2359 data_deleted_last+= move_length;
2360 }
2361
2362 /* First log changes to page */
2363 log_pos= log_data + FILEID_STORE_SIZE;
2364 page_store(log_pos, page);
2365 log_pos+= PAGE_STORE_SIZE;
2366
2367 #ifdef EXTRA_DEBUG_KEY_CHANGES
2368 *log_pos++= KEY_OP_DEBUG;
2369 *log_pos++= KEY_OP_DEBUG_LOG_MIDDLE;
2370 #endif
2371
2372 /* Store keypage_flag */
2373 *log_pos++= KEY_OP_SET_PAGEFLAG;
2374 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2375
2376 log_pos[0]= KEY_OP_DEL_SUFFIX;
2377 int2store(log_pos+1, data_deleted_last);
2378 log_pos+= 3;
2379
2380 log_pos[0]= KEY_OP_ADD_PREFIX;
2381 int2store(log_pos+1, data_added_first);
2382 int2store(log_pos+3, data_changed_first);
2383 log_pos+= 5;
2384
2385 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2386 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2387 log_data);
2388 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (ma_page->buff +
2389 info->s->keypage_header);
2390 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2391 translog_parts= 2;
2392 extra_length= data_changed_first;
2393
2394 /* If changed key is on page, log those changes too */
2395
2396 if (key_offset < new_length)
2397 {
2398 uchar *start_log_pos= log_pos;
2399
2400 log_pos[0]= KEY_OP_OFFSET;
2401 int2store(log_pos+1, key_offset);
2402 log_pos+= 3;
2403 if (move_length)
2404 {
2405 log_pos[0]= KEY_OP_SHIFT;
2406 int2store(log_pos+1, move_length);
2407 log_pos+= 3;
2408 }
2409 log_pos[0]= KEY_OP_CHANGE;
2410 int2store(log_pos+1, key_length);
2411 log_pos+= 3;
2412
2413 log_array[TRANSLOG_INTERNAL_PARTS + 2].str= start_log_pos;
2414 log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos -
2415 start_log_pos);
2416
2417 log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_pos;
2418 log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length;
2419 translog_parts+=2;
2420 extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length +
2421 key_length);
2422 }
2423
2424 _ma_log_key_changes(ma_page,
2425 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2426 log_pos, &extra_length, &translog_parts);
2427 /* Remember new page length for future log entires for same page */
2428 ma_page->org_size= ma_page->size;
2429
2430 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2431 info->trn, info,
2432 (translog_size_t)
2433 (log_array[TRANSLOG_INTERNAL_PARTS +
2434 0].length + extra_length),
2435 TRANSLOG_INTERNAL_PARTS + translog_parts,
2436 log_array, log_data, NULL));
2437 }
2438
2439
#ifdef NOT_NEEDED

/**
  @brief
  Write log entry for page that has got data added first and
  data deleted last

  @param ma_page             Page that is changed
  @param data_added_first    Number of bytes added at start of page
  @param data_changed_first  Number of bytes logged from start of page
                             (after the key page header)
  @param data_deleted_last   Number of bytes removed from end of page

  @note
    This function is only compiled if NOT_NEEDED is defined.

  @note
    Logged as KEY_OP_DEL_SUFFIX followed by KEY_OP_ADD_PREFIX, in the same
    way as done by _ma_log_key_middle()

  @return  Result of translog_write_record()
*/

static my_bool _ma_log_middle(MARIA_PAGE *ma_page,
                              uint data_added_first, uint data_changed_first,
                              uint data_deleted_last)
{
  LSN lsn;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos;
  MARIA_HA *info= ma_page->info;
  my_off_t page= ma_page->pos / info->s->block_size;
  uint translog_parts, extra_length;
  DBUG_ENTER("_ma_log_middle");
  DBUG_PRINT("enter", ("page: %lu", (ulong) page));

  DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last ==
              ma_page->size);

  log_pos= log_data + FILEID_STORE_SIZE;
  page_store(log_pos, page);
  log_pos+= PAGE_STORE_SIZE;

  /* Data is removed from the end of the page, not from the start */
  log_pos[0]= KEY_OP_DEL_SUFFIX;
  int2store(log_pos+1, data_deleted_last);
  log_pos+= 3;

  log_pos[0]= KEY_OP_ADD_PREFIX;
  int2store(log_pos+1, data_added_first);
  int2store(log_pos+3, data_changed_first);
  log_pos+= 5;

  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                         log_data);

  /* The changed data starts directly after the key page header */
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    (ma_page->buff +
                                                  info->s->keypage_header);
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
  translog_parts= 2;
  extra_length= data_changed_first;

  _ma_log_key_changes(ma_page,
                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
                      log_pos, &extra_length, &translog_parts);
  /* Remember new page length for future log entries for same page */
  ma_page->org_size= ma_page->size;

  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                    info->trn, info,
                                    (translog_size_t)
                                    log_array[TRANSLOG_INTERNAL_PARTS +
                                              0].length + extra_length,
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
                                    log_array, log_data, NULL));
}
#endif
2502