1 /*
2 Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17 /* Write a row to a MyISAM table */
18
19 #include "fulltext.h"
20 #include "rt_index.h"
21
22 #define MAX_POINTER_LENGTH 8
23
24 /* Functions declared in this file */
25
26 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
27 uint comp_flag, uchar *key,
28 uint key_length, my_off_t pos, uchar *father_buff,
29 uchar *father_keypos, my_off_t father_page,
30 my_bool insert_last);
31 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
32 uchar *curr_buff,uchar *father_buff,
33 uchar *father_keypos,my_off_t father_page);
34 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
35 uchar *key, uint *return_key_length,
36 uchar **after_key);
37 int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
38 uint key_length);
39 int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
40 uint key_length);
41
42 /* Write new record to database */
43
/*
  Write a new record to a MyISAM table.

  SYNOPSIS
    mi_write()
      info      Open table handler.
      record    Row image to store.

  DESCRIPTION
    Rejects read-only tables, calls _mi_readinfo() with F_WRLCK, picks the
    row position (first deleted block unless appending is forced, else the
    end of the data file), checks the unique constraints, inserts one entry
    per active key and finally stores the row itself.
    If a key insert or the row write fails with a duplicate key, record
    file full, NULL in spatial key or out-of-memory error, the keys that
    were already inserted for this row are deleted again.  Any other
    failure marks the table as crashed.

  RETURN
    0      Row written.
    != 0   Error code; the same value is left in my_errno.
*/
int mi_write(MI_INFO *info, uchar *record)
{
  MYISAM_SHARE *share= info->s;
  my_bool lock_tree= share->concurrent_insert;
  uchar *key_buff;
  my_off_t filepos;
  int saved_errno;
  uint i;
  DBUG_ENTER("mi_write");
  DBUG_PRINT("enter",("isam: %d data: %d",info->s->kfile,info->dfile));

  DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
                  mi_print_error(info->s, HA_ERR_CRASHED);
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
  if (share->options & HA_OPTION_READ_ONLY_DATA)
  {
    DBUG_RETURN(my_errno=EACCES);
  }
  if (_mi_readinfo(info,F_WRLCK,1))
  {
    DBUG_RETURN(my_errno);
  }

  /* Reuse the first deleted block unless the caller forces an append. */
  if (share->state.dellink != HA_OFFSET_ERROR && !info->append_insert_at_end)
    filepos= share->state.dellink;
  else
    filepos= info->state->data_file_length;

  if (share->base.reloc == (ha_rows) 1 &&
      share->base.records == (ha_rows) 1 &&
      info->state->records == (ha_rows) 1)
  {                                             /* System file */
    my_errno= HA_ERR_RECORD_FILE_FULL;
    goto err2;
  }
  if (info->state->key_file_length >= share->base.margin_key_file_length)
  {
    my_errno= HA_ERR_INDEX_FILE_FULL;
    goto err2;
  }
  if (_mi_mark_file_changed(info))
    goto err2;

  /* Calculate and check all unique constraints */
  if (mi_is_any_key_active(share->state.key_map))
  {
    for (i= 0 ; i < share->state.header.uniques ; i++)
    {
      if (mi_check_unique(info, share->uniqueinfo + i, record,
                          mi_unique_hash(share->uniqueinfo + i, record),
                          HA_OFFSET_ERROR))
        goto err2;
    }
  }

  /* Write all keys to indextree */

  key_buff= info->lastkey2;
  for (i= 0 ; i < share->base.keys ; i++)
  {
    my_bool tree_locked;
    int failed;

    if (!mi_is_key_active(share->state.key_map, i))
      continue;

    /* No root lock is taken for a key that has an initialised bulk tree. */
    tree_locked= (lock_tree &&
                  (!info->bulk_insert ||
                   !is_tree_inited(&info->bulk_insert[i])));
    if (tree_locked)
    {
      mysql_rwlock_wrlock(&share->key_root_lock[i]);
      share->keyinfo[i].version++;
    }

    if (share->keyinfo[i].flag & HA_FULLTEXT)
      failed= _mi_ft_add(info, i, key_buff, record, filepos);
    else
      failed= share->keyinfo[i].ck_insert(info, i, key_buff,
                                          _mi_make_key(info, i, key_buff,
                                                       record, filepos));
    if (failed)
    {
      if (tree_locked)
        mysql_rwlock_unlock(&share->key_root_lock[i]);
      DBUG_PRINT("error",("Got error: %d on write",my_errno));
      goto err;
    }

    /* The above changed info->lastkey2. Inform mi_rnext_same(). */
    info->update&= ~HA_STATE_RNEXT_SAME;

    if (tree_locked)
      mysql_rwlock_unlock(&share->key_root_lock[i]);
  }

  if (share->calc_checksum)
    info->checksum= (*share->calc_checksum)(info, record);
  if (!(info->opt_flag & OPT_NO_ROWS))
  {
    if ((*share->write_record)(info, record))
      goto err;
    info->state->checksum+= info->checksum;
  }
  if (share->base.auto_key)
    set_if_bigger(info->s->state.auto_increment,
                  retrieve_auto_increment(info, record));
  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
                 HA_STATE_ROW_CHANGED);
  info->state->records++;
  info->lastpos= filepos;
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
  (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
  if (info->invalidator != 0)
  {
    DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
    (*info->invalidator)(info->filename);
    info->invalidator= 0;
  }

  /*
    Update status of the table. We need to do so after each row write
    for the log tables, as we want the new row to become visible to
    other threads as soon as possible. We don't lock mutex here
    (as it is required by pthread memory visibility rules) as (1) it's
    not critical to use outdated share->is_log_table value (2) locking
    mutex here for every write is too expensive.
  */
  if (share->is_log_table)
    mi_update_status((void*) info);

  DBUG_RETURN(0);

err:
  saved_errno= my_errno;
  if (saved_errno == HA_ERR_FOUND_DUPP_KEY ||
      saved_errno == HA_ERR_RECORD_FILE_FULL ||
      saved_errno == HA_ERR_NULL_IN_SPATIAL ||
      saved_errno == HA_ERR_OUT_OF_MEM)
  {
    if (info->bulk_insert)
    {
      uint j;
      for (j= 0 ; j < share->base.keys ; j++)
        mi_flush_bulk_insert(info, j);
    }
    info->errkey= (int) i;

    /* Remove, in reverse order, every key inserted before the failure. */
    while (i-- > 0)
    {
      my_bool tree_locked;
      int failed;

      if (!mi_is_key_active(share->state.key_map, i))
        continue;

      tree_locked= (lock_tree &&
                    (!info->bulk_insert ||
                     !is_tree_inited(&info->bulk_insert[i])));
      if (tree_locked)
        mysql_rwlock_wrlock(&share->key_root_lock[i]);

      if (share->keyinfo[i].flag & HA_FULLTEXT)
        failed= _mi_ft_del(info, i, key_buff, record, filepos);
      else
      {
        uint key_length= _mi_make_key(info, i, key_buff, record, filepos);
        failed= share->keyinfo[i].ck_delete(info, i, key_buff, key_length);
      }

      if (tree_locked)
        mysql_rwlock_unlock(&share->key_root_lock[i]);
      if (failed)
        break;
    }
  }
  else
  {
    mi_print_error(info->s, HA_ERR_CRASHED);
    mi_mark_crashed(info);
  }
  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
  my_errno= saved_errno;
err2:
  /* Logging and _mi_writeinfo() may overwrite my_errno; restore it after. */
  saved_errno= my_errno;
  myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
  (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
  DBUG_RETURN(my_errno= saved_errno);
} /* mi_write */
236
237
238 /* Write one key to btree */
239
/*
  Store one key, either in the bulk-insert tree of the key (when one has
  been initialised for keynr) or directly in the on-disk b-tree.

  Returns the result of the writer that was chosen.
*/
int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
{
  DBUG_ENTER("_mi_ck_write");

  if (!info->bulk_insert || !is_tree_inited(&info->bulk_insert[keynr]))
  {
    DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
  }

  DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
} /* _mi_ck_write */
253
254
255 /**********************************************************************
256 * Normal insert code *
257 **********************************************************************/
258
/*
  Insert a key into the b-tree of key number keynr.

  The compare flags for the descent are derived from the key definition.
  If the insert left a pending ft1->ft2 conversion array in
  info->ft1_to_ft2, the conversion is run (only when the insert succeeded)
  and the array is always released.

  Returns 0 on success, otherwise the error from the insert or from
  _mi_ft_convert_to_ft2().
*/
int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
                       uint key_length)
{
  MI_KEYDEF *keydef= info->s->keyinfo + keynr;
  my_off_t *root_pos= &info->s->state.key_root[keynr];
  uint search_flag;
  int result;
  DBUG_ENTER("_mi_ck_write_btree");

  if (keydef->flag & HA_SORT_ALLOWS_SAME)
  {
    search_flag= SEARCH_BIGGER;                 /* Put after same key */
  }
  else if (!(keydef->flag & (HA_NOSAME | HA_FULLTEXT)))
  {
    search_flag= SEARCH_SAME;                   /* Keys in rec-pos order */
  }
  else
  {
    search_flag= SEARCH_FIND | SEARCH_UPDATE;   /* No duplicates */
    if (keydef->flag & HA_NULL_ARE_EQUAL)
      search_flag|= SEARCH_NULL_ARE_EQUAL;
  }

  result= _mi_ck_real_write_btree(info, keydef, key, key_length,
                                  root_pos, search_flag);
  if (info->ft1_to_ft2)
  {
    if (result == 0)
      result= _mi_ft_convert_to_ft2(info, keynr, key);
    delete_dynamic(info->ft1_to_ft2);
    my_free(info->ft1_to_ft2);
    info->ft1_to_ft2= 0;
  }
  DBUG_RETURN(result);
} /* _mi_ck_write_btree */
291
/*
  Insert a key into the tree rooted at *root.

  When the tree is empty, or when w_search() reports (> 0) that a key has
  to be stored above the current root, a new root page is created with
  _mi_enlarge_root() and *root is updated.

  Returns 0 on success and a negative value on error.
*/
int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
                            uchar *key, uint key_length, my_off_t *root,
                            uint comp_flag)
{
  int result;
  DBUG_ENTER("_mi_ck_real_write_btree");

  if (*root != HA_OFFSET_ERROR)
  {
    /* key_length parameter is used only if comp_flag is SEARCH_FIND */
    result= w_search(info, keyinfo, comp_flag, key, key_length,
                     *root, (uchar *) 0, (uchar*) 0,
                     (my_off_t) 0, 1);
    if (result <= 0)
      DBUG_RETURN(result);
  }

  result= _mi_enlarge_root(info, keyinfo, key, root);
  DBUG_RETURN(result);
} /* _mi_ck_real_write_btree */
305
306
307 /* Make a new root with key as only pointer */
308
/*
  Build a new root page in info->buff that holds 'key' as its only entry,
  allocate a page for it and write it out.

  If a root already exists (*root != HA_OFFSET_ERROR) the new page is a
  node page whose leading page pointer refers to the old root.  On return
  *root holds the position of the new page.

  Returns 0 on success, -1 if the page could not be allocated or written.
*/
int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
                     my_off_t *root)
{
  MYISAM_SHARE *share= info->s;
  MI_KEY_PARAM pack_info;
  uint ptr_len, packed_len;
  DBUG_ENTER("_mi_enlarge_root");

  ptr_len= 0;
  if (*root != HA_OFFSET_ERROR)
    ptr_len= share->base.key_reflength;

  _mi_kpointer(info, info->buff + 2, *root);    /* if nod */
  packed_len= (*keyinfo->pack_key)(keyinfo, ptr_len, (uchar*) 0,
                                   (uchar*) 0, (uchar*) 0, key, &pack_info);
  mi_putint(info->buff, packed_len + 2 + ptr_len, ptr_len);
  (*keyinfo->store_key)(keyinfo, info->buff + 2 + ptr_len, &pack_info);
  info->buff_used= info->page_changed= 1;       /* info->buff is used */

  *root= _mi_new(info, keyinfo, DFLT_INIT_HITS);
  if (*root == HA_OFFSET_ERROR)
    DBUG_RETURN(-1);
  if (_mi_write_keypage(info, keyinfo, *root, DFLT_INIT_HITS, info->buff))
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
} /* _mi_enlarge_root */
329
330
331 /*
332 Search after a position for a key and store it there
333 Returns -1 = error
334 0 = ok
335 1 = key should be stored in higher tree
336 */
337
/*
  Descend from 'page' to the place where 'key' belongs and store it there.

  SYNOPSIS
    w_search()
      info           Open table information.
      keyinfo        Key definition.
      comp_flag      Search flags used for the page search.
      key            Key to insert.
      key_length     Length used for the search when comp_flag contains
                     SEARCH_FIND; otherwise the whole key is compared.
      page           Position of the key page to search.
      father_buff    Parent page buffer (0 for the root).
      father_keypos  Position in the parent that led to this page.
      father_page    File position of the parent page.
      insert_last    Set while the key is being placed after the last key
                     of every page on the path.

  DESCRIPTION
    The page is read into a temporary buffer that is released on every
    exit path, including the one taken when the page search reports
    MI_FOUND_WRONG_KEY.

  RETURN
    -1   error
     0   ok
    > 0  key should be stored in higher tree
*/
static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
                    uint comp_flag, uchar *key, uint key_length, my_off_t page,
                    uchar *father_buff, uchar *father_keypos,
                    my_off_t father_page, my_bool insert_last)
{
  int error,flag;
  uint nod_flag, search_key_length;
  uchar *temp_buff,*keypos;
  uchar keybuff[MI_MAX_KEY_BUFF];
  my_bool was_last_key;
  my_off_t next_page, dupp_key_pos;
  DBUG_ENTER("w_search");
  DBUG_PRINT("enter",("page: %ld", (long) page));

  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
  if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
                                      MI_MAX_KEY_BUFF*2)))
    DBUG_RETURN(-1);
  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
    goto err;

  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
                              comp_flag, &keypos, keybuff, &was_last_key);
  nod_flag=mi_test_if_nod(temp_buff);
  if (flag == 0)
  {
    uint tmp_key_length;
    /* get position to record with duplicated key */
    tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
    if (tmp_key_length)
      dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
    else
      dupp_key_pos= HA_OFFSET_ERROR;

    if (keyinfo->flag & HA_FULLTEXT)
    {
      uint off;
      int  subkeys;

      get_key_full_length_rdonly(off, keybuff);
      subkeys=ft_sintXkorr(keybuff+off);
      comp_flag=SEARCH_SAME;
      if (subkeys >= 0)
      {
        /* normal word, one-level tree structure */
        flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
                                    USE_WHOLE_KEY, comp_flag,
                                    &keypos, keybuff, &was_last_key);
      }
      else
      {
        /* popular word. two-level tree. going down */
        my_off_t root=dupp_key_pos;
        keyinfo=&info->s->ft2_keyinfo;
        get_key_full_length_rdonly(off, key);
        key+=off;
        keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
        error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
                                      &root, comp_flag);
        _mi_dpointer(info, keypos+HA_FT_WLEN, root);
        subkeys--; /* should there be underflow protection ? */
        DBUG_ASSERT(subkeys < 0);
        ft_intXstore(keypos, subkeys);
        if (!error)
          error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
        my_afree((uchar*) temp_buff);
        DBUG_RETURN(error);
      }
    }
    else /* not HA_FULLTEXT, normal HA_NOSAME key */
    {
      info->dupp_key_pos= dupp_key_pos;
      my_afree((uchar*) temp_buff);
      my_errno=HA_ERR_FOUND_DUPP_KEY;
      DBUG_RETURN(-1);
    }
  }
  if (flag == MI_FOUND_WRONG_KEY)
    goto err;                           /* release temp_buff before failing */
  if (!was_last_key)
    insert_last=0;
  next_page=_mi_kpos(nod_flag,keypos);
  if (next_page == HA_OFFSET_ERROR ||
      (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
                      temp_buff, keypos, page, insert_last)) >0)
  {
    /* Leaf reached, or the child pushed a key up: insert on this page. */
    error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
                     father_keypos,father_page, insert_last);
    if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
      goto err;
  }
  my_afree((uchar*) temp_buff);
  DBUG_RETURN(error);
err:
  my_afree((uchar*) temp_buff);
  DBUG_PRINT("exit",("Error: %d",my_errno));
  DBUG_RETURN (-1);
} /* w_search */
436
437
438 /*
439 Insert new key.
440
441 SYNOPSIS
442 _mi_insert()
443 info Open table information.
444 keyinfo Key definition information.
445 key New key.
446 anc_buff Key page (beginning).
447 key_pos Position in key page where to insert.
448 key_buff Copy of previous key.
449 father_buff parent key page for balancing.
450 father_key_pos position in parent key page for balancing.
451 father_page position of parent key page in file.
452 insert_last If to append at end of page.
453
454 DESCRIPTION
455 Insert new key at right of key_pos.
456
457 RETURN
458 2 if key contains key to upper level.
459 0 OK.
460 < 0 Error.
461 */
462
/*
  Insert 'key' at key_pos on the key page anc_buff.

  The page image is opened up (or shrunk, when packing makes the following
  key shorter) at key_pos and the packed key is stored there.  If the page
  still fits in a block the function is done, apart from possibly starting
  an ft1->ft2 conversion for a nearly full fulltext page.  Otherwise the
  page is balanced against a sibling or split.

  RETURN
    0    Key stored, there was room on the page.
    -1   Error (my_errno set for corrupted lengths and out of memory).
    else Result of _mi_balance_page() or _mi_split_page().
*/
int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
               uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
               uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
               my_bool insert_last)
{
  uint a_length,nod_flag;
  int t_length;
  uchar *endpos, *prev_key;
  MI_KEY_PARAM s_temp;
  DBUG_ENTER("_mi_insert");
  DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
  DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););

  nod_flag=mi_test_if_nod(anc_buff);
  a_length=mi_getint(anc_buff);
  endpos= anc_buff+ a_length;
  /* There is no previous key when inserting at the first key position. */
  prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
                                (key_pos == endpos ? (uchar*) 0 : key_pos),
                                prev_key, prev_key,
                                key,&s_temp);
#ifndef DBUG_OFF
  if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
                                         (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
  {
    DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
  }
  if (keyinfo->flag & HA_PACK_KEY)
  {
    DBUG_PRINT("test",("t_length: %d  ref_len: %d",
                       t_length,s_temp.ref_length));
    DBUG_PRINT("test",("n_ref_len: %d  n_length: %d  key_pos: 0x%lx",
                       s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
  }
#endif
  if (t_length > 0)
  {
    /* Page grows: reject implausible lengths, then move the tail up. */
    if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
    {
      mi_print_error(info->s, HA_ERR_CRASHED);
      my_errno=HA_ERR_CRASHED;
      DBUG_RETURN(-1);
    }
    bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
  }
  else
  {
    /* Page shrinks or keeps its size: move the tail down. */
    if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
    {
      mi_print_error(info->s, HA_ERR_CRASHED);
      my_errno=HA_ERR_CRASHED;
      DBUG_RETURN(-1);
    }
    bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
  }
  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
  a_length+=t_length;
  mi_putint(anc_buff,a_length,nod_flag);
  if (a_length <= keyinfo->block_length)
  {
    if (keyinfo->block_length - a_length < 32 &&
        keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
        info->s->base.key_reflength <= info->s->rec_reflength &&
        info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
    {
      /*
        Normal word. One-level tree. Page is almost full.
        Let's consider converting.
        We'll compare 'key' and the first key at anc_buff
       */
      uchar *a=key, *b=anc_buff+2+nod_flag;
      uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
      /* the very first key on the page is always unpacked */
      DBUG_ASSERT((*b & 128) == 0);
#if HA_FT_MAXLEN >= 127
      blen= mi_uint2korr(b); b+=2;
#else
      blen= *b++;
#endif
      get_key_length(alen,a);
      DBUG_ASSERT(info->ft1_to_ft2==0);
      if (alen == blen &&
          ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
      {
        /* yup. converting */
        info->ft1_to_ft2=(DYNAMIC_ARRAY *)
          my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
        if (!info->ft1_to_ft2)
        {
          /* Same reporting as a failed insert_dynamic() below. */
          mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
          my_errno= HA_ERR_OUT_OF_MEM;
          DBUG_RETURN(-1);
        }
        my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);

        /*
          now, adding all keys from the page to dynarray
          if the page is a leaf (if not keys will be deleted later)
        */
        if (!nod_flag)
        {
          /* let's leave the first key on the page, though, because
             we cannot easily dispatch an empty page here */
          b+=blen+ft2len+2;
          for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
          {
            if (insert_dynamic(info->ft1_to_ft2, b))
            {
              mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
              my_errno= HA_ERR_OUT_OF_MEM;
              DBUG_RETURN(-1);
            }
          }

          /* fixing the page's length - it contains only one key now */
          mi_putint(anc_buff,2+blen+ft2len+2,0);
        }
        /* the rest will be done when we're back from recursion */
      }
    }
    DBUG_RETURN(0);                             /* There is room on page */
  }
  /* Page is full */
  if (nod_flag)
    insert_last=0;
  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
      father_buff && !insert_last)
    DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
                                 father_key_pos,father_page));
  DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
} /* _mi_insert */
588
589
590 /* split a full page in two and assign emerging item to key */
591
/*
  Split the full key page 'buff' in two.

  The part before the split key stays in 'buff' (its length header is
  shortened), the keys after it are assembled in info->buff and written to
  a newly allocated page.  The split key itself, followed by a pointer to
  the new page, is returned in 'key' for the caller to store one level up.
  key_buff is used as scratch space for unpacked keys.

  Returns 2 when the middle key has to go up, -1 on error.
*/
int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
                   uchar *key, uchar *buff, uchar *key_buff,
                   my_bool insert_last_key)
{
  uint left_len, page_len, header_len, packed_len, nod_flag, split_key_len,
       tail_len;
  uchar *split_pos, *tail_pos, *after_key;
  my_off_t new_page;
  MI_KEY_PARAM pack_info;
  DBUG_ENTER("mi_split_page");
  LINT_INIT(after_key);
  DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));

  if (info->s->keyinfo+info->lastinx == keyinfo)
    info->page_changed= 1;                      /* Info->buff is used */
  info->buff_used= 1;
  nod_flag= mi_test_if_nod(buff);
  header_len= 2 + nod_flag;

  split_pos= (insert_last_key ?
              _mi_find_last_pos(keyinfo, buff, key_buff, &split_key_len,
                                &after_key) :
              _mi_find_half_pos(nod_flag, keyinfo, buff, key_buff,
                                &split_key_len, &after_key));
  if (!split_pos)
    DBUG_RETURN(-1);

  /* Truncate the old page right before the split key. */
  left_len= (uint) (split_pos - buff);
  page_len= mi_getint(buff);
  mi_putint(buff, left_len, nod_flag);

  tail_pos= after_key;
  if (nod_flag)
  {
    /* The split key's page pointer becomes the new page's first pointer. */
    DBUG_PRINT("test",("Splitting nod"));
    memcpy((uchar*) info->buff + 2, (uchar*) (tail_pos - nod_flag),
           (size_t) nod_flag);
  }

  /* Move middle item to key and pointer to new page */
  new_page= _mi_new(info, keyinfo, DFLT_INIT_HITS);
  if (new_page == HA_OFFSET_ERROR)
    DBUG_RETURN(-1);
  _mi_kpointer(info, _mi_move_key(keyinfo, key, key_buff), new_page);

  /* Store new page */
  if (!(*keyinfo->get_key)(keyinfo, nod_flag, &tail_pos, key_buff))
    DBUG_RETURN(-1);

  packed_len= (*keyinfo->pack_key)(keyinfo, nod_flag, (uchar *) 0,
                                   (uchar*) 0, (uchar*) 0,
                                   key_buff, &pack_info);
  tail_len= (uint) ((buff + page_len) - tail_pos);
  memcpy((uchar*) info->buff + header_len + packed_len, (uchar*) tail_pos,
         (size_t) tail_len);
  (*keyinfo->store_key)(keyinfo, info->buff + header_len, &pack_info);
  mi_putint(info->buff, tail_len + packed_len + header_len, nod_flag);

  if (_mi_write_keypage(info, keyinfo, new_page, DFLT_INIT_HITS, info->buff))
    DBUG_RETURN(-1);
  DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
  DBUG_RETURN(2);                               /* Middle key up */
} /* _mi_split_page */
652
653
/*
  Calculate how much to move to split a page in two.
  Returns a pointer to the start of the key at the split point.
  key will contain the key.
  return_key_length will contain the length of key.
  after_key will contain the position where the next key starts.
*/
661
/*
  Find the key around the middle of a page, to split the page there.

  For fixed-size keys the position is computed directly; otherwise the
  keys are unpacked one by one until the approximate middle is passed.
  On success 'key' holds the key at the returned position,
  *return_key_length its length and *after_key the start of the key that
  follows it.

  Returns the start of the split key, or 0 if a key could not be read.
*/
uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
                         uchar *key, uint *return_key_length,
                         uchar **after_key)
{
  uint data_len, header_len, entry_len, key_count, found_len;
  uchar *half, *key_start;
  DBUG_ENTER("_mi_find_half_pos");

  header_len= 2 + nod_flag;
  data_len= mi_getint(page) - header_len;
  page+= header_len;

  if (!(keyinfo->flag &
        (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
         HA_BINARY_PACK_KEY)))
  {
    /* Fixed-size entries: index straight to the middle one. */
    entry_len= keyinfo->keylength + nod_flag;
    key_count= data_len / (entry_len * 2);
    *return_key_length= keyinfo->keylength;
    half= page + key_count * entry_len;
    *after_key= half + entry_len;
    memcpy(key, half, entry_len);
    DBUG_RETURN(half);
  }

  half= page + data_len / 2 - header_len;       /* This is approx. half */
  *key= '\0';
  do
  {
    key_start= page;
    found_len= (*keyinfo->get_key)(keyinfo, nod_flag, &page, key);
    if (!found_len)
      DBUG_RETURN(0);
  } while (page < half);

  *return_key_length= found_len;
  *after_key= page;
  DBUG_PRINT("exit",("returns: 0x%lx  page: 0x%lx  half: 0x%lx",
                     (long) key_start, (long) page, (long) half));
  DBUG_RETURN(key_start);
} /* _mi_find_half_pos */
700
701
702 /*
703 Split buffer at last key
704 Returns pointer to the start of the key before the last key
705 key will contain the last key
706 */
707
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)708 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
709 uchar *key, uint *return_key_length,
710 uchar **after_key)
711 {
712 uint keys,length,UNINIT_VAR(last_length),key_ref_length;
713 uchar *end,*lastpos,*UNINIT_VAR(prevpos);
714 uchar key_buff[MI_MAX_KEY_BUFF];
715 DBUG_ENTER("_mi_find_last_pos");
716
717 key_ref_length=2;
718 length=mi_getint(page)-key_ref_length;
719 page+=key_ref_length;
720 if (!(keyinfo->flag &
721 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
722 HA_BINARY_PACK_KEY)))
723 {
724 keys=length/keyinfo->keylength-2;
725 *return_key_length=length=keyinfo->keylength;
726 end=page+keys*length;
727 *after_key=end+length;
728 memcpy(key,end,length);
729 DBUG_RETURN(end);
730 }
731
732 end=page+length-key_ref_length;
733 *key='\0';
734 length=0;
735 lastpos=page;
736 while (page < end)
737 {
738 prevpos=lastpos; lastpos=page;
739 last_length=length;
740 memcpy(key, key_buff, length); /* previous key */
741 if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
742 {
743 mi_print_error(keyinfo->share, HA_ERR_CRASHED);
744 my_errno=HA_ERR_CRASHED;
745 DBUG_RETURN(0);
746 }
747 }
748 *return_key_length=last_length;
749 *after_key=lastpos;
750 DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx end: 0x%lx",
751 (long) prevpos,(long) page,(long) end));
752 DBUG_RETURN(prevpos);
753 } /* _mi_find_last_pos */
754
755
756 /* Balance page with not packed keys with page on right/left */
757 /* returns 0 if balance was done */
758
/*
  Balance a full page of fixed-size keys with a sibling page.

  The sibling (right or left of curr_buff, chosen from the position in the
  parent and the parity of the record count) is read into info->buff.
  After the setup curr_buff always names the left page and right_buff the
  right one.  If the sibling has room the keys are redistributed over the
  two pages through the parting key in the parent; otherwise the two pages
  are split into three and a new parting key is handed back in 'key'.

  Returns 0 if the keys were redistributed, 1 if 'key' must be inserted
  into the parent, -1 on error.
*/
static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
                            uchar *key, uchar *curr_buff, uchar *father_buff,
                            uchar *father_key_pos, my_off_t father_page)
{
  my_bool use_right;
  uint key_len, parent_len, parent_entry_len, nod_flag, entry_len,
       right_len, left_len, new_right_len, new_left_len, spill_len,
       moved, total_keys, ref_len;
  uchar *cut, *right_buff, *spill_buff;
  my_off_t sibling_page, new_page;
  uchar saved_part_key[MI_MAX_KEY_BUFF];
  DBUG_ENTER("_mi_balance_page");

  ref_len= info->s->base.key_reflength;
  key_len= keyinfo->keylength;
  parent_len= mi_getint(father_buff);
  parent_entry_len= key_len + ref_len;
  nod_flag= mi_test_if_nod(curr_buff);
  entry_len= key_len + nod_flag;
  info->page_changed= 1;

  if ((father_key_pos != father_buff + parent_len &&
       (info->state->records & 1)) ||
      father_key_pos == father_buff + 2 + ref_len)
  {
    use_right= 1;
    sibling_page= _mi_kpos(ref_len, father_key_pos + parent_entry_len);
    right_buff= info->buff;
    DBUG_PRINT("test",("use right page: %lu", (ulong) sibling_page));
  }
  else
  {
    use_right= 0;
    father_key_pos-= parent_entry_len;
    sibling_page= _mi_kpos(ref_len, father_key_pos);
    /* Fix that curr_buff is to left */
    right_buff= curr_buff;
    curr_buff= info->buff;
    DBUG_PRINT("test",("use left page: %lu", (ulong) sibling_page));
  }                                     /* father_key_pos ptr to parting key */

  if (!_mi_fetch_keypage(info, keyinfo, sibling_page, DFLT_INIT_HITS,
                         info->buff, 0))
    goto err;
  DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));

  /* Test if there is room to share keys */

  left_len= mi_getint(curr_buff);
  right_len= mi_getint(right_buff);
  total_keys= (left_len + right_len - 4 - nod_flag * 2) / entry_len;

  if ((use_right ? right_len : left_len) + entry_len <=
      keyinfo->block_length)
  {                                             /* Merge buffs */
    new_left_len= 2 + nod_flag + (total_keys / 2) * entry_len;
    new_right_len= 2 + nod_flag + ((total_keys + 1) / 2) * entry_len;
    mi_putint(curr_buff, new_left_len, nod_flag);
    mi_putint(right_buff, new_right_len, nod_flag);

    if (left_len < new_left_len)
    {                                           /* Move keys buff -> leaf */
      cut= curr_buff + left_len;
      memcpy(cut, father_key_pos, (size_t) key_len);
      moved= new_left_len - left_len - key_len;
      memcpy(cut + key_len, right_buff + 2, (size_t) moved);
      cut= right_buff + 2 + moved;
      memcpy(father_key_pos, cut, (size_t) key_len);
      bmove((uchar*) right_buff + 2, (uchar*) cut + key_len,
            new_right_len - 2);
    }
    else
    {                                           /* Move keys -> buff */
      bmove_upp((uchar*) right_buff + new_right_len,
                (uchar*) right_buff + right_len,
                right_len - 2);
      moved= new_right_len - right_len - key_len;
      memcpy(right_buff + 2 + moved, father_key_pos, (size_t) key_len);
      cut= curr_buff + new_left_len;
      memcpy(father_key_pos, cut, (size_t) key_len);
      memcpy(right_buff + 2, cut + key_len, (size_t) moved);
    }

    if (_mi_write_keypage(info, keyinfo, sibling_page, DFLT_INIT_HITS,
                          info->buff) ||
        _mi_write_keypage(info, keyinfo, father_page, DFLT_INIT_HITS,
                          father_buff))
      goto err;
    DBUG_RETURN(0);
  }

  /* curr_buff[] and buff[] are full, lets split and make new nod */

  spill_buff= info->buff + info->s->base.max_key_block_length;
  new_left_len= new_right_len= 2 + nod_flag + (total_keys + 1) / 3 * entry_len;
  if (total_keys == 5)                          /* Too few keys to balance */
    new_left_len-= entry_len;
  spill_len= nod_flag + left_len + right_len -
    new_left_len - new_right_len - entry_len;
  DBUG_PRINT("info",("left_length: %d  right_length: %d  new_left_length: %d  new_right_length: %d  extra_length: %d",
                     left_len, right_len,
                     new_left_len, new_right_len,
                     spill_len));
  mi_putint(curr_buff, new_left_len, nod_flag);
  mi_putint(right_buff, new_right_len, nod_flag);
  mi_putint(spill_buff, spill_len + 2, nod_flag);

  /* move first largest keys to new page  */
  cut= right_buff + right_len - spill_len;
  memcpy(spill_buff + 2, cut, (size_t) spill_len);
  /* Save new parting key */
  memcpy(saved_part_key, cut - key_len, key_len);
  /* Make place for new keys */
  bmove_upp((uchar*) right_buff + new_right_len, (uchar*) cut - key_len,
            right_len - spill_len - key_len - 2);
  /* Copy keys from left page */
  cut= curr_buff + new_left_len;
  moved= left_len - new_left_len - key_len;
  memcpy(right_buff + 2, cut + key_len, (size_t) moved);
  /* Copy old parting key */
  memcpy(right_buff + 2 + moved, father_key_pos, (size_t) key_len);

  /* Move new parting keys up to caller */
  if (use_right)
  {
    memcpy(key, cut, (size_t) key_len);
    memcpy(father_key_pos, saved_part_key, key_len);
  }
  else
  {
    memcpy(father_key_pos, cut, (size_t) key_len);
    memcpy(key, saved_part_key, key_len);
  }

  new_page= _mi_new(info, keyinfo, DFLT_INIT_HITS);
  if (new_page == HA_OFFSET_ERROR)
    goto err;
  _mi_kpointer(info, key + key_len, new_page);
  if (_mi_write_keypage(info, keyinfo,
                        (use_right ? new_page : sibling_page),
                        DFLT_INIT_HITS, info->buff) ||
      _mi_write_keypage(info, keyinfo,
                        (use_right ? sibling_page : new_page),
                        DFLT_INIT_HITS, spill_buff))
    goto err;

  DBUG_RETURN(1);                               /* Middle key up */

err:
  DBUG_RETURN(-1);
} /* _mi_balance_page */
894
895 /**********************************************************************
896 * Bulk insert code *
897 **********************************************************************/
898
899 typedef struct {
900 MI_INFO *info;
901 uint keynr;
902 } bulk_insert_param;
903
/*
  Buffer a key in the bulk-insert tree of key number keynr.

  The stored element is the key followed by the record reference, hence
  the length key_length + rec_reflength.

  Returns 0 on success, HA_ERR_OUT_OF_MEM if tree_insert() fails.
*/
int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
                      uint key_length)
{
  TREE *tree= &info->bulk_insert[keynr];
  int error= 0;
  DBUG_ENTER("_mi_ck_write_tree");

  if (!tree_insert(tree, key, key_length + info->s->rec_reflength,
                   tree->custom_arg))
    error= HA_ERR_OUT_OF_MEM;

  DBUG_RETURN(error);
} /* _mi_ck_write_tree */
916
917
/* typeof(keys_compare)=qsort_cmp2 */
919
/*
  Ordering callback of a bulk-insert tree: compares two whole keys using
  the key segments of the key the tree belongs to.
*/
static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
{
  MI_KEYDEF *keydef= param->info->s->keyinfo + param->keynr;
  uint diff_pos[2];                     /* filled by ha_key_cmp(), unused */

  return ha_key_cmp(keydef->seg, key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
                    diff_pos);
}
927
928
/*
  Element-free callback of a bulk-insert tree; this is where the buffered
  keys are moved into the on-disk b-tree.

    free_init  Called before the elements: with concurrent inserts enabled
               the key root lock is write-locked and the key version is
               incremented.
    free_free  Called per element: the key is copied to a local buffer and
               written with _mi_ck_write_btree().
    free_end   Called after the elements: releases the key root lock.

  Returns 0 for free_init/free_end, the result of _mi_ck_write_btree() for
  free_free and -1 for an unknown mode.
*/
static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
{
  /*
    Probably I can use info->lastkey here, but I'm not sure,
    and to be safe I'd better use local lastkey.
  */
  uchar lastkey[MI_MAX_KEY_BUFF];
  uint keylen;
  MI_KEYDEF *keyinfo;

  switch (mode) {
  case free_init:
    if (param->info->s->concurrent_insert)
    {
      mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
      param->info->s->keyinfo[param->keynr].version++;
    }
    return 0;
  case free_free:
    keyinfo=param->info->s->keyinfo+param->keynr;
    keylen=_mi_keylength(keyinfo, key);
    memcpy(lastkey, key, keylen);
    /* keylen includes the record reference; the writer expects it without. */
    return _mi_ck_write_btree(param->info,param->keynr,lastkey,
                              keylen - param->info->s->rec_reflength);
  case free_end:
    if (param->info->s->concurrent_insert)
      mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
    return 0;
  }
  return -1;
}
960
961
/*
  Set up bulk-insert trees for the keys that can be buffered.

  A tree is created for every active key that is not HA_NOSAME and is not
  the auto-increment key.  Nothing is set up when there is no such key or
  when cache_size is too small to give every tree
  MI_MIN_SIZE_BULK_INSERT_TREE bytes.

  info        Open table handler; info->bulk_insert must be unset.
  cache_size  Memory budget for all trees.
  rows        Expected number of rows, or 0 if unknown.

  Returns 0 on success (also when bulk insert is not used) and
  HA_ERR_OUT_OF_MEM if the tree array cannot be allocated.
*/
int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
{
  MYISAM_SHARE *share= info->s;
  MI_KEYDEF *keydefs= share->keyinfo;
  bulk_insert_param *next_param;
  uint keynr, tree_count, keylength_sum;
  ulonglong tree_map;
  DBUG_ENTER("_mi_init_bulk_insert");
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));

  DBUG_ASSERT(!info->bulk_insert &&
              (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));

  /* Pass 1: pick the keys that get a tree and sum up their sizes. */
  mi_clear_all_keys_active(tree_map);
  tree_count= 0;
  keylength_sum= 0;
  for (keynr= 0 ; keynr < share->base.keys ; keynr++)
  {
    if ((keydefs[keynr].flag & HA_NOSAME) ||
        share->base.auto_key == keynr + 1 ||
        !mi_is_key_active(share->state.key_map, keynr))
      continue;
    tree_count++;
    mi_set_key_active(tree_map, keynr);
    keylength_sum+= keydefs[keynr].maxlength + TREE_ELEMENT_EXTRA_SIZE;
  }

  if (tree_count == 0 ||
      tree_count * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
    DBUG_RETURN(0);

  /* From here on cache_size counts keys per tree rather than bytes. */
  if (rows && rows*keylength_sum < cache_size)
    cache_size= (ulong)rows;
  else
    cache_size/= keylength_sum*16;

  /* One TREE per key, followed by one parameter block per used tree. */
  info->bulk_insert= (TREE *)
    my_malloc((sizeof(TREE)*share->base.keys+
               sizeof(bulk_insert_param)*tree_count),MYF(0));

  if (!info->bulk_insert)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  /* Pass 2: initialise the selected trees, mark the others as unused. */
  next_param= (bulk_insert_param *)(info->bulk_insert+share->base.keys);
  for (keynr= 0 ; keynr < share->base.keys ; keynr++)
  {
    if (!mi_is_key_active(tree_map, keynr))
    {
      info->bulk_insert[keynr].root= 0;
      continue;
    }
    next_param->info= info;
    next_param->keynr= keynr;
    /* Only allocate a 16'th of the buffer at a time */
    init_tree(&info->bulk_insert[keynr],
              cache_size * keydefs[keynr].maxlength,
              cache_size * keydefs[keynr].maxlength, 0,
              (qsort_cmp2)keys_compare, 0,
              (tree_element_free) keys_free, (void *) next_param);
    next_param++;
  }

  DBUG_RETURN(0);
}
1023
/*
  Reset the bulk-insert tree of key number inx, if bulk insert is active
  and that key has an initialised tree.  Does nothing otherwise.
*/
void mi_flush_bulk_insert(MI_INFO *info, uint inx)
{
  if (info->bulk_insert && is_tree_inited(&info->bulk_insert[inx]))
    reset_tree(&info->bulk_insert[inx]);
}
1032
/*
  End bulk insert: delete every initialised bulk-insert tree, free the
  tree array and clear info->bulk_insert.  Does nothing when bulk insert
  is not active.
*/
void mi_end_bulk_insert(MI_INFO *info)
{
  uint keynr;

  if (!info->bulk_insert)
    return;

  for (keynr= 0 ; keynr < info->s->base.keys ; keynr++)
  {
    if (is_tree_inited(&info->bulk_insert[keynr]))
      delete_tree(&info->bulk_insert[keynr]);
  }
  my_free(info->bulk_insert);
  info->bulk_insert= 0;
}
1049