1 /* Copyright (c) 2000, 2013, Oracle and/or its affiliates
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
15 
16 /* Write a row to a MyISAM table */
17 
18 #include "fulltext.h"
19 #include "rt_index.h"
20 
21 #define MAX_POINTER_LENGTH 8
22 
23 	/* Functions declared in this file */
24 
25 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
26 		    uint comp_flag, uchar *key,
27 		    uint key_length, my_off_t pos, uchar *father_buff,
28 		    uchar *father_keypos, my_off_t father_page,
29 		    my_bool insert_last);
30 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
31 			    uchar *curr_buff,uchar *father_buff,
32 			    uchar *father_keypos,my_off_t father_page);
33 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
34 				uchar *key, uint *return_key_length,
35 				uchar **after_key);
36 int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
37 		      uint key_length);
38 int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
39 		       uint key_length);
40 
41 	/* Write new record to database */
42 
mi_write(MI_INFO * info,uchar * record)43 int mi_write(MI_INFO *info, uchar *record)
44 {
45   MYISAM_SHARE *share=info->s;
46   uint i;
47   int save_errno;
48   my_off_t filepos;
49   uchar *buff;
50   my_bool lock_tree= share->concurrent_insert;
51   DBUG_ENTER("mi_write");
52   DBUG_PRINT("enter",("isam: %d  data: %d",info->s->kfile,info->dfile));
53 
54   DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
55                   mi_print_error(info->s, HA_ERR_CRASHED);
56                   DBUG_RETURN(my_errno= HA_ERR_CRASHED););
57 
58   /* it's always a bug to try to write a record with the deleted flag set */
59   DBUG_ASSERT(info->s->data_file_type != STATIC_RECORD || *record);
60 
61   if (share->options & HA_OPTION_READ_ONLY_DATA)
62   {
63     DBUG_RETURN(my_errno=EACCES);
64   }
65   if (_mi_readinfo(info,F_WRLCK,1))
66     DBUG_RETURN(my_errno);
67 
68   filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
69              !info->append_insert_at_end) ?
70 	    share->state.dellink :
71 	    info->state->data_file_length);
72 
73   if (share->base.reloc == (ha_rows) 1 &&
74       share->base.records == (ha_rows) 1 &&
75       info->state->records == (ha_rows) 1)
76   {						/* System file */
77     my_errno=HA_ERR_RECORD_FILE_FULL;
78     goto err2;
79   }
80   if (info->state->key_file_length >= share->base.margin_key_file_length)
81   {
82     my_errno=HA_ERR_INDEX_FILE_FULL;
83     goto err2;
84   }
85   if (_mi_mark_file_changed(info))
86     goto err2;
87 
88   /* Calculate and check all unique constraints */
89   for (i=0 ; i < share->state.header.uniques ; i++)
90   {
91     MI_UNIQUEDEF *def= share->uniqueinfo + i;
92     if (mi_is_key_active(share->state.key_map, def->key) &&
93         mi_check_unique(info, def, record, mi_unique_hash(def, record),
94                         HA_OFFSET_ERROR))
95       goto err2;
96   }
97 
98 	/* Write all keys to indextree */
99 
100   buff=info->lastkey2;
101   for (i=0 ; i < share->base.keys ; i++)
102   {
103     if (mi_is_key_active(share->state.key_map, i))
104     {
105       my_bool local_lock_tree= (lock_tree &&
106                                 !(info->bulk_insert &&
107                                   is_tree_inited(&info->bulk_insert[i])));
108       if (local_lock_tree)
109       {
110         mysql_rwlock_wrlock(&share->key_root_lock[i]);
111 	share->keyinfo[i].version++;
112       }
113       if (share->keyinfo[i].flag & HA_FULLTEXT )
114       {
115         if (_mi_ft_add(info,i, buff, record, filepos))
116         {
117 	  if (local_lock_tree)
118             mysql_rwlock_unlock(&share->key_root_lock[i]);
119           DBUG_PRINT("error",("Got error: %d on write",my_errno));
120           goto err;
121         }
122       }
123       else
124       {
125         if (share->keyinfo[i].ck_insert(info,i,buff,
126 			_mi_make_key(info,i,buff,record,filepos)))
127         {
128           if (local_lock_tree)
129             mysql_rwlock_unlock(&share->key_root_lock[i]);
130           DBUG_PRINT("error",("Got error: %d on write",my_errno));
131           goto err;
132         }
133       }
134 
135       /* The above changed info->lastkey2. Inform mi_rnext_same(). */
136       info->update&= ~HA_STATE_RNEXT_SAME;
137 
138       if (local_lock_tree)
139         mysql_rwlock_unlock(&share->key_root_lock[i]);
140     }
141   }
142   if (share->calc_checksum)
143     info->checksum=(*share->calc_checksum)(info,record);
144   if (!(info->opt_flag & OPT_NO_ROWS))
145   {
146     if ((*share->write_record)(info,record))
147       goto err;
148     info->state->checksum+=info->checksum;
149   }
150   if (share->base.auto_key)
151     set_if_bigger(info->s->state.auto_increment,
152                   retrieve_auto_increment(info, record));
153   info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
154 		 HA_STATE_ROW_CHANGED);
155   info->state->records++;
156   myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
157   (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
158   if (info->invalidator != 0)
159   {
160     DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
161     (*info->invalidator)(info->filename);
162     info->invalidator=0;
163   }
164 
165   /*
166     Update status of the table. We need to do so after each row write
167     for the log tables, as we want the new row to become visible to
168     other threads as soon as possible. We don't lock mutex here
169     (as it is required by pthread memory visibility rules) as (1) it's
170     not critical to use outdated share->is_log_table value (2) locking
171     mutex here for every write is too expensive.
172   */
173   if (share->is_log_table)
174     mi_update_status((void*) info);
175 
176   DBUG_RETURN(0);
177 
178 err:
179   save_errno=my_errno;
180   if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
181       my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
182   {
183     if (info->bulk_insert)
184     {
185       uint j;
186       for (j=0 ; j < share->base.keys ; j++)
187         mi_flush_bulk_insert(info, j);
188     }
189     info->errkey= (int) i;
190     while ( i-- > 0)
191     {
192       if (mi_is_key_active(share->state.key_map, i))
193       {
194 	my_bool local_lock_tree= (lock_tree &&
195                                   !(info->bulk_insert &&
196                                     is_tree_inited(&info->bulk_insert[i])));
197 	if (local_lock_tree)
198           mysql_rwlock_wrlock(&share->key_root_lock[i]);
199 	if (share->keyinfo[i].flag & HA_FULLTEXT)
200         {
201           if (_mi_ft_del(info,i, buff,record,filepos))
202 	  {
203 	    if (local_lock_tree)
204               mysql_rwlock_unlock(&share->key_root_lock[i]);
205             break;
206 	  }
207         }
208         else
209 	{
210 	  uint key_length=_mi_make_key(info,i,buff,record,filepos);
211 	  if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
212 	  {
213 	    if (local_lock_tree)
214               mysql_rwlock_unlock(&share->key_root_lock[i]);
215 	    break;
216 	  }
217 	}
218 	if (local_lock_tree)
219           mysql_rwlock_unlock(&share->key_root_lock[i]);
220       }
221     }
222   }
223   else
224   {
225     mi_print_error(info->s, HA_ERR_CRASHED);
226     mi_mark_crashed(info);
227   }
228   info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
229   my_errno=save_errno;
230 err2:
231   save_errno=my_errno;
232   myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
233   (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
234   DBUG_RETURN(my_errno=save_errno);
235 } /* mi_write */
236 
237 
238 	/* Write one key to btree */
239 
_mi_ck_write(MI_INFO * info,uint keynr,uchar * key,uint key_length)240 int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
241 {
242   DBUG_ENTER("_mi_ck_write");
243 
244   if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
245   {
246     DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
247   }
248   else
249   {
250     DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
251   }
252 } /* _mi_ck_write */
253 
254 
255 /**********************************************************************
256  *                Normal insert code                                  *
257  **********************************************************************/
258 
_mi_ck_write_btree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)259 int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
260 		       uint key_length)
261 {
262   int error;
263   uint comp_flag;
264   MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
265   my_off_t  *root=&info->s->state.key_root[keynr];
266   DBUG_ENTER("_mi_ck_write_btree");
267 
268   if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
269     comp_flag=SEARCH_BIGGER;			/* Put after same key */
270   else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
271   {
272     comp_flag=SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT; /* No duplicates */
273     if (keyinfo->flag & HA_NULL_ARE_EQUAL)
274       comp_flag|= SEARCH_NULL_ARE_EQUAL;
275   }
276   else
277     comp_flag=SEARCH_SAME;			/* Keys in rec-pos order */
278 
279   error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
280                                 root, comp_flag);
281   if (info->ft1_to_ft2)
282   {
283     if (!error)
284       error= _mi_ft_convert_to_ft2(info, keynr, key);
285     delete_dynamic(info->ft1_to_ft2);
286     my_free(info->ft1_to_ft2);
287     info->ft1_to_ft2=0;
288   }
289   DBUG_RETURN(error);
290 } /* _mi_ck_write_btree */
291 
_mi_ck_real_write_btree(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uint key_length,my_off_t * root,uint comp_flag)292 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
293     uchar *key, uint key_length, my_off_t *root, uint comp_flag)
294 {
295   int error;
296   DBUG_ENTER("_mi_ck_real_write_btree");
297   /* key_length parameter is used only if comp_flag is SEARCH_FIND */
298   if (*root == HA_OFFSET_ERROR ||
299       (error=w_search(info, keyinfo, comp_flag, key, key_length,
300 		      *root, (uchar *) 0, (uchar*) 0,
301 		      (my_off_t) 0, 1)) > 0)
302     error=_mi_enlarge_root(info,keyinfo,key,root);
303   DBUG_RETURN(error);
304 } /* _mi_ck_real_write_btree */
305 
306 
307 	/* Make a new root with key as only pointer */
308 
_mi_enlarge_root(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,my_off_t * root)309 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
310                      my_off_t *root)
311 {
312   uint t_length,nod_flag;
313   MI_KEY_PARAM s_temp;
314   MYISAM_SHARE *share=info->s;
315   DBUG_ENTER("_mi_enlarge_root");
316 
317   nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
318   _mi_kpointer(info,info->buff+2,*root); /* if nod */
319   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
320 				(uchar*) 0, (uchar*) 0, key,&s_temp);
321   mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
322   (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
323   info->buff_used=info->page_changed=1;		/* info->buff is used */
324   if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
325       _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
326     DBUG_RETURN(-1);
327   DBUG_RETURN(0);
328 } /* _mi_enlarge_root */
329 
330 
331 	/*
332 	  Search after a position for a key and store it there
333 	  Returns -1 = error
334 		   0  = ok
335 		   1  = key should be stored in higher tree
336 	*/
337 
w_search(register MI_INFO * info,register MI_KEYDEF * keyinfo,uint comp_flag,uchar * key,uint key_length,my_off_t page,uchar * father_buff,uchar * father_keypos,my_off_t father_page,my_bool insert_last)338 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
339 		    uint comp_flag, uchar *key, uint key_length, my_off_t page,
340 		    uchar *father_buff, uchar *father_keypos,
341 		    my_off_t father_page, my_bool insert_last)
342 {
343   int error,flag;
344   uint nod_flag, search_key_length;
345   uchar *temp_buff,*keypos;
346   uchar keybuff[HA_MAX_KEY_BUFF];
347   my_bool was_last_key;
348   my_off_t next_page, dupp_key_pos;
349   DBUG_ENTER("w_search");
350   DBUG_PRINT("enter",("page: %ld", (long) page));
351 
352   search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
353   if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
354 				      HA_MAX_KEY_BUFF*2)))
355     DBUG_RETURN(-1);
356   if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
357     goto err;
358 
359   flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
360 			      comp_flag, &keypos, keybuff, &was_last_key);
361   nod_flag=mi_test_if_nod(temp_buff);
362   if (flag == 0)
363   {
364     uint tmp_key_length;
365 	/* get position to record with duplicated key */
366     tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
367     if (tmp_key_length)
368       dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
369     else
370       dupp_key_pos= HA_OFFSET_ERROR;
371 
372     if (keyinfo->flag & HA_FULLTEXT)
373     {
374       uint off;
375       int  subkeys;
376 
377       get_key_full_length_rdonly(off, keybuff);
378       subkeys=ft_sintXkorr(keybuff+off);
379       comp_flag=SEARCH_SAME;
380       if (subkeys >= 0)
381       {
382         /* normal word, one-level tree structure */
383         flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
384                                     USE_WHOLE_KEY, comp_flag,
385                                     &keypos, keybuff, &was_last_key);
386       }
387       else
388       {
389         /* popular word. two-level tree. going down */
390         my_off_t root=dupp_key_pos;
391         keyinfo=&info->s->ft2_keyinfo;
392         get_key_full_length_rdonly(off, key);
393         key+=off;
394         keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
395         error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
396                                       &root, comp_flag);
397         _mi_dpointer(info, keypos+HA_FT_WLEN, root);
398         subkeys--; /* should there be underflow protection ? */
399         DBUG_ASSERT(subkeys < 0);
400         ft_intXstore(keypos, subkeys);
401         if (!error)
402           error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
403         my_afree((uchar*) temp_buff);
404         DBUG_RETURN(error);
405       }
406     }
407     else /* not HA_FULLTEXT, normal HA_NOSAME key */
408     {
409       info->dupp_key_pos= dupp_key_pos;
410       my_afree((uchar*) temp_buff);
411       my_errno=HA_ERR_FOUND_DUPP_KEY;
412       DBUG_RETURN(-1);
413     }
414   }
415   if (flag == MI_FOUND_WRONG_KEY)
416     DBUG_RETURN(-1);
417   if (!was_last_key)
418     insert_last=0;
419   next_page=_mi_kpos(nod_flag,keypos);
420   if (next_page == HA_OFFSET_ERROR ||
421       (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
422 		      temp_buff, keypos, page, insert_last)) >0)
423   {
424     error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
425 		     father_keypos,father_page, insert_last);
426     if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
427       goto err;
428   }
429   my_afree((uchar*) temp_buff);
430   DBUG_RETURN(error);
431 err:
432   my_afree((uchar*) temp_buff);
433   DBUG_PRINT("exit",("Error: %d",my_errno));
434   DBUG_RETURN (-1);
435 } /* w_search */
436 
437 
438 /*
439   Insert new key.
440 
441   SYNOPSIS
442     _mi_insert()
443     info                        Open table information.
444     keyinfo                     Key definition information.
445     key                         New key.
446     anc_buff                    Key page (beginning).
447     key_pos                     Position in key page where to insert.
448     key_buff                    Copy of previous key.
449     father_buff                 parent key page for balancing.
450     father_key_pos              position in parent key page for balancing.
451     father_page                 position of parent key page in file.
452     insert_last                 If to append at end of page.
453 
454   DESCRIPTION
455     Insert new key at right of key_pos.
456 
457   RETURN
458     2           if key contains key to upper level.
459     0           OK.
460     < 0         Error.
461 */
462 
_mi_insert(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * anc_buff,uchar * key_pos,uchar * key_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page,my_bool insert_last)463 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
464 	       uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
465                uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
466 	       my_bool insert_last)
467 {
468   uint a_length,nod_flag;
469   int t_length;
470   uchar *endpos, *prev_key;
471   MI_KEY_PARAM s_temp;
472   DBUG_ENTER("_mi_insert");
473   DBUG_PRINT("enter",("key_pos: %p", key_pos));
474   DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
475 
476   nod_flag=mi_test_if_nod(anc_buff);
477   a_length=mi_getint(anc_buff);
478   endpos= anc_buff+ a_length;
479   prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
480   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
481 				(key_pos == endpos ? (uchar*) 0 : key_pos),
482 				prev_key, prev_key,
483 				key,&s_temp);
484 #ifndef DBUG_OFF
485   if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
486 					 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
487   {
488     DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
489   }
490   if (keyinfo->flag & HA_PACK_KEY)
491   {
492     DBUG_PRINT("test",("t_length: %d  ref_len: %d",
493 		       t_length,s_temp.ref_length));
494     DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: %p",
495 		       s_temp.n_ref_length,s_temp.n_length, s_temp.key));
496   }
497 #endif
498   if (t_length > 0)
499   {
500     if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
501     {
502       mi_print_error(info->s, HA_ERR_CRASHED);
503       my_errno=HA_ERR_CRASHED;
504       DBUG_RETURN(-1);
505     }
506     bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
507   }
508   else
509   {
510     if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
511     {
512       mi_print_error(info->s, HA_ERR_CRASHED);
513       my_errno=HA_ERR_CRASHED;
514       DBUG_RETURN(-1);
515     }
516     bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
517   }
518   (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
519   a_length+=t_length;
520   mi_putint(anc_buff,a_length,nod_flag);
521   if (a_length <= keyinfo->block_length)
522   {
523     if (keyinfo->block_length - a_length < 32 &&
524         keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
525         info->s->base.key_reflength <= info->s->rec_reflength &&
526         info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
527     {
528       /*
529         Normal word. One-level tree. Page is almost full.
530         Let's consider converting.
531         We'll compare 'key' and the first key at anc_buff
532        */
533       uchar *a=key, *b=anc_buff+2+nod_flag;
534       uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
535       /* the very first key on the page is always unpacked */
536       DBUG_ASSERT((*b & 128) == 0);
537 #if HA_FT_MAXLEN >= 127
538       blen= mi_uint2korr(b); b+=2;
539 #else
540       blen= *b++;
541 #endif
542       get_key_length(alen,a);
543       DBUG_ASSERT(info->ft1_to_ft2==0);
544       if (alen == blen &&
545           ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0)==0)
546       {
547         /* yup. converting */
548         info->ft1_to_ft2=(DYNAMIC_ARRAY *)
549           my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
550         my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50, MYF(0));
551 
552         /*
553           now, adding all keys from the page to dynarray
554           if the page is a leaf (if not keys will be deleted later)
555         */
556         if (!nod_flag)
557         {
558           /* let's leave the first key on the page, though, because
559              we cannot easily dispatch an empty page here */
560           b+=blen+ft2len+2;
561           for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
562           {
563             if (insert_dynamic(info->ft1_to_ft2, b))
564             {
565               mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
566               my_errno= HA_ERR_OUT_OF_MEM;
567               DBUG_RETURN(-1);
568             }
569           }
570 
571           /* fixing the page's length - it contains only one key now */
572           mi_putint(anc_buff,2+blen+ft2len+2,0);
573         }
574         /* the rest will be done when we're back from recursion */
575       }
576     }
577     DBUG_RETURN(0);				/* There is room on page */
578   }
579   /* Page is full */
580   if (nod_flag)
581     insert_last=0;
582   if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
583       father_buff && !insert_last)
584     DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
585 				 father_key_pos,father_page));
586   DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
587 } /* _mi_insert */
588 
589 
590 	/* split a full page in two and assign emerging item to key */
591 
_mi_split_page(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * buff,uchar * key_buff,my_bool insert_last_key)592 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
593 		   uchar *key, uchar *buff, uchar *key_buff,
594 		   my_bool insert_last_key)
595 {
596   uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
597   uchar *key_pos,*pos, *UNINIT_VAR(after_key);
598   my_off_t new_pos;
599   MI_KEY_PARAM s_temp;
600   DBUG_ENTER("mi_split_page");
601   DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
602 
603   if (info->s->keyinfo+info->lastinx == keyinfo)
604     info->page_changed=1;			/* Info->buff is used */
605   info->buff_used=1;
606   nod_flag=mi_test_if_nod(buff);
607   key_ref_length=2+nod_flag;
608   if (insert_last_key)
609     key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
610   else
611     key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
612 			      &after_key);
613   if (!key_pos)
614     DBUG_RETURN(-1);
615 
616   length=(uint) (key_pos-buff);
617   a_length=mi_getint(buff);
618   mi_putint(buff,length,nod_flag);
619 
620   key_pos=after_key;
621   if (nod_flag)
622   {
623     DBUG_PRINT("test",("Splitting nod"));
624     pos=key_pos-nod_flag;
625     memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
626   }
627 
628 	/* Move middle item to key and pointer to new page */
629   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
630     DBUG_RETURN(-1);
631   _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
632 
633 	/* Store new page */
634   if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
635     DBUG_RETURN(-1);
636 
637   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
638 				(uchar*) 0, (uchar*) 0,
639 				key_buff, &s_temp);
640   length=(uint) ((buff+a_length)-key_pos);
641   memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
642 	 (size_t) length);
643   (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
644   mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
645 
646   if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
647     DBUG_RETURN(-1);
648   DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
649   DBUG_RETURN(2);				/* Middle key up */
650 } /* _mi_split_page */
651 
652 
653 	/*
654 	  Calculate how to much to move to split a page in two
655 	  Returns pointer to start of key.
656 	  key will contain the key.
657 	  return_key_length will contain the length of key
658 	  after_key will contain the position to where the next key starts
659 	*/
660 
_mi_find_half_pos(uint nod_flag,MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)661 uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
662 			 uchar *key, uint *return_key_length,
663 			 uchar **after_key)
664 {
665   uint keys,length,key_ref_length;
666   uchar *end,*lastpos;
667   DBUG_ENTER("_mi_find_half_pos");
668 
669   key_ref_length=2+nod_flag;
670   length=mi_getint(page)-key_ref_length;
671   page+=key_ref_length;
672   if (!(keyinfo->flag &
673 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
674 	 HA_BINARY_PACK_KEY)))
675   {
676     key_ref_length=keyinfo->keylength+nod_flag;
677     keys=length/(key_ref_length*2);
678     *return_key_length=keyinfo->keylength;
679     end=page+keys*key_ref_length;
680     *after_key=end+key_ref_length;
681     memcpy(key,end,key_ref_length);
682     DBUG_RETURN(end);
683   }
684 
685   end=page+length/2-key_ref_length;		/* This is aprox. half */
686   *key='\0';
687   do
688   {
689     lastpos=page;
690     if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
691       DBUG_RETURN(0);
692   } while (page < end);
693   *return_key_length=length;
694   *after_key=page;
695   DBUG_PRINT("exit",("returns: %p  page: %p  half: %p",
696                      lastpos, page, end));
697   DBUG_RETURN(lastpos);
698 } /* _mi_find_half_pos */
699 
700 
701 /*
702   Split buffer at last key
703   Returns pointer to the start of the key before the last key
704   key will contain the last key
705 */
706 
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)707 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
708 				uchar *key, uint *return_key_length,
709 				uchar **after_key)
710 {
711   uint keys, length, UNINIT_VAR(last_length), key_ref_length;
712   uchar *end,*lastpos,*prevpos;
713   uchar key_buff[HA_MAX_KEY_BUFF];
714   DBUG_ENTER("_mi_find_last_pos");
715 
716   key_ref_length=2;
717   length=mi_getint(page)-key_ref_length;
718   page+=key_ref_length;
719   if (!(keyinfo->flag &
720 	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
721 	 HA_BINARY_PACK_KEY)))
722   {
723     keys=length/keyinfo->keylength-2;
724     *return_key_length=length=keyinfo->keylength;
725     end=page+keys*length;
726     *after_key=end+length;
727     memcpy(key,end,length);
728     DBUG_RETURN(end);
729   }
730 
731   end=page+length-key_ref_length;
732   DBUG_ASSERT(page < end);
733   *key='\0';
734   length=0;
735   lastpos=page;
736 
737   do
738   {
739     prevpos=lastpos; lastpos=page;
740     last_length=length;
741     memcpy(key, key_buff, length);		/* previous key */
742     if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
743     {
744       mi_print_error(keyinfo->share, HA_ERR_CRASHED);
745       my_errno=HA_ERR_CRASHED;
746       DBUG_RETURN(0);
747     }
748   } while (page < end);
749 
750   *return_key_length=last_length;
751   *after_key=lastpos;
752   DBUG_PRINT("exit",("returns: %p  page: %p  end: %p",
753                      prevpos, page, end));
754   DBUG_RETURN(prevpos);
755 } /* _mi_find_last_pos */
756 
757 
758 	/* Balance page with not packed keys with page on right/left */
759 	/* returns 0 if balance was done */
760 
_mi_balance_page(register MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * curr_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page)761 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
762 			    uchar *key, uchar *curr_buff, uchar *father_buff,
763 			    uchar *father_key_pos, my_off_t father_page)
764 {
765   my_bool right;
766   uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
767        right_length,left_length,new_right_length,new_left_length,extra_length,
768        length,keys;
769   uchar *pos,*buff,*extra_buff;
770   my_off_t next_page,new_pos;
771   uchar tmp_part_key[HA_MAX_KEY_BUFF];
772   DBUG_ENTER("_mi_balance_page");
773 
774   k_length=keyinfo->keylength;
775   father_length=mi_getint(father_buff);
776   father_keylength=k_length+info->s->base.key_reflength;
777   nod_flag=mi_test_if_nod(curr_buff);
778   curr_keylength=k_length+nod_flag;
779   info->page_changed=1;
780 
781   if ((father_key_pos != father_buff+father_length &&
782        (info->state->records & 1)) ||
783       father_key_pos == father_buff+2+info->s->base.key_reflength)
784   {
785     right=1;
786     next_page= _mi_kpos(info->s->base.key_reflength,
787 			father_key_pos+father_keylength);
788     buff=info->buff;
789     DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
790   }
791   else
792   {
793     right=0;
794     father_key_pos-=father_keylength;
795     next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
796 					/* Fix that curr_buff is to left */
797     buff=curr_buff; curr_buff=info->buff;
798     DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
799   }					/* father_key_pos ptr to parting key */
800 
801   if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
802     goto err;
803   DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
804 
805 	/* Test if there is room to share keys */
806 
807   left_length=mi_getint(curr_buff);
808   right_length=mi_getint(buff);
809   keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
810 
811   if ((right ? right_length : left_length) + curr_keylength <=
812       keyinfo->block_length)
813   {						/* Merge buffs */
814     new_left_length=2+nod_flag+(keys/2)*curr_keylength;
815     new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
816     mi_putint(curr_buff,new_left_length,nod_flag);
817     mi_putint(buff,new_right_length,nod_flag);
818 
819     if (left_length < new_left_length)
820     {						/* Move keys buff -> leaf */
821       pos=curr_buff+left_length;
822       memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
823       memcpy((uchar*) pos+k_length, (uchar*) buff+2,
824 	     (size_t) (length=new_left_length - left_length - k_length));
825       pos=buff+2+length;
826       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
827       bmove((uchar*) buff + 2, (uchar*) pos + k_length, new_right_length - 2);
828     }
829     else
830     {						/* Move keys -> buff */
831 
832       bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
833 		right_length-2);
834       length=new_right_length-right_length-k_length;
835       memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
836       pos=curr_buff+new_left_length;
837       memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
838       memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
839     }
840 
841     if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
842 	_mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
843       goto err;
844     DBUG_RETURN(0);
845   }
846 
847 	/* curr_buff[] and buff[] are full, lets split and make new nod */
848 
849   extra_buff=info->buff+info->s->base.max_key_block_length;
850   new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
851   if (keys == 5)				/* Too few keys to balance */
852     new_left_length-=curr_keylength;
853   extra_length=nod_flag+left_length+right_length-
854     new_left_length-new_right_length-curr_keylength;
855   DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
856 		     left_length, right_length,
857 		     new_left_length, new_right_length,
858 		     extra_length));
859   mi_putint(curr_buff,new_left_length,nod_flag);
860   mi_putint(buff,new_right_length,nod_flag);
861   mi_putint(extra_buff,extra_length+2,nod_flag);
862 
863   /* move first largest keys to new page  */
864   pos=buff+right_length-extra_length;
865   memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
866   /* Save new parting key */
867   memcpy(tmp_part_key, pos-k_length,k_length);
868   /* Make place for new keys */
869   bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
870 	    right_length-extra_length-k_length-2);
871   /* Copy keys from left page */
872   pos= curr_buff+new_left_length;
873   memcpy((uchar*) buff+2,(uchar*) pos+k_length,
874 	 (size_t) (length=left_length-new_left_length-k_length));
875   /* Copy old parting key */
876   memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
877 
878   /* Move new parting keys up to caller */
879   memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
880   memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
881 
882   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
883     goto err;
884   _mi_kpointer(info,key+k_length,new_pos);
885   if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
886 			DFLT_INIT_HITS,info->buff) ||
887       _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
888                         DFLT_INIT_HITS,extra_buff))
889     goto err;
890 
891   DBUG_RETURN(1);				/* Middle key up */
892 
893 err:
894   DBUG_RETURN(-1);
895 } /* _mi_balance_page */
896 
897 /**********************************************************************
898  *                Bulk insert code                                    *
899  **********************************************************************/
900 
901 typedef struct {
902   MI_INFO *info;
903   uint keynr;
904 } bulk_insert_param;
905 
_mi_ck_write_tree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)906 int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
907 		      uint key_length)
908 {
909   int error;
910   DBUG_ENTER("_mi_ck_write_tree");
911 
912   error= tree_insert(&info->bulk_insert[keynr], key,
913          key_length + info->s->rec_reflength,
914          info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
915 
916   DBUG_RETURN(error);
917 } /* _mi_ck_write_tree */
918 
919 
920 /* typeof(_mi_keys_compare)=qsort_cmp2 */
921 
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)922 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
923 {
924   uint not_used[2];
925   return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
926                     key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
927                     not_used);
928 }
929 
930 
keys_free(void * key_arg,TREE_FREE mode,void * param_arg)931 static int keys_free(void* key_arg, TREE_FREE mode, void *param_arg)
932 {
933   /*
934     Probably I can use info->lastkey here, but I'm not sure,
935     and to be safe I'd better use local lastkey.
936   */
937   bulk_insert_param *param= (bulk_insert_param*)param_arg;
938   uchar lastkey[HA_MAX_KEY_BUFF], *key= (uchar*)key_arg;
939   uint keylen;
940   MI_KEYDEF *keyinfo;
941 
942   switch (mode) {
943   case free_init:
944     if (param->info->s->concurrent_insert)
945     {
946       mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
947       param->info->s->keyinfo[param->keynr].version++;
948     }
949     return 0;
950   case free_free:
951     keyinfo=param->info->s->keyinfo+param->keynr;
952     keylen=_mi_keylength(keyinfo, key);
953     memcpy(lastkey, key, keylen);
954     _mi_ck_write_btree(param->info, param->keynr, lastkey,
955 			                 keylen - param->info->s->rec_reflength);
956     return 0;
957   case free_end:
958     if (param->info->s->concurrent_insert)
959       mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
960     return 0;
961   }
962   return 0;
963 }
964 
965 
mi_init_bulk_insert(MI_INFO * info,size_t cache_size,ha_rows rows)966 int mi_init_bulk_insert(MI_INFO *info, size_t cache_size, ha_rows rows)
967 {
968   MYISAM_SHARE *share=info->s;
969   MI_KEYDEF *key=share->keyinfo;
970   bulk_insert_param *params;
971   uint i, num_keys, total_keylength;
972   ulonglong key_map;
973   DBUG_ENTER("_mi_init_bulk_insert");
974   DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));
975 
976   DBUG_ASSERT(!info->bulk_insert &&
977 	      (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
978 
979   mi_clear_all_keys_active(key_map);
980   for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
981   {
982     if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
983         mi_is_key_active(share->state.key_map, i))
984     {
985       num_keys++;
986       mi_set_key_active(key_map, i);
987       total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
988     }
989   }
990 
991   if (num_keys==0 ||
992       num_keys * (size_t) MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
993     DBUG_RETURN(0);
994 
995   if (rows && rows*total_keylength < cache_size)
996     cache_size= (size_t) rows;
997   else
998     cache_size/=total_keylength*16;
999 
1000   info->bulk_insert=(TREE *)
1001     my_malloc((sizeof(TREE)*share->base.keys+
1002                sizeof(bulk_insert_param)*num_keys),MYF(0));
1003 
1004   if (!info->bulk_insert)
1005     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1006 
1007   params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1008   for (i=0 ; i < share->base.keys ; i++)
1009   {
1010     if (mi_is_key_active(key_map, i))
1011     {
1012       params->info=info;
1013       params->keynr=i;
1014       /* Only allocate a 16'th of the buffer at a time */
1015       init_tree(&info->bulk_insert[i],
1016                 cache_size * key[i].maxlength,
1017                 cache_size * key[i].maxlength, 0,
1018                 (qsort_cmp2)keys_compare, keys_free, (void *)params++, MYF(0));
1019     }
1020     else
1021      info->bulk_insert[i].root=0;
1022   }
1023 
1024   DBUG_RETURN(0);
1025 }
1026 
mi_flush_bulk_insert(MI_INFO * info,uint inx)1027 void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1028 {
1029   if (info->bulk_insert)
1030   {
1031     if (is_tree_inited(&info->bulk_insert[inx]))
1032       reset_tree(&info->bulk_insert[inx]);
1033   }
1034 }
1035 
mi_end_bulk_insert(MI_INFO * info,my_bool abort)1036 int mi_end_bulk_insert(MI_INFO *info, my_bool abort)
1037 {
1038   int first_error= 0;
1039   if (info->bulk_insert)
1040   {
1041     uint i;
1042     for (i=0 ; i < info->s->base.keys ; i++)
1043     {
1044       if (is_tree_inited(& info->bulk_insert[i]))
1045       {
1046         int error;
1047         if ((error= delete_tree(& info->bulk_insert[i], abort)))
1048         {
1049           first_error= first_error ? first_error : error;
1050           abort= 1;
1051         }
1052       }
1053     }
1054     my_free(info->bulk_insert);
1055     info->bulk_insert=0;
1056   }
1057   return first_error;
1058 }
1059