1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /*
24 Creates a index for a database by reading keys, sorting them and outputing
25 them in sorted order through SORT_INFO functions.
26 */
27
28 #include <algorithm>
29 #include <functional>
30 #include <my_global.h>
31 #include "fulltext.h"
32 #if defined(__WIN__)
33 #include <fcntl.h>
34 #else
35 #include <stddef.h>
36 #endif
37 #include <queues.h>
38
39 /* static variables */
40
41 #undef MYF_RW
42 #undef DISK_BUFFER_SIZE
43
44 #define MERGEBUFF 15
45 #define MERGEBUFF2 31
46 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
47 #define DISK_BUFFER_SIZE (IO_SIZE*16)
48
49 class Key_compare :
50 public std::binary_function<const uchar*, const uchar*, bool>
51 {
52 public:
Key_compare(MI_SORT_PARAM * param)53 Key_compare(MI_SORT_PARAM *param) : info(param) {}
operator ()(const uchar * a,const uchar * b)54 bool operator()(const uchar *a, const uchar *b)
55 {
56 return info->key_cmp(info, &a, &b) < 0;
57 }
58 MI_SORT_PARAM *info;
59 };
60
61 /*
62 Pointers of functions for store and read keys from temp file
63 */
64
65 extern void print_error(const char *fmt,...);
66
67 /* Functions defined in this file */
68
69 static ha_rows find_all_keys(MI_SORT_PARAM *info, ulong keys,
70 uchar **sort_keys,
71 DYNAMIC_ARRAY *buffpek, long *maxbuffer,
72 IO_CACHE *tempfile,
73 IO_CACHE *tempfile_for_exceptions);
74 static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
75 ulong count, BUFFPEK *buffpek,IO_CACHE *tempfile);
76 static int write_key(MI_SORT_PARAM *info, uchar *key,
77 IO_CACHE *tempfile);
78 static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
79 ulong count);
80 static int merge_many_buff(MI_SORT_PARAM *info, ulong keys,
81 uchar * *sort_keys,
82 BUFFPEK *buffpek, long *maxbuffer,
83 IO_CACHE *t_file);
84 static ulong read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
85 uint sort_length);
86 static int merge_buffers(MI_SORT_PARAM *info, ulong keys,
87 IO_CACHE *from_file, IO_CACHE *to_file,
88 uchar * *sort_keys, BUFFPEK *lastbuff,
89 BUFFPEK *Fb, BUFFPEK *Tb);
90 static int merge_index(MI_SORT_PARAM *, ulong, uchar **, BUFFPEK *, long,
91 IO_CACHE *);
92 static int flush_ft_buf(MI_SORT_PARAM *info);
93
94 static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
95 ulong count, BUFFPEK *buffpek,
96 IO_CACHE *tempfile);
97 static ulong read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
98 uint sort_length);
99 static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
100 uchar *key, uint sort_length, ulong count);
101 static int write_merge_key_varlen(MI_SORT_PARAM *info,
102 IO_CACHE *to_file,
103 uchar* key, uint sort_length,
104 ulong count);
105 static inline int
106 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
107
108 /*
109 Creates a index of sorted keys
110
111 SYNOPSIS
112 _create_index_by_sort()
113 info Sort parameters
114 no_messages Set to 1 if no output
115 sortbuff_size Size if sortbuffer to allocate
116
117 RESULT
118 0 ok
119 <> 0 Error
120 */
121
_create_index_by_sort(MI_SORT_PARAM * info,my_bool no_messages,ulonglong sortbuff_size)122 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
123 ulonglong sortbuff_size)
124 {
125 int error;
126 long maxbuffer,skr;
127 ulong sort_length, keys;
128 ulonglong memavl, old_memavl;
129 DYNAMIC_ARRAY buffpek;
130 ha_rows records;
131 uchar **sort_keys;
132 IO_CACHE tempfile, tempfile_for_exceptions;
133 DBUG_ENTER("_create_index_by_sort");
134 DBUG_PRINT("enter",("sort_length: %d", info->key_length));
135
136 if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
137 {
138 info->write_keys=write_keys_varlen;
139 info->read_to_buffer=read_to_buffer_varlen;
140 info->write_key= write_merge_key_varlen;
141 }
142 else
143 {
144 info->write_keys=write_keys;
145 info->read_to_buffer=read_to_buffer;
146 info->write_key=write_merge_key;
147 }
148
149 my_b_clear(&tempfile);
150 my_b_clear(&tempfile_for_exceptions);
151 memset(&buffpek, 0, sizeof(buffpek));
152 sort_keys= (uchar **) NULL; error= 1;
153 maxbuffer=1;
154
155 memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
156 records= info->sort_info->max_records;
157 sort_length= info->key_length;
158 LINT_INIT(keys);
159
160 if ((memavl - sizeof(BUFFPEK)) / (sort_length + sizeof(char *)) > UINT_MAX32)
161 memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
162
163 while (memavl >= MIN_SORT_BUFFER)
164 {
165 if ((records < ULONG_MAX) &&
166 ((my_off_t) (records + 1) *
167 (sort_length + sizeof(char*)) <= (my_off_t) memavl))
168 keys= (ulong) records + 1;
169 else
170 do
171 {
172 skr=maxbuffer;
173 if (memavl < sizeof(BUFFPEK) * (ulong) maxbuffer ||
174 (keys = (memavl - sizeof(BUFFPEK) * (ulong) maxbuffer) /
175 (sort_length+sizeof(char*))) <= 1 ||
176 keys < (ulong) maxbuffer)
177 {
178 mi_check_print_error(info->sort_info->param,
179 "myisam_sort_buffer_size is too small");
180 goto err;
181 }
182 }
183 while ((maxbuffer= (long) (records / (keys - 1) + 1)) != skr);
184
185 if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
186 HA_FT_MAXBYTELEN, MYF(0))))
187 {
188 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
189 maxbuffer/2))
190 {
191 my_free(sort_keys);
192 sort_keys= 0;
193 }
194 else
195 break;
196 }
197 old_memavl=memavl;
198 if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
199 memavl= MIN_SORT_BUFFER;
200 }
201 if (memavl < MIN_SORT_BUFFER)
202 {
203 mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
204 goto err; /* purecov: tested */
205 }
206 (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
207
208 if (!no_messages)
209 printf(" - Searching for keys, allocating buffer for %lu keys\n", keys);
210
211 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
212 &tempfile,&tempfile_for_exceptions))
213 == HA_POS_ERROR)
214 goto err; /* purecov: tested */
215 if (maxbuffer == 0)
216 {
217 if (!no_messages)
218 printf(" - Dumping %lu keys\n", (ulong) records);
219 if (write_index(info,sort_keys, (ulong) records))
220 goto err; /* purecov: inspected */
221 }
222 else
223 {
224 keys=(keys*(sort_length+sizeof(char*)))/sort_length;
225 if (maxbuffer >= MERGEBUFF2)
226 {
227 if (!no_messages)
228 printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
229 if (merge_many_buff(info,keys,sort_keys,
230 dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
231 goto err; /* purecov: inspected */
232 }
233 if (flush_io_cache(&tempfile) ||
234 reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
235 goto err; /* purecov: inspected */
236 if (!no_messages)
237 printf(" - Last merge and dumping keys\n"); /* purecov: tested */
238 if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
239 maxbuffer,&tempfile))
240 goto err; /* purecov: inspected */
241 }
242
243 if (flush_ft_buf(info) || flush_pending_blocks(info))
244 goto err;
245
246 if (my_b_inited(&tempfile_for_exceptions))
247 {
248 MI_INFO *idx=info->sort_info->info;
249 uint keyno=info->key;
250 uint key_length, ref_length=idx->s->rec_reflength;
251
252 if (!no_messages)
253 printf(" - Adding exceptions\n"); /* purecov: tested */
254 if (flush_io_cache(&tempfile_for_exceptions) ||
255 reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
256 goto err;
257
258 while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
259 sizeof(key_length))
260 && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
261 (uint) key_length))
262 {
263 if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
264 goto err;
265 }
266 }
267
268 error =0;
269
270 err:
271 my_free(sort_keys);
272 delete_dynamic(&buffpek);
273 close_cached_file(&tempfile);
274 close_cached_file(&tempfile_for_exceptions);
275
276 DBUG_RETURN(error ? -1 : 0);
277 } /* _create_index_by_sort */
278
279
280 /* Search after all keys and place them in a temp. file */
281
find_all_keys(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,DYNAMIC_ARRAY * buffpek,long * maxbuffer,IO_CACHE * tempfile,IO_CACHE * tempfile_for_exceptions)282 static ha_rows find_all_keys(MI_SORT_PARAM *info, ulong keys,
283 uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
284 long *maxbuffer, IO_CACHE *tempfile,
285 IO_CACHE *tempfile_for_exceptions)
286 {
287 int error;
288 ulong idx;
289 DBUG_ENTER("find_all_keys");
290
291 idx=error=0;
292 sort_keys[0]=(uchar*) (sort_keys+keys);
293
294 while (!(error=(*info->key_read)(info,sort_keys[idx])))
295 {
296 if (info->real_key_length > info->key_length)
297 {
298 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
299 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
300 continue;
301 }
302
303 if (++idx == keys)
304 {
305 if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
306 tempfile))
307 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
308
309 sort_keys[0]=(uchar*) (sort_keys+keys);
310 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
311 idx=1;
312 }
313 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
314 }
315 if (error > 0)
316 DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
317 if (buffpek->elements)
318 {
319 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
320 tempfile))
321 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
322 *maxbuffer=buffpek->elements-1;
323 }
324 else
325 *maxbuffer=0;
326
327 DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
328 } /* find_all_keys */
329
330
331 /* Search after all keys and place them in a temp. file */
332
thr_find_all_keys(void * arg)333 pthread_handler_t thr_find_all_keys(void *arg)
334 {
335 MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
336 int error;
337 ulonglong memavl, old_memavl;
338 ulong keys, sort_length;
339 ulong idx, maxbuffer;
340 uchar **sort_keys=0;
341
342 LINT_INIT(keys);
343
344 error=1;
345
346 if (my_thread_init())
347 goto err;
348
349 { /* Add extra block since DBUG_ENTER declare variables */
350 DBUG_ENTER("thr_find_all_keys");
351 DBUG_PRINT("enter", ("master: %d", sort_param->master));
352 if (sort_param->sort_info->got_error)
353 goto err;
354
355 if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
356 {
357 sort_param->write_keys= write_keys_varlen;
358 sort_param->read_to_buffer= read_to_buffer_varlen;
359 sort_param->write_key= write_merge_key_varlen;
360 }
361 else
362 {
363 sort_param->write_keys= write_keys;
364 sort_param->read_to_buffer= read_to_buffer;
365 sort_param->write_key= write_merge_key;
366 }
367
368 my_b_clear(&sort_param->tempfile);
369 my_b_clear(&sort_param->tempfile_for_exceptions);
370 memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
371 memset(&sort_param->unique, 0, sizeof(sort_param->unique));
372 sort_keys= (uchar **) NULL;
373
374 memavl= MY_MAX(sort_param->sortbuff_size, MIN_SORT_BUFFER);
375 idx= (ulong) sort_param->sort_info->max_records;
376 sort_length= sort_param->key_length;
377 maxbuffer= 1;
378
379 if ((memavl - sizeof(BUFFPEK)) / (sort_length +
380 sizeof(char *)) > UINT_MAX32)
381 memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
382
383 while (memavl >= MIN_SORT_BUFFER)
384 {
385 if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
386 (my_off_t) memavl)
387 keys= idx+1;
388 else
389 {
390 ulong skr;
391 do
392 {
393 skr= maxbuffer;
394 if (memavl < sizeof(BUFFPEK)*maxbuffer ||
395 (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
396 (sort_length+sizeof(char*))) <= 1 ||
397 keys < maxbuffer)
398 {
399 mi_check_print_error(sort_param->sort_info->param,
400 "myisam_sort_buffer_size is too small");
401 goto err;
402 }
403 }
404 while ((maxbuffer= (idx/(keys-1)+1)) != skr);
405 }
406 if ((sort_keys= (uchar**)
407 my_malloc(keys*(sort_length+sizeof(char*))+
408 ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
409 HA_FT_MAXBYTELEN : 0), MYF(0))))
410 {
411 if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
412 maxbuffer, maxbuffer/2))
413 {
414 my_free(sort_keys);
415 sort_keys= (uchar **) NULL; /* for err: label */
416 }
417 else
418 break;
419 }
420 old_memavl= memavl;
421 if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
422 old_memavl > MIN_SORT_BUFFER)
423 memavl= MIN_SORT_BUFFER;
424 }
425 if (memavl < MIN_SORT_BUFFER)
426 {
427 mi_check_print_error(sort_param->sort_info->param,
428 "MyISAM sort buffer too small");
429 goto err; /* purecov: tested */
430 }
431
432 if (sort_param->sort_info->param->testflag & T_VERBOSE)
433 printf("Key %d - Allocating buffer for %lu keys\n",
434 sort_param->key + 1, keys);
435 sort_param->sort_keys= sort_keys;
436
437 idx= error= 0;
438 sort_keys[0]= (uchar*) (sort_keys+keys);
439
440 DBUG_PRINT("info", ("reading keys"));
441 while (!(error= sort_param->sort_info->got_error) &&
442 !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
443 {
444 if (sort_param->real_key_length > sort_param->key_length)
445 {
446 if (write_key(sort_param, sort_keys[idx],
447 &sort_param->tempfile_for_exceptions))
448 goto err;
449 continue;
450 }
451
452 if (++idx == keys)
453 {
454 if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
455 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
456 &sort_param->tempfile))
457 goto err;
458 sort_keys[0]= (uchar*) (sort_keys+keys);
459 memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
460 idx= 1;
461 }
462 sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
463 }
464 if (error > 0)
465 goto err;
466 if (sort_param->buffpek.elements)
467 {
468 if (sort_param->write_keys(sort_param, sort_keys, idx,
469 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
470 &sort_param->tempfile))
471 goto err;
472 sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
473 }
474 else
475 sort_param->keys= idx;
476
477 sort_param->sort_keys_length= keys;
478 goto ok;
479
480 err:
481 DBUG_PRINT("error", ("got some error"));
482 sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
483 my_free(sort_keys);
484 sort_param->sort_keys= 0;
485 delete_dynamic(& sort_param->buffpek);
486 close_cached_file(&sort_param->tempfile);
487 close_cached_file(&sort_param->tempfile_for_exceptions);
488
489 ok:
490 free_root(&sort_param->wordroot, MYF(0));
491 /*
492 Detach from the share if the writer is involved. Avoid others to
493 be blocked. This includes a flush of the write buffer. This will
494 also indicate EOF to the readers.
495 That means that a writer always gets here first and readers -
496 only when they see EOF. But if a reader finishes prematurely
497 because of an error it may reach this earlier - don't allow it
498 to detach the writer thread.
499 */
500 if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
501 remove_io_thread(&sort_param->sort_info->info->rec_cache);
502
503 /* Readers detach from the share if any. Avoid others to be blocked. */
504 if (sort_param->read_cache.share)
505 remove_io_thread(&sort_param->read_cache);
506
507 mysql_mutex_lock(&sort_param->sort_info->mutex);
508 if (!--sort_param->sort_info->threads_running)
509 mysql_cond_signal(&sort_param->sort_info->cond);
510 mysql_mutex_unlock(&sort_param->sort_info->mutex);
511 DBUG_PRINT("exit", ("======== ending thread ========"));
512 }
513 my_thread_end();
514 return NULL;
515 }
516
517
thr_write_keys(MI_SORT_PARAM * sort_param)518 int thr_write_keys(MI_SORT_PARAM *sort_param)
519 {
520 SORT_INFO *sort_info=sort_param->sort_info;
521 MI_CHECK *param=sort_info->param;
522 ulong UNINIT_VAR(length), keys;
523 ulong *rec_per_key_part=param->rec_per_key_part;
524 int got_error=sort_info->got_error;
525 uint i;
526 MI_INFO *info=sort_info->info;
527 MYISAM_SHARE *share=info->s;
528 MI_SORT_PARAM *sinfo;
529 uchar *mergebuf=0;
530 DBUG_ENTER("thr_write_keys");
531 LINT_INIT(length);
532
533 for (i= 0, sinfo= sort_param ;
534 i < sort_info->total_keys ;
535 i++, sinfo++)
536 {
537 if (!sinfo->sort_keys)
538 {
539 got_error=1;
540 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
541 continue;
542 }
543 if (!got_error)
544 {
545 mi_set_key_active(share->state.key_map, sinfo->key);
546 if (!sinfo->buffpek.elements)
547 {
548 if (param->testflag & T_VERBOSE)
549 {
550 printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
551 fflush(stdout);
552 }
553 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
554 flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
555 got_error=1;
556 }
557 }
558 my_free(sinfo->sort_keys);
559 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
560 sinfo->sort_keys=0;
561 }
562
563 for (i= 0, sinfo= sort_param ;
564 i < sort_info->total_keys ;
565 i++,
566 delete_dynamic(&sinfo->buffpek),
567 close_cached_file(&sinfo->tempfile),
568 close_cached_file(&sinfo->tempfile_for_exceptions),
569 rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
570 {
571 if (got_error)
572 continue;
573 if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
574 {
575 sinfo->write_keys=write_keys_varlen;
576 sinfo->read_to_buffer=read_to_buffer_varlen;
577 sinfo->write_key=write_merge_key_varlen;
578 }
579 else
580 {
581 sinfo->write_keys=write_keys;
582 sinfo->read_to_buffer=read_to_buffer;
583 sinfo->write_key=write_merge_key;
584 }
585 if (sinfo->buffpek.elements)
586 {
587 ulong maxbuffer=sinfo->buffpek.elements-1;
588 if (!mergebuf)
589 {
590 length=param->sort_buffer_length;
591 while (length >= MIN_SORT_BUFFER)
592 {
593 if ((mergebuf= (uchar *) my_malloc(length, MYF(0))))
594 break;
595 length=length*3/4;
596 }
597 if (!mergebuf)
598 {
599 got_error=1;
600 continue;
601 }
602 }
603 keys=length/sinfo->key_length;
604 if (maxbuffer >= MERGEBUFF2)
605 {
606 if (param->testflag & T_VERBOSE)
607 printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
608 if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
609 dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
610 (long *) &maxbuffer, &sinfo->tempfile))
611 {
612 got_error=1;
613 continue;
614 }
615 }
616 if (flush_io_cache(&sinfo->tempfile) ||
617 reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
618 {
619 got_error=1;
620 continue;
621 }
622 if (param->testflag & T_VERBOSE)
623 printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
624 if (merge_index(sinfo, keys, (uchar **)mergebuf,
625 dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
626 maxbuffer,&sinfo->tempfile) ||
627 flush_ft_buf(sinfo) ||
628 flush_pending_blocks(sinfo))
629 {
630 got_error=1;
631 continue;
632 }
633 }
634 if (my_b_inited(&sinfo->tempfile_for_exceptions))
635 {
636 uint key_length;
637
638 if (param->testflag & T_VERBOSE)
639 printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
640
641 if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
642 reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
643 {
644 got_error=1;
645 continue;
646 }
647
648 while (!got_error &&
649 !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
650 sizeof(key_length)))
651 {
652 uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
653 if (key_length > sizeof(ft_buf) ||
654 my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
655 (uint)key_length) ||
656 _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
657 key_length - info->s->rec_reflength))
658 got_error=1;
659 }
660 }
661 if (!got_error && param->testflag & T_STATISTICS)
662 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
663 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
664 sinfo->notnull : NULL,
665 (ulonglong) info->state->records);
666 }
667 my_free(mergebuf);
668 DBUG_RETURN(got_error);
669 }
670
671 /* Write all keys in memory to file for later merge */
672
write_keys(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count,BUFFPEK * buffpek,IO_CACHE * tempfile)673 static int write_keys(MI_SORT_PARAM *info, uchar **sort_keys,
674 ulong count, BUFFPEK *buffpek, IO_CACHE *tempfile)
675 {
676 uchar **end;
677 uint sort_length=info->key_length;
678 DBUG_ENTER("write_keys");
679
680 std::sort(sort_keys, sort_keys + count, Key_compare(info));
681
682 if (!my_b_inited(tempfile) &&
683 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
684 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
685 DBUG_RETURN(1); /* purecov: inspected */
686
687 buffpek->file_pos=my_b_tell(tempfile);
688 buffpek->count=count;
689
690 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
691 {
692 if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
693 DBUG_RETURN(1); /* purecov: inspected */
694 }
695 DBUG_RETURN(0);
696 } /* write_keys */
697
698
699 static inline int
my_var_write(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * bufs)700 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
701 {
702 int err;
703 uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
704
705 /* The following is safe as this is a local file */
706 if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
707 return (err);
708 if ((err= my_b_write(to_file,bufs, (uint) len)))
709 return (err);
710 return (0);
711 }
712
713
write_keys_varlen(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count,BUFFPEK * buffpek,IO_CACHE * tempfile)714 static int write_keys_varlen(MI_SORT_PARAM *info,
715 uchar **sort_keys,
716 ulong count, BUFFPEK *buffpek,
717 IO_CACHE *tempfile)
718 {
719 uchar **end;
720 int err;
721 DBUG_ENTER("write_keys_varlen");
722
723 std::sort(sort_keys, sort_keys + count, Key_compare(info));
724
725 if (!my_b_inited(tempfile) &&
726 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
727 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
728 DBUG_RETURN(1); /* purecov: inspected */
729
730 buffpek->file_pos=my_b_tell(tempfile);
731 buffpek->count=count;
732 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
733 {
734 if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
735 DBUG_RETURN(err);
736 }
737 DBUG_RETURN(0);
738 } /* write_keys_varlen */
739
740
write_key(MI_SORT_PARAM * info,uchar * key,IO_CACHE * tempfile)741 static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
742 {
743 uint key_length=info->real_key_length;
744 DBUG_ENTER("write_key");
745
746 if (!my_b_inited(tempfile) &&
747 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
748 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
749 DBUG_RETURN(1);
750
751 if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
752 my_b_write(tempfile,(uchar*)key,(uint) key_length))
753 DBUG_RETURN(1);
754 DBUG_RETURN(0);
755 } /* write_key */
756
757
758 /* Write index */
759
write_index(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count)760 static int write_index(MI_SORT_PARAM *info, uchar **sort_keys,
761 ulong count)
762 {
763 DBUG_ENTER("write_index");
764
765 std::sort(sort_keys, sort_keys + count, Key_compare(info));
766
767 while (count--)
768 {
769 if ((*info->key_write)(info,*sort_keys++))
770 DBUG_RETURN(-1); /* purecov: inspected */
771 }
772 DBUG_RETURN(0);
773 } /* write_index */
774
775
776 /* Merge buffers to make < MERGEBUFF2 buffers */
777
merge_many_buff(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,BUFFPEK * buffpek,long * maxbuffer,IO_CACHE * t_file)778 static int merge_many_buff(MI_SORT_PARAM *info, ulong keys,
779 uchar **sort_keys, BUFFPEK *buffpek,
780 long *maxbuffer, IO_CACHE *t_file)
781 {
782 long i;
783 IO_CACHE t_file2, *from_file, *to_file, *temp;
784 BUFFPEK *lastbuff;
785 DBUG_ENTER("merge_many_buff");
786
787 if (*maxbuffer < MERGEBUFF2)
788 DBUG_RETURN(0); /* purecov: inspected */
789 if (flush_io_cache(t_file) ||
790 open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
791 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
792 DBUG_RETURN(1); /* purecov: inspected */
793
794 from_file= t_file ; to_file= &t_file2;
795 while (*maxbuffer >= MERGEBUFF2)
796 {
797 reinit_io_cache(from_file,READ_CACHE,0L,0,0);
798 reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
799 lastbuff=buffpek;
800 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
801 {
802 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
803 buffpek+i,buffpek+i+MERGEBUFF-1))
804 goto cleanup;
805 }
806 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
807 buffpek+i,buffpek+ *maxbuffer))
808 break; /* purecov: inspected */
809 if (flush_io_cache(to_file))
810 break; /* purecov: inspected */
811 temp=from_file; from_file=to_file; to_file=temp;
812 *maxbuffer= (long) (lastbuff-buffpek)-1;
813 }
814 cleanup:
815 close_cached_file(to_file); /* This holds old result */
816 if (to_file == t_file)
817 {
818 DBUG_ASSERT(t_file2.type == WRITE_CACHE);
819 *t_file=t_file2; /* Copy result file */
820 t_file->current_pos= &t_file->write_pos;
821 t_file->current_end= &t_file->write_end;
822 }
823
824 DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
825 } /* merge_many_buff */
826
827
828 /*
829 Read data to buffer
830
831 SYNOPSIS
832 read_to_buffer()
833 fromfile File to read from
834 buffpek Where to read from
835 sort_length max length to read
836 RESULT
837 > 0 Ammount of bytes read
838 -1 Error
839 */
840
read_to_buffer(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)841 static ulong read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
842 uint sort_length)
843 {
844 ulong count;
845 ulong length;
846
847 if ((count=(ulong) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
848 {
849 if (mysql_file_pread(fromfile->file, (uchar*) buffpek->base,
850 (length= sort_length*count),
851 buffpek->file_pos, MYF_RW))
852 return((ulong) -1); /* purecov: inspected */
853 buffpek->key=buffpek->base;
854 buffpek->file_pos+= length; /* New filepos */
855 buffpek->count-= count;
856 buffpek->mem_count= count;
857 }
858 return (count*sort_length);
859 } /* read_to_buffer */
860
read_to_buffer_varlen(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)861 static ulong read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
862 uint sort_length)
863 {
864 ulong count;
865 uint16 length_of_key = 0;
866 ulong idx;
867 uchar *buffp;
868
869 if ((count=(ulong) MY_MIN((ha_rows) buffpek->max_keys, buffpek->count)))
870 {
871 buffp = buffpek->base;
872
873 for (idx=1;idx<=count;idx++)
874 {
875 if (mysql_file_pread(fromfile->file, (uchar*)&length_of_key,
876 sizeof(length_of_key), buffpek->file_pos, MYF_RW))
877 return((ulong) -1);
878 buffpek->file_pos+=sizeof(length_of_key);
879 if (mysql_file_pread(fromfile->file, (uchar*) buffp,
880 length_of_key, buffpek->file_pos, MYF_RW))
881 return((ulong) -1);
882 buffpek->file_pos+=length_of_key;
883 buffp = buffp + sort_length;
884 }
885 buffpek->key=buffpek->base;
886 buffpek->count-= count;
887 buffpek->mem_count= count;
888 }
889 return (count*sort_length);
890 } /* read_to_buffer_varlen */
891
892
write_merge_key_varlen(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,ulong count)893 static int write_merge_key_varlen(MI_SORT_PARAM *info,
894 IO_CACHE *to_file, uchar* key,
895 uint sort_length, ulong count)
896 {
897 ulong idx;
898 uchar *bufs = key;
899
900 for (idx=1;idx<=count;idx++)
901 {
902 int err;
903 if ((err= my_var_write(info, to_file, bufs)))
904 return (err);
905 bufs=bufs+sort_length;
906 }
907 return(0);
908 }
909
910
write_merge_key(MI_SORT_PARAM * info MY_ATTRIBUTE ((unused)),IO_CACHE * to_file,uchar * key,uint sort_length,ulong count)911 static int write_merge_key(MI_SORT_PARAM *info MY_ATTRIBUTE((unused)),
912 IO_CACHE *to_file, uchar *key,
913 uint sort_length, ulong count)
914 {
915 return my_b_write(to_file, key, (size_t) sort_length*count);
916 }
917
918 /*
919 Merge buffers to one buffer
920 If to_file == 0 then use info->key_write
921 */
922
923 static int
merge_buffers(MI_SORT_PARAM * info,ulong keys,IO_CACHE * from_file,IO_CACHE * to_file,uchar ** sort_keys,BUFFPEK * lastbuff,BUFFPEK * Fb,BUFFPEK * Tb)924 merge_buffers(MI_SORT_PARAM *info, ulong keys, IO_CACHE *from_file,
925 IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
926 BUFFPEK *Fb, BUFFPEK *Tb)
927 {
928 ulong error;
929 uint sort_length;
930 ulong maxcount;
931 ha_rows count;
932 my_off_t UNINIT_VAR(to_start_filepos);
933 uchar *strpos;
934 BUFFPEK *buffpek,**refpek;
935 QUEUE queue;
936 volatile int *killed= killed_ptr(info->sort_info->param);
937 DBUG_ENTER("merge_buffers");
938
939 count=error=0;
940 maxcount= keys / ((ulong) (Tb-Fb) + 1);
941 DBUG_ASSERT(maxcount > 0);
942 LINT_INIT(to_start_filepos);
943 if (to_file)
944 to_start_filepos=my_b_tell(to_file);
945 strpos=(uchar*) sort_keys;
946 sort_length=info->key_length;
947
948 if (init_queue(&queue, (uint) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
949 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
950 (void*) info))
951 DBUG_RETURN(1); /* purecov: inspected */
952
953 for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
954 {
955 count+= buffpek->count;
956 buffpek->base= strpos;
957 buffpek->max_keys=maxcount;
958 strpos+= (error= info->read_to_buffer(from_file,buffpek, sort_length));
959 if (error == (ulong) -1)
960 goto err; /* purecov: inspected */
961 queue_insert(&queue,(uchar*) buffpek);
962 }
963
964 while (queue.elements > 1)
965 {
966 for (;;)
967 {
968 if (*killed)
969 {
970 error=1; goto err;
971 }
972 buffpek=(BUFFPEK*) queue_top(&queue);
973 if (to_file)
974 {
975 if (info->write_key(info,to_file,(uchar*) buffpek->key,
976 (uint) sort_length,1))
977 {
978 error=1; goto err; /* purecov: inspected */
979 }
980 }
981 else
982 {
983 if ((*info->key_write)(info,(void*) buffpek->key))
984 {
985 error=1; goto err; /* purecov: inspected */
986 }
987 }
988 buffpek->key+=sort_length;
989 if (! --buffpek->mem_count)
990 {
991 if (!(error= info->read_to_buffer(from_file,buffpek,sort_length)))
992 {
993 uchar *base=buffpek->base;
994 ulong max_keys=buffpek->max_keys;
995
996 (void) queue_remove(&queue,0);
997
998 /* Put room used by buffer to use in other buffer */
999 for (refpek= (BUFFPEK**) &queue_top(&queue);
1000 refpek <= (BUFFPEK**) &queue_end(&queue);
1001 refpek++)
1002 {
1003 buffpek= *refpek;
1004 if (buffpek->base+buffpek->max_keys*sort_length == base)
1005 {
1006 buffpek->max_keys+=max_keys;
1007 break;
1008 }
1009 else if (base+max_keys*sort_length == buffpek->base)
1010 {
1011 buffpek->base=base;
1012 buffpek->max_keys+=max_keys;
1013 break;
1014 }
1015 }
1016 break; /* One buffer have been removed */
1017 }
1018 }
1019 else if (error == (ulong) -1)
1020 goto err; /* purecov: inspected */
1021 queue_replaced(&queue); /* Top element has been replaced */
1022 }
1023 }
1024 buffpek=(BUFFPEK*) queue_top(&queue);
1025 buffpek->base=(uchar *) sort_keys;
1026 buffpek->max_keys=keys;
1027 do
1028 {
1029 if (to_file)
1030 {
1031 if (info->write_key(info,to_file,(uchar*) buffpek->key,
1032 sort_length,buffpek->mem_count))
1033 {
1034 error=1; goto err; /* purecov: inspected */
1035 }
1036 }
1037 else
1038 {
1039 uchar *end;
1040 strpos= buffpek->key;
1041 for (end=strpos+buffpek->mem_count*sort_length;
1042 strpos != end ;
1043 strpos+=sort_length)
1044 {
1045 if ((*info->key_write)(info,(void*) strpos))
1046 {
1047 error=1; goto err; /* purecov: inspected */
1048 }
1049 }
1050 }
1051 }
1052 while ((error= info->read_to_buffer(from_file,buffpek,sort_length))
1053 != (ulong) -1 && error != 0);
1054
1055 lastbuff->count=count;
1056 if (to_file)
1057 lastbuff->file_pos=to_start_filepos;
1058 err:
1059 delete_queue(&queue);
1060 DBUG_RETURN(error != 0);
1061 } /* merge_buffers */
1062
1063
1064 /* Do a merge to output-file (save only positions) */
1065
1066 static int
merge_index(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,BUFFPEK * buffpek,long maxbuffer,IO_CACHE * tempfile)1067 merge_index(MI_SORT_PARAM *info, ulong keys, uchar **sort_keys,
1068 BUFFPEK *buffpek, long maxbuffer, IO_CACHE *tempfile)
1069 {
1070 DBUG_ENTER("merge_index");
1071 if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1072 buffpek+maxbuffer))
1073 DBUG_RETURN(1); /* purecov: inspected */
1074 DBUG_RETURN(0);
1075 } /* merge_index */
1076
1077 static int
flush_ft_buf(MI_SORT_PARAM * info)1078 flush_ft_buf(MI_SORT_PARAM *info)
1079 {
1080 int err=0;
1081 if (info->sort_info->ft_buf)
1082 {
1083 err=sort_ft_buf_flush(info);
1084 my_free(info->sort_info->ft_buf);
1085 info->sort_info->ft_buf=0;
1086 }
1087 return err;
1088 }
1089
1090