1 /* Copyright (c) 2000, 2013, Oracle and/or its affiliates
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 /* Write a row to a MyISAM table */
17
18 #include "fulltext.h"
19 #include "rt_index.h"
20
21 #define MAX_POINTER_LENGTH 8
22
23 /* Functions declared in this file */
24
25 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
26 uint comp_flag, uchar *key,
27 uint key_length, my_off_t pos, uchar *father_buff,
28 uchar *father_keypos, my_off_t father_page,
29 my_bool insert_last);
30 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
31 uchar *curr_buff,uchar *father_buff,
32 uchar *father_keypos,my_off_t father_page);
33 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
34 uchar *key, uint *return_key_length,
35 uchar **after_key);
36 int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
37 uint key_length);
38 int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
39 uint key_length);
40
41 /* Write new record to database */
42
mi_write(MI_INFO * info,const uchar * record)43 int mi_write(MI_INFO *info, const uchar *record)
44 {
45 MYISAM_SHARE *share=info->s;
46 uint i;
47 int save_errno;
48 my_off_t filepos;
49 uchar *buff;
50 my_bool lock_tree= share->concurrent_insert;
51 DBUG_ENTER("mi_write");
52 DBUG_PRINT("enter",("isam: %d data: %d",info->s->kfile,info->dfile));
53
54 DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
55 mi_print_error(info->s, HA_ERR_CRASHED);
56 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
57
58 /* it's always a bug to try to write a record with the deleted flag set */
59 DBUG_ASSERT(info->s->data_file_type != STATIC_RECORD || *record);
60
61 if (share->options & HA_OPTION_READ_ONLY_DATA)
62 {
63 DBUG_RETURN(my_errno=EACCES);
64 }
65 if (_mi_readinfo(info,F_WRLCK,1))
66 DBUG_RETURN(my_errno);
67
68 filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
69 !info->append_insert_at_end) ?
70 share->state.dellink :
71 info->state->data_file_length);
72
73 if (share->base.reloc == (ha_rows) 1 &&
74 share->base.records == (ha_rows) 1 &&
75 info->state->records == (ha_rows) 1)
76 { /* System file */
77 my_errno=HA_ERR_RECORD_FILE_FULL;
78 goto err2;
79 }
80 if (info->state->key_file_length >= share->base.margin_key_file_length)
81 {
82 my_errno=HA_ERR_INDEX_FILE_FULL;
83 goto err2;
84 }
85 if (_mi_mark_file_changed(info))
86 goto err2;
87
88 /* Calculate and check all unique constraints */
89 for (i=0 ; i < share->state.header.uniques ; i++)
90 {
91 MI_UNIQUEDEF *def= share->uniqueinfo + i;
92 if (mi_is_key_active(share->state.key_map, def->key) &&
93 mi_check_unique(info, def, record, mi_unique_hash(def, record),
94 HA_OFFSET_ERROR))
95 goto err2;
96 }
97
98 /* Write all keys to indextree */
99
100 buff=info->lastkey2;
101 for (i=0 ; i < share->base.keys ; i++)
102 {
103 if (mi_is_key_active(share->state.key_map, i))
104 {
105 my_bool local_lock_tree= (lock_tree &&
106 !(info->bulk_insert &&
107 is_tree_inited(&info->bulk_insert[i])));
108 if (local_lock_tree)
109 {
110 mysql_rwlock_wrlock(&share->key_root_lock[i]);
111 share->keyinfo[i].version++;
112 }
113 if (share->keyinfo[i].flag & HA_FULLTEXT )
114 {
115 if (_mi_ft_add(info,i, buff, record, filepos))
116 {
117 if (local_lock_tree)
118 mysql_rwlock_unlock(&share->key_root_lock[i]);
119 DBUG_PRINT("error",("Got error: %d on write",my_errno));
120 goto err;
121 }
122 }
123 else
124 {
125 if (share->keyinfo[i].ck_insert(info,i,buff,
126 _mi_make_key(info,i,buff,record,filepos)))
127 {
128 if (local_lock_tree)
129 mysql_rwlock_unlock(&share->key_root_lock[i]);
130 DBUG_PRINT("error",("Got error: %d on write",my_errno));
131 goto err;
132 }
133 }
134
135 /* The above changed info->lastkey2. Inform mi_rnext_same(). */
136 info->update&= ~HA_STATE_RNEXT_SAME;
137
138 if (local_lock_tree)
139 mysql_rwlock_unlock(&share->key_root_lock[i]);
140 }
141 }
142 if (share->calc_checksum)
143 info->checksum=(*share->calc_checksum)(info,record);
144 if (!(info->opt_flag & OPT_NO_ROWS))
145 {
146 if ((*share->write_record)(info,record))
147 goto err;
148 info->state->checksum+=info->checksum;
149 }
150 if (share->base.auto_key)
151 set_if_bigger(info->s->state.auto_increment,
152 retrieve_auto_increment(info, record));
153 info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
154 HA_STATE_ROW_CHANGED);
155 info->state->records++;
156 myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
157 (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
158 if (info->invalidator != 0)
159 {
160 DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
161 (*info->invalidator)(info->filename);
162 info->invalidator=0;
163 }
164
165 /*
166 Update status of the table. We need to do so after each row write
167 for the log tables, as we want the new row to become visible to
168 other threads as soon as possible. We don't lock mutex here
169 (as it is required by pthread memory visibility rules) as (1) it's
170 not critical to use outdated share->is_log_table value (2) locking
171 mutex here for every write is too expensive.
172 */
173 if (share->is_log_table)
174 mi_update_status((void*) info);
175
176 DBUG_RETURN(0);
177
178 err:
179 save_errno=my_errno;
180 if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
181 my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
182 {
183 if (info->bulk_insert)
184 {
185 uint j;
186 for (j=0 ; j < share->base.keys ; j++)
187 mi_flush_bulk_insert(info, j);
188 }
189 info->errkey= (int) i;
190 while ( i-- > 0)
191 {
192 if (mi_is_key_active(share->state.key_map, i))
193 {
194 my_bool local_lock_tree= (lock_tree &&
195 !(info->bulk_insert &&
196 is_tree_inited(&info->bulk_insert[i])));
197 if (local_lock_tree)
198 mysql_rwlock_wrlock(&share->key_root_lock[i]);
199 if (share->keyinfo[i].flag & HA_FULLTEXT)
200 {
201 if (_mi_ft_del(info,i, buff,record,filepos))
202 {
203 if (local_lock_tree)
204 mysql_rwlock_unlock(&share->key_root_lock[i]);
205 break;
206 }
207 }
208 else
209 {
210 uint key_length=_mi_make_key(info,i,buff,record,filepos);
211 if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
212 {
213 if (local_lock_tree)
214 mysql_rwlock_unlock(&share->key_root_lock[i]);
215 break;
216 }
217 }
218 if (local_lock_tree)
219 mysql_rwlock_unlock(&share->key_root_lock[i]);
220 }
221 }
222 }
223 else
224 {
225 mi_print_error(info->s, HA_ERR_CRASHED);
226 mi_mark_crashed(info);
227 }
228 info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
229 my_errno=save_errno;
230 err2:
231 save_errno=my_errno;
232 myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
233 (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
234 DBUG_RETURN(my_errno=save_errno);
235 } /* mi_write */
236
237
238 /* Write one key to btree */
239
_mi_ck_write(MI_INFO * info,uint keynr,uchar * key,uint key_length)240 int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
241 {
242 DBUG_ENTER("_mi_ck_write");
243
244 if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
245 {
246 DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
247 }
248 else
249 {
250 DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
251 }
252 } /* _mi_ck_write */
253
254
255 /**********************************************************************
256 * Normal insert code *
257 **********************************************************************/
258
_mi_ck_write_btree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)259 int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
260 uint key_length)
261 {
262 int error;
263 uint comp_flag;
264 MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
265 my_off_t *root=&info->s->state.key_root[keynr];
266 DBUG_ENTER("_mi_ck_write_btree");
267
268 if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
269 comp_flag=SEARCH_BIGGER; /* Put after same key */
270 else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
271 {
272 comp_flag=SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT; /* No duplicates */
273 if (keyinfo->flag & HA_NULL_ARE_EQUAL)
274 comp_flag|= SEARCH_NULL_ARE_EQUAL;
275 }
276 else
277 comp_flag=SEARCH_SAME; /* Keys in rec-pos order */
278
279 error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
280 root, comp_flag);
281 if (info->ft1_to_ft2)
282 {
283 if (!error)
284 error= _mi_ft_convert_to_ft2(info, keynr, key);
285 delete_dynamic(info->ft1_to_ft2);
286 my_free(info->ft1_to_ft2);
287 info->ft1_to_ft2=0;
288 }
289 DBUG_RETURN(error);
290 } /* _mi_ck_write_btree */
291
_mi_ck_real_write_btree(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uint key_length,my_off_t * root,uint comp_flag)292 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
293 uchar *key, uint key_length, my_off_t *root, uint comp_flag)
294 {
295 int error;
296 DBUG_ENTER("_mi_ck_real_write_btree");
297 /* key_length parameter is used only if comp_flag is SEARCH_FIND */
298 if (*root == HA_OFFSET_ERROR ||
299 (error=w_search(info, keyinfo, comp_flag, key, key_length,
300 *root, (uchar *) 0, (uchar*) 0,
301 (my_off_t) 0, 1)) > 0)
302 error=_mi_enlarge_root(info,keyinfo,key,root);
303 DBUG_RETURN(error);
304 } /* _mi_ck_real_write_btree */
305
306
307 /* Make a new root with key as only pointer */
308
_mi_enlarge_root(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,my_off_t * root)309 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
310 my_off_t *root)
311 {
312 uint t_length,nod_flag;
313 MI_KEY_PARAM s_temp;
314 MYISAM_SHARE *share=info->s;
315 DBUG_ENTER("_mi_enlarge_root");
316
317 nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
318 _mi_kpointer(info,info->buff+2,*root); /* if nod */
319 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
320 (uchar*) 0, (uchar*) 0, key,&s_temp);
321 mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
322 (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
323 info->buff_used=info->page_changed=1; /* info->buff is used */
324 if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
325 _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
326 DBUG_RETURN(-1);
327 DBUG_RETURN(0);
328 } /* _mi_enlarge_root */
329
330
331 /*
332 Search after a position for a key and store it there
333 Returns -1 = error
334 0 = ok
335 1 = key should be stored in higher tree
336 */
337
w_search(register MI_INFO * info,register MI_KEYDEF * keyinfo,uint comp_flag,uchar * key,uint key_length,my_off_t page,uchar * father_buff,uchar * father_keypos,my_off_t father_page,my_bool insert_last)338 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
339 uint comp_flag, uchar *key, uint key_length, my_off_t page,
340 uchar *father_buff, uchar *father_keypos,
341 my_off_t father_page, my_bool insert_last)
342 {
343 int error,flag;
344 uint nod_flag, search_key_length;
345 uchar *temp_buff,*keypos;
346 uchar keybuff[HA_MAX_KEY_BUFF];
347 my_bool was_last_key;
348 my_off_t next_page, dupp_key_pos;
349 DBUG_ENTER("w_search");
350 DBUG_PRINT("enter",("page: %ld", (long) page));
351
352 search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
353 if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
354 HA_MAX_KEY_BUFF*2)))
355 DBUG_RETURN(-1);
356 if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
357 goto err;
358
359 flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
360 comp_flag, &keypos, keybuff, &was_last_key);
361 nod_flag=mi_test_if_nod(temp_buff);
362 if (flag == 0)
363 {
364 uint tmp_key_length;
365 /* get position to record with duplicated key */
366 tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
367 if (tmp_key_length)
368 dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
369 else
370 dupp_key_pos= HA_OFFSET_ERROR;
371
372 if (keyinfo->flag & HA_FULLTEXT)
373 {
374 uint off;
375 int subkeys;
376
377 get_key_full_length_rdonly(off, keybuff);
378 subkeys=ft_sintXkorr(keybuff+off);
379 comp_flag=SEARCH_SAME;
380 if (subkeys >= 0)
381 {
382 /* normal word, one-level tree structure */
383 flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
384 USE_WHOLE_KEY, comp_flag,
385 &keypos, keybuff, &was_last_key);
386 }
387 else
388 {
389 /* popular word. two-level tree. going down */
390 my_off_t root=dupp_key_pos;
391 keyinfo=&info->s->ft2_keyinfo;
392 get_key_full_length_rdonly(off, key);
393 key+=off;
394 keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
395 error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
396 &root, comp_flag);
397 _mi_dpointer(info, keypos+HA_FT_WLEN, root);
398 subkeys--; /* should there be underflow protection ? */
399 DBUG_ASSERT(subkeys < 0);
400 ft_intXstore(keypos, subkeys);
401 if (!error)
402 error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
403 my_afree((uchar*) temp_buff);
404 DBUG_RETURN(error);
405 }
406 }
407 else /* not HA_FULLTEXT, normal HA_NOSAME key */
408 {
409 info->dupp_key_pos= dupp_key_pos;
410 my_afree((uchar*) temp_buff);
411 my_errno=HA_ERR_FOUND_DUPP_KEY;
412 DBUG_RETURN(-1);
413 }
414 }
415 if (flag == MI_FOUND_WRONG_KEY)
416 DBUG_RETURN(-1);
417 if (!was_last_key)
418 insert_last=0;
419 next_page=_mi_kpos(nod_flag,keypos);
420 if (next_page == HA_OFFSET_ERROR ||
421 (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
422 temp_buff, keypos, page, insert_last)) >0)
423 {
424 error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
425 father_keypos,father_page, insert_last);
426 if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
427 goto err;
428 }
429 my_afree((uchar*) temp_buff);
430 DBUG_RETURN(error);
431 err:
432 my_afree((uchar*) temp_buff);
433 DBUG_PRINT("exit",("Error: %d",my_errno));
434 DBUG_RETURN (-1);
435 } /* w_search */
436
437
438 /*
439 Insert new key.
440
441 SYNOPSIS
442 _mi_insert()
443 info Open table information.
444 keyinfo Key definition information.
445 key New key.
446 anc_buff Key page (beginning).
447 key_pos Position in key page where to insert.
448 key_buff Copy of previous key.
449 father_buff parent key page for balancing.
450 father_key_pos position in parent key page for balancing.
451 father_page position of parent key page in file.
452 insert_last If to append at end of page.
453
454 DESCRIPTION
455 Insert new key at right of key_pos.
456
457 RETURN
458 2 if key contains key to upper level.
459 0 OK.
460 < 0 Error.
461 */
462
_mi_insert(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * anc_buff,uchar * key_pos,uchar * key_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page,my_bool insert_last)463 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
464 uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
465 uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
466 my_bool insert_last)
467 {
468 uint a_length,nod_flag;
469 int t_length;
470 uchar *endpos, *prev_key;
471 MI_KEY_PARAM s_temp;
472 DBUG_ENTER("_mi_insert");
473 DBUG_PRINT("enter",("key_pos: %p", key_pos));
474 DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
475
476 nod_flag=mi_test_if_nod(anc_buff);
477 a_length=mi_getint(anc_buff);
478 endpos= anc_buff+ a_length;
479 prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
480 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
481 (key_pos == endpos ? (uchar*) 0 : key_pos),
482 prev_key, prev_key,
483 key,&s_temp);
484 #ifndef DBUG_OFF
485 if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
486 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
487 {
488 DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
489 }
490 if (keyinfo->flag & HA_PACK_KEY)
491 {
492 DBUG_PRINT("test",("t_length: %d ref_len: %d",
493 t_length,s_temp.ref_length));
494 DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: %p",
495 s_temp.n_ref_length,s_temp.n_length, s_temp.key));
496 }
497 #endif
498 if (t_length > 0)
499 {
500 if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
501 {
502 mi_print_error(info->s, HA_ERR_CRASHED);
503 my_errno=HA_ERR_CRASHED;
504 DBUG_RETURN(-1);
505 }
506 bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
507 }
508 else
509 {
510 if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
511 {
512 mi_print_error(info->s, HA_ERR_CRASHED);
513 my_errno=HA_ERR_CRASHED;
514 DBUG_RETURN(-1);
515 }
516 bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
517 }
518 (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
519 a_length+=t_length;
520 mi_putint(anc_buff,a_length,nod_flag);
521 if (a_length <= keyinfo->block_length)
522 {
523 if (keyinfo->block_length - a_length < 32 &&
524 keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
525 info->s->base.key_reflength <= info->s->rec_reflength &&
526 info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
527 {
528 /*
529 Normal word. One-level tree. Page is almost full.
530 Let's consider converting.
531 We'll compare 'key' and the first key at anc_buff
532 */
533 uchar *a=key, *b=anc_buff+2+nod_flag;
534 uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
535 /* the very first key on the page is always unpacked */
536 DBUG_ASSERT((*b & 128) == 0);
537 #if HA_FT_MAXLEN >= 127
538 blen= mi_uint2korr(b); b+=2;
539 #else
540 blen= *b++;
541 #endif
542 get_key_length(alen,a);
543 DBUG_ASSERT(info->ft1_to_ft2==0);
544 if (alen == blen &&
545 ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0)==0)
546 {
547 /* yup. converting */
548 info->ft1_to_ft2=(DYNAMIC_ARRAY *)
549 my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
550 my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50, MYF(0));
551
552 /*
553 now, adding all keys from the page to dynarray
554 if the page is a leaf (if not keys will be deleted later)
555 */
556 if (!nod_flag)
557 {
558 /* let's leave the first key on the page, though, because
559 we cannot easily dispatch an empty page here */
560 b+=blen+ft2len+2;
561 for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
562 {
563 if (insert_dynamic(info->ft1_to_ft2, b))
564 {
565 mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
566 my_errno= HA_ERR_OUT_OF_MEM;
567 DBUG_RETURN(-1);
568 }
569 }
570
571 /* fixing the page's length - it contains only one key now */
572 mi_putint(anc_buff,2+blen+ft2len+2,0);
573 }
574 /* the rest will be done when we're back from recursion */
575 }
576 }
577 DBUG_RETURN(0); /* There is room on page */
578 }
579 /* Page is full */
580 if (nod_flag)
581 insert_last=0;
582 if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
583 father_buff && !insert_last)
584 DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
585 father_key_pos,father_page));
586 DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
587 } /* _mi_insert */
588
589
590 /* split a full page in two and assign emerging item to key */
591
_mi_split_page(register MI_INFO * info,register MI_KEYDEF * keyinfo,uchar * key,uchar * buff,uchar * key_buff,my_bool insert_last_key)592 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
593 uchar *key, uchar *buff, uchar *key_buff,
594 my_bool insert_last_key)
595 {
596 uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
597 uchar *key_pos,*pos, *UNINIT_VAR(after_key);
598 my_off_t new_pos;
599 MI_KEY_PARAM s_temp;
600 DBUG_ENTER("mi_split_page");
601 DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
602
603 if (info->s->keyinfo+info->lastinx == keyinfo)
604 info->page_changed=1; /* Info->buff is used */
605 info->buff_used=1;
606 nod_flag=mi_test_if_nod(buff);
607 key_ref_length=2+nod_flag;
608 if (insert_last_key)
609 key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
610 else
611 key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
612 &after_key);
613 if (!key_pos)
614 DBUG_RETURN(-1);
615
616 length=(uint) (key_pos-buff);
617 a_length=mi_getint(buff);
618 mi_putint(buff,length,nod_flag);
619
620 key_pos=after_key;
621 if (nod_flag)
622 {
623 DBUG_PRINT("test",("Splitting nod"));
624 pos=key_pos-nod_flag;
625 memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
626 }
627
628 /* Move middle item to key and pointer to new page */
629 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
630 DBUG_RETURN(-1);
631 _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
632
633 /* Store new page */
634 if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
635 DBUG_RETURN(-1);
636
637 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
638 (uchar*) 0, (uchar*) 0,
639 key_buff, &s_temp);
640 length=(uint) ((buff+a_length)-key_pos);
641 memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
642 (size_t) length);
643 (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
644 mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
645
646 if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
647 DBUG_RETURN(-1);
648 DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
649 DBUG_RETURN(2); /* Middle key up */
650 } /* _mi_split_page */
651
652
653 /*
654 Calculate how to much to move to split a page in two
655 Returns pointer to start of key.
656 key will contain the key.
657 return_key_length will contain the length of key
658 after_key will contain the position to where the next key starts
659 */
660
_mi_find_half_pos(uint nod_flag,MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)661 uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
662 uchar *key, uint *return_key_length,
663 uchar **after_key)
664 {
665 uint keys,length,key_ref_length;
666 uchar *end,*lastpos;
667 DBUG_ENTER("_mi_find_half_pos");
668
669 key_ref_length=2+nod_flag;
670 length=mi_getint(page)-key_ref_length;
671 page+=key_ref_length;
672 if (!(keyinfo->flag &
673 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
674 HA_BINARY_PACK_KEY)))
675 {
676 key_ref_length=keyinfo->keylength+nod_flag;
677 keys=length/(key_ref_length*2);
678 *return_key_length=keyinfo->keylength;
679 end=page+keys*key_ref_length;
680 *after_key=end+key_ref_length;
681 memcpy(key,end,key_ref_length);
682 DBUG_RETURN(end);
683 }
684
685 end=page+length/2-key_ref_length; /* This is aprox. half */
686 *key='\0';
687 do
688 {
689 lastpos=page;
690 if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
691 DBUG_RETURN(0);
692 } while (page < end);
693 *return_key_length=length;
694 *after_key=page;
695 DBUG_PRINT("exit",("returns: %p page: %p half: %p",
696 lastpos, page, end));
697 DBUG_RETURN(lastpos);
698 } /* _mi_find_half_pos */
699
700
701 /*
702 Split buffer at last key
703 Returns pointer to the start of the key before the last key
704 key will contain the last key
705 */
706
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)707 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
708 uchar *key, uint *return_key_length,
709 uchar **after_key)
710 {
711 uint keys, length, UNINIT_VAR(last_length), key_ref_length;
712 uchar *end,*lastpos,*prevpos;
713 uchar key_buff[HA_MAX_KEY_BUFF];
714 DBUG_ENTER("_mi_find_last_pos");
715
716 key_ref_length=2;
717 length=mi_getint(page)-key_ref_length;
718 page+=key_ref_length;
719 if (!(keyinfo->flag &
720 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
721 HA_BINARY_PACK_KEY)))
722 {
723 keys=length/keyinfo->keylength-2;
724 *return_key_length=length=keyinfo->keylength;
725 end=page+keys*length;
726 *after_key=end+length;
727 memcpy(key,end,length);
728 DBUG_RETURN(end);
729 }
730
731 end=page+length-key_ref_length;
732 DBUG_ASSERT(page < end);
733 *key='\0';
734 length=0;
735 lastpos=page;
736
737 do
738 {
739 prevpos=lastpos; lastpos=page;
740 last_length=length;
741 memcpy(key, key_buff, length); /* previous key */
742 if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
743 {
744 mi_print_error(keyinfo->share, HA_ERR_CRASHED);
745 my_errno=HA_ERR_CRASHED;
746 DBUG_RETURN(0);
747 }
748 } while (page < end);
749
750 *return_key_length=last_length;
751 *after_key=lastpos;
752 DBUG_PRINT("exit",("returns: %p page: %p end: %p",
753 prevpos, page, end));
754 DBUG_RETURN(prevpos);
755 } /* _mi_find_last_pos */
756
757
758 /* Balance page with not packed keys with page on right/left */
759 /* returns 0 if balance was done */
760
_mi_balance_page(register MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * curr_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page)761 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
762 uchar *key, uchar *curr_buff, uchar *father_buff,
763 uchar *father_key_pos, my_off_t father_page)
764 {
765 my_bool right;
766 uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
767 right_length,left_length,new_right_length,new_left_length,extra_length,
768 length,keys;
769 uchar *pos,*buff,*extra_buff;
770 my_off_t next_page,new_pos;
771 uchar tmp_part_key[HA_MAX_KEY_BUFF];
772 DBUG_ENTER("_mi_balance_page");
773
774 k_length=keyinfo->keylength;
775 father_length=mi_getint(father_buff);
776 father_keylength=k_length+info->s->base.key_reflength;
777 nod_flag=mi_test_if_nod(curr_buff);
778 curr_keylength=k_length+nod_flag;
779 info->page_changed=1;
780
781 if ((father_key_pos != father_buff+father_length &&
782 (info->state->records & 1)) ||
783 father_key_pos == father_buff+2+info->s->base.key_reflength)
784 {
785 right=1;
786 next_page= _mi_kpos(info->s->base.key_reflength,
787 father_key_pos+father_keylength);
788 buff=info->buff;
789 DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
790 }
791 else
792 {
793 right=0;
794 father_key_pos-=father_keylength;
795 next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
796 /* Fix that curr_buff is to left */
797 buff=curr_buff; curr_buff=info->buff;
798 DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
799 } /* father_key_pos ptr to parting key */
800
801 if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
802 goto err;
803 DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
804
805 /* Test if there is room to share keys */
806
807 left_length=mi_getint(curr_buff);
808 right_length=mi_getint(buff);
809 keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
810
811 if ((right ? right_length : left_length) + curr_keylength <=
812 keyinfo->block_length)
813 { /* Merge buffs */
814 new_left_length=2+nod_flag+(keys/2)*curr_keylength;
815 new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
816 mi_putint(curr_buff,new_left_length,nod_flag);
817 mi_putint(buff,new_right_length,nod_flag);
818
819 if (left_length < new_left_length)
820 { /* Move keys buff -> leaf */
821 pos=curr_buff+left_length;
822 memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
823 memcpy((uchar*) pos+k_length, (uchar*) buff+2,
824 (size_t) (length=new_left_length - left_length - k_length));
825 pos=buff+2+length;
826 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
827 bmove((uchar*) buff + 2, (uchar*) pos + k_length, new_right_length - 2);
828 }
829 else
830 { /* Move keys -> buff */
831
832 bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
833 right_length-2);
834 length=new_right_length-right_length-k_length;
835 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
836 pos=curr_buff+new_left_length;
837 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
838 memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
839 }
840
841 if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
842 _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
843 goto err;
844 DBUG_RETURN(0);
845 }
846
847 /* curr_buff[] and buff[] are full, lets split and make new nod */
848
849 extra_buff=info->buff+info->s->base.max_key_block_length;
850 new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
851 if (keys == 5) /* Too few keys to balance */
852 new_left_length-=curr_keylength;
853 extra_length=nod_flag+left_length+right_length-
854 new_left_length-new_right_length-curr_keylength;
855 DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
856 left_length, right_length,
857 new_left_length, new_right_length,
858 extra_length));
859 mi_putint(curr_buff,new_left_length,nod_flag);
860 mi_putint(buff,new_right_length,nod_flag);
861 mi_putint(extra_buff,extra_length+2,nod_flag);
862
863 /* move first largest keys to new page */
864 pos=buff+right_length-extra_length;
865 memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
866 /* Save new parting key */
867 memcpy(tmp_part_key, pos-k_length,k_length);
868 /* Make place for new keys */
869 bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
870 right_length-extra_length-k_length-2);
871 /* Copy keys from left page */
872 pos= curr_buff+new_left_length;
873 memcpy((uchar*) buff+2,(uchar*) pos+k_length,
874 (size_t) (length=left_length-new_left_length-k_length));
875 /* Copy old parting key */
876 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
877
878 /* Move new parting keys up to caller */
879 memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
880 memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
881
882 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
883 goto err;
884 _mi_kpointer(info,key+k_length,new_pos);
885 if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
886 DFLT_INIT_HITS,info->buff) ||
887 _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
888 DFLT_INIT_HITS,extra_buff))
889 goto err;
890
891 DBUG_RETURN(1); /* Middle key up */
892
893 err:
894 DBUG_RETURN(-1);
895 } /* _mi_balance_page */
896
897 /**********************************************************************
898 * Bulk insert code *
899 **********************************************************************/
900
901 typedef struct {
902 MI_INFO *info;
903 uint keynr;
904 } bulk_insert_param;
905
_mi_ck_write_tree(register MI_INFO * info,uint keynr,uchar * key,uint key_length)906 int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
907 uint key_length)
908 {
909 int error;
910 DBUG_ENTER("_mi_ck_write_tree");
911
912 error= tree_insert(&info->bulk_insert[keynr], key,
913 key_length + info->s->rec_reflength,
914 info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
915
916 DBUG_RETURN(error);
917 } /* _mi_ck_write_tree */
918
919
920 /* typeof(_mi_keys_compare)=qsort_cmp2 */
921
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)922 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
923 {
924 uint not_used[2];
925 return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
926 key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
927 not_used);
928 }
929
930
keys_free(void * key_arg,TREE_FREE mode,void * param_arg)931 static int keys_free(void* key_arg, TREE_FREE mode, void *param_arg)
932 {
933 /*
934 Probably I can use info->lastkey here, but I'm not sure,
935 and to be safe I'd better use local lastkey.
936 */
937 bulk_insert_param *param= (bulk_insert_param*)param_arg;
938 uchar lastkey[HA_MAX_KEY_BUFF], *key= (uchar*)key_arg;
939 uint keylen;
940 MI_KEYDEF *keyinfo;
941
942 switch (mode) {
943 case free_init:
944 if (param->info->s->concurrent_insert)
945 {
946 mysql_rwlock_wrlock(¶m->info->s->key_root_lock[param->keynr]);
947 param->info->s->keyinfo[param->keynr].version++;
948 }
949 return 0;
950 case free_free:
951 keyinfo=param->info->s->keyinfo+param->keynr;
952 keylen=_mi_keylength(keyinfo, key);
953 memcpy(lastkey, key, keylen);
954 _mi_ck_write_btree(param->info, param->keynr, lastkey,
955 keylen - param->info->s->rec_reflength);
956 return 0;
957 case free_end:
958 if (param->info->s->concurrent_insert)
959 mysql_rwlock_unlock(¶m->info->s->key_root_lock[param->keynr]);
960 return 0;
961 }
962 return 0;
963 }
964
965
mi_init_bulk_insert(MI_INFO * info,size_t cache_size,ha_rows rows)966 int mi_init_bulk_insert(MI_INFO *info, size_t cache_size, ha_rows rows)
967 {
968 MYISAM_SHARE *share=info->s;
969 MI_KEYDEF *key=share->keyinfo;
970 bulk_insert_param *params;
971 uint i, num_keys, total_keylength;
972 ulonglong key_map;
973 DBUG_ENTER("_mi_init_bulk_insert");
974 DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));
975
976 DBUG_ASSERT(!info->bulk_insert &&
977 (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
978
979 mi_clear_all_keys_active(key_map);
980 for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
981 {
982 if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
983 mi_is_key_active(share->state.key_map, i))
984 {
985 num_keys++;
986 mi_set_key_active(key_map, i);
987 total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
988 }
989 }
990
991 if (num_keys==0 ||
992 num_keys * (size_t) MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
993 DBUG_RETURN(0);
994
995 if (rows && rows*total_keylength < cache_size)
996 cache_size= (size_t) rows;
997 else
998 cache_size/=total_keylength*16;
999
1000 info->bulk_insert=(TREE *)
1001 my_malloc((sizeof(TREE)*share->base.keys+
1002 sizeof(bulk_insert_param)*num_keys),MYF(0));
1003
1004 if (!info->bulk_insert)
1005 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1006
1007 params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1008 for (i=0 ; i < share->base.keys ; i++)
1009 {
1010 if (mi_is_key_active(key_map, i))
1011 {
1012 params->info=info;
1013 params->keynr=i;
1014 /* Only allocate a 16'th of the buffer at a time */
1015 init_tree(&info->bulk_insert[i],
1016 cache_size * key[i].maxlength,
1017 cache_size * key[i].maxlength, 0,
1018 (qsort_cmp2)keys_compare, keys_free, (void *)params++, MYF(0));
1019 }
1020 else
1021 info->bulk_insert[i].root=0;
1022 }
1023
1024 DBUG_RETURN(0);
1025 }
1026
mi_flush_bulk_insert(MI_INFO * info,uint inx)1027 void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1028 {
1029 if (info->bulk_insert)
1030 {
1031 if (is_tree_inited(&info->bulk_insert[inx]))
1032 reset_tree(&info->bulk_insert[inx]);
1033 }
1034 }
1035
mi_end_bulk_insert(MI_INFO * info,my_bool abort)1036 int mi_end_bulk_insert(MI_INFO *info, my_bool abort)
1037 {
1038 int first_error= 0;
1039 if (info->bulk_insert)
1040 {
1041 uint i;
1042 for (i=0 ; i < info->s->base.keys ; i++)
1043 {
1044 if (is_tree_inited(& info->bulk_insert[i]))
1045 {
1046 int error;
1047 if ((error= delete_tree(& info->bulk_insert[i], abort)))
1048 {
1049 first_error= first_error ? first_error : error;
1050 abort= 1;
1051 }
1052 }
1053 }
1054 my_free(info->bulk_insert);
1055 info->bulk_insert=0;
1056 }
1057 return first_error;
1058 }
1059