1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 /*
  Creates an index for a database by reading keys, sorting them and outputting
18   them in sorted order through MARIA_SORT_INFO functions.
19 */
20 
21 #include "ma_fulltext.h"
22 #include <my_check_opt.h>
23 #if defined(MSDOS) || defined(__WIN__)
24 #include <fcntl.h>
25 #else
26 #include <stddef.h>
27 #endif
28 #include <queues.h>
29 
30 /* static variables */
31 
32 #undef MIN_SORT_MEMORY
33 #undef MYF_RW
34 #undef DISK_BUFFER_SIZE
35 
36 #define MERGEBUFF 15
37 #define MERGEBUFF2 31
38 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
39 #define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
40 #define DISK_BUFFER_SIZE (IO_SIZE*128)
41 
42 /* How many keys we can keep in memory */
43 typedef ulonglong ha_keys;
44 
45 /*
46  Pointers of functions for store and read keys from temp file
47 */
48 
49 extern void print_error(const char *fmt,...);
50 
51 /* Functions defined in this file */
52 
53 static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_keys keys,
54                              uchar **sort_keys,
55                              DYNAMIC_ARRAY *buffpek,uint *maxbuffer,
56                              IO_CACHE *tempfile,
57                              IO_CACHE *tempfile_for_exceptions);
58 static int write_keys(MARIA_SORT_PARAM *info,uchar **sort_keys,
59                       ha_keys count, BUFFPEK *buffpek,IO_CACHE *tempfile);
60 static int write_key(MARIA_SORT_PARAM *info, uchar *key,
61                      IO_CACHE *tempfile);
62 static int write_index(MARIA_SORT_PARAM *info, uchar **sort_keys,
63                       ha_keys count);
64 static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys,
65                            uchar **sort_keys,
66                            BUFFPEK *buffpek, uint *maxbuffer,
67                            IO_CACHE *t_file);
68 static my_off_t read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
69                                uint sort_length);
70 static int merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys,
71                          IO_CACHE *from_file, IO_CACHE *to_file,
72                          uchar **sort_keys, BUFFPEK *lastbuff,
73                          BUFFPEK *Fb, BUFFPEK *Tb);
74 static int merge_index(MARIA_SORT_PARAM *,ha_keys,uchar **,BUFFPEK *, uint,
75                        IO_CACHE *);
76 static int flush_maria_ft_buf(MARIA_SORT_PARAM *info);
77 
78 static int write_keys_varlen(MARIA_SORT_PARAM *info,uchar **sort_keys,
79                              ha_keys count, BUFFPEK *buffpek,
80                              IO_CACHE *tempfile);
81 static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
82                                       uint sort_length);
83 static int write_merge_key(MARIA_SORT_PARAM *info, IO_CACHE *to_file,
84                            uchar *key, uint sort_length, ha_keys count);
85 static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
86                                   IO_CACHE *to_file,
87                                   uchar* key, uint sort_length,
88                                   ha_keys count);
89 static inline int
90 my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
91 
92 /*
93   Sets the appropriate read and write methods for the MARIA_SORT_PARAM
94   based on the variable length key flag.
95 */
static void set_sort_param_read_write(MARIA_SORT_PARAM *sort_param)
{
  /*
    Keys flagged HA_VAR_LENGTH_KEY are stored in the temporary files with a
    length prefix, so they need the *_varlen writer/reader/merge callbacks;
    all other keys use the fixed-length versions.
  */
  const my_bool varlen= (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY) != 0;

  sort_param->write_keys=     varlen ? write_keys_varlen      : write_keys;
  sort_param->read_to_buffer= varlen ? read_to_buffer_varlen  : read_to_buffer;
  sort_param->write_key=      varlen ? write_merge_key_varlen : write_merge_key;
}
111 
112 
113 /*
  Creates an index of sorted keys
115 
116   SYNOPSIS
117     _ma_create_index_by_sort()
118     info		Sort parameters
119     no_messages		Set to 1 if no output
120     sortbuff_size	Size of sortbuffer to allocate
121 
122   RESULT
123     0	ok
124    <> 0 Error
125 */
126 
int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
                             size_t sortbuff_size)
{
  /*
    Steps:
      1. Allocate the biggest key buffer that fits in sortbuff_size,
         retrying with 3/4 of the memory when allocation fails.
      2. Read all keys with find_all_keys(). Sorted runs are spilled to
         'tempfile' when they don't all fit in memory.
      3. Write the keys directly, or merge the runs and write the result.
      4. Insert the over-long "exception" keys one by one with _ma_ck_write().
    Returns 0 on success, -1 on error.
  */
  int error;
  uint sort_length, maxbuffer;
  size_t memavl, old_memavl;
  size_t sort_keys_size= 0;             /* Bytes allocated for sort_keys */
  DYNAMIC_ARRAY buffpek;
  ha_rows records, UNINIT_VAR(keys);
  uchar **sort_keys;
  IO_CACHE tempfile, tempfile_for_exceptions;
  DBUG_ENTER("_ma_create_index_by_sort");
  DBUG_PRINT("enter",("sort_buff_size: %lu  sort_length: %d  max_records: %lu",
                      (ulong) sortbuff_size, info->key_length,
                      (ulong) info->sort_info->max_records));

  set_sort_param_read_write(info);

  my_b_clear(&tempfile);
  my_b_clear(&tempfile_for_exceptions);
  bzero((char*) &buffpek,sizeof(buffpek));
  sort_keys= (uchar **) NULL; error= 1;
  maxbuffer=1;

  memavl=MY_MAX(sortbuff_size,MIN_SORT_MEMORY);
  records= info->sort_info->max_records;
  sort_length= info->key_length;

  while (memavl >= MIN_SORT_MEMORY)
  {
    /* Check if we can fit all keys into memory */
    if (((ulonglong) (records + 1) *
         (sort_length + sizeof(char*)) <= memavl))
      keys= records+1;
    else if ((info->sort_info->param->testflag &
              (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
             T_FORCE_SORT_MEMORY)
    {
      /*
        Use all of the given sort buffer for key data.
        Allocate 1000 buffers at a start for new data. More buffers
        will be allocated when needed.
      */
      keys= memavl / (sort_length+sizeof(char*));
      maxbuffer= (uint) MY_MIN((ulonglong) 1000, (records / keys)+1);
    }
    else
    {
      /*
        All keys can't fit in memory.
        Calculate how many keys + buffers we can keep in memory
      */
      uint maxbuffer_org;
      do
      {
        maxbuffer_org= maxbuffer;
        if (memavl < sizeof(BUFFPEK) * maxbuffer ||
            (keys= (memavl-sizeof(BUFFPEK)*maxbuffer)/
             (sort_length+sizeof(char*))) <= 1 ||
            keys < maxbuffer)
        {
          _ma_check_print_error(info->sort_info->param,
                                "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu  rows: %llu  sort_length: %u",
                                (ulonglong) sortbuff_size, (ulonglong) records,
                                sort_length);
          my_errno= ENOMEM;
          goto err;
        }
      }
      while ((maxbuffer= (uint) (records/(keys-1)+1)) != maxbuffer_org);
    }

    /*
      'keys' pointer slots followed by the key data. The HA_FT_MAXBYTELEN
      slack leaves room for a key longer than sort_length (such keys are
      read into this buffer below). NOTE(review): slack appears sized for
      fulltext keys - confirm.
    */
    sort_keys_size= (size_t) (keys*(sort_length+sizeof(char*))+
                              HA_FT_MAXBYTELEN);
    if ((sort_keys= ((uchar**) my_malloc(sort_keys_size, MYF(0)))))
    {
      if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
                                MY_MIN(maxbuffer/2, 1000), MYF(0)))
      {
        my_free(sort_keys);
        sort_keys= 0;
      }
      else
        break;
    }
    /* Allocation failed: retry with 3/4 of the memory */
    old_memavl=memavl;
    if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
      memavl=MIN_SORT_MEMORY;
  }
  if (memavl < MIN_SORT_MEMORY)
  {
    /* purecov: begin inspected */
    _ma_check_print_error(info->sort_info->param,
                          "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu  rows: %llu  sort_length: %u",
                          (ulonglong) sortbuff_size, (ulonglong) records, sort_length);
    my_errno= ENOMEM;
    goto err;
    /* purecov: end inspected */
  }
  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */

  if (!no_messages)
    my_fprintf(stdout,
               "  - Searching for keys, allocating buffer for %llu keys\n",
               (ulonglong) keys);

  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
                             &tempfile,&tempfile_for_exceptions))
      == HA_POS_ERROR)
    goto err; /* purecov: tested */

  info->sort_info->param->stage++;                         /* Merge stage */

  if (maxbuffer == 0)
  {
    /* Everything fit in memory: sort and write directly */
    if (!no_messages)
      my_fprintf(stdout, "  - Dumping %llu keys\n", (ulonglong) records);
    if (write_index(info, sort_keys, (ha_keys) records))
      goto err; /* purecov: inspected */
  }
  else
  {
    /* Reuse the whole buffer (pointer slots included) as raw key storage */
    keys=(keys*(sort_length+sizeof(char*)))/sort_length;
    if (maxbuffer >= MERGEBUFF2)
    {
      if (!no_messages)
        my_fprintf(stdout, "  - Merging %llu keys\n",
                   (ulonglong) records); /* purecov: tested */
      if (merge_many_buff(info,keys,sort_keys,
                          dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,
                          &tempfile))
        goto err;                               /* purecov: inspected */
    }
    if (flush_io_cache(&tempfile) ||
        reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
      goto err;                                 /* purecov: inspected */
    if (!no_messages)
      my_fprintf(stdout, "  - Last merge and dumping keys\n"); /* purecov: tested */
    if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
                    maxbuffer,&tempfile))
      goto err;                                 /* purecov: inspected */
  }

  if (flush_maria_ft_buf(info) || _ma_flush_pending_blocks(info))
    goto err;

  if (my_b_inited(&tempfile_for_exceptions))
  {
    MARIA_HA *idx=info->sort_info->info;
    uint16    key_length;
    MARIA_KEY key;
    key.keyinfo= idx->s->keyinfo + info->key;

    if (!no_messages)
      my_fprintf(stdout, "  - Adding exceptions\n"); /* purecov: tested */
    if (flush_io_cache(&tempfile_for_exceptions) ||
        reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
      goto err;

    /* Each record is a uint16 length followed by that many key bytes */
    for (;;)
    {
      if (my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
                    sizeof(key_length)))
        break;                                  /* No more exception keys */
      /*
        A length prefix whose data doesn't fit the buffer or can't be read
        means the temporary file is damaged. Fail instead of silently
        dropping the remaining keys; _ma_thr_write_keys() uses the same policy.
      */
      if ((size_t) key_length > sort_keys_size ||
          my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
                    (uint) key_length))
        goto err;

      key.data=       (uchar*) sort_keys;
      key.ref_length= idx->s->rec_reflength;
      key.data_length= key_length - key.ref_length;
      key.flag= 0;
      if (_ma_ck_write(idx, &key))
        goto err;
    }
  }

  error =0;

err:
  my_free(sort_keys);
  delete_dynamic(&buffpek);
  close_cached_file(&tempfile);
  close_cached_file(&tempfile_for_exceptions);

  DBUG_RETURN(error ? -1 : 0);
} /* _ma_create_index_by_sort */
309 
310 
/* Search for all keys and place them in a temp. file */
312 
static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_rows keys,
                             uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
                             uint *maxbuffer, IO_CACHE *tempfile,
                             IO_CACHE *tempfile_for_exceptions)
{
  /*
    Read every key with info->key_read() into the in-memory buffer.

    The buffer is 'keys' pointer slots followed by the key data;
    sort_keys[i] points at the i-th key. When the buffer fills up, the keys
    are sorted and written as one run to 'tempfile', with one BUFFPEK per
    run appended to 'buffpek'. Keys longer than info->key_length go to
    'tempfile_for_exceptions' instead.

    On return *maxbuffer is the number of runs minus one, or 0 if nothing
    was spilled. Returns the number of keys collected, or HA_POS_ERROR.
  */
  uchar *const key_area= (uchar*) (sort_keys + keys);
  ha_rows used= 0;
  int rc;
  DBUG_ENTER("find_all_keys");

  sort_keys[0]= key_area;
  info->sort_info->info->in_check_table= 1;

  while ((rc= (*info->key_read)(info, sort_keys[used])) == 0)
  {
    if (info->real_key_length > info->key_length)
    {
      /* Over-long key: store aside, reuse the same slot for the next key */
      if (write_key(info, sort_keys[used], tempfile_for_exceptions))
        goto err;                             /* purecov: inspected */
      continue;
    }

    used++;
    if (used == keys)
    {
      /*
        Buffer full. Write all but the newest key as a sorted run, then
        move the newest key to the front so it starts the next run.
      */
      if (info->write_keys(info, sort_keys, used - 1,
                           (BUFFPEK *) alloc_dynamic(buffpek), tempfile))
        goto err;                             /* purecov: inspected */

      sort_keys[0]= key_area;
      memcpy(sort_keys[0], sort_keys[used - 1], (size_t) info->key_length);
      used= 1;
    }
    sort_keys[used]= sort_keys[used - 1] + info->key_length;
  }
  if (rc > 0)
    goto err;                                 /* purecov: inspected */

  if (buffpek->elements == 0)
    *maxbuffer= 0;                            /* Everything is in memory */
  else
  {
    /* Flush the tail as the final run */
    if (info->write_keys(info, sort_keys, used,
                         (BUFFPEK *) alloc_dynamic(buffpek), tempfile))
      goto err;                               /* purecov: inspected */
    *maxbuffer= buffpek->elements - 1;
  }

  info->sort_info->info->in_check_table= 0;
  /* Each full run holds keys-1 keys; the last run holds 'used' keys */
  DBUG_RETURN((*maxbuffer) * (keys - 1) + used);

err:
  info->sort_info->info->in_check_table= 0;   /* purecov: inspected */
  DBUG_RETURN(HA_POS_ERROR);                  /* purecov: inspected */
} /* find_all_keys */
367 
368 
_ma_thr_find_all_keys_exec(MARIA_SORT_PARAM * sort_param)369 static my_bool _ma_thr_find_all_keys_exec(MARIA_SORT_PARAM* sort_param)
370 {
371   int error= 0;
372   ulonglong memavl, old_memavl;
373   longlong sortbuff_size;
374   ha_keys UNINIT_VAR(keys), idx;
375   uint sort_length;
376   uint maxbuffer;
377   uchar **sort_keys= NULL;
378   DBUG_ENTER("_ma_thr_find_all_keys_exec");
379   DBUG_PRINT("enter", ("master: %d", sort_param->master));
380 
381   if (sort_param->sort_info->got_error)
382     DBUG_RETURN(TRUE);
383 
384   set_sort_param_read_write(sort_param);
385 
386   my_b_clear(&sort_param->tempfile);
387   my_b_clear(&sort_param->tempfile_for_exceptions);
388   bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
389   bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
390 
391   sortbuff_size= sort_param->sortbuff_size;
392   memavl=       MY_MAX(sortbuff_size, MIN_SORT_MEMORY);
393   idx=          (ha_keys) sort_param->sort_info->max_records;
394   sort_length=  sort_param->key_length;
395   maxbuffer=    1;
396 
397   while (memavl >= MIN_SORT_MEMORY)
398   {
399     if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl)
400       keys= idx+1;
401     else if ((sort_param->sort_info->param->testflag &
402               (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
403              T_FORCE_SORT_MEMORY)
404     {
405       /*
406         Use all of the given sort buffer for key data.
407         Allocate 1000 buffers at a start for new data. More buffers
408         will be allocated when needed.
409       */
410       keys= memavl / (sort_length+sizeof(char*));
411       maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1);
412     }
413     else
414     {
415       uint maxbuffer_org;
416       do
417       {
418         maxbuffer_org= maxbuffer;
419         if (memavl < sizeof(BUFFPEK)*maxbuffer ||
420             (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
421              (sort_length+sizeof(char*))) <= 1 ||
422             keys < maxbuffer)
423         {
424           _ma_check_print_error(sort_param->sort_info->param,
425                                 "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu  rows: %llu  sort_length: %u",
426                                 sortbuff_size, (ulonglong) idx, sort_length);
427           goto err;
428         }
429       }
430       while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org);
431     }
432     if ((sort_keys= (uchar **)
433          my_malloc((size_t)(keys*(sort_length+sizeof(char*))+
434                    ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
435                     HA_FT_MAXBYTELEN : 0)), MYF(0))))
436     {
437       if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
438                              maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0)))
439       {
440         my_free(sort_keys);
441         sort_keys= NULL;          /* Safety against double free on error. */
442       }
443       else
444         break;
445     }
446     old_memavl= memavl;
447     if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
448         old_memavl > MIN_SORT_MEMORY)
449       memavl= MIN_SORT_MEMORY;
450   }
451   if (memavl < MIN_SORT_MEMORY)
452   {
453     /* purecov: begin inspected */
454     _ma_check_print_error(sort_param->sort_info->param,
455                           "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu  rows: %llu  sort_length: %u",
456                           sortbuff_size, (ulonglong) idx, sort_length);
457     my_errno= ENOMEM;
458     goto err;
459   /* purecov: end inspected */
460   }
461 
462   if (sort_param->sort_info->param->testflag & T_VERBOSE)
463     my_fprintf(stdout,
464                "Key %d - Allocating buffer for %llu keys\n",
465                sort_param->key + 1, (ulonglong) keys);
466   sort_param->sort_keys= sort_keys;
467 
468   idx= error= 0;
469   sort_keys[0]= (uchar*) (sort_keys+keys);
470 
471   DBUG_PRINT("info", ("reading keys"));
472   while (!(error= sort_param->sort_info->got_error) &&
473          !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
474   {
475     if (sort_param->real_key_length > sort_param->key_length)
476     {
477       if (write_key(sort_param, sort_keys[idx],
478                     &sort_param->tempfile_for_exceptions))
479         goto err;
480       continue;
481     }
482 
483     if (++idx == keys)
484     {
485       if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
486                                  (BUFFPEK *)alloc_dynamic(&sort_param->buffpek),
487                                  &sort_param->tempfile))
488         goto err;
489       sort_keys[0]= (uchar*) (sort_keys+keys);
490       memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
491       idx= 1;
492     }
493     sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
494   }
495   if (error > 0)
496     goto err;
497   if (sort_param->buffpek.elements)
498   {
499     if (sort_param->write_keys(sort_param,sort_keys, idx,
500                                (BUFFPEK *) alloc_dynamic(&sort_param->buffpek),
501                                &sort_param->tempfile))
502       goto err;
503     sort_param->keys= (uint)((sort_param->buffpek.elements - 1) * (keys - 1) + idx);
504   }
505   else
506     sort_param->keys= (uint)idx;
507 
508   DBUG_RETURN(FALSE);
509 
510 err:
511   DBUG_PRINT("error", ("got some error"));
512   my_free(sort_keys);
513   sort_param->sort_keys= 0;
514   delete_dynamic(& sort_param->buffpek);
515   close_cached_file(&sort_param->tempfile);
516   close_cached_file(&sort_param->tempfile_for_exceptions);
517 
518   DBUG_RETURN(TRUE);
519 }
520 
/* Search for all keys and place them in a temp. file */
522 
_ma_thr_find_all_keys(void * arg)523 pthread_handler_t _ma_thr_find_all_keys(void *arg)
524 {
525   MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
526   my_bool error= FALSE;
527   /* If my_thread_init fails */
528   if (my_thread_init() || _ma_thr_find_all_keys_exec(sort_param))
529     error= TRUE;
530 
531   /*
532      Thread must clean up after itself.
533   */
534   free_root(&sort_param->wordroot, MYF(0));
535   /*
536     Detach from the share if the writer is involved. Avoid others to
537     be blocked. This includes a flush of the write buffer. This will
538     also indicate EOF to the readers.
539     That means that a writer always gets here first and readers -
540     only when they see EOF. But if a reader finishes prematurely
541     because of an error it may reach this earlier - don't allow it
542     to detach the writer thread.
543   */
544   if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
545     remove_io_thread(&sort_param->sort_info->info->rec_cache);
546 
547   /* Readers detach from the share if any. Avoid others to be blocked. */
548   if (sort_param->read_cache.share)
549     remove_io_thread(&sort_param->read_cache);
550 
551   mysql_mutex_lock(&sort_param->sort_info->mutex);
552   if (error)
553     sort_param->sort_info->got_error= 1;
554 
555   if (!--sort_param->sort_info->threads_running)
556     mysql_cond_signal(&sort_param->sort_info->cond);
557   mysql_mutex_unlock(&sort_param->sort_info->mutex);
558 
559   my_thread_end();
560   return NULL;
561 }
562 
563 
_ma_thr_write_keys(MARIA_SORT_PARAM * sort_param)564 int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
565 {
566   MARIA_SORT_INFO *sort_info=sort_param->sort_info;
567   HA_CHECK *param=sort_info->param;
568   size_t UNINIT_VAR(length), keys;
569   double *rec_per_key_part= param->new_rec_per_key_part;
570   int got_error=sort_info->got_error;
571   uint i;
572   MARIA_HA *info=sort_info->info;
573   MARIA_SHARE *share= info->s;
574   MARIA_SORT_PARAM *sinfo;
575   uchar *mergebuf=0;
576   DBUG_ENTER("_ma_thr_write_keys");
577 
578   for (i= 0, sinfo= sort_param ;
579        i < sort_info->total_keys ;
580        i++, sinfo++)
581   {
582     if (!sinfo->sort_keys)
583     {
584       got_error=1;
585       my_free(sinfo->rec_buff);
586       continue;
587     }
588     if (!got_error)
589     {
590       maria_set_key_active(share->state.key_map, sinfo->key);
591 
592       if (!sinfo->buffpek.elements)
593       {
594         if (param->testflag & T_VERBOSE)
595         {
596           my_fprintf(stdout,
597                      "Key %d  - Dumping %llu keys\n", sinfo->key+1,
598                      (ulonglong) sinfo->keys);
599           fflush(stdout);
600         }
601         if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
602             flush_maria_ft_buf(sinfo) || _ma_flush_pending_blocks(sinfo))
603           got_error=1;
604       }
605     }
606     my_free(sinfo->sort_keys);
607     my_free(sinfo->rec_buff);
608     sinfo->sort_keys=0;
609   }
610 
611   for (i= 0, sinfo= sort_param ;
612        i < sort_info->total_keys ;
613        i++,
614 	 delete_dynamic(&sinfo->buffpek),
615 	 close_cached_file(&sinfo->tempfile),
616 	 close_cached_file(&sinfo->tempfile_for_exceptions),
617          rec_per_key_part+= sinfo->keyinfo->keysegs,
618 	 sinfo++)
619   {
620     if (got_error)
621       continue;
622 
623     set_sort_param_read_write(sinfo);
624 
625     if (sinfo->buffpek.elements)
626     {
627       uint maxbuffer=sinfo->buffpek.elements-1;
628       if (!mergebuf)
629       {
630         length=(size_t)param->sort_buffer_length;
631         while (length >= MIN_SORT_MEMORY)
632         {
633           if ((mergebuf= my_malloc((size_t) length, MYF(0))))
634               break;
635           length=length*3/4;
636         }
637         if (!mergebuf)
638         {
639           got_error=1;
640           continue;
641         }
642       }
643       keys=length/sinfo->key_length;
644       if (maxbuffer >= MERGEBUFF2)
645       {
646         if (param->testflag & T_VERBOSE)
647           my_fprintf(stdout,
648                      "Key %d  - Merging %llu keys\n",
649                      sinfo->key+1, (ulonglong) sinfo->keys);
650         if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
651 			    dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
652 			    &maxbuffer, &sinfo->tempfile))
653         {
654           got_error=1;
655           continue;
656         }
657       }
658       if (flush_io_cache(&sinfo->tempfile) ||
659           reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
660       {
661         got_error=1;
662         continue;
663       }
664       if (param->testflag & T_VERBOSE)
665         printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
666       if (merge_index(sinfo, keys, (uchar**) mergebuf,
667                       dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
668                       maxbuffer,&sinfo->tempfile) ||
669           flush_maria_ft_buf(sinfo) ||
670 	  _ma_flush_pending_blocks(sinfo))
671       {
672         got_error=1;
673         continue;
674       }
675     }
676     if (my_b_inited(&sinfo->tempfile_for_exceptions))
677     {
678       uint16 key_length;
679 
680       if (param->testflag & T_VERBOSE)
681         printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
682 
683       if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
684           reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
685       {
686         got_error=1;
687         continue;
688       }
689 
690       while (!got_error &&
691 	     !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
692 			sizeof(key_length)))
693       {
694         uchar maria_ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
695         if (key_length > sizeof(maria_ft_buf) ||
696             my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)maria_ft_buf,
697                       (uint) key_length))
698           got_error= 1;
699         else
700         {
701           MARIA_KEY tmp_key;
702           tmp_key.keyinfo= info->s->keyinfo + sinfo->key;
703           tmp_key.data= maria_ft_buf;
704           tmp_key.ref_length= info->s->rec_reflength;
705           tmp_key.data_length= key_length - info->s->rec_reflength;
706           tmp_key.flag= 0;
707           if (_ma_ck_write(info, &tmp_key))
708             got_error=1;
709         }
710       }
711     }
712     if (!got_error && (param->testflag & T_STATISTICS))
713       maria_update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
714                              param->stats_method ==
715                              MI_STATS_METHOD_IGNORE_NULLS ?
716                              sinfo->notnull : NULL,
717                              (ulonglong) share->state.state.records);
718 
719   }
720   my_free(mergebuf);
721   DBUG_RETURN(got_error);
722 }
723 
724 
725 /* Write all keys in memory to file for later merge */
726 
static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys,
                      ha_keys count, BUFFPEK *buffpek, IO_CACHE *tempfile)
{
  /*
    Sort the first 'count' key pointers and append the keys to 'tempfile'
    as one run of fixed-size (info->key_length) records. The temp file is
    opened on first use. The run's start offset and key count are
    recorded in *buffpek.
    Returns 0 on success, 1 on error (including buffpek == NULL).
  */
  uint key_size= info->key_length;
  ha_keys n;
  DBUG_ENTER("write_keys");

  if (buffpek == NULL)
    DBUG_RETURN(1);                             /* Out of memory */

  my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*),
            (qsort2_cmp) info->key_cmp, info);

  if (!my_b_inited(tempfile))
  {
    if (open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                         DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
      DBUG_RETURN(1); /* purecov: inspected */
  }

  buffpek->file_pos= my_b_tell(tempfile);
  buffpek->count= count;

  for (n= 0; n < count; n++)
  {
    if (my_b_write(tempfile, sort_keys[n], key_size))
      DBUG_RETURN(1); /* purecov: inspected */
  }
  DBUG_RETURN(0);
} /* write_keys */
754 
755 
756 static inline int
my_var_write(MARIA_SORT_PARAM * info,IO_CACHE * to_file,uchar * bufs)757 my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
758 {
759   int err;
760   uint16 len= _ma_keylength(info->keyinfo, bufs);
761 
762   /* The following is safe as this is a local file */
763   if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
764     return (err);
765   if ((err= my_b_write(to_file,bufs, (uint) len)))
766     return (err);
767   return (0);
768 }
769 
770 
static int write_keys_varlen(MARIA_SORT_PARAM *info,
                             register uchar **sort_keys,
                             ha_keys count, BUFFPEK *buffpek,
                             IO_CACHE *tempfile)
{
  /*
    Variable-length counterpart of write_keys(): sort the first 'count'
    key pointers and append each key to 'tempfile' with my_var_write()
    (length prefix + data). The run is recorded in *buffpek.
    Returns 0 on success, 1 on setup errors, or my_var_write()'s error.
  */
  ha_keys n;
  int rc;
  DBUG_ENTER("write_keys_varlen");

  if (buffpek == NULL)
    DBUG_RETURN(1);                             /* Out of memory */

  my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*),
            (qsort2_cmp) info->key_cmp, info);

  if (!my_b_inited(tempfile))
  {
    if (open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                         DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
      DBUG_RETURN(1); /* purecov: inspected */
  }

  buffpek->file_pos= my_b_tell(tempfile);
  buffpek->count= count;

  for (n= 0; n < count; n++)
  {
    if ((rc= my_var_write(info, tempfile, sort_keys[n])) != 0)
      DBUG_RETURN(rc);
  }
  DBUG_RETURN(0);
} /* write_keys_varlen */
799 
800 
static int write_key(MARIA_SORT_PARAM *info, uchar *key,
                     IO_CACHE *tempfile)
{
  /*
    Append one over-long key to the exceptions file as a uint16 length
    (info->real_key_length) followed by the key bytes. The file is opened
    on first use. Returns 0 on success, 1 on error.
  */
  uint16 key_length= info->real_key_length;
  DBUG_ENTER("write_key");

  if (!my_b_inited(tempfile))
  {
    if (open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                         DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
      DBUG_RETURN(1);
  }

  if (my_b_write(tempfile, (uchar*) &key_length, sizeof(key_length)))
    DBUG_RETURN(1);
  if (my_b_write(tempfile, key, (uint) key_length))
    DBUG_RETURN(1);
  DBUG_RETURN(0);
} /* write_key */
817 
818 
819 /* Write index */
820 
static int write_index(MARIA_SORT_PARAM *info, register uchar **sort_keys,
                       register ha_keys count)
{
  /*
    Sort the in-memory key pointers and hand each key, in order, to
    info->key_write(). Progress is reported only when
    param->max_stage != 1 (the non-parallel case).
    Returns 0 on success, -1 if key_write() fails.
  */
  uchar **pos, **stop;
  DBUG_ENTER("write_index");

  my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*),
            (qsort2_cmp) info->key_cmp, info);

  for (pos= sort_keys, stop= sort_keys + count; pos != stop; pos++)
  {
    if ((*info->key_write)(info, *pos))
      DBUG_RETURN(-1); /* purecov: inspected */
  }

  if (info->sort_info->param->max_stage != 1)          /* If not parallel */
    _ma_report_progress(info->sort_info->param, 1, 1);
  DBUG_RETURN(0);
} /* write_index */
837 
838 
839         /* Merge buffers to make < MERGEBUFF2 buffers */
840 
static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys,
                           uchar **sort_keys, BUFFPEK *buffpek,
                           uint *maxbuffer, IO_CACHE *t_file)
{
  /*
    Repeatedly merge groups of MERGEBUFF runs, ping-ponging between t_file
    and a second temp file, until fewer than MERGEBUFF2 runs remain. The
    merged runs' BUFFPEKs are written back into the front of 'buffpek'. On
    return t_file holds the current runs and *maxbuffer is the run count
    minus one.
    Returns 0 on success, 1 if the merge was interrupted by an error.
  */
  uint tmp, merges, max_merges;
  IO_CACHE t_file2, *from_file, *to_file, *temp;
  BUFFPEK *lastbuff;
  DBUG_ENTER("merge_many_buff");

  if (*maxbuffer < MERGEBUFF2)
    DBUG_RETURN(0);                             /* purecov: inspected */
  if (flush_io_cache(t_file) ||
      open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
    DBUG_RETURN(1);                             /* purecov: inspected */

  /* Calculate how many merges are needed (used for progress reporting) */
  max_merges= 1;                                /* Count merge_index */
  tmp= *maxbuffer;
  while (tmp >= MERGEBUFF2)
  {
    merges= (tmp-MERGEBUFF*3/2 + 1) / MERGEBUFF + 1;
    max_merges+= merges;
    tmp= merges;
  }
  merges= 0;

  from_file= t_file ; to_file= &t_file2;
  while (*maxbuffer >= MERGEBUFF2)
  {
    uint i;
    /*
      A failed reinit leaves the cache in the wrong mode; abort instead of
      reading or writing runs through it.
    */
    if (reinit_io_cache(from_file,READ_CACHE,0L,0,0) ||
        reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
      goto cleanup;                             /* purecov: inspected */
    lastbuff=buffpek;
    for (i=0 ; i + MERGEBUFF*3/2 <= *maxbuffer ; i+=MERGEBUFF)
    {
      if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                        buffpek+i,buffpek+i+MERGEBUFF-1))
        goto cleanup;
      if (info->sort_info->param->max_stage != 1) /* If not parallel */
        _ma_report_progress(info->sort_info->param, merges++, max_merges);
    }
    /* Merge the remaining (up to MERGEBUFF*3/2) runs in one go */
    if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                      buffpek+i,buffpek+ *maxbuffer))
      goto cleanup;                             /* purecov: inspected */
    if (flush_io_cache(to_file))
      goto cleanup;                             /* purecov: inspected */
    temp=from_file; from_file=to_file; to_file=temp;
    *maxbuffer= (uint) (lastbuff-buffpek)-1;
    if (info->sort_info->param->max_stage != 1) /* If not parallel */
      _ma_report_progress(info->sort_info->param, merges++, max_merges);
  }
cleanup:
  close_cached_file(to_file);                   /* This holds old result */
  if (to_file == t_file)
  {
    DBUG_ASSERT(t_file2.type == WRITE_CACHE);
    *t_file=t_file2;                            /* Copy result file */
  }

  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
} /* merge_many_buff */
903 
904 
905 /*
906    Read data to buffer
907 
908   SYNOPSIS
909     read_to_buffer()
910     fromfile		File to read from
911     buffpek		Where to read from
912     sort_length		max length to read
913   RESULT
914     > 0	Ammount of bytes read
915     -1	Error
916 */
917 
read_to_buffer(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)918 static my_off_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
919                                 uint sort_length)
920 {
921   register ha_keys count;
922   size_t length;
923 
924   if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,
925                                (ha_rows) buffpek->count)))
926   {
927     if (my_b_pread(fromfile, (uchar*) buffpek->base,
928                    (length= sort_length * (size_t)count), buffpek->file_pos))
929       return(HA_OFFSET_ERROR);               /* purecov: inspected */
930     buffpek->key=buffpek->base;
931     buffpek->file_pos+= length;                 /* New filepos */
932     buffpek->count-=    count;
933     buffpek->mem_count= count;
934   }
935   return (((my_off_t) count) * sort_length);
936 } /* read_to_buffer */
937 
938 
read_to_buffer_varlen(IO_CACHE * fromfile,BUFFPEK * buffpek,uint sort_length)939 static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
940                                       uint sort_length)
941 {
942   register ha_keys count;
943   uint idx;
944   uchar *buffp;
945 
946   if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
947   {
948     buffp= buffpek->base;
949 
950     for (idx=1;idx<=count;idx++)
951     {
952       uint16 length_of_key;
953       if (my_b_pread(fromfile, (uchar*)&length_of_key,
954                      sizeof(length_of_key), buffpek->file_pos))
955         return(HA_OFFSET_ERROR);
956       buffpek->file_pos+=sizeof(length_of_key);
957       if (my_b_pread(fromfile, (uchar*) buffp,
958                      length_of_key, buffpek->file_pos))
959         return((uint) -1);
960       buffpek->file_pos+=length_of_key;
961       buffp = buffp + sort_length;
962     }
963     buffpek->key=buffpek->base;
964     buffpek->count-=    count;
965     buffpek->mem_count= count;
966   }
967   return (((my_off_t) count) * sort_length);
968 } /* read_to_buffer_varlen */
969 
970 
write_merge_key_varlen(MARIA_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,ha_keys count)971 static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
972                                   IO_CACHE *to_file, uchar* key,
973                                   uint sort_length, ha_keys count)
974 {
975   ha_keys idx;
976   uchar *bufs = key;
977 
978   for (idx=1;idx<=count;idx++)
979   {
980     int err;
981     if ((err= my_var_write(info, to_file, bufs)))
982       return (err);
983     bufs=bufs+sort_length;
984   }
985   return(0);
986 }
987 
988 
write_merge_key(MARIA_SORT_PARAM * info,IO_CACHE * to_file,uchar * key,uint sort_length,ha_keys count)989 static int write_merge_key(MARIA_SORT_PARAM *info __attribute__((unused)),
990                            IO_CACHE *to_file, uchar *key,
991                            uint sort_length, ha_keys count)
992 {
993   return my_b_write(to_file, key, (size_t) (sort_length * count));
994 }
995 
996 /*
997   Merge buffers to one buffer
998   If to_file == 0 then use info->key_write
999 
1000   Return:
1001   0 ok
1002   1 error
1003 */
1004 
1005 static int
merge_buffers(MARIA_SORT_PARAM * info,ha_keys keys,IO_CACHE * from_file,IO_CACHE * to_file,uchar ** sort_keys,BUFFPEK * lastbuff,BUFFPEK * Fb,BUFFPEK * Tb)1006 merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys, IO_CACHE *from_file,
1007               IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
1008               BUFFPEK *Fb, BUFFPEK *Tb)
1009 {
1010   int error= 1;
1011   uint sort_length;
1012   ha_keys maxcount;
1013   ha_rows count;
1014   my_off_t UNINIT_VAR(to_start_filepos), read_length;
1015   uchar *strpos;
1016   BUFFPEK *buffpek,**refpek;
1017   QUEUE queue;
1018   DBUG_ENTER("merge_buffers");
1019 
1020   count= 0;
1021   maxcount= keys/((uint) (Tb-Fb) +1);
1022   DBUG_ASSERT(maxcount > 0);
1023   if (to_file)
1024     to_start_filepos=my_b_tell(to_file);
1025   strpos= (uchar*) sort_keys;
1026   sort_length=info->key_length;
1027 
1028   if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
1029                  (int (*)(void*, uchar *,uchar*)) info->key_cmp,
1030                  (void*) info, 0, 0))
1031     DBUG_RETURN(1); /* purecov: inspected */
1032 
1033   for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1034   {
1035     count+= buffpek->count;
1036     buffpek->base= strpos;
1037     buffpek->max_keys= maxcount;
1038     strpos+= (read_length= info->read_to_buffer(from_file,buffpek,
1039                                                 sort_length));
1040     if (read_length == HA_OFFSET_ERROR)
1041       goto err; /* purecov: inspected */
1042     queue_insert(&queue,(uchar*) buffpek);
1043   }
1044 
1045   while (queue.elements > 1)
1046   {
1047     for (;;)
1048     {
1049       buffpek=(BUFFPEK*) queue_top(&queue);
1050       if (to_file)
1051       {
1052         if (info->write_key(info,to_file, buffpek->key,
1053                             sort_length, 1))
1054           goto err;                           /* purecov: inspected */
1055       }
1056       else
1057       {
1058         if ((*info->key_write)(info,(void*) buffpek->key))
1059           goto err;                           /* purecov: inspected */
1060       }
1061       buffpek->key+=sort_length;
1062       if (! --buffpek->mem_count)
1063       {
1064         /* It's enough to check for killedptr before a slow operation */
1065         if (_ma_killed_ptr(info->sort_info->param))
1066           goto err;
1067         if (!(read_length= info->read_to_buffer(from_file,buffpek,sort_length)))
1068         {
1069           uchar *base= buffpek->base;
1070           ha_keys max_keys=buffpek->max_keys;
1071 
1072           queue_remove_top(&queue);
1073 
1074           /* Put room used by buffer to use in other buffer */
1075           for (refpek= (BUFFPEK**) &queue_top(&queue);
1076                refpek <= (BUFFPEK**) &queue_end(&queue);
1077                refpek++)
1078           {
1079             buffpek= *refpek;
1080             if (buffpek->base+buffpek->max_keys*sort_length == base)
1081             {
1082               buffpek->max_keys+=max_keys;
1083               break;
1084             }
1085             else if (base+max_keys*sort_length == buffpek->base)
1086             {
1087               buffpek->base=base;
1088               buffpek->max_keys+=max_keys;
1089               break;
1090             }
1091           }
1092           break;                /* One buffer have been removed */
1093         }
1094         else if (read_length == HA_OFFSET_ERROR)
1095           goto err;               /* purecov: inspected */
1096       }
1097       queue_replace_top(&queue);   /* Top element has been replaced */
1098     }
1099   }
1100   buffpek=(BUFFPEK*) queue_top(&queue);
1101   buffpek->base= (uchar*) sort_keys;
1102   buffpek->max_keys=keys;
1103   do
1104   {
1105     if (to_file)
1106     {
1107       if (info->write_key(info, to_file, buffpek->key,
1108                          sort_length,buffpek->mem_count))
1109       {
1110         error=1; goto err; /* purecov: inspected */
1111       }
1112     }
1113     else
1114     {
1115       register uchar *end;
1116       strpos= buffpek->key;
1117       for (end= strpos+buffpek->mem_count*sort_length;
1118            strpos != end ;
1119            strpos+=sort_length)
1120       {
1121         if ((*info->key_write)(info, strpos))
1122         {
1123           error=1; goto err; /* purecov: inspected */
1124         }
1125       }
1126     }
1127   }
1128   while ((read_length= info->read_to_buffer(from_file,buffpek,sort_length)) != HA_OFFSET_ERROR && read_length != 0);
1129   if (read_length == 0)
1130     error= 0;
1131 
1132   lastbuff->count=count;
1133   if (to_file)
1134     lastbuff->file_pos=to_start_filepos;
1135 err:
1136   delete_queue(&queue);
1137   DBUG_RETURN(error);
1138 } /* merge_buffers */
1139 
1140 
1141         /* Do a merge to output-file (save only positions) */
1142 
1143 static int
merge_index(MARIA_SORT_PARAM * info,ha_keys keys,uchar ** sort_keys,BUFFPEK * buffpek,uint maxbuffer,IO_CACHE * tempfile)1144 merge_index(MARIA_SORT_PARAM *info, ha_keys keys, uchar **sort_keys,
1145             BUFFPEK *buffpek, uint maxbuffer, IO_CACHE *tempfile)
1146 {
1147   DBUG_ENTER("merge_index");
1148   if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1149                     buffpek+maxbuffer))
1150     DBUG_RETURN(1); /* purecov: inspected */
1151   if (info->sort_info->param->max_stage != 1)          /* If not parallel */
1152     _ma_report_progress(info->sort_info->param, 1, 1);
1153   DBUG_RETURN(0);
1154 } /* merge_index */
1155 
1156 
flush_maria_ft_buf(MARIA_SORT_PARAM * info)1157 static int flush_maria_ft_buf(MARIA_SORT_PARAM *info)
1158 {
1159   int err=0;
1160   if (info->sort_info->ft_buf)
1161   {
1162     err=_ma_sort_ft_buf_flush(info);
1163     my_free(info->sort_info->ft_buf);
1164     info->sort_info->ft_buf=0;
1165   }
1166   return err;
1167 }
1168