1 /*
2 Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /* Write a row to a MyISAM table */
25
26 #include "fulltext.h"
27 #include "rt_index.h"
28
29 #define MAX_POINTER_LENGTH 8
30
31 /* Functions declared in this file */
32
33 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
34 uint comp_flag, uchar *key,
35 uint key_length, my_off_t pos, uchar *father_buff,
36 uchar *father_keypos, my_off_t father_page,
37 my_bool insert_last);
38 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
39 uchar *curr_buff,uchar *father_buff,
40 uchar *father_keypos,my_off_t father_page);
41 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
42 uchar *key, uint *return_key_length,
43 uchar **after_key);
44 int _mi_ck_write_tree(MI_INFO *info, uint keynr,uchar *key,
45 uint key_length);
46 int _mi_ck_write_btree(MI_INFO *info, uint keynr,uchar *key,
47 uint key_length);
48
49 /* Write new record to database */
50
mi_write(MI_INFO * info,uchar * record)51 int mi_write(MI_INFO *info, uchar *record)
52 {
53 MYISAM_SHARE *share=info->s;
54 uint i;
55 int save_errno;
56 my_off_t filepos;
57 uchar *buff;
58 my_bool lock_tree= share->concurrent_insert;
59 DBUG_ENTER("mi_write");
60 DBUG_PRINT("enter",("isam: %d data: %d",info->s->kfile,info->dfile));
61
62 DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
63 mi_print_error(info->s, HA_ERR_CRASHED);
64 set_my_errno(HA_ERR_CRASHED);
65 DBUG_RETURN(HA_ERR_CRASHED););
66 if (share->options & HA_OPTION_READ_ONLY_DATA)
67 {
68 set_my_errno(EACCES);
69 DBUG_RETURN(EACCES);
70 }
71 if (_mi_readinfo(info,F_WRLCK,1))
72 DBUG_RETURN(my_errno());
73
74 filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
75 !info->append_insert_at_end) ?
76 share->state.dellink :
77 info->state->data_file_length);
78
79 if (share->base.reloc == (ha_rows) 1 &&
80 share->base.records == (ha_rows) 1 &&
81 info->state->records == (ha_rows) 1)
82 { /* System file */
83 set_my_errno(HA_ERR_RECORD_FILE_FULL);
84 goto err2;
85 }
86 if (info->state->key_file_length >= share->base.margin_key_file_length)
87 {
88 set_my_errno(HA_ERR_INDEX_FILE_FULL);
89 goto err2;
90 }
91 if (_mi_mark_file_changed(info))
92 goto err2;
93
94 /* Calculate and check all unique constraints */
95 if (mi_is_any_key_active(share->state.key_map))
96 {
97 for (i= 0 ; i < share->state.header.uniques ; i++)
98 {
99 if (mi_check_unique(info, share->uniqueinfo + i, record,
100 mi_unique_hash(share->uniqueinfo + i, record),
101 HA_OFFSET_ERROR))
102 goto err2;
103 }
104 }
105
106 /* Write all keys to indextree */
107
108 buff=info->lastkey2;
109 for (i=0 ; i < share->base.keys ; i++)
110 {
111 if (mi_is_key_active(share->state.key_map, i))
112 {
113 my_bool local_lock_tree= (lock_tree &&
114 !(info->bulk_insert &&
115 is_tree_inited(&info->bulk_insert[i])));
116 if (local_lock_tree)
117 {
118 mysql_rwlock_wrlock(&share->key_root_lock[i]);
119 share->keyinfo[i].version++;
120 }
121 if (share->keyinfo[i].flag & HA_FULLTEXT )
122 {
123 if (_mi_ft_add(info,i, buff, record, filepos))
124 {
125 if (local_lock_tree)
126 mysql_rwlock_unlock(&share->key_root_lock[i]);
127 DBUG_PRINT("error",("Got error: %d on write",my_errno()));
128 goto err;
129 }
130 }
131 else
132 {
133 if (share->keyinfo[i].ck_insert(info,i,buff,
134 _mi_make_key(info,i,buff,record,filepos)))
135 {
136 if (local_lock_tree)
137 mysql_rwlock_unlock(&share->key_root_lock[i]);
138 DBUG_PRINT("error",("Got error: %d on write",my_errno()));
139 goto err;
140 }
141 }
142
143 if (local_lock_tree)
144 mysql_rwlock_unlock(&share->key_root_lock[i]);
145 }
146 }
147 if (share->calc_checksum)
148 info->checksum=(*share->calc_checksum)(info,record);
149 if (!(info->opt_flag & OPT_NO_ROWS))
150 {
151 if ((*share->write_record)(info,record))
152 goto err;
153 info->state->checksum+=info->checksum;
154 }
155 if (share->base.auto_key)
156 set_if_bigger(info->s->state.auto_increment,
157 retrieve_auto_increment(info, record));
158 info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
159 HA_STATE_ROW_CHANGED);
160 info->state->records++;
161 info->lastpos=filepos;
162 myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
163 (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
164 if (info->invalidator != 0)
165 {
166 DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
167 (*info->invalidator)(info->filename);
168 info->invalidator=0;
169 }
170
171 /*
172 Update status of the table. We need to do so after each row write
173 for the log tables, as we want the new row to become visible to
174 other threads as soon as possible. We don't lock mutex here
175 (as it is required by pthread memory visibility rules) as (1) it's
176 not critical to use outdated share->is_log_table value (2) locking
177 mutex here for every write is too expensive.
178 */
179 if (share->is_log_table)
180 mi_update_status((void*) info);
181
182 DBUG_RETURN(0);
183
184 err:
185 save_errno=my_errno();
186 if (my_errno() == HA_ERR_FOUND_DUPP_KEY || my_errno() == HA_ERR_RECORD_FILE_FULL ||
187 my_errno() == HA_ERR_NULL_IN_SPATIAL || my_errno() == HA_ERR_OUT_OF_MEM)
188 {
189 if (info->bulk_insert)
190 {
191 uint j;
192 for (j=0 ; j < share->base.keys ; j++)
193 mi_flush_bulk_insert(info, j);
194 }
195 info->errkey= (int) i;
196 while ( i-- > 0)
197 {
198 if (mi_is_key_active(share->state.key_map, i))
199 {
200 my_bool local_lock_tree= (lock_tree &&
201 !(info->bulk_insert &&
202 is_tree_inited(&info->bulk_insert[i])));
203 if (local_lock_tree)
204 mysql_rwlock_wrlock(&share->key_root_lock[i]);
205 if (share->keyinfo[i].flag & HA_FULLTEXT)
206 {
207 if (_mi_ft_del(info,i, buff,record,filepos))
208 {
209 if (local_lock_tree)
210 mysql_rwlock_unlock(&share->key_root_lock[i]);
211 break;
212 }
213 }
214 else
215 {
216 uint key_length=_mi_make_key(info,i,buff,record,filepos);
217 if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
218 {
219 if (local_lock_tree)
220 mysql_rwlock_unlock(&share->key_root_lock[i]);
221 break;
222 }
223 }
224 if (local_lock_tree)
225 mysql_rwlock_unlock(&share->key_root_lock[i]);
226 }
227 }
228 }
229 else
230 {
231 mi_print_error(info->s, HA_ERR_CRASHED);
232 mi_mark_crashed(info);
233 }
234 info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
235 set_my_errno(save_errno);
236 err2:
237 save_errno=my_errno();
238 myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno());
239 (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
240 set_my_errno(save_errno);
241 DBUG_RETURN(save_errno);
242 } /* mi_write */
243
244
245 /* Write one key to btree */
246
_mi_ck_write(MI_INFO * info,uint keynr,uchar * key,uint key_length)247 int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
248 {
249 DBUG_ENTER("_mi_ck_write");
250
251 if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
252 {
253 DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
254 }
255 else
256 {
257 DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
258 }
259 } /* _mi_ck_write */
260
261
262 /**********************************************************************
263 * Normal insert code *
264 **********************************************************************/
265
/*
  Insert a key into the B-tree of index number keynr.

  The search flags are derived from the key definition.  If the insert
  left a pending ft1_to_ft2 array in info, the conversion to a two-level
  fulltext tree is performed here (only when the insert succeeded) and
  the array is always released.

  Returns the result of the insert, or of the conversion if one was done.
*/

int _mi_ck_write_btree(MI_INFO *info, uint keynr, uchar *key,
                       uint key_length)
{
  MI_KEYDEF *keyinfo= info->s->keyinfo + keynr;
  my_off_t *root= &info->s->state.key_root[keynr];
  uint comp_flag;
  int res;
  DBUG_ENTER("_mi_ck_write_btree");

  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
    comp_flag= SEARCH_BIGGER;                   /* Put after same key */
  else if (!(keyinfo->flag & (HA_NOSAME | HA_FULLTEXT)))
    comp_flag= SEARCH_SAME;                     /* Keys in rec-pos order */
  else
  {
    comp_flag= SEARCH_FIND | SEARCH_UPDATE;     /* No duplicates */
    if (keyinfo->flag & HA_NULL_ARE_EQUAL)
      comp_flag|= SEARCH_NULL_ARE_EQUAL;
  }

  res= _mi_ck_real_write_btree(info, keyinfo, key, key_length,
                               root, comp_flag);
  if (!info->ft1_to_ft2)
    DBUG_RETURN(res);

  /* A fulltext page asked to be converted to the two-level format */
  if (res == 0)
    res= _mi_ft_convert_to_ft2(info, keynr, key);
  delete_dynamic(info->ft1_to_ft2);
  my_free(info->ft1_to_ft2);
  info->ft1_to_ft2= 0;
  DBUG_RETURN(res);
} /* _mi_ck_write_btree */
298
/*
  Insert a key into the tree that starts at *root.

  When the tree is empty, or when w_search() reports that a key has to be
  stored one level up, a new root page is created and *root is updated.
  The key_length parameter is used only if comp_flag is SEARCH_FIND.

  Returns 0 on success and a negative value on error.
*/

int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
                            uchar *key, uint key_length, my_off_t *root,
                            uint comp_flag)
{
  int error;
  DBUG_ENTER("_mi_ck_real_write_btree");

  if (*root != HA_OFFSET_ERROR)
  {
    error= w_search(info, keyinfo, comp_flag, key, key_length,
                    *root, (uchar*) 0, (uchar*) 0,
                    (my_off_t) 0, 1);
    if (error <= 0)
      DBUG_RETURN(error);               /* Stored, or failed */
  }

  /* Empty tree, or the old root was split */
  error= _mi_enlarge_root(info, keyinfo, key, root);
  DBUG_RETURN(error);
} /* _mi_ck_real_write_btree */
312
313
314 /* Make a new root with key as only pointer */
315
_mi_enlarge_root(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,my_off_t * root)316 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
317 my_off_t *root)
318 {
319 uint t_length,nod_flag;
320 MI_KEY_PARAM s_temp;
321 MYISAM_SHARE *share=info->s;
322 DBUG_ENTER("_mi_enlarge_root");
323
324 nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
325 _mi_kpointer(info,info->buff+2,*root); /* if nod */
326 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
327 (uchar*) 0, (uchar*) 0, key,&s_temp);
328 mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
329 (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
330 info->buff_used=info->page_changed=1; /* info->buff is used */
331 if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
332 _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
333 DBUG_RETURN(-1);
334 DBUG_RETURN(0);
335 } /* _mi_enlarge_root */
336
337
338 /*
339 Search after a position for a key and store it there
340 Returns -1 = error
341 0 = ok
342 1 = key should be stored in higher tree
343 */
344
w_search(MI_INFO * info,MI_KEYDEF * keyinfo,uint comp_flag,uchar * key,uint key_length,my_off_t page,uchar * father_buff,uchar * father_keypos,my_off_t father_page,my_bool insert_last)345 static int w_search(MI_INFO *info, MI_KEYDEF *keyinfo,
346 uint comp_flag, uchar *key, uint key_length, my_off_t page,
347 uchar *father_buff, uchar *father_keypos,
348 my_off_t father_page, my_bool insert_last)
349 {
350 int error,flag;
351 uint nod_flag, search_key_length;
352 uchar *temp_buff,*keypos;
353 uchar keybuff[MI_MAX_KEY_BUFF];
354 my_bool was_last_key;
355 my_off_t next_page, dupp_key_pos;
356 DBUG_ENTER("w_search");
357 DBUG_PRINT("enter",("page: %ld", (long) page));
358
359 search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
360 if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
361 MI_MAX_KEY_BUFF*2)))
362 DBUG_RETURN(-1);
363 if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
364 goto err;
365
366 flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
367 comp_flag, &keypos, keybuff, &was_last_key);
368 nod_flag=mi_test_if_nod(temp_buff);
369 if (flag == 0)
370 {
371 uint tmp_key_length;
372 /* get position to record with duplicated key */
373 tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
374 if (tmp_key_length)
375 dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
376 else
377 dupp_key_pos= HA_OFFSET_ERROR;
378
379 if (keyinfo->flag & HA_FULLTEXT)
380 {
381 uint off;
382 int subkeys;
383
384 get_key_full_length_rdonly(off, keybuff);
385 subkeys=ft_sintXkorr(keybuff+off);
386 comp_flag=SEARCH_SAME;
387 if (subkeys >= 0)
388 {
389 /* normal word, one-level tree structure */
390 flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
391 USE_WHOLE_KEY, comp_flag,
392 &keypos, keybuff, &was_last_key);
393 }
394 else
395 {
396 /* popular word. two-level tree. going down */
397 my_off_t root=dupp_key_pos;
398 keyinfo=&info->s->ft2_keyinfo;
399 get_key_full_length_rdonly(off, key);
400 key+=off;
401 keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
402 error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
403 &root, comp_flag);
404 _mi_dpointer(info, keypos+HA_FT_WLEN, root);
405 subkeys--; /* should there be underflow protection ? */
406 assert(subkeys < 0);
407 ft_intXstore(keypos, subkeys);
408 if (!error)
409 error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
410 DBUG_RETURN(error);
411 }
412 }
413 else /* not HA_FULLTEXT, normal HA_NOSAME key */
414 {
415 info->dupp_key_pos= dupp_key_pos;
416 set_my_errno(HA_ERR_FOUND_DUPP_KEY);
417 DBUG_RETURN(-1);
418 }
419 }
420 if (flag == MI_FOUND_WRONG_KEY)
421 DBUG_RETURN(-1);
422 if (!was_last_key)
423 insert_last=0;
424 next_page=_mi_kpos(nod_flag,keypos);
425 if (next_page == HA_OFFSET_ERROR ||
426 (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
427 temp_buff, keypos, page, insert_last)) >0)
428 {
429 error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
430 father_keypos,father_page, insert_last);
431 if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
432 goto err;
433 }
434 DBUG_RETURN(error);
435 err:
436 DBUG_PRINT("exit",("Error: %d",my_errno()));
437 DBUG_RETURN (-1);
438 } /* w_search */
439
440
441 /*
442 Insert new key.
443
444 SYNOPSIS
445 _mi_insert()
446 info Open table information.
447 keyinfo Key definition information.
448 key New key.
449 anc_buff Key page (beginning).
450 key_pos Position in key page where to insert.
451 key_buff Copy of previous key.
452 father_buff parent key page for balancing.
453 father_key_pos position in parent key page for balancing.
454 father_page position of parent key page in file.
455 insert_last If to append at end of page.
456
457 DESCRIPTION
458 Insert new key at right of key_pos.
459
460 RETURN
461 2 if key contains key to upper level.
462 0 OK.
463 < 0 Error.
464 */
465
_mi_insert(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * anc_buff,uchar * key_pos,uchar * key_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page,my_bool insert_last)466 int _mi_insert(MI_INFO *info, MI_KEYDEF *keyinfo,
467 uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
468 uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
469 my_bool insert_last)
470 {
471 uint a_length,nod_flag;
472 int t_length;
473 uchar *endpos, *prev_key;
474 MI_KEY_PARAM s_temp;
475 DBUG_ENTER("_mi_insert");
476 DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
477 DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
478
479 nod_flag=mi_test_if_nod(anc_buff);
480 a_length=mi_getint(anc_buff);
481 endpos= anc_buff+ a_length;
482 prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
483 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
484 (key_pos == endpos ? (uchar*) 0 : key_pos),
485 prev_key, prev_key,
486 key,&s_temp);
487 #ifndef NDEBUG
488 if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
489 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
490 {
491 DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
492 }
493 if (keyinfo->flag & HA_PACK_KEY)
494 {
495 DBUG_PRINT("test",("t_length: %d ref_len: %d",
496 t_length,s_temp.ref_length));
497 DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: 0x%lx",
498 s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
499 }
500 #endif
501 if (t_length > 0)
502 {
503 if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
504 {
505 mi_print_error(info->s, HA_ERR_CRASHED);
506 set_my_errno(HA_ERR_CRASHED);
507 DBUG_RETURN(-1);
508 }
509 memmove(key_pos + t_length, key_pos, (size_t) (endpos - key_pos));
510 }
511 else
512 {
513 if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
514 {
515 mi_print_error(info->s, HA_ERR_CRASHED);
516 set_my_errno(HA_ERR_CRASHED);
517 DBUG_RETURN(-1);
518 }
519 memmove(key_pos, key_pos - t_length, (uint) (endpos - key_pos) + t_length);
520 }
521 (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
522 a_length+=t_length;
523 mi_putint(anc_buff,a_length,nod_flag);
524 if (a_length <= keyinfo->block_length)
525 {
526 if (keyinfo->block_length - a_length < 32 &&
527 keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
528 info->s->base.key_reflength <= info->s->rec_reflength &&
529 info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
530 {
531 /*
532 Normal word. One-level tree. Page is almost full.
533 Let's consider converting.
534 We'll compare 'key' and the first key at anc_buff
535 */
536 uchar *a=key, *b=anc_buff+2+nod_flag;
537 uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
538 /* the very first key on the page is always unpacked */
539 assert((*b & 128) == 0);
540 #if HA_FT_MAXLEN >= 127 /* TODO: Undefined symbol */
541 blen= mi_uint2korr(b); b+=2;
542 #else
543 blen= *b++;
544 #endif
545 get_key_length(alen,a);
546 assert(info->ft1_to_ft2==0);
547 if (alen == blen &&
548 ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
549 {
550 /* yup. converting */
551 info->ft1_to_ft2=(DYNAMIC_ARRAY *)
552 my_malloc(mi_key_memory_MI_INFO_ft1_to_ft2,
553 sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
554 my_init_dynamic_array(info->ft1_to_ft2,
555 mi_key_memory_MI_INFO_ft1_to_ft2,
556 ft2len, NULL, 300, 50);
557
558 /*
559 now, adding all keys from the page to dynarray
560 if the page is a leaf (if not keys will be deleted later)
561 */
562 if (!nod_flag)
563 {
564 /* let's leave the first key on the page, though, because
565 we cannot easily dispatch an empty page here */
566 b+=blen+ft2len+2;
567 for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
568 {
569 if (insert_dynamic(info->ft1_to_ft2, b))
570 {
571 mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
572 set_my_errno(HA_ERR_OUT_OF_MEM);
573 DBUG_RETURN(-1);
574 }
575 }
576
577 /* fixing the page's length - it contains only one key now */
578 mi_putint(anc_buff,2+blen+ft2len+2,0);
579 }
580 /* the rest will be done when we're back from recursion */
581 }
582 }
583 DBUG_RETURN(0); /* There is room on page */
584 }
585 /* Page is full */
586 if (nod_flag)
587 insert_last=0;
588 if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
589 father_buff && !insert_last)
590 DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
591 father_key_pos,father_page));
592 DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
593 } /* _mi_insert */
594
595
596 /* split a full page in two and assign emerging item to key */
597
_mi_split_page(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * buff,uchar * key_buff,my_bool insert_last_key)598 int _mi_split_page(MI_INFO *info, MI_KEYDEF *keyinfo,
599 uchar *key, uchar *buff, uchar *key_buff,
600 my_bool insert_last_key)
601 {
602 uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
603 uchar *key_pos,*pos, *after_key= NULL;
604 my_off_t new_pos;
605 MI_KEY_PARAM s_temp;
606 DBUG_ENTER("mi_split_page");
607 DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
608
609 if (info->s->keyinfo+info->lastinx == keyinfo)
610 info->page_changed=1; /* Info->buff is used */
611 info->buff_used=1;
612 nod_flag=mi_test_if_nod(buff);
613 key_ref_length=2+nod_flag;
614 if (insert_last_key)
615 key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
616 else
617 key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
618 &after_key);
619 if (!key_pos)
620 DBUG_RETURN(-1);
621
622 length=(uint) (key_pos-buff);
623 a_length=mi_getint(buff);
624 mi_putint(buff,length,nod_flag);
625
626 key_pos=after_key;
627 if (nod_flag)
628 {
629 DBUG_PRINT("test",("Splitting nod"));
630 pos=key_pos-nod_flag;
631 memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
632 }
633
634 /* Move middle item to key and pointer to new page */
635 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
636 DBUG_RETURN(-1);
637 _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
638
639 /* Store new page */
640 if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
641 DBUG_RETURN(-1);
642
643 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
644 (uchar*) 0, (uchar*) 0,
645 key_buff, &s_temp);
646 length=(uint) ((buff+a_length)-key_pos);
647 memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
648 (size_t) length);
649 (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
650 mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
651
652 if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
653 DBUG_RETURN(-1);
654 DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
655 DBUG_RETURN(2); /* Middle key up */
656 } /* _mi_split_page */
657
658
659 /*
660 Calculate how to much to move to split a page in two
661 Returns pointer to start of key.
662 key will contain the key.
663 return_key_length will contain the length of key
664 after_key will contain the position to where the next key starts
665 */
666
_mi_find_half_pos(uint nod_flag,MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)667 uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
668 uchar *key, uint *return_key_length,
669 uchar **after_key)
670 {
671 uint keys,length,key_ref_length;
672 uchar *end,*lastpos;
673 DBUG_ENTER("_mi_find_half_pos");
674
675 key_ref_length=2+nod_flag;
676 length=mi_getint(page)-key_ref_length;
677 page+=key_ref_length;
678 if (!(keyinfo->flag &
679 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
680 HA_BINARY_PACK_KEY)))
681 {
682 key_ref_length=keyinfo->keylength+nod_flag;
683 keys=length/(key_ref_length*2);
684 *return_key_length=keyinfo->keylength;
685 end=page+keys*key_ref_length;
686 *after_key=end+key_ref_length;
687 memcpy(key,end,key_ref_length);
688 DBUG_RETURN(end);
689 }
690
691 end=page+length/2-key_ref_length; /* This is aprox. half */
692 *key='\0';
693 do
694 {
695 lastpos=page;
696 if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
697 DBUG_RETURN(0);
698 } while (page < end);
699 *return_key_length=length;
700 *after_key=page;
701 DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx half: 0x%lx",
702 (long) lastpos, (long) page, (long) end));
703 DBUG_RETURN(lastpos);
704 } /* _mi_find_half_pos */
705
706
707 /*
708 Split buffer at last key
709 Returns pointer to the start of the key before the last key
710 key will contain the last key
711 */
712
_mi_find_last_pos(MI_KEYDEF * keyinfo,uchar * page,uchar * key,uint * return_key_length,uchar ** after_key)713 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
714 uchar *key, uint *return_key_length,
715 uchar **after_key)
716 {
717 uint keys, length, last_length= 0, key_ref_length;
718 uchar *end, *lastpos, *prevpos= NULL;
719 uchar key_buff[MI_MAX_KEY_BUFF];
720 DBUG_ENTER("_mi_find_last_pos");
721
722 key_ref_length=2;
723 length=mi_getint(page)-key_ref_length;
724 page+=key_ref_length;
725 if (!(keyinfo->flag &
726 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
727 HA_BINARY_PACK_KEY)))
728 {
729 keys=length/keyinfo->keylength-2;
730 *return_key_length=length=keyinfo->keylength;
731 end=page+keys*length;
732 *after_key=end+length;
733 memcpy(key,end,length);
734 DBUG_RETURN(end);
735 }
736
737 end=page+length-key_ref_length;
738 *key='\0';
739 length=0;
740 lastpos=page;
741 while (page < end)
742 {
743 prevpos=lastpos; lastpos=page;
744 last_length=length;
745 memcpy(key, key_buff, length); /* previous key */
746 if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
747 {
748 mi_print_error(keyinfo->share, HA_ERR_CRASHED);
749 set_my_errno(HA_ERR_CRASHED);
750 DBUG_RETURN(0);
751 }
752 }
753 *return_key_length=last_length;
754 *after_key=lastpos;
755 DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx end: 0x%lx",
756 (long) prevpos,(long) page,(long) end));
757 DBUG_RETURN(prevpos);
758 } /* _mi_find_last_pos */
759
760
761 /* Balance page with not packed keys with page on right/left */
762 /* returns 0 if balance was done */
763
_mi_balance_page(MI_INFO * info,MI_KEYDEF * keyinfo,uchar * key,uchar * curr_buff,uchar * father_buff,uchar * father_key_pos,my_off_t father_page)764 static int _mi_balance_page(MI_INFO *info, MI_KEYDEF *keyinfo,
765 uchar *key, uchar *curr_buff, uchar *father_buff,
766 uchar *father_key_pos, my_off_t father_page)
767 {
768 my_bool right;
769 uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
770 right_length,left_length,new_right_length,new_left_length,extra_length,
771 length,keys;
772 uchar *pos,*buff,*extra_buff;
773 my_off_t next_page,new_pos;
774 uchar tmp_part_key[MI_MAX_KEY_BUFF];
775 DBUG_ENTER("_mi_balance_page");
776
777 k_length=keyinfo->keylength;
778 father_length=mi_getint(father_buff);
779 father_keylength=k_length+info->s->base.key_reflength;
780 nod_flag=mi_test_if_nod(curr_buff);
781 curr_keylength=k_length+nod_flag;
782 info->page_changed=1;
783
784 if ((father_key_pos != father_buff+father_length &&
785 (info->state->records & 1)) ||
786 father_key_pos == father_buff+2+info->s->base.key_reflength)
787 {
788 right=1;
789 next_page= _mi_kpos(info->s->base.key_reflength,
790 father_key_pos+father_keylength);
791 buff=info->buff;
792 DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
793 }
794 else
795 {
796 right=0;
797 father_key_pos-=father_keylength;
798 next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
799 /* Fix that curr_buff is to left */
800 buff=curr_buff; curr_buff=info->buff;
801 DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
802 } /* father_key_pos ptr to parting key */
803
804 if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
805 goto err;
806 DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
807
808 /* Test if there is room to share keys */
809
810 left_length=mi_getint(curr_buff);
811 right_length=mi_getint(buff);
812 keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
813
814 if ((right ? right_length : left_length) + curr_keylength <=
815 keyinfo->block_length)
816 { /* Merge buffs */
817 new_left_length=2+nod_flag+(keys/2)*curr_keylength;
818 new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
819 mi_putint(curr_buff,new_left_length,nod_flag);
820 mi_putint(buff,new_right_length,nod_flag);
821
822 if (left_length < new_left_length)
823 { /* Move keys buff -> leaf */
824 pos=curr_buff+left_length;
825 memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
826 memcpy((uchar*) pos+k_length, (uchar*) buff+2,
827 (size_t) (length=new_left_length - left_length - k_length));
828 pos=buff+2+length;
829 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
830 memmove((uchar*) buff + 2,
831 (uchar*) pos + k_length, new_right_length - 2);
832 }
833 else
834 { /* Move keys -> buff */
835
836 memmove(buff + new_right_length - right_length + 2,
837 buff + 2, right_length - 2);
838 length=new_right_length-right_length-k_length;
839 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
840 pos=curr_buff+new_left_length;
841 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
842 memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
843 }
844
845 if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
846 _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
847 goto err;
848 DBUG_RETURN(0);
849 }
850
851 /* curr_buff[] and buff[] are full, lets split and make new nod */
852
853 extra_buff=info->buff+info->s->base.max_key_block_length;
854 new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
855 if (keys == 5) /* Too few keys to balance */
856 new_left_length-=curr_keylength;
857 extra_length=nod_flag+left_length+right_length-
858 new_left_length-new_right_length-curr_keylength;
859 DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
860 left_length, right_length,
861 new_left_length, new_right_length,
862 extra_length));
863 mi_putint(curr_buff,new_left_length,nod_flag);
864 mi_putint(buff,new_right_length,nod_flag);
865 mi_putint(extra_buff,extra_length+2,nod_flag);
866
867 /* move first largest keys to new page */
868 pos=buff+right_length-extra_length;
869 memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
870 /* Save new parting key */
871 memcpy(tmp_part_key, pos-k_length,k_length);
872 /* Make place for new keys */
873 memmove(buff + new_right_length - right_length + extra_length + k_length + 2,
874 pos - right_length + extra_length + 2,
875 right_length - extra_length - k_length - 2);
876 /* Copy keys from left page */
877 pos= curr_buff+new_left_length;
878 memcpy((uchar*) buff+2,(uchar*) pos+k_length,
879 (size_t) (length=left_length-new_left_length-k_length));
880 /* Copy old parting key */
881 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
882
883 /* Move new parting keys up to caller */
884 memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
885 memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
886
887 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
888 goto err;
889 _mi_kpointer(info,key+k_length,new_pos);
890 if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
891 DFLT_INIT_HITS,info->buff) ||
892 _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
893 DFLT_INIT_HITS,extra_buff))
894 goto err;
895
896 DBUG_RETURN(1); /* Middle key up */
897
898 err:
899 DBUG_RETURN(-1);
900 } /* _mi_balance_page */
901
902 /**********************************************************************
903 * Bulk insert code *
904 **********************************************************************/
905
/*
  Argument for the callbacks of a bulk insert tree; it identifies the
  index that the tree collects keys for.
*/
typedef struct {
  MI_INFO *info;                        /* Table handler that owns the tree */
  uint keynr;                           /* Number of the index */
} bulk_insert_param;
910
/*
  Store a key in the bulk insert tree of index number keynr.

  The stored element is the key followed by rec_reflength more bytes.
  Returns 0 on success and HA_ERR_OUT_OF_MEM if tree_insert() failed.
*/

int _mi_ck_write_tree(MI_INFO *info, uint keynr, uchar *key,
                      uint key_length)
{
  DBUG_ENTER("_mi_ck_write_tree");

  if (tree_insert(&info->bulk_insert[keynr], key,
                  key_length + info->s->rec_reflength,
                  info->bulk_insert[keynr].custom_arg))
    DBUG_RETURN(0);

  DBUG_RETURN(HA_ERR_OUT_OF_MEM);
} /* _mi_ck_write_tree */
923
924
925 /* typeof(_mi_keys_compare)=qsort_cmp2 */
926
keys_compare(bulk_insert_param * param,uchar * key1,uchar * key2)927 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
928 {
929 uint not_used[2];
930 return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
931 key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
932 not_used);
933 }
934
935
/*
  Element free callback of a bulk insert tree.

  SYNOPSIS
    keys_free()
    vkey      Key stored in the tree element (used for free_free only).
    mode      Which step of the tree clean-up is running.
    vparam    The bulk_insert_param of the tree.

  DESCRIPTION
    free_init   If concurrent inserts are enabled, takes the write lock
                on the key root and increments the key version.
    free_free   Writes the key to the B-tree of the index.
    free_end    Releases the lock taken for free_init.

    NOTE(review): the result of _mi_ck_write_btree() cannot be reported
    because the callback returns nothing, so a failed write is ignored.
*/

static void keys_free(void* vkey, TREE_FREE mode, const void *vparam)
{
  /*
    Probably I can use info->lastkey here, but I'm not sure,
    and to be safe I'd better use local lastkey.
  */
  uchar lastkey[MI_MAX_KEY_BUFF];
  uint keylen;
  MI_KEYDEF *keyinfo;
  uchar *key= (uchar*)(vkey);
  bulk_insert_param *param= (bulk_insert_param*)(vparam);

  switch (mode) {
  case free_init:
    if (param->info->s->concurrent_insert)
    {
      mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
      param->info->s->keyinfo[param->keynr].version++;
    }
    return;
  case free_free:
    keyinfo=param->info->s->keyinfo+param->keynr;
    keylen=_mi_keylength(keyinfo, key);
    /* Write from a private copy, not from the memory of the tree element */
    memcpy(lastkey, key, keylen);
    _mi_ck_write_btree(param->info,param->keynr,lastkey,
                       keylen - param->info->s->rec_reflength);
    return;
  case free_end:
    if (param->info->s->concurrent_insert)
      mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
    return;
  }
  return;
}
970
971
/*
  Set up bulk insert trees for the keys that can use them.

  SYNOPSIS
    mi_init_bulk_insert()
    info          Open table handler; must not have bulk insert set up.
    cache_size    Memory budget for the trees.
    rows          Expected number of rows, or 0.

  DESCRIPTION
    A tree is created for every active key that is not HA_NOSAME and is
    not the auto increment key.  Nothing is set up if there is no such
    key or if cache_size is too small to give every tree
    MI_MIN_SIZE_BULK_INSERT_TREE.

  RETURN
    0                     OK (also when no tree was set up).
    HA_ERR_OUT_OF_MEM     The tree array could not be allocated.
*/

int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
{
  MYISAM_SHARE *share= info->s;
  MI_KEYDEF *key= share->keyinfo;
  bulk_insert_param *params;
  uint i, num_keys= 0, total_keylength= 0;
  ulonglong key_map;
  DBUG_ENTER("_mi_init_bulk_insert");
  DBUG_PRINT("enter",("cache_size: %lu", cache_size));

  assert(!info->bulk_insert &&
         (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));

  /* Collect the keys that get a tree */
  mi_clear_all_keys_active(key_map);
  for (i= 0; i < share->base.keys; i++)
  {
    if ((key[i].flag & HA_NOSAME) ||
        share->base.auto_key == i + 1 ||
        !mi_is_key_active(share->state.key_map, i))
      continue;
    num_keys++;
    mi_set_key_active(key_map, i);
    total_keylength+= key[i].maxlength + TREE_ELEMENT_EXTRA_SIZE;
  }

  if (num_keys == 0 ||
      num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
    DBUG_RETURN(0);

  /* From here on cache_size is a number of keys per tree, not bytes */
  if (!rows || rows * total_keylength >= cache_size)
    cache_size/= total_keylength * 16;
  else
    cache_size= (ulong) rows;

  /* One TREE per key of the table, followed by one param per tree in use */
  info->bulk_insert= (TREE *)
    my_malloc(mi_key_memory_MI_INFO_bulk_insert,
              (sizeof(TREE) * share->base.keys +
               sizeof(bulk_insert_param) * num_keys), MYF(0));
  if (!info->bulk_insert)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  params= (bulk_insert_param *) (info->bulk_insert + share->base.keys);
  for (i= 0; i < share->base.keys; i++)
  {
    if (!mi_is_key_active(key_map, i))
    {
      info->bulk_insert[i].root= 0;             /* Marks the tree as unused */
      continue;
    }
    params->info= info;
    params->keynr= i;
    /* Only allocate a 16'th of the buffer at a time */
    init_tree(&info->bulk_insert[i],
              cache_size * key[i].maxlength,
              cache_size * key[i].maxlength, 0,
              (qsort_cmp2) keys_compare, 0,
              keys_free, (void *) params);
    params++;
  }

  DBUG_RETURN(0);
}
1034
/*
  Reset the bulk insert tree of index number inx, if bulk insert is set
  up and that tree is initialised.
*/

void mi_flush_bulk_insert(MI_INFO *info, uint inx)
{
  if (!info->bulk_insert)
    return;

  if (is_tree_inited(&info->bulk_insert[inx]))
    reset_tree(&info->bulk_insert[inx]);
}
1043
/*
  End bulk insert: delete every initialised tree and free the tree
  array.  Does nothing if bulk insert is not set up.
*/

void mi_end_bulk_insert(MI_INFO *info)
{
  uint i;

  if (!info->bulk_insert)
    return;

  for (i= 0; i < info->s->base.keys; i++)
  {
    if (is_tree_inited(&info->bulk_insert[i]))
      delete_tree(&info->bulk_insert[i]);
  }
  my_free(info->bulk_insert);
  info->bulk_insert= 0;
}
1060