1 /*
2    Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /* Write a row to a MyISAM table */
25 
26 #include "fulltext.h"
27 #include "rt_index.h"
28 
29 #define MAX_POINTER_LENGTH 8
30 
31 	/* Functions declared in this file */
32 
33 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
34 		    uint comp_flag, uchar *key,
35 		    uint key_length, my_off_t pos, uchar *father_buff,
36 		    uchar *father_keypos, my_off_t father_page,
37 		    my_bool insert_last);
38 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
39 			    uchar *curr_buff,uchar *father_buff,
40 			    uchar *father_keypos,my_off_t father_page);
41 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
42 				uchar *key, uint *return_key_length,
43 				uchar **after_key);
44 int _mi_ck_write_tree(MI_INFO *info, uint keynr,uchar *key,
45 		      uint key_length);
46 int _mi_ck_write_btree(MI_INFO *info, uint keynr,uchar *key,
47 		       uint key_length);
48 
49 	/* Write new record to database */
50 
/*
  Write one row to a MyISAM table: verify the unique constraints, add the
  row's key to every active index and finally store the record itself.

  RETURN
    0        Row written.
    EACCES   The table has HA_OPTION_READ_ONLY_DATA set.
    other    The my_errno() value of the failure; my_errno is left set to
             the same value.

  If a key insert or the record write fails with HA_ERR_FOUND_DUPP_KEY,
  HA_ERR_RECORD_FILE_FULL, HA_ERR_NULL_IN_SPATIAL or HA_ERR_OUT_OF_MEM,
  the keys already inserted for this row are deleted again.  Any other
  error at that stage marks the table as crashed.
*/
int mi_write(MI_INFO *info, uchar *record)
{
  MYISAM_SHARE *share= info->s;
  uint keynr, uniq_nr;
  int save_errno;
  my_off_t row_pos;
  uchar *key_buff;
  my_bool use_root_locks= share->concurrent_insert;
  DBUG_ENTER("mi_write");
  DBUG_PRINT("enter",("isam: %d  data: %d",info->s->kfile,info->dfile));

  DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
                  mi_print_error(info->s, HA_ERR_CRASHED);
                  set_my_errno(HA_ERR_CRASHED);
                  DBUG_RETURN(HA_ERR_CRASHED););
  if (share->options & HA_OPTION_READ_ONLY_DATA)
  {
    set_my_errno(EACCES);
    DBUG_RETURN(EACCES);
  }
  if (_mi_readinfo(info, F_WRLCK, 1))
    DBUG_RETURN(my_errno());

  /*
    The row goes to the head of the delete chain when there is one and
    appending was not requested, otherwise to the end of the data file.
  */
  if (share->state.dellink != HA_OFFSET_ERROR &&
      !info->append_insert_at_end)
    row_pos= share->state.dellink;
  else
    row_pos= info->state->data_file_length;

  /* System file: holds at most one record and it is already there */
  if (share->base.reloc == (ha_rows) 1 &&
      share->base.records == (ha_rows) 1 &&
      info->state->records == (ha_rows) 1)
  {
    set_my_errno(HA_ERR_RECORD_FILE_FULL);
    goto err2;
  }
  if (info->state->key_file_length >= share->base.margin_key_file_length)
  {
    set_my_errno(HA_ERR_INDEX_FILE_FULL);
    goto err2;
  }
  if (_mi_mark_file_changed(info))
    goto err2;

  /* Calculate and check all unique constraints */
  if (mi_is_any_key_active(share->state.key_map))
  {
    for (uniq_nr= 0 ; uniq_nr < share->state.header.uniques ; uniq_nr++)
    {
      if (mi_check_unique(info, share->uniqueinfo + uniq_nr, record,
                          mi_unique_hash(share->uniqueinfo + uniq_nr, record),
                          HA_OFFSET_ERROR))
        goto err2;
    }
  }

  /* Write all keys to the index trees */
  key_buff= info->lastkey2;
  for (keynr= 0 ; keynr < share->base.keys ; keynr++)
  {
    my_bool locked;
    int failed;

    if (!mi_is_key_active(share->state.key_map, keynr))
      continue;

    /* No root lock is taken while the key goes to a bulk-insert tree */
    locked= (use_root_locks &&
             !(info->bulk_insert &&
               is_tree_inited(&info->bulk_insert[keynr])));
    if (locked)
    {
      mysql_rwlock_wrlock(&share->key_root_lock[keynr]);
      share->keyinfo[keynr].version++;
    }

    if (share->keyinfo[keynr].flag & HA_FULLTEXT)
      failed= _mi_ft_add(info, keynr, key_buff, record, row_pos);
    else
      failed= share->keyinfo[keynr].ck_insert(info, keynr, key_buff,
                                              _mi_make_key(info, keynr,
                                                           key_buff, record,
                                                           row_pos));

    if (locked)
      mysql_rwlock_unlock(&share->key_root_lock[keynr]);
    if (failed)
    {
      DBUG_PRINT("error",("Got error: %d on write",my_errno()));
      goto err;
    }
  }

  if (share->calc_checksum)
    info->checksum= (*share->calc_checksum)(info, record);
  if (!(info->opt_flag & OPT_NO_ROWS))
  {
    if ((*share->write_record)(info, record))
      goto err;
    info->state->checksum+= info->checksum;
  }
  if (share->base.auto_key)
    set_if_bigger(info->s->state.auto_increment,
                  retrieve_auto_increment(info, record));
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
                 HA_STATE_ROW_CHANGED);
  info->state->records++;
  info->lastpos= row_pos;
  myisam_log_record(MI_LOG_WRITE, info, record, row_pos, 0);
  (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
  if (info->invalidator != 0)
  {
    DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
    (*info->invalidator)(info->filename);
    info->invalidator= 0;
  }

  /*
    Update status of the table. We need to do so after each row write
    for the log tables, as we want the new row to become visible to
    other threads as soon as possible. We don't lock mutex here
    (as it is required by pthread memory visibility rules) as (1) it's
    not critical to use outdated share->is_log_table value (2) locking
    mutex here for every write is too expensive.
  */
  if (share->is_log_table)
    mi_update_status((void*) info);

  DBUG_RETURN(0);

err:
  /* keynr is the key that failed, or share->base.keys for a row failure */
  save_errno= my_errno();
  if (save_errno == HA_ERR_FOUND_DUPP_KEY ||
      save_errno == HA_ERR_RECORD_FILE_FULL ||
      save_errno == HA_ERR_NULL_IN_SPATIAL ||
      save_errno == HA_ERR_OUT_OF_MEM)
  {
    if (info->bulk_insert)
    {
      uint j;
      for (j= 0 ; j < share->base.keys ; j++)
        mi_flush_bulk_insert(info, j);
    }
    info->errkey= (int) keynr;

    /* Remove the keys inserted before the failing one, newest first */
    while (keynr-- > 0)
    {
      my_bool locked;
      int failed;

      if (!mi_is_key_active(share->state.key_map, keynr))
        continue;

      locked= (use_root_locks &&
               !(info->bulk_insert &&
                 is_tree_inited(&info->bulk_insert[keynr])));
      if (locked)
        mysql_rwlock_wrlock(&share->key_root_lock[keynr]);

      if (share->keyinfo[keynr].flag & HA_FULLTEXT)
        failed= _mi_ft_del(info, keynr, key_buff, record, row_pos);
      else
      {
        uint key_length= _mi_make_key(info, keynr, key_buff, record, row_pos);
        failed= share->keyinfo[keynr].ck_delete(info, keynr, key_buff,
                                                key_length);
      }

      if (locked)
        mysql_rwlock_unlock(&share->key_root_lock[keynr]);
      if (failed)
        break;                                  /* Give up the rollback */
    }
  }
  else
  {
    mi_print_error(info->s, HA_ERR_CRASHED);
    mi_mark_crashed(info);
  }
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
  set_my_errno(save_errno);
err2:
  /* Keep the error code intact across the logging and unlock calls */
  save_errno= my_errno();
  myisam_log_record(MI_LOG_WRITE, info, record, row_pos, my_errno());
  (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
  set_my_errno(save_errno);
  DBUG_RETURN(save_errno);
} /* mi_write */
243 
244 
245 	/* Write one key to btree */
246 
/*
  Store one key for index 'keynr'.

  The key goes straight to the on-disk B-tree unless a bulk-insert tree
  has been initialised for this index, in which case it is handed to
  _mi_ck_write_tree() instead.  The chosen writer's result is returned.
*/
int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
{
  DBUG_ENTER("_mi_ck_write");

  if (!info->bulk_insert || !is_tree_inited(&info->bulk_insert[keynr]))
  {
    DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
  }
  DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
} /* _mi_ck_write */
260 
261 
262 /**********************************************************************
263  *                Normal insert code                                  *
264  **********************************************************************/
265 
/*
  Insert a key into the B-tree of index 'keynr'.

  The search mode is derived from the key definition flags:
    HA_SORT_ALLOWS_SAME       new key is put after equal keys
    HA_NOSAME or HA_FULLTEXT  an equal key is looked for (no duplicates),
                              optionally treating NULLs as equal
    otherwise                 keys are kept in record-position order

  If the insert left a pending fulltext conversion in info->ft1_to_ft2,
  the conversion is run (only when the insert succeeded) and the array is
  always released afterwards.

  RETURN
    Result of _mi_ck_real_write_btree(), or of _mi_ft_convert_to_ft2()
    when a conversion was run.
*/
int _mi_ck_write_btree(MI_INFO *info, uint keynr, uchar *key,
                       uint key_length)
{
  int res;
  uint search_mode= SEARCH_SAME;                /* Keys in rec-pos order */
  MI_KEYDEF *keydef= info->s->keyinfo + keynr;
  my_off_t *root_pos= &info->s->state.key_root[keynr];
  DBUG_ENTER("_mi_ck_write_btree");

  if (keydef->flag & HA_SORT_ALLOWS_SAME)
  {
    search_mode= SEARCH_BIGGER;                 /* Put after same key */
  }
  else if (keydef->flag & (HA_NOSAME|HA_FULLTEXT))
  {
    search_mode= SEARCH_FIND | SEARCH_UPDATE;   /* No duplicates */
    if (keydef->flag & HA_NULL_ARE_EQUAL)
      search_mode|= SEARCH_NULL_ARE_EQUAL;
  }

  res= _mi_ck_real_write_btree(info, keydef, key, key_length,
                               root_pos, search_mode);
  if (!info->ft1_to_ft2)
    DBUG_RETURN(res);

  /* The conversion reads info->ft1_to_ft2, so run it before the free */
  if (!res)
    res= _mi_ft_convert_to_ft2(info, keynr, key);
  delete_dynamic(info->ft1_to_ft2);
  my_free(info->ft1_to_ft2);
  info->ft1_to_ft2= 0;
  DBUG_RETURN(res);
} /* _mi_ck_write_btree */
298 
/*
  Insert a key into the tree rooted at *root.

  An empty tree (*root == HA_OFFSET_ERROR) gets a new root holding just
  this key.  Otherwise w_search() does the insert; a positive result from
  it means a key has to be stored one level up, so a new root is created
  with _mi_enlarge_root(), which updates *root.

  RETURN
    Result of w_search() when it is <= 0, else result of _mi_enlarge_root().
*/
int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
    uchar *key, uint key_length, my_off_t *root, uint comp_flag)
{
  int error;
  DBUG_ENTER("_mi_ck_real_write_btree");

  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
  if (*root != HA_OFFSET_ERROR)
  {
    error= w_search(info, keyinfo, comp_flag, key, key_length,
                    *root, (uchar *) 0, (uchar*) 0,
                    (my_off_t) 0, 1);
    if (error <= 0)
      DBUG_RETURN(error);
  }
  error= _mi_enlarge_root(info, keyinfo, key, root);
  DBUG_RETURN(error);
} /* _mi_ck_real_write_btree */
312 
313 
314 	/* Make a new root with key as only pointer */
315 
/*
  Build a new root page in info->buff that contains 'key' as its only key
  and write it to a newly allocated key page.

  When a root already exists (*root != HA_OFFSET_ERROR) the new page is a
  node page whose leading page pointer is the old root.  On return *root
  holds the result of _mi_new().

  RETURN
    0    OK.
    -1   Page allocation or page write failed.
*/
int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
                     my_off_t *root)
{
  uint packed_len, ptr_len;
  MI_KEY_PARAM pack_info;
  MYISAM_SHARE *share= info->s;
  DBUG_ENTER("_mi_enlarge_root");

  if (*root != HA_OFFSET_ERROR)
    ptr_len= share->base.key_reflength;
  else
    ptr_len= 0;

  /* The pointer only counts as page content when ptr_len is non-zero */
  _mi_kpointer(info, info->buff+2, *root);
  packed_len= (*keyinfo->pack_key)(keyinfo, ptr_len, (uchar*) 0,
                                   (uchar*) 0, (uchar*) 0, key, &pack_info);
  mi_putint(info->buff, packed_len+2+ptr_len, ptr_len);
  (*keyinfo->store_key)(keyinfo, info->buff+2+ptr_len, &pack_info);

  /* info->buff is used */
  info->page_changed= 1;
  info->buff_used= 1;

  *root= _mi_new(info, keyinfo, DFLT_INIT_HITS);
  if (*root == HA_OFFSET_ERROR)
    DBUG_RETURN(-1);
  if (_mi_write_keypage(info, keyinfo, *root, DFLT_INIT_HITS, info->buff))
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
} /* _mi_enlarge_root */
336 
337 
	/*
	  Search for the position of a key and store the key there
	  Returns -1 = error
		   0  = ok
		  >0  = key should be stored in higher tree
	*/
344 
/*
  Recursively locate the place for 'key' below key page 'page' and insert
  it there.

  father_buff, father_keypos and father_page describe the parent page and
  are handed on to _mi_insert().  insert_last stays set only
  while every level so far positioned the key after the last key of its
  page.

  An exact match on a non-fulltext key is a duplicate: info->dupp_key_pos
  is set and the call fails with HA_ERR_FOUND_DUPP_KEY.  For a fulltext
  key an exact match is either searched again with SEARCH_SAME on the
  whole key (non-negative subkey count) or leads into the second-level
  tree, whose root pointer and negative subkey counter are updated in this
  page.

  RETURN
    -1   Error.
    0    OK.
    > 0  A key must be stored in the level above.
*/
static int w_search(MI_INFO *info, MI_KEYDEF *keyinfo,
                    uint comp_flag, uchar *key, uint key_length, my_off_t page,
                    uchar *father_buff, uchar *father_keypos,
                    my_off_t father_page, my_bool insert_last)
{
  int error, cmp;
  uint nod_flag, search_key_length;
  uchar *page_buff, *keypos;
  uchar found_key[MI_MAX_KEY_BUFF];
  my_bool was_last_key;
  my_off_t child_page, dupp_key_pos;
  DBUG_ENTER("w_search");
  DBUG_PRINT("enter",("page: %ld", (long) page));

  if (comp_flag & SEARCH_FIND)
    search_key_length= key_length;
  else
    search_key_length= USE_WHOLE_KEY;

  page_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
                                MI_MAX_KEY_BUFF*2);
  if (!page_buff)
    DBUG_RETURN(-1);
  if (!_mi_fetch_keypage(info, keyinfo, page, DFLT_INIT_HITS, page_buff, 0))
    goto err;

  cmp= (*keyinfo->bin_search)(info, keyinfo, page_buff, key,
                              search_key_length, comp_flag,
                              &keypos, found_key, &was_last_key);
  nod_flag= mi_test_if_nod(page_buff);

  if (cmp == 0)
  {
    /* Exact match: get position of the record with the duplicated key */
    uint tmp_key_length;

    tmp_key_length= (*keyinfo->get_key)(keyinfo, nod_flag, &keypos,
                                        found_key);
    dupp_key_pos= (tmp_key_length ?
                   _mi_dpos(info, 0, found_key+tmp_key_length) :
                   HA_OFFSET_ERROR);

    if (!(keyinfo->flag & HA_FULLTEXT))
    {
      /* Normal HA_NOSAME key: report the duplicate */
      info->dupp_key_pos= dupp_key_pos;
      set_my_errno(HA_ERR_FOUND_DUPP_KEY);
      DBUG_RETURN(-1);
    }
    else
    {
      uint off;
      int  subkeys;

      get_key_full_length_rdonly(off, found_key);
      subkeys= ft_sintXkorr(found_key+off);
      comp_flag= SEARCH_SAME;
      if (subkeys < 0)
      {
        /* Popular word, two-level tree: go down into the second level */
        my_off_t root= dupp_key_pos;

        keyinfo= &info->s->ft2_keyinfo;
        get_key_full_length_rdonly(off, key);
        key+= off;
        /* Step back to the matched entry; it is modified in place */
        keypos-= keyinfo->keylength+nod_flag;
        error= _mi_ck_real_write_btree(info, keyinfo, key, 0,
                                       &root, comp_flag);
        _mi_dpointer(info, keypos+HA_FT_WLEN, root);
        subkeys--; /* should there be underflow protection ? */
        assert(subkeys < 0);
        ft_intXstore(keypos, subkeys);
        if (!error)
          error= _mi_write_keypage(info, keyinfo, page, DFLT_INIT_HITS,
                                   page_buff);
        DBUG_RETURN(error);
      }

      /* Normal word, one-level tree: search again on the whole key */
      cmp= (*keyinfo->bin_search)(info, keyinfo, page_buff, key,
                                  USE_WHOLE_KEY, comp_flag,
                                  &keypos, found_key, &was_last_key);
    }
  }

  if (cmp == MI_FOUND_WRONG_KEY)
    DBUG_RETURN(-1);
  if (!was_last_key)
    insert_last= 0;

  child_page= _mi_kpos(nod_flag, keypos);
  if (child_page != HA_OFFSET_ERROR)
  {
    error= w_search(info, keyinfo, comp_flag, key, key_length, child_page,
                    page_buff, keypos, page, insert_last);
    if (error <= 0)
      DBUG_RETURN(error);                       /* Done below, or failed */
  }

  /* No child page, or a key came up from the child: insert it here */
  error= _mi_insert(info, keyinfo, key, page_buff, keypos, found_key,
                    father_buff, father_keypos, father_page, insert_last);
  if (_mi_write_keypage(info, keyinfo, page, DFLT_INIT_HITS, page_buff))
    goto err;
  DBUG_RETURN(error);

err:
  DBUG_PRINT("exit",("Error: %d",my_errno()));
  DBUG_RETURN (-1);
} /* w_search */
439 
440 
441 /*
442   Insert new key.
443 
444   SYNOPSIS
445     _mi_insert()
446     info                        Open table information.
447     keyinfo                     Key definition information.
448     key                         New key.
449     anc_buff                    Key page (beginning).
450     key_pos                     Position in key page where to insert.
451     key_buff                    Copy of previous key.
452     father_buff                 parent key page for balancing.
453     father_key_pos              position in parent key page for balancing.
454     father_page                 position of parent key page in file.
455     insert_last                 If to append at end of page.
456 
457   DESCRIPTION
458     Insert new key at right of key_pos.
459 
460   RETURN
461     2           if key contains key to upper level.
462     0           OK.
463     < 0         Error.
464 */
465 
/*
  Insert 'key' into the key page held in anc_buff, at key_pos.

  The key is packed relative to its neighbours, the rest of the page is
  moved to make room (or to close a gap when packing shrinks the next
  key), and the page length is updated.  If the page still fits in
  keyinfo->block_length the function is done, apart from optionally
  starting a fulltext one-level to two-level conversion (see below).
  A page that no longer fits is balanced with a neighbour
  (fixed-length keys with a parent page, not appending) or split.

  Fulltext conversion: when a nearly full fulltext page gets a key
  appended whose word equals the first word on the page,
  info->ft1_to_ft2 is allocated.  For a leaf page all keys but the first
  are moved into that array and the page is truncated to one key.  The
  caller (_mi_ck_write_btree) completes the conversion and frees the
  array.

  RETURN
    0     OK, there was room on the page.
    -1    Error; my_errno is HA_ERR_CRASHED for an impossible packed
          length and HA_ERR_OUT_OF_MEM when the conversion array could
          not be allocated or extended.
    else  Result of _mi_balance_page() or _mi_split_page()
          (_mi_split_page() returns 2 when a key moves to the upper level).
*/
int _mi_insert(MI_INFO *info, MI_KEYDEF *keyinfo,
	       uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
               uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
	       my_bool insert_last)
{
  uint a_length,nod_flag;
  int t_length;
  uchar *endpos, *prev_key;
  MI_KEY_PARAM s_temp;
  DBUG_ENTER("_mi_insert");
  DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
  DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););

  nod_flag=mi_test_if_nod(anc_buff);
  a_length=mi_getint(anc_buff);
  endpos= anc_buff+ a_length;
  /* There is no previous key when inserting at the start of the page */
  prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
				(key_pos == endpos ? (uchar*) 0 : key_pos),
				prev_key, prev_key,
				key,&s_temp);
#ifndef NDEBUG
  if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
					 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
  {
    DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
  }
  if (keyinfo->flag & HA_PACK_KEY)
  {
    DBUG_PRINT("test",("t_length: %d  ref_len: %d",
		       t_length,s_temp.ref_length));
    DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
		       s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
  }
#endif
  if (t_length > 0)
  {
    /* The page grows: refuse an impossible length, then open a gap */
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
    {
      mi_print_error(info->s, HA_ERR_CRASHED);
      set_my_errno(HA_ERR_CRASHED);
      DBUG_RETURN(-1);
    }
    memmove(key_pos + t_length, key_pos, (size_t) (endpos - key_pos));
  }
  else
  {
    /* The page shrinks (or keeps its size): close the gap */
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
    {
      mi_print_error(info->s, HA_ERR_CRASHED);
      set_my_errno(HA_ERR_CRASHED);
      DBUG_RETURN(-1);
    }
    memmove(key_pos, key_pos - t_length, (uint) (endpos - key_pos) + t_length);
  }
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
  a_length+=t_length;
  mi_putint(anc_buff,a_length,nod_flag);
  if (a_length <= keyinfo->block_length)
  {
    if (keyinfo->block_length - a_length < 32 &&
        keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
        info->s->base.key_reflength <= info->s->rec_reflength &&
        info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
    {
      /*
        Normal word. One-level tree. Page is almost full.
        Let's consider converting.
        We'll compare 'key' and the first key at anc_buff
       */
      uchar *a=key, *b=anc_buff+2+nod_flag;
      uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
      /* the very first key on the page is always unpacked */
      assert((*b & 128) == 0);
#if HA_FT_MAXLEN >= 127 /* TODO: Undefined symbol */
      blen= mi_uint2korr(b); b+=2;
#else
      blen= *b++;
#endif
      get_key_length(alen,a);
      assert(info->ft1_to_ft2==0);
      if (alen == blen &&
          ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
      {
        /* yup. converting */
        info->ft1_to_ft2=(DYNAMIC_ARRAY *)
          my_malloc(mi_key_memory_MI_INFO_ft1_to_ft2,
                    sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
        if (!info->ft1_to_ft2)
        {
          /*
            Do not hand a NULL array to my_init_dynamic_array(); report
            the failure the same way as a failed insert_dynamic() below.
          */
          mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
          set_my_errno(HA_ERR_OUT_OF_MEM);
          DBUG_RETURN(-1);
        }
        my_init_dynamic_array(info->ft1_to_ft2,
                              mi_key_memory_MI_INFO_ft1_to_ft2,
                              ft2len, NULL, 300, 50);

        /*
          now, adding all keys from the page to dynarray
          if the page is a leaf (if not keys will be deleted later)
        */
        if (!nod_flag)
        {
          /* let's leave the first key on the page, though, because
             we cannot easily dispatch an empty page here */
          b+=blen+ft2len+2;
          for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
          {
            if (insert_dynamic(info->ft1_to_ft2, b))
            {
              mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
              set_my_errno(HA_ERR_OUT_OF_MEM);
              DBUG_RETURN(-1);
            }
          }

          /* fixing the page's length - it contains only one key now */
          mi_putint(anc_buff,2+blen+ft2len+2,0);
        }
        /* the rest will be done when we're back from recursion */
      }
    }
    DBUG_RETURN(0);				/* There is room on page */
  }
  /* Page is full */
  if (nod_flag)
    insert_last=0;
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
      father_buff && !insert_last)
    DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
				 father_key_pos,father_page));
  DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
} /* _mi_insert */
594 
595 
596 	/* split a full page in two and assign emerging item to key */
597 
/*
  Split the full key page in 'buff' in two.

  The split position is the last key when insert_last_key is set,
  otherwise roughly the middle of the page.  'buff' is truncated to the
  part before the split key; this function does not write 'buff' itself.
  The keys after the split key are copied to info->buff, which is written
  to a newly allocated page.  The split key is moved into 'key' together
  with a pointer to the new page, ready to be inserted one level up.

  key_buff is used as work space and holds the first key of the new page
  on return.

  RETURN
    2    OK, 'key' now holds the key to store in the level above.
    -1   Error (no split position, page allocation, key decode or page
         write failed).
*/
int _mi_split_page(MI_INFO *info, MI_KEYDEF *keyinfo,
		   uchar *key, uchar *buff, uchar *key_buff,
		   my_bool insert_last_key)
{
  uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
  uchar *key_pos,*pos, *after_key= NULL;
  my_off_t new_pos;
  MI_KEY_PARAM s_temp;
  /* Trace tag matches the function name, like the rest of this file */
  DBUG_ENTER("_mi_split_page");
  DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));

  if (info->s->keyinfo+info->lastinx == keyinfo)
    info->page_changed=1;			/* Info->buff is used */
  info->buff_used=1;
  nod_flag=mi_test_if_nod(buff);
  key_ref_length=2+nod_flag;
  if (insert_last_key)
    key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
  else
    key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
			      &after_key);
  if (!key_pos)
    DBUG_RETURN(-1);

  /* Cut the old page just before the split key */
  length=(uint) (key_pos-buff);
  a_length=mi_getint(buff);
  mi_putint(buff,length,nod_flag);

  key_pos=after_key;
  if (nod_flag)
  {
    /* The pointer stored before after_key becomes the new page's first */
    DBUG_PRINT("test",("Splitting nod"));
    pos=key_pos-nod_flag;
    memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
  }

	/* Move middle item to key and pointer to new page */
  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
    DBUG_RETURN(-1);
  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);

	/* Store new page */
  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
    DBUG_RETURN(-1);

  /* Pack the key as the first on its page; it has no previous key */
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
				(uchar*) 0, (uchar*) 0,
				key_buff, &s_temp);
  length=(uint) ((buff+a_length)-key_pos);
  memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
	 (size_t) length);
  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);

  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
    DBUG_RETURN(-1);
  DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
  DBUG_RETURN(2);				/* Middle key up */
} /* _mi_split_page */
657 
658 
	/*
	  Calculate how much to move to split a page in two
	  Returns pointer to start of key.
	  key will contain the key.
	  return_key_length will contain the length of key
	  after_key will contain the position where the next key starts
	*/
666 
/*
  Find the key at which a page should be split in about two halves.

  For fixed-length, unpacked keys the position is computed directly.
  Otherwise the keys are decoded one by one from the start of the page
  until the scan position reaches the approximate midpoint.

  On success 'key' holds the key found at the returned position,
  *return_key_length its length and *after_key the start of the key that
  follows it.

  RETURN
    Pointer to the start of the split key, or 0 if a key could not be
    decoded.
*/
uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
                         uchar *key, uint *return_key_length,
                         uchar **after_key)
{
  uint header_len, data_len, got_len;
  uchar *half, *key_start;
  DBUG_ENTER("_mi_find_half_pos");

  header_len= 2+nod_flag;
  data_len= mi_getint(page)-header_len;
  page+= header_len;                            /* First key on the page */

  if (!(keyinfo->flag &
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
         HA_BINARY_PACK_KEY)))
  {
    /* Every entry has the same size, so the middle can be calculated */
    uint entry_len= keyinfo->keylength+nod_flag;
    uint left_keys= data_len/(entry_len*2);

    half= page+left_keys*entry_len;
    *return_key_length= keyinfo->keylength;
    *after_key= half+entry_len;
    memcpy(key, half, entry_len);
    DBUG_RETURN(half);
  }

  half= page+data_len/2-header_len;             /* This is approx. half */
  *key= '\0';
  do
  {
    key_start= page;
    got_len= (*keyinfo->get_key)(keyinfo, nod_flag, &page, key);
    if (!got_len)
      DBUG_RETURN(0);
  } while (page < half);

  *return_key_length= got_len;
  *after_key= page;
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
                     (long) key_start, (long) page, (long) half));
  DBUG_RETURN(key_start);
} /* _mi_find_half_pos */
705 
706 
707 	/*
708 	  Split buffer at last key
709 	  Returns pointer to the start of the key before the last key
710 	  key will contain the last key
711 	*/
712 
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)713 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
714 				uchar *key, uint *return_key_length,
715 				uchar **after_key)
716 {
717   uint keys, length, last_length= 0, key_ref_length;
718   uchar *end, *lastpos, *prevpos= NULL;
719   uchar key_buff[MI_MAX_KEY_BUFF];
720   DBUG_ENTER("_mi_find_last_pos");
721 
722   key_ref_length=2;
723   length=mi_getint(page)-key_ref_length;
724   page+=key_ref_length;
725   if (!(keyinfo->flag &
726 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
727 	 HA_BINARY_PACK_KEY)))
728   {
729     keys=length/keyinfo->keylength-2;
730     *return_key_length=length=keyinfo->keylength;
731     end=page+keys*length;
732     *after_key=end+length;
733     memcpy(key,end,length);
734     DBUG_RETURN(end);
735   }
736 
737   end=page+length-key_ref_length;
738   *key='\0';
739   length=0;
740   lastpos=page;
741   while (page < end)
742   {
743     prevpos=lastpos; lastpos=page;
744     last_length=length;
745     memcpy(key, key_buff, length);		/* previous key */
746     if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
747     {
748       mi_print_error(keyinfo->share, HA_ERR_CRASHED);
749       set_my_errno(HA_ERR_CRASHED);
750       DBUG_RETURN(0);
751     }
752   }
753   *return_key_length=last_length;
754   *after_key=lastpos;
755   DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  end: 0x%lx",
756                      (long) prevpos,(long) page,(long) end));
757   DBUG_RETURN(prevpos);
758 } /* _mi_find_last_pos */
759 
760 
761 	/* Balance page with not packed keys with page on right/left */
762 	/* returns 0 if balance was done */
763 
_mi_balance_page(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * curr_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page)764 static int _mi_balance_page(MI_INFO *info, MI_KEYDEF *keyinfo,
765 			    uchar *key, uchar *curr_buff, uchar *father_buff,
766 			    uchar *father_key_pos, my_off_t father_page)
767 {
768   my_bool right;
769   uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
770        right_length,left_length,new_right_length,new_left_length,extra_length,
771        length,keys;
772   uchar *pos,*buff,*extra_buff;
773   my_off_t next_page,new_pos;
774   uchar tmp_part_key[MI_MAX_KEY_BUFF];
775   DBUG_ENTER("_mi_balance_page");
776 
777   k_length=keyinfo->keylength;
778   father_length=mi_getint(father_buff);
779   father_keylength=k_length+info->s->base.key_reflength;
780   nod_flag=mi_test_if_nod(curr_buff);
781   curr_keylength=k_length+nod_flag;
782   info->page_changed=1;
783 
784   if ((father_key_pos != father_buff+father_length &&
785        (info->state->records & 1)) ||
786       father_key_pos == father_buff+2+info->s->base.key_reflength)
787   {
788     right=1;
789     next_page= _mi_kpos(info->s->base.key_reflength,
790 			father_key_pos+father_keylength);
791     buff=info->buff;
792     DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
793   }
794   else
795   {
796     right=0;
797     father_key_pos-=father_keylength;
798     next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
799 					/* Fix that curr_buff is to left */
800     buff=curr_buff; curr_buff=info->buff;
801     DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
802   }					/* father_key_pos ptr to parting key */
803 
804   if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
805     goto err;
806   DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
807 
808 	/* Test if there is room to share keys */
809 
810   left_length=mi_getint(curr_buff);
811   right_length=mi_getint(buff);
812   keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
813 
814   if ((right ? right_length : left_length) + curr_keylength <=
815       keyinfo->block_length)
816   {						/* Merge buffs */
817     new_left_length=2+nod_flag+(keys/2)*curr_keylength;
818     new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
819     mi_putint(curr_buff,new_left_length,nod_flag);
820     mi_putint(buff,new_right_length,nod_flag);
821 
822     if (left_length < new_left_length)
823     {						/* Move keys buff -> leaf */
824       pos=curr_buff+left_length;
825       memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
826       memcpy((uchar*) pos+k_length, (uchar*) buff+2,
827 	     (size_t) (length=new_left_length - left_length - k_length));
828       pos=buff+2+length;
829       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
830       memmove((uchar*) buff + 2,
831               (uchar*) pos + k_length, new_right_length - 2);
832     }
833     else
834     {						/* Move keys -> buff */
835 
836       memmove(buff + new_right_length - right_length + 2,
837               buff + 2, right_length - 2);
838       length=new_right_length-right_length-k_length;
839       memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
840       pos=curr_buff+new_left_length;
841       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
842       memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
843     }
844 
845     if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
846 	_mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
847       goto err;
848     DBUG_RETURN(0);
849   }
850 
851 	/* curr_buff[] and buff[] are full, lets split and make new nod */
852 
853   extra_buff=info->buff+info->s->base.max_key_block_length;
854   new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
855   if (keys == 5)				/* Too few keys to balance */
856     new_left_length-=curr_keylength;
857   extra_length=nod_flag+left_length+right_length-
858     new_left_length-new_right_length-curr_keylength;
859   DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
860 		     left_length, right_length,
861 		     new_left_length, new_right_length,
862 		     extra_length));
863   mi_putint(curr_buff,new_left_length,nod_flag);
864   mi_putint(buff,new_right_length,nod_flag);
865   mi_putint(extra_buff,extra_length+2,nod_flag);
866 
867   /* move first largest keys to new page  */
868   pos=buff+right_length-extra_length;
869   memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
870   /* Save new parting key */
871   memcpy(tmp_part_key, pos-k_length,k_length);
872   /* Make place for new keys */
873   memmove(buff + new_right_length - right_length + extra_length + k_length + 2,
874           pos - right_length + extra_length + 2,
875           right_length - extra_length - k_length - 2);
876   /* Copy keys from left page */
877   pos= curr_buff+new_left_length;
878   memcpy((uchar*) buff+2,(uchar*) pos+k_length,
879 	 (size_t) (length=left_length-new_left_length-k_length));
880   /* Copy old parting key */
881   memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
882 
883   /* Move new parting keys up to caller */
884   memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
885   memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
886 
887   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
888     goto err;
889   _mi_kpointer(info,key+k_length,new_pos);
890   if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
891 			DFLT_INIT_HITS,info->buff) ||
892       _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
893                         DFLT_INIT_HITS,extra_buff))
894     goto err;
895 
896   DBUG_RETURN(1);				/* Middle key up */
897 
898 err:
899   DBUG_RETURN(-1);
900 } /* _mi_balance_page */
901 
902 /**********************************************************************
903  *                Bulk insert code                                    *
904  **********************************************************************/
905 
/*
  Per-key argument for the bulk-insert trees.  One instance is filled in for
  each key that gets a tree in mi_init_bulk_insert() and is handed to
  init_tree() as the tree's custom argument; keys_compare() and keys_free()
  receive it back as their parameter.
*/
typedef struct {
  MI_INFO *info;			/* Table handle the tree belongs to */
  uint keynr;				/* Number of the key served by the tree */
} bulk_insert_param;
910 
/*
  Store a key in the bulk-insert tree of key number 'keynr'.

  The element handed to tree_insert() is the key followed by the record
  reference, so its size is key_length + rec_reflength.

  Returns 0 on success, or HA_ERR_OUT_OF_MEM when tree_insert() reports
  failure.
*/
int _mi_ck_write_tree(MI_INFO *info, uint keynr, uchar *key,
		      uint key_length)
{
  TREE *tree= &info->bulk_insert[keynr];
  int error= 0;
  DBUG_ENTER("_mi_ck_write_tree");

  /* A false result from tree_insert() is reported as out of memory */
  if (!tree_insert(tree, key,
                   key_length + info->s->rec_reflength,
                   tree->custom_arg))
    error= HA_ERR_OUT_OF_MEM;

  DBUG_RETURN(error);
} /* _mi_ck_write_tree */
923 
924 
/* keys_compare() is cast to qsort_cmp2 when it is passed to init_tree() */
926 
/*
  Compare two keys stored in a bulk-insert tree.

  The key segments are taken from the key definition selected by
  param->keynr; the comparison itself is delegated to ha_key_cmp() with
  USE_WHOLE_KEY and SEARCH_SAME.  The result of ha_key_cmp() is returned
  unchanged.
*/
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
{
  /* Required by ha_key_cmp(); its contents are not used here */
  uint diff_pos[2];
  MI_KEYDEF *keyinfo= param->info->s->keyinfo + param->keynr;

  return ha_key_cmp(keyinfo->seg, key1, key2,
                    USE_WHOLE_KEY, SEARCH_SAME, diff_pos);
}
934 
935 
/*
  Element-free callback of the bulk-insert trees (installed by
  mi_init_bulk_insert()).

  vkey    Tree element: a key as stored by _mi_ck_write_tree()
  mode    Which phase the callback is invoked for
  vparam  The bulk_insert_param of the tree

  free_init: when the share has concurrent_insert set, take the write lock
             on the key root and bump the key definition's version.
  free_free: write the key to the B-tree with _mi_ck_write_btree().
  free_end:  when the share has concurrent_insert set, release the lock
             on the key root.
*/
static void keys_free(void* vkey, TREE_FREE mode, const void *vparam)
{
  bulk_insert_param *param= (bulk_insert_param*)(vparam);
  MI_INFO *info= param->info;
  MYISAM_SHARE *share= info->s;
  uint keynr= param->keynr;

  if (mode == free_init)
  {
    if (share->concurrent_insert)
    {
      mysql_rwlock_wrlock(&share->key_root_lock[keynr]);
      share->keyinfo[keynr].version++;
    }
  }
  else if (mode == free_free)
  {
    /*
      The key is copied to a private buffer before it is written;
      info->lastkey might be usable instead, but a local copy is the
      safe choice.
    */
    uchar key_copy[MI_MAX_KEY_BUFF];
    uint full_length= _mi_keylength(share->keyinfo + keynr, (uchar*) vkey);

    memcpy(key_copy, (uchar*) vkey, full_length);
    /* The length passed on excludes the trailing record reference */
    _mi_ck_write_btree(info, keynr, key_copy,
                       full_length - share->rec_reflength);
  }
  else if (mode == free_end)
  {
    if (share->concurrent_insert)
      mysql_rwlock_unlock(&share->key_root_lock[keynr]);
  }
}
970 
971 
/*
  Set up the bulk-insert trees for a table.

  info        Table handle; info->bulk_insert must not be set yet
  cache_size  Memory budget for the trees
  rows        Expected number of rows, or 0

  A tree is created for every key that is not flagged HA_NOSAME, is not
  the auto-increment key (base.auto_key) and is active in
  share->state.key_map.  The TREE array (one slot per table key) and the
  bulk_insert_param entries (one per selected key) share one allocation.

  Returns 0 when the trees were set up, and also when bulk insert is not
  used because no key qualifies or cache_size is too small for the
  selected keys.  Returns HA_ERR_OUT_OF_MEM if the allocation fails.
*/
int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
{
  MYISAM_SHARE *share= info->s;
  MI_KEYDEF *keydefs= share->keyinfo;
  bulk_insert_param *next_param;
  uint keynr;
  uint used_keys= 0;
  uint keylength_sum= 0;
  ulonglong used_map;
  DBUG_ENTER("_mi_init_bulk_insert");
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));

  assert(!info->bulk_insert &&
         (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));

  /* Pass 1: pick the keys that will get a tree */
  mi_clear_all_keys_active(used_map);
  for (keynr= 0 ; keynr < share->base.keys ; keynr++)
  {
    if ((keydefs[keynr].flag & HA_NOSAME) ||
        share->base.auto_key == keynr + 1 ||
        !mi_is_key_active(share->state.key_map, keynr))
      continue;

    used_keys++;
    mi_set_key_active(used_map, keynr);
    keylength_sum+= keydefs[keynr].maxlength + TREE_ELEMENT_EXTRA_SIZE;
  }

  /* Nothing to do, or the budget is below the minimum per tree */
  if (!used_keys)
    DBUG_RETURN(0);
  if (used_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
    DBUG_RETURN(0);

  /*
    From here on cache_size is a multiplier that is applied to each key's
    maxlength when the tree is created.
  */
  if (rows && rows * keylength_sum < cache_size)
  {
    cache_size= (ulong) rows;
  }
  else
  {
    cache_size= cache_size / (keylength_sum * 16);
  }

  info->bulk_insert= (TREE *)
    my_malloc(mi_key_memory_MI_INFO_bulk_insert,
              (sizeof(TREE) * share->base.keys +
               sizeof(bulk_insert_param) * used_keys), MYF(0));
  if (!info->bulk_insert)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  /* The parameter entries live right behind the TREE array */
  next_param= (bulk_insert_param *) (info->bulk_insert + share->base.keys);

  /* Pass 2: create a tree for each selected key, mark the rest unused */
  for (keynr= 0 ; keynr < share->base.keys ; keynr++)
  {
    TREE *tree= &info->bulk_insert[keynr];
    ulong tree_size;

    if (!mi_is_key_active(used_map, keynr))
    {
      tree->root= 0;
      continue;
    }

    next_param->info= info;
    next_param->keynr= keynr;
    /* Only allocate a 16'th of the buffer at a time */
    tree_size= cache_size * keydefs[keynr].maxlength;
    init_tree(tree, tree_size, tree_size, 0,
              (qsort_cmp2) keys_compare, 0,
              keys_free, (void *) next_param);
    next_param++;
  }

  DBUG_RETURN(0);
}
1034 
/*
  Reset the bulk-insert tree of key number 'inx'.

  Does nothing when the table has no bulk-insert trees or when the tree
  for this key is not initialised.
*/
void mi_flush_bulk_insert(MI_INFO *info, uint inx)
{
  TREE *tree;

  if (!info->bulk_insert)
    return;

  tree= &info->bulk_insert[inx];
  if (is_tree_inited(tree))
    reset_tree(tree);
}
1043 
/*
  End bulk insert for a table: delete every initialised bulk-insert tree,
  free the tree array and clear info->bulk_insert.

  Does nothing when the table has no bulk-insert trees.
*/
void mi_end_bulk_insert(MI_INFO *info)
{
  TREE *tree;
  TREE *end;

  if (!info->bulk_insert)
    return;

  /* One TREE slot exists per table key; unused slots are not initialised */
  end= info->bulk_insert + info->s->base.keys;
  for (tree= info->bulk_insert ; tree < end ; tree++)
  {
    if (is_tree_inited(tree))
      delete_tree(tree);
  }

  my_free(info->bulk_insert);
  info->bulk_insert= NULL;
}
1060