1 /*
2    Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
16 
17 /* Write a row to a MyISAM table */
18 
19 #include "fulltext.h"
20 #include "rt_index.h"
21 
22 #define MAX_POINTER_LENGTH 8
23 
24 	/* Functions declared in this file */
25 
26 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
27 		    uint comp_flag, uchar *key,
28 		    uint key_length, my_off_t pos, uchar *father_buff,
29 		    uchar *father_keypos, my_off_t father_page,
30 		    my_bool insert_last);
31 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
32 			    uchar *curr_buff,uchar *father_buff,
33 			    uchar *father_keypos,my_off_t father_page);
34 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
35 				uchar *key, uint *return_key_length,
36 				uchar **after_key);
37 int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
38 		      uint key_length);
39 int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
40 		       uint key_length);
41 
42 	/* Write new record to database */
43 
mi_write(MI_INFO * info,uchar * record)44 int mi_write(MI_INFO *info, uchar *record)
45 {
46   MYISAM_SHARE *share=info->s;
47   uint i;
48   int save_errno;
49   my_off_t filepos;
50   uchar *buff;
51   my_bool lock_tree= share->concurrent_insert;
52   DBUG_ENTER("mi_write");
53   DBUG_PRINT("enter",("isam: %d  data: %d",info->s->kfile,info->dfile));
54 
55   DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
56                   mi_print_error(info->s, HA_ERR_CRASHED);
57                   DBUG_RETURN(my_errno= HA_ERR_CRASHED););
58   if (share->options & HA_OPTION_READ_ONLY_DATA)
59   {
60     DBUG_RETURN(my_errno=EACCES);
61   }
62   if (_mi_readinfo(info,F_WRLCK,1))
63     DBUG_RETURN(my_errno);
64 
65   filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
66              !info->append_insert_at_end) ?
67 	    share->state.dellink :
68 	    info->state->data_file_length);
69 
70   if (share->base.reloc == (ha_rows) 1 &&
71       share->base.records == (ha_rows) 1 &&
72       info->state->records == (ha_rows) 1)
73   {						/* System file */
74     my_errno=HA_ERR_RECORD_FILE_FULL;
75     goto err2;
76   }
77   if (info->state->key_file_length >= share->base.margin_key_file_length)
78   {
79     my_errno=HA_ERR_INDEX_FILE_FULL;
80     goto err2;
81   }
82   if (_mi_mark_file_changed(info))
83     goto err2;
84 
85   /* Calculate and check all unique constraints */
86   if (mi_is_any_key_active(share->state.key_map))
87   {
88     for (i= 0 ; i < share->state.header.uniques ; i++)
89     {
90       if (mi_check_unique(info, share->uniqueinfo + i, record,
91                           mi_unique_hash(share->uniqueinfo + i, record),
92                           HA_OFFSET_ERROR))
93         goto err2;
94     }
95   }
96 
97 	/* Write all keys to indextree */
98 
99   buff=info->lastkey2;
100   for (i=0 ; i < share->base.keys ; i++)
101   {
102     if (mi_is_key_active(share->state.key_map, i))
103     {
104       my_bool local_lock_tree= (lock_tree &&
105                                 !(info->bulk_insert &&
106                                   is_tree_inited(&info->bulk_insert[i])));
107       if (local_lock_tree)
108       {
109         mysql_rwlock_wrlock(&share->key_root_lock[i]);
110 	share->keyinfo[i].version++;
111       }
112       if (share->keyinfo[i].flag & HA_FULLTEXT )
113       {
114         if (_mi_ft_add(info,i, buff, record, filepos))
115         {
116 	  if (local_lock_tree)
117             mysql_rwlock_unlock(&share->key_root_lock[i]);
118           DBUG_PRINT("error",("Got error: %d on write",my_errno));
119           goto err;
120         }
121       }
122       else
123       {
124         if (share->keyinfo[i].ck_insert(info,i,buff,
125 			_mi_make_key(info,i,buff,record,filepos)))
126         {
127           if (local_lock_tree)
128             mysql_rwlock_unlock(&share->key_root_lock[i]);
129           DBUG_PRINT("error",("Got error: %d on write",my_errno));
130           goto err;
131         }
132       }
133 
134       /* The above changed info->lastkey2. Inform mi_rnext_same(). */
135       info->update&= ~HA_STATE_RNEXT_SAME;
136 
137       if (local_lock_tree)
138         mysql_rwlock_unlock(&share->key_root_lock[i]);
139     }
140   }
141   if (share->calc_checksum)
142     info->checksum=(*share->calc_checksum)(info,record);
143   if (!(info->opt_flag & OPT_NO_ROWS))
144   {
145     if ((*share->write_record)(info,record))
146       goto err;
147     info->state->checksum+=info->checksum;
148   }
149   if (share->base.auto_key)
150     set_if_bigger(info->s->state.auto_increment,
151                   retrieve_auto_increment(info, record));
152   info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
153 		 HA_STATE_ROW_CHANGED);
154   info->state->records++;
155   info->lastpos=filepos;
156   myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
157   (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
158   if (info->invalidator != 0)
159   {
160     DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
161     (*info->invalidator)(info->filename);
162     info->invalidator=0;
163   }
164 
165   /*
166     Update status of the table. We need to do so after each row write
167     for the log tables, as we want the new row to become visible to
168     other threads as soon as possible. We don't lock mutex here
169     (as it is required by pthread memory visibility rules) as (1) it's
170     not critical to use outdated share->is_log_table value (2) locking
171     mutex here for every write is too expensive.
172   */
173   if (share->is_log_table)
174     mi_update_status((void*) info);
175 
176   DBUG_RETURN(0);
177 
178 err:
179   save_errno=my_errno;
180   if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
181       my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
182   {
183     if (info->bulk_insert)
184     {
185       uint j;
186       for (j=0 ; j < share->base.keys ; j++)
187         mi_flush_bulk_insert(info, j);
188     }
189     info->errkey= (int) i;
190     while ( i-- > 0)
191     {
192       if (mi_is_key_active(share->state.key_map, i))
193       {
194 	my_bool local_lock_tree= (lock_tree &&
195                                   !(info->bulk_insert &&
196                                     is_tree_inited(&info->bulk_insert[i])));
197 	if (local_lock_tree)
198           mysql_rwlock_wrlock(&share->key_root_lock[i]);
199 	if (share->keyinfo[i].flag & HA_FULLTEXT)
200         {
201           if (_mi_ft_del(info,i, buff,record,filepos))
202 	  {
203 	    if (local_lock_tree)
204               mysql_rwlock_unlock(&share->key_root_lock[i]);
205             break;
206 	  }
207         }
208         else
209 	{
210 	  uint key_length=_mi_make_key(info,i,buff,record,filepos);
211 	  if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
212 	  {
213 	    if (local_lock_tree)
214               mysql_rwlock_unlock(&share->key_root_lock[i]);
215 	    break;
216 	  }
217 	}
218 	if (local_lock_tree)
219           mysql_rwlock_unlock(&share->key_root_lock[i]);
220       }
221     }
222   }
223   else
224   {
225     mi_print_error(info->s, HA_ERR_CRASHED);
226     mi_mark_crashed(info);
227   }
228   info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
229   my_errno=save_errno;
230 err2:
231   save_errno=my_errno;
232   myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
233   (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
234   DBUG_RETURN(my_errno=save_errno);
235 } /* mi_write */
236 
237 
238 	/* Write one key to btree */
239 
_mi_ck_write(MI_INFO * info,uint keynr,uchar * key,uint key_length)240 int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
241 {
242   DBUG_ENTER("_mi_ck_write");
243 
244   if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
245   {
246     DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
247   }
248   else
249   {
250     DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
251   }
252 } /* _mi_ck_write */
253 
254 
255 /**********************************************************************
256  *                Normal insert code                                  *
257  **********************************************************************/
258 
_mi_ck_write_btree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)259 int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
260 		       uint key_length)
261 {
262   int error;
263   uint comp_flag;
264   MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
265   my_off_t  *root=&info->s->state.key_root[keynr];
266   DBUG_ENTER("_mi_ck_write_btree");
267 
268   if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
269     comp_flag=SEARCH_BIGGER;			/* Put after same key */
270   else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
271   {
272     comp_flag=SEARCH_FIND | SEARCH_UPDATE;	/* No duplicates */
273     if (keyinfo->flag & HA_NULL_ARE_EQUAL)
274       comp_flag|= SEARCH_NULL_ARE_EQUAL;
275   }
276   else
277     comp_flag=SEARCH_SAME;			/* Keys in rec-pos order */
278 
279   error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
280                                 root, comp_flag);
281   if (info->ft1_to_ft2)
282   {
283     if (!error)
284       error= _mi_ft_convert_to_ft2(info, keynr, key);
285     delete_dynamic(info->ft1_to_ft2);
286     my_free(info->ft1_to_ft2);
287     info->ft1_to_ft2=0;
288   }
289   DBUG_RETURN(error);
290 } /* _mi_ck_write_btree */
291 
_mi_ck_real_write_btree(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uint key_length,my_off_t * root,uint comp_flag)292 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
293     uchar *key, uint key_length, my_off_t *root, uint comp_flag)
294 {
295   int error;
296   DBUG_ENTER("_mi_ck_real_write_btree");
297   /* key_length parameter is used only if comp_flag is SEARCH_FIND */
298   if (*root == HA_OFFSET_ERROR ||
299       (error=w_search(info, keyinfo, comp_flag, key, key_length,
300 		      *root, (uchar *) 0, (uchar*) 0,
301 		      (my_off_t) 0, 1)) > 0)
302     error=_mi_enlarge_root(info,keyinfo,key,root);
303   DBUG_RETURN(error);
304 } /* _mi_ck_real_write_btree */
305 
306 
307 	/* Make a new root with key as only pointer */
308 
_mi_enlarge_root(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,my_off_t * root)309 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
310                      my_off_t *root)
311 {
312   uint t_length,nod_flag;
313   MI_KEY_PARAM s_temp;
314   MYISAM_SHARE *share=info->s;
315   DBUG_ENTER("_mi_enlarge_root");
316 
317   nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
318   _mi_kpointer(info,info->buff+2,*root); /* if nod */
319   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
320 				(uchar*) 0, (uchar*) 0, key,&s_temp);
321   mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
322   (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
323   info->buff_used=info->page_changed=1;		/* info->buff is used */
324   if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
325       _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
326     DBUG_RETURN(-1);
327   DBUG_RETURN(0);
328 } /* _mi_enlarge_root */
329 
330 
331 	/*
332 	  Search after a position for a key and store it there
333 	  Returns -1 = error
334 		   0  = ok
335 		   1  = key should be stored in higher tree
336 	*/
337 
w_search(register MI_INFO * info,register MI_KEYDEF * keyinfo,uint comp_flag,uchar * key,uint key_length,my_off_t page,uchar * father_buff,uchar * father_keypos,my_off_t father_page,my_bool insert_last)338 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
339 		    uint comp_flag, uchar *key, uint key_length, my_off_t page,
340 		    uchar *father_buff, uchar *father_keypos,
341 		    my_off_t father_page, my_bool insert_last)
342 {
343   int error,flag;
344   uint nod_flag, search_key_length;
345   uchar *temp_buff,*keypos;
346   uchar keybuff[MI_MAX_KEY_BUFF];
347   my_bool was_last_key;
348   my_off_t next_page, dupp_key_pos;
349   DBUG_ENTER("w_search");
350   DBUG_PRINT("enter",("page: %ld", (long) page));
351 
352   search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
353   if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
354 				      MI_MAX_KEY_BUFF*2)))
355     DBUG_RETURN(-1);
356   if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
357     goto err;
358 
359   flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
360 			      comp_flag, &keypos, keybuff, &was_last_key);
361   nod_flag=mi_test_if_nod(temp_buff);
362   if (flag == 0)
363   {
364     uint tmp_key_length;
365 	/* get position to record with duplicated key */
366     tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
367     if (tmp_key_length)
368       dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
369     else
370       dupp_key_pos= HA_OFFSET_ERROR;
371 
372     if (keyinfo->flag & HA_FULLTEXT)
373     {
374       uint off;
375       int  subkeys;
376 
377       get_key_full_length_rdonly(off, keybuff);
378       subkeys=ft_sintXkorr(keybuff+off);
379       comp_flag=SEARCH_SAME;
380       if (subkeys >= 0)
381       {
382         /* normal word, one-level tree structure */
383         flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
384                                     USE_WHOLE_KEY, comp_flag,
385                                     &keypos, keybuff, &was_last_key);
386       }
387       else
388       {
389         /* popular word. two-level tree. going down */
390         my_off_t root=dupp_key_pos;
391         keyinfo=&info->s->ft2_keyinfo;
392         get_key_full_length_rdonly(off, key);
393         key+=off;
394         keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
395         error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
396                                       &root, comp_flag);
397         _mi_dpointer(info, keypos+HA_FT_WLEN, root);
398         subkeys--; /* should there be underflow protection ? */
399         DBUG_ASSERT(subkeys < 0);
400         ft_intXstore(keypos, subkeys);
401         if (!error)
402           error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
403         my_afree((uchar*) temp_buff);
404         DBUG_RETURN(error);
405       }
406     }
407     else /* not HA_FULLTEXT, normal HA_NOSAME key */
408     {
409       info->dupp_key_pos= dupp_key_pos;
410       my_afree((uchar*) temp_buff);
411       my_errno=HA_ERR_FOUND_DUPP_KEY;
412       DBUG_RETURN(-1);
413     }
414   }
415   if (flag == MI_FOUND_WRONG_KEY)
416     DBUG_RETURN(-1);
417   if (!was_last_key)
418     insert_last=0;
419   next_page=_mi_kpos(nod_flag,keypos);
420   if (next_page == HA_OFFSET_ERROR ||
421       (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
422 		      temp_buff, keypos, page, insert_last)) >0)
423   {
424     error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
425 		     father_keypos,father_page, insert_last);
426     if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
427       goto err;
428   }
429   my_afree((uchar*) temp_buff);
430   DBUG_RETURN(error);
431 err:
432   my_afree((uchar*) temp_buff);
433   DBUG_PRINT("exit",("Error: %d",my_errno));
434   DBUG_RETURN (-1);
435 } /* w_search */
436 
437 
438 /*
439   Insert new key.
440 
441   SYNOPSIS
442     _mi_insert()
443     info                        Open table information.
444     keyinfo                     Key definition information.
445     key                         New key.
446     anc_buff                    Key page (beginning).
447     key_pos                     Position in key page where to insert.
448     key_buff                    Copy of previous key.
449     father_buff                 parent key page for balancing.
450     father_key_pos              position in parent key page for balancing.
451     father_page                 position of parent key page in file.
452     insert_last                 If to append at end of page.
453 
454   DESCRIPTION
455     Insert new key at right of key_pos.
456 
457   RETURN
458     2           if key contains key to upper level.
459     0           OK.
460     < 0         Error.
461 */
462 
_mi_insert(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * anc_buff,uchar * key_pos,uchar * key_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page,my_bool insert_last)463 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
464 	       uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
465                uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
466 	       my_bool insert_last)
467 {
468   uint a_length,nod_flag;
469   int t_length;
470   uchar *endpos, *prev_key;
471   MI_KEY_PARAM s_temp;
472   DBUG_ENTER("_mi_insert");
473   DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
474   DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
475 
476   nod_flag=mi_test_if_nod(anc_buff);
477   a_length=mi_getint(anc_buff);
478   endpos= anc_buff+ a_length;
479   prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
480   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
481 				(key_pos == endpos ? (uchar*) 0 : key_pos),
482 				prev_key, prev_key,
483 				key,&s_temp);
484 #ifndef DBUG_OFF
485   if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
486 					 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
487   {
488     DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
489   }
490   if (keyinfo->flag & HA_PACK_KEY)
491   {
492     DBUG_PRINT("test",("t_length: %d  ref_len: %d",
493 		       t_length,s_temp.ref_length));
494     DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
495 		       s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
496   }
497 #endif
498   if (t_length > 0)
499   {
500     if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
501     {
502       mi_print_error(info->s, HA_ERR_CRASHED);
503       my_errno=HA_ERR_CRASHED;
504       DBUG_RETURN(-1);
505     }
506     bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
507   }
508   else
509   {
510     if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
511     {
512       mi_print_error(info->s, HA_ERR_CRASHED);
513       my_errno=HA_ERR_CRASHED;
514       DBUG_RETURN(-1);
515     }
516     bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
517   }
518   (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
519   a_length+=t_length;
520   mi_putint(anc_buff,a_length,nod_flag);
521   if (a_length <= keyinfo->block_length)
522   {
523     if (keyinfo->block_length - a_length < 32 &&
524         keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
525         info->s->base.key_reflength <= info->s->rec_reflength &&
526         info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
527     {
528       /*
529         Normal word. One-level tree. Page is almost full.
530         Let's consider converting.
531         We'll compare 'key' and the first key at anc_buff
532        */
533       uchar *a=key, *b=anc_buff+2+nod_flag;
534       uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
535       /* the very first key on the page is always unpacked */
536       DBUG_ASSERT((*b & 128) == 0);
537 #if HA_FT_MAXLEN >= 127
538       blen= mi_uint2korr(b); b+=2;
539 #else
540       blen= *b++;
541 #endif
542       get_key_length(alen,a);
543       DBUG_ASSERT(info->ft1_to_ft2==0);
544       if (alen == blen &&
545           ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
546       {
547         /* yup. converting */
548         info->ft1_to_ft2=(DYNAMIC_ARRAY *)
549           my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
550         my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
551 
552         /*
553           now, adding all keys from the page to dynarray
554           if the page is a leaf (if not keys will be deleted later)
555         */
556         if (!nod_flag)
557         {
558           /* let's leave the first key on the page, though, because
559              we cannot easily dispatch an empty page here */
560           b+=blen+ft2len+2;
561           for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
562           {
563             if (insert_dynamic(info->ft1_to_ft2, b))
564             {
565               mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
566               my_errno= HA_ERR_OUT_OF_MEM;
567               DBUG_RETURN(-1);
568             }
569           }
570 
571           /* fixing the page's length - it contains only one key now */
572           mi_putint(anc_buff,2+blen+ft2len+2,0);
573         }
574         /* the rest will be done when we're back from recursion */
575       }
576     }
577     DBUG_RETURN(0);				/* There is room on page */
578   }
579   /* Page is full */
580   if (nod_flag)
581     insert_last=0;
582   if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
583       father_buff && !insert_last)
584     DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
585 				 father_key_pos,father_page));
586   DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
587 } /* _mi_insert */
588 
589 
590 	/* split a full page in two and assign emerging item to key */
591 
_mi_split_page(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * buff,uchar * key_buff,my_bool insert_last_key)592 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
593 		   uchar *key, uchar *buff, uchar *key_buff,
594 		   my_bool insert_last_key)
595 {
596   uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
597   uchar *key_pos,*pos, *after_key;
598   my_off_t new_pos;
599   MI_KEY_PARAM s_temp;
600   DBUG_ENTER("mi_split_page");
601   LINT_INIT(after_key);
602   DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
603 
604   if (info->s->keyinfo+info->lastinx == keyinfo)
605     info->page_changed=1;			/* Info->buff is used */
606   info->buff_used=1;
607   nod_flag=mi_test_if_nod(buff);
608   key_ref_length=2+nod_flag;
609   if (insert_last_key)
610     key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
611   else
612     key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
613 			      &after_key);
614   if (!key_pos)
615     DBUG_RETURN(-1);
616 
617   length=(uint) (key_pos-buff);
618   a_length=mi_getint(buff);
619   mi_putint(buff,length,nod_flag);
620 
621   key_pos=after_key;
622   if (nod_flag)
623   {
624     DBUG_PRINT("test",("Splitting nod"));
625     pos=key_pos-nod_flag;
626     memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
627   }
628 
629 	/* Move middle item to key and pointer to new page */
630   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
631     DBUG_RETURN(-1);
632   _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
633 
634 	/* Store new page */
635   if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
636     DBUG_RETURN(-1);
637 
638   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
639 				(uchar*) 0, (uchar*) 0,
640 				key_buff, &s_temp);
641   length=(uint) ((buff+a_length)-key_pos);
642   memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
643 	 (size_t) length);
644   (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
645   mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
646 
647   if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
648     DBUG_RETURN(-1);
649   DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
650   DBUG_RETURN(2);				/* Middle key up */
651 } /* _mi_split_page */
652 
653 
654 	/*
655 	  Calculate how to much to move to split a page in two
656 	  Returns pointer to start of key.
657 	  key will contain the key.
658 	  return_key_length will contain the length of key
659 	  after_key will contain the position to where the next key starts
660 	*/
661 
_mi_find_half_pos(uint nod_flag,MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)662 uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
663 			 uchar *key, uint *return_key_length,
664 			 uchar **after_key)
665 {
666   uint keys,length,key_ref_length;
667   uchar *end,*lastpos;
668   DBUG_ENTER("_mi_find_half_pos");
669 
670   key_ref_length=2+nod_flag;
671   length=mi_getint(page)-key_ref_length;
672   page+=key_ref_length;
673   if (!(keyinfo->flag &
674 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
675 	 HA_BINARY_PACK_KEY)))
676   {
677     key_ref_length=keyinfo->keylength+nod_flag;
678     keys=length/(key_ref_length*2);
679     *return_key_length=keyinfo->keylength;
680     end=page+keys*key_ref_length;
681     *after_key=end+key_ref_length;
682     memcpy(key,end,key_ref_length);
683     DBUG_RETURN(end);
684   }
685 
686   end=page+length/2-key_ref_length;		/* This is aprox. half */
687   *key='\0';
688   do
689   {
690     lastpos=page;
691     if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
692       DBUG_RETURN(0);
693   } while (page < end);
694   *return_key_length=length;
695   *after_key=page;
696   DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
697                      (long) lastpos, (long) page, (long) end));
698   DBUG_RETURN(lastpos);
699 } /* _mi_find_half_pos */
700 
701 
702 	/*
703 	  Split buffer at last key
704 	  Returns pointer to the start of the key before the last key
705 	  key will contain the last key
706 	*/
707 
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)708 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
709 				uchar *key, uint *return_key_length,
710 				uchar **after_key)
711 {
712   uint keys,length,UNINIT_VAR(last_length),key_ref_length;
713   uchar *end,*lastpos,*UNINIT_VAR(prevpos);
714   uchar key_buff[MI_MAX_KEY_BUFF];
715   DBUG_ENTER("_mi_find_last_pos");
716 
717   key_ref_length=2;
718   length=mi_getint(page)-key_ref_length;
719   page+=key_ref_length;
720   if (!(keyinfo->flag &
721 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
722 	 HA_BINARY_PACK_KEY)))
723   {
724     keys=length/keyinfo->keylength-2;
725     *return_key_length=length=keyinfo->keylength;
726     end=page+keys*length;
727     *after_key=end+length;
728     memcpy(key,end,length);
729     DBUG_RETURN(end);
730   }
731 
732   end=page+length-key_ref_length;
733   *key='\0';
734   length=0;
735   lastpos=page;
736   while (page < end)
737   {
738     prevpos=lastpos; lastpos=page;
739     last_length=length;
740     memcpy(key, key_buff, length);		/* previous key */
741     if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
742     {
743       mi_print_error(keyinfo->share, HA_ERR_CRASHED);
744       my_errno=HA_ERR_CRASHED;
745       DBUG_RETURN(0);
746     }
747   }
748   *return_key_length=last_length;
749   *after_key=lastpos;
750   DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  end: 0x%lx",
751                      (long) prevpos,(long) page,(long) end));
752   DBUG_RETURN(prevpos);
753 } /* _mi_find_last_pos */
754 
755 
756 	/* Balance page with not packed keys with page on right/left */
757 	/* returns 0 if balance was done */
758 
_mi_balance_page(register MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * curr_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page)759 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
760 			    uchar *key, uchar *curr_buff, uchar *father_buff,
761 			    uchar *father_key_pos, my_off_t father_page)
762 {
763   my_bool right;
764   uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
765        right_length,left_length,new_right_length,new_left_length,extra_length,
766        length,keys;
767   uchar *pos,*buff,*extra_buff;
768   my_off_t next_page,new_pos;
769   uchar tmp_part_key[MI_MAX_KEY_BUFF];
770   DBUG_ENTER("_mi_balance_page");
771 
772   k_length=keyinfo->keylength;
773   father_length=mi_getint(father_buff);
774   father_keylength=k_length+info->s->base.key_reflength;
775   nod_flag=mi_test_if_nod(curr_buff);
776   curr_keylength=k_length+nod_flag;
777   info->page_changed=1;
778 
779   if ((father_key_pos != father_buff+father_length &&
780        (info->state->records & 1)) ||
781       father_key_pos == father_buff+2+info->s->base.key_reflength)
782   {
783     right=1;
784     next_page= _mi_kpos(info->s->base.key_reflength,
785 			father_key_pos+father_keylength);
786     buff=info->buff;
787     DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
788   }
789   else
790   {
791     right=0;
792     father_key_pos-=father_keylength;
793     next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
794 					/* Fix that curr_buff is to left */
795     buff=curr_buff; curr_buff=info->buff;
796     DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
797   }					/* father_key_pos ptr to parting key */
798 
799   if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
800     goto err;
801   DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
802 
803 	/* Test if there is room to share keys */
804 
805   left_length=mi_getint(curr_buff);
806   right_length=mi_getint(buff);
807   keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
808 
809   if ((right ? right_length : left_length) + curr_keylength <=
810       keyinfo->block_length)
811   {						/* Merge buffs */
812     new_left_length=2+nod_flag+(keys/2)*curr_keylength;
813     new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
814     mi_putint(curr_buff,new_left_length,nod_flag);
815     mi_putint(buff,new_right_length,nod_flag);
816 
817     if (left_length < new_left_length)
818     {						/* Move keys buff -> leaf */
819       pos=curr_buff+left_length;
820       memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
821       memcpy((uchar*) pos+k_length, (uchar*) buff+2,
822 	     (size_t) (length=new_left_length - left_length - k_length));
823       pos=buff+2+length;
824       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
825       bmove((uchar*) buff + 2, (uchar*) pos + k_length, new_right_length - 2);
826     }
827     else
828     {						/* Move keys -> buff */
829 
830       bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
831 		right_length-2);
832       length=new_right_length-right_length-k_length;
833       memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
834       pos=curr_buff+new_left_length;
835       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
836       memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
837     }
838 
839     if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
840 	_mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
841       goto err;
842     DBUG_RETURN(0);
843   }
844 
845 	/* curr_buff[] and buff[] are full, lets split and make new nod */
846 
847   extra_buff=info->buff+info->s->base.max_key_block_length;
848   new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
849   if (keys == 5)				/* Too few keys to balance */
850     new_left_length-=curr_keylength;
851   extra_length=nod_flag+left_length+right_length-
852     new_left_length-new_right_length-curr_keylength;
853   DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
854 		     left_length, right_length,
855 		     new_left_length, new_right_length,
856 		     extra_length));
857   mi_putint(curr_buff,new_left_length,nod_flag);
858   mi_putint(buff,new_right_length,nod_flag);
859   mi_putint(extra_buff,extra_length+2,nod_flag);
860 
861   /* move first largest keys to new page  */
862   pos=buff+right_length-extra_length;
863   memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
864   /* Save new parting key */
865   memcpy(tmp_part_key, pos-k_length,k_length);
866   /* Make place for new keys */
867   bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
868 	    right_length-extra_length-k_length-2);
869   /* Copy keys from left page */
870   pos= curr_buff+new_left_length;
871   memcpy((uchar*) buff+2,(uchar*) pos+k_length,
872 	 (size_t) (length=left_length-new_left_length-k_length));
873   /* Copy old parting key */
874   memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
875 
876   /* Move new parting keys up to caller */
877   memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
878   memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
879 
880   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
881     goto err;
882   _mi_kpointer(info,key+k_length,new_pos);
883   if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
884 			DFLT_INIT_HITS,info->buff) ||
885       _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
886                         DFLT_INIT_HITS,extra_buff))
887     goto err;
888 
889   DBUG_RETURN(1);				/* Middle key up */
890 
891 err:
892   DBUG_RETURN(-1);
893 } /* _mi_balance_page */
894 
895 /**********************************************************************
896  *                Bulk insert code                                    *
897  **********************************************************************/
898 
899 typedef struct {
900   MI_INFO *info;
901   uint keynr;
902 } bulk_insert_param;
903 
_mi_ck_write_tree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)904 int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
905 		      uint key_length)
906 {
907   int error;
908   DBUG_ENTER("_mi_ck_write_tree");
909 
910   error= tree_insert(&info->bulk_insert[keynr], key,
911          key_length + info->s->rec_reflength,
912          info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
913 
914   DBUG_RETURN(error);
915 } /* _mi_ck_write_tree */
916 
917 
918 /* typeof(_mi_keys_compare)=qsort_cmp2 */
919 
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)920 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
921 {
922   uint not_used[2];
923   return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
924                     key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
925                     not_used);
926 }
927 
928 
keys_free(uchar * key,TREE_FREE mode,bulk_insert_param * param)929 static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
930 {
931   /*
932     Probably I can use info->lastkey here, but I'm not sure,
933     and to be safe I'd better use local lastkey.
934   */
935   uchar lastkey[MI_MAX_KEY_BUFF];
936   uint keylen;
937   MI_KEYDEF *keyinfo;
938 
939   switch (mode) {
940   case free_init:
941     if (param->info->s->concurrent_insert)
942     {
943       mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
944       param->info->s->keyinfo[param->keynr].version++;
945     }
946     return 0;
947   case free_free:
948     keyinfo=param->info->s->keyinfo+param->keynr;
949     keylen=_mi_keylength(keyinfo, key);
950     memcpy(lastkey, key, keylen);
951     return _mi_ck_write_btree(param->info,param->keynr,lastkey,
952 			      keylen - param->info->s->rec_reflength);
953   case free_end:
954     if (param->info->s->concurrent_insert)
955       mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
956     return 0;
957   }
958   return -1;
959 }
960 
961 
mi_init_bulk_insert(MI_INFO * info,ulong cache_size,ha_rows rows)962 int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
963 {
964   MYISAM_SHARE *share=info->s;
965   MI_KEYDEF *key=share->keyinfo;
966   bulk_insert_param *params;
967   uint i, num_keys, total_keylength;
968   ulonglong key_map;
969   DBUG_ENTER("_mi_init_bulk_insert");
970   DBUG_PRINT("enter",("cache_size: %lu", cache_size));
971 
972   DBUG_ASSERT(!info->bulk_insert &&
973 	      (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
974 
975   mi_clear_all_keys_active(key_map);
976   for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
977   {
978     if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
979         mi_is_key_active(share->state.key_map, i))
980     {
981       num_keys++;
982       mi_set_key_active(key_map, i);
983       total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
984     }
985   }
986 
987   if (num_keys==0 ||
988       num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
989     DBUG_RETURN(0);
990 
991   if (rows && rows*total_keylength < cache_size)
992     cache_size= (ulong)rows;
993   else
994     cache_size/=total_keylength*16;
995 
996   info->bulk_insert=(TREE *)
997     my_malloc((sizeof(TREE)*share->base.keys+
998                sizeof(bulk_insert_param)*num_keys),MYF(0));
999 
1000   if (!info->bulk_insert)
1001     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1002 
1003   params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1004   for (i=0 ; i < share->base.keys ; i++)
1005   {
1006     if (mi_is_key_active(key_map, i))
1007     {
1008       params->info=info;
1009       params->keynr=i;
1010       /* Only allocate a 16'th of the buffer at a time */
1011       init_tree(&info->bulk_insert[i],
1012                 cache_size * key[i].maxlength,
1013                 cache_size * key[i].maxlength, 0,
1014 		(qsort_cmp2)keys_compare, 0,
1015 		(tree_element_free) keys_free, (void *)params++);
1016     }
1017     else
1018      info->bulk_insert[i].root=0;
1019   }
1020 
1021   DBUG_RETURN(0);
1022 }
1023 
mi_flush_bulk_insert(MI_INFO * info,uint inx)1024 void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1025 {
1026   if (info->bulk_insert)
1027   {
1028     if (is_tree_inited(&info->bulk_insert[inx]))
1029       reset_tree(&info->bulk_insert[inx]);
1030   }
1031 }
1032 
mi_end_bulk_insert(MI_INFO * info)1033 void mi_end_bulk_insert(MI_INFO *info)
1034 {
1035   if (info->bulk_insert)
1036   {
1037     uint i;
1038     for (i=0 ; i < info->s->base.keys ; i++)
1039     {
1040       if (is_tree_inited(& info->bulk_insert[i]))
1041       {
1042         delete_tree(& info->bulk_insert[i]);
1043       }
1044     }
1045     my_free(info->bulk_insert);
1046     info->bulk_insert=0;
1047   }
1048 }
1049