1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
/*
  Creates an index for a database by reading keys, sorting them and outputting
  them in sorted order through SORT_INFO functions.
*/
27 
28 #include <algorithm>
29 #include <functional>
30 #include <my_global.h>
31 #include "fulltext.h"
32 #if defined(__WIN__)
33 #include <fcntl.h>
34 #else
35 #include <stddef.h>
36 #endif
37 #include <queues.h>
38 
39 /* static variables */
40 
/*
  MERGEBUFF is the number of sorted runs merged in one step by
  merge_many_buff(); merging is repeated while the number of runs is
  at least MERGEBUFF2.
*/
#define MERGEBUFF 15
#define MERGEBUFF2 31

/* Drop any earlier definition before installing the one used in this file */
#undef MYF_RW
#define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)

/* Cache size passed to open_cached_file() for the temporary files */
#undef DISK_BUFFER_SIZE
#define DISK_BUFFER_SIZE (IO_SIZE*16)
48 
49 class Key_compare :
50   public std::binary_function<const uchar*, const uchar*, bool>
51 {
52 public:
Key_compare(MI_SORT_PARAM * param)53   Key_compare(MI_SORT_PARAM *param) : info(param) {}
operator ()(const uchar * a,const uchar * b)54   bool operator()(const uchar *a, const uchar *b)
55   {
56     return info->key_cmp(info, &a, &b) < 0;
57   }
58   MI_SORT_PARAM *info;
59 };
60 
61 /*
62  Pointers of functions for store and read keys from temp file
63 */
64 
65 extern void print_error(const char *fmt,...);
66 
67 /* Functions defined in this file */
68 
69 static ha_rows find_all_keys(MI_SORT_PARAM *info, ulong keys,
70                              uchar **sort_keys,
71                              DYNAMIC_ARRAY *buffpek, long *maxbuffer,
72                              IO_CACHE *tempfile,
73                              IO_CACHE *tempfile_for_exceptions);
74 static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
75                       ulong count, BUFFPEK *buffpek,IO_CACHE *tempfile);
76 static int write_key(MI_SORT_PARAM *info, uchar *key,
77                      IO_CACHE *tempfile);
78 static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
79                        ulong count);
80 static int merge_many_buff(MI_SORT_PARAM *info, ulong keys,
81                            uchar * *sort_keys,
82                            BUFFPEK *buffpek, long *maxbuffer,
83                            IO_CACHE *t_file);
84 static ulong read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
85                            uint sort_length);
86 static int merge_buffers(MI_SORT_PARAM *info, ulong keys,
87                          IO_CACHE *from_file, IO_CACHE *to_file,
88                          uchar * *sort_keys, BUFFPEK *lastbuff,
89                          BUFFPEK *Fb, BUFFPEK *Tb);
90 static int merge_index(MI_SORT_PARAM *, ulong, uchar **, BUFFPEK *, long,
91                        IO_CACHE *);
92 static int flush_ft_buf(MI_SORT_PARAM *info);
93 
94 static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
95                              ulong count, BUFFPEK *buffpek,
96                              IO_CACHE *tempfile);
97 static ulong read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
98                                    uint sort_length);
99 static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
100                            uchar *key, uint sort_length, ulong count);
101 static int write_merge_key_varlen(MI_SORT_PARAM *info,
102                                   IO_CACHE *to_file,
103                                   uchar* key, uint sort_length,
104                                   ulong count);
105 static inline int
106 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
107 
108 /*
109   Creates a index of sorted keys
110 
111   SYNOPSIS
112     _create_index_by_sort()
113     info		Sort parameters
114     no_messages		Set to 1 if no output
115     sortbuff_size	Size if sortbuffer to allocate
116 
117   RESULT
118     0	ok
119    <> 0 Error
120 */
121 
_create_index_by_sort(MI_SORT_PARAM * info,my_bool no_messages,ulonglong sortbuff_size)122 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
123 			  ulonglong sortbuff_size)
124 {
125   int error;
126   long maxbuffer,skr;
127   ulong sort_length, keys;
128   ulonglong memavl, old_memavl;
129   DYNAMIC_ARRAY buffpek;
130   ha_rows records;
131   uchar **sort_keys;
132   IO_CACHE tempfile, tempfile_for_exceptions;
133   DBUG_ENTER("_create_index_by_sort");
134   DBUG_PRINT("enter",("sort_length: %d", info->key_length));
135 
136   if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
137   {
138     info->write_keys=write_keys_varlen;
139     info->read_to_buffer=read_to_buffer_varlen;
140     info->write_key= write_merge_key_varlen;
141   }
142   else
143   {
144     info->write_keys=write_keys;
145     info->read_to_buffer=read_to_buffer;
146     info->write_key=write_merge_key;
147   }
148 
149   my_b_clear(&tempfile);
150   my_b_clear(&tempfile_for_exceptions);
151   memset(&buffpek, 0, sizeof(buffpek));
152   sort_keys= (uchar **) NULL; error= 1;
153   maxbuffer=1;
154 
155   memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
156   records=	info->sort_info->max_records;
157   sort_length=	info->key_length;
158   LINT_INIT(keys);
159 
160   if ((memavl - sizeof(BUFFPEK)) / (sort_length + sizeof(char *)) > UINT_MAX32)
161     memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
162 
163   while (memavl >= MIN_SORT_BUFFER)
164   {
165     if ((records < ULONG_MAX) &&
166        ((my_off_t) (records + 1) *
167         (sort_length + sizeof(char*)) <= (my_off_t) memavl))
168       keys= (ulong) records + 1;
169     else
170       do
171       {
172 	skr=maxbuffer;
173         if (memavl < sizeof(BUFFPEK) * (ulong) maxbuffer ||
174             (keys = (memavl - sizeof(BUFFPEK) * (ulong) maxbuffer) /
175              (sort_length+sizeof(char*))) <= 1 ||
176             keys < (ulong) maxbuffer)
177 	{
178 	  mi_check_print_error(info->sort_info->param,
179 			       "myisam_sort_buffer_size is too small");
180 	  goto err;
181 	}
182       }
183       while ((maxbuffer= (long) (records / (keys - 1) + 1)) != skr);
184 
185     if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
186 				       HA_FT_MAXBYTELEN, MYF(0))))
187     {
188       if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
189 			     maxbuffer/2))
190       {
191 	my_free(sort_keys);
192         sort_keys= 0;
193       }
194       else
195 	break;
196     }
197     old_memavl=memavl;
198     if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
199       memavl= MIN_SORT_BUFFER;
200   }
201   if (memavl < MIN_SORT_BUFFER)
202   {
203     mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
204     goto err; /* purecov: tested */
205   }
206   (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
207 
208   if (!no_messages)
209     printf("  - Searching for keys, allocating buffer for %lu keys\n", keys);
210 
211   if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
212                                   &tempfile,&tempfile_for_exceptions))
213       == HA_POS_ERROR)
214     goto err; /* purecov: tested */
215   if (maxbuffer == 0)
216   {
217     if (!no_messages)
218       printf("  - Dumping %lu keys\n", (ulong) records);
219     if (write_index(info,sort_keys, (ulong) records))
220       goto err; /* purecov: inspected */
221   }
222   else
223   {
224     keys=(keys*(sort_length+sizeof(char*)))/sort_length;
225     if (maxbuffer >= MERGEBUFF2)
226     {
227       if (!no_messages)
228 	printf("  - Merging %lu keys\n", (ulong) records); /* purecov: tested */
229       if (merge_many_buff(info,keys,sort_keys,
230                   dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
231 	goto err;				/* purecov: inspected */
232     }
233     if (flush_io_cache(&tempfile) ||
234 	reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
235       goto err;					/* purecov: inspected */
236     if (!no_messages)
237       printf("  - Last merge and dumping keys\n"); /* purecov: tested */
238     if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
239                     maxbuffer,&tempfile))
240       goto err;					/* purecov: inspected */
241   }
242 
243   if (flush_ft_buf(info) || flush_pending_blocks(info))
244     goto err;
245 
246   if (my_b_inited(&tempfile_for_exceptions))
247   {
248     MI_INFO *idx=info->sort_info->info;
249     uint     keyno=info->key;
250     uint     key_length, ref_length=idx->s->rec_reflength;
251 
252     if (!no_messages)
253       printf("  - Adding exceptions\n"); /* purecov: tested */
254     if (flush_io_cache(&tempfile_for_exceptions) ||
255 	reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
256       goto err;
257 
258     while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
259 		      sizeof(key_length))
260         && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
261 		      (uint) key_length))
262     {
263 	if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
264 	  goto err;
265     }
266   }
267 
268   error =0;
269 
270 err:
271   my_free(sort_keys);
272   delete_dynamic(&buffpek);
273   close_cached_file(&tempfile);
274   close_cached_file(&tempfile_for_exceptions);
275 
276   DBUG_RETURN(error ? -1 : 0);
277 } /* _create_index_by_sort */
278 
279 
280 /* Search after all keys and place them in a temp. file */
281 
find_all_keys(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,DYNAMIC_ARRAY * buffpek,long * maxbuffer,IO_CACHE * tempfile,IO_CACHE * tempfile_for_exceptions)282 static ha_rows find_all_keys(MI_SORT_PARAM *info, ulong keys,
283                              uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
284                              long *maxbuffer, IO_CACHE *tempfile,
285                              IO_CACHE *tempfile_for_exceptions)
286 {
287   int error;
288   ulong idx;
289   DBUG_ENTER("find_all_keys");
290 
291   idx=error=0;
292   sort_keys[0]=(uchar*) (sort_keys+keys);
293 
294   while (!(error=(*info->key_read)(info,sort_keys[idx])))
295   {
296     if (info->real_key_length > info->key_length)
297     {
298       if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
299         DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
300       continue;
301     }
302 
303     if (++idx == keys)
304     {
305       if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
306 		     tempfile))
307       DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
308 
309       sort_keys[0]=(uchar*) (sort_keys+keys);
310       memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
311       idx=1;
312     }
313     sort_keys[idx]=sort_keys[idx-1]+info->key_length;
314   }
315   if (error > 0)
316     DBUG_RETURN(HA_POS_ERROR);		/* Aborted by get_key */ /* purecov: inspected */
317   if (buffpek->elements)
318   {
319     if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
320 		   tempfile))
321       DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
322     *maxbuffer=buffpek->elements-1;
323   }
324   else
325     *maxbuffer=0;
326 
327   DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
328 } /* find_all_keys */
329 
330 
331 /* Search after all keys and place them in a temp. file */
332 
thr_find_all_keys(void * arg)333 pthread_handler_t thr_find_all_keys(void *arg)
334 {
335   MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
336   int error;
337   ulonglong memavl, old_memavl;
338   ulong keys, sort_length;
339   ulong idx, maxbuffer;
340   uchar **sort_keys=0;
341 
342   LINT_INIT(keys);
343 
344   error=1;
345 
346   if (my_thread_init())
347     goto err;
348 
349   { /* Add extra block since DBUG_ENTER declare variables */
350     DBUG_ENTER("thr_find_all_keys");
351     DBUG_PRINT("enter", ("master: %d", sort_param->master));
352     if (sort_param->sort_info->got_error)
353       goto err;
354 
355     if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
356     {
357       sort_param->write_keys=     write_keys_varlen;
358       sort_param->read_to_buffer= read_to_buffer_varlen;
359       sort_param->write_key=      write_merge_key_varlen;
360     }
361     else
362     {
363       sort_param->write_keys=     write_keys;
364       sort_param->read_to_buffer= read_to_buffer;
365       sort_param->write_key=      write_merge_key;
366     }
367 
368     my_b_clear(&sort_param->tempfile);
369     my_b_clear(&sort_param->tempfile_for_exceptions);
370     memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
371     memset(&sort_param->unique, 0, sizeof(sort_param->unique));
372     sort_keys= (uchar **) NULL;
373 
374     memavl=       MY_MAX(sort_param->sortbuff_size, MIN_SORT_BUFFER);
375     idx=          (ulong) sort_param->sort_info->max_records;
376     sort_length=  sort_param->key_length;
377     maxbuffer=    1;
378 
379     if ((memavl - sizeof(BUFFPEK)) / (sort_length +
380                                       sizeof(char *)) > UINT_MAX32)
381       memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
382 
383     while (memavl >= MIN_SORT_BUFFER)
384     {
385       if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
386           (my_off_t) memavl)
387         keys= idx+1;
388       else
389       {
390         ulong skr;
391         do
392         {
393           skr= maxbuffer;
394           if (memavl < sizeof(BUFFPEK)*maxbuffer ||
395               (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
396                (sort_length+sizeof(char*))) <= 1 ||
397               keys < maxbuffer)
398           {
399             mi_check_print_error(sort_param->sort_info->param,
400                                  "myisam_sort_buffer_size is too small");
401             goto err;
402           }
403         }
404         while ((maxbuffer= (idx/(keys-1)+1)) != skr);
405       }
406       if ((sort_keys= (uchar**)
407            my_malloc(keys*(sort_length+sizeof(char*))+
408                      ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
409                       HA_FT_MAXBYTELEN : 0), MYF(0))))
410       {
411         if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
412                                   maxbuffer, maxbuffer/2))
413         {
414           my_free(sort_keys);
415           sort_keys= (uchar **) NULL; /* for err: label */
416         }
417         else
418           break;
419       }
420       old_memavl= memavl;
421       if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
422           old_memavl > MIN_SORT_BUFFER)
423         memavl= MIN_SORT_BUFFER;
424     }
425     if (memavl < MIN_SORT_BUFFER)
426     {
427       mi_check_print_error(sort_param->sort_info->param,
428                            "MyISAM sort buffer too small");
429       goto err; /* purecov: tested */
430     }
431 
432     if (sort_param->sort_info->param->testflag & T_VERBOSE)
433       printf("Key %d - Allocating buffer for %lu keys\n",
434              sort_param->key + 1, keys);
435     sort_param->sort_keys= sort_keys;
436 
437     idx= error= 0;
438     sort_keys[0]= (uchar*) (sort_keys+keys);
439 
440     DBUG_PRINT("info", ("reading keys"));
441     while (!(error= sort_param->sort_info->got_error) &&
442            !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
443     {
444       if (sort_param->real_key_length > sort_param->key_length)
445       {
446         if (write_key(sort_param, sort_keys[idx],
447                       &sort_param->tempfile_for_exceptions))
448           goto err;
449         continue;
450       }
451 
452       if (++idx == keys)
453       {
454         if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
455                                    (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
456                                    &sort_param->tempfile))
457           goto err;
458         sort_keys[0]= (uchar*) (sort_keys+keys);
459         memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
460         idx= 1;
461       }
462       sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
463     }
464     if (error > 0)
465       goto err;
466     if (sort_param->buffpek.elements)
467     {
468       if (sort_param->write_keys(sort_param, sort_keys, idx,
469                                  (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
470                                  &sort_param->tempfile))
471         goto err;
472       sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
473     }
474     else
475       sort_param->keys= idx;
476 
477     sort_param->sort_keys_length= keys;
478     goto ok;
479 
480 err:
481     DBUG_PRINT("error", ("got some error"));
482     sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
483     my_free(sort_keys);
484     sort_param->sort_keys= 0;
485     delete_dynamic(& sort_param->buffpek);
486     close_cached_file(&sort_param->tempfile);
487     close_cached_file(&sort_param->tempfile_for_exceptions);
488 
489 ok:
490     free_root(&sort_param->wordroot, MYF(0));
491     /*
492       Detach from the share if the writer is involved. Avoid others to
493       be blocked. This includes a flush of the write buffer. This will
494       also indicate EOF to the readers.
495       That means that a writer always gets here first and readers -
496       only when they see EOF. But if a reader finishes prematurely
497       because of an error it may reach this earlier - don't allow it
498       to detach the writer thread.
499     */
500     if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
501       remove_io_thread(&sort_param->sort_info->info->rec_cache);
502 
503     /* Readers detach from the share if any. Avoid others to be blocked. */
504     if (sort_param->read_cache.share)
505       remove_io_thread(&sort_param->read_cache);
506 
507     mysql_mutex_lock(&sort_param->sort_info->mutex);
508     if (!--sort_param->sort_info->threads_running)
509       mysql_cond_signal(&sort_param->sort_info->cond);
510     mysql_mutex_unlock(&sort_param->sort_info->mutex);
511     DBUG_PRINT("exit", ("======== ending thread ========"));
512   }
513   my_thread_end();
514   return NULL;
515 }
516 
517 
thr_write_keys(MI_SORT_PARAM * sort_param)518 int thr_write_keys(MI_SORT_PARAM *sort_param)
519 {
520   SORT_INFO *sort_info=sort_param->sort_info;
521   MI_CHECK *param=sort_info->param;
522   ulong UNINIT_VAR(length), keys;
523   ulong *rec_per_key_part=param->rec_per_key_part;
524   int got_error=sort_info->got_error;
525   uint i;
526   MI_INFO *info=sort_info->info;
527   MYISAM_SHARE *share=info->s;
528   MI_SORT_PARAM *sinfo;
529   uchar *mergebuf=0;
530   DBUG_ENTER("thr_write_keys");
531   LINT_INIT(length);
532 
533   for (i= 0, sinfo= sort_param ;
534        i < sort_info->total_keys ;
535        i++, sinfo++)
536   {
537     if (!sinfo->sort_keys)
538     {
539       got_error=1;
540       my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
541       continue;
542     }
543     if (!got_error)
544     {
545       mi_set_key_active(share->state.key_map, sinfo->key);
546       if (!sinfo->buffpek.elements)
547       {
548         if (param->testflag & T_VERBOSE)
549         {
550           printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
551           fflush(stdout);
552         }
553         if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
554             flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
555           got_error=1;
556       }
557     }
558     my_free(sinfo->sort_keys);
559     my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
560     sinfo->sort_keys=0;
561   }
562 
563   for (i= 0, sinfo= sort_param ;
564        i < sort_info->total_keys ;
565        i++,
566 	 delete_dynamic(&sinfo->buffpek),
567 	 close_cached_file(&sinfo->tempfile),
568 	 close_cached_file(&sinfo->tempfile_for_exceptions),
569          rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
570   {
571     if (got_error)
572       continue;
573     if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
574     {
575       sinfo->write_keys=write_keys_varlen;
576       sinfo->read_to_buffer=read_to_buffer_varlen;
577       sinfo->write_key=write_merge_key_varlen;
578     }
579     else
580     {
581       sinfo->write_keys=write_keys;
582       sinfo->read_to_buffer=read_to_buffer;
583       sinfo->write_key=write_merge_key;
584     }
585     if (sinfo->buffpek.elements)
586     {
587       ulong maxbuffer=sinfo->buffpek.elements-1;
588       if (!mergebuf)
589       {
590         length=param->sort_buffer_length;
591         while (length >= MIN_SORT_BUFFER)
592         {
593           if ((mergebuf= (uchar *) my_malloc(length, MYF(0))))
594               break;
595           length=length*3/4;
596         }
597         if (!mergebuf)
598         {
599           got_error=1;
600           continue;
601         }
602       }
603       keys=length/sinfo->key_length;
604       if (maxbuffer >= MERGEBUFF2)
605       {
606         if (param->testflag & T_VERBOSE)
607           printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
608         if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
609 			    dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
610 			    (long *) &maxbuffer, &sinfo->tempfile))
611         {
612           got_error=1;
613           continue;
614         }
615       }
616       if (flush_io_cache(&sinfo->tempfile) ||
617           reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
618       {
619         got_error=1;
620         continue;
621       }
622       if (param->testflag & T_VERBOSE)
623         printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
624       if (merge_index(sinfo, keys, (uchar **)mergebuf,
625                       dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
626                       maxbuffer,&sinfo->tempfile) ||
627           flush_ft_buf(sinfo) ||
628 	  flush_pending_blocks(sinfo))
629       {
630         got_error=1;
631         continue;
632       }
633     }
634     if (my_b_inited(&sinfo->tempfile_for_exceptions))
635     {
636       uint key_length;
637 
638       if (param->testflag & T_VERBOSE)
639         printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
640 
641       if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
642           reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
643       {
644         got_error=1;
645         continue;
646       }
647 
648       while (!got_error &&
649 	     !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
650 			sizeof(key_length)))
651       {
652         uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
653         if (key_length > sizeof(ft_buf) ||
654             my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
655                       (uint)key_length) ||
656             _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
657                          key_length - info->s->rec_reflength))
658           got_error=1;
659       }
660     }
661     if (!got_error && param->testflag & T_STATISTICS)
662       update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
663                        param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
664                        sinfo->notnull : NULL,
665                        (ulonglong) info->state->records);
666   }
667   my_free(mergebuf);
668   DBUG_RETURN(got_error);
669 }
670 
671         /* Write all keys in memory to file for later merge */
672 
write_keys(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count,BUFFPEK * buffpek,IO_CACHE * tempfile)673 static int write_keys(MI_SORT_PARAM *info, uchar **sort_keys,
674                       ulong count, BUFFPEK *buffpek, IO_CACHE *tempfile)
675 {
676   uchar **end;
677   uint sort_length=info->key_length;
678   DBUG_ENTER("write_keys");
679 
680   std::sort(sort_keys, sort_keys + count, Key_compare(info));
681 
682   if (!my_b_inited(tempfile) &&
683       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
684                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
685     DBUG_RETURN(1); /* purecov: inspected */
686 
687   buffpek->file_pos=my_b_tell(tempfile);
688   buffpek->count=count;
689 
690   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
691   {
692     if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
693       DBUG_RETURN(1); /* purecov: inspected */
694   }
695   DBUG_RETURN(0);
696 } /* write_keys */
697 
698 
699 static inline int
my_var_write(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * bufs)700 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
701 {
702   int err;
703   uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
704 
705   /* The following is safe as this is a local file */
706   if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
707     return (err);
708   if ((err= my_b_write(to_file,bufs, (uint) len)))
709     return (err);
710   return (0);
711 }
712 
713 
write_keys_varlen(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count,BUFFPEK * buffpek,IO_CACHE * tempfile)714 static int write_keys_varlen(MI_SORT_PARAM *info,
715                              uchar **sort_keys,
716                              ulong count, BUFFPEK *buffpek,
717                              IO_CACHE *tempfile)
718 {
719   uchar **end;
720   int err;
721   DBUG_ENTER("write_keys_varlen");
722 
723   std::sort(sort_keys, sort_keys + count, Key_compare(info));
724 
725   if (!my_b_inited(tempfile) &&
726       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
727                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
728     DBUG_RETURN(1); /* purecov: inspected */
729 
730   buffpek->file_pos=my_b_tell(tempfile);
731   buffpek->count=count;
732   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
733   {
734     if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
735       DBUG_RETURN(err);
736   }
737   DBUG_RETURN(0);
738 } /* write_keys_varlen */
739 
740 
write_key(MI_SORT_PARAM * info,uchar * key,IO_CACHE * tempfile)741 static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
742 {
743   uint key_length=info->real_key_length;
744   DBUG_ENTER("write_key");
745 
746   if (!my_b_inited(tempfile) &&
747       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
748                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
749     DBUG_RETURN(1);
750 
751   if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
752       my_b_write(tempfile,(uchar*)key,(uint) key_length))
753     DBUG_RETURN(1);
754   DBUG_RETURN(0);
755 } /* write_key */
756 
757 
758 /* Write index */
759 
write_index(MI_SORT_PARAM * info,uchar ** sort_keys,ulong count)760 static int write_index(MI_SORT_PARAM *info, uchar **sort_keys,
761                        ulong count)
762 {
763   DBUG_ENTER("write_index");
764 
765   std::sort(sort_keys, sort_keys + count, Key_compare(info));
766 
767   while (count--)
768   {
769     if ((*info->key_write)(info,*sort_keys++))
770       DBUG_RETURN(-1); /* purecov: inspected */
771   }
772   DBUG_RETURN(0);
773 } /* write_index */
774 
775 
776         /* Merge buffers to make < MERGEBUFF2 buffers */
777 
merge_many_buff(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,BUFFPEK * buffpek,long * maxbuffer,IO_CACHE * t_file)778 static int merge_many_buff(MI_SORT_PARAM *info, ulong keys,
779                            uchar **sort_keys, BUFFPEK *buffpek,
780                            long *maxbuffer, IO_CACHE *t_file)
781 {
782   long i;
783   IO_CACHE t_file2, *from_file, *to_file, *temp;
784   BUFFPEK *lastbuff;
785   DBUG_ENTER("merge_many_buff");
786 
787   if (*maxbuffer < MERGEBUFF2)
788     DBUG_RETURN(0);                             /* purecov: inspected */
789   if (flush_io_cache(t_file) ||
790       open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
791                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
792     DBUG_RETURN(1);                             /* purecov: inspected */
793 
794   from_file= t_file ; to_file= &t_file2;
795   while (*maxbuffer >= MERGEBUFF2)
796   {
797     reinit_io_cache(from_file,READ_CACHE,0L,0,0);
798     reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
799     lastbuff=buffpek;
800     for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
801     {
802       if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
803                         buffpek+i,buffpek+i+MERGEBUFF-1))
804         goto cleanup;
805     }
806     if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
807                       buffpek+i,buffpek+ *maxbuffer))
808       break; /* purecov: inspected */
809     if (flush_io_cache(to_file))
810       break;                                    /* purecov: inspected */
811     temp=from_file; from_file=to_file; to_file=temp;
812     *maxbuffer= (long) (lastbuff-buffpek)-1;
813   }
814 cleanup:
815   close_cached_file(to_file);                   /* This holds old result */
816   if (to_file == t_file)
817   {
818     DBUG_ASSERT(t_file2.type == WRITE_CACHE);
819     *t_file=t_file2;                            /* Copy result file */
820     t_file->current_pos= &t_file->write_pos;
821     t_file->current_end= &t_file->write_end;
822   }
823 
824   DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
825 } /* merge_many_buff */
826 
827 
828 /*
829    Read data to buffer
830 
831   SYNOPSIS
832     read_to_buffer()
833     fromfile		File to read from
834     buffpek		Where to read from
835     sort_length		max length to read
836   RESULT
837     > 0	Ammount of bytes read
838     -1	Error
839 */
840 
read_to_buffer(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)841 static ulong read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
842                             uint sort_length)
843 {
844   ulong count;
845   ulong length;
846 
847   if ((count=(ulong) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
848   {
849     if (mysql_file_pread(fromfile->file, (uchar*) buffpek->base,
850                          (length= sort_length*count),
851                          buffpek->file_pos, MYF_RW))
852       return((ulong) -1);                        /* purecov: inspected */
853     buffpek->key=buffpek->base;
854     buffpek->file_pos+= length;                 /* New filepos */
855     buffpek->count-=    count;
856     buffpek->mem_count= count;
857   }
858   return (count*sort_length);
859 } /* read_to_buffer */
860 
read_to_buffer_varlen(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)861 static ulong read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
862                                   uint sort_length)
863 {
864   ulong count;
865   uint16 length_of_key = 0;
866   ulong idx;
867   uchar *buffp;
868 
869   if ((count=(ulong) MY_MIN((ha_rows) buffpek->max_keys, buffpek->count)))
870   {
871     buffp = buffpek->base;
872 
873     for (idx=1;idx<=count;idx++)
874     {
875       if (mysql_file_pread(fromfile->file, (uchar*)&length_of_key,
876                            sizeof(length_of_key), buffpek->file_pos, MYF_RW))
877         return((ulong) -1);
878       buffpek->file_pos+=sizeof(length_of_key);
879       if (mysql_file_pread(fromfile->file, (uchar*) buffp,
880                            length_of_key, buffpek->file_pos, MYF_RW))
881         return((ulong) -1);
882       buffpek->file_pos+=length_of_key;
883       buffp = buffp + sort_length;
884     }
885     buffpek->key=buffpek->base;
886     buffpek->count-=    count;
887     buffpek->mem_count= count;
888   }
889   return (count*sort_length);
890 } /* read_to_buffer_varlen */
891 
892 
write_merge_key_varlen(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,ulong count)893 static int write_merge_key_varlen(MI_SORT_PARAM *info,
894                                   IO_CACHE *to_file, uchar* key,
895                                   uint sort_length, ulong count)
896 {
897   ulong idx;
898   uchar *bufs = key;
899 
900   for (idx=1;idx<=count;idx++)
901   {
902     int err;
903     if ((err= my_var_write(info, to_file, bufs)))
904       return (err);
905     bufs=bufs+sort_length;
906   }
907   return(0);
908 }
909 
910 
write_merge_key(MI_SORT_PARAM * info MY_ATTRIBUTE ((unused)),IO_CACHE * to_file,uchar * key,uint sort_length,ulong count)911 static int write_merge_key(MI_SORT_PARAM *info MY_ATTRIBUTE((unused)),
912                            IO_CACHE *to_file, uchar *key,
913                            uint sort_length, ulong count)
914 {
915   return my_b_write(to_file, key, (size_t) sort_length*count);
916 }
917 
918 /*
919   Merge buffers to one buffer
920   If to_file == 0 then use info->key_write
921 */
922 
923 static int
merge_buffers(MI_SORT_PARAM * info,ulong keys,IO_CACHE * from_file,IO_CACHE * to_file,uchar ** sort_keys,BUFFPEK * lastbuff,BUFFPEK * Fb,BUFFPEK * Tb)924 merge_buffers(MI_SORT_PARAM *info, ulong keys, IO_CACHE *from_file,
925               IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
926               BUFFPEK *Fb, BUFFPEK *Tb)
927 {
928   ulong error;
929   uint sort_length;
930   ulong maxcount;
931   ha_rows count;
932   my_off_t UNINIT_VAR(to_start_filepos);
933   uchar *strpos;
934   BUFFPEK *buffpek,**refpek;
935   QUEUE queue;
936   volatile int *killed= killed_ptr(info->sort_info->param);
937   DBUG_ENTER("merge_buffers");
938 
939   count=error=0;
940   maxcount= keys / ((ulong) (Tb-Fb) + 1);
941   DBUG_ASSERT(maxcount > 0);
942   LINT_INIT(to_start_filepos);
943   if (to_file)
944     to_start_filepos=my_b_tell(to_file);
945   strpos=(uchar*) sort_keys;
946   sort_length=info->key_length;
947 
948   if (init_queue(&queue, (uint) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
949                  (int (*)(void*, uchar *,uchar*)) info->key_cmp,
950                  (void*) info))
951     DBUG_RETURN(1); /* purecov: inspected */
952 
953   for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
954   {
955     count+= buffpek->count;
956     buffpek->base= strpos;
957     buffpek->max_keys=maxcount;
958     strpos+= (error= info->read_to_buffer(from_file,buffpek, sort_length));
959     if (error == (ulong) -1)
960       goto err; /* purecov: inspected */
961     queue_insert(&queue,(uchar*) buffpek);
962   }
963 
964   while (queue.elements > 1)
965   {
966     for (;;)
967     {
968       if (*killed)
969       {
970         error=1; goto err;
971       }
972       buffpek=(BUFFPEK*) queue_top(&queue);
973       if (to_file)
974       {
975         if (info->write_key(info,to_file,(uchar*) buffpek->key,
976                             (uint) sort_length,1))
977         {
978           error=1; goto err; /* purecov: inspected */
979         }
980       }
981       else
982       {
983         if ((*info->key_write)(info,(void*) buffpek->key))
984         {
985           error=1; goto err; /* purecov: inspected */
986         }
987       }
988       buffpek->key+=sort_length;
989       if (! --buffpek->mem_count)
990       {
991         if (!(error= info->read_to_buffer(from_file,buffpek,sort_length)))
992         {
993           uchar *base=buffpek->base;
994           ulong max_keys=buffpek->max_keys;
995 
996           (void) queue_remove(&queue,0);
997 
998           /* Put room used by buffer to use in other buffer */
999           for (refpek= (BUFFPEK**) &queue_top(&queue);
1000                refpek <= (BUFFPEK**) &queue_end(&queue);
1001                refpek++)
1002           {
1003             buffpek= *refpek;
1004             if (buffpek->base+buffpek->max_keys*sort_length == base)
1005             {
1006               buffpek->max_keys+=max_keys;
1007               break;
1008             }
1009             else if (base+max_keys*sort_length == buffpek->base)
1010             {
1011               buffpek->base=base;
1012               buffpek->max_keys+=max_keys;
1013               break;
1014             }
1015           }
1016           break;                /* One buffer have been removed */
1017         }
1018       }
1019       else if (error == (ulong) -1)
1020         goto err;               /* purecov: inspected */
1021       queue_replaced(&queue);   /* Top element has been replaced */
1022     }
1023   }
1024   buffpek=(BUFFPEK*) queue_top(&queue);
1025   buffpek->base=(uchar *) sort_keys;
1026   buffpek->max_keys=keys;
1027   do
1028   {
1029     if (to_file)
1030     {
1031       if (info->write_key(info,to_file,(uchar*) buffpek->key,
1032                          sort_length,buffpek->mem_count))
1033       {
1034         error=1; goto err; /* purecov: inspected */
1035       }
1036     }
1037     else
1038     {
1039       uchar *end;
1040       strpos= buffpek->key;
1041       for (end=strpos+buffpek->mem_count*sort_length;
1042            strpos != end ;
1043            strpos+=sort_length)
1044       {
1045         if ((*info->key_write)(info,(void*) strpos))
1046         {
1047           error=1; goto err; /* purecov: inspected */
1048         }
1049       }
1050     }
1051   }
1052   while ((error= info->read_to_buffer(from_file,buffpek,sort_length))
1053          != (ulong) -1 && error != 0);
1054 
1055   lastbuff->count=count;
1056   if (to_file)
1057     lastbuff->file_pos=to_start_filepos;
1058 err:
1059   delete_queue(&queue);
1060   DBUG_RETURN(error != 0);
1061 } /* merge_buffers */
1062 
1063 
1064         /* Do a merge to output-file (save only positions) */
1065 
1066 static int
merge_index(MI_SORT_PARAM * info,ulong keys,uchar ** sort_keys,BUFFPEK * buffpek,long maxbuffer,IO_CACHE * tempfile)1067 merge_index(MI_SORT_PARAM *info, ulong keys, uchar **sort_keys,
1068             BUFFPEK *buffpek, long maxbuffer, IO_CACHE *tempfile)
1069 {
1070   DBUG_ENTER("merge_index");
1071   if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1072                     buffpek+maxbuffer))
1073     DBUG_RETURN(1); /* purecov: inspected */
1074   DBUG_RETURN(0);
1075 } /* merge_index */
1076 
1077 static int
flush_ft_buf(MI_SORT_PARAM * info)1078 flush_ft_buf(MI_SORT_PARAM *info)
1079 {
1080   int err=0;
1081   if (info->sort_info->ft_buf)
1082   {
1083     err=sort_ft_buf_flush(info);
1084     my_free(info->sort_info->ft_buf);
1085     info->sort_info->ft_buf=0;
1086   }
1087   return err;
1088 }
1089 
1090