1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
/*
  Creates an index for a database by reading keys, sorting them and
  outputting them in sorted order through SORT_INFO functions.
*/
27 
28 #include <algorithm>
29 #include <functional>
30 #include <my_global.h>
31 #include "fulltext.h"
32 #if defined(__WIN__)
33 #include <fcntl.h>
34 #else
35 #include <stddef.h>
36 #endif
37 #include <queues.h>
38 
39 /* static variables */
40 
41 #undef MYF_RW
42 #undef DISK_BUFFER_SIZE
43 
44 #define MERGEBUFF 15
45 #define MERGEBUFF2 31
46 #define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
47 #define DISK_BUFFER_SIZE (IO_SIZE*16)
48 
49 class Key_compare :
50   public std::binary_function<const uchar*, const uchar*, bool>
51 {
52 public:
Key_compare(MI_SORT_PARAM * param)53   Key_compare(MI_SORT_PARAM *param) : info(param) {}
operator ()(const uchar * a,const uchar * b)54   bool operator()(const uchar *a, const uchar *b)
55   {
56     return info->key_cmp(info, &a, &b) < 0;
57   }
58   MI_SORT_PARAM *info;
59 };
60 
61 /*
62  Pointers of functions for store and read keys from temp file
63 */
64 
65 extern void print_error(const char *fmt,...);
66 
67 /* Functions defined in this file */
68 
69 static ha_rows find_all_keys(MI_SORT_PARAM *info,uint keys,
70                              uchar **sort_keys,
71                              DYNAMIC_ARRAY *buffpek,int *maxbuffer,
72                              IO_CACHE *tempfile,
73                              IO_CACHE *tempfile_for_exceptions);
74 static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
75                       uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
76 static int write_key(MI_SORT_PARAM *info, uchar *key,
77                      IO_CACHE *tempfile);
78 static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
79                        uint count);
80 static int merge_many_buff(MI_SORT_PARAM *info,uint keys,
81                            uchar * *sort_keys,
82                            BUFFPEK *buffpek,int *maxbuffer,
83                            IO_CACHE *t_file);
84 static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
85                            uint sort_length);
86 static int merge_buffers(MI_SORT_PARAM *info,uint keys,
87                          IO_CACHE *from_file, IO_CACHE *to_file,
88                          uchar * *sort_keys, BUFFPEK *lastbuff,
89                          BUFFPEK *Fb, BUFFPEK *Tb);
90 static int merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
91                        IO_CACHE *);
92 static int flush_ft_buf(MI_SORT_PARAM *info);
93 
94 static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
95                              uint count, BUFFPEK *buffpek,
96                              IO_CACHE *tempfile);
97 static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
98                                   uint sort_length);
99 static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
100                            uchar *key, uint sort_length, uint count);
101 static int write_merge_key_varlen(MI_SORT_PARAM *info,
102                                   IO_CACHE *to_file,
103                                   uchar* key, uint sort_length,
104                                   uint count);
105 static inline int
106 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
107 
108 /*
109   Creates a index of sorted keys
110 
111   SYNOPSIS
112     _create_index_by_sort()
113     info		Sort parameters
114     no_messages		Set to 1 if no output
115     sortbuff_size	Size if sortbuffer to allocate
116 
117   RESULT
118     0	ok
119    <> 0 Error
120 */
121 
_create_index_by_sort(MI_SORT_PARAM * info,my_bool no_messages,ulonglong sortbuff_size)122 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
123 			  ulonglong sortbuff_size)
124 {
125   int error,maxbuffer,skr;
126   uint sort_length, keys;
127   ulonglong memavl, old_memavl;
128   DYNAMIC_ARRAY buffpek;
129   ha_rows records;
130   uchar **sort_keys;
131   IO_CACHE tempfile, tempfile_for_exceptions;
132   DBUG_ENTER("_create_index_by_sort");
133   DBUG_PRINT("enter",("sort_length: %d", info->key_length));
134 
135   if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
136   {
137     info->write_keys=write_keys_varlen;
138     info->read_to_buffer=read_to_buffer_varlen;
139     info->write_key= write_merge_key_varlen;
140   }
141   else
142   {
143     info->write_keys=write_keys;
144     info->read_to_buffer=read_to_buffer;
145     info->write_key=write_merge_key;
146   }
147 
148   my_b_clear(&tempfile);
149   my_b_clear(&tempfile_for_exceptions);
150   memset(&buffpek, 0, sizeof(buffpek));
151   sort_keys= (uchar **) NULL; error= 1;
152   maxbuffer=1;
153 
154   memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
155   records=	info->sort_info->max_records;
156   sort_length=	info->key_length;
157   LINT_INIT(keys);
158 
159   if ((memavl - sizeof(BUFFPEK)) / (sort_length + sizeof(char *)) > UINT_MAX32)
160     memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
161 
162   while (memavl >= MIN_SORT_BUFFER)
163   {
164     if ((records < UINT_MAX32) &&
165        ((my_off_t) (records + 1) *
166         (sort_length + sizeof(char*)) <= (my_off_t) memavl))
167       keys= (uint)records+1;
168     else
169       do
170       {
171 	skr=maxbuffer;
172 	if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
173 	    (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
174              (sort_length+sizeof(char*))) <= 1 ||
175             keys < (uint) maxbuffer)
176 	{
177 	  mi_check_print_error(info->sort_info->param,
178 			       "myisam_sort_buffer_size is too small");
179 	  goto err;
180 	}
181       }
182       while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
183 
184     if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
185 				       HA_FT_MAXBYTELEN, MYF(0))))
186     {
187       if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
188 			     maxbuffer/2))
189       {
190 	my_free(sort_keys);
191         sort_keys= 0;
192       }
193       else
194 	break;
195     }
196     old_memavl=memavl;
197     if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
198       memavl= MIN_SORT_BUFFER;
199   }
200   if (memavl < MIN_SORT_BUFFER)
201   {
202     mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
203     goto err; /* purecov: tested */
204   }
205   (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
206 
207   if (!no_messages)
208     printf("  - Searching for keys, allocating buffer for %d keys\n",keys);
209 
210   if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
211                                   &tempfile,&tempfile_for_exceptions))
212       == HA_POS_ERROR)
213     goto err; /* purecov: tested */
214   if (maxbuffer == 0)
215   {
216     if (!no_messages)
217       printf("  - Dumping %lu keys\n", (ulong) records);
218     if (write_index(info,sort_keys, (uint) records))
219       goto err; /* purecov: inspected */
220   }
221   else
222   {
223     keys=(keys*(sort_length+sizeof(char*)))/sort_length;
224     if (maxbuffer >= MERGEBUFF2)
225     {
226       if (!no_messages)
227 	printf("  - Merging %lu keys\n", (ulong) records); /* purecov: tested */
228       if (merge_many_buff(info,keys,sort_keys,
229                   dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
230 	goto err;				/* purecov: inspected */
231     }
232     if (flush_io_cache(&tempfile) ||
233 	reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
234       goto err;					/* purecov: inspected */
235     if (!no_messages)
236       printf("  - Last merge and dumping keys\n"); /* purecov: tested */
237     if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
238                     maxbuffer,&tempfile))
239       goto err;					/* purecov: inspected */
240   }
241 
242   if (flush_ft_buf(info) || flush_pending_blocks(info))
243     goto err;
244 
245   if (my_b_inited(&tempfile_for_exceptions))
246   {
247     MI_INFO *idx=info->sort_info->info;
248     uint     keyno=info->key;
249     uint     key_length, ref_length=idx->s->rec_reflength;
250 
251     if (!no_messages)
252       printf("  - Adding exceptions\n"); /* purecov: tested */
253     if (flush_io_cache(&tempfile_for_exceptions) ||
254 	reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
255       goto err;
256 
257     while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
258 		      sizeof(key_length))
259         && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
260 		      (uint) key_length))
261     {
262 	if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
263 	  goto err;
264     }
265   }
266 
267   error =0;
268 
269 err:
270   my_free(sort_keys);
271   delete_dynamic(&buffpek);
272   close_cached_file(&tempfile);
273   close_cached_file(&tempfile_for_exceptions);
274 
275   DBUG_RETURN(error ? -1 : 0);
276 } /* _create_index_by_sort */
277 
278 
279 /* Search after all keys and place them in a temp. file */
280 
find_all_keys(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,DYNAMIC_ARRAY * buffpek,int * maxbuffer,IO_CACHE * tempfile,IO_CACHE * tempfile_for_exceptions)281 static ha_rows find_all_keys(MI_SORT_PARAM *info, uint keys,
282                              uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
283                              int *maxbuffer, IO_CACHE *tempfile,
284                              IO_CACHE *tempfile_for_exceptions)
285 {
286   int error;
287   uint idx;
288   DBUG_ENTER("find_all_keys");
289 
290   idx=error=0;
291   sort_keys[0]=(uchar*) (sort_keys+keys);
292 
293   while (!(error=(*info->key_read)(info,sort_keys[idx])))
294   {
295     if (info->real_key_length > info->key_length)
296     {
297       if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
298         DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
299       continue;
300     }
301 
302     if (++idx == keys)
303     {
304       if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
305 		     tempfile))
306       DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
307 
308       sort_keys[0]=(uchar*) (sort_keys+keys);
309       memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
310       idx=1;
311     }
312     sort_keys[idx]=sort_keys[idx-1]+info->key_length;
313   }
314   if (error > 0)
315     DBUG_RETURN(HA_POS_ERROR);		/* Aborted by get_key */ /* purecov: inspected */
316   if (buffpek->elements)
317   {
318     if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
319 		   tempfile))
320       DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
321     *maxbuffer=buffpek->elements-1;
322   }
323   else
324     *maxbuffer=0;
325 
326   DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
327 } /* find_all_keys */
328 
329 
330 /* Search after all keys and place them in a temp. file */
331 
thr_find_all_keys(void * arg)332 pthread_handler_t thr_find_all_keys(void *arg)
333 {
334   MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
335   int error;
336   ulonglong memavl, old_memavl;
337   uint keys, sort_length;
338   uint idx, maxbuffer;
339   uchar **sort_keys=0;
340 
341   LINT_INIT(keys);
342 
343   error=1;
344 
345   if (my_thread_init())
346     goto err;
347 
348   { /* Add extra block since DBUG_ENTER declare variables */
349     DBUG_ENTER("thr_find_all_keys");
350     DBUG_PRINT("enter", ("master: %d", sort_param->master));
351     if (sort_param->sort_info->got_error)
352       goto err;
353 
354     if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
355     {
356       sort_param->write_keys=     write_keys_varlen;
357       sort_param->read_to_buffer= read_to_buffer_varlen;
358       sort_param->write_key=      write_merge_key_varlen;
359     }
360     else
361     {
362       sort_param->write_keys=     write_keys;
363       sort_param->read_to_buffer= read_to_buffer;
364       sort_param->write_key=      write_merge_key;
365     }
366 
367     my_b_clear(&sort_param->tempfile);
368     my_b_clear(&sort_param->tempfile_for_exceptions);
369     memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
370     memset(&sort_param->unique, 0, sizeof(sort_param->unique));
371     sort_keys= (uchar **) NULL;
372 
373     memavl=       MY_MAX(sort_param->sortbuff_size, MIN_SORT_BUFFER);
374     idx=          (uint)sort_param->sort_info->max_records;
375     sort_length=  sort_param->key_length;
376     maxbuffer=    1;
377 
378     if ((memavl - sizeof(BUFFPEK)) / (sort_length +
379                                       sizeof(char *)) > UINT_MAX32)
380       memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
381 
382     while (memavl >= MIN_SORT_BUFFER)
383     {
384       if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
385           (my_off_t) memavl)
386         keys= idx+1;
387       else
388       {
389         uint skr;
390         do
391         {
392           skr= maxbuffer;
393           if (memavl < sizeof(BUFFPEK)*maxbuffer ||
394               (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
395                (sort_length+sizeof(char*))) <= 1 ||
396               keys < (uint) maxbuffer)
397           {
398             mi_check_print_error(sort_param->sort_info->param,
399                                  "myisam_sort_buffer_size is too small");
400             goto err;
401           }
402         }
403         while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
404       }
405       if ((sort_keys= (uchar**)
406            my_malloc(keys*(sort_length+sizeof(char*))+
407                      ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
408                       HA_FT_MAXBYTELEN : 0), MYF(0))))
409       {
410         if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
411                                   maxbuffer, maxbuffer/2))
412         {
413           my_free(sort_keys);
414           sort_keys= (uchar **) NULL; /* for err: label */
415         }
416         else
417           break;
418       }
419       old_memavl= memavl;
420       if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
421           old_memavl > MIN_SORT_BUFFER)
422         memavl= MIN_SORT_BUFFER;
423     }
424     if (memavl < MIN_SORT_BUFFER)
425     {
426       mi_check_print_error(sort_param->sort_info->param,
427                            "MyISAM sort buffer too small");
428       goto err; /* purecov: tested */
429     }
430 
431     if (sort_param->sort_info->param->testflag & T_VERBOSE)
432       printf("Key %d - Allocating buffer for %d keys\n",
433              sort_param->key + 1, keys);
434     sort_param->sort_keys= sort_keys;
435 
436     idx= error= 0;
437     sort_keys[0]= (uchar*) (sort_keys+keys);
438 
439     DBUG_PRINT("info", ("reading keys"));
440     while (!(error= sort_param->sort_info->got_error) &&
441            !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
442     {
443       if (sort_param->real_key_length > sort_param->key_length)
444       {
445         if (write_key(sort_param, sort_keys[idx],
446                       &sort_param->tempfile_for_exceptions))
447           goto err;
448         continue;
449       }
450 
451       if (++idx == keys)
452       {
453         if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
454                                    (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
455                                    &sort_param->tempfile))
456           goto err;
457         sort_keys[0]= (uchar*) (sort_keys+keys);
458         memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
459         idx= 1;
460       }
461       sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
462     }
463     if (error > 0)
464       goto err;
465     if (sort_param->buffpek.elements)
466     {
467       if (sort_param->write_keys(sort_param, sort_keys, idx,
468                                  (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
469                                  &sort_param->tempfile))
470         goto err;
471       sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
472     }
473     else
474       sort_param->keys= idx;
475 
476     sort_param->sort_keys_length= keys;
477     goto ok;
478 
479 err:
480     DBUG_PRINT("error", ("got some error"));
481     sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
482     my_free(sort_keys);
483     sort_param->sort_keys= 0;
484     delete_dynamic(& sort_param->buffpek);
485     close_cached_file(&sort_param->tempfile);
486     close_cached_file(&sort_param->tempfile_for_exceptions);
487 
488 ok:
489     free_root(&sort_param->wordroot, MYF(0));
490     /*
491       Detach from the share if the writer is involved. Avoid others to
492       be blocked. This includes a flush of the write buffer. This will
493       also indicate EOF to the readers.
494       That means that a writer always gets here first and readers -
495       only when they see EOF. But if a reader finishes prematurely
496       because of an error it may reach this earlier - don't allow it
497       to detach the writer thread.
498     */
499     if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
500       remove_io_thread(&sort_param->sort_info->info->rec_cache);
501 
502     /* Readers detach from the share if any. Avoid others to be blocked. */
503     if (sort_param->read_cache.share)
504       remove_io_thread(&sort_param->read_cache);
505 
506     mysql_mutex_lock(&sort_param->sort_info->mutex);
507     if (!--sort_param->sort_info->threads_running)
508       mysql_cond_signal(&sort_param->sort_info->cond);
509     mysql_mutex_unlock(&sort_param->sort_info->mutex);
510     DBUG_PRINT("exit", ("======== ending thread ========"));
511   }
512   my_thread_end();
513   return NULL;
514 }
515 
516 
thr_write_keys(MI_SORT_PARAM * sort_param)517 int thr_write_keys(MI_SORT_PARAM *sort_param)
518 {
519   SORT_INFO *sort_info=sort_param->sort_info;
520   MI_CHECK *param=sort_info->param;
521   ulong UNINIT_VAR(length), keys;
522   ulong *rec_per_key_part=param->rec_per_key_part;
523   int got_error=sort_info->got_error;
524   uint i;
525   MI_INFO *info=sort_info->info;
526   MYISAM_SHARE *share=info->s;
527   MI_SORT_PARAM *sinfo;
528   uchar *mergebuf=0;
529   DBUG_ENTER("thr_write_keys");
530   LINT_INIT(length);
531 
532   for (i= 0, sinfo= sort_param ;
533        i < sort_info->total_keys ;
534        i++, sinfo++)
535   {
536     if (!sinfo->sort_keys)
537     {
538       got_error=1;
539       my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
540       continue;
541     }
542     if (!got_error)
543     {
544       mi_set_key_active(share->state.key_map, sinfo->key);
545       if (!sinfo->buffpek.elements)
546       {
547         if (param->testflag & T_VERBOSE)
548         {
549           printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
550           fflush(stdout);
551         }
552         if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
553             flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
554           got_error=1;
555       }
556     }
557     my_free(sinfo->sort_keys);
558     my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
559     sinfo->sort_keys=0;
560   }
561 
562   for (i= 0, sinfo= sort_param ;
563        i < sort_info->total_keys ;
564        i++,
565 	 delete_dynamic(&sinfo->buffpek),
566 	 close_cached_file(&sinfo->tempfile),
567 	 close_cached_file(&sinfo->tempfile_for_exceptions),
568          rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
569   {
570     if (got_error)
571       continue;
572     if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
573     {
574       sinfo->write_keys=write_keys_varlen;
575       sinfo->read_to_buffer=read_to_buffer_varlen;
576       sinfo->write_key=write_merge_key_varlen;
577     }
578     else
579     {
580       sinfo->write_keys=write_keys;
581       sinfo->read_to_buffer=read_to_buffer;
582       sinfo->write_key=write_merge_key;
583     }
584     if (sinfo->buffpek.elements)
585     {
586       uint maxbuffer=sinfo->buffpek.elements-1;
587       if (!mergebuf)
588       {
589         length=param->sort_buffer_length;
590         while (length >= MIN_SORT_BUFFER)
591         {
592           if ((mergebuf= (uchar *) my_malloc(length, MYF(0))))
593               break;
594           length=length*3/4;
595         }
596         if (!mergebuf)
597         {
598           got_error=1;
599           continue;
600         }
601       }
602       keys=length/sinfo->key_length;
603       if (maxbuffer >= MERGEBUFF2)
604       {
605         if (param->testflag & T_VERBOSE)
606           printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
607         if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
608 			    dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
609 			    (int*) &maxbuffer, &sinfo->tempfile))
610         {
611           got_error=1;
612           continue;
613         }
614       }
615       if (flush_io_cache(&sinfo->tempfile) ||
616           reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
617       {
618         got_error=1;
619         continue;
620       }
621       if (param->testflag & T_VERBOSE)
622         printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
623       if (merge_index(sinfo, keys, (uchar **)mergebuf,
624                       dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
625                       maxbuffer,&sinfo->tempfile) ||
626           flush_ft_buf(sinfo) ||
627 	  flush_pending_blocks(sinfo))
628       {
629         got_error=1;
630         continue;
631       }
632     }
633     if (my_b_inited(&sinfo->tempfile_for_exceptions))
634     {
635       uint key_length;
636 
637       if (param->testflag & T_VERBOSE)
638         printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
639 
640       if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
641           reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
642       {
643         got_error=1;
644         continue;
645       }
646 
647       while (!got_error &&
648 	     !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
649 			sizeof(key_length)))
650       {
651         uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
652         if (key_length > sizeof(ft_buf) ||
653             my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
654                       (uint)key_length) ||
655             _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
656                          key_length - info->s->rec_reflength))
657           got_error=1;
658       }
659     }
660     if (!got_error && param->testflag & T_STATISTICS)
661       update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
662                        param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
663                        sinfo->notnull : NULL,
664                        (ulonglong) info->state->records);
665   }
666   my_free(mergebuf);
667   DBUG_RETURN(got_error);
668 }
669 
670         /* Write all keys in memory to file for later merge */
671 
write_keys(MI_SORT_PARAM * info,register uchar ** sort_keys,uint count,BUFFPEK * buffpek,IO_CACHE * tempfile)672 static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
673                       uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
674 {
675   uchar **end;
676   uint sort_length=info->key_length;
677   DBUG_ENTER("write_keys");
678 
679   std::sort(sort_keys, sort_keys + count, Key_compare(info));
680 
681   if (!my_b_inited(tempfile) &&
682       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
683                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
684     DBUG_RETURN(1); /* purecov: inspected */
685 
686   buffpek->file_pos=my_b_tell(tempfile);
687   buffpek->count=count;
688 
689   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
690   {
691     if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
692       DBUG_RETURN(1); /* purecov: inspected */
693   }
694   DBUG_RETURN(0);
695 } /* write_keys */
696 
697 
698 static inline int
my_var_write(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * bufs)699 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
700 {
701   int err;
702   uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
703 
704   /* The following is safe as this is a local file */
705   if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
706     return (err);
707   if ((err= my_b_write(to_file,bufs, (uint) len)))
708     return (err);
709   return (0);
710 }
711 
712 
write_keys_varlen(MI_SORT_PARAM * info,register uchar ** sort_keys,uint count,BUFFPEK * buffpek,IO_CACHE * tempfile)713 static int write_keys_varlen(MI_SORT_PARAM *info,
714                              register uchar **sort_keys,
715                              uint count, BUFFPEK *buffpek,
716                              IO_CACHE *tempfile)
717 {
718   uchar **end;
719   int err;
720   DBUG_ENTER("write_keys_varlen");
721 
722   std::sort(sort_keys, sort_keys + count, Key_compare(info));
723 
724   if (!my_b_inited(tempfile) &&
725       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
726                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
727     DBUG_RETURN(1); /* purecov: inspected */
728 
729   buffpek->file_pos=my_b_tell(tempfile);
730   buffpek->count=count;
731   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
732   {
733     if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
734       DBUG_RETURN(err);
735   }
736   DBUG_RETURN(0);
737 } /* write_keys_varlen */
738 
739 
write_key(MI_SORT_PARAM * info,uchar * key,IO_CACHE * tempfile)740 static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
741 {
742   uint key_length=info->real_key_length;
743   DBUG_ENTER("write_key");
744 
745   if (!my_b_inited(tempfile) &&
746       open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
747                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
748     DBUG_RETURN(1);
749 
750   if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
751       my_b_write(tempfile,(uchar*)key,(uint) key_length))
752     DBUG_RETURN(1);
753   DBUG_RETURN(0);
754 } /* write_key */
755 
756 
757 /* Write index */
758 
write_index(MI_SORT_PARAM * info,register uchar ** sort_keys,register uint count)759 static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
760                        register uint count)
761 {
762   DBUG_ENTER("write_index");
763 
764   std::sort(sort_keys, sort_keys + count, Key_compare(info));
765 
766   while (count--)
767   {
768     if ((*info->key_write)(info,*sort_keys++))
769       DBUG_RETURN(-1); /* purecov: inspected */
770   }
771   DBUG_RETURN(0);
772 } /* write_index */
773 
774 
775         /* Merge buffers to make < MERGEBUFF2 buffers */
776 
merge_many_buff(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,BUFFPEK * buffpek,int * maxbuffer,IO_CACHE * t_file)777 static int merge_many_buff(MI_SORT_PARAM *info, uint keys,
778                            uchar **sort_keys, BUFFPEK *buffpek,
779                            int *maxbuffer, IO_CACHE *t_file)
780 {
781   register int i;
782   IO_CACHE t_file2, *from_file, *to_file, *temp;
783   BUFFPEK *lastbuff;
784   DBUG_ENTER("merge_many_buff");
785 
786   if (*maxbuffer < MERGEBUFF2)
787     DBUG_RETURN(0);                             /* purecov: inspected */
788   if (flush_io_cache(t_file) ||
789       open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
790                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
791     DBUG_RETURN(1);                             /* purecov: inspected */
792 
793   from_file= t_file ; to_file= &t_file2;
794   while (*maxbuffer >= MERGEBUFF2)
795   {
796     reinit_io_cache(from_file,READ_CACHE,0L,0,0);
797     reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
798     lastbuff=buffpek;
799     for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
800     {
801       if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
802                         buffpek+i,buffpek+i+MERGEBUFF-1))
803         goto cleanup;
804     }
805     if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
806                       buffpek+i,buffpek+ *maxbuffer))
807       break; /* purecov: inspected */
808     if (flush_io_cache(to_file))
809       break;                                    /* purecov: inspected */
810     temp=from_file; from_file=to_file; to_file=temp;
811     *maxbuffer= (int) (lastbuff-buffpek)-1;
812   }
813 cleanup:
814   close_cached_file(to_file);                   /* This holds old result */
815   if (to_file == t_file)
816   {
817     DBUG_ASSERT(t_file2.type == WRITE_CACHE);
818     *t_file=t_file2;                            /* Copy result file */
819     t_file->current_pos= &t_file->write_pos;
820     t_file->current_end= &t_file->write_end;
821   }
822 
823   DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
824 } /* merge_many_buff */
825 
826 
827 /*
828    Read data to buffer
829 
830   SYNOPSIS
831     read_to_buffer()
832     fromfile		File to read from
833     buffpek		Where to read from
834     sort_length		max length to read
835   RESULT
836     > 0	Ammount of bytes read
837     -1	Error
838 */
839 
read_to_buffer(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)840 static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
841                            uint sort_length)
842 {
843   register uint count;
844   uint length;
845 
846   if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
847   {
848     if (mysql_file_pread(fromfile->file, (uchar*) buffpek->base,
849                          (length= sort_length*count),
850                          buffpek->file_pos, MYF_RW))
851       return((uint) -1);                        /* purecov: inspected */
852     buffpek->key=buffpek->base;
853     buffpek->file_pos+= length;                 /* New filepos */
854     buffpek->count-=    count;
855     buffpek->mem_count= count;
856   }
857   return (count*sort_length);
858 } /* read_to_buffer */
859 
read_to_buffer_varlen(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)860 static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
861                                   uint sort_length)
862 {
863   register uint count;
864   uint16 length_of_key = 0;
865   uint idx;
866   uchar *buffp;
867 
868   if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys, buffpek->count)))
869   {
870     buffp = buffpek->base;
871 
872     for (idx=1;idx<=count;idx++)
873     {
874       if (mysql_file_pread(fromfile->file, (uchar*)&length_of_key,
875                            sizeof(length_of_key), buffpek->file_pos, MYF_RW))
876         return((uint) -1);
877       buffpek->file_pos+=sizeof(length_of_key);
878       if (mysql_file_pread(fromfile->file, (uchar*) buffp,
879                            length_of_key, buffpek->file_pos, MYF_RW))
880         return((uint) -1);
881       buffpek->file_pos+=length_of_key;
882       buffp = buffp + sort_length;
883     }
884     buffpek->key=buffpek->base;
885     buffpek->count-=    count;
886     buffpek->mem_count= count;
887   }
888   return (count*sort_length);
889 } /* read_to_buffer_varlen */
890 
891 
write_merge_key_varlen(MI_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,uint count)892 static int write_merge_key_varlen(MI_SORT_PARAM *info,
893                                   IO_CACHE *to_file, uchar* key,
894                                   uint sort_length, uint count)
895 {
896   uint idx;
897   uchar *bufs = key;
898 
899   for (idx=1;idx<=count;idx++)
900   {
901     int err;
902     if ((err= my_var_write(info, to_file, bufs)))
903       return (err);
904     bufs=bufs+sort_length;
905   }
906   return(0);
907 }
908 
909 
write_merge_key(MI_SORT_PARAM * info MY_ATTRIBUTE ((unused)),IO_CACHE * to_file,uchar * key,uint sort_length,uint count)910 static int write_merge_key(MI_SORT_PARAM *info MY_ATTRIBUTE((unused)),
911                            IO_CACHE *to_file, uchar *key,
912                            uint sort_length, uint count)
913 {
914   return my_b_write(to_file, key, (size_t) sort_length*count);
915 }
916 
917 /*
918   Merge buffers to one buffer
919   If to_file == 0 then use info->key_write
920 */
921 
922 static int
merge_buffers(MI_SORT_PARAM * info,uint keys,IO_CACHE * from_file,IO_CACHE * to_file,uchar ** sort_keys,BUFFPEK * lastbuff,BUFFPEK * Fb,BUFFPEK * Tb)923 merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
924               IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
925               BUFFPEK *Fb, BUFFPEK *Tb)
926 {
927   int error;
928   uint sort_length,maxcount;
929   ha_rows count;
930   my_off_t UNINIT_VAR(to_start_filepos);
931   uchar *strpos;
932   BUFFPEK *buffpek,**refpek;
933   QUEUE queue;
934   volatile int *killed= killed_ptr(info->sort_info->param);
935   DBUG_ENTER("merge_buffers");
936 
937   count=error=0;
938   maxcount=keys/((uint) (Tb-Fb) +1);
939   DBUG_ASSERT(maxcount > 0);
940   LINT_INIT(to_start_filepos);
941   if (to_file)
942     to_start_filepos=my_b_tell(to_file);
943   strpos=(uchar*) sort_keys;
944   sort_length=info->key_length;
945 
946   if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
947                  (int (*)(void*, uchar *,uchar*)) info->key_cmp,
948                  (void*) info))
949     DBUG_RETURN(1); /* purecov: inspected */
950 
951   for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
952   {
953     count+= buffpek->count;
954     buffpek->base= strpos;
955     buffpek->max_keys=maxcount;
956     strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
957                                                       sort_length));
958     if (error == -1)
959       goto err; /* purecov: inspected */
960     queue_insert(&queue,(uchar*) buffpek);
961   }
962 
963   while (queue.elements > 1)
964   {
965     for (;;)
966     {
967       if (*killed)
968       {
969         error=1; goto err;
970       }
971       buffpek=(BUFFPEK*) queue_top(&queue);
972       if (to_file)
973       {
974         if (info->write_key(info,to_file,(uchar*) buffpek->key,
975                             (uint) sort_length,1))
976         {
977           error=1; goto err; /* purecov: inspected */
978         }
979       }
980       else
981       {
982         if ((*info->key_write)(info,(void*) buffpek->key))
983         {
984           error=1; goto err; /* purecov: inspected */
985         }
986       }
987       buffpek->key+=sort_length;
988       if (! --buffpek->mem_count)
989       {
990         if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
991         {
992           uchar *base=buffpek->base;
993           uint max_keys=buffpek->max_keys;
994 
995           (void) queue_remove(&queue,0);
996 
997           /* Put room used by buffer to use in other buffer */
998           for (refpek= (BUFFPEK**) &queue_top(&queue);
999                refpek <= (BUFFPEK**) &queue_end(&queue);
1000                refpek++)
1001           {
1002             buffpek= *refpek;
1003             if (buffpek->base+buffpek->max_keys*sort_length == base)
1004             {
1005               buffpek->max_keys+=max_keys;
1006               break;
1007             }
1008             else if (base+max_keys*sort_length == buffpek->base)
1009             {
1010               buffpek->base=base;
1011               buffpek->max_keys+=max_keys;
1012               break;
1013             }
1014           }
1015           break;                /* One buffer have been removed */
1016         }
1017       }
1018       else if (error == -1)
1019         goto err;               /* purecov: inspected */
1020       queue_replaced(&queue);   /* Top element has been replaced */
1021     }
1022   }
1023   buffpek=(BUFFPEK*) queue_top(&queue);
1024   buffpek->base=(uchar *) sort_keys;
1025   buffpek->max_keys=keys;
1026   do
1027   {
1028     if (to_file)
1029     {
1030       if (info->write_key(info,to_file,(uchar*) buffpek->key,
1031                          sort_length,buffpek->mem_count))
1032       {
1033         error=1; goto err; /* purecov: inspected */
1034       }
1035     }
1036     else
1037     {
1038       register uchar *end;
1039       strpos= buffpek->key;
1040       for (end=strpos+buffpek->mem_count*sort_length;
1041            strpos != end ;
1042            strpos+=sort_length)
1043       {
1044         if ((*info->key_write)(info,(void*) strpos))
1045         {
1046           error=1; goto err; /* purecov: inspected */
1047         }
1048       }
1049     }
1050   }
1051   while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
1052          error != 0);
1053 
1054   lastbuff->count=count;
1055   if (to_file)
1056     lastbuff->file_pos=to_start_filepos;
1057 err:
1058   delete_queue(&queue);
1059   DBUG_RETURN(error);
1060 } /* merge_buffers */
1061 
1062 
1063         /* Do a merge to output-file (save only positions) */
1064 
1065 static int
merge_index(MI_SORT_PARAM * info,uint keys,uchar ** sort_keys,BUFFPEK * buffpek,int maxbuffer,IO_CACHE * tempfile)1066 merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
1067             BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1068 {
1069   DBUG_ENTER("merge_index");
1070   if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1071                     buffpek+maxbuffer))
1072     DBUG_RETURN(1); /* purecov: inspected */
1073   DBUG_RETURN(0);
1074 } /* merge_index */
1075 
1076 static int
flush_ft_buf(MI_SORT_PARAM * info)1077 flush_ft_buf(MI_SORT_PARAM *info)
1078 {
1079   int err=0;
1080   if (info->sort_info->ft_buf)
1081   {
1082     err=sort_ft_buf_flush(info);
1083     my_free(info->sort_info->ft_buf);
1084     info->sort_info->ft_buf=0;
1085   }
1086   return err;
1087 }
1088 
1089