1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 /** @file Temporary tables implementation */
24 
25 #include "sql_tmp_table.h"
26 
27 #include "myisam.h"               // MI_COLUMNDEF
28 #include "debug_sync.h"           // DEBUG_SYNC
29 #include "filesort.h"             // filesort_free_buffers
30 #include "item_func.h"            // Item_func
31 #include "item_sum.h"             // Item_sum
32 #include "mem_root_array.h"       // Mem_root_array
33 #include "opt_range.h"            // QUICK_SELECT_I
34 #include "opt_trace.h"            // Opt_trace_object
35 #include "opt_trace_context.h"    // Opt_trace_context
36 #include "sql_base.h"             // free_io_cache
37 #include "sql_class.h"            // THD
38 #include "sql_executor.h"         // SJ_TMP_TABLE
39 #include "sql_plugin.h"           // plugin_unlock
40 
41 #include <algorithm>
42 
43 using std::max;
44 using std::min;
45 
46 /****************************************************************************
47   Create internal temporary table
48 ****************************************************************************/
49 
50 /**
51   Create field for temporary table from given field.
52 
53   @param thd	       Thread handler
54   @param org_field    field from which new field will be created
55   @param name         New field name
56   @param table	       Temporary table
57   @param item	       !=NULL if item->result_field should point to new field.
58                       This is relevant for how fill_record() is going to work:
59                       If item != NULL then fill_record() will update
60                       the record in the original table.
61                       If item == NULL then fill_record() will update
62                       the temporary table
63 
64   @retval
65     NULL		on error
66   @retval
67     new_created field
68 */
69 
create_tmp_field_from_field(THD * thd,Field * org_field,const char * name,TABLE * table,Item_field * item)70 Field *create_tmp_field_from_field(THD *thd, Field *org_field,
71                                    const char *name, TABLE *table,
72                                    Item_field *item)
73 {
74   Field *new_field;
75 
76   new_field= org_field->new_field(thd->mem_root, table,
77                                   table == org_field->table);
78   if (new_field)
79   {
80     new_field->init(table);
81     new_field->orig_table= org_field->table;
82     if (item)
83       item->result_field= new_field;
84     else
85       new_field->field_name= name;
86     new_field->flags|= (org_field->flags & NO_DEFAULT_VALUE_FLAG);
87     if (org_field->maybe_null() || (item && item->maybe_null))
88       new_field->flags&= ~NOT_NULL_FLAG;	// Because of outer join
89     if (org_field->type() == FIELD_TYPE_DOUBLE)
90       ((Field_double *) new_field)->not_fixed= TRUE;
91     /*
92       This field will belong to an internal temporary table, it cannot be
93       generated.
94     */
95     new_field->gcol_info= NULL;
96     new_field->stored_in_db= true;
97   }
98   return new_field;
99 }
100 
101 /**
102   Create field for temporary table using type of given item.
103 
104   @param thd                   Thread handler
105   @param item                  Item to create a field for
106   @param table                 Temporary table
107   @param copy_func             If set and item is a function, store copy of
108                                item in this array
109   @param modify_item           1 if item->result_field should point to new
110                                item. This is relevent for how fill_record()
111                                is going to work:
112                                If modify_item is 1 then fill_record() will
113                                update the record in the original table.
114                                If modify_item is 0 then fill_record() will
115                                update the temporary table
116 
117   @retval
118     0  on error
119   @retval
120     new_created field
121 */
122 
create_tmp_field_from_item(THD * thd,Item * item,TABLE * table,Func_ptr_array * copy_func,bool modify_item)123 static Field *create_tmp_field_from_item(THD *thd, Item *item, TABLE *table,
124                                          Func_ptr_array *copy_func,
125                                          bool modify_item)
126 {
127   bool maybe_null= item->maybe_null;
128   Field *new_field= NULL;
129 
130   switch (item->result_type()) {
131   case REAL_RESULT:
132     new_field= new Field_double(item->max_length, maybe_null,
133                                 item->item_name.ptr(), item->decimals, TRUE);
134     break;
135   case INT_RESULT:
136     /*
137       Select an integer type with the minimal fit precision.
138       MY_INT32_NUM_DECIMAL_DIGITS is sign inclusive, don't consider the sign.
139       Values with MY_INT32_NUM_DECIMAL_DIGITS digits may or may not fit into
140       Field_long : make them Field_longlong.
141     */
142     if (item->max_length >= (MY_INT32_NUM_DECIMAL_DIGITS - 1))
143       new_field=new Field_longlong(item->max_length, maybe_null,
144                                    item->item_name.ptr(), item->unsigned_flag);
145     else
146       new_field=new Field_long(item->max_length, maybe_null,
147                                item->item_name.ptr(), item->unsigned_flag);
148     break;
149   case STRING_RESULT:
150     assert(item->collation.collation);
151 
152     /*
153       DATE/TIME, GEOMETRY and JSON fields have STRING_RESULT result type.
154       To preserve type they needed to be handled separately.
155     */
156     if (item->is_temporal() ||
157         item->field_type() == MYSQL_TYPE_GEOMETRY ||
158         item->field_type() == MYSQL_TYPE_JSON)
159     {
160       new_field= item->tmp_table_field_from_field_type(table, 1);
161     }
162     else
163     {
164       new_field= item->make_string_field(table);
165     }
166     new_field->set_derivation(item->collation.derivation);
167     break;
168   case DECIMAL_RESULT:
169     new_field= Field_new_decimal::create_from_item(item);
170     break;
171   case ROW_RESULT:
172   default:
173     // This case should never be choosen
174     assert(0);
175     new_field= 0;
176     break;
177   }
178   if (new_field)
179     new_field->init(table);
180 
181   /*
182     If the item is a function, a pointer to the item is stored in
183     copy_func. We separate fields from functions by checking if the
184     item is a result field item. The real_item() must be checked to
185     avoid falsely identifying Item_ref and its subclasses as functions
186     when they refer to field-like items, such as Item_copy and
187     subclasses. References to true fields have already been untangled
188     in the beginning of create_tmp_field().
189    */
190   if (copy_func && item->real_item()->is_result_field())
191     copy_func->push_back(item);
192   if (modify_item)
193     item->set_result_field(new_field);
194   if (item->type() == Item::NULL_ITEM)
195     new_field->is_created_from_null_item= TRUE;
196   return new_field;
197 }
198 
199 
200 /**
201   Create field for information schema table.
202 
203   @param thd		Thread handler
204   @param table		Temporary table
205   @param item		Item to create a field for
206 
207   @retval
208     0			on error
209   @retval
210     new_created field
211 */
212 
create_tmp_field_for_schema(THD * thd,Item * item,TABLE * table)213 static Field *create_tmp_field_for_schema(THD *thd, Item *item, TABLE *table)
214 {
215   if (item->field_type() == MYSQL_TYPE_VARCHAR)
216   {
217     Field *field;
218     if (item->max_length > MAX_FIELD_VARCHARLENGTH)
219       field= new Field_blob(item->max_length, item->maybe_null,
220                             item->item_name.ptr(),
221                             item->collation.collation, false);
222     else
223     {
224       field= new Field_varstring(item->max_length, item->maybe_null,
225                                  item->item_name.ptr(),
226                                  table->s, item->collation.collation);
227       table->s->db_create_options|= HA_OPTION_PACK_RECORD;
228     }
229     if (field)
230       field->init(table);
231     return field;
232   }
233   return item->tmp_table_field_from_field_type(table, 0);
234 }
235 
236 
237 /**
238   Create field for temporary table.
239 
240   @param thd		Thread handler
241   @param table		Temporary table
242   @param item		Item to create a field for
243   @param type		Type of item (normally item->type)
244   @param copy_func	If set and item is a function, store copy of item
245                        in this array
246   @param from_field    if field will be created using other field as example,
247                        pointer example field will be written here
248   @param default_field	If field has a default value field, store it here
249   @param group		1 if we are going to do a relative group by on result
250   @param modify_item	1 if item->result_field should point to new item.
251                        This is relevent for how fill_record() is going to
252                        work:
253                        If modify_item is 1 then fill_record() will update
254                        the record in the original table.
255                        If modify_item is 0 then fill_record() will update
256                        the temporary table
257 
258   @retval NULL On error.
259 
260   @retval new_created field
261 */
262 
create_tmp_field(THD * thd,TABLE * table,Item * item,Item::Type type,Func_ptr_array * copy_func,Field ** from_field,Field ** default_field,bool group,bool modify_item,bool table_cant_handle_bit_fields,bool make_copy_field)263 Field *create_tmp_field(THD *thd, TABLE *table,Item *item, Item::Type type,
264                         Func_ptr_array *copy_func, Field **from_field,
265                         Field **default_field,
266                         bool group, bool modify_item,
267                         bool table_cant_handle_bit_fields,
268                         bool make_copy_field)
269 {
270   Field *result= NULL;
271   Item::Type orig_type= type;
272   Item *orig_item= 0;
273 
274   if (type != Item::FIELD_ITEM &&
275       item->real_item()->type() == Item::FIELD_ITEM)
276   {
277     orig_item= item;
278     item= item->real_item();
279     type= Item::FIELD_ITEM;
280   }
281 
282   switch (type) {
283   case Item::SUM_FUNC_ITEM:
284   {
285     Item_sum *item_sum=(Item_sum*) item;
286     result= item_sum->create_tmp_field(group, table);
287     if (!result)
288       my_error(ER_OUT_OF_RESOURCES, MYF(ME_FATALERROR));
289     break;
290   }
291   case Item::FIELD_ITEM:
292   case Item::DEFAULT_VALUE_ITEM:
293   case Item::TRIGGER_FIELD_ITEM:
294   {
295     Item_field *field= (Item_field*) item;
296     bool orig_modify= modify_item;
297     if (orig_type == Item::REF_ITEM)
298       modify_item= 0;
299     /*
300       If item have to be able to store NULLs but underlaid field can't do it,
301       create_tmp_field_from_field() can't be used for tmp field creation.
302     */
303     if (field->maybe_null && !field->field->maybe_null())
304     {
305       result= create_tmp_field_from_item(thd, item, table, NULL,
306                                          modify_item);
307       if (!result)
308         break;
309       *from_field= field->field;
310       if (modify_item)
311         field->result_field= result;
312     }
313     else if (table_cant_handle_bit_fields && field->field->type() ==
314              MYSQL_TYPE_BIT)
315     {
316       *from_field= field->field;
317       result= create_tmp_field_from_item(thd, item, table, copy_func,
318                                          modify_item);
319       if (!result)
320         break;
321       if (modify_item)
322         field->result_field= result;
323     }
324     else
325     {
326       result= create_tmp_field_from_field(thd, (*from_field= field->field),
327                                           orig_item ? orig_item->item_name.ptr() :
328                                           item->item_name.ptr(),
329                                           table,
330                                           modify_item ? field :
331                                           NULL);
332       if (!result)
333         break;
334     }
335     if (orig_type == Item::REF_ITEM && orig_modify)
336       ((Item_ref*)orig_item)->set_result_field(result);
337     /*
338       Fields that are used as arguments to the DEFAULT() function already have
339       their data pointers set to the default value during name resulotion. See
340       Item_default_value::fix_fields.
341     */
342     if (orig_type != Item::DEFAULT_VALUE_ITEM && field->field->eq_def(result))
343       *default_field= field->field;
344     break;
345   }
346   /* Fall through */
347   case Item::FUNC_ITEM:
348     if (((Item_func *) item)->functype() == Item_func::FUNC_SP)
349     {
350       Item_func_sp *item_func_sp= (Item_func_sp *) item;
351       Field *sp_result_field= item_func_sp->get_sp_result_field();
352 
353       if (make_copy_field)
354       {
355         assert(item_func_sp->result_field);
356         *from_field= item_func_sp->result_field;
357       }
358       else
359       {
360         copy_func->push_back(item);
361       }
362 
363       result= create_tmp_field_from_field(thd,
364                                           sp_result_field,
365                                           item_func_sp->item_name.ptr(),
366                                           table,
367                                           NULL);
368       if (!result)
369         break;
370       if (modify_item)
371         item->set_result_field(result);
372       break;
373     }
374 
375     /* Fall through */
376   case Item::COND_ITEM:
377   case Item::FIELD_AVG_ITEM:
378   case Item::FIELD_STD_ITEM:
379   case Item::FIELD_VARIANCE_ITEM:
380   case Item::SUBSELECT_ITEM:
381     /* The following can only happen with 'CREATE TABLE ... SELECT' */
382   case Item::PROC_ITEM:
383   case Item::INT_ITEM:
384   case Item::REAL_ITEM:
385   case Item::DECIMAL_ITEM:
386   case Item::STRING_ITEM:
387   case Item::REF_ITEM:
388   case Item::NULL_ITEM:
389   case Item::VARBIN_ITEM:
390   case Item::PARAM_ITEM:
391     if (make_copy_field)
392     {
393       assert(((Item_result_field*)item)->result_field);
394       *from_field= ((Item_result_field*)item)->result_field;
395     }
396     result= create_tmp_field_from_item(thd, item, table,
397                                        (make_copy_field ? NULL : copy_func),
398                                        modify_item);
399     break;
400   case Item::TYPE_HOLDER:
401     result= ((Item_type_holder *)item)->make_field_by_type(table,
402                                                            thd->is_strict_mode());
403     if (!result)
404       break;
405     result->set_derivation(item->collation.derivation);
406     break;
407   default:					// Dosen't have to be stored
408     assert(false);
409     break;
410   }
411 
412   /* Make sure temporary fields are never compressed */
413   if (result->column_format() == COLUMN_FORMAT_TYPE_COMPRESSED)
414     result->flags&= ~FIELD_FLAGS_COLUMN_FORMAT_MASK;
415   result->zip_dict_name= null_lex_cstr;
416   result->zip_dict_data= null_lex_cstr;
417 
418   return result;
419 }
420 
421 /*
422   Set up column usage bitmaps for a temporary table
423 
424   IMPLEMENTATION
425     For temporary tables, we need one bitmap with all columns set and
426     a tmp_set bitmap to be used by things like filesort.
427 */
428 
setup_tmp_table_column_bitmaps(TABLE * table,uchar * bitmaps)429 static void setup_tmp_table_column_bitmaps(TABLE *table, uchar *bitmaps)
430 {
431   uint field_count= table->s->fields;
432   bitmap_init(&table->def_read_set, (my_bitmap_map*) bitmaps, field_count,
433               FALSE);
434   bitmap_init(&table->tmp_set,
435               (my_bitmap_map*) (bitmaps + bitmap_buffer_size(field_count)),
436               field_count, FALSE);
437   bitmap_init(&table->cond_set,
438               (my_bitmap_map*) (bitmaps + bitmap_buffer_size(field_count) * 2),
439               field_count, FALSE);
440   /* write_set and all_set are copies of read_set */
441   table->def_write_set= table->def_read_set;
442   table->s->all_set= table->def_read_set;
443   bitmap_set_all(&table->s->all_set);
444   table->default_column_bitmaps();
445   table->s->column_bitmap_size= bitmap_buffer_size(field_count);
446 }
447 
448 /**
449   Cache for the storage engine properties for the alternative temporary table
450   storage engines. This cache is initialized during startup of the server by
451   asking the storage engines for the values properties.
452 */
453 
454 class Cache_temp_engine_properties
455 {
456 public:
457   static uint HEAP_MAX_KEY_LENGTH;
458   static uint MYISAM_MAX_KEY_LENGTH;
459   static uint INNODB_MAX_KEY_LENGTH;
460   static uint HEAP_MAX_KEY_PART_LENGTH;
461   static uint MYISAM_MAX_KEY_PART_LENGTH;
462   static uint INNODB_MAX_KEY_PART_LENGTH;
463   static uint HEAP_MAX_KEY_PARTS;
464   static uint MYISAM_MAX_KEY_PARTS;
465   static uint INNODB_MAX_KEY_PARTS;
466 
467   static void init(THD *thd);
468 };
469 
init(THD * thd)470 void Cache_temp_engine_properties::init(THD *thd)
471 {
472   handler *handler;
473   plugin_ref db_plugin;
474 
475   // Cache HEAP engine's
476   db_plugin= ha_lock_engine(0, heap_hton);
477   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, heap_hton);
478   HEAP_MAX_KEY_LENGTH= handler->max_key_length();
479   HEAP_MAX_KEY_PART_LENGTH= handler->max_key_part_length(0);
480   HEAP_MAX_KEY_PARTS= handler->max_key_parts();
481   delete handler;
482   plugin_unlock(0, db_plugin);
483   // Cache MYISAM engine's
484   db_plugin= ha_lock_engine(0, myisam_hton);
485   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, myisam_hton);
486   MYISAM_MAX_KEY_LENGTH= handler->max_key_length();
487   MYISAM_MAX_KEY_PART_LENGTH= handler->max_key_part_length(0);
488   MYISAM_MAX_KEY_PARTS= handler->max_key_parts();
489   delete handler;
490   plugin_unlock(0, db_plugin);
491   // Cache INNODB engine's
492   db_plugin= ha_lock_engine(0, innodb_hton);
493   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, innodb_hton);
494   INNODB_MAX_KEY_LENGTH= handler->max_key_length();
495   /*
496     For ha_innobase::max_supported_key_part_length(), the returned value
497     relies on innodb_large_prefix. However, in innodb itself, the limitation
498     on key_part length is up to the ROW_FORMAT. In current trunk, internal
499     temp table's ROW_FORMAT is DYNAMIC. In order to keep the consistence
500     between server and innodb, here we hard-coded 3072 as the maximum of
501     key_part length supported by innodb until bug#20629014 is fixed.
502 
503     TODO: Remove the hard-code here after bug#20629014 is fixed.
504   */
505   INNODB_MAX_KEY_PART_LENGTH= 3072;
506   INNODB_MAX_KEY_PARTS= handler->max_key_parts();
507   delete handler;
508   plugin_unlock(0, db_plugin);
509 }
510 
511 uint Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH= 0;
512 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_LENGTH= 0;
513 uint Cache_temp_engine_properties::INNODB_MAX_KEY_LENGTH= 0;
514 uint Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH= 0;
515 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_PART_LENGTH= 0;
516 uint Cache_temp_engine_properties::INNODB_MAX_KEY_PART_LENGTH= 0;
517 uint Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS= 0;
518 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_PARTS= 0;
519 uint Cache_temp_engine_properties::INNODB_MAX_KEY_PARTS= 0;
520 
init_cache_tmp_engine_properties()521 void init_cache_tmp_engine_properties()
522 {
523   assert(!current_thd);
524   THD *thd= new THD();
525   thd->thread_stack= pointer_cast<char *>(&thd);
526   thd->store_globals();
527   Cache_temp_engine_properties::init(thd);
528   delete thd;
529 }
530 
531 /**
532   Get the minimum of max_key_length and max_key_part_length.
533   The minimum is between HEAP engine and internal_tmp_disk_storage_engine.
534 
535   @param[out] max_key_length Minimum of max_key_length
536   @param[out] max_key_part_length Minimum of max_key_part_length
537 */
538 
get_max_key_and_part_length(uint * max_key_length,uint * max_key_part_length)539 void get_max_key_and_part_length(uint *max_key_length,
540                                  uint *max_key_part_length)
541 {
542   // Make sure these cached properties are initialized.
543   assert(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH);
544 
545   switch (internal_tmp_disk_storage_engine)
546   {
547   case TMP_TABLE_MYISAM:
548     *max_key_length=
549       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH,
550                Cache_temp_engine_properties::MYISAM_MAX_KEY_LENGTH);
551     *max_key_part_length=
552       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH,
553                Cache_temp_engine_properties::MYISAM_MAX_KEY_PART_LENGTH);
554     /*
555       create_tmp_table() tests tmp_se->max_key_parts() too, not only HEAP's.
556       It is correct as long as HEAP'S not bigger than on-disk temp table
557       engine's, which we check here.
558     */
559     assert(Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS <=
560            Cache_temp_engine_properties::MYISAM_MAX_KEY_PARTS);
561     break;
562   case TMP_TABLE_INNODB:
563   default:
564     *max_key_length=
565       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH,
566                Cache_temp_engine_properties::INNODB_MAX_KEY_LENGTH);
567     *max_key_part_length=
568       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH,
569                Cache_temp_engine_properties::INNODB_MAX_KEY_PART_LENGTH);
570     assert(Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS <=
571            Cache_temp_engine_properties::INNODB_MAX_KEY_PARTS);
572     break;
573   }
574 }
575 
576 /**
577   Create a temporary name for one field if the field_name is empty.
578 
579   @param thd          Thread handle
580   @param field_index  Index of this field in table->field
581 */
582 
create_tmp_table_field_tmp_name(THD * thd,int field_index)583 static const char *create_tmp_table_field_tmp_name(THD *thd, int field_index)
584 {
585   char buf[64];
586   my_snprintf(buf, 64, "tmp_field_%d", field_index);
587   return thd->mem_strdup(buf);
588 }
589 
590 /**
591   Helper function for create_tmp_table().
592 
593   Insert a field at the head of the hidden field area.
594 
595   @param table            Temporary table
596   @param default_field    Default value array pointer
597   @param from_field       Original field array pointer
598   @param blob_field       Array pointer to record fields index of blob type
599   @param field            The registed hidden field
600  */
601 
register_hidden_field(TABLE * table,Field ** default_field,Field ** from_field,uint * blob_field,Field * field)602 static void register_hidden_field(TABLE *table,
603                               Field **default_field,
604                               Field **from_field,
605                               uint *blob_field,
606                               Field *field)
607 {
608   uint i;
609   Field **tmp_field= table->field;
610 
611   /* Increase all of registed fields index */
612   for (i= 0; i < table->s->fields; i++)
613     tmp_field[i]->field_index++;
614 
615   // Increase the field_index of visible blob field
616   for (i= 0; i < table->s->blob_fields; i++)
617     blob_field[i]++;
618   // Insert field
619   table->field[-1]= field;
620   default_field[-1]= NULL;
621   from_field[-1]= NULL;
622   field->table= field->orig_table= table;
623   field->field_index= 0;
624 }
625 
626 /**
627   Create a temp table according to a field list.
628 
629   Given field pointers are changed to point at tmp_table for
630   send_result_set_metadata. The table object is self contained: it's
631   allocated in its own memory root, as well as Field objects
632   created for table columns.
633   This function will replace Item_sum items in 'fields' list with
634   corresponding Item_field items, pointing at the fields in the
635   temporary table, unless this was prohibited by TRUE
636   value of argument save_sum_fields. The Item_field objects
637   are created in THD memory root.
638 
639   @param thd                  thread handle
640   @param param                a description used as input to create the table
641   @param fields               list of items that will be used to define
642                               column types of the table (also see NOTES)
643   @param group                Group key to use for temporary table, NULL if none
644   @param distinct             should table rows be distinct
645   @param save_sum_fields      see NOTES
646   @param select_options
647   @param rows_limit
648   @param table_alias          possible name of the temporary table that can
649                               be used for name resolving; can be "".
650 
651   @remark mysql_create_view() checks that views have less than
652           MAX_FIELDS columns. This prevents any MyISAM temp table
653           made when materializing the view from hitting the 64k
654           MyISAM header size limit.
655 
656   @remark We may actually end up with a table without any columns at all.
657           See comment below: We don't have to store this.
658 */
659 
660 #define STRING_TOTAL_LENGTH_TO_PACK_ROWS 128
661 #define AVG_STRING_LENGTH_TO_PACK_ROWS   64
662 #define RATIO_TO_PACK_ROWS	       2
663 #define MIN_STRING_LENGTH_TO_PACK_ROWS   10
664 
665 TABLE *
create_tmp_table(THD * thd,Temp_table_param * param,List<Item> & fields,ORDER * group,bool distinct,bool save_sum_fields,ulonglong select_options,ha_rows rows_limit,const char * table_alias)666 create_tmp_table(THD *thd, Temp_table_param *param, List<Item> &fields,
667 		 ORDER *group, bool distinct, bool save_sum_fields,
668 		 ulonglong select_options, ha_rows rows_limit,
669 		 const char *table_alias)
670 {
671   MEM_ROOT *mem_root_save, own_root;
672   TABLE *table;
673   TABLE_SHARE *share;
674   uint	i,field_count,null_count,null_pack_length;
675   uint  copy_func_count= param->func_count;
676   uint  hidden_null_count, hidden_null_pack_length;
677   long hidden_field_count;
678   uint  blob_count,group_null_items, string_count;
679   uint  temp_pool_slot=MY_BIT_NONE;
680   uint fieldnr= 0;
681   ulong reclength, string_total_length, distinct_key_length= 0;
682   /**
683     When true, enforces unique constraint (by adding a hidden hash_field and
684     creating a key over this field) when:
685     (1) unique key is too long or
686     (2) number of key parts in distinct key is too big.
687   */
688   bool  using_unique_constraint= false;
689   bool  use_packed_rows= false;
690   bool  not_all_columns= !(select_options & TMP_TABLE_ALL_COLUMNS);
691   char  *tmpname,path[FN_REFLEN];
692   uchar	*pos, *group_buff, *bitmaps;
693   uchar *null_flags;
694   Field **reg_field, **from_field, **default_field;
695   uint *blob_field;
696   Copy_field *copy=0;
697   KEY *keyinfo;
698   KEY_PART_INFO *key_part_info;
699   MI_COLUMNDEF *recinfo;
700   /*
701     total_uneven_bit_length is uneven bit length for visible fields
702     hidden_uneven_bit_length is uneven bit length for hidden fields
703   */
704   uint total_uneven_bit_length= 0, hidden_uneven_bit_length= 0;
705   bool force_copy_fields= param->force_copy_fields;
706 
707   uint max_key_length;
708   uint max_key_part_length;
709   /* Treat sum functions as normal ones when loose index scan is used. */
710   save_sum_fields|= param->precomputed_group_by;
711   DBUG_ENTER("create_tmp_table");
712   DBUG_PRINT("enter",
713              ("distinct: %d  save_sum_fields: %d  rows_limit: %lu  group: %d",
714               (int) distinct, (int) save_sum_fields,
715               (ulong) rows_limit, MY_TEST(group)));
716 
717   thd->inc_status_created_tmp_tables();
718 
719   if (use_temp_pool && !(test_flags & TEST_KEEP_TMP_TABLES))
720     temp_pool_slot = bitmap_lock_set_next(&temp_pool);
721 
722   if (temp_pool_slot != MY_BIT_NONE) // we got a slot
723     sprintf(path, "%s_%lx_%i", tmp_file_prefix,
724             current_pid, temp_pool_slot);
725   else
726   {
727     /* if we run out of slots or we are not using tempool */
728     assert(sizeof(my_thread_id) == 4);
729     sprintf(path,"%s%lx_%x_%x", tmp_file_prefix, current_pid,
730             thd->thread_id(), thd->tmp_table++);
731   }
732 
733   /*
734     No need to change table name to lower case as we are only creating
735     MyISAM or HEAP tables here
736   */
737   fn_format(path, path, mysql_tmpdir, "", MY_REPLACE_EXT|MY_UNPACK_FILENAME);
738 
739 
740   if (group)
741   {
742     if (!param->quick_group)
743       group=0;					// Can't use group key
744     else for (ORDER *tmp=group ; tmp ; tmp=tmp->next)
745     {
746       /*
747         marker == 4 means two things:
748         - store NULLs in the key, and
749         - convert BIT fields to 64-bit long, needed because MEMORY tables
750           can't index BIT fields.
751       */
752       (*tmp->item)->marker= 4;
753       const uint char_len=
754         (*tmp->item)->max_length / (*tmp->item)->collation.collation->mbmaxlen;
755       /*
756         Use hash key as the unique constraint if the group-by key is
757         big or if it is non-deterministic. Group-by items get evaluated
758         twice and a non-deterministic function would cause a discrepancy.
759       */
760       if (char_len > CONVERT_IF_BIGGER_TO_BLOB ||
761           (*tmp->item)->is_non_deterministic()) {
762         using_unique_constraint= true;
763       }
764     }
765     if (group)
766     {
767       if (param->group_length >= MAX_BLOB_WIDTH)
768         using_unique_constraint= true;
769       distinct=0;				// Can't use distinct
770     }
771   }
772 
773   field_count=param->field_count+param->func_count+param->sum_func_count;
774   hidden_field_count=param->hidden_field_count;
775 
776   /*
777     When loose index scan is employed as access method, it already
778     computes all groups and the result of all aggregate functions. We
779     make space for the items of the aggregate function in the list of
780     functions Temp_table_param::items_to_copy, so that the values of
781     these items are stored in the temporary table.
782   */
783   if (param->precomputed_group_by)
784     copy_func_count+= param->sum_func_count;
785 
786   init_sql_alloc(key_memory_TABLE, &own_root, TABLE_ALLOC_BLOCK_SIZE, 0);
787 
788   void *rawmem= alloc_root(&own_root, sizeof(Func_ptr_array));
789   if (!rawmem)
790     DBUG_RETURN(NULL);                        /* purecov: inspected */
791   Func_ptr_array *copy_func= new (rawmem) Func_ptr_array(&own_root);
792   copy_func->reserve(copy_func_count);
793 
794   if (!multi_alloc_root(&own_root,
795                         &table, sizeof(*table),
796                         &share, sizeof(*share),
797                         &reg_field, sizeof(Field*) * (field_count + 2),
798                         &default_field, sizeof(Field*) * (field_count + 1),
799                         &blob_field, sizeof(uint)*(field_count+2),
800                         &from_field, sizeof(Field*)*(field_count + 1),
801                         &param->keyinfo, sizeof(*param->keyinfo),
802                         &key_part_info,
803                         sizeof(*key_part_info)*(param->group_parts+1),
804                         &param->start_recinfo,
805                         sizeof(*param->recinfo)*(field_count*2+4),
806                         &tmpname, strlen(path)+1,
807                         &group_buff, (group && !using_unique_constraint ?
808                                       param->group_length : 0),
809                         &bitmaps, bitmap_buffer_size(field_count + 1) * 3,
810                         NullS))
811   {
812     if (temp_pool_slot != MY_BIT_NONE)
813       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
814     DBUG_RETURN(NULL);				/* purecov: inspected */
815   }
816   /* Copy_field belongs to Temp_table_param, allocate it in THD mem_root */
817   if (!(param->copy_field= copy= new (thd->mem_root) Copy_field[field_count]))
818   {
819     if (temp_pool_slot != MY_BIT_NONE)
820       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
821     free_root(&own_root, MYF(0));               /* purecov: inspected */
822     DBUG_RETURN(NULL);				/* purecov: inspected */
823   }
824   param->items_to_copy= copy_func;
825   my_stpcpy(tmpname,path);
826   /* make table according to fields */
827 
828   new (table) TABLE;
829   memset(reg_field, 0, sizeof(Field*)*(field_count + 2));
830   memset(default_field, 0, sizeof(Field*) * (field_count + 1));
831   memset(from_field, 0, sizeof(Field*)*(field_count + 1));
832 
833   // This invokes (the synthesized) st_mem_root &operator=(const st_mem_root&)
834   table->mem_root= own_root;
835   mem_root_save= thd->mem_root;
836   thd->mem_root= &table->mem_root;
837   copy_func->set_mem_root(&table->mem_root);
838 
839   // Leave the first place to be prepared for hash_field
840   reg_field++;
841   default_field++;
842   from_field++;
843   table->field=reg_field;
844   table->alias= table_alias;
845   table->reginfo.lock_type=TL_WRITE;	/* Will be updated */
846   table->db_stat=HA_OPEN_KEYFILE+HA_OPEN_RNDFILE;
847   table->temp_pool_slot = temp_pool_slot;
848   table->copy_blobs= 1;
849   table->in_use= thd;
850   table->quick_keys.init();
851   table->possible_quick_keys.init();
852   table->covering_keys.init();
853   table->merge_keys.init();
854   table->keys_in_use_for_query.init();
855   table->keys_in_use_for_group_by.init();
856   table->keys_in_use_for_order_by.init();
857 
858   table->s= share;
859   init_tmp_table_share(thd, share, "", 0, tmpname, tmpname);
860   share->blob_field= blob_field;
861   share->db_low_byte_first=1;                // True for HEAP and MyISAM
862   share->table_charset= param->table_charset;
863   share->primary_key= MAX_KEY;               // Indicate no primary key
864   share->keys_for_keyread.init();
865   share->keys_in_use.init();
866   share->keys= 0;
867   if (param->schema_table)
868     share->db= INFORMATION_SCHEMA_NAME;
869 
870   /* Calculate which type of fields we will store in the temporary table */
871 
872   reclength= string_total_length= 0;
873   blob_count= string_count= null_count= hidden_null_count= group_null_items= 0;
874   param->using_outer_summary_function= 0;
875 
876   List_iterator_fast<Item> li(fields);
877   Item *item;
878   Field **tmp_from_field=from_field;
879   while ((item=li++))
880   {
881     Field *new_field= NULL;
882     Item::Type type= item->type();
883     if (type == Item::COPY_STR_ITEM)
884     {
885       item= ((Item_copy *)item)->get_item();
886       type= item->type();
887     }
888     if (not_all_columns)
889     {
890       if (item->with_sum_func && type != Item::SUM_FUNC_ITEM)
891       {
892         if (item->used_tables() & OUTER_REF_TABLE_BIT)
893           item->update_used_tables();
894         if (type == Item::SUBSELECT_ITEM ||
895             (item->used_tables() & ~OUTER_REF_TABLE_BIT))
896         {
897 	  /*
898 	    Mark that we have ignored an item that refers to a summary
899 	    function. We need to know this if someone is going to use
900 	    DISTINCT on the result.
901 	  */
902 	  param->using_outer_summary_function= 1;
903           goto update_hidden;
904         }
905       }
906 
907       if (item->const_item() && (int)hidden_field_count <= 0)
908         continue; // We don't have to store this
909     }
910     if (type == Item::SUM_FUNC_ITEM && !group && !save_sum_fields)
911     {						/* Can't calc group yet */
912       Item_sum *sum_item= (Item_sum *) item;
913       for (i=0 ; i < sum_item->get_arg_count() ; i++)
914       {
915         Item *arg= sum_item->get_arg(i);
916         if (!arg->const_item())
917         {
918           new_field=
919             create_tmp_field(thd, table, arg, arg->type(), copy_func,
920                              tmp_from_field, &default_field[fieldnr],
921                              group != 0, not_all_columns,
922                              distinct, false);
923           if (!new_field)
924             goto err;					// Should be OOM
925           tmp_from_field++;
926           reclength+= new_field->pack_length();
927           if (new_field->flags & BLOB_FLAG)
928           {
929             *blob_field++= fieldnr;
930             blob_count++;
931           }
932           if (new_field->type() == MYSQL_TYPE_BIT)
933             total_uneven_bit_length+= new_field->field_length & 7;
934           *(reg_field++)= new_field;
935           if (new_field->real_type() == MYSQL_TYPE_STRING ||
936               new_field->real_type() == MYSQL_TYPE_VARCHAR)
937           {
938             string_count++;
939             string_total_length+= new_field->pack_length();
940           }
941           thd->mem_root= mem_root_save;
942           arg= sum_item->set_arg(i, thd, new Item_field(new_field));
943           thd->mem_root= &table->mem_root;
944           if (!(new_field->flags & NOT_NULL_FLAG))
945           {
946             null_count++;
947             /*
948               new_field->maybe_null() is still false, it will be
949               changed below. But we have to setup Item_field correctly
950             */
951             arg->maybe_null=1;
952           }
953           new_field->field_index= fieldnr++;
954           /* InnoDB temp table doesn't allow field with empty_name */
955           if (!new_field->field_name)
956             new_field->field_name= create_tmp_table_field_tmp_name(thd,
957                                                         new_field->field_index);
958         }
959       }
960     }
961     else
962     {
963       /*
964         The last parameter to create_tmp_field() is a bit tricky:
965 
966         We need to set it to 0 in union, to get fill_record() to modify the
967         temporary table.
968         We need to set it to 1 on multi-table-update and in select to
969         write rows to the temporary table.
970         We here distinguish between UNION and multi-table-updates by the fact
971         that in the later case group is set to the row pointer.
972       */
973       new_field= (param->schema_table) ?
974         create_tmp_field_for_schema(thd, item, table) :
975         create_tmp_field(thd, table, item, type, copy_func,
976                          tmp_from_field, &default_field[fieldnr],
977                          group != 0,
978                          !force_copy_fields &&
979                            (not_all_columns || group !=0),
980                          /*
981                            If item->marker == 4 then we force create_tmp_field
982                            to create a 64-bit longs for BIT fields because HEAP
983                            tables can't index BIT fields directly. We do the same
984                            for distinct, as we want the distinct index to be
985                            usable in this case too.
986                          */
987                          item->marker == 4 || param->bit_fields_as_long,
988                          force_copy_fields);
989 
990       if (!new_field)
991       {
992         assert(thd->is_fatal_error);
993         goto err;				// Got OOM
994       }
995       if (type == Item::SUM_FUNC_ITEM)
996 	((Item_sum *) item)->result_field= new_field;
997       tmp_from_field++;
998       reclength+=new_field->pack_length();
999       if (!(new_field->flags & NOT_NULL_FLAG))
1000 	null_count++;
1001       if (new_field->type() == MYSQL_TYPE_BIT)
1002         total_uneven_bit_length+= new_field->field_length & 7;
1003       if (new_field->flags & BLOB_FLAG)
1004       {
1005         *blob_field++= fieldnr;
1006 	blob_count++;
1007       }
1008 
1009       if (new_field->real_type() == MYSQL_TYPE_STRING ||
1010           new_field->real_type() == MYSQL_TYPE_VARCHAR)
1011       {
1012         string_count++;
1013         string_total_length+= new_field->pack_length();
1014       }
1015       // In order to reduce footprint ask SE to pack variable-length fields.
1016       if (new_field->type() == MYSQL_TYPE_VAR_STRING ||
1017           new_field->type() == MYSQL_TYPE_VARCHAR)
1018         table->s->db_create_options|= HA_OPTION_PACK_RECORD;
1019 
1020       if (item->marker == 4 && item->maybe_null)
1021       {
1022 	group_null_items++;
1023 	new_field->flags|= GROUP_FLAG;
1024       }
1025       new_field->field_index= fieldnr++;
1026       *(reg_field++)= new_field;
1027       /* InnoDB temp table doesn't allow field with empty_name */
1028       if (!new_field->field_name)
1029         new_field->field_name= create_tmp_table_field_tmp_name(thd, new_field->field_index);
1030     }
1031 
1032 update_hidden:
1033     /*
1034       Calculate length of distinct key. The goal is to decide what to use -
1035       key or unique constraint. As blobs force unique constraint on their
1036       own due to their length, they aren't taken into account.
1037     */
1038     if (distinct && !using_unique_constraint && hidden_field_count <= 0 &&
1039         new_field)
1040     {
1041       if (new_field->flags & BLOB_FLAG)
1042         using_unique_constraint= true;
1043       else
1044         distinct_key_length+= new_field->pack_length();
1045     }
1046     if (!--hidden_field_count)
1047     {
1048       /*
1049         This was the last hidden field; Remember how many hidden fields could
1050         have null
1051       */
1052       hidden_null_count=null_count;
1053       /*
1054 	We need to update hidden_field_count as we may have stored group
1055 	functions with constant arguments
1056       */
1057       param->hidden_field_count= fieldnr;
1058       null_count= 0;
1059       /*
1060         On last hidden field we store uneven bit length in
1061         hidden_uneven_bit_length and proceed calculation of
1062         uneven bits for visible fields into
1063         total_uneven_bit_length variable.
1064       */
1065       hidden_uneven_bit_length= total_uneven_bit_length;
1066       total_uneven_bit_length= 0;
1067     }
1068   }
1069   assert(fieldnr == (uint) (reg_field - table->field));
1070   assert(field_count >= (uint) (reg_field - table->field));
1071   field_count= fieldnr;
1072   *reg_field= 0;
1073   *blob_field= 0;				// End marker
1074   share->fields= field_count;
1075   share->blob_fields= blob_count;
1076 
1077   /* If result table is small; use a heap */
1078   if (select_options & TMP_TABLE_FORCE_MYISAM)
1079   {
1080     share->db_plugin= ha_lock_engine(0, myisam_hton);
1081     table->file= get_new_handler(share, &table->mem_root,
1082                                  share->db_type());
1083   }
1084   else if (blob_count ||
1085            (thd->variables.big_tables &&
1086             !(select_options & SELECT_SMALL_RESULT)))
1087   {
1088     /*
1089      * Except for special conditions, tmp table engine will be choosen by user.
1090      */
1091     switch (internal_tmp_disk_storage_engine)
1092     {
1093     case TMP_TABLE_MYISAM:
1094       share->db_plugin= ha_lock_engine(0, myisam_hton);
1095       break;
1096     case TMP_TABLE_INNODB:
1097       share->db_plugin= ha_lock_engine(0, innodb_hton);
1098       break;
1099     default:
1100       assert(0);
1101       share->db_plugin= ha_lock_engine(0, innodb_hton);
1102     }
1103 
1104     table->file= get_new_handler(share, &table->mem_root,
1105                                  share->db_type());
1106   }
1107   else
1108   {
1109     share->db_plugin= ha_lock_engine(0, heap_hton);
1110     table->file= get_new_handler(share, &table->mem_root,
1111                                  share->db_type());
1112   }
1113 
1114   /*
1115     Different temp table engine supports different max_key_length
1116     and max_key_part_lengthi. If HEAP engine is selected, it can be
1117     possible to convert into on-disk engine later. We must choose
1118     the minimal of max_key_length and max_key_part_length between
1119     HEAP engine and possible on-disk engine to verify whether unique
1120     constraint is needed so that the convertion goes well.
1121    */
1122   get_max_key_and_part_length(&max_key_length,
1123                               &max_key_part_length);
1124 
1125   if (!table->file)
1126     goto err;
1127   if (group &&
1128       (param->group_parts > table->file->max_key_parts() ||
1129        param->group_length > max_key_length))
1130     using_unique_constraint= true;
1131   keyinfo= param->keyinfo;
1132   keyinfo->table= table;
1133 
1134   if (group)
1135   {
1136     DBUG_PRINT("info",("Creating group key in temporary table"));
1137     table->group= group;				/* Table is grouped by key */
1138     param->group_buff= group_buff;
1139     share->keys= 1;
1140     // Use key definition created below only if the key isn't too long.
1141     // Otherwise a dedicated key over a hash value will be created and this
1142     // definition will be used by server to calc hash.
1143     if (!using_unique_constraint)
1144     {
1145       table->key_info= share->key_info= keyinfo;
1146       keyinfo->key_part= key_part_info;
1147       keyinfo->flags= HA_NOSAME;
1148       keyinfo->usable_key_parts= keyinfo->user_defined_key_parts=
1149         param->group_parts;
1150       keyinfo->actual_key_parts= keyinfo->user_defined_key_parts;
1151       keyinfo->rec_per_key= 0;
1152       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
1153       keyinfo->set_rec_per_key_array(NULL, NULL);
1154       keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1155       keyinfo->name= (char*) "<group_key>";
1156       ORDER *cur_group= group;
1157       for (; cur_group ; cur_group= cur_group->next, key_part_info++)
1158       {
1159         Field *field= (*cur_group->item)->get_tmp_table_field();
1160         assert(field->table == table);
1161         key_part_info->init_from_field(field);
1162 
1163         /* In GROUP BY 'a' and 'a ' are equal for VARCHAR fields */
1164         key_part_info->key_part_flag|= HA_END_SPACE_ARE_EQUAL;
1165 
1166         if (key_part_info->store_length > max_key_part_length)
1167         {
1168           using_unique_constraint= true;
1169           break;
1170         }
1171       }
1172       keyinfo->actual_flags= keyinfo->flags;
1173     }
1174   }
1175 
1176   if (distinct && field_count != param->hidden_field_count)
1177   {
1178     /*
1179       Create an unique key or an unique constraint over all columns
1180       that should be in the result.  In the temporary table, there are
1181       'param->hidden_field_count' extra columns, whose null bits are stored
1182       in the first 'hidden_null_pack_length' bytes of the row.
1183     */
1184     DBUG_PRINT("info",("hidden_field_count: %d", param->hidden_field_count));
1185     share->keys= 1;
1186     table->distinct= 1;
1187     if (!using_unique_constraint)
1188     {
1189       Field **reg_field;
1190       keyinfo->user_defined_key_parts= field_count-param->hidden_field_count;
1191       keyinfo->actual_key_parts= keyinfo->user_defined_key_parts;
1192       if (!(key_part_info= (KEY_PART_INFO*)
1193             alloc_root(&table->mem_root,
1194                        keyinfo->user_defined_key_parts * sizeof(KEY_PART_INFO))))
1195         goto err;
1196       memset(key_part_info, 0, keyinfo->user_defined_key_parts *
1197              sizeof(KEY_PART_INFO));
1198       table->key_info= share->key_info= keyinfo;
1199       keyinfo->key_part= key_part_info;
1200       keyinfo->actual_flags= keyinfo->flags= HA_NOSAME | HA_NULL_ARE_EQUAL;
1201       // TODO rename to <distinct_key>
1202       keyinfo->name= (char*) "<auto_key>";
1203       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
1204       keyinfo->set_rec_per_key_array(NULL, NULL);
1205       keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1206       /* Create a distinct key over the columns we are going to return */
1207       for (i=param->hidden_field_count, reg_field=table->field + i ;
1208            i < field_count;
1209            i++, reg_field++, key_part_info++)
1210       {
1211         key_part_info->init_from_field(*reg_field);
1212         if (key_part_info->store_length > max_key_part_length)
1213         {
1214           using_unique_constraint= true;
1215           break;
1216         }
1217       }
1218     }
1219   }
1220 
1221   /*
1222     To enforce unique constraint we need to add a field to hold key's hash
1223     1) already detected unique constraint
1224     2) distinct key is too long
1225     3) number of keyparts in distinct key is too big
1226   */
1227   if (using_unique_constraint ||                              // 1
1228       distinct_key_length > max_key_length ||                 // 2
1229       (distinct &&                                            // 3
1230        (fieldnr - param->hidden_field_count) > table->file->max_key_parts()))
1231   {
1232     using_unique_constraint= true;
1233     Field_longlong *field= new(&table->mem_root)
1234           Field_longlong(sizeof(ulonglong), false, "<hash_field>", true);
1235     if (!field)
1236     {
1237       assert(thd->is_fatal_error);
1238       goto err;   // Got OOM
1239     }
1240 
1241     // Mark hash_field as NOT NULL
1242     field->flags &= NOT_NULL_FLAG;
1243     // Register hash_field as a hidden field.
1244     register_hidden_field(table, default_field,
1245                           from_field, share->blob_field, field);
1246     // Repoint arrays
1247     table->field--;
1248     default_field--;
1249     from_field--;
1250     reclength+= field->pack_length();
1251     field_count= ++fieldnr;
1252     param->hidden_field_count++;
1253     share->fields= field_count;
1254     table->hash_field= field;
1255   }
1256 
1257   // Update the handler with information about the table object
1258   table->file->change_table_ptr(table, share);
1259   table->hidden_field_count= param->hidden_field_count;
1260 
1261   if (table->file->set_ha_share_ref(&share->ha_share))
1262   {
1263     delete table->file;
1264     goto err;
1265   }
1266 
1267   // Initialize cost model for this table
1268   table->init_cost_model(thd->cost_model());
1269 
1270   if (!using_unique_constraint)
1271     reclength+= group_null_items;	// null flag is stored separately
1272 
1273   if (blob_count == 0)
1274   {
1275     /* We need to ensure that first byte is not 0 for the delete link */
1276     if (param->hidden_field_count)
1277       hidden_null_count++;
1278     else
1279       null_count++;
1280   }
1281   hidden_null_pack_length= (hidden_null_count + 7 +
1282                             hidden_uneven_bit_length) / 8;
1283   null_pack_length= (hidden_null_pack_length +
1284                      (null_count + total_uneven_bit_length + 7) / 8);
1285   reclength+=null_pack_length;
1286   if (!reclength)
1287     reclength=1;				// Dummy select
1288   /* Use packed rows if there is blobs or a lot of space to gain */
1289   if (blob_count ||
1290       (string_total_length >= STRING_TOTAL_LENGTH_TO_PACK_ROWS &&
1291       (reclength / string_total_length <= RATIO_TO_PACK_ROWS ||
1292        string_total_length / string_count >= AVG_STRING_LENGTH_TO_PACK_ROWS)))
1293     use_packed_rows= true;
1294 
1295   if (!use_packed_rows)
1296     share->db_create_options&= ~HA_OPTION_PACK_RECORD;
1297 
1298   share->reclength= reclength;
1299   {
1300     uint alloc_length=ALIGN_SIZE(reclength+MI_UNIQUE_HASH_LENGTH+1);
1301     share->rec_buff_length= alloc_length;
1302     if (!(table->record[0]= (uchar*)
1303           alloc_root(&table->mem_root, alloc_length*3)))
1304       goto err;
1305     table->record[1]= table->record[0]+alloc_length;
1306     share->default_values= table->record[1]+alloc_length;
1307   }
1308   param->func_count= copy_func->size();
1309   assert(param->func_count <= copy_func_count); // Used <= allocated
1310 
1311   setup_tmp_table_column_bitmaps(table, bitmaps);
1312 
1313   recinfo=param->start_recinfo;
1314   null_flags= table->record[0];
1315   pos= table->record[0] + null_pack_length;
1316   if (null_pack_length)
1317   {
1318     memset(recinfo, 0, sizeof(*recinfo));
1319     recinfo->type=FIELD_NORMAL;
1320     recinfo->length=null_pack_length;
1321     recinfo++;
1322     memset(null_flags, 255, null_pack_length);	// Set null fields
1323 
1324     table->null_flags= table->record[0];
1325     share->null_fields= null_count+ hidden_null_count;
1326     share->null_bytes= null_pack_length;
1327   }
1328   null_count= (blob_count == 0) ? 1 : 0;
1329   hidden_field_count=param->hidden_field_count;
1330   assert((uint)hidden_field_count <= field_count);
1331   for (i=0,reg_field=table->field; i < field_count; i++,reg_field++,recinfo++)
1332   {
1333     Field *field= *reg_field;
1334     uint length;
1335     memset(recinfo, 0, sizeof(*recinfo));
1336 
1337     if (!(field->flags & NOT_NULL_FLAG))
1338     {
1339       if (field->flags & GROUP_FLAG && !using_unique_constraint)
1340       {
1341 	/*
1342 	  We have to reserve one byte here for NULL bits,
1343 	  as this is updated by 'end_update()'
1344 	*/
1345 	*pos++=0;				// Null is stored here
1346 	recinfo->length=1;
1347 	recinfo->type=FIELD_NORMAL;
1348 	recinfo++;
1349 	memset(recinfo, 0, sizeof(*recinfo));
1350       }
1351       else
1352       {
1353 	recinfo->null_bit= (uint8)1 << (null_count & 7);
1354 	recinfo->null_pos= null_count/8;
1355       }
1356       field->move_field(pos,null_flags+null_count/8,
1357 			(uint8)1 << (null_count & 7));
1358       null_count++;
1359     }
1360     else
1361       field->move_field(pos,(uchar*) 0,0);
1362     if (field->type() == MYSQL_TYPE_BIT)
1363     {
1364       /* We have to reserve place for extra bits among null bits */
1365       ((Field_bit*) field)->set_bit_ptr(null_flags + null_count / 8,
1366                                         null_count & 7);
1367       null_count+= (field->field_length & 7);
1368     }
1369     field->reset();
1370 
1371     /*
1372       Test if there is a default field value. The test for ->ptr is to skip
1373       'offset' fields generated by initalize_tables
1374     */
1375     if (default_field[i] && default_field[i]->ptr)
1376     {
1377       /*
1378          default_field[i] is set only in the cases  when 'field' can
1379          inherit the default value that is defined for the field referred
1380          by the Item_field object from which 'field' has been created.
1381       */
1382       Field *orig_field= default_field[i];
1383       /* Get the value from default_values */
1384       my_ptrdiff_t diff= orig_field->table->default_values_offset();
1385       orig_field->move_field_offset(diff);      // Points now at default_values
1386       if (orig_field->is_real_null())
1387         field->set_null();
1388       else
1389       {
1390         field->set_notnull();
1391         memcpy(field->ptr, orig_field->ptr, field->pack_length());
1392       }
1393       orig_field->move_field_offset(-diff);     // Back to record[0]
1394     }
1395 
1396     if (from_field[i])
1397     {						/* Not a table Item */
1398       copy->set(field,from_field[i],save_sum_fields);
1399       copy++;
1400     }
1401     length=field->pack_length();
1402     pos+= length;
1403 
1404     /* Make entry for create table */
1405     recinfo->length=length;
1406     if (field->flags & BLOB_FLAG)
1407       recinfo->type= (int) FIELD_BLOB;
1408     else if (use_packed_rows &&
1409              field->real_type() == MYSQL_TYPE_STRING &&
1410 	     length >= MIN_STRING_LENGTH_TO_PACK_ROWS)
1411       recinfo->type=FIELD_SKIP_ENDSPACE;
1412     else if (use_packed_rows &&
1413              field->real_type() == MYSQL_TYPE_VARCHAR &&
1414              length >= MIN_STRING_LENGTH_TO_PACK_ROWS)
1415       recinfo->type= FIELD_VARCHAR;
1416     else
1417       recinfo->type=FIELD_NORMAL;
1418     if (!--hidden_field_count)
1419       null_count=(null_count+7) & ~7;		// move to next byte
1420 
1421     // fix table name in field entry
1422     field->table_name= &table->alias;
1423   }
1424 
1425   param->copy_field_end=copy;
1426   param->recinfo=recinfo;
1427   store_record(table,s->default_values);        // Make empty default record
1428 
1429   if (thd->variables.tmp_table_size == ~ (ulonglong) 0)		// No limit
1430     share->max_rows= ~(ha_rows) 0;
1431   else
1432     share->max_rows= (ha_rows) (((share->db_type() == heap_hton) ?
1433                                  min(thd->variables.tmp_table_size,
1434                                      thd->variables.max_heap_table_size) :
1435                                  thd->variables.tmp_table_size) /
1436 			         share->reclength);
1437   set_if_bigger(share->max_rows,1);		// For dummy start options
1438   /*
1439     Push the LIMIT clause to the temporary table creation, so that we
1440     materialize only up to 'rows_limit' records instead of all result records.
1441   */
1442   set_if_smaller(share->max_rows, rows_limit);
1443   param->end_write_records= rows_limit;
1444 
1445   if (group && !using_unique_constraint)
1446   {
1447     ORDER *cur_group= group;
1448     key_part_info= keyinfo->key_part;
1449     if (param->can_use_pk_for_unique)
1450       share->primary_key= 0;
1451     keyinfo->key_length= 0;  // Will compute the sum of the parts below.
1452     /*
1453       Here, we have to make the group fields point to the right record
1454       position.
1455     */
1456     for (; cur_group ; cur_group= cur_group->next, key_part_info++)
1457     {
1458       Field *field= (*cur_group->item)->get_tmp_table_field();
1459       assert(field->table == table);
1460       bool maybe_null= (*cur_group->item)->maybe_null;
1461       key_part_info->init_from_field(key_part_info->field);
1462       keyinfo->key_length+= key_part_info->store_length;
1463 
1464       cur_group->buff= (char*) group_buff;
1465       cur_group->field= field->new_key_field(thd->mem_root, table,
1466                                              group_buff + MY_TEST(maybe_null));
1467 
1468       if (!cur_group->field)
1469         goto err; /* purecov: inspected */
1470 
1471       if (maybe_null)
1472       {
1473         /*
1474           To be able to group on NULL, we reserved place in group_buff
1475           for the NULL flag just before the column. (see above).
1476           The field data is after this flag.
1477           The NULL flag is updated in 'end_update()' and 'end_write()'
1478         */
1479         keyinfo->flags|= HA_NULL_ARE_EQUAL;	// def. that NULL == NULL
1480         cur_group->buff++;                        // Pointer to field data
1481         group_buff++;                         // Skipp null flag
1482       }
1483       group_buff+= cur_group->field->pack_length();
1484     }
1485   }
1486 
1487   if (distinct && field_count != param->hidden_field_count &&
1488       !using_unique_constraint)
1489   {
1490     null_pack_length-=hidden_null_pack_length;
1491     key_part_info= keyinfo->key_part;
1492     if (param->can_use_pk_for_unique)
1493       share->primary_key= 0;
1494     keyinfo->key_length= 0;  // Will compute the sum of the parts below.
1495     /*
1496       Here, we have to make the key fields point to the right record
1497       position.
1498     */
1499     for (i=param->hidden_field_count, reg_field=table->field + i ;
1500          i < field_count;
1501          i++, reg_field++, key_part_info++)
1502     {
1503       key_part_info->init_from_field(*reg_field);
1504       keyinfo->key_length+= key_part_info->store_length;
1505     }
1506   }
1507 
1508   // Create a key over hash_field to enforce unique constraint
1509   if (using_unique_constraint)
1510   {
1511     KEY *hash_key;
1512     KEY_PART_INFO *hash_kpi;
1513 
1514     if (!multi_alloc_root(&table->mem_root,
1515                           &hash_key, sizeof(*hash_key),
1516                           &hash_kpi, sizeof(*hash_kpi), // Only one key part
1517                           NullS))
1518       goto err;
1519     table->key_info= share->key_info= hash_key;
1520     hash_key->table= table;
1521     hash_key->key_part= hash_kpi;
1522     hash_key->actual_flags= hash_key->flags= HA_NULL_ARE_EQUAL;
1523     hash_key->actual_key_parts= hash_key->usable_key_parts= 1;
1524     hash_key->user_defined_key_parts= 1;
1525     hash_key->set_rec_per_key_array(NULL, NULL);
1526     hash_key->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1527     hash_key->algorithm= HA_KEY_ALG_UNDEF;
1528     if (distinct)
1529       hash_key->name= (char*) "<hash_distinct_key>";
1530     else
1531       hash_key->name= (char*) "<hash_group_key>";
1532     hash_kpi->init_from_field(table->hash_field);
1533     hash_key->key_length= hash_kpi->store_length;
1534     param->keyinfo= hash_key;
1535   }
1536 
1537   if (thd->is_fatal_error)				// If end of memory
1538     goto err;					 /* purecov: inspected */
1539   share->db_record_offset= 1;
1540   if (!param->skip_create_table)
1541   {
1542     if (instantiate_tmp_table(table, param->keyinfo, param->start_recinfo,
1543                               &param->recinfo, select_options,
1544                               thd->variables.big_tables, &thd->opt_trace))
1545       goto err;
1546   }
1547 
1548   thd->mem_root= mem_root_save;
1549 
1550   DEBUG_SYNC(thd, "tmp_table_created");
1551 
1552   DBUG_RETURN(table);
1553 
1554 err:
1555   thd->mem_root= mem_root_save;
1556   free_tmp_table(thd,table);                    /* purecov: inspected */
1557   DBUG_RETURN(NULL);				/* purecov: inspected */
1558 }
1559 
1560 /*
1561   Create a temporary table to weed out duplicate rowid combinations
1562 
1563   SYNOPSIS
1564 
1565     create_duplicate_weedout_tmp_table()
1566       thd                    Thread handle
1567       uniq_tuple_length_arg  Length of the table's column
1568       sjtbl                  Update sjtbl->[start_]recinfo values which
1569                              will be needed if we'll need to convert the
1570                              created temptable from HEAP to MyISAM/Maria.
1571 
1572   DESCRIPTION
1573     Create a temporary table to weed out duplicate rowid combinations. The
1574     table has a single column that is a concatenation of all rowids in the
1575     combination.
1576 
1577     Depending on the needed length, there are two cases:
1578 
1579     1. When the length of the column < max_key_length:
1580 
1581       CREATE TABLE tmp (col VARBINARY(n) NOT NULL, UNIQUE KEY(col));
1582 
1583     2. Otherwise (not a valid SQL syntax but internally supported):
1584 
1585       CREATE TABLE tmp (col VARBINARY NOT NULL, UNIQUE CONSTRAINT(col));
1586 
1587     The code in this function was produced by extraction of relevant parts
1588     from create_tmp_table().
1589 
1590   RETURN
1591     created table
1592     NULL on error
1593 */
1594 
create_duplicate_weedout_tmp_table(THD * thd,uint uniq_tuple_length_arg,SJ_TMP_TABLE * sjtbl)1595 TABLE *create_duplicate_weedout_tmp_table(THD *thd,
1596                                           uint uniq_tuple_length_arg,
1597                                           SJ_TMP_TABLE *sjtbl)
1598 {
1599   MEM_ROOT *mem_root_save, own_root;
1600   TABLE *table;
1601   TABLE_SHARE *share;
1602   uint  temp_pool_slot=MY_BIT_NONE;
1603   char	*tmpname,path[FN_REFLEN];
1604   Field **reg_field;
1605   KEY_PART_INFO *key_part_info;
1606   KEY *keyinfo;
1607   uchar *group_buff;
1608   uchar *bitmaps;
1609   uint *blob_field;
1610   MI_COLUMNDEF *recinfo, *start_recinfo;
1611   bool using_unique_constraint=false;
1612   Field *field, *key_field;
1613   uint null_pack_length;
1614   uchar *null_flags;
1615   uchar	*pos;
1616   uint i;
1617 
1618   DBUG_ENTER("create_duplicate_weedout_tmp_table");
1619   assert(!sjtbl->is_confluent);
1620   /*
1621     STEP 1: Get temporary table name
1622   */
1623   thd->inc_status_created_tmp_tables();
1624   if (use_temp_pool && !(test_flags & TEST_KEEP_TMP_TABLES))
1625     temp_pool_slot = bitmap_lock_set_next(&temp_pool);
1626 
1627   if (temp_pool_slot != MY_BIT_NONE) // we got a slot
1628     sprintf(path, "%s_%lx_%i", tmp_file_prefix,
1629 	    current_pid, temp_pool_slot);
1630   else
1631   {
1632     /* if we run out of slots or we are not using tempool */
1633     assert(sizeof(my_thread_id) == 4);
1634     sprintf(path,"%s%lx_%x_%x", tmp_file_prefix,current_pid,
1635             thd->thread_id(), thd->tmp_table++);
1636   }
1637   fn_format(path, path, mysql_tmpdir, "", MY_REPLACE_EXT|MY_UNPACK_FILENAME);
1638 
1639   /* STEP 2: Figure if we'll be using a key or blob+constraint */
1640   if (uniq_tuple_length_arg > CONVERT_IF_BIGGER_TO_BLOB)
1641     using_unique_constraint= true;
1642 
1643   /* STEP 3: Allocate memory for temptable description */
1644   init_sql_alloc(key_memory_TABLE, &own_root, TABLE_ALLOC_BLOCK_SIZE, 0);
1645   if (!multi_alloc_root(&own_root,
1646                         &table, sizeof(*table),
1647                         &share, sizeof(*share),
1648                         &reg_field, sizeof(Field*) * (1+2),
1649                         &blob_field, sizeof(uint)*3,
1650                         &keyinfo, sizeof(*keyinfo),
1651                         &key_part_info, sizeof(*key_part_info) * 2,
1652                         &start_recinfo,
1653                         sizeof(*recinfo)*(1*2+2),
1654                         &tmpname, strlen(path)+1,
1655                         &group_buff, (!using_unique_constraint ?
1656                                       uniq_tuple_length_arg : 0),
1657                         &bitmaps, bitmap_buffer_size(1) * 3,
1658                         NullS))
1659   {
1660     if (temp_pool_slot != MY_BIT_NONE)
1661       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
1662     DBUG_RETURN(NULL);
1663   }
1664   my_stpcpy(tmpname,path);
1665 
1666   /* STEP 4: Create TABLE description */
1667   new (table) TABLE;
1668   memset(reg_field, 0, sizeof(Field*) * 3);
1669 
1670   table->mem_root= own_root;
1671   mem_root_save= thd->mem_root;
1672   thd->mem_root= &table->mem_root;
1673 
1674   table->field=reg_field;
1675   table->alias= "weedout-tmp";
1676   table->reginfo.lock_type=TL_WRITE;	/* Will be updated */
1677   table->db_stat=HA_OPEN_KEYFILE+HA_OPEN_RNDFILE;
1678   table->temp_pool_slot = temp_pool_slot;
1679   table->copy_blobs= 1;
1680   table->in_use= thd;
1681   table->quick_keys.init();
1682   table->possible_quick_keys.init();
1683   table->covering_keys.init();
1684   table->keys_in_use_for_query.init();
1685 
1686   table->s= share;
1687   init_tmp_table_share(thd, share, "", 0, tmpname, tmpname);
1688   share->blob_field= blob_field;
1689   share->db_low_byte_first=1;                // True for HEAP and MyISAM
1690   share->table_charset= NULL;
1691   share->primary_key= MAX_KEY;               // Indicate no primary key
1692   share->keys_for_keyread.init();
1693   share->keys_in_use.init();
1694 
1695   uint reclength= 0;
1696   uint null_count= 0;
1697 
1698   /* Create the field */
1699   if (using_unique_constraint)
1700   {
1701     Field_longlong *field= new(&table->mem_root)
1702           Field_longlong(sizeof(ulonglong), false, "<hash_field>", true);
1703     if (!field)
1704     {
1705       assert(thd->is_fatal_error);
1706       goto err;				// Got OOM
1707     }
1708     // Mark hash_field as NOT NULL
1709     field->flags &= NOT_NULL_FLAG;
1710     *(reg_field++)= sjtbl->hash_field= field;
1711     table->hash_field= field;
1712     field->table= field->orig_table= table;
1713     share->fields++;
1714     field->field_index= 0;
1715     reclength= field->pack_length();
1716     table->hidden_field_count++;
1717   }
1718   {
1719     /*
1720       For the sake of uniformity, always use Field_varstring (altough we could
1721       use Field_string for shorter keys)
1722     */
1723     field= new Field_varstring(uniq_tuple_length_arg, FALSE, "rowids", share,
1724                                &my_charset_bin);
1725     if (!field)
1726       DBUG_RETURN(0);
1727     field->table= table;
1728     field->unireg_check= Field::NONE;
1729     field->flags= (NOT_NULL_FLAG | BINARY_FLAG | NO_DEFAULT_VALUE_FLAG);
1730     field->reset_fields();
1731     field->init(table);
1732     field->orig_table= NULL;
1733     *(reg_field++)= field;
1734     *blob_field= 0;
1735     *reg_field= 0;
1736 
1737     field->field_index= share->fields;
1738     share->fields++;
1739     share->blob_fields= 0;
1740     reclength+= field->pack_length();
1741     null_count++;
1742   }
1743 
1744   if (using_unique_constraint)
1745   {
1746     switch (internal_tmp_disk_storage_engine)
1747     {
1748     case TMP_TABLE_MYISAM:
1749       share->db_plugin= ha_lock_engine(0, myisam_hton);
1750       break;
1751     case TMP_TABLE_INNODB:
1752       share->db_plugin= ha_lock_engine(0, innodb_hton);
1753       break;
1754     default:
1755       assert(0);
1756       share->db_plugin= ha_lock_engine(0, innodb_hton);
1757     }
1758     table->file= get_new_handler(share, &table->mem_root,
1759                                  share->db_type());
1760   }
1761   else
1762   {
1763     share->db_plugin= ha_lock_engine(0, heap_hton);
1764     table->file= get_new_handler(share, &table->mem_root,
1765                                  share->db_type());
1766   }
1767 
1768 
1769   if (!table->file)
1770     goto err;
1771 
1772   if (table->file->set_ha_share_ref(&share->ha_share))
1773   {
1774     delete table->file;
1775     goto err;
1776   }
1777 
1778   null_pack_length= 1;
1779   reclength+= null_pack_length;
1780 
1781   share->reclength= reclength;
1782   {
1783     uint alloc_length= ALIGN_SIZE(share->reclength + MI_UNIQUE_HASH_LENGTH+1);
1784     share->rec_buff_length= alloc_length;
1785     if (!(table->record[0]= (uchar*)
1786                             alloc_root(&table->mem_root, alloc_length * 3)))
1787       goto err;
1788     table->record[1]= table->record[0] + alloc_length;
1789     share->default_values= table->record[1] + alloc_length;
1790   }
1791   setup_tmp_table_column_bitmaps(table, bitmaps);
1792 
1793   recinfo= start_recinfo;
1794   null_flags= table->record[0];
1795 
1796   pos= table->record[0] + null_pack_length;
1797   if (null_pack_length)
1798   {
1799     memset(recinfo, 0, sizeof(*recinfo));
1800     recinfo->type= FIELD_NORMAL;
1801     recinfo->length= null_pack_length;
1802     recinfo++;
1803     memset(null_flags, 255, null_pack_length);	// Set null fields
1804 
1805     table->null_flags= table->record[0];
1806     share->null_fields= null_count;
1807     share->null_bytes= null_pack_length;
1808   }
1809   null_count=1;
1810   for (i=0, reg_field=table->field; i < share->fields;
1811        i++, reg_field++, recinfo++)
1812   {
1813     Field *field= *reg_field;
1814     uint length;
1815     /* Table description for the concatenated rowid column */
1816     memset(recinfo, 0, sizeof(*recinfo));
1817 
1818     if (!(field->flags & NOT_NULL_FLAG))
1819     {
1820       if (field->flags & GROUP_FLAG && !using_unique_constraint)
1821       {
1822         /*
1823           We have to reserve one byte here for NULL bits,
1824           as this is updated by 'end_update()'
1825         */
1826         *pos++= 0;				// Null is stored here
1827         recinfo->length= 1;
1828         recinfo->type= FIELD_NORMAL;
1829         recinfo++;
1830         memset(recinfo, 0, sizeof(*recinfo));
1831       }
1832       else
1833       {
1834         recinfo->null_bit= (uint8)1 << (null_count & 7);
1835         recinfo->null_pos= null_count/8;
1836       }
1837       field->move_field(pos,null_flags+null_count/8,
1838                         (uint8)1 << (null_count & 7));
1839       null_count++;
1840     }
1841     else
1842       field->move_field(pos,(uchar*) 0, 0);
1843     if (field->type() == MYSQL_TYPE_BIT)
1844     {
1845       /* We have to reserve place for extra bits among null bits */
1846       ((Field_bit*) field)->set_bit_ptr(null_flags + null_count / 8,
1847                                         null_count & 7);
1848       null_count+= (field->field_length & 7);
1849     }
1850     field->reset();
1851 
1852     length= field->pack_length();
1853     pos+= length;
1854 
1855     /*
1856       Don't care about packing the VARCHAR since it's only a
1857       concatenation of rowids. @see create_tmp_table() for how
1858       packed VARCHARs can be achieved
1859     */
1860     recinfo->length= length;
1861     recinfo->type= FIELD_NORMAL;
1862 
1863     // fix table name in field entry
1864     field->table_name= &table->alias;
1865   }
1866 
1867   if (thd->variables.tmp_table_size == ~ (ulonglong) 0)		// No limit
1868     share->max_rows= ~(ha_rows) 0;
1869   else
1870     share->max_rows= (ha_rows) (((share->db_type() == heap_hton) ?
1871                                  min(thd->variables.tmp_table_size,
1872                                      thd->variables.max_heap_table_size) :
1873                                  thd->variables.tmp_table_size) /
1874 			         share->reclength);
1875   set_if_bigger(share->max_rows,1);		// For dummy start options
1876 
1877 
1878   // Create a key over param->hash_field to enforce unique constraint
1879   if (using_unique_constraint)
1880   {
1881     KEY *hash_key= keyinfo;
1882     KEY_PART_INFO *hash_kpi= key_part_info;
1883 
1884     share->keys= 1;
1885     table->key_info= share->key_info= hash_key;
1886     hash_key->table= table;
1887     hash_key->key_part= hash_kpi;
1888     hash_key->actual_flags= hash_key->flags= HA_NULL_ARE_EQUAL;
1889     hash_kpi->init_from_field(sjtbl->hash_field);
1890     hash_key->key_length= hash_kpi->store_length;
1891   }
1892   else
1893   {
1894     DBUG_PRINT("info",("Creating group key in temporary table"));
1895     share->keys=1;
1896     table->key_info= table->s->key_info= keyinfo;
1897     keyinfo->key_part=key_part_info;
1898     keyinfo->actual_flags= keyinfo->flags= HA_NOSAME;
1899     keyinfo->key_length=0;
1900     {
1901       key_part_info->init_from_field(field);
1902       assert(key_part_info->key_type == FIELDFLAG_BINARY);
1903 
1904       key_field= field->new_key_field(thd->mem_root, table, group_buff);
1905       if (!key_field)
1906         goto err;
1907       key_part_info->key_part_flag|= HA_END_SPACE_ARE_EQUAL; //todo need this?
1908       keyinfo->key_length+=  key_part_info->length;
1909     }
1910   }
1911   {
1912     table->key_info->user_defined_key_parts= 1;
1913     table->key_info->usable_key_parts= 1;
1914     table->key_info->actual_key_parts= table->key_info->user_defined_key_parts;
1915     table->key_info->set_rec_per_key_array(NULL, NULL);
1916     table->key_info->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1917     table->key_info->algorithm= HA_KEY_ALG_UNDEF;
1918     table->key_info->name= (char*) "weedout_key";
1919   }
1920 
1921   if (thd->is_fatal_error)				// If end of memory
1922     goto err;
1923   share->db_record_offset= 1;
1924   if (instantiate_tmp_table(table, table->key_info, start_recinfo, &recinfo,
1925                             0, 0, &thd->opt_trace))
1926     goto err;
1927 
1928   sjtbl->start_recinfo= start_recinfo;
1929   sjtbl->recinfo=       recinfo;
1930 
1931   thd->mem_root= mem_root_save;
1932   DBUG_RETURN(table);
1933 
1934 err:
1935   thd->mem_root= mem_root_save;
1936   table->file->ha_index_or_rnd_end();
1937   free_tmp_table(thd,table);                    /* purecov: inspected */
1938   DBUG_RETURN(NULL);				/* purecov: inspected */
1939 }
1940 
1941 
1942 /****************************************************************************/
1943 
1944 /**
1945   Create a reduced TABLE object with properly set up Field list from a
1946   list of field definitions.
1947 
1948     The created table doesn't have a table handler associated with
1949     it, has no keys, no group/distinct, no copy_funcs array.
1950     The sole purpose of this TABLE object is to use the power of Field
1951     class to read/write data to/from table->record[0]. Then one can store
1952     the record in any container (RB tree, hash, etc).
1953     The table is created in THD mem_root, so are the table's fields.
1954     Consequently, if you don't BLOB fields, you don't need to free it.
1955 
1956   @param thd         connection handle
1957   @param field_list  list of column definitions
1958 
1959   @return
1960     0 if out of memory, TABLE object in case of success
1961 */
1962 
create_virtual_tmp_table(THD * thd,List<Create_field> & field_list)1963 TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list)
1964 {
1965   uint field_count= field_list.elements;
1966   uint blob_count= 0;
1967   Field **field;
1968   Create_field *cdef;                           /* column definition */
1969   uint record_length= 0;
1970   uint null_count= 0;                 /* number of columns which may be null */
1971   uint null_pack_length;              /* NULL representation array length */
1972   uint *blob_field;
1973   uchar *bitmaps;
1974   TABLE *table;
1975   TABLE_SHARE *share;
1976 
1977   if (!multi_alloc_root(thd->mem_root,
1978                         &table, sizeof(*table),
1979                         &share, sizeof(*share),
1980                         &field, (field_count + 1) * sizeof(Field*),
1981                         &blob_field, (field_count+1) *sizeof(uint),
1982                         &bitmaps, bitmap_buffer_size(field_count) * 3,
1983                         NullS))
1984     return 0;
1985 
1986   new (table) TABLE;
1987   new (share) TABLE_SHARE;
1988   table->field= field;
1989   table->s= share;
1990   table->temp_pool_slot= MY_BIT_NONE;
1991   share->blob_field= blob_field;
1992   share->fields= field_count;
1993   share->db_low_byte_first=1;                // True for HEAP and MyISAM
1994   setup_tmp_table_column_bitmaps(table, bitmaps);
1995 
1996   /* Create all fields and calculate the total length of record */
1997   List_iterator_fast<Create_field> it(field_list);
1998   while ((cdef= it++))
1999   {
2000     *field= make_field(share, 0, cdef->length,
2001                        (uchar*) (f_maybe_null(cdef->pack_flag) ? "" : 0),
2002                        f_maybe_null(cdef->pack_flag) ? 1 : 0,
2003                        cdef->pack_flag, cdef->sql_type, cdef->charset,
2004                        cdef->geom_type, cdef->unireg_check,
2005                        cdef->interval, cdef->field_name);
2006     if (!*field)
2007       goto error;
2008     (*field)->init(table);
2009     record_length+= (*field)->pack_length();
2010     if (! ((*field)->flags & NOT_NULL_FLAG))
2011       null_count++;
2012 
2013     if ((*field)->flags & BLOB_FLAG)
2014       share->blob_field[blob_count++]= (uint) (field - table->field);
2015 
2016     field++;
2017   }
2018   *field= NULL;                             /* mark the end of the list */
2019   share->blob_field[blob_count]= 0;            /* mark the end of the list */
2020   share->blob_fields= blob_count;
2021 
2022   null_pack_length= (null_count + 7)/8;
2023   share->reclength= record_length + null_pack_length;
2024   share->rec_buff_length= ALIGN_SIZE(share->reclength + 1);
2025   table->record[0]= (uchar*) thd->alloc(share->rec_buff_length);
2026   if (!table->record[0])
2027     goto error;
2028 
2029   if (null_pack_length)
2030   {
2031     table->null_flags= table->record[0];
2032     share->null_fields= null_count;
2033     share->null_bytes= null_pack_length;
2034   }
2035 
2036   table->in_use= thd;           /* field->reset() may access table->in_use */
2037   {
2038     /* Set up field pointers */
2039     uchar *null_pos= table->record[0];
2040     uchar *field_pos= null_pos + share->null_bytes;
2041     uint null_bit= 1;
2042 
2043     for (field= table->field; *field; ++field)
2044     {
2045       Field *cur_field= *field;
2046       if ((cur_field->flags & NOT_NULL_FLAG))
2047         cur_field->move_field(field_pos);
2048       else
2049       {
2050         cur_field->move_field(field_pos, null_pos, null_bit);
2051         null_bit<<= 1;
2052         if (null_bit == (uint8)1 << 8)
2053         {
2054           ++null_pos;
2055           null_bit= 1;
2056         }
2057       }
2058       if (cur_field->type() == MYSQL_TYPE_BIT &&
2059           cur_field->key_type() == HA_KEYTYPE_BIT)
2060       {
2061         /* This is a Field_bit since key_type is HA_KEYTYPE_BIT */
2062         static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos, null_bit);
2063         null_bit+= cur_field->field_length & 7;
2064         if (null_bit > 7)
2065         {
2066           null_pos++;
2067           null_bit-= 8;
2068         }
2069       }
2070       cur_field->reset();
2071 
2072       field_pos+= cur_field->pack_length();
2073     }
2074   }
2075   return table;
2076 error:
2077   for (field= table->field; *field; ++field)
2078     delete *field;                         /* just invokes field destructor */
2079   return 0;
2080 }
2081 
2082 
open_tmp_table(TABLE * table)2083 bool open_tmp_table(TABLE *table)
2084 {
2085   int error;
2086   if ((error=table->file->ha_open(table, table->s->table_name.str,O_RDWR,
2087                                   HA_OPEN_TMP_TABLE | HA_OPEN_INTERNAL_TABLE)))
2088   {
2089     table->file->print_error(error,MYF(0)); /* purecov: inspected */
2090     table->db_stat=0;
2091     return(1);
2092   }
2093   (void) table->file->extra(HA_EXTRA_QUICK);		/* Faster */
2094 
2095   table->set_created();
2096 
2097   return false;
2098 }
2099 
2100 
2101 /*
2102   Create MyISAM temporary table
2103 
2104   SYNOPSIS
2105     create_myisam_tmp_table()
2106       table           Table object that descrimes the table to be created
2107       keyinfo         Description of the index (there is always one index)
2108       start_recinfo   MyISAM's column descriptions
2109       recinfo INOUT   End of MyISAM's column descriptions
2110       options         Option bits
2111 
2112   DESCRIPTION
2113     Create a MyISAM temporary table according to passed description. The is
2114     assumed to have one unique index or constraint.
2115 
2116     The passed array or MI_COLUMNDEF structures must have this form:
2117 
2118       1. 1-byte column (afaiu for 'deleted' flag) (note maybe not 1-byte
2119          when there are many nullable columns)
2120       2. Table columns
2121       3. One free MI_COLUMNDEF element (*recinfo points here)
2122 
2123     This function may use the free element to create hash column for unique
2124     constraint.
2125 
2126    RETURN
2127      FALSE - OK
2128      TRUE  - Error
2129 */
2130 
create_myisam_tmp_table(TABLE * table,KEY * keyinfo,MI_COLUMNDEF * start_recinfo,MI_COLUMNDEF ** recinfo,ulonglong options,my_bool big_tables)2131 bool create_myisam_tmp_table(TABLE *table, KEY *keyinfo,
2132                              MI_COLUMNDEF *start_recinfo,
2133                              MI_COLUMNDEF **recinfo,
2134                              ulonglong options, my_bool big_tables)
2135 {
2136   int error;
2137   MI_KEYDEF keydef;
2138   MI_UNIQUEDEF uniquedef;
2139   TABLE_SHARE *share= table->s;
2140   DBUG_ENTER("create_myisam_tmp_table");
2141 
2142   if (share->keys)
2143   {						// Get keys for ni_create
2144     if (share->keys > 1)
2145     {
2146       assert(0); // This code can't handle more than 1 key
2147       share->keys= 1;
2148     }
2149     HA_KEYSEG *seg= (HA_KEYSEG*) alloc_root(&table->mem_root,
2150                                             sizeof(*seg) *
2151                                             keyinfo->user_defined_key_parts);
2152     if (!seg)
2153       goto err;
2154 
2155     memset(seg, 0, sizeof(*seg) * keyinfo->user_defined_key_parts);
2156 
2157     /* Create an unique key */
2158     memset(&keydef, 0, sizeof(keydef));
2159     keydef.flag= static_cast<uint16>(keyinfo->flags);
2160     keydef.keysegs=  keyinfo->user_defined_key_parts;
2161     keydef.seg= seg;
2162 
2163     for (uint i=0; i < keyinfo->user_defined_key_parts ; i++,seg++)
2164     {
2165       Field *field=keyinfo->key_part[i].field;
2166       seg->flag=     0;
2167       seg->language= field->charset()->number;
2168       seg->length=   keyinfo->key_part[i].length;
2169       seg->start=    keyinfo->key_part[i].offset;
2170       if (field->flags & BLOB_FLAG)
2171       {
2172 	seg->type=
2173 	((keyinfo->key_part[i].key_type & FIELDFLAG_BINARY) ?
2174 	 HA_KEYTYPE_VARBINARY2 : HA_KEYTYPE_VARTEXT2);
2175 	seg->bit_start= (uint8)(field->pack_length() -
2176                                 portable_sizeof_char_ptr);
2177 	seg->flag= HA_BLOB_PART;
2178 	seg->length=0;			// Whole blob in unique constraint
2179       }
2180       else
2181       {
2182 	seg->type= keyinfo->key_part[i].type;
2183         /* Tell handler if it can do suffic space compression */
2184 	if (field->real_type() == MYSQL_TYPE_STRING &&
2185 	    keyinfo->key_part[i].length > 4)
2186 	  seg->flag|= HA_SPACE_PACK;
2187       }
2188       if (!(field->flags & NOT_NULL_FLAG))
2189       {
2190 	seg->null_bit= field->null_bit;
2191 	seg->null_pos= field->null_offset();
2192       }
2193     }
2194   }
2195   MI_CREATE_INFO create_info;
2196   memset(&create_info, 0, sizeof(create_info));
2197 
2198   if (big_tables && !(options & SELECT_SMALL_RESULT))
2199     create_info.data_file_length= ~(ulonglong) 0;
2200 
2201   if ((error=mi_create(share->table_name.str, share->keys, &keydef,
2202                        (uint) (*recinfo - start_recinfo),
2203                        start_recinfo,
2204                        0, &uniquedef,
2205                        &create_info,
2206                        HA_CREATE_TMP_TABLE | HA_CREATE_INTERNAL_TABLE |
2207                        ((share->db_create_options & HA_OPTION_PACK_RECORD) ?
2208                         HA_PACK_RECORD : 0)
2209                        )))
2210   {
2211     table->file->print_error(error,MYF(0));	/* purecov: inspected */
2212     /*
2213       Table name which was allocated from temp-pool is already occupied
2214       in SE. Probably we hit a bug in server or some problem with system
2215       configuration. Prevent problem from re-occurring by marking temp-pool
2216       slot for this name as permanently busy, to do this we only need to set
2217       TABLE::temp_pool_slot to MY_BIT_NONE in order to avoid freeing it
2218       in free_tmp_table().
2219     */
2220     if (error == EEXIST)
2221       table->temp_pool_slot= MY_BIT_NONE;
2222 
2223     table->db_stat=0;
2224     goto err;
2225   }
2226   table->in_use->inc_status_created_tmp_disk_tables();
2227   share->db_record_offset= 1;
2228   DBUG_RETURN(0);
2229  err:
2230   DBUG_RETURN(1);
2231 }
2232 
2233 /*
2234   Create InnoDB temporary table
2235 
2236   SYNOPSIS
2237     create_innodb_tmp_table()
2238       table           Table object that describes the table to be created
2239       keyinfo         Description of the index (there is always one index)
2240 
2241   DESCRIPTION
2242     Create an InnoDB temporary table according to passed description. It is
2243     assumed to have one unique index or constraint.
2244 
2245     The passed array or MI_COLUMNDEF structures must have this form:
2246 
2247       1. 1-byte column (afaiu for 'deleted' flag) (note maybe not 1-byte
2248          when there are many nullable columns)
2249       2. Table columns
2250       3. One free MI_COLUMNDEF element (*recinfo points here)
2251 
2252     This function may use the free element to create hash column for unique
2253     constraint.
2254 
2255    RETURN
2256      FALSE - OK
2257      TRUE  - Error
2258 */
create_innodb_tmp_table(TABLE * table,KEY * keyinfo)2259 bool create_innodb_tmp_table(TABLE *table, KEY *keyinfo)
2260 {
2261   TABLE_SHARE *share= table->s;
2262 
2263   DBUG_ENTER("create_innodb_tmp_table");
2264 
2265   HA_CREATE_INFO create_info;
2266 
2267   create_info.db_type= table->s->db_type();
2268   create_info.row_type= table->s->row_type;
2269   create_info.options|= HA_LEX_CREATE_TMP_TABLE |
2270                         HA_LEX_CREATE_INTERNAL_TMP_TABLE;
2271 
2272   table->file->adjust_create_info_for_frm(&create_info);
2273 
2274   /*
2275     INNODB's fixed length column size is restricted to 1024. Exceeding this can
2276     result in incorrect behavior.
2277   */
2278   if (table->s->db_type() == innodb_hton)
2279   {
2280     for (Field **field= table->field; *field; ++field)
2281     {
2282       if ((*field)->type() == MYSQL_TYPE_STRING &&
2283           (*field)->key_length() > 1024)
2284       {
2285         my_error(ER_TOO_LONG_KEY, MYF(0), 1024);
2286         DBUG_RETURN(true);
2287       }
2288     }
2289   }
2290 
2291   int error;
2292   if ((error= table->file->create(share->table_name.str, table, &create_info)))
2293   {
2294     table->file->print_error(error,MYF(0));    /* purecov: inspected */
2295     /*
2296       Table name which was allocated from temp-pool is already occupied
2297       in SE. Probably we hit a bug in server or some problem with system
2298       configuration. Prevent problem from re-occurring by marking temp-pool
2299       slot for this name as permanently busy, to do this we only need to set
2300       TABLE::temp_pool_slot to MY_BIT_NONE in order to avoid freeing it
2301       in free_tmp_table().
2302 
2303       Note that currently InnoDB never reports an error in this case but
2304       instead aborts on failed assertion. So the below if-statement is here
2305       mostly to make code future-proof and consistent with MyISAM case.
2306    */
2307 
2308    if (error == HA_ERR_FOUND_DUPP_KEY || error == HA_ERR_TABLESPACE_EXISTS ||
2309        error == HA_ERR_TABLE_EXIST)
2310      table->temp_pool_slot= MY_BIT_NONE;
2311     table->db_stat= 0;
2312     DBUG_RETURN(true);
2313   }
2314   else
2315   {
2316     table->in_use->inc_status_created_tmp_disk_tables();
2317     share->db_record_offset= 1;
2318     DBUG_RETURN(false);
2319   }
2320 }
2321 
trace_tmp_table(Opt_trace_context * trace,const TABLE * table)2322 static void trace_tmp_table(Opt_trace_context *trace, const TABLE *table)
2323 {
2324   Opt_trace_object trace_tmp(trace, "tmp_table_info");
2325   if (strlen(table->alias) != 0)
2326     trace_tmp.add_utf8_table(table->pos_in_table_list);
2327   else
2328     trace_tmp.add_alnum("table", "intermediate_tmp_table");
2329 
2330   trace_tmp.add("row_length",table->s->reclength).
2331     add("key_length", table->s->key_info ?
2332         table->s->key_info->key_length : 0).
2333     add("unique_constraint", table->hash_field ? true : false);
2334 
2335   if (table->s->db_type() == myisam_hton)
2336   {
2337     trace_tmp.add_alnum("location", "disk (MyISAM)");
2338     if (table->s->db_create_options & HA_OPTION_PACK_RECORD)
2339       trace_tmp.add_alnum("record_format", "packed");
2340     else
2341       trace_tmp.add_alnum("record_format", "fixed");
2342   }
2343   else if(table->s->db_type() == innodb_hton)
2344   {
2345     trace_tmp.add_alnum("location", "disk (InnoDB)");
2346     if (table->s->db_create_options & HA_OPTION_PACK_RECORD)
2347       trace_tmp.add_alnum("record_format", "packed");
2348     else
2349       trace_tmp.add_alnum("record_format", "fixed");
2350   }
2351   else
2352   {
2353     assert(table->s->db_type() == heap_hton);
2354     trace_tmp.add_alnum("location", "memory (heap)").
2355       add("row_limit_estimate", table->s->max_rows);
2356   }
2357 }
2358 
2359 /**
2360   @brief
2361   Instantiates temporary table
2362 
2363   @param  table           Table object that describes the table to be
2364                           instantiated
2365   @param  keyinfo         Description of the index (there is always one index)
2366   @param  start_recinfo   Column descriptions
2367   @param  recinfo INOUT   End of column descriptions
2368   @param  options         Option bits
2369   @param  trace           Optimizer trace to write info to
2370 
2371   @details
2372     Creates tmp table and opens it.
2373 
2374   @return
2375      FALSE - OK
2376      TRUE  - Error
2377 */
2378 
instantiate_tmp_table(TABLE * table,KEY * keyinfo,MI_COLUMNDEF * start_recinfo,MI_COLUMNDEF ** recinfo,ulonglong options,my_bool big_tables,Opt_trace_context * trace)2379 bool instantiate_tmp_table(TABLE *table, KEY *keyinfo,
2380                            MI_COLUMNDEF *start_recinfo,
2381                            MI_COLUMNDEF **recinfo,
2382                            ulonglong options, my_bool big_tables,
2383                            Opt_trace_context *trace)
2384 {
2385 #ifndef NDEBUG
2386   for (uint i= 0; i < table->s->fields; i++)
2387     assert(table->field[i]->gcol_info== NULL && table->field[i]->stored_in_db);
2388 #endif
2389 
2390   if (table->s->db_type() == innodb_hton)
2391   {
2392     if (create_innodb_tmp_table(table, keyinfo))
2393       return TRUE;
2394     // Make empty record so random data is not written to disk
2395     empty_record(table);
2396   }
2397   else if (table->s->db_type() == myisam_hton)
2398   {
2399     if (create_myisam_tmp_table(table, keyinfo, start_recinfo, recinfo,
2400                                 options, big_tables))
2401       return TRUE;
2402     // Make empty record so random data is not written to disk
2403     empty_record(table);
2404   }
2405 
2406   if (open_tmp_table(table))
2407   {
2408     table->file->ha_delete_table(table->s->table_name.str);
2409     return TRUE;
2410   }
2411 
2412   if (unlikely(trace->is_started()))
2413   {
2414     Opt_trace_object wrapper(trace);
2415     Opt_trace_object convert(trace, "creating_tmp_table");
2416     trace_tmp_table(trace, table);
2417   }
2418   return FALSE;
2419 }
2420 
2421 void
free_tmp_table(THD * thd,TABLE * entry)2422 free_tmp_table(THD *thd, TABLE *entry)
2423 {
2424   MEM_ROOT own_root= entry->mem_root;
2425   const char *save_proc_info;
2426   DBUG_ENTER("free_tmp_table");
2427   DBUG_PRINT("enter",("table: %s",entry->alias));
2428 
2429   save_proc_info=thd->proc_info;
2430   THD_STAGE_INFO(thd, stage_removing_tmp_table);
2431 
2432   thd->tmp_tables_used++;
2433   if (entry->file)
2434   {
2435       thd->tmp_tables_size += entry->file->stats.data_file_length;
2436       if (entry->file->ht->db_type != DB_TYPE_HEAP)
2437           thd->tmp_tables_disk_used++;
2438   }
2439 
2440   // Release latches since this can take a long time
2441   ha_release_temporary_latches(thd);
2442 
2443   filesort_free_buffers(entry, true);
2444 
2445   if (entry->is_created())
2446   {
2447     if (entry->db_stat)
2448       entry->file->ha_drop_table(entry->s->table_name.str);
2449     else
2450       entry->file->ha_delete_table(entry->s->table_name.str);
2451     delete entry->file;
2452     entry->file= NULL;
2453 
2454     entry->set_deleted();
2455   }
2456   /* free blobs */
2457   for (Field **ptr=entry->field ; *ptr ; ptr++)
2458     (*ptr)->mem_free();
2459   free_io_cache(entry);
2460 
2461   if (entry->temp_pool_slot != MY_BIT_NONE)
2462     bitmap_lock_clear_bit(&temp_pool, entry->temp_pool_slot);
2463 
2464   plugin_unlock(0, entry->s->db_plugin);
2465 
2466   free_root(&own_root, MYF(0)); /* the table is allocated in its own root */
2467   thd_proc_info(thd, save_proc_info);
2468 
2469   DBUG_VOID_RETURN;
2470 }
2471 
2472 /**
2473   If a MEMORY table gets full, create a disk-based table and copy all rows
2474   to this.
2475 
2476   @param thd             THD reference
2477   @param table           Table reference
2478   @param start_recinfo   Engine's column descriptions
2479   @param recinfo[in,out] End of engine's column descriptions
2480   @param error           Reason why inserting into MEMORY table failed.
2481   @param ignore_last_dup If true, ignore duplicate key error for last
2482                          inserted key (see detailed description below).
2483   @param is_duplicate[out] if non-NULL and ignore_last_dup is TRUE,
2484                          return TRUE if last key was a duplicate,
2485                          and FALSE otherwise.
2486 
2487   @detail
2488     Function can be called with any error code, but only HA_ERR_RECORD_FILE_FULL
2489     will be handled, all other errors cause a fatal error to be thrown.
2490     The function creates a disk-based temporary table, copies all records
2491     from the MEMORY table into this new table, deletes the old table and
2492     switches to use the new table within the table handle.
2493     The function uses table->record[1] as a temporary buffer while copying.
2494 
2495     The function assumes that table->record[0] contains the row that caused
2496     the error when inserting into the MEMORY table (the "last row").
2497     After all existing rows have been copied to the new table, the last row
2498     is attempted to be inserted as well. If ignore_last_dup is true,
2499     this row can be a duplicate of an existing row without throwing an error.
2500     If is_duplicate is non-NULL, an indication of whether the last row was
2501     a duplicate is returned.
2502 
2503   @note that any index/scan access initialized on the MEMORY table is not
2504   replicated to the on-disk table - it's the caller's responsibility.
2505 */
2506 
create_ondisk_from_heap(THD * thd,TABLE * table,MI_COLUMNDEF * start_recinfo,MI_COLUMNDEF ** recinfo,int error,bool ignore_last_dup,bool * is_duplicate)2507 bool create_ondisk_from_heap(THD *thd, TABLE *table,
2508                              MI_COLUMNDEF *start_recinfo,
2509                              MI_COLUMNDEF **recinfo,
2510 			     int error, bool ignore_last_dup,
2511                              bool *is_duplicate)
2512 {
2513   TABLE new_table;
2514   TABLE_SHARE share;
2515   const char *save_proc_info;
2516   int write_err;
2517   DBUG_ENTER("create_ondisk_from_heap");
2518 
2519   if (table->s->db_type() != heap_hton ||
2520       error != HA_ERR_RECORD_FILE_FULL)
2521   {
2522     /*
2523       We don't want this error to be converted to a warning, e.g. in case of
2524       INSERT IGNORE ... SELECT.
2525     */
2526     table->file->print_error(error, MYF(ME_FATALERROR));
2527     DBUG_RETURN(1);
2528   }
2529 
2530   // Release latches since this can take a long time
2531   ha_release_temporary_latches(thd);
2532 
2533   new_table= *table;
2534   share= *table->s;
2535   share.ha_share= NULL;
2536   new_table.s= &share;
2537   switch (internal_tmp_disk_storage_engine)
2538   {
2539   case TMP_TABLE_MYISAM:
2540     new_table.s->db_plugin= ha_lock_engine(thd, myisam_hton);
2541     break;
2542   case TMP_TABLE_INNODB:
2543     new_table.s->db_plugin= ha_lock_engine(thd, innodb_hton);
2544     break;
2545   default:
2546     assert(0);
2547     new_table.s->db_plugin= ha_lock_engine(thd, innodb_hton);
2548   }
2549 
2550   if (!(new_table.file= get_new_handler(&share, &new_table.mem_root,
2551                                         new_table.s->db_type())))
2552     DBUG_RETURN(1);				// End of memory
2553   if (new_table.file->set_ha_share_ref(&share.ha_share))
2554   {
2555     delete new_table.file;
2556     DBUG_RETURN(1);
2557   }
2558   save_proc_info=thd->proc_info;
2559   THD_STAGE_INFO(thd, stage_converting_heap_to_ondisk);
2560 
2561   if (share.db_type() == myisam_hton)
2562   {
2563     if (create_myisam_tmp_table(&new_table, table->s->key_info,
2564                                 start_recinfo, recinfo,
2565                                 thd->lex->select_lex->active_options(),
2566                                 thd->variables.big_tables))
2567       goto err2;
2568   }
2569   else if (share.db_type() == innodb_hton)
2570   {
2571     if (create_innodb_tmp_table(&new_table, table->s->key_info))
2572       goto err2;
2573   }
2574 
2575   if (open_tmp_table(&new_table))
2576     goto err1;
2577 
2578 
2579   if (unlikely(thd->opt_trace.is_started()))
2580   {
2581     Opt_trace_context * trace= &thd->opt_trace;
2582     Opt_trace_object wrapper(trace);
2583     Opt_trace_object convert(trace, "converting_tmp_table_to_ondisk");
2584     assert(error == HA_ERR_RECORD_FILE_FULL);
2585     convert.add_alnum("cause", "memory_table_size_exceeded");
2586     trace_tmp_table(trace, &new_table);
2587   }
2588 
2589   if (table->file->indexes_are_disabled())
2590     new_table.file->ha_disable_indexes(HA_KEY_SWITCH_ALL);
2591   table->file->ha_index_or_rnd_end();
2592   if ((write_err= table->file->ha_rnd_init(1)))
2593   {
2594     table->file->print_error(write_err, MYF(ME_FATALERROR));
2595     write_err= 0;
2596     goto err;
2597   }
2598   if (table->no_rows)
2599   {
2600     new_table.file->extra(HA_EXTRA_NO_ROWS);
2601     new_table.no_rows=1;
2602   }
2603 
2604   /* HA_EXTRA_WRITE_CACHE can stay until close, no need to disable it */
2605   new_table.file->extra(HA_EXTRA_WRITE_CACHE);
2606 
2607   /*
2608     copy all old rows from heap table to on-disk table
2609     This is the only code that uses record[1] to read/write but this
2610     is safe as this is a temporary on-disk table without timestamp/
2611     autoincrement or partitioning.
2612   */
2613   while (!table->file->ha_rnd_next(new_table.record[1]))
2614   {
2615     write_err= new_table.file->ha_write_row(new_table.record[1]);
2616     DBUG_EXECUTE_IF("raise_error", write_err= HA_ERR_FOUND_DUPP_KEY ;);
2617     if (write_err)
2618       goto err;
2619   }
2620   /* copy row that filled HEAP table */
2621   if ((write_err=new_table.file->ha_write_row(table->record[0])))
2622   {
2623     if (!new_table.file->is_ignorable_error(write_err) ||
2624 	!ignore_last_dup)
2625       goto err;
2626     if (is_duplicate)
2627       *is_duplicate= TRUE;
2628   }
2629   else
2630   {
2631     if (is_duplicate)
2632       *is_duplicate= FALSE;
2633   }
2634 
2635   /* remove heap table and change to use on-disk table */
2636   (void) table->file->ha_rnd_end();
2637   (void) table->file->ha_close();              // This deletes the table !
2638   delete table->file;
2639   table->file=0;
2640   plugin_unlock(0, table->s->db_plugin);
2641   share.db_plugin= my_plugin_lock(0, &share.db_plugin);
2642   new_table.s= table->s;                       // Keep old share
2643   *table= new_table;
2644   *table->s= share;
2645   /* Update quick select, if any. */
2646   {
2647     QEP_TAB *tab= table->reginfo.qep_tab;
2648     assert(tab || !table->reginfo.join_tab);
2649     if (tab && tab->quick())
2650     {
2651       /*
2652         This could happen only with result of derived table/view
2653         materialization.
2654       */
2655       assert(tab->table_ref && tab->table_ref->uses_materialization());
2656       tab->quick()->set_handler(table->file);
2657     }
2658   }
2659   table->file->change_table_ptr(table, table->s);
2660   table->use_all_columns();
2661   if (save_proc_info)
2662     thd_proc_info(thd, (!strcmp(save_proc_info,"Copying to tmp table") ?
2663                   "Copying to tmp table on disk" : save_proc_info));
2664   DBUG_RETURN(0);
2665 
2666  err:
2667   if (write_err)
2668   {
2669     DBUG_PRINT("error",("Got error: %d",write_err));
2670     new_table.file->print_error(write_err, MYF(0));
2671   }
2672   if (table->file->inited)
2673     (void) table->file->ha_rnd_end();
2674   (void) new_table.file->ha_close();
2675  err1:
2676   new_table.file->ha_delete_table(new_table.s->table_name.str);
2677  err2:
2678   delete new_table.file;
2679   thd_proc_info(thd, save_proc_info);
2680   table->mem_root= new_table.mem_root;
2681   DBUG_RETURN(1);
2682 }
2683