1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /*
24 Creates a index for a database by reading keys, sorting them and outputing
25 them in sorted order through SORT_INFO functions.
26 */
27
28 #include <algorithm>
29 #include <functional>
30 #include <my_global.h>
31 #include "fulltext.h"
32 #if defined(__WIN__)
33 #include <fcntl.h>
34 #else
35 #include <stddef.h>
36 #endif
37 #include <queues.h>
38
39 /* static variables */
40
41 #undef MYF_RW
42 #undef DISK_BUFFER_SIZE
43
44 #define MERGEBUFF 15
45 #define MERGEBUFF2 31
46 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
47 #define DISK_BUFFER_SIZE (IO_SIZE*16)
48
49 class Key_compare :
50 public std::binary_function<const uchar*, const uchar*, bool>
51 {
52 public:
Key_compare(MI_SORT_PARAM * param)53 Key_compare(MI_SORT_PARAM *param) : info(param) {}
operator ()(const uchar * a,const uchar * b)54 bool operator()(const uchar *a, const uchar *b)
55 {
56 return info->key_cmp(info, &a, &b) < 0;
57 }
58 MI_SORT_PARAM *info;
59 };
60
61 /*
62 Pointers of functions for store and read keys from temp file
63 */
64
65 extern void print_error(const char *fmt,...);
66
67 /* Functions defined in this file */
68
69 static ha_rows find_all_keys(MI_SORT_PARAM *info,uint keys,
70 uchar **sort_keys,
71 DYNAMIC_ARRAY *buffpek,int *maxbuffer,
72 IO_CACHE *tempfile,
73 IO_CACHE *tempfile_for_exceptions);
74 static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
75 uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
76 static int write_key(MI_SORT_PARAM *info, uchar *key,
77 IO_CACHE *tempfile);
78 static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
79 uint count);
80 static int merge_many_buff(MI_SORT_PARAM *info,uint keys,
81 uchar * *sort_keys,
82 BUFFPEK *buffpek,int *maxbuffer,
83 IO_CACHE *t_file);
84 static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
85 uint sort_length);
86 static int merge_buffers(MI_SORT_PARAM *info,uint keys,
87 IO_CACHE *from_file, IO_CACHE *to_file,
88 uchar * *sort_keys, BUFFPEK *lastbuff,
89 BUFFPEK *Fb, BUFFPEK *Tb);
90 static int merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
91 IO_CACHE *);
92 static int flush_ft_buf(MI_SORT_PARAM *info);
93
94 static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
95 uint count, BUFFPEK *buffpek,
96 IO_CACHE *tempfile);
97 static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
98 uint sort_length);
99 static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
100 uchar *key, uint sort_length, uint count);
101 static int write_merge_key_varlen(MI_SORT_PARAM *info,
102 IO_CACHE *to_file,
103 uchar* key, uint sort_length,
104 uint count);
105 static inline int
106 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
107
108 /*
109 Creates a index of sorted keys
110
111 SYNOPSIS
112 _create_index_by_sort()
113 info Sort parameters
114 no_messages Set to 1 if no output
115 sortbuff_size Size if sortbuffer to allocate
116
117 RESULT
118 0 ok
119 <> 0 Error
120 */
121
_create_index_by_sort(MI_SORT_PARAM * info,my_bool no_messages,ulonglong sortbuff_size)122 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
123 ulonglong sortbuff_size)
124 {
125 int error,maxbuffer,skr;
126 uint sort_length, keys;
127 ulonglong memavl, old_memavl;
128 DYNAMIC_ARRAY buffpek;
129 ha_rows records;
130 uchar **sort_keys;
131 IO_CACHE tempfile, tempfile_for_exceptions;
132 DBUG_ENTER("_create_index_by_sort");
133 DBUG_PRINT("enter",("sort_length: %d", info->key_length));
134
135 if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
136 {
137 info->write_keys=write_keys_varlen;
138 info->read_to_buffer=read_to_buffer_varlen;
139 info->write_key= write_merge_key_varlen;
140 }
141 else
142 {
143 info->write_keys=write_keys;
144 info->read_to_buffer=read_to_buffer;
145 info->write_key=write_merge_key;
146 }
147
148 my_b_clear(&tempfile);
149 my_b_clear(&tempfile_for_exceptions);
150 memset(&buffpek, 0, sizeof(buffpek));
151 sort_keys= (uchar **) NULL; error= 1;
152 maxbuffer=1;
153
154 memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
155 records= info->sort_info->max_records;
156 sort_length= info->key_length;
157 LINT_INIT(keys);
158
159 if ((memavl - sizeof(BUFFPEK)) / (sort_length + sizeof(char *)) > UINT_MAX32)
160 memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
161
162 while (memavl >= MIN_SORT_BUFFER)
163 {
164 if ((records < UINT_MAX32) &&
165 ((my_off_t) (records + 1) *
166 (sort_length + sizeof(char*)) <= (my_off_t) memavl))
167 keys= (uint)records+1;
168 else
169 do
170 {
171 skr=maxbuffer;
172 if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
173 (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
174 (sort_length+sizeof(char*))) <= 1 ||
175 keys < (uint) maxbuffer)
176 {
177 mi_check_print_error(info->sort_info->param,
178 "myisam_sort_buffer_size is too small");
179 goto err;
180 }
181 }
182 while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
183
184 if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
185 HA_FT_MAXBYTELEN, MYF(0))))
186 {
187 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
188 maxbuffer/2))
189 {
190 my_free(sort_keys);
191 sort_keys= 0;
192 }
193 else
194 break;
195 }
196 old_memavl=memavl;
197 if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
198 memavl= MIN_SORT_BUFFER;
199 }
200 if (memavl < MIN_SORT_BUFFER)
201 {
202 mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
203 goto err; /* purecov: tested */
204 }
205 (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
206
207 if (!no_messages)
208 printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
209
210 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
211 &tempfile,&tempfile_for_exceptions))
212 == HA_POS_ERROR)
213 goto err; /* purecov: tested */
214 if (maxbuffer == 0)
215 {
216 if (!no_messages)
217 printf(" - Dumping %lu keys\n", (ulong) records);
218 if (write_index(info,sort_keys, (uint) records))
219 goto err; /* purecov: inspected */
220 }
221 else
222 {
223 keys=(keys*(sort_length+sizeof(char*)))/sort_length;
224 if (maxbuffer >= MERGEBUFF2)
225 {
226 if (!no_messages)
227 printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
228 if (merge_many_buff(info,keys,sort_keys,
229 dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
230 goto err; /* purecov: inspected */
231 }
232 if (flush_io_cache(&tempfile) ||
233 reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
234 goto err; /* purecov: inspected */
235 if (!no_messages)
236 printf(" - Last merge and dumping keys\n"); /* purecov: tested */
237 if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
238 maxbuffer,&tempfile))
239 goto err; /* purecov: inspected */
240 }
241
242 if (flush_ft_buf(info) || flush_pending_blocks(info))
243 goto err;
244
245 if (my_b_inited(&tempfile_for_exceptions))
246 {
247 MI_INFO *idx=info->sort_info->info;
248 uint keyno=info->key;
249 uint key_length, ref_length=idx->s->rec_reflength;
250
251 if (!no_messages)
252 printf(" - Adding exceptions\n"); /* purecov: tested */
253 if (flush_io_cache(&tempfile_for_exceptions) ||
254 reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
255 goto err;
256
257 while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
258 sizeof(key_length))
259 && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
260 (uint) key_length))
261 {
262 if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
263 goto err;
264 }
265 }
266
267 error =0;
268
269 err:
270 my_free(sort_keys);
271 delete_dynamic(&buffpek);
272 close_cached_file(&tempfile);
273 close_cached_file(&tempfile_for_exceptions);
274
275 DBUG_RETURN(error ? -1 : 0);
276 } /* _create_index_by_sort */
277
278
279 /* Search after all keys and place them in a temp. file */
280
find_all_keys(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,DYNAMIC_ARRAY * buffpek,int * maxbuffer,IO_CACHE * tempfile,IO_CACHE * tempfile_for_exceptions)281 static ha_rows find_all_keys(MI_SORT_PARAM *info, uint keys,
282 uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
283 int *maxbuffer, IO_CACHE *tempfile,
284 IO_CACHE *tempfile_for_exceptions)
285 {
286 int error;
287 uint idx;
288 DBUG_ENTER("find_all_keys");
289
290 idx=error=0;
291 sort_keys[0]=(uchar*) (sort_keys+keys);
292
293 while (!(error=(*info->key_read)(info,sort_keys[idx])))
294 {
295 if (info->real_key_length > info->key_length)
296 {
297 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
298 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
299 continue;
300 }
301
302 if (++idx == keys)
303 {
304 if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
305 tempfile))
306 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
307
308 sort_keys[0]=(uchar*) (sort_keys+keys);
309 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
310 idx=1;
311 }
312 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
313 }
314 if (error > 0)
315 DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
316 if (buffpek->elements)
317 {
318 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
319 tempfile))
320 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
321 *maxbuffer=buffpek->elements-1;
322 }
323 else
324 *maxbuffer=0;
325
326 DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
327 } /* find_all_keys */
328
329
330 /* Search after all keys and place them in a temp. file */
331
thr_find_all_keys(void * arg)332 pthread_handler_t thr_find_all_keys(void *arg)
333 {
334 MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
335 int error;
336 ulonglong memavl, old_memavl;
337 uint keys, sort_length;
338 uint idx, maxbuffer;
339 uchar **sort_keys=0;
340
341 LINT_INIT(keys);
342
343 error=1;
344
345 if (my_thread_init())
346 goto err;
347
348 { /* Add extra block since DBUG_ENTER declare variables */
349 DBUG_ENTER("thr_find_all_keys");
350 DBUG_PRINT("enter", ("master: %d", sort_param->master));
351 if (sort_param->sort_info->got_error)
352 goto err;
353
354 if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
355 {
356 sort_param->write_keys= write_keys_varlen;
357 sort_param->read_to_buffer= read_to_buffer_varlen;
358 sort_param->write_key= write_merge_key_varlen;
359 }
360 else
361 {
362 sort_param->write_keys= write_keys;
363 sort_param->read_to_buffer= read_to_buffer;
364 sort_param->write_key= write_merge_key;
365 }
366
367 my_b_clear(&sort_param->tempfile);
368 my_b_clear(&sort_param->tempfile_for_exceptions);
369 memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
370 memset(&sort_param->unique, 0, sizeof(sort_param->unique));
371 sort_keys= (uchar **) NULL;
372
373 memavl= MY_MAX(sort_param->sortbuff_size, MIN_SORT_BUFFER);
374 idx= (uint)sort_param->sort_info->max_records;
375 sort_length= sort_param->key_length;
376 maxbuffer= 1;
377
378 if ((memavl - sizeof(BUFFPEK)) / (sort_length +
379 sizeof(char *)) > UINT_MAX32)
380 memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
381
382 while (memavl >= MIN_SORT_BUFFER)
383 {
384 if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
385 (my_off_t) memavl)
386 keys= idx+1;
387 else
388 {
389 uint skr;
390 do
391 {
392 skr= maxbuffer;
393 if (memavl < sizeof(BUFFPEK)*maxbuffer ||
394 (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
395 (sort_length+sizeof(char*))) <= 1 ||
396 keys < (uint) maxbuffer)
397 {
398 mi_check_print_error(sort_param->sort_info->param,
399 "myisam_sort_buffer_size is too small");
400 goto err;
401 }
402 }
403 while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
404 }
405 if ((sort_keys= (uchar**)
406 my_malloc(keys*(sort_length+sizeof(char*))+
407 ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
408 HA_FT_MAXBYTELEN : 0), MYF(0))))
409 {
410 if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
411 maxbuffer, maxbuffer/2))
412 {
413 my_free(sort_keys);
414 sort_keys= (uchar **) NULL; /* for err: label */
415 }
416 else
417 break;
418 }
419 old_memavl= memavl;
420 if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
421 old_memavl > MIN_SORT_BUFFER)
422 memavl= MIN_SORT_BUFFER;
423 }
424 if (memavl < MIN_SORT_BUFFER)
425 {
426 mi_check_print_error(sort_param->sort_info->param,
427 "MyISAM sort buffer too small");
428 goto err; /* purecov: tested */
429 }
430
431 if (sort_param->sort_info->param->testflag & T_VERBOSE)
432 printf("Key %d - Allocating buffer for %d keys\n",
433 sort_param->key + 1, keys);
434 sort_param->sort_keys= sort_keys;
435
436 idx= error= 0;
437 sort_keys[0]= (uchar*) (sort_keys+keys);
438
439 DBUG_PRINT("info", ("reading keys"));
440 while (!(error= sort_param->sort_info->got_error) &&
441 !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
442 {
443 if (sort_param->real_key_length > sort_param->key_length)
444 {
445 if (write_key(sort_param, sort_keys[idx],
446 &sort_param->tempfile_for_exceptions))
447 goto err;
448 continue;
449 }
450
451 if (++idx == keys)
452 {
453 if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
454 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
455 &sort_param->tempfile))
456 goto err;
457 sort_keys[0]= (uchar*) (sort_keys+keys);
458 memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
459 idx= 1;
460 }
461 sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
462 }
463 if (error > 0)
464 goto err;
465 if (sort_param->buffpek.elements)
466 {
467 if (sort_param->write_keys(sort_param, sort_keys, idx,
468 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
469 &sort_param->tempfile))
470 goto err;
471 sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
472 }
473 else
474 sort_param->keys= idx;
475
476 sort_param->sort_keys_length= keys;
477 goto ok;
478
479 err:
480 DBUG_PRINT("error", ("got some error"));
481 sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
482 my_free(sort_keys);
483 sort_param->sort_keys= 0;
484 delete_dynamic(& sort_param->buffpek);
485 close_cached_file(&sort_param->tempfile);
486 close_cached_file(&sort_param->tempfile_for_exceptions);
487
488 ok:
489 free_root(&sort_param->wordroot, MYF(0));
490 /*
491 Detach from the share if the writer is involved. Avoid others to
492 be blocked. This includes a flush of the write buffer. This will
493 also indicate EOF to the readers.
494 That means that a writer always gets here first and readers -
495 only when they see EOF. But if a reader finishes prematurely
496 because of an error it may reach this earlier - don't allow it
497 to detach the writer thread.
498 */
499 if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
500 remove_io_thread(&sort_param->sort_info->info->rec_cache);
501
502 /* Readers detach from the share if any. Avoid others to be blocked. */
503 if (sort_param->read_cache.share)
504 remove_io_thread(&sort_param->read_cache);
505
506 mysql_mutex_lock(&sort_param->sort_info->mutex);
507 if (!--sort_param->sort_info->threads_running)
508 mysql_cond_signal(&sort_param->sort_info->cond);
509 mysql_mutex_unlock(&sort_param->sort_info->mutex);
510 DBUG_PRINT("exit", ("======== ending thread ========"));
511 }
512 my_thread_end();
513 return NULL;
514 }
515
516
thr_write_keys(MI_SORT_PARAM * sort_param)517 int thr_write_keys(MI_SORT_PARAM *sort_param)
518 {
519 SORT_INFO *sort_info=sort_param->sort_info;
520 MI_CHECK *param=sort_info->param;
521 ulong UNINIT_VAR(length), keys;
522 ulong *rec_per_key_part=param->rec_per_key_part;
523 int got_error=sort_info->got_error;
524 uint i;
525 MI_INFO *info=sort_info->info;
526 MYISAM_SHARE *share=info->s;
527 MI_SORT_PARAM *sinfo;
528 uchar *mergebuf=0;
529 DBUG_ENTER("thr_write_keys");
530 LINT_INIT(length);
531
532 for (i= 0, sinfo= sort_param ;
533 i < sort_info->total_keys ;
534 i++, sinfo++)
535 {
536 if (!sinfo->sort_keys)
537 {
538 got_error=1;
539 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
540 continue;
541 }
542 if (!got_error)
543 {
544 mi_set_key_active(share->state.key_map, sinfo->key);
545 if (!sinfo->buffpek.elements)
546 {
547 if (param->testflag & T_VERBOSE)
548 {
549 printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
550 fflush(stdout);
551 }
552 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
553 flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
554 got_error=1;
555 }
556 }
557 my_free(sinfo->sort_keys);
558 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
559 sinfo->sort_keys=0;
560 }
561
562 for (i= 0, sinfo= sort_param ;
563 i < sort_info->total_keys ;
564 i++,
565 delete_dynamic(&sinfo->buffpek),
566 close_cached_file(&sinfo->tempfile),
567 close_cached_file(&sinfo->tempfile_for_exceptions),
568 rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
569 {
570 if (got_error)
571 continue;
572 if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
573 {
574 sinfo->write_keys=write_keys_varlen;
575 sinfo->read_to_buffer=read_to_buffer_varlen;
576 sinfo->write_key=write_merge_key_varlen;
577 }
578 else
579 {
580 sinfo->write_keys=write_keys;
581 sinfo->read_to_buffer=read_to_buffer;
582 sinfo->write_key=write_merge_key;
583 }
584 if (sinfo->buffpek.elements)
585 {
586 uint maxbuffer=sinfo->buffpek.elements-1;
587 if (!mergebuf)
588 {
589 length=param->sort_buffer_length;
590 while (length >= MIN_SORT_BUFFER)
591 {
592 if ((mergebuf= (uchar *) my_malloc(length, MYF(0))))
593 break;
594 length=length*3/4;
595 }
596 if (!mergebuf)
597 {
598 got_error=1;
599 continue;
600 }
601 }
602 keys=length/sinfo->key_length;
603 if (maxbuffer >= MERGEBUFF2)
604 {
605 if (param->testflag & T_VERBOSE)
606 printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
607 if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
608 dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
609 (int*) &maxbuffer, &sinfo->tempfile))
610 {
611 got_error=1;
612 continue;
613 }
614 }
615 if (flush_io_cache(&sinfo->tempfile) ||
616 reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
617 {
618 got_error=1;
619 continue;
620 }
621 if (param->testflag & T_VERBOSE)
622 printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
623 if (merge_index(sinfo, keys, (uchar **)mergebuf,
624 dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
625 maxbuffer,&sinfo->tempfile) ||
626 flush_ft_buf(sinfo) ||
627 flush_pending_blocks(sinfo))
628 {
629 got_error=1;
630 continue;
631 }
632 }
633 if (my_b_inited(&sinfo->tempfile_for_exceptions))
634 {
635 uint key_length;
636
637 if (param->testflag & T_VERBOSE)
638 printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
639
640 if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
641 reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
642 {
643 got_error=1;
644 continue;
645 }
646
647 while (!got_error &&
648 !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
649 sizeof(key_length)))
650 {
651 uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
652 if (key_length > sizeof(ft_buf) ||
653 my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
654 (uint)key_length) ||
655 _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
656 key_length - info->s->rec_reflength))
657 got_error=1;
658 }
659 }
660 if (!got_error && param->testflag & T_STATISTICS)
661 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
662 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
663 sinfo->notnull : NULL,
664 (ulonglong) info->state->records);
665 }
666 my_free(mergebuf);
667 DBUG_RETURN(got_error);
668 }
669
670 /* Write all keys in memory to file for later merge */
671
write_keys(MI_SORT_PARAM * info,register uchar ** sort_keys,uint count,BUFFPEK * buffpek,IO_CACHE * tempfile)672 static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
673 uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
674 {
675 uchar **end;
676 uint sort_length=info->key_length;
677 DBUG_ENTER("write_keys");
678
679 std::sort(sort_keys, sort_keys + count, Key_compare(info));
680
681 if (!my_b_inited(tempfile) &&
682 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
683 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
684 DBUG_RETURN(1); /* purecov: inspected */
685
686 buffpek->file_pos=my_b_tell(tempfile);
687 buffpek->count=count;
688
689 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
690 {
691 if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
692 DBUG_RETURN(1); /* purecov: inspected */
693 }
694 DBUG_RETURN(0);
695 } /* write_keys */
696
697
698 static inline int
my_var_write(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * bufs)699 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
700 {
701 int err;
702 uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
703
704 /* The following is safe as this is a local file */
705 if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
706 return (err);
707 if ((err= my_b_write(to_file,bufs, (uint) len)))
708 return (err);
709 return (0);
710 }
711
712
write_keys_varlen(MI_SORT_PARAM * info,register uchar ** sort_keys,uint count,BUFFPEK * buffpek,IO_CACHE * tempfile)713 static int write_keys_varlen(MI_SORT_PARAM *info,
714 register uchar **sort_keys,
715 uint count, BUFFPEK *buffpek,
716 IO_CACHE *tempfile)
717 {
718 uchar **end;
719 int err;
720 DBUG_ENTER("write_keys_varlen");
721
722 std::sort(sort_keys, sort_keys + count, Key_compare(info));
723
724 if (!my_b_inited(tempfile) &&
725 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
726 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
727 DBUG_RETURN(1); /* purecov: inspected */
728
729 buffpek->file_pos=my_b_tell(tempfile);
730 buffpek->count=count;
731 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
732 {
733 if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
734 DBUG_RETURN(err);
735 }
736 DBUG_RETURN(0);
737 } /* write_keys_varlen */
738
739
write_key(MI_SORT_PARAM * info,uchar * key,IO_CACHE * tempfile)740 static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
741 {
742 uint key_length=info->real_key_length;
743 DBUG_ENTER("write_key");
744
745 if (!my_b_inited(tempfile) &&
746 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
747 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
748 DBUG_RETURN(1);
749
750 if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
751 my_b_write(tempfile,(uchar*)key,(uint) key_length))
752 DBUG_RETURN(1);
753 DBUG_RETURN(0);
754 } /* write_key */
755
756
757 /* Write index */
758
write_index(MI_SORT_PARAM * info,register uchar ** sort_keys,register uint count)759 static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
760 register uint count)
761 {
762 DBUG_ENTER("write_index");
763
764 std::sort(sort_keys, sort_keys + count, Key_compare(info));
765
766 while (count--)
767 {
768 if ((*info->key_write)(info,*sort_keys++))
769 DBUG_RETURN(-1); /* purecov: inspected */
770 }
771 DBUG_RETURN(0);
772 } /* write_index */
773
774
775 /* Merge buffers to make < MERGEBUFF2 buffers */
776
merge_many_buff(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,BUFFPEK * buffpek,int * maxbuffer,IO_CACHE * t_file)777 static int merge_many_buff(MI_SORT_PARAM *info, uint keys,
778 uchar **sort_keys, BUFFPEK *buffpek,
779 int *maxbuffer, IO_CACHE *t_file)
780 {
781 register int i;
782 IO_CACHE t_file2, *from_file, *to_file, *temp;
783 BUFFPEK *lastbuff;
784 DBUG_ENTER("merge_many_buff");
785
786 if (*maxbuffer < MERGEBUFF2)
787 DBUG_RETURN(0); /* purecov: inspected */
788 if (flush_io_cache(t_file) ||
789 open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
790 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
791 DBUG_RETURN(1); /* purecov: inspected */
792
793 from_file= t_file ; to_file= &t_file2;
794 while (*maxbuffer >= MERGEBUFF2)
795 {
796 reinit_io_cache(from_file,READ_CACHE,0L,0,0);
797 reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
798 lastbuff=buffpek;
799 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
800 {
801 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
802 buffpek+i,buffpek+i+MERGEBUFF-1))
803 goto cleanup;
804 }
805 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
806 buffpek+i,buffpek+ *maxbuffer))
807 break; /* purecov: inspected */
808 if (flush_io_cache(to_file))
809 break; /* purecov: inspected */
810 temp=from_file; from_file=to_file; to_file=temp;
811 *maxbuffer= (int) (lastbuff-buffpek)-1;
812 }
813 cleanup:
814 close_cached_file(to_file); /* This holds old result */
815 if (to_file == t_file)
816 {
817 DBUG_ASSERT(t_file2.type == WRITE_CACHE);
818 *t_file=t_file2; /* Copy result file */
819 t_file->current_pos= &t_file->write_pos;
820 t_file->current_end= &t_file->write_end;
821 }
822
823 DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
824 } /* merge_many_buff */
825
826
827 /*
828 Read data to buffer
829
830 SYNOPSIS
831 read_to_buffer()
832 fromfile File to read from
833 buffpek Where to read from
834 sort_length max length to read
835 RESULT
836 > 0 Ammount of bytes read
837 -1 Error
838 */
839
read_to_buffer(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)840 static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
841 uint sort_length)
842 {
843 register uint count;
844 uint length;
845
846 if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
847 {
848 if (mysql_file_pread(fromfile->file, (uchar*) buffpek->base,
849 (length= sort_length*count),
850 buffpek->file_pos, MYF_RW))
851 return((uint) -1); /* purecov: inspected */
852 buffpek->key=buffpek->base;
853 buffpek->file_pos+= length; /* New filepos */
854 buffpek->count-= count;
855 buffpek->mem_count= count;
856 }
857 return (count*sort_length);
858 } /* read_to_buffer */
859
read_to_buffer_varlen(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)860 static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
861 uint sort_length)
862 {
863 register uint count;
864 uint16 length_of_key = 0;
865 uint idx;
866 uchar *buffp;
867
868 if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys, buffpek->count)))
869 {
870 buffp = buffpek->base;
871
872 for (idx=1;idx<=count;idx++)
873 {
874 if (mysql_file_pread(fromfile->file, (uchar*)&length_of_key,
875 sizeof(length_of_key), buffpek->file_pos, MYF_RW))
876 return((uint) -1);
877 buffpek->file_pos+=sizeof(length_of_key);
878 if (mysql_file_pread(fromfile->file, (uchar*) buffp,
879 length_of_key, buffpek->file_pos, MYF_RW))
880 return((uint) -1);
881 buffpek->file_pos+=length_of_key;
882 buffp = buffp + sort_length;
883 }
884 buffpek->key=buffpek->base;
885 buffpek->count-= count;
886 buffpek->mem_count= count;
887 }
888 return (count*sort_length);
889 } /* read_to_buffer_varlen */
890
891
write_merge_key_varlen(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,uint count)892 static int write_merge_key_varlen(MI_SORT_PARAM *info,
893 IO_CACHE *to_file, uchar* key,
894 uint sort_length, uint count)
895 {
896 uint idx;
897 uchar *bufs = key;
898
899 for (idx=1;idx<=count;idx++)
900 {
901 int err;
902 if ((err= my_var_write(info, to_file, bufs)))
903 return (err);
904 bufs=bufs+sort_length;
905 }
906 return(0);
907 }
908
909
write_merge_key(MI_SORT_PARAM * info MY_ATTRIBUTE ((unused)),IO_CACHE * to_file,uchar * key,uint sort_length,uint count)910 static int write_merge_key(MI_SORT_PARAM *info MY_ATTRIBUTE((unused)),
911 IO_CACHE *to_file, uchar *key,
912 uint sort_length, uint count)
913 {
914 return my_b_write(to_file, key, (size_t) sort_length*count);
915 }
916
917 /*
918 Merge buffers to one buffer
919 If to_file == 0 then use info->key_write
920 */
921
922 static int
merge_buffers(MI_SORT_PARAM * info,uint keys,IO_CACHE * from_file,IO_CACHE * to_file,uchar ** sort_keys,BUFFPEK * lastbuff,BUFFPEK * Fb,BUFFPEK * Tb)923 merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
924 IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
925 BUFFPEK *Fb, BUFFPEK *Tb)
926 {
927 int error;
928 uint sort_length,maxcount;
929 ha_rows count;
930 my_off_t UNINIT_VAR(to_start_filepos);
931 uchar *strpos;
932 BUFFPEK *buffpek,**refpek;
933 QUEUE queue;
934 volatile int *killed= killed_ptr(info->sort_info->param);
935 DBUG_ENTER("merge_buffers");
936
937 count=error=0;
938 maxcount=keys/((uint) (Tb-Fb) +1);
939 DBUG_ASSERT(maxcount > 0);
940 LINT_INIT(to_start_filepos);
941 if (to_file)
942 to_start_filepos=my_b_tell(to_file);
943 strpos=(uchar*) sort_keys;
944 sort_length=info->key_length;
945
946 if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
947 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
948 (void*) info))
949 DBUG_RETURN(1); /* purecov: inspected */
950
951 for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
952 {
953 count+= buffpek->count;
954 buffpek->base= strpos;
955 buffpek->max_keys=maxcount;
956 strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
957 sort_length));
958 if (error == -1)
959 goto err; /* purecov: inspected */
960 queue_insert(&queue,(uchar*) buffpek);
961 }
962
963 while (queue.elements > 1)
964 {
965 for (;;)
966 {
967 if (*killed)
968 {
969 error=1; goto err;
970 }
971 buffpek=(BUFFPEK*) queue_top(&queue);
972 if (to_file)
973 {
974 if (info->write_key(info,to_file,(uchar*) buffpek->key,
975 (uint) sort_length,1))
976 {
977 error=1; goto err; /* purecov: inspected */
978 }
979 }
980 else
981 {
982 if ((*info->key_write)(info,(void*) buffpek->key))
983 {
984 error=1; goto err; /* purecov: inspected */
985 }
986 }
987 buffpek->key+=sort_length;
988 if (! --buffpek->mem_count)
989 {
990 if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
991 {
992 uchar *base=buffpek->base;
993 uint max_keys=buffpek->max_keys;
994
995 (void) queue_remove(&queue,0);
996
997 /* Put room used by buffer to use in other buffer */
998 for (refpek= (BUFFPEK**) &queue_top(&queue);
999 refpek <= (BUFFPEK**) &queue_end(&queue);
1000 refpek++)
1001 {
1002 buffpek= *refpek;
1003 if (buffpek->base+buffpek->max_keys*sort_length == base)
1004 {
1005 buffpek->max_keys+=max_keys;
1006 break;
1007 }
1008 else if (base+max_keys*sort_length == buffpek->base)
1009 {
1010 buffpek->base=base;
1011 buffpek->max_keys+=max_keys;
1012 break;
1013 }
1014 }
1015 break; /* One buffer have been removed */
1016 }
1017 }
1018 else if (error == -1)
1019 goto err; /* purecov: inspected */
1020 queue_replaced(&queue); /* Top element has been replaced */
1021 }
1022 }
1023 buffpek=(BUFFPEK*) queue_top(&queue);
1024 buffpek->base=(uchar *) sort_keys;
1025 buffpek->max_keys=keys;
1026 do
1027 {
1028 if (to_file)
1029 {
1030 if (info->write_key(info,to_file,(uchar*) buffpek->key,
1031 sort_length,buffpek->mem_count))
1032 {
1033 error=1; goto err; /* purecov: inspected */
1034 }
1035 }
1036 else
1037 {
1038 register uchar *end;
1039 strpos= buffpek->key;
1040 for (end=strpos+buffpek->mem_count*sort_length;
1041 strpos != end ;
1042 strpos+=sort_length)
1043 {
1044 if ((*info->key_write)(info,(void*) strpos))
1045 {
1046 error=1; goto err; /* purecov: inspected */
1047 }
1048 }
1049 }
1050 }
1051 while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
1052 error != 0);
1053
1054 lastbuff->count=count;
1055 if (to_file)
1056 lastbuff->file_pos=to_start_filepos;
1057 err:
1058 delete_queue(&queue);
1059 DBUG_RETURN(error);
1060 } /* merge_buffers */
1061
1062
1063 /* Do a merge to output-file (save only positions) */
1064
1065 static int
merge_index(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,BUFFPEK * buffpek,int maxbuffer,IO_CACHE * tempfile)1066 merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
1067 BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1068 {
1069 DBUG_ENTER("merge_index");
1070 if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1071 buffpek+maxbuffer))
1072 DBUG_RETURN(1); /* purecov: inspected */
1073 DBUG_RETURN(0);
1074 } /* merge_index */
1075
1076 static int
flush_ft_buf(MI_SORT_PARAM * info)1077 flush_ft_buf(MI_SORT_PARAM *info)
1078 {
1079 int err=0;
1080 if (info->sort_info->ft_buf)
1081 {
1082 err=sort_ft_buf_flush(info);
1083 my_free(info->sort_info->ft_buf);
1084 info->sort_info->ft_buf=0;
1085 }
1086 return err;
1087 }
1088
1089