1 /* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2    Copyright (C) 2008-2009 Sun Microsystems, Inc.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 /* Write a row to a MARIA table */
18 
19 #include "ma_fulltext.h"
20 #include "ma_rt_index.h"
21 #include "trnman.h"
22 #include "ma_key_recover.h"
23 #include "ma_blockrec.h"
24 
25 	/* Functions declared in this file */
26 
27 static int w_search(MARIA_HA *info, uint32 comp_flag,
28                     MARIA_KEY *key, my_off_t page,
29 		    MARIA_PAGE *father_page, uchar *father_keypos,
30 		    my_bool insert_last);
31 static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
32 			    MARIA_KEY *key, MARIA_PAGE *curr_page,
33                             MARIA_PAGE *father_page,
34                             uchar *father_key_pos, MARIA_KEY_PARAM *s_temp);
35 static uchar *_ma_find_last_pos(MARIA_KEY *int_key,
36                                 MARIA_PAGE *page, uchar **after_key);
37 static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
38 static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
39 static my_bool _ma_ck_write_btree_with_log(MARIA_HA *, MARIA_KEY *, my_off_t *,
40                                            uint32);
41 static my_bool _ma_log_split(MARIA_PAGE *page, uint org_length,
42                              uint new_length,
43                              const uchar *key_pos,
44                              uint key_length, int move_length,
45                              enum en_key_op prefix_or_suffix,
46                              const uchar *data, uint data_length,
47                              uint changed_length);
48 static my_bool _ma_log_del_prefix(MARIA_PAGE *page,
49                                   uint org_length, uint new_length,
50                                   const uchar *key_pos, uint key_length,
51                                   int move_length);
52 static my_bool _ma_log_key_middle(MARIA_PAGE *page,
53                                   uint new_length,
54                                   uint data_added_first,
55                                   uint data_changed_first,
56                                   uint data_deleted_last,
57                                   const uchar *key_pos,
58                                   uint key_length, int move_length);
59 
60 /*
61   @brief Default handler for returing position to new row
62 
63   @note
64     This is only called for non transactional tables and not for block format
65     which is why we use info->state here.
66 */
67 
_ma_write_init_default(MARIA_HA * info,const uchar * record)68 MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info,
69                                         const uchar *record
70                                         __attribute__((unused)))
71 {
72   return ((info->s->state.dellink != HA_OFFSET_ERROR &&
73            !info->append_insert_at_end) ?
74           info->s->state.dellink :
75           info->state->data_file_length);
76 }
77 
_ma_write_abort_default(MARIA_HA * info)78 my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused)))
79 {
80   return 0;
81 }
82 
83 
84 /* Write new record to a table */
85 
/*
  Insert one record into the table.

  Steps, in order: refuse read-only and full tables, check all unique
  constraints, obtain the row position through share->write_record_init
  (skipped when OPT_NO_ROWS is set), insert one key for every active index,
  store the row through share->write_record and update the table state.

  @param info    Open table handle
  @param record  Row image to store

  @return 0 on success; otherwise an error code, which is also left in
          my_errno.

  Error handling: at label 'err' the keys that were already inserted are
  removed again in reverse order and share->write_record_abort is called.
  The table is marked as crashed if that undo fails or if the error is not
  one of the codes tested at 'err'. Label 'err2' is used for failures that
  happen before anything was changed.
*/
maria_write(MARIA_HA * info,const uchar * record)86 int maria_write(MARIA_HA *info, const uchar *record)
87 {
88   MARIA_SHARE *share= info->s;
89   uint i;
90   int save_errno;
91   MARIA_RECORD_POS filepos, oldpos= info->cur_row.lastpos;
92   uchar *buff;
93   my_bool lock_tree= share->lock_key_trees;
94   my_bool fatal_error;
95   MARIA_KEYDEF *keyinfo;
96   DBUG_ENTER("maria_write");
97   DBUG_PRINT("enter",("index_file: %d  data_file: %d",
98                       share->kfile.file, info->dfile.file));
99 
100   DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
101                   maria_print_error(info->s, HA_ERR_CRASHED);
102                   DBUG_RETURN(my_errno= HA_ERR_CRASHED););
  /* Tables opened with read-only data reject all writes */
103   if (share->options & HA_OPTION_READ_ONLY_DATA)
104   {
105     DBUG_RETURN(my_errno=EACCES);
106   }
107   if (_ma_readinfo(info,F_WRLCK,1))
108     DBUG_RETURN(my_errno);
109 
  /* From here on every exit must go through err/err2 (_ma_writeinfo) */
110   if ((share->state.changed & STATE_DATA_FILE_FULL) ||
111       (share->base.reloc == (ha_rows) 1 &&
112        share->base.records == (ha_rows) 1 &&
113        share->state.state.records == (ha_rows) 1))
114   {						/* System file */
115     my_errno=HA_ERR_RECORD_FILE_FULL;
116     goto err2;
117   }
118   if (share->state.state.key_file_length >= share->base.margin_key_file_length)
119   {
120     my_errno=HA_ERR_INDEX_FILE_FULL;
121     goto err2;
122   }
123   if (_ma_mark_file_changed(share))
124     goto err2;
125 
126   /* Calculate and check all unique constraints */
127 
128   if (share->state.header.uniques)
129   {
130     for (i=0 ; i < share->state.header.uniques ; i++)
131     {
132       MARIA_UNIQUEDEF *def= share->uniqueinfo + i;
133       ha_checksum unique_hash= _ma_unique_hash(share->uniqueinfo+i,record);
      /*
        With an active key the constraint is checked now; with a disabled
        key only the hash value is stored into the record.
      */
134       if (maria_is_key_active(share->state.key_map, def->key))
135       {
136         if (_ma_check_unique(info, def, record,
137                              unique_hash, HA_OFFSET_ERROR))
138           goto err2;
139       }
140       else
141         maria_unique_store(record+ share->keyinfo[def->key].seg->start,
142                            unique_hash);
143     }
144   }
145 
146   /* Ensure we don't try to restore auto_increment if it doesn't change */
147   info->last_auto_increment= ~(ulonglong) 0;
148 
149   if ((info->opt_flag & OPT_NO_ROWS))
150     filepos= HA_OFFSET_ERROR;
151   else
152   {
153     /*
154       This may either calculate a record or, or write the record and return
155       the record id
156     */
157     if ((filepos= (*share->write_record_init)(info, record)) ==
158         HA_OFFSET_ERROR)
159       goto err2;
160   }
161 
162   /* Write all keys to indextree */
163   buff= info->lastkey_buff2;
164   for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
165   {
166     MARIA_KEY int_key;
167     if (maria_is_key_active(share->state.key_map, i))
168     {
      /*
        The root lock is taken only when key trees are locked for this
        table and this key is not going through an initialized bulk insert
        tree.
      */
169       my_bool local_lock_tree= (lock_tree &&
170                                 !(info->bulk_insert &&
171                                   is_tree_inited(&info->bulk_insert[i])));
172       if (local_lock_tree)
173       {
174 	mysql_rwlock_wrlock(&keyinfo->root_lock);
175 	keyinfo->version++;
176       }
177       if (keyinfo->flag & HA_FULLTEXT )
178       {
179         if (_ma_ft_add(info,i, buff,record,filepos))
180         {
181 	  if (local_lock_tree)
182 	    mysql_rwlock_unlock(&keyinfo->root_lock);
183           DBUG_PRINT("error",("Got error: %d on write",my_errno));
184           goto err;
185         }
186       }
187       else
188       {
        /*
          The insert is retried for as long as it fails on a duplicate
          written by another, still running transaction: each pass waits
          for that transaction and then tries again. Every other failure
          leaves through 'goto err' with the root lock released.
        */
189         while (keyinfo->ck_insert(info,
190                                   (*keyinfo->make_key)(info, &int_key, i,
191                                                        buff, record, filepos,
192                                                        info->trn->trid)))
193         {
194           TRN *blocker;
195           DBUG_PRINT("error",("Got error: %d on write",my_errno));
196           /*
197             explicit check to filter out temp tables, they aren't
198             transactional and don't have a proper TRN so the code
199             below doesn't work for them.
200             Also, filter out non-thread maria use, and table modified in
201             the same transaction.
202             At last, filter out non-dup-unique errors.
203           */
204           if (!local_lock_tree)
205             goto err;
206           if (info->dup_key_trid == info->trn->trid ||
207               my_errno != HA_ERR_FOUND_DUPP_KEY)
208           {
209 	    mysql_rwlock_unlock(&keyinfo->root_lock);
210             goto err;
211           }
212           /* Different TrIDs: table must be transactional */
213           DBUG_ASSERT(share->base.born_transactional);
214           /*
215             If transactions are disabled, and dup_key_trid is different from
216             our TrID, it must be ALTER TABLE with dup_key_trid==0 (no
217             transaction). ALTER TABLE does have MARIA_HA::TRN not dummy but
218             puts TrID=0 in rows/keys.
219           */
220           DBUG_ASSERT(share->now_transactional ||
221                       (info->dup_key_trid == 0));
222           blocker= trnman_trid_to_trn(info->trn, info->dup_key_trid);
223           /*
224             if blocker TRN was not found, it means that the conflicting
225             transaction was committed long time ago. It could not be
226             aborted, as it would have to wait on the key tree lock
227             to remove the conflicting key it has inserted.
228           */
229           if (!blocker || blocker->commit_trid != ~(TrID)0)
230           { /* committed */
231             if (blocker)
232               mysql_mutex_unlock(& blocker->state_lock);
233             mysql_rwlock_unlock(&keyinfo->root_lock);
234             goto err;
235           }
          /* Release the tree lock before waiting for the blocker */
236           mysql_rwlock_unlock(&keyinfo->root_lock);
237           {
238             /* running. now we wait */
239             WT_RESOURCE_ID rc;
240             int res;
241             PSI_stage_info old_stage_info;
242 
243             rc.type= &ma_rc_dup_unique;
244             /* TODO savepoint id when we'll have them */
245             rc.value= (intptr)blocker;
246             res= wt_thd_will_wait_for(info->trn->wt, blocker->wt, & rc);
247             if (res != WT_OK)
248             {
249               mysql_mutex_unlock(& blocker->state_lock);
250               my_errno= HA_ERR_LOCK_DEADLOCK;
251               goto err;
252             }
253             proc_info_hook(0, &stage_waiting_for_a_resource, &old_stage_info,
254                            __func__, __FILE__, __LINE__);
255             res= wt_thd_cond_timedwait(info->trn->wt, & blocker->state_lock);
256             proc_info_hook(0, &old_stage_info, 0, __func__, __FILE__, __LINE__);
257 
258             mysql_mutex_unlock(& blocker->state_lock);
259             if (res != WT_OK)
260             {
261               my_errno= res == WT_TIMEOUT ? HA_ERR_LOCK_WAIT_TIMEOUT
262                                           : HA_ERR_LOCK_DEADLOCK;
263               goto err;
264             }
265           }
          /* Wait is over: take the tree lock again and retry the insert */
266           mysql_rwlock_wrlock(&keyinfo->root_lock);
267 #ifndef MARIA_CANNOT_ROLLBACK
268           keyinfo->version++;
269 #endif
270         }
271       }
272 
273       /* The above changed info->lastkey2. Inform maria_rnext_same(). */
274       info->update&= ~HA_STATE_RNEXT_SAME;
275 
276       if (local_lock_tree)
277         mysql_rwlock_unlock(&keyinfo->root_lock);
278     }
279   }
  /* All keys are in place: compute the checksum and store the row */
280   if (share->calc_write_checksum)
281     info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
282   if (filepos != HA_OFFSET_ERROR)
283   {
284     if ((*share->write_record)(info,record))
285       goto err;
286     info->state->checksum+= info->cur_row.checksum;
287   }
  /* The auto_increment value is only updated here when not transactional */
288   if (!share->now_transactional)
289   {
290     if (share->base.auto_key != 0)
291     {
292       const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
293       const uchar *key= record + keyseg->start;
294       set_if_bigger(share->state.auto_increment,
295                     ma_retrieve_auto_increment(key, keyseg->type));
296     }
297   }
298   info->state->records++;
299   info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
300 		 HA_STATE_ROW_CHANGED);
301   info->row_changes++;
302   share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
303   info->state->changed= 1;
304 
  /* Put back the row position that was current when we were called */
305   info->cur_row.lastpos= oldpos;
306   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
  /* The invalidator callback is called once and then cleared */
307   if (info->invalidator != 0)
308   {
309     DBUG_PRINT("info", ("invalidator... '%s' (update)",
310                         share->open_file_name.str));
311     (*info->invalidator)(share->open_file_name.str);
312     info->invalidator=0;
313   }
314 
315   /*
316     Update status of the table. We need to do so after each row write
317     for the log tables, as we want the new row to become visible to
318     other threads as soon as possible. We don't lock mutex here
319     (as it is required by pthread memory visibility rules) as (1) it's
320     not critical to use outdated share->is_log_table value (2) locking
321     mutex here for every write is too expensive.
322   */
323   if (share->is_log_table)
324     _ma_update_status((void*) info);
325 
326   DBUG_RETURN(0);
327 
  /*
    Failure after keys may have been written. Here 'i' is the number of
    the key that failed, or share->base.keys if all keys were inserted and
    writing the row failed.
  */
328 err:
329   save_errno= my_errno;
330   fatal_error= 0;
331   if (my_errno == HA_ERR_FOUND_DUPP_KEY ||
332       my_errno == HA_ERR_RECORD_FILE_FULL ||
333       my_errno == HA_ERR_LOCK_DEADLOCK ||
334       my_errno == HA_ERR_LOCK_WAIT_TIMEOUT ||
335       my_errno == HA_ERR_NULL_IN_SPATIAL ||
336       my_errno == HA_ERR_OUT_OF_MEM)
337   {
    /* errkey is -1 when the failure did not come from a key insert */
338     info->errkey= i < share->base.keys ? (int) i : -1;
339     /*
340       We delete keys in the reverse order of insertion. This is the order that
341       a rollback would do and is important for CLR_ENDs generated by
342       _ma_ft|ck_delete() and write_record_abort() to work (with any other
343       order they would cause wrong jumps in the chain).
344     */
345     while ( i-- > 0)
346     {
347       if (maria_is_key_active(share->state.key_map, i))
348       {
349 	my_bool local_lock_tree= (lock_tree &&
350                                   !(info->bulk_insert &&
351                                     is_tree_inited(&info->bulk_insert[i])));
352         keyinfo= share->keyinfo + i;
353 	if (local_lock_tree)
354 	  mysql_rwlock_wrlock(&keyinfo->root_lock);
355         /**
356            @todo RECOVERY BUG
357            The key deletes below should generate CLR_ENDs
358         */
359 	if (keyinfo->flag & HA_FULLTEXT)
360         {
361           if (_ma_ft_del(info,i,buff,record,filepos))
362 	  {
363             fatal_error= 1;
364 	    if (local_lock_tree)
365 	      mysql_rwlock_unlock(&keyinfo->root_lock);
366             break;
367 	  }
368         }
369         else
370 	{
371 	  MARIA_KEY key;
372 	  if (keyinfo->ck_delete(info,
373                                  (*keyinfo->make_key)(info, &key, i, buff,
374                                                       record,
375                                                       filepos,
376                                                       info->trn->trid)))
377 	  {
378             fatal_error= 1;
379 	    if (local_lock_tree)
380 	      mysql_rwlock_unlock(&keyinfo->root_lock);
381 	    break;
382 	  }
383 	}
384 	if (local_lock_tree)
385 	  mysql_rwlock_unlock(&keyinfo->root_lock);
386       }
387     }
388   }
389   else
390     fatal_error= 1;
391 
  /* Undo the row part if a row position was obtained */
392   if (filepos != HA_OFFSET_ERROR)
393   {
394     if ((*share->write_record_abort)(info))
395       fatal_error= 1;
396   }
397 
398   if (info->bulk_insert)
399   {
400     uint j;
401     for (j=0 ; j < share->base.keys ; j++)
402       maria_flush_bulk_insert(info, j);
403   }
404 
405   if (fatal_error)
406   {
407     maria_print_error(info->s, HA_ERR_CRASHED);
408     maria_mark_crashed(info);
409   }
410 
411   info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
  /* The undo above may have changed my_errno; report the original error */
412   my_errno=save_errno;
  /* Failure before any key or row was written: nothing to undo */
413 err2:
414   save_errno=my_errno;
415   DBUG_ASSERT(save_errno);
416   if (!save_errno)
417     save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */
418   DBUG_PRINT("error", ("got error: %d", save_errno));
419   _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
420   DBUG_RETURN(my_errno=save_errno);
421 } /* maria_write */
422 
423 
424 /*
425   Write one key to btree
426 
427   TODO
428     Remove this function and have bulk insert change keyinfo->ck_insert
429     to point to the right function
430 */
431 
_ma_ck_write(MARIA_HA * info,MARIA_KEY * key)432 my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key)
433 {
434   DBUG_ENTER("_ma_ck_write");
435 
436   if (info->bulk_insert &&
437       is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr]))
438   {
439     DBUG_RETURN(_ma_ck_write_tree(info, key));
440   }
441   DBUG_RETURN(_ma_ck_write_btree(info, key));
442 } /* _ma_ck_write */
443 
444 
445 /**********************************************************************
446   Insert key into btree (normal case)
447 **********************************************************************/
448 
_ma_ck_write_btree(MARIA_HA * info,MARIA_KEY * key)449 static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key)
450 {
451   my_bool error;
452   MARIA_KEYDEF *keyinfo= key->keyinfo;
453   my_off_t  *root= &info->s->state.key_root[keyinfo->key_nr];
454   DBUG_ENTER("_ma_ck_write_btree");
455 
456   error= _ma_ck_write_btree_with_log(info, key, root,
457                                      keyinfo->write_comp_flag | key->flag);
458   if (info->ft1_to_ft2)
459   {
460     if (!error)
461       error= _ma_ft_convert_to_ft2(info, key);
462     delete_dynamic(info->ft1_to_ft2);
463     my_free(info->ft1_to_ft2);
464     info->ft1_to_ft2=0;
465   }
466   DBUG_RETURN(error);
467 } /* _ma_ck_write_btree */
468 
469 
470 /**
471   @brief Write a key to the b-tree
472 
473   @retval 1   error
474   @retval 0    ok
475 */
476 
_ma_ck_write_btree_with_log(MARIA_HA * info,MARIA_KEY * key,my_off_t * root,uint32 comp_flag)477 static my_bool _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
478                                            my_off_t *root, uint32 comp_flag)
479 {
480   MARIA_SHARE *share= info->s;
481   LSN lsn= LSN_IMPOSSIBLE;
482   int error;
483   my_off_t new_root= *root;
484   uchar key_buff[MARIA_MAX_KEY_BUFF];
485   MARIA_KEY org_key; /* Set/used when now_transactional=TRUE */
486   my_bool transactional= share->now_transactional;
487   DBUG_ENTER("_ma_ck_write_btree_with_log");
488 
489   LINT_INIT_STRUCT(org_key);
490 
491   if (transactional)
492   {
493     /* Save original value as the key may change */
494     org_key= *key;
495     memcpy(key_buff, key->data, key->data_length + key->ref_length);
496   }
497 
498   error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag);
499   if (!error && transactional)
500   {
501     /* Log the original value */
502     *key= org_key;
503     key->data= key_buff;
504     error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn);
505   }
506   else
507   {
508     *root= new_root;
509     _ma_fast_unlock_key_del(info);
510   }
511   _ma_unpin_all_pages_and_finalize_row(info, lsn);
512 
513   DBUG_RETURN(error != 0);
514 } /* _ma_ck_write_btree_with_log */
515 
516 
517 /**
518   @brief Write a key to the b-tree
519 
520   @retval 1   error
521   @retval 0    ok
522 */
523 
_ma_ck_real_write_btree(MARIA_HA * info,MARIA_KEY * key,my_off_t * root,uint32 comp_flag)524 my_bool _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
525                             uint32 comp_flag)
526 {
527   int error;
528   DBUG_ENTER("_ma_ck_real_write_btree");
529 
530   /* key_length parameter is used only if comp_flag is SEARCH_FIND */
531   if (*root == HA_OFFSET_ERROR ||
532       (error= w_search(info, comp_flag, key, *root, (MARIA_PAGE *) 0,
533                        (uchar*) 0, 1)) > 0)
534     error= _ma_enlarge_root(info, key, root);
535   DBUG_RETURN(error != 0);
536 } /* _ma_ck_real_write_btree */
537 
538 
539 /**
540   @brief Make a new root with key as only pointer
541 
542   @retval 1   error
543   @retval 0    ok
544 */
545 
_ma_enlarge_root(MARIA_HA * info,MARIA_KEY * key,my_off_t * root)546 my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
547 {
548   uint t_length, nod_flag;
549   MARIA_KEY_PARAM s_temp;
550   MARIA_SHARE *share= info->s;
551   MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
552   MARIA_KEYDEF *keyinfo= key->keyinfo;
553   MARIA_PAGE page;
554   my_bool res= 0;
555   DBUG_ENTER("_ma_enlarge_root");
556 
557   page.info=    info;
558   page.keyinfo= keyinfo;
559   page.buff=    info->buff;
560   page.flag=    0;
561 
562   nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
563   /* Store pointer to prev page if nod */
564   _ma_kpointer(info, page.buff + share->keypage_header, *root);
565   t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
566                                  (uchar*) 0, (uchar*) 0, &s_temp);
567   page.size= share->keypage_header + t_length + nod_flag;
568 
569   bzero(page.buff, share->keypage_header);
570   _ma_store_keynr(share, page.buff, keyinfo->key_nr);
571   if (nod_flag)
572     page.flag|= KEYPAGE_FLAG_ISNOD;
573   if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
574     page.flag|= KEYPAGE_FLAG_HAS_TRANSID;
575   (*keyinfo->store_key)(keyinfo, page.buff + share->keypage_header +
576                         nod_flag, &s_temp);
577 
578   /* Mark that info->buff was used */
579   info->keyread_buff_used= info->page_changed= 1;
580   if ((page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
581       HA_OFFSET_ERROR)
582     DBUG_RETURN(1);
583   *root= page.pos;
584 
585   page_store_info(share, &page);
586 
587   /*
588     Clear unitialized part of page to avoid valgrind/purify warnings
589     and to get a clean page that is easier to compress and compare with
590     pages generated with redo
591   */
592   bzero(page.buff + page.size, share->block_size - page.size);
593 
594   if (share->now_transactional && _ma_log_new(&page, 1))
595     res= 1;
596 
597   if (_ma_write_keypage(&page, page_link->write_lock,
598                         PAGECACHE_PRIORITY_HIGH))
599     res= 1;
600 
601   DBUG_RETURN(res);
602 } /* _ma_enlarge_root */
603 
604 
605 /*
606   Search after a position for a key and store it there
607 
608   TODO:
609   Change this to use pagecache directly instead of creating a copy
610   of the page. To do this, we must however change write-key-on-page
611   algorithm to not overwrite the buffer but instead store any overflow
612   key in a separate buffer.
613 
614   @return
615   @retval -1   error
616   @retval 0    ok
617   @retval > 0  Key should be stored in higher tree
618 */
619 
w_search(register MARIA_HA * info,uint32 comp_flag,MARIA_KEY * key,my_off_t page_pos,MARIA_PAGE * father_page,uchar * father_keypos,my_bool insert_last)620 static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
621                     my_off_t page_pos,
622                     MARIA_PAGE *father_page, uchar *father_keypos,
623 		    my_bool insert_last)
624 {
625   int error,flag;
626   uchar *temp_buff,*keypos,*keybuff;
627   my_bool was_last_key, buff_alloced;
628   my_off_t next_page, dup_key_pos;
629   MARIA_SHARE *share= info->s;
630   MARIA_KEYDEF *keyinfo= key->keyinfo;
631   MARIA_PAGE page;
632   DBUG_ENTER("w_search");
633   DBUG_PRINT("enter", ("page: %lu", (ulong) (page_pos/keyinfo->block_length)));
634 
635   alloc_on_stack(*info->stack_end_ptr, temp_buff, buff_alloced,
636                  (keyinfo->block_length + keyinfo->max_store_length*3));
637   if (!temp_buff)
638     DBUG_RETURN(1);
639 
640   keybuff= temp_buff + (keyinfo->block_length + keyinfo->max_store_length*2);
641 
642   if (_ma_fetch_keypage(&page, info, keyinfo, page_pos, PAGECACHE_LOCK_WRITE,
643                         DFLT_INIT_HITS, temp_buff, 0))
644     goto err;
645 
646   flag= (*keyinfo->bin_search)(key, &page, comp_flag, &keypos,
647                                keybuff, &was_last_key);
648   if (flag == 0)
649   {
650     MARIA_KEY tmp_key;
651     /* get position to record with duplicated key */
652 
653     tmp_key.keyinfo= keyinfo;
654     tmp_key.data= keybuff;
655 
656     if ((*keyinfo->get_key)(&tmp_key, page.flag, page.node, &keypos))
657       dup_key_pos= _ma_row_pos_from_key(&tmp_key);
658     else
659       dup_key_pos= HA_OFFSET_ERROR;
660 
661     if (keyinfo->flag & HA_FULLTEXT)
662     {
663       uint off;
664       int  subkeys;
665 
666       get_key_full_length_rdonly(off, keybuff);
667       subkeys=ft_sintXkorr(keybuff+off);
668       comp_flag=SEARCH_SAME;
669       if (subkeys >= 0)
670       {
671         /* normal word, one-level tree structure */
672         flag=(*keyinfo->bin_search)(key, &page, comp_flag,
673                                     &keypos, keybuff, &was_last_key);
674       }
675       else
676       {
677         /* popular word. two-level tree. going down */
678         my_off_t root= dup_key_pos;
679         MARIA_KEY subkey;
680         get_key_full_length_rdonly(off, key->data);
681         subkey.keyinfo= keyinfo= &share->ft2_keyinfo;
682         subkey.data= key->data + off;
683         subkey.data_length= key->data_length - off;
684         subkey.ref_length= key->ref_length;
685         subkey.flag= key->flag;
686 
687         /* we'll modify key entry 'in vivo' */
688         keypos-= keyinfo->keylength + page.node;
689         error= _ma_ck_real_write_btree(info, &subkey, &root, comp_flag);
690         _ma_dpointer(share, keypos+HA_FT_WLEN, root);
691         subkeys--; /* should there be underflow protection ? */
692         DBUG_ASSERT(subkeys < 0);
693         ft_intXstore(keypos, subkeys);
694         if (!error)
695         {
696           page_mark_changed(info, &page);
697           if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
698                                 DFLT_INIT_HITS))
699             goto err;
700         }
701         stack_alloc_free(temp_buff, buff_alloced);
702         DBUG_RETURN(error);
703       }
704     }
705     else /* not HA_FULLTEXT, normal HA_NOSAME key */
706     {
707       /*
708         TODO
709         When the index will support true versioning - with multiple
710         identical values in the UNIQUE index, invisible to each other -
711         the following should be changed to "continue inserting keys, at the
712         end (of the row or statement) wait". We need to wait on *all*
713         unique conflicts at once, not one-at-a-time, because we need to
714         know all blockers in advance, otherwise we'll have incomplete wait-for
715         graph.
716       */
717       /*
718         transaction that has inserted the conflicting key may be in progress.
719         the caller will wait for it to be committed or aborted.
720       */
721       info->dup_key_trid= _ma_trid_from_key(&tmp_key);
722       info->dup_key_pos= dup_key_pos;
723       my_errno= HA_ERR_FOUND_DUPP_KEY;
724       DBUG_PRINT("warning",
725                  ("Duplicate key. dup_key_trid: %lu  pos %lu  visible: %d",
726                   (ulong) info->dup_key_trid,
727                   (ulong) info->dup_key_pos,
728                   info->trn ? trnman_can_read_from(info->trn,
729                                                    info->dup_key_trid) : 2));
730       goto err;
731     }
732   }
733   if (flag == MARIA_FOUND_WRONG_KEY)
734     goto err;
735   if (!was_last_key)
736     insert_last=0;
737   next_page= _ma_kpos(page.node, keypos);
738   if (next_page == HA_OFFSET_ERROR ||
739       (error= w_search(info, comp_flag, key, next_page,
740                        &page, keypos, insert_last)) > 0)
741   {
742     error= _ma_insert(info, key, &page, keypos, keybuff,
743                       father_page, father_keypos, insert_last);
744     if (error < 0)
745       goto err;
746     page_mark_changed(info, &page);
747     if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
748                           DFLT_INIT_HITS))
749       goto err;
750   }
751   stack_alloc_free(temp_buff, buff_alloced);
752   DBUG_RETURN(error);
753 err:
754   stack_alloc_free(temp_buff, buff_alloced);
755   DBUG_PRINT("exit",("Error: %d",my_errno));
756   DBUG_RETURN(-1);
757 } /* w_search */
758 
759 
760 /*
761   Insert new key.
762 
763   SYNOPSIS
764     _ma_insert()
765     info                        Open table information.
766     keyinfo                     Key definition information.
767     key                         New key
768     anc_page                    Key page (beginning)
769     key_pos                     Position in key page where to insert.
770     key_buff                    Copy of previous key if keys were packed.
771     father_page                 position of parent key page in file.
772     father_key_pos              position in parent key page for balancing.
773     insert_last                 If to append at end of page.
774 
775   DESCRIPTION
776     Insert new key at right of key_pos.
777     Note that caller must save anc_buff
778 
779     This function writes log records for all changed pages
780     (Including anc_buff and father page)
781 
782   RETURN
783     < 0         Error.
784     0           OK
785     1           If key contains key to upper level (from balance page)
786     2           If key contains key to upper level (from split space)
787 */
788 
_ma_insert(register MARIA_HA * info,MARIA_KEY * key,MARIA_PAGE * anc_page,uchar * key_pos,uchar * key_buff,MARIA_PAGE * father_page,uchar * father_key_pos,my_bool insert_last)789 int _ma_insert(register MARIA_HA *info, MARIA_KEY *key,
790                MARIA_PAGE *anc_page, uchar *key_pos, uchar *key_buff,
791                MARIA_PAGE *father_page, uchar *father_key_pos,
792                my_bool insert_last)
793 {
794   uint a_length, nod_flag, org_anc_length;
795   int t_length;
796   uchar *endpos, *prev_key, *anc_buff;
797   MARIA_KEY_PARAM s_temp;
798   MARIA_SHARE *share= info->s;
799   MARIA_KEYDEF *keyinfo= key->keyinfo;
800   DBUG_ENTER("_ma_insert");
801   DBUG_PRINT("enter",("key_pos:%p", key_pos));
802   DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key););
803 
804   /*
805     Note that anc_page->size can be bigger then block_size in case of
806     delete key that caused increase of page length
807   */
808   org_anc_length= a_length= anc_page->size;
809   nod_flag= anc_page->node;
810 
811   anc_buff= anc_page->buff;
812   endpos= anc_buff+ a_length;
813   prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ?
814              (uchar*) 0 : key_buff);
815   t_length= (*keyinfo->pack_key)(key, nod_flag,
816                                  (key_pos == endpos ? (uchar*) 0 : key_pos),
817                                  prev_key, prev_key, &s_temp);
818 #ifndef DBUG_OFF
819   if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
820   {
821     DBUG_DUMP("prev_key", prev_key, _ma_keylength(keyinfo,prev_key));
822   }
823   if (keyinfo->flag & HA_PACK_KEY)
824   {
825     DBUG_PRINT("test",("t_length: %d  ref_len: %d",
826 		       t_length,s_temp.ref_length));
827     DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: %p",
828 		       s_temp.n_ref_length, s_temp.n_length, s_temp.key));
829   }
830 #endif
831   if (t_length > 0)
832   {
833     if (t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
834     {
835       _ma_set_fatal_error(share, HA_ERR_CRASHED);
836       DBUG_RETURN(-1);
837     }
838     bmove_upp(endpos+t_length, endpos, (uint) (endpos-key_pos));
839   }
840   else
841   {
842     if (-t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
843     {
844       _ma_set_fatal_error(share, HA_ERR_CRASHED);
845       DBUG_RETURN(-1);
846     }
847     bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
848   }
849   (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
850   a_length+=t_length;
851 
852   if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
853     _ma_mark_page_with_transid(share, anc_page);
854 
855   anc_page->size= a_length;
856   page_store_size(share, anc_page);
857 
858   /*
859     Check if the new key fits totally into the the page
860     (anc_buff is big enough to contain a full page + one key)
861   */
862   if (a_length <= share->max_index_block_size)
863   {
864     if (share->max_index_block_size - a_length < 32 &&
865         (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
866         share->base.key_reflength <= share->rec_reflength &&
867         share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
868     {
869       /*
870         Normal word. One-level tree. Page is almost full.
871         Let's consider converting.
872         We'll compare 'key' and the first key at anc_buff
873       */
874       const uchar *a= key->data;
875       const uchar *b= anc_buff + share->keypage_header + nod_flag;
876       uint alen, blen, ft2len= share->ft2_keyinfo.keylength;
877       /* the very first key on the page is always unpacked */
878       DBUG_ASSERT((*b & 128) == 0);
879 #if HA_FT_MAXLEN >= 127
880       blen= mi_uint2korr(b); b+=2;
881       When you enable this code, as part of the MyISAM->Maria merge of
882 ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
883   restore ft2 functionality, fix bugs.
884       Then this will enable two-level fulltext index, which is not totally
885       recoverable yet.
886       So remove this text and inform Guilhem so that he fixes the issue.
887 #else
888       blen= *b++;
889 #endif
890       get_key_length(alen,a);
891       DBUG_ASSERT(info->ft1_to_ft2==0);
892       if (alen == blen &&
893           ha_compare_text(keyinfo->seg->charset, a, alen,
894                           b, blen, 0) == 0)
895       {
896         /* Yup. converting */
897         info->ft1_to_ft2=(DYNAMIC_ARRAY *)
898           my_malloc(PSI_INSTRUMENT_ME, sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
899         my_init_dynamic_array(PSI_INSTRUMENT_ME, info->ft1_to_ft2, ft2len, 300,
900                               50, MYF(0));
901 
902         /*
903           Now, adding all keys from the page to dynarray
904           if the page is a leaf (if not keys will be deleted later)
905         */
906         if (!nod_flag)
907         {
908           /*
909             Let's leave the first key on the page, though, because
910             we cannot easily dispatch an empty page here
911           */
912           b+=blen+ft2len+2;
913           for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
914             insert_dynamic(info->ft1_to_ft2, b);
915 
916           /* fixing the page's length - it contains only one key now */
917           anc_page->size= share->keypage_header + blen + ft2len + 2;
918           page_store_size(share, anc_page);
919         }
920         /* the rest will be done when we're back from recursion */
921       }
922     }
923     else
924     {
925       if (share->now_transactional &&
926           _ma_log_add(anc_page, org_anc_length,
927                       key_pos, s_temp.changed_length, t_length, 1,
928                       KEY_OP_DEBUG_LOG_ADD_1))
929         DBUG_RETURN(-1);
930     }
931     DBUG_RETURN(0);				/* There is room on page */
932   }
933   /* Page is full */
934   if (nod_flag)
935     insert_last=0;
936   /*
937     TODO:
938     Remove 'born_transactional' here.
939     The only reason for having it here is that the current
940     _ma_balance_page_ can't handle variable length keys.
941   */
942   if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
943       father_page && !insert_last && !info->quick_mode &&
944       !info->s->base.born_transactional)
945   {
946     s_temp.key_pos= key_pos;
947     page_mark_changed(info, father_page);
948     DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_page,
949                                  father_page, father_key_pos,
950                                  &s_temp));
951   }
952   DBUG_RETURN(_ma_split_page(info, key, anc_page,
953                              MY_MIN(org_anc_length,
954                                  info->s->max_index_block_size),
955                              key_pos, s_temp.changed_length, t_length,
956                              key_buff, insert_last));
957 } /* _ma_insert */
958 
959 
960 /**
961   @brief split a full page in two and assign emerging item to key
962 
963   @fn _ma_split_page()
    info	     Maria handler
    keyinfo	     Key handler (not a parameter; taken from key->keyinfo)
    key		     Buffer for middle key
    split_page       Page that should be split
    org_split_length Original length of split_page before key was inserted
    inserted_key_pos Address in buffer where key was inserted
    changed_length   Number of bytes changed at 'inserted_key_pos'
    move_length	     Number of bytes buffer was moved when key was inserted
    key_buff	     Key buffer to use for temporary storage of key
    insert_last_key  Set if we are inserting the key on the rightmost key page
974 
975   @note
976     split_buff is not stored on disk    (caller has to do this)
977 
978   @return
979   @retval 2   ok  (Middle key up from _ma_insert())
980   @retval -1  error
981 */
982 
int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, MARIA_PAGE *split_page,
                   uint org_split_length,
                   uchar *inserted_key_pos, uint changed_length,
                   int move_length,
                   uchar *key_buff, my_bool insert_last_key)
{
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  MARIA_PINNED_PAGE pinned_page, *page_link= &pinned_page;
  MARIA_KEY_PARAM pack_param;
  MARIA_KEY middle_key;
  MARIA_PAGE new_page;
  uchar *split_pos, *scan_pos, *UNINIT_VAR(next_key);
  uint page_flag, nod_flag, header_length;
  uint old_length, split_length, middle_key_length;
  uint packed_length, tail_length, new_page_length;
  uint key_number;
  int res;
  DBUG_ENTER("_ma_split_page");

  DBUG_DUMP("buff", split_page->buff, split_page->size);

  /* The new page is built in info->buff, so flag that buffer as used */
  info->page_changed= 1;
  info->keyread_buff_used= 1;

  page_flag= split_page->flag;
  nod_flag= split_page->node;
  header_length= share->keypage_header + nod_flag;

  new_page.info= info;
  new_page.keyinfo= keyinfo;
  new_page.buff= info->buff;

  /* The middle key is unpacked into the caller supplied key_buff */
  middle_key.keyinfo= keyinfo;
  middle_key.data= key_buff;

  /*
    Find where to cut the page: before the last key when inserting on
    the rightmost page, otherwise around the middle of the page.
  */
  split_pos= (insert_last_key ?
              _ma_find_last_pos(&middle_key, split_page, &next_key) :
              _ma_find_half_pos(&middle_key, split_page, &next_key));
  if (!split_pos)
    DBUG_RETURN(-1);

  middle_key_length= middle_key.data_length + middle_key.ref_length;

  /* Remember the full length, then shorten the page to end at split_pos */
  old_length= split_page->size;
  split_length= (uint) (split_pos - split_page->buff);
  split_page->size= split_length;
  page_store_size(share, split_page);

  scan_pos= next_key;
  if (nod_flag)
  {
    /* Node page: the pointer stored just before next_key starts new page */
    DBUG_PRINT("test",("Splitting nod"));
    memcpy(new_page.buff + share->keypage_header, scan_pos - nod_flag,
           (size_t) nod_flag);
  }

  /* Allocate the new page */
  new_page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link);
  if (new_page.pos == HA_OFFSET_ERROR)
    DBUG_RETURN(-1);

  /* Hand the middle key, followed by the new page address, to the caller */
  _ma_copy_key(key, &middle_key);
  _ma_kpointer(info, key->data + middle_key_length, new_page.pos);

  /* Unpack the first key of the new page (this reuses middle_key) */
  if (!(*keyinfo->get_key)(&middle_key, page_flag, nod_flag, &scan_pos))
    DBUG_RETURN(-1);

  /* Pack that key without a previous key and put the rest after it */
  packed_length= (*keyinfo->pack_key)(&middle_key, nod_flag, (uchar*) 0,
                                      (uchar*) 0, (uchar*) 0, &pack_param);
  tail_length= (uint) ((split_page->buff + old_length) - scan_pos);
  memcpy(new_page.buff + header_length + packed_length, scan_pos,
         (size_t) tail_length);
  (*keyinfo->store_key)(keyinfo, new_page.buff + header_length, &pack_param);
  new_page_length= tail_length + packed_length + header_length;

  /* Build the page header; the flags are inherited from the split page */
  bzero(new_page.buff, share->keypage_header);
  new_page.flag= page_flag;
  new_page.size= new_page_length;
  page_store_info(share, &new_page);

  /* The new page belongs to the same key number as the split page */
  key_number= _ma_get_keynr(share, split_page->buff);
  _ma_store_keynr(share, new_page.buff, key_number);

  res= 2;                                       /* Middle key up */
  if (share->now_transactional)
  {
    if (_ma_log_new(&new_page, 0))
      res= -1;
  }

  /*
    Zero the unused tail of the new page. This avoids valgrind/purify
    warnings and gives a clean page that is easier to compress and to
    compare with pages generated by redo.
  */
  bzero(new_page.buff + new_page_length,
        share->block_size - new_page_length);

  if (_ma_write_keypage(&new_page, page_link->write_lock, DFLT_INIT_HITS))
    res= -1;

  /* Log the changes done to the page that was split */
  if (share->now_transactional)
  {
    if (_ma_log_split(split_page, org_split_length, split_length,
                      inserted_key_pos, changed_length, move_length,
                      KEY_OP_NONE, (uchar*) 0, 0, 0))
      res= -1;
  }

  DBUG_DUMP_KEY("middle_key", key);
  DBUG_RETURN(res);
} /* _ma_split_page */
1092 
1093 
1094 /*
  Calculate how much to move to split a page in two
1096 
1097   Returns pointer to start of key.
1098   key will contain the key.
1099   after_key will contain the position to where the next key starts
1100 */
1101 
uchar *_ma_find_half_pos(MARIA_KEY *key, MARIA_PAGE *ma_page,
                         uchar **after_key)
{
  MARIA_HA *info= ma_page->info;
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  const uint nod_flag= ma_page->node;
  const uint page_flag= ma_page->flag;
  const uint header_length= share->keypage_header + nod_flag;
  const uint data_length= ma_page->size - header_length;
  uchar *cur= ma_page->buff + header_length;    /* First key on page */
  uchar *half, *key_start;
  DBUG_ENTER("_ma_find_half_pos");

  if ((keyinfo->flag &
       (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
        HA_BINARY_PACK_KEY)) == 0 &&
      (page_flag & KEYPAGE_FLAG_HAS_TRANSID) == 0)
  {
    /*
      Fixed size entries without transid: the middle key can be
      located directly, without walking the page.
    */
    const uint entry_length= keyinfo->keylength + nod_flag;
    const uint keys_before= data_length / (entry_length * 2);

    half= cur + keys_before * entry_length;
    key->data_length= keyinfo->keylength - share->rec_reflength;
    key->ref_length= share->rec_reflength;
    key->flag= 0;
    *after_key= half + entry_length;
    memcpy(key->data, half, entry_length);
    DBUG_RETURN(half);
  }

  /*
    Variable sized entries: unpack keys one by one until we have
    passed the approximate middle of the page.
  */
  half= (cur + data_length / 2) - header_length;
  key->data[0]= 0;                              /* Safety */
  for (;;)
  {
    key_start= cur;
    if (!(*keyinfo->get_key)(key, page_flag, nod_flag, &cur))
      DBUG_RETURN(0);                           /* Could not read key */
    if (cur >= half)
      break;
  }

  *after_key= cur;
  DBUG_PRINT("exit",("returns: %p  page: %p  half: %p",
                     key_start, cur, half));
  DBUG_RETURN(key_start);
} /* _ma_find_half_pos */
1146 
1147 
1148 /**
1149   Find second to last key on leaf page
1150 
1151   @notes
1152   Used to split buffer at last key.  In this case the next to last
1153   key will be moved to parent page and last key will be on it's own page.
1154 
1155   @TODO
1156   Add one argument for 'last key value' to get_key so that one can
1157   do the loop without having to copy the found key the whole time
1158 
  @return
  @retval Pointer to the start of the key before the last key
  @retval int_key will contain the key before the last key
  @retval after_key will point to the start of the last key
1162 */
1163 
_ma_find_last_pos(MARIA_KEY * int_key,MARIA_PAGE * ma_page,uchar ** after_key)1164 static uchar *_ma_find_last_pos(MARIA_KEY *int_key, MARIA_PAGE *ma_page,
1165                                 uchar **after_key)
1166 {
1167   uint keys, length, key_ref_length, page_flag;
1168   uchar *page, *end, *lastpos, *prevpos;
1169   uchar key_buff[MARIA_MAX_KEY_BUFF];
1170   MARIA_HA *info= ma_page->info;
1171   MARIA_SHARE *share= info->s;
1172   MARIA_KEYDEF *keyinfo= int_key->keyinfo;
1173   MARIA_KEY tmp_key;
1174   DBUG_ENTER("_ma_find_last_pos");
1175 
1176   key_ref_length= share->keypage_header;
1177   page_flag= ma_page->flag;
1178   length= ma_page->size - key_ref_length;
1179   page=   ma_page->buff + key_ref_length;
1180 
1181   if (!(keyinfo->flag &
1182 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1183 	 HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1184   {
1185     keys= length / keyinfo->keylength - 2;
1186     length= keyinfo->keylength;
1187     int_key->data_length= length - info->s->rec_reflength;
1188     int_key->ref_length=  info->s->rec_reflength;
1189     int_key->flag= 0;
1190     end=page+keys*length;
1191     *after_key=end+length;
1192     memcpy(int_key->data, end, length);
1193     DBUG_RETURN(end);
1194   }
1195 
1196   end=page+length-key_ref_length;
1197   lastpos=page;
1198   tmp_key.data= key_buff;
1199   tmp_key.keyinfo= int_key->keyinfo;
1200   key_buff[0]= 0;                               /* Safety */
1201 
1202   /* We know that there are at least 2 keys on the page */
1203 
1204   if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1205   {
1206     _ma_set_fatal_error(share, HA_ERR_CRASHED);
1207     DBUG_RETURN(0);
1208   }
1209 
1210   do
1211   {
1212     prevpos=lastpos; lastpos=page;
1213     int_key->data_length= tmp_key.data_length;
1214     int_key->ref_length=  tmp_key.ref_length;
1215     int_key->flag=        tmp_key.flag;
1216     memcpy(int_key->data, key_buff, length);		/* previous key */
1217     if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1218     {
1219       _ma_set_fatal_error(share, HA_ERR_CRASHED);
1220       DBUG_RETURN(0);
1221     }
1222   } while (page < end);
1223 
1224   *after_key=lastpos;
1225   DBUG_PRINT("exit",("returns: %p  page: %p  end: %p",
1226                      prevpos,page,end));
1227   DBUG_RETURN(prevpos);
1228 } /* _ma_find_last_pos */
1229 
1230 
1231 /**
1232   @brief Balance page with static size keys with page on right/left
1233 
1234   @param key 	Middle key will be stored here
1235 
1236   @notes
1237     Father_buff will always be changed
1238     Caller must handle saving of curr_buff
1239 
1240   @return
1241   @retval  0   Balance was done (father buff is saved)
1242   @retval  1   Middle key up    (father buff is not saved)
1243   @retval  -1  Error
1244 */
1245 
static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
			    MARIA_KEY *key, MARIA_PAGE *curr_page,
                            MARIA_PAGE *father_page,
                            uchar *father_key_pos, MARIA_KEY_PARAM *s_temp)
{
  MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link;
  MARIA_SHARE *share= info->s;
  my_bool right, buff_alloced;
  uint k_length,father_length,father_keylength,nod_flag,curr_keylength;
  uint right_length,left_length,new_right_length,new_left_length,extra_length;
  uint keys, tmp_length, extra_buff_length;
  uchar *pos, *extra_buff, *parting_key;
  uchar *tmp_part_key;
  MARIA_PAGE next_page, extra_page, *left_page, *right_page;
  DBUG_ENTER("_ma_balance_page");

  /*
    Scratch buffer that holds the new parting key while the right page
    is rearranged in the three-way split further down. It is released
    with stack_alloc_free() on every exit path after this point.
  */
  alloc_on_stack(*info->stack_end_ptr, tmp_part_key, buff_alloced,
                 keyinfo->max_store_length);
  if (!tmp_part_key)
    DBUG_RETURN(-1);

  /*
    k_length:         length of one key (keyinfo->keylength)
    father_keylength: key length plus share->base.key_reflength, the
                      size _ma_kpos() is given below to read a page address
    curr_keylength:   key length plus the node pointer length of curr_page
  */
  k_length= keyinfo->keylength;
  father_length= father_page->size;
  father_keylength= k_length + share->base.key_reflength;
  nod_flag= curr_page->node;
  curr_keylength= k_length+nod_flag;
  info->page_changed=1;

  /*
    Pick the sibling to balance with.
    The right sibling is used when father_key_pos is not at the end of
    the father page and the number of records is odd, or when
    father_key_pos is at offset keypage_header + key_reflength in the
    father page. Otherwise the left sibling is used and father_key_pos
    is moved back one father key.
  */
  if ((father_key_pos != father_page->buff+father_length &&
       (info->state->records & 1)) ||
      father_key_pos == father_page->buff+ share->keypage_header +
      share->base.key_reflength)
  {
    right=1;
    next_page.pos= _ma_kpos(share->base.key_reflength,
                            father_key_pos+father_keylength);
    left_page=  curr_page;
    right_page= &next_page;
    DBUG_PRINT("info", ("use right page: %lu",
                        (ulong) (next_page.pos / keyinfo->block_length)));
  }
  else
  {
    right=0;
    father_key_pos-=father_keylength;
    next_page.pos= _ma_kpos(share->base.key_reflength,father_key_pos);
    left_page=  &next_page;
    right_page= curr_page;
    DBUG_PRINT("info", ("use left page: %lu",
                        (ulong) (next_page.pos / keyinfo->block_length)));
  }					/* father_key_pos ptr to parting key */

  /* Read the chosen sibling into info->buff, asking for a write lock */
  if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
                        PAGECACHE_LOCK_WRITE,
                        DFLT_INIT_HITS, info->buff, 0))
    goto err;
  page_mark_changed(info, &next_page);
  DBUG_DUMP("next", next_page.buff, next_page.size);

  /* Test if there is room to share keys */
  left_length= left_page->size;
  right_length= right_page->size;
  /* Number of key entries stored on the two pages together */
  keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/
         curr_keylength);

  /*
    The sibling (next_page) is right_page when 'right' is set and
    left_page otherwise, so this tests if the sibling has room for one
    more key.
  */
  if ((right ? right_length : left_length) + curr_keylength <=
      share->max_index_block_size)
  {
    /* Enough space to hold all keys in the two buffers ; Balance buffers */
    /* The left page gets keys/2 keys and the right page (keys+1)/2 keys */
    new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
    new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)*
                                                       curr_keylength);
    left_page->size=  new_left_length;
    page_store_size(share, left_page);
    right_page->size= new_right_length;
    page_store_size(share, right_page);

    DBUG_PRINT("info", ("left_length: %u -> %u  right_length: %u -> %u",
                        left_length, new_left_length,
                        right_length, new_right_length));
    if (left_length < new_left_length)
    {
      uint length;
      DBUG_PRINT("info", ("move keys to end of buff"));

      /* Move keys right_page -> left_page */
      /*
        The old parting key from the father is appended to the left page,
        followed by 'length' bytes from the start of the right page.
        The key that follows those bytes becomes the new parting key in
        the father and the rest of the right page is moved down.
      */
      pos= left_page->buff+left_length;
      memcpy(pos,father_key_pos, (size_t) k_length);
      memcpy(pos+k_length, right_page->buff + share->keypage_header,
	     (size_t) (length=new_left_length - left_length - k_length));
      pos= right_page->buff + share->keypage_header + length;
      memcpy(father_key_pos, pos, (size_t) k_length);
      bmove(right_page->buff + share->keypage_header,
            pos + k_length, new_right_length - share->keypage_header);

      if (share->now_transactional)
      {
        if (right)
        {
          /*
            Log changes to page on left
            The original page is on the left and stored in left_page->buff
            We have on the page the newly inserted key and data
            from buff added last on the page
          */
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_left_length,
                            s_temp->key_pos, s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_ADD_SUFFIX,
                            curr_page->buff + left_length,
                            new_left_length - left_length,
                            new_left_length - left_length+ k_length))
            goto err;
          /*
            Log changes to page on right
            This contains the original data with some keys deleted from
            start of page
          */
          if (_ma_log_prefix(&next_page, 0,
                             ((int) new_right_length - (int) right_length),
                             KEY_OP_DEBUG_LOG_PREFIX_3))
            goto err;
        }
        else
        {
          /*
            Log changes to page on right (the original page) which is in buff
            Data is removed from start of page
            The inserted key may be in buff or moved to curr_buff
          */
          if (_ma_log_del_prefix(curr_page,
                                 right_length - s_temp->changed_length,
                                 new_right_length,
                                 s_temp->key_pos, s_temp->changed_length,
                                 s_temp->move_length))
            goto err;
          /*
            Log changes to page on left, which has new data added last
          */
          if (_ma_log_suffix(&next_page, left_length, new_left_length))
            goto err;
        }
      }
    }
    else
    {
      uint length;
      DBUG_PRINT("info", ("move keys to start of right_page"));

      /*
        Move keys left_page -> right_page.
        The right page contents are moved up to make room, the old
        parting key from the father is put in front of them, the key at
        the new end of the left page becomes the new parting key in the
        father, and the bytes that followed it on the left page are
        copied to the start of the right page.
      */
      bmove_upp(right_page->buff + new_right_length,
                right_page->buff + right_length,
		right_length - share->keypage_header);
      length= new_right_length -right_length - k_length;
      memcpy(right_page->buff + share->keypage_header + length, father_key_pos,
             (size_t) k_length);
      pos= left_page->buff + new_left_length;
      memcpy(father_key_pos, pos, (size_t) k_length);
      memcpy(right_page->buff + share->keypage_header, pos+k_length,
             (size_t) length);

      if (share->now_transactional)
      {
        if (right)
        {
          /*
            Log changes to page on left
            The original page is on the left and stored in curr_buff
            The page is shortened from end and the key may be on the page
          */
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_left_length,
                            s_temp->key_pos, s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_NONE, (uchar*) 0, 0, 0))
            goto err;
          /*
            Log changes to page on right
            This contains the original data, with some data from cur_buff
            added first
          */
          if (_ma_log_prefix(&next_page,
                             (uint) (new_right_length - right_length),
                             (int) (new_right_length - right_length),
                             KEY_OP_DEBUG_LOG_PREFIX_4))
            goto err;
        }
        else
        {
          /*
            Log changes to page on right (the original page) which is in buff
            We have on the page the newly inserted key and data
            from buff added first on the page
          */
          uint diff_length= new_right_length - right_length;
          if (_ma_log_split(curr_page,
                            left_length - s_temp->move_length,
                            new_right_length,
                            s_temp->key_pos + diff_length,
                            s_temp->changed_length,
                            s_temp->move_length,
                            KEY_OP_ADD_PREFIX,
                            curr_page->buff + share->keypage_header,
                            diff_length, diff_length + k_length))
            goto err;
          /*
            Log changes to page on left, which is shortened from end
          */
          if (_ma_log_suffix(&next_page, left_length, new_left_length))
            goto err;
        }
      }
    }

    /* Log changes to father (one level up) page */

    if (share->now_transactional &&
        _ma_log_change(father_page, father_key_pos, k_length,
                       KEY_OP_DEBUG_FATHER_CHANGED_1))
      goto err;

    /*
      next_page_link->changed is marked as true above and fathers
      page_link->changed is marked as true in caller
    */
    /* Write the sibling and the father; curr_page is left to the caller */
    if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                          DFLT_INIT_HITS) ||
        _ma_write_keypage(father_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
      goto err;
    stack_alloc_free(tmp_part_key, buff_alloced);
    DBUG_RETURN(0);
  }

  /* left_page and right_page are full, lets split and make new nod */

  /*
    The new (third) page is built in the part of info->buff that starts
    max_key_block_length bytes in. The left and right pages are both
    given room for (keys+1)/3 keys.
  */
  extra_buff= info->buff+share->base.max_key_block_length;
  new_left_length= new_right_length= (share->keypage_header + nod_flag +
                                      (keys+1) / 3 * curr_keylength);
  extra_page.info=    info;
  extra_page.keyinfo= keyinfo;
  extra_page.buff=    extra_buff;

  /*
    5 is the minimum number of keys we can have here. This comes from
    the fact that each full page can store at least 2 keys and in this case
    we have a 'split' key, ie 2+2+1 = 5
  */
  if (keys == 5)				/* Too few keys to balance */
    new_left_length-=curr_keylength;
  /*
    extra_length is what remains of the two old pages after the new
    left and right lengths and one key entry have been subtracted; it
    is the number of key bytes that go to the new page.
  */
  extra_length= (nod_flag + left_length + right_length -
                 new_left_length - new_right_length - curr_keylength);
  extra_buff_length= extra_length + share->keypage_header;
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
                     left_length, right_length,
                     new_left_length, new_right_length,
                     extra_length));

  left_page->size= new_left_length;
  page_store_size(share, left_page);
  right_page->size= new_right_length;
  page_store_size(share, right_page);

  /* Build the header of the new page */
  bzero(extra_buff, share->keypage_header);
  extra_page.flag= nod_flag ? KEYPAGE_FLAG_ISNOD : 0;
  extra_page.size= extra_buff_length;
  page_store_info(share, &extra_page);

  /* Copy key number */
  _ma_store_keynr(share, extra_buff, keyinfo->key_nr);

  /* Move the last (largest) keys of the right page to the new page */
  pos= right_page->buff + right_length-extra_length;
  memcpy(extra_buff + share->keypage_header, pos, extra_length);
  /* Zero old data from buffer */
  bzero(extra_buff + extra_buff_length,
        share->block_size - extra_buff_length);

  /* Save new parting key between buff and extra_buff */
  memcpy(tmp_part_key, pos-k_length,k_length);
  /* Make place for new keys */
  bmove_upp(right_page->buff + new_right_length, pos - k_length,
            right_length - extra_length - k_length - share->keypage_header);
  /* Copy keys from left page */
  pos= left_page->buff + new_left_length;
  memcpy(right_page->buff + share->keypage_header, pos + k_length,
         (size_t) (tmp_length= left_length - new_left_length - k_length));
  /* Copy old parting key */
  parting_key= right_page->buff + share->keypage_header + tmp_length;
  memcpy(parting_key, father_key_pos, (size_t) k_length);

  /* Move new parting keys up to caller */
  /*
    One of the two new parting keys replaces the old one in the father
    page; the other one is returned in 'key'. Which goes where depends
    on whether the sibling was on the right or on the left.
  */
  memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length);
  memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length);

  /*
    NOTE(review): _ma_split_page() passes PAGECACHE_PRIORITY_HIGH as the
    second argument to _ma_new(); here DFLT_INIT_HITS is passed. Confirm
    which one is intended.
  */
  if ((extra_page.pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link))
      == HA_OFFSET_ERROR)
    goto err;
  /* Store the address of the new page after the key given to the caller */
  _ma_kpointer(info,key->data+k_length, extra_page.pos);
  /* This is safe as long we are using not keys with transid */
  key->data_length= k_length - info->s->rec_reflength;
  key->ref_length= info->s->rec_reflength;

  if (right)
  {
    /*
      Page order according to key values:
      original_page (curr_page = left_page), next_page (buff), extra_buff

      Move page positions so that we store data in extra_page where
      next_page was and next_page will be stored at the new position
    */
    swap_variables(my_off_t, extra_page.pos, next_page.pos);
  }

  if (share->now_transactional)
  {
    if (right)
    {
      /*
        left_page is shortened,
        right_page is getting new keys at start and shortened from end.
        extra_page is new page

        Note that extra_page (largest key parts) will be stored at the
        place of the original 'right' page (next_page) and right page
        will be stored at the new page position

        This makes the log entries smaller as right_page contains all
        data to generate the data extra_buff
      */

      /*
        Log changes to page on left (page shortened page at end)
      */
      if (_ma_log_split(curr_page,
                        left_length - s_temp->move_length, new_left_length,
                        s_temp->key_pos, s_temp->changed_length,
                        s_temp->move_length,
                        KEY_OP_NONE, (uchar*) 0, 0, 0))
        goto err;
      /*
        Log changes to right page (stored at next page)
        This contains the last 'extra_buff' from 'buff'
      */
      if (_ma_log_prefix(&extra_page,
                         0, (int) (extra_buff_length - right_length),
                         KEY_OP_DEBUG_LOG_PREFIX_5))
        goto err;

      /*
        Log changes to middle page, which is stored at the new page
        position
      */
      if (_ma_log_new(&next_page, 0))
        goto err;
    }
    else
    {
      /*
        Log changes to page on right (the original page) which is in buff
        This contains the original data, with some data from curr_buff
        added first and shortened at end
      */
      int data_added_first= left_length - new_left_length;
      if (_ma_log_key_middle(right_page,
                             new_right_length,
                             data_added_first,
                             data_added_first,
                             extra_length,
                             s_temp->key_pos,
                             s_temp->changed_length,
                             s_temp->move_length))
        goto err;

      /* Log changes to page on left, which is shortened from end */
      if (_ma_log_suffix(left_page, left_length, new_left_length))
        goto err;

      /* Log change to rightmost (new) page */
      if (_ma_log_new(&extra_page, 0))
        goto err;
    }

    /* Log changes to father (one level up) page */
    /*
      NOTE(review): share->now_transactional is already known to be true
      inside this block, so the test below is redundant.
    */
    if (share->now_transactional &&
        _ma_log_change(father_page, father_key_pos, k_length,
                       KEY_OP_DEBUG_FATHER_CHANGED_2))
      goto err;
  }

  /*
    Write the sibling and the new page. Whichever of the two ended up at
    the newly allocated position (next_page when 'right' is set, because
    of the swap above) is written with the lock mode from new_page_link.
    The father page is not written here.
  */
  if (_ma_write_keypage(&next_page,
                        (right ? new_page_link->write_lock :
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
                        DFLT_INIT_HITS) ||
      _ma_write_keypage(&extra_page,
                        (!right ? new_page_link->write_lock :
                         PAGECACHE_LOCK_LEFT_WRITELOCKED),
                        DFLT_INIT_HITS))
    goto err;

  stack_alloc_free(tmp_part_key, buff_alloced);
  DBUG_RETURN(1);				/* Middle key up */

err:
  /* Common error exit: release the scratch buffer and report failure */
  stack_alloc_free(tmp_part_key, buff_alloced);
  DBUG_RETURN(-1);
} /* _ma_balance_page */
1656 
1657 
1658 /**********************************************************************
1659  *                Bulk insert code                                    *
1660  **********************************************************************/
1661 
/*
  Argument handed to the bulk insert tree callbacks (keys_compare() and
  keys_free()). maria_init_bulk_insert() fills in one of these for every
  index that gets a tree.
*/
typedef struct {
  MARIA_HA *info;                       /* Handler the tree was created for */
  uint keynr;                           /* Index into share->keyinfo[] */
} bulk_insert_param;
1666 
1667 
_ma_ck_write_tree(register MARIA_HA * info,MARIA_KEY * key)1668 static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key)
1669 {
1670   my_bool error;
1671   uint keynr= key->keyinfo->key_nr;
1672   DBUG_ENTER("_ma_ck_write_tree");
1673 
1674   /* Store ref_length as this is always constant */
1675   info->bulk_insert_ref_length= key->ref_length;
1676   error= tree_insert(&info->bulk_insert[keynr], key->data,
1677                      key->data_length + key->ref_length,
1678                      info->bulk_insert[keynr].custom_arg) == 0;
1679   DBUG_RETURN(error);
1680 } /* _ma_ck_write_tree */
1681 
1682 
1683 /* typeof(_ma_keys_compare)=qsort_cmp2 */
1684 
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)1685 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
1686 {
1687   uint not_used[2];
1688   return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
1689                     key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
1690                     not_used);
1691 }
1692 
1693 
/*
  Free callback registered for the bulk insert trees.

  key_arg    Key stored in the tree element (only used for free_free)
  mode       free_init: if share->lock_key_trees is set, take the write
                        lock on the index root and bump keyinfo->version
             free_free: write the key to the index B-tree
             free_end:  if share->lock_key_trees is set, release the lock
  param_arg  bulk_insert_param of the index the tree belongs to

  Returns 0 for free_init and free_end. For free_free the result of
  _ma_ck_write_btree() is returned (non-zero on failure) so that a failed
  index write is visible to the tree code; maria_end_bulk_insert() acts on
  what delete_tree() reports.
*/

static int keys_free(void* key_arg, TREE_FREE mode, void *param_arg)
{
  /*
    Probably I can use info->lastkey here, but I'm not sure,
    and to be safe I'd better use local lastkey.
  */
  bulk_insert_param *param= (bulk_insert_param*)param_arg;
  MARIA_SHARE *share= param->info->s;
  uchar lastkey[MARIA_MAX_KEY_BUFF], *key= (uchar*)key_arg;
  uint keylen;
  MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr;
  MARIA_KEY tmp_key;

  switch (mode) {
  case free_init:
    if (share->lock_key_trees)
    {
      mysql_rwlock_wrlock(&keyinfo->root_lock);
      keyinfo->version++;
    }
    return 0;
  case free_free:
    /* Note: keylen doesn't contain transid lengths */
    keylen= _ma_keylength(keyinfo, key);
    tmp_key.data=        lastkey;
    tmp_key.keyinfo=     keyinfo;
    tmp_key.data_length= keylen - share->rec_reflength;
    /* ref_length was saved by _ma_ck_write_tree() when the key was stored */
    tmp_key.ref_length=  param->info->bulk_insert_ref_length;
    tmp_key.flag= (param->info->bulk_insert_ref_length ==
                   share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID);
    /*
      We have to copy key as ma_ck_write_btree may need the buffer for
      copying middle key up if tree is growing
    */
    memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length);
    /* Hand the write result back instead of discarding it */
    return _ma_ck_write_btree(param->info, &tmp_key);
  case free_end:
    if (share->lock_key_trees)
      mysql_rwlock_unlock(&keyinfo->root_lock);
    return 0;
  }
  return 0;
}
1738 
1739 
/**
  @brief Create the in-memory trees used to buffer keys during bulk insert

  @param info        Maria handler
  @param cache_size  Size of the bulk insert cache
  @param rows        Row count hint; 0 is accepted and skips the row based
                     sizing

  @note
    A tree is created only for indexes that are active, lack HA_NOSAME and
    are not the auto increment key. If no index qualifies, or cache_size
    cannot give every such index MARIA_MIN_SIZE_BULK_INSERT_TREE, nothing
    is allocated and info->bulk_insert is left untouched.

  @retval 0                  ok (trees may or may not have been created)
  @retval HA_ERR_OUT_OF_MEM  Allocation of the tree array failed
*/

int maria_init_bulk_insert(MARIA_HA *info, size_t cache_size, ha_rows rows)
{
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *key=share->keyinfo;
  bulk_insert_param *params;
  uint i, num_keys, total_keylength;
  ulonglong key_map;
  DBUG_ENTER("maria_init_bulk_insert");
  DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));

  DBUG_ASSERT(!info->bulk_insert &&
	      (!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT));

  /* Pick the indexes to buffer and add up their worst case element size */
  maria_clear_all_keys_active(key_map);
  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
  {
    if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
        maria_is_key_active(share->state.key_map, i))
    {
      num_keys++;
      maria_set_key_active(key_map, i);
      total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
    }
  }

  if (num_keys==0 ||
      num_keys * (size_t) MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size)
    DBUG_RETURN(0);

  /*
    From here on cache_size is a number of keys, not bytes; it is
    multiplied by key[i].maxlength when each tree is set up below
  */
  if (rows && rows*total_keylength < cache_size)
    cache_size= (size_t)rows;
  else
    cache_size/=total_keylength*16;

  /* One TREE slot per index, followed by one param per buffered index */
  info->bulk_insert=(TREE *)
    my_malloc(PSI_INSTRUMENT_ME, (sizeof(TREE)*share->base.keys+
               sizeof(bulk_insert_param)*num_keys),MYF(0));

  if (!info->bulk_insert)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
  for (i=0 ; i < share->base.keys ; i++)
  {
    if (maria_is_key_active(key_map, i))
    {
      params->info=info;
      params->keynr=i;
      /* Only allocate a 16'th of the buffer at a time */
      init_tree(&info->bulk_insert[i],
                cache_size * key[i].maxlength,
                cache_size * key[i].maxlength, 0,
                (qsort_cmp2) keys_compare, keys_free, (void *)params++, MYF(0));
    }
    else
      info->bulk_insert[i].root=0;      /* Makes is_tree_inited() false */
  }

  DBUG_RETURN(0);
}
1800 
/*
  Reset the bulk insert tree of index 'inx', if bulk insert is in use
  and that index has a tree.
*/

void maria_flush_bulk_insert(MARIA_HA *info, uint inx)
{
  TREE *tree;

  if (!info->bulk_insert)
    return;

  tree= &info->bulk_insert[inx];
  if (is_tree_inited(tree))
    reset_tree(tree);
}
1809 
1810 
/*
  Delete all bulk insert trees and free the tree array.

  abort is passed on to delete_tree(). Once one tree reports an error,
  the remaining trees are deleted with abort set.

  Returns the first non-zero value returned by delete_tree(), or 0.
*/

int maria_end_bulk_insert(MARIA_HA *info, my_bool abort)
{
  int result= 0;
  uint keynr;
  DBUG_ENTER("maria_end_bulk_insert");

  if (!info->bulk_insert)
    DBUG_RETURN(0);

  for (keynr= 0 ; keynr < info->s->base.keys ; keynr++)
  {
    TREE *tree= &info->bulk_insert[keynr];
    int error;

    if (!is_tree_inited(tree))
      continue;

    /* Drop the free callback when the share is marked as deleting */
    if (info->s->deleting)
      reset_free_element(tree);

    error= delete_tree(tree, abort);
    if (error)
    {
      if (!result)
        result= error;
      abort= 1;
    }
  }

  my_free(info->bulk_insert);
  info->bulk_insert= 0;
  DBUG_RETURN(result);
}
1837 
1838 
1839 /****************************************************************************
1840   Dedicated functions that generate log entries
1841 ****************************************************************************/
1842 
1843 
/*
  Write a LOGREC_UNDO_KEY_INSERT record for 'key'.

  The record consists of a header (the transaction's current undo_lsn and
  the key number) followed by the key data plus reference. 'root',
  'new_root' and, for the auto increment index, the auto increment value
  of the key are passed to the record's write hook through 'msg'.

  Returns 0 on success, -1 if translog_write_record() failed.
*/

int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key,
                              my_off_t *root, my_off_t new_root, LSN *res_lsn)
{
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  uint key_length= key->data_length + key->ref_length;
  uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
                 KEY_NR_STORE_SIZE];
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
  LEX_CUSTRING *part= log_array + TRANSLOG_INTERNAL_PARTS;
  struct st_msg_to_write_hook_for_undo_key msg;

  /*
    Header: undo_lsn (needed if a clr record has to be written) and the
    key number. The FILEID_STORE_SIZE bytes in between are not set here.
  */
  lsn_store(log_data, info->trn->undo_lsn);
  key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
               keyinfo->key_nr);

  part[0].str=    log_data;
  part[0].length= sizeof(log_data);
  part[1].str=    key->data;
  part[1].length= key_length;

  msg.root= root;
  msg.value= new_root;
  msg.auto_increment= 0;

  if (info->s->base.auto_key == ((uint) keyinfo->key_nr + 1))
  {
    const HA_KEYSEG *keyseg= keyinfo->seg;
    const uchar *key_value= key->data;
    uchar reversed[MARIA_MAX_KEY_BUFF];

    if (keyseg->flag & HA_SWAP_KEY)
    {
      /* Byte-reverse the first key segment into 'reversed' */
      const uchar *from= key->data;
      const uchar *from_end= from + keyseg->length;
      uchar *to= reversed + keyseg->length;
      do
      {
        *--to= *from++;
      } while (from != from_end);
      key_value= to;
    }
    /* The write hook receives this value through msg */
    msg.auto_increment= ma_retrieve_auto_increment(key_value, keyseg->type);
  }

  if (translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
                            info->trn, info,
                            (translog_size_t) part[0].length + key_length,
                            TRANSLOG_INTERNAL_PARTS + 2, log_array,
                            log_data + LSN_STORE_SIZE, &msg))
    return -1;
  return 0;
}
1899 
1900 
1901 /**
1902   @brief Log creation of new page
1903 
1904   @note
1905     We don't have to store the page_length into the log entry as we can
1906     calculate this from the length of the log entry
1907 
1908   @retval 1   error
1909   @retval 0    ok
1910 */
1911 
_ma_log_new(MARIA_PAGE * ma_page,my_bool root_page)1912 my_bool _ma_log_new(MARIA_PAGE *ma_page, my_bool root_page)
1913 {
1914   LSN lsn;
1915   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
1916                  +1];
1917   uint page_length;
1918   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1919   MARIA_HA *info= ma_page->info;
1920   MARIA_SHARE *share= info->s;
1921   my_off_t page= ma_page->pos / share->block_size;
1922   DBUG_ENTER("_ma_log_new");
1923   DBUG_PRINT("enter", ("page: %lu", (ulong) page));
1924 
1925   DBUG_ASSERT(share->now_transactional);
1926 
1927   /* Store address of new root page */
1928   page_store(log_data + FILEID_STORE_SIZE, page);
1929 
1930   /* Store link to next unused page */
1931   if (info->key_del_used == 2)
1932     page= 0;                                    /* key_del not changed */
1933   else
1934     page= ((share->key_del_current == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1935            share->key_del_current / share->block_size);
1936 
1937   page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
1938   key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2,
1939                ma_page->keyinfo->key_nr);
1940   log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]=
1941     (uchar) root_page;
1942 
1943   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1944   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1945 
1946   page_length= ma_page->size - LSN_STORE_SIZE;
1947   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=   ma_page->buff + LSN_STORE_SIZE;
1948   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
1949 
1950   /* Remember new page length for future log entires for same page */
1951   ma_page->org_size= ma_page->size;
1952 
1953   if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
1954                             info->trn, info,
1955                             (translog_size_t)
1956                             (sizeof(log_data) + page_length),
1957                             TRANSLOG_INTERNAL_PARTS + 2, log_array,
1958                             log_data, NULL))
1959     DBUG_RETURN(1);
1960   DBUG_RETURN(0);
1961 }
1962 
1963 
1964 /**
1965    @brief
1966    Log when some part of the key page changes
1967 */
1968 
_ma_log_change(MARIA_PAGE * ma_page,const uchar * key_pos,uint length,enum en_key_debug debug_marker)1969 my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
1970                        enum en_key_debug debug_marker __attribute__((unused)))
1971 {
1972   LSN lsn;
1973   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos;
1974   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
1975   uint offset= (uint) (key_pos - ma_page->buff), translog_parts;
1976   MARIA_HA *info= ma_page->info;
1977   my_off_t page= ma_page->pos / info->s->block_size;
1978   DBUG_ENTER("_ma_log_change");
1979   DBUG_PRINT("enter", ("page: %lu  length: %u", (ulong) page, length));
1980 
1981   DBUG_ASSERT(info->s->now_transactional);
1982   DBUG_ASSERT(offset + length <= ma_page->size);
1983   DBUG_ASSERT(ma_page->org_size == ma_page->size);
1984 
1985   /* Store address of new root page */
1986   page= ma_page->pos / info->s->block_size;
1987   page_store(log_data + FILEID_STORE_SIZE, page);
1988   log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1989 
1990 #ifdef EXTRA_DEBUG_KEY_CHANGES
1991   (*log_pos++)= KEY_OP_DEBUG;
1992   (*log_pos++)= debug_marker;
1993 #endif
1994 
1995   log_pos[0]= KEY_OP_OFFSET;
1996   int2store(log_pos+1, offset);
1997   log_pos[3]= KEY_OP_CHANGE;
1998   int2store(log_pos+4, length);
1999   log_pos+= 6;
2000 
2001   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
2002   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (log_pos - log_data);
2003   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
2004   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
2005   translog_parts= 2;
2006 
2007   _ma_log_key_changes(ma_page,
2008                       log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2009                       log_pos, &length, &translog_parts);
2010 
2011   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
2012                             info->trn, info,
2013                             (translog_size_t) (log_pos - log_data) + length,
2014                             TRANSLOG_INTERNAL_PARTS + translog_parts,
2015                             log_array, log_data, NULL))
2016     DBUG_RETURN(1);
2017   DBUG_RETURN(0);
2018 }
2019 
2020 
2021 /**
2022    @brief Write log entry for page splitting
2023 
2024    @fn     _ma_log_split()
2025    @param
2026      ma_page		Page that is changed
2027      org_length	        Original length of page. Can be bigger than block_size
2028                         for block that overflowed
2029      new_length		New length of page
2030      key_pos		Where key is inserted on page (may be 0 if no key)
2031      key_length		Number of bytes changed at key_pos
2032      move_length	Number of bytes moved at key_pos to make room for key
2033      prefix_or_suffix   KEY_OP_NONE	    Ignored
2034    			KEY_OP_ADD_PREFIX   Add data to start of page
2035 			KEY_OP_ADD_SUFFIX   Add data to end of page
2036      data		What data was added
2037      data_length	Number of bytes added first or last
2038      changed_length	Number of bytes changed first or last.
2039 
2040    @note
2041      Write log entry for page that has got a key added to the page under
2042      one and only one of the following senarios:
2043      - Page is shortened from end
2044      - Data is added to end of page
2045      - Data added at front of page
2046 */
2047 
_ma_log_split(MARIA_PAGE * ma_page,uint org_length,uint new_length,const uchar * key_pos,uint key_length,int move_length,enum en_key_op prefix_or_suffix,const uchar * data,uint data_length,uint changed_length)2048 static my_bool _ma_log_split(MARIA_PAGE *ma_page,
2049                              uint org_length, uint new_length,
2050                              const uchar *key_pos, uint key_length,
2051                              int move_length, enum en_key_op prefix_or_suffix,
2052                              const uchar *data, uint data_length,
2053                              uint changed_length)
2054 {
2055   LSN lsn;
2056   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7];
2057   uchar *log_pos;
2058   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
2059   uint offset= (uint) (key_pos - ma_page->buff);
2060   uint translog_parts, extra_length;
2061   MARIA_HA *info= ma_page->info;
2062   my_off_t page= ma_page->pos / info->s->block_size;
2063   DBUG_ENTER("_ma_log_split");
2064   DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
2065                        (ulong) page, org_length, new_length));
2066 
2067   DBUG_ASSERT(changed_length >= data_length);
2068   DBUG_ASSERT(org_length <= info->s->max_index_block_size);
2069   DBUG_ASSERT(new_length == ma_page->size);
2070   DBUG_ASSERT(org_length == ma_page->org_size);
2071 
2072   log_pos= log_data + FILEID_STORE_SIZE;
2073   page_store(log_pos, page);
2074   log_pos+= PAGE_STORE_SIZE;
2075 
2076 #ifdef EXTRA_DEBUG_KEY_CHANGES
2077   (*log_pos++)= KEY_OP_DEBUG;
2078   (*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT;
2079 #endif
2080 
2081   /* Store keypage_flag */
2082   *log_pos++= KEY_OP_SET_PAGEFLAG;
2083   *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2084 
2085   if (new_length <= offset || !key_pos)
2086   {
2087     /*
2088       Page was split before inserted key. Write redo entry where
2089       we just cut current page at page_length
2090     */
2091     uint length_offset= org_length - new_length;
2092     log_pos[0]= KEY_OP_DEL_SUFFIX;
2093     int2store(log_pos+1, length_offset);
2094     log_pos+= 3;
2095     translog_parts= 1;
2096     extra_length= 0;
2097     DBUG_ASSERT(data_length == 0);
2098   }
2099   else
2100   {
2101     /* Key was added to page which was split after the inserted key */
2102     uint max_key_length;
2103 
2104     /*
2105       Handle case when split happened directly after the newly inserted key.
2106     */
2107     max_key_length= new_length - offset;
2108     extra_length= MY_MIN(key_length, max_key_length);
2109     if (offset + move_length > new_length)
2110     {
2111       /* This is true when move_length includes changes for next packed key */
2112       move_length= new_length - offset;
2113     }
2114 
2115     if ((int) new_length < (int) (org_length + move_length + data_length))
2116     {
2117       /* Shorten page */
2118       uint diff= org_length + move_length + data_length - new_length;
2119       log_pos[0]= KEY_OP_DEL_SUFFIX;
2120       int2store(log_pos + 1, diff);
2121       log_pos+= 3;
2122       DBUG_ASSERT(data_length == 0);            /* Page is shortened */
2123       DBUG_ASSERT(offset <= org_length - diff);
2124     }
2125     else
2126     {
2127       DBUG_ASSERT(new_length == org_length + move_length + data_length);
2128       DBUG_ASSERT(offset <= org_length);
2129     }
2130 
2131     log_pos[0]= KEY_OP_OFFSET;
2132     int2store(log_pos+1, offset);
2133     log_pos+= 3;
2134 
2135     if (move_length)
2136     {
2137       log_pos[0]= KEY_OP_SHIFT;
2138       int2store(log_pos+1, move_length);
2139       log_pos+= 3;
2140     }
2141 
2142     log_pos[0]= KEY_OP_CHANGE;
2143     int2store(log_pos+1, extra_length);
2144     log_pos+= 3;
2145 
2146     /* Point to original inserted key data */
2147     if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2148       key_pos+= data_length;
2149 
2150     translog_parts= 2;
2151     log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
2152     log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length;
2153   }
2154 
2155   if (data_length)
2156   {
2157     /* Add prefix or suffix */
2158     log_pos[0]= prefix_or_suffix;
2159     int2store(log_pos+1, data_length);
2160     log_pos+= 3;
2161     if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2162     {
2163       int2store(log_pos+1, changed_length);
2164       log_pos+= 2;
2165       data_length= changed_length;
2166     }
2167     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str=    data;
2168     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length;
2169     translog_parts++;
2170     extra_length+= data_length;
2171   }
2172 
2173   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
2174   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2175                                                          log_data);
2176 
2177   _ma_log_key_changes(ma_page,
2178                       log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2179                       log_pos, &extra_length, &translog_parts);
2180   /* Remember new page length for future log entires for same page */
2181   ma_page->org_size= ma_page->size;
2182 
2183   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2184                                     info->trn, info,
2185                                     (translog_size_t)
2186                                     log_array[TRANSLOG_INTERNAL_PARTS +
2187                                               0].length + extra_length,
2188                                     TRANSLOG_INTERNAL_PARTS + translog_parts,
2189                                     log_array, log_data, NULL));
2190 }
2191 
2192 
/**
   @brief
   Write log entry for page that has got a key added to the page
   and page is shortened from start of page

   @fn _ma_log_del_prefix()
   @param ma_page	Page that is changed
   @param org_length	Length of buffer when read
   @param new_length	Final length
   @param key_pos	Where on page buffer key was added. This is position
			before prefix was removed
   @param key_length    How many bytes was changed at 'key_pos'
   @param move_length   How many bytes was moved up when key was added

   @note
     The logged operations are KEY_OP_SET_PAGEFLAG and KEY_OP_DEL_PREFIX,
     followed, if the key is still on the page, by KEY_OP_OFFSET, an
     optional KEY_OP_SHIFT and KEY_OP_CHANGE with the key bytes.

   @return
   @retval  0  ok
   @retval  1  error
*/

static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
                                  uint org_length, uint new_length,
                                  const uchar *key_pos, uint key_length,
                                  int move_length)
{
  LSN lsn;
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7];
  uchar *log_pos;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
  uint offset= (uint) (key_pos - ma_page->buff);
  /* Bytes removed from the page, counting the growth caused by the key */
  uint diff_length= org_length + move_length - new_length;
  uint translog_parts, extra_length;
  MARIA_HA *info= ma_page->info;
  my_off_t page= ma_page->pos / info->s->block_size;
  DBUG_ENTER("_ma_log_del_prefix");
  DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
                       (ulong) page, org_length, new_length));

  DBUG_ASSERT((int) diff_length > 0);
  DBUG_ASSERT(ma_page->org_size == org_length);
  DBUG_ASSERT(ma_page->size == new_length);

  log_pos= log_data + FILEID_STORE_SIZE;
  page_store(log_pos, page);
  log_pos+= PAGE_STORE_SIZE;

  /* Defaults for the case where only the header part is logged */
  translog_parts= 1;
  extra_length= 0;

#ifdef EXTRA_DEBUG_KEY_CHANGES
  *log_pos++= KEY_OP_DEBUG;
  *log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX;
#endif

  /* Store keypage_flag */
  *log_pos++= KEY_OP_SET_PAGEFLAG;
  *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);

  if (offset < diff_length + info->s->keypage_header)
  {
    /*
      Key is not anymore on page. Move data down, but take into account that
      the original page had grown with 'move_length bytes'
    */
    DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header);

    log_pos[0]= KEY_OP_DEL_PREFIX;
    int2store(log_pos+1, diff_length - move_length);
    log_pos+= 3;
  }
  else
  {
    /*
      Correct position to key, as data before key has been deleted and key
      has thus been moved down
    */
    offset-= diff_length;
    key_pos-= diff_length;

    /* Move data down */
    log_pos[0]= KEY_OP_DEL_PREFIX;
    int2store(log_pos+1, diff_length);
    log_pos+= 3;

    /* Position of the key after the prefix was removed */
    log_pos[0]= KEY_OP_OFFSET;
    int2store(log_pos+1, offset);
    log_pos+= 3;

    if (move_length)
    {
      log_pos[0]= KEY_OP_SHIFT;
      int2store(log_pos+1, move_length);
      log_pos+= 3;
    }
    /* The changed key bytes themselves go into the second log part */
    log_pos[0]= KEY_OP_CHANGE;
    int2store(log_pos+1, key_length);
    log_pos+= 3;
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
    translog_parts= 2;
    extra_length= key_length;
  }
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                         log_data);
  _ma_log_key_changes(ma_page,
                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
                      log_pos, &extra_length, &translog_parts);
  /* Remember new page length for future log entries for same page */
  ma_page->org_size= ma_page->size;

  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                    info->trn, info,
                                    (translog_size_t)
                                    log_array[TRANSLOG_INTERNAL_PARTS +
                                              0].length + extra_length,
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
                                    log_array, log_data, NULL));
}
2313 
2314 
/**
   @brief
   Write log entry for page that has got data added first and
   data deleted last. Old changed key may be part of page

   @param ma_page             Page that is changed
   @param new_length          Final length of page (asserted to be
                              ma_page->size)
   @param data_added_first    Number of bytes added at start of page
   @param data_changed_first  Number of bytes logged from the start of the
                              page data (after the keypage header)
   @param data_deleted_last   Number of bytes removed from end of page
   @param key_pos             Position of changed key before data was added
                              first
   @param key_length          Number of bytes changed at key_pos
   @param move_length         Number of bytes moved at key_pos to make room
                              for the key

   @note
     The logged operations are KEY_OP_SET_PAGEFLAG, KEY_OP_DEL_SUFFIX and
     KEY_OP_ADD_PREFIX, followed, if the key is still on the page, by
     KEY_OP_OFFSET, an optional KEY_OP_SHIFT and KEY_OP_CHANGE.

   @retval 0  ok
   @retval 1  error (result of translog_write_record())
*/

static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
                                  uint new_length,
                                  uint data_added_first,
                                  uint data_changed_first,
                                  uint data_deleted_last,
                                  const uchar *key_pos,
                                  uint key_length, int move_length)
{
  LSN lsn;
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7];
  uchar *log_pos;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
  uint key_offset;
  uint translog_parts, extra_length;
  MARIA_HA *info= ma_page->info;
  my_off_t page= ma_page->pos / info->s->block_size;
  DBUG_ENTER("_ma_log_key_middle");
  DBUG_PRINT("enter", ("page: %lu", (ulong) page));

  DBUG_ASSERT(ma_page->size == new_length);

  /* new place of key after changes */
  key_pos+= data_added_first;
  key_offset= (uint) (key_pos - ma_page->buff);
  if (key_offset < new_length)
  {
    /* key is on page; Calculate how much of the key is there */
    uint max_key_length= new_length - key_offset;
    if (max_key_length < key_length)
    {
      /* Key is last on page */
      key_length= max_key_length;
      move_length= 0;
    }
    /*
      Take into account that new data was added as part of original key
      that also needs to be removed from page
    */
    data_deleted_last+= move_length;
  }

  /* First log changes to page */
  log_pos= log_data + FILEID_STORE_SIZE;
  page_store(log_pos, page);
  log_pos+= PAGE_STORE_SIZE;

#ifdef EXTRA_DEBUG_KEY_CHANGES
  *log_pos++= KEY_OP_DEBUG;
  *log_pos++= KEY_OP_DEBUG_LOG_MIDDLE;
#endif

  /* Store keypage_flag */
  *log_pos++= KEY_OP_SET_PAGEFLAG;
  *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);

  log_pos[0]= KEY_OP_DEL_SUFFIX;
  int2store(log_pos+1, data_deleted_last);
  log_pos+= 3;

  /* Op byte, 2 bytes added length, 2 bytes changed length */
  log_pos[0]= KEY_OP_ADD_PREFIX;
  int2store(log_pos+1, data_added_first);
  int2store(log_pos+3, data_changed_first);
  log_pos+= 5;

  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                         log_data);
  /* The prefix data is taken from the page, right after the page header */
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    (ma_page->buff +
                                                  info->s->keypage_header);
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
  translog_parts= 2;
  extra_length= data_changed_first;

  /* If changed key is on page, log those changes too */

  if (key_offset < new_length)
  {
    /*
      These operations are built in the same log_data buffer but are sent
      as a separate log part (part 2), placed after the prefix data
    */
    uchar *start_log_pos= log_pos;

    log_pos[0]= KEY_OP_OFFSET;
    int2store(log_pos+1, key_offset);
    log_pos+= 3;
    if (move_length)
    {
      log_pos[0]= KEY_OP_SHIFT;
      int2store(log_pos+1, move_length);
      log_pos+= 3;
    }
    log_pos[0]= KEY_OP_CHANGE;
    int2store(log_pos+1, key_length);
    log_pos+= 3;

    log_array[TRANSLOG_INTERNAL_PARTS + 2].str=    start_log_pos;
    log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos -
                                                           start_log_pos);

    log_array[TRANSLOG_INTERNAL_PARTS + 3].str=    key_pos;
    log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length;
    translog_parts+=2;
    extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length +
                           key_length);
  }

  _ma_log_key_changes(ma_page,
                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
                      log_pos, &extra_length, &translog_parts);
  /* Remember new page length for future log entries for same page */
  ma_page->org_size= ma_page->size;

  DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                    info->trn, info,
                                    (translog_size_t)
                                    (log_array[TRANSLOG_INTERNAL_PARTS +
                                               0].length + extra_length),
                                    TRANSLOG_INTERNAL_PARTS + translog_parts,
                                    log_array, log_data, NULL));
}
2438 
2439 
2440 #ifdef NOT_NEEDED
2441 
2442 /**
2443    @brief
2444    Write log entry for page that has got data added first and
2445    data deleted last
2446 */
2447 
_ma_log_middle(MARIA_PAGE * ma_page,uint data_added_first,uint data_changed_first,uint data_deleted_last)2448 static my_bool _ma_log_middle(MARIA_PAGE *ma_page,
2449                               uint data_added_first, uint data_changed_first,
2450                               uint data_deleted_last)
2451 {
2452   LSN lsn;
2453   LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
2454   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos;
2455   MARIA_HA *info= ma_page->info;
2456   my_off_t page= ma_page->page / info->s->block_size;
2457   uint translog_parts, extra_length;
2458   DBUG_ENTER("_ma_log_middle");
2459   DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2460 
2461   DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last ==
2462               ma_page->size);
2463 
2464   log_pos= log_data + FILEID_STORE_SIZE;
2465   page_store(log_pos, page);
2466   log_pos+= PAGE_STORE_SIZE;
2467 
2468   log_pos[0]= KEY_OP_DEL_PREFIX;
2469   int2store(log_pos+1, data_deleted_last);
2470   log_pos+= 3;
2471 
2472   log_pos[0]= KEY_OP_ADD_PREFIX;
2473   int2store(log_pos+1, data_added_first);
2474   int2store(log_pos+3, data_changed_first);
2475   log_pos+= 5;
2476 
2477   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
2478   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2479                                                          log_data);
2480 
2481   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    ((char*) buff +
2482                                                   info->s->keypage_header);
2483   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2484   translog_parts= 2;
2485   extra_length= data_changed_first;
2486 
2487   _ma_log_key_changes(ma_page,
2488                       log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2489                       log_pos, &extra_length, &translog_parts);
2490   /* Remember new page length for future log entires for same page */
2491   ma_page->org_size= ma_page->size;
2492 
2493   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2494                                     info->trn, info,
2495                                     (translog_size_t)
2496                                     log_array[TRANSLOG_INTERNAL_PARTS +
2497                                               0].length + extra_length,
2498                                     TRANSLOG_INTERNAL_PARTS + translog_parts,
2499                                     log_array, log_data, NULL));
2500 }
2501 #endif
2502