1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 /** @file Temporary tables implementation */
24 
25 #include "sql_tmp_table.h"
26 
27 #include "myisam.h"               // MI_COLUMNDEF
28 #include "debug_sync.h"           // DEBUG_SYNC
29 #include "filesort.h"             // filesort_free_buffers
30 #include "item_func.h"            // Item_func
31 #include "item_sum.h"             // Item_sum
32 #include "mem_root_array.h"       // Mem_root_array
33 #include "opt_range.h"            // QUICK_SELECT_I
34 #include "opt_trace.h"            // Opt_trace_object
35 #include "opt_trace_context.h"    // Opt_trace_context
36 #include "sql_base.h"             // free_io_cache
37 #include "sql_class.h"            // THD
38 #include "sql_executor.h"         // SJ_TMP_TABLE
39 #include "sql_plugin.h"           // plugin_unlock
40 
41 #include <algorithm>
42 
43 using std::max;
44 using std::min;
45 
46 /****************************************************************************
47   Create internal temporary table
48 ****************************************************************************/
49 
50 /**
51   Create field for temporary table from given field.
52 
53   @param thd	       Thread handler
54   @param org_field    field from which new field will be created
55   @param name         New field name
56   @param table	       Temporary table
57   @param item	       !=NULL if item->result_field should point to new field.
58                       This is relevant for how fill_record() is going to work:
59                       If item != NULL then fill_record() will update
60                       the record in the original table.
61                       If item == NULL then fill_record() will update
62                       the temporary table
63 
64   @retval
65     NULL		on error
66   @retval
67     new_created field
68 */
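/*
  A condensed usage sketch (mirroring the call made from create_tmp_field()
  below; 'field' is an Item_field and 'tmp_table' the table being built):

    Field *new_f=
      create_tmp_field_from_field(thd, field->field, field->item_name.ptr(),
                                  tmp_table, modify_item ? field : NULL);
    if (new_f == NULL)
      return NULL;                      // OOM, abort tmp table creation
*/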
69 
70 Field *create_tmp_field_from_field(THD *thd, Field *org_field,
71                                    const char *name, TABLE *table,
72                                    Item_field *item)
73 {
74   Field *new_field;
75 
76   new_field= org_field->new_field(thd->mem_root, table,
77                                   table == org_field->table);
78   if (new_field)
79   {
80     new_field->init(table);
81     new_field->orig_table= org_field->table;
82     if (item)
83       item->result_field= new_field;
84     else
85       new_field->field_name= name;
86     new_field->flags|= (org_field->flags & NO_DEFAULT_VALUE_FLAG);
87     if (org_field->maybe_null() || (item && item->maybe_null))
88       new_field->flags&= ~NOT_NULL_FLAG;	// Because of outer join
89     if (org_field->type() == FIELD_TYPE_DOUBLE)
90       ((Field_double *) new_field)->not_fixed= TRUE;
91     /*
92       This field will belong to an internal temporary table, it cannot be
93       generated.
94     */
95     new_field->gcol_info= NULL;
96     new_field->stored_in_db= true;
97   }
98   return new_field;
99 }
100 
101 /**
102   Create field for temporary table using type of given item.
103 
104   @param thd                   Thread handler
105   @param item                  Item to create a field for
106   @param table                 Temporary table
107   @param copy_func             If set and item is a function, store copy of
108                                item in this array
109   @param modify_item           1 if item->result_field should point to new
110                                item. This is relevant for how fill_record()
111                                is going to work:
112                                If modify_item is 1 then fill_record() will
113                                update the record in the original table.
114                                If modify_item is 0 then fill_record() will
115                                update the temporary table
116 
117   @retval
118     0  on error
119   @retval
120     new_created field
121 */
122 
123 static Field *create_tmp_field_from_item(THD *thd, Item *item, TABLE *table,
124                                          Func_ptr_array *copy_func,
125                                          bool modify_item)
126 {
127   bool maybe_null= item->maybe_null;
128   Field *new_field= NULL;
129 
130   switch (item->result_type()) {
131   case REAL_RESULT:
132     new_field= new Field_double(item->max_length, maybe_null,
133                                 item->item_name.ptr(), item->decimals, TRUE);
134     break;
135   case INT_RESULT:
136     /*
137       Select an integer type with the minimal precision that fits.
138       MY_INT32_NUM_DECIMAL_DIGITS includes the sign, which we don't want to
139       count here (hence the -1 below). Values with that many digits may or
140       may not fit into Field_long, so make them Field_longlong.
141     */
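    /*
      Worked example, assuming MY_INT32_NUM_DECIMAL_DIGITS is 11 (10 digits
      plus a sign position): an item with max_length 9 always fits in a
      32-bit Field_long, whereas max_length 10 may exceed 2147483647 and is
      therefore given a Field_longlong.
    */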
142     if (item->max_length >= (MY_INT32_NUM_DECIMAL_DIGITS - 1))
143       new_field=new Field_longlong(item->max_length, maybe_null,
144                                    item->item_name.ptr(), item->unsigned_flag);
145     else
146       new_field=new Field_long(item->max_length, maybe_null,
147                                item->item_name.ptr(), item->unsigned_flag);
148     break;
149   case STRING_RESULT:
150     assert(item->collation.collation);
151 
152     /*
153       DATE/TIME, GEOMETRY and JSON fields have STRING_RESULT result type.
154       To preserve the type they need to be handled separately.
155     */
156     if (item->is_temporal() ||
157         item->field_type() == MYSQL_TYPE_GEOMETRY ||
158         item->field_type() == MYSQL_TYPE_JSON)
159     {
160       new_field= item->tmp_table_field_from_field_type(table, 1);
161     }
162     else
163     {
164       new_field= item->make_string_field(table);
165     }
166     new_field->set_derivation(item->collation.derivation);
167     break;
168   case DECIMAL_RESULT:
169     new_field= Field_new_decimal::create_from_item(item);
170     break;
171   case ROW_RESULT:
172   default:
173     // This case should never be chosen
174     assert(0);
175     new_field= 0;
176     break;
177   }
178   if (new_field)
179     new_field->init(table);
180 
181   /*
182     If the item is a function, a pointer to the item is stored in
183     copy_func. We separate fields from functions by checking if the
184     item is a result field item. The real_item() must be checked to
185     avoid falsely identifying Item_ref and its subclasses as functions
186     when they refer to field-like items, such as Item_copy and
187     subclasses. References to true fields have already been untangled
188     in the beginning of create_tmp_field().
189    */
190   if (copy_func && item->real_item()->is_result_field())
191     copy_func->push_back(item);
192   if (modify_item)
193     item->set_result_field(new_field);
194   if (item->type() == Item::NULL_ITEM)
195     new_field->is_created_from_null_item= TRUE;
196   return new_field;
197 }
198 
199 
200 /**
201   Create field for information schema table.
202 
203   @param thd		Thread handler
204   @param table		Temporary table
205   @param item		Item to create a field for
206 
207   @retval
208     0			on error
209   @retval
210     new_created field
211 */
212 
213 static Field *create_tmp_field_for_schema(THD *thd, Item *item, TABLE *table)
214 {
215   if (item->field_type() == MYSQL_TYPE_VARCHAR)
216   {
217     Field *field;
218     if (item->max_length > MAX_FIELD_VARCHARLENGTH)
219       field= new Field_blob(item->max_length, item->maybe_null,
220                             item->item_name.ptr(),
221                             item->collation.collation, false);
222     else
223     {
224       field= new Field_varstring(item->max_length, item->maybe_null,
225                                  item->item_name.ptr(),
226                                  table->s, item->collation.collation);
227       table->s->db_create_options|= HA_OPTION_PACK_RECORD;
228     }
229     if (field)
230       field->init(table);
231     return field;
232   }
233   return item->tmp_table_field_from_field_type(table, 0);
234 }
235 
236 
237 /**
238   Create field for temporary table.
239 
240   @param thd		Thread handler
241   @param table		Temporary table
242   @param item		Item to create a field for
243   @param type		Type of item (normally item->type)
244   @param copy_func	If set and item is a function, store copy of item
245                        in this array
246   @param from_field    if field will be created using other field as example,
247                        pointer example field will be written here
248   @param default_field	If field has a default value field, store it here
249   @param group		1 if we are going to do a relative group by on result
250   @param modify_item	1 if item->result_field should point to new item.
251                        This is relevant for how fill_record() is going to
252                        work:
253                        If modify_item is 1 then fill_record() will update
254                        the record in the original table.
255                        If modify_item is 0 then fill_record() will update
256                        the temporary table
257 
258   @retval NULL On error.
259 
260   @retval new_created field
261 */
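/*
  A condensed sketch of the typical call, as made from the field-list loop
  in create_tmp_table() further down in this file:

    Field *new_field=
      create_tmp_field(thd, table, item, item->type(), copy_func,
                       tmp_from_field, &default_field[fieldnr],
                       group != 0,
                       !force_copy_fields && (not_all_columns || group != 0),
                       item->marker == 4 || param->bit_fields_as_long,
                       force_copy_fields);
    if (!new_field)
      goto err;                                 // OOM
*/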
262 
263 Field *create_tmp_field(THD *thd, TABLE *table, Item *item, Item::Type type,
264                         Func_ptr_array *copy_func, Field **from_field,
265                         Field **default_field,
266                         bool group, bool modify_item,
267                         bool table_cant_handle_bit_fields,
268                         bool make_copy_field)
269 {
270   Field *result= NULL;
271   Item::Type orig_type= type;
272   Item *orig_item= 0;
273 
274   if (type != Item::FIELD_ITEM &&
275       item->real_item()->type() == Item::FIELD_ITEM)
276   {
277     orig_item= item;
278     item= item->real_item();
279     type= Item::FIELD_ITEM;
280   }
281 
282   switch (type) {
283   case Item::SUM_FUNC_ITEM:
284   {
285     Item_sum *item_sum=(Item_sum*) item;
286     result= item_sum->create_tmp_field(group, table);
287     if (!result)
288       my_error(ER_OUT_OF_RESOURCES, MYF(ME_FATALERROR));
289     break;
290   }
291   case Item::FIELD_ITEM:
292   case Item::DEFAULT_VALUE_ITEM:
293   case Item::TRIGGER_FIELD_ITEM:
294   {
295     Item_field *field= (Item_field*) item;
296     bool orig_modify= modify_item;
297     if (orig_type == Item::REF_ITEM)
298       modify_item= 0;
299     /*
300       If the item must be able to store NULLs but the underlying field can't,
301       create_tmp_field_from_field() can't be used for tmp field creation.
302     */
303     if (field->maybe_null && !field->field->maybe_null())
304     {
305       result= create_tmp_field_from_item(thd, item, table, NULL,
306                                          modify_item);
307       if (!result)
308         break;
309       *from_field= field->field;
310       if (modify_item)
311         field->result_field= result;
312     }
313     else if (table_cant_handle_bit_fields && field->field->type() ==
314              MYSQL_TYPE_BIT)
315     {
316       *from_field= field->field;
317       result= create_tmp_field_from_item(thd, item, table, copy_func,
318                                          modify_item);
319       if (!result)
320         break;
321       if (modify_item)
322         field->result_field= result;
323     }
324     else
325     {
326       result= create_tmp_field_from_field(thd, (*from_field= field->field),
327                                           orig_item ? orig_item->item_name.ptr() :
328                                           item->item_name.ptr(),
329                                           table,
330                                           modify_item ? field :
331                                           NULL);
332       if (!result)
333         break;
334     }
335     if (orig_type == Item::REF_ITEM && orig_modify)
336       ((Item_ref*)orig_item)->set_result_field(result);
337     /*
338       Fields that are used as arguments to the DEFAULT() function already have
339       their data pointers set to the default value during name resolution. See
340       Item_default_value::fix_fields.
341     */
342     if (orig_type != Item::DEFAULT_VALUE_ITEM && field->field->eq_def(result))
343       *default_field= field->field;
344     break;
345   }
347   case Item::FUNC_ITEM:
348     if (((Item_func *) item)->functype() == Item_func::FUNC_SP)
349     {
350       Item_func_sp *item_func_sp= (Item_func_sp *) item;
351       Field *sp_result_field= item_func_sp->get_sp_result_field();
352 
353       if (make_copy_field)
354       {
355         assert(item_func_sp->result_field);
356         *from_field= item_func_sp->result_field;
357       }
358       else
359       {
360         copy_func->push_back(item);
361       }
362 
363       result= create_tmp_field_from_field(thd,
364                                           sp_result_field,
365                                           item_func_sp->item_name.ptr(),
366                                           table,
367                                           NULL);
368       if (!result)
369         break;
370       if (modify_item)
371         item->set_result_field(result);
372       break;
373     }
374 
375     /* Fall through */
376   case Item::COND_ITEM:
377   case Item::FIELD_AVG_ITEM:
378   case Item::FIELD_STD_ITEM:
379   case Item::FIELD_VARIANCE_ITEM:
380   case Item::SUBSELECT_ITEM:
381     /* The following can only happen with 'CREATE TABLE ... SELECT' */
382   case Item::PROC_ITEM:
383   case Item::INT_ITEM:
384   case Item::REAL_ITEM:
385   case Item::DECIMAL_ITEM:
386   case Item::STRING_ITEM:
387   case Item::REF_ITEM:
388   case Item::NULL_ITEM:
389   case Item::VARBIN_ITEM:
390   case Item::PARAM_ITEM:
391     if (make_copy_field)
392     {
393       assert(((Item_result_field*)item)->result_field);
394       *from_field= ((Item_result_field*)item)->result_field;
395     }
396     result= create_tmp_field_from_item(thd, item, table,
397                                        (make_copy_field ? NULL : copy_func),
398                                        modify_item);
399     break;
400   case Item::TYPE_HOLDER:
401     result= ((Item_type_holder *)item)->make_field_by_type(table,
402                                                            thd->is_strict_mode());
403     if (!result)
404       break;
405     result->set_derivation(item->collation.derivation);
406     break;
407   default:					// Doesn't have to be stored
408     assert(false);
409     break;
410   }
411   return result;
412 }
413 
414 /*
415   Set up column usage bitmaps for a temporary table
416 
417   IMPLEMENTATION
418     For temporary tables, we need one bitmap with all columns set, a
419     tmp_set bitmap to be used by things like filesort, and a cond_set bitmap.
420 */
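/*
  Layout sketch of the 'bitmaps' buffer: create_tmp_table() allocates
  bitmap_buffer_size(field_count + 1) * 3 bytes, and the three bitmaps below
  are placed bitmap_buffer_size(field_count) bytes apart:

      [ def_read_set ][ tmp_set ][ cond_set ]
*/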
421 
422 static void setup_tmp_table_column_bitmaps(TABLE *table, uchar *bitmaps)
423 {
424   uint field_count= table->s->fields;
425   bitmap_init(&table->def_read_set, (my_bitmap_map*) bitmaps, field_count,
426               FALSE);
427   bitmap_init(&table->tmp_set,
428               (my_bitmap_map*) (bitmaps + bitmap_buffer_size(field_count)),
429               field_count, FALSE);
430   bitmap_init(&table->cond_set,
431               (my_bitmap_map*) (bitmaps + bitmap_buffer_size(field_count) * 2),
432               field_count, FALSE);
433   /* write_set and all_set are copies of read_set */
434   table->def_write_set= table->def_read_set;
435   table->s->all_set= table->def_read_set;
436   bitmap_set_all(&table->s->all_set);
437   table->default_column_bitmaps();
438   table->s->column_bitmap_size= bitmap_buffer_size(field_count);
439 }
440 
441 /**
442   Cache for the storage engine properties for the alternative temporary table
443   storage engines. This cache is initialized during startup of the server by
444   asking the storage engines for the values of these properties.
445 */
446 
447 class Cache_temp_engine_properties
448 {
449 public:
450   static uint HEAP_MAX_KEY_LENGTH;
451   static uint MYISAM_MAX_KEY_LENGTH;
452   static uint INNODB_MAX_KEY_LENGTH;
453   static uint HEAP_MAX_KEY_PART_LENGTH;
454   static uint MYISAM_MAX_KEY_PART_LENGTH;
455   static uint INNODB_MAX_KEY_PART_LENGTH;
456   static uint HEAP_MAX_KEY_PARTS;
457   static uint MYISAM_MAX_KEY_PARTS;
458   static uint INNODB_MAX_KEY_PARTS;
459 
460   static void init(THD *thd);
461 };
462 
463 void Cache_temp_engine_properties::init(THD *thd)
464 {
465   handler *handler;
466   plugin_ref db_plugin;
467 
468   // Cache HEAP engine's properties
469   db_plugin= ha_lock_engine(0, heap_hton);
470   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, heap_hton);
471   HEAP_MAX_KEY_LENGTH= handler->max_key_length();
472   HEAP_MAX_KEY_PART_LENGTH= handler->max_key_part_length(0);
473   HEAP_MAX_KEY_PARTS= handler->max_key_parts();
474   delete handler;
475   plugin_unlock(0, db_plugin);
476   // Cache MYISAM engine's properties
477   db_plugin= ha_lock_engine(0, myisam_hton);
478   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, myisam_hton);
479   MYISAM_MAX_KEY_LENGTH= handler->max_key_length();
480   MYISAM_MAX_KEY_PART_LENGTH= handler->max_key_part_length(0);
481   MYISAM_MAX_KEY_PARTS= handler->max_key_parts();
482   delete handler;
483   plugin_unlock(0, db_plugin);
484   // Cache INNODB engine's properties
485   db_plugin= ha_lock_engine(0, innodb_hton);
486   handler= get_new_handler((TABLE_SHARE *)0, thd->mem_root, innodb_hton);
487   INNODB_MAX_KEY_LENGTH= handler->max_key_length();
488   /*
489     For ha_innobase::max_supported_key_part_length(), the returned value
490     relies on innodb_large_prefix. However, in innodb itself, the limitation
491     on key_part length depends on the ROW_FORMAT. In current trunk, an internal
492     temp table's ROW_FORMAT is DYNAMIC. In order to keep consistency
493     between the server and innodb, here we hard-code 3072 as the maximum
494     key_part length supported by innodb until bug#20629014 is fixed.
495 
496     TODO: Remove the hard-code here after bug#20629014 is fixed.
497   */
498   INNODB_MAX_KEY_PART_LENGTH= 3072;
499   INNODB_MAX_KEY_PARTS= handler->max_key_parts();
500   delete handler;
501   plugin_unlock(0, db_plugin);
502 }
503 
504 uint Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH= 0;
505 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_LENGTH= 0;
506 uint Cache_temp_engine_properties::INNODB_MAX_KEY_LENGTH= 0;
507 uint Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH= 0;
508 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_PART_LENGTH= 0;
509 uint Cache_temp_engine_properties::INNODB_MAX_KEY_PART_LENGTH= 0;
510 uint Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS= 0;
511 uint Cache_temp_engine_properties::MYISAM_MAX_KEY_PARTS= 0;
512 uint Cache_temp_engine_properties::INNODB_MAX_KEY_PARTS= 0;
513 
514 void init_cache_tmp_engine_properties()
515 {
516   assert(!current_thd);
517   THD *thd= new THD();
518   thd->thread_stack= pointer_cast<char *>(&thd);
519   thd->store_globals();
520   Cache_temp_engine_properties::init(thd);
521   delete thd;
522 }
523 
524 /**
525   Get the minimum of max_key_length and max_key_part_length.
526   The minimum is taken over the HEAP engine and internal_tmp_disk_storage_engine.
527 
528   @param[out] max_key_length Minimum of max_key_length
529   @param[out] max_key_part_length Minimum of max_key_part_length
530 */
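/*
  Condensed usage sketch (see create_tmp_table() below for the full check,
  which also considers the number of key parts):

    uint max_key_length, max_key_part_length;
    get_max_key_and_part_length(&max_key_length, &max_key_part_length);
    if (group && param->group_length > max_key_length)
      using_unique_constraint= true;    // fall back to hash-based constraint
*/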
531 
532 void get_max_key_and_part_length(uint *max_key_length,
533                                  uint *max_key_part_length)
534 {
535   // Make sure these cached properties are initialized.
536   assert(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH);
537 
538   switch (internal_tmp_disk_storage_engine)
539   {
540   case TMP_TABLE_MYISAM:
541     *max_key_length=
542       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH,
543                Cache_temp_engine_properties::MYISAM_MAX_KEY_LENGTH);
544     *max_key_part_length=
545       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH,
546                Cache_temp_engine_properties::MYISAM_MAX_KEY_PART_LENGTH);
547     /*
548       create_tmp_table() tests tmp_se->max_key_parts() too, not only HEAP's.
549       It is correct as long as HEAP's limit is not bigger than the on-disk
550       temp table engine's, which we check here.
551     */
552     assert(Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS <=
553            Cache_temp_engine_properties::MYISAM_MAX_KEY_PARTS);
554     break;
555   case TMP_TABLE_INNODB:
556   default:
557     *max_key_length=
558       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_LENGTH,
559                Cache_temp_engine_properties::INNODB_MAX_KEY_LENGTH);
560     *max_key_part_length=
561       std::min(Cache_temp_engine_properties::HEAP_MAX_KEY_PART_LENGTH,
562                Cache_temp_engine_properties::INNODB_MAX_KEY_PART_LENGTH);
563     assert(Cache_temp_engine_properties::HEAP_MAX_KEY_PARTS <=
564            Cache_temp_engine_properties::INNODB_MAX_KEY_PARTS);
565     break;
566   }
567 }
568 
569 /**
570   Create a temporary name for one field if the field_name is empty.
571 
572   @param thd          Thread handle
573   @param field_index  Index of this field in table->field
574 */
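/*
  Example: for field_index 2 the generated name is "tmp_field_2"; the string
  is duplicated onto the THD memory root via thd->mem_strdup().
*/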
575 
576 static const char *create_tmp_table_field_tmp_name(THD *thd, int field_index)
577 {
578   char buf[64];
579   my_snprintf(buf, 64, "tmp_field_%d", field_index);
580   return thd->mem_strdup(buf);
581 }
582 
583 /**
584   Helper function for create_tmp_table().
585 
586   Insert a field at the head of the hidden field area.
587 
588   @param table            Temporary table
589   @param default_field    Default value array pointer
590   @param from_field       Original field array pointer
591   @param blob_field       Array pointer to record fields index of blob type
592   @param field            The registered hidden field
593  */
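/*
  Calling-convention sketch (see the hash-field registration in
  create_tmp_table() below): the field/default_field/from_field arrays are
  allocated with one spare slot in front, so after registering the hidden
  field the caller repoints the arrays one element back:

    register_hidden_field(table, default_field, from_field,
                          share->blob_field, field);  // field = hidden hash field
    table->field--;
    default_field--;
    from_field--;
*/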
594 
595 static void register_hidden_field(TABLE *table,
596                               Field **default_field,
597                               Field **from_field,
598                               uint *blob_field,
599                               Field *field)
600 {
601   uint i;
602   Field **tmp_field= table->field;
603 
604   /* Increase the index of all registered fields */
605   for (i= 0; i < table->s->fields; i++)
606     tmp_field[i]->field_index++;
607 
608   // Increase the field_index of visible blob field
609   for (i= 0; i < table->s->blob_fields; i++)
610     blob_field[i]++;
611   // Insert field
612   table->field[-1]= field;
613   default_field[-1]= NULL;
614   from_field[-1]= NULL;
615   field->table= field->orig_table= table;
616   field->field_index= 0;
617 }
618 
619 /**
620   Create a temp table according to a field list.
621 
622   Given field pointers are changed to point at tmp_table for
623   send_result_set_metadata. The table object is self-contained: it is
624   allocated in its own memory root, as are the Field objects
625   created for the table columns.
626   This function will replace Item_sum items in 'fields' list with
627   corresponding Item_field items, pointing at the fields in the
628   temporary table, unless this is prohibited by a TRUE
629   value of the save_sum_fields argument. The Item_field objects
630   are created in the THD memory root.
631 
632   @param thd                  thread handle
633   @param param                a description used as input to create the table
634   @param fields               list of items that will be used to define
635                               column types of the table (also see NOTES)
636   @param group                Group key to use for temporary table, NULL if none
637   @param distinct             should table rows be distinct
638   @param save_sum_fields      see NOTES
639   @param select_options
640   @param rows_limit
641   @param table_alias          possible name of the temporary table that can
642                               be used for name resolving; can be "".
643 
644   @remark mysql_create_view() checks that views have less than
645           MAX_FIELDS columns. This prevents any MyISAM temp table
646           made when materializing the view from hitting the 64k
647           MyISAM header size limit.
648 
649   @remark We may actually end up with a table without any columns at all.
650           See comment below: We don't have to store this.
651 */
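/*
  A minimal, hypothetical call sketch (real callers derive every argument
  from the JOIN/SELECT_LEX being executed; the names below are illustrative):

    TABLE *tmp=
      create_tmp_table(thd, &tmp_table_param, select_lex->item_list,
                       /*group=*/NULL, /*distinct=*/true,
                       /*save_sum_fields=*/false, select_options,
                       /*rows_limit=*/HA_POS_ERROR, /*table_alias=*/"");
    if (tmp == NULL)
      ...                              // error has already been reported
*/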
652 
653 #define STRING_TOTAL_LENGTH_TO_PACK_ROWS 128
654 #define AVG_STRING_LENGTH_TO_PACK_ROWS   64
655 #define RATIO_TO_PACK_ROWS	       2
656 #define MIN_STRING_LENGTH_TO_PACK_ROWS   10
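/*
  Illustration of the packing heuristic applied further down: with, say,
  string_total_length= 200, string_count= 2 and reclength= 300, rows are
  packed because 200 >= STRING_TOTAL_LENGTH_TO_PACK_ROWS and
  300 / 200 <= RATIO_TO_PACK_ROWS (200 / 2 >= AVG_STRING_LENGTH_TO_PACK_ROWS
  would also qualify).
*/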
657 
658 TABLE *
659 create_tmp_table(THD *thd, Temp_table_param *param, List<Item> &fields,
660 		 ORDER *group, bool distinct, bool save_sum_fields,
661 		 ulonglong select_options, ha_rows rows_limit,
662 		 const char *table_alias)
663 {
664   MEM_ROOT *mem_root_save, own_root;
665   TABLE *table;
666   TABLE_SHARE *share;
667   uint	i,field_count,null_count,null_pack_length;
668   uint  copy_func_count= param->func_count;
669   uint  hidden_null_count, hidden_null_pack_length;
670   long hidden_field_count;
671   uint  blob_count,group_null_items, string_count;
672   uint  temp_pool_slot=MY_BIT_NONE;
673   uint fieldnr= 0;
674   ulong reclength, string_total_length, distinct_key_length= 0;
675   /**
676     When true, enforces unique constraint (by adding a hidden hash_field and
677     creating a key over this field) when:
678     (1) unique key is too long or
679     (2) number of key parts in distinct key is too big.
680   */
681   bool  using_unique_constraint= false;
682   bool  use_packed_rows= false;
683   bool  not_all_columns= !(select_options & TMP_TABLE_ALL_COLUMNS);
684   char  *tmpname,path[FN_REFLEN];
685   uchar	*pos, *group_buff, *bitmaps;
686   uchar *null_flags;
687   Field **reg_field, **from_field, **default_field;
688   uint *blob_field;
689   Copy_field *copy=0;
690   KEY *keyinfo;
691   KEY_PART_INFO *key_part_info;
692   MI_COLUMNDEF *recinfo;
693   /*
694     total_uneven_bit_length is uneven bit length for visible fields
695     hidden_uneven_bit_length is uneven bit length for hidden fields
696   */
697   uint total_uneven_bit_length= 0, hidden_uneven_bit_length= 0;
698   bool force_copy_fields= param->force_copy_fields;
699 
700   uint max_key_length;
701   uint max_key_part_length;
702   /* Treat sum functions as normal ones when loose index scan is used. */
703   save_sum_fields|= param->precomputed_group_by;
704   DBUG_ENTER("create_tmp_table");
705   DBUG_PRINT("enter",
706              ("distinct: %d  save_sum_fields: %d  rows_limit: %lu  group: %d",
707               (int) distinct, (int) save_sum_fields,
708               (ulong) rows_limit, MY_TEST(group)));
709 
710   thd->inc_status_created_tmp_tables();
711 
712   if (use_temp_pool && !(test_flags & TEST_KEEP_TMP_TABLES))
713     temp_pool_slot = bitmap_lock_set_next(&temp_pool);
714 
715   if (temp_pool_slot != MY_BIT_NONE) // we got a slot
716     sprintf(path, "%s_%lx_%i", tmp_file_prefix,
717             current_pid, temp_pool_slot);
718   else
719   {
720     /* if we run out of slots or we are not using the temp pool */
721     assert(sizeof(my_thread_id) == 4);
722     sprintf(path,"%s%lx_%x_%x", tmp_file_prefix, current_pid,
723             thd->thread_id(), thd->tmp_table++);
724   }
725 
726   /*
727     No need to change table name to lower case as we are only creating
728     MyISAM or HEAP tables here
729   */
730   fn_format(path, path, mysql_tmpdir, "", MY_REPLACE_EXT|MY_UNPACK_FILENAME);
731 
732 
733   if (group)
734   {
735     if (!param->quick_group)
736       group=0;					// Can't use group key
737     else for (ORDER *tmp=group ; tmp ; tmp=tmp->next)
738     {
739       /*
740         marker == 4 means two things:
741         - store NULLs in the key, and
742         - convert BIT fields to 64-bit long, needed because MEMORY tables
743           can't index BIT fields.
744       */
745       (*tmp->item)->marker= 4;
746       const uint char_len=
747         (*tmp->item)->max_length / (*tmp->item)->collation.collation->mbmaxlen;
748       if (char_len > CONVERT_IF_BIGGER_TO_BLOB)
749         using_unique_constraint= true;
750     }
751     if (group)
752     {
753       if (param->group_length >= MAX_BLOB_WIDTH)
754         using_unique_constraint= true;
755       distinct=0;				// Can't use distinct
756     }
757   }
758 
759   field_count=param->field_count+param->func_count+param->sum_func_count;
760   hidden_field_count=param->hidden_field_count;
761 
762   /*
763     When loose index scan is employed as access method, it already
764     computes all groups and the result of all aggregate functions. We
765     make space for the items of the aggregate function in the list of
766     functions Temp_table_param::items_to_copy, so that the values of
767     these items are stored in the temporary table.
768   */
769   if (param->precomputed_group_by)
770     copy_func_count+= param->sum_func_count;
771 
772   init_sql_alloc(key_memory_TABLE, &own_root, TABLE_ALLOC_BLOCK_SIZE, 0);
773 
774   void *rawmem= alloc_root(&own_root, sizeof(Func_ptr_array));
775   if (!rawmem)
776     DBUG_RETURN(NULL);                        /* purecov: inspected */
777   Func_ptr_array *copy_func= new (rawmem) Func_ptr_array(&own_root);
778   copy_func->reserve(copy_func_count);
779 
780   if (!multi_alloc_root(&own_root,
781                         &table, sizeof(*table),
782                         &share, sizeof(*share),
783                         &reg_field, sizeof(Field*) * (field_count + 2),
784                         &default_field, sizeof(Field*) * (field_count + 1),
785                         &blob_field, sizeof(uint)*(field_count+2),
786                         &from_field, sizeof(Field*)*(field_count + 1),
787                         &param->keyinfo, sizeof(*param->keyinfo),
788                         &key_part_info,
789                         sizeof(*key_part_info)*(param->group_parts+1),
790                         &param->start_recinfo,
791                         sizeof(*param->recinfo)*(field_count*2+4),
792                         &tmpname, strlen(path)+1,
793                         &group_buff, (group && !using_unique_constraint ?
794                                       param->group_length : 0),
795                         &bitmaps, bitmap_buffer_size(field_count + 1) * 3,
796                         NullS))
797   {
798     if (temp_pool_slot != MY_BIT_NONE)
799       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
800     DBUG_RETURN(NULL);				/* purecov: inspected */
801   }
802   /* Copy_field belongs to Temp_table_param, allocate it in THD mem_root */
803   if (!(param->copy_field= copy= new (thd->mem_root) Copy_field[field_count]))
804   {
805     if (temp_pool_slot != MY_BIT_NONE)
806       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
807     free_root(&own_root, MYF(0));               /* purecov: inspected */
808     DBUG_RETURN(NULL);				/* purecov: inspected */
809   }
810   param->items_to_copy= copy_func;
811   my_stpcpy(tmpname,path);
812   /* make table according to fields */
813 
814   new (table) TABLE;
815   memset(reg_field, 0, sizeof(Field*)*(field_count + 2));
816   memset(default_field, 0, sizeof(Field*) * (field_count + 1));
817   memset(from_field, 0, sizeof(Field*)*(field_count + 1));
818 
819   // This invokes (the synthesized) st_mem_root &operator=(const st_mem_root&)
820   table->mem_root= own_root;
821   mem_root_save= thd->mem_root;
822   thd->mem_root= &table->mem_root;
823   copy_func->set_mem_root(&table->mem_root);
824 
825   // Leave the first slot free; it may later be used for the hash_field
826   reg_field++;
827   default_field++;
828   from_field++;
829   table->field=reg_field;
830   table->alias= table_alias;
831   table->reginfo.lock_type=TL_WRITE;	/* Will be updated */
832   table->db_stat=HA_OPEN_KEYFILE+HA_OPEN_RNDFILE;
833   table->temp_pool_slot = temp_pool_slot;
834   table->copy_blobs= 1;
835   table->in_use= thd;
836   table->quick_keys.init();
837   table->possible_quick_keys.init();
838   table->covering_keys.init();
839   table->merge_keys.init();
840   table->keys_in_use_for_query.init();
841   table->keys_in_use_for_group_by.init();
842   table->keys_in_use_for_order_by.init();
843 
844   table->s= share;
845   init_tmp_table_share(thd, share, "", 0, tmpname, tmpname);
846   share->blob_field= blob_field;
847   share->db_low_byte_first=1;                // True for HEAP and MyISAM
848   share->table_charset= param->table_charset;
849   share->primary_key= MAX_KEY;               // Indicate no primary key
850   share->keys_for_keyread.init();
851   share->keys_in_use.init();
852   share->keys= 0;
853   if (param->schema_table)
854     share->db= INFORMATION_SCHEMA_NAME;
855 
856   /* Calculate which type of fields we will store in the temporary table */
857 
858   reclength= string_total_length= 0;
859   blob_count= string_count= null_count= hidden_null_count= group_null_items= 0;
860   param->using_outer_summary_function= 0;
861 
862   List_iterator_fast<Item> li(fields);
863   Item *item;
864   Field **tmp_from_field=from_field;
865   while ((item=li++))
866   {
867     Field *new_field= NULL;
868     Item::Type type= item->type();
869     if (type == Item::COPY_STR_ITEM)
870     {
871       item= ((Item_copy *)item)->get_item();
872       type= item->type();
873     }
874     if (not_all_columns)
875     {
876       if (item->with_sum_func && type != Item::SUM_FUNC_ITEM)
877       {
878         if (item->used_tables() & OUTER_REF_TABLE_BIT)
879           item->update_used_tables();
880         if (type == Item::SUBSELECT_ITEM ||
881             (item->used_tables() & ~OUTER_REF_TABLE_BIT))
882         {
883 	  /*
884 	    Mark that we have ignored an item that refers to a summary
885 	    function. We need to know this if someone is going to use
886 	    DISTINCT on the result.
887 	  */
888 	  param->using_outer_summary_function= 1;
889           goto update_hidden;
890         }
891       }
892 
893       if (item->const_item() && (int)hidden_field_count <= 0)
894         continue; // We don't have to store this
895     }
896     if (type == Item::SUM_FUNC_ITEM && !group && !save_sum_fields)
897     {						/* Can't calc group yet */
898       Item_sum *sum_item= (Item_sum *) item;
899       for (i=0 ; i < sum_item->get_arg_count() ; i++)
900       {
901         Item *arg= sum_item->get_arg(i);
902         if (!arg->const_item())
903         {
904           new_field=
905             create_tmp_field(thd, table, arg, arg->type(), copy_func,
906                              tmp_from_field, &default_field[fieldnr],
907                              group != 0, not_all_columns,
908                              distinct, false);
909           if (!new_field)
910             goto err;					// Should be OOM
911           tmp_from_field++;
912           reclength+= new_field->pack_length();
913           if (new_field->flags & BLOB_FLAG)
914           {
915             *blob_field++= fieldnr;
916             blob_count++;
917           }
918           if (new_field->type() == MYSQL_TYPE_BIT)
919             total_uneven_bit_length+= new_field->field_length & 7;
920           *(reg_field++)= new_field;
921           if (new_field->real_type() == MYSQL_TYPE_STRING ||
922               new_field->real_type() == MYSQL_TYPE_VARCHAR)
923           {
924             string_count++;
925             string_total_length+= new_field->pack_length();
926           }
927           thd->mem_root= mem_root_save;
928           arg= sum_item->set_arg(i, thd, new Item_field(new_field));
929           thd->mem_root= &table->mem_root;
930           if (!(new_field->flags & NOT_NULL_FLAG))
931           {
932             null_count++;
933             /*
934               new_field->maybe_null() is still false, it will be
935               changed below. But we have to setup Item_field correctly
936             */
937             arg->maybe_null=1;
938           }
939           new_field->field_index= fieldnr++;
940           /* InnoDB temp table doesn't allow a field with an empty name */
941           if (!new_field->field_name)
942             new_field->field_name= create_tmp_table_field_tmp_name(thd,
943                                                         new_field->field_index);
944         }
945       }
946     }
947     else
948     {
949       /*
950         The modify_item argument to create_tmp_field() is a bit tricky:
951 
952         We need to set it to 0 in union, to get fill_record() to modify the
953         temporary table.
954         We need to set it to 1 on multi-table-update and in select to
955         write rows to the temporary table.
956         We here distinguish between UNION and multi-table-updates by the fact
957         that in the latter case group is set to the row pointer.
958       */
959       new_field= (param->schema_table) ?
960         create_tmp_field_for_schema(thd, item, table) :
961         create_tmp_field(thd, table, item, type, copy_func,
962                          tmp_from_field, &default_field[fieldnr],
963                          group != 0,
964                          !force_copy_fields &&
965                            (not_all_columns || group !=0),
966                          /*
967                            If item->marker == 4 then we force create_tmp_field
968                            to create a 64-bit longs for BIT fields because HEAP
969                            tables can't index BIT fields directly. We do the same
970                            for distinct, as we want the distinct index to be
971                            usable in this case too.
972                          */
973                          item->marker == 4 || param->bit_fields_as_long,
974                          force_copy_fields);
975 
976       if (!new_field)
977       {
978         assert(thd->is_fatal_error);
979         goto err;				// Got OOM
980       }
981       if (type == Item::SUM_FUNC_ITEM)
982 	((Item_sum *) item)->result_field= new_field;
983       tmp_from_field++;
984       reclength+=new_field->pack_length();
985       if (!(new_field->flags & NOT_NULL_FLAG))
986 	null_count++;
987       if (new_field->type() == MYSQL_TYPE_BIT)
988         total_uneven_bit_length+= new_field->field_length & 7;
989       if (new_field->flags & BLOB_FLAG)
990       {
991         *blob_field++= fieldnr;
992 	blob_count++;
993       }
994 
995       if (new_field->real_type() == MYSQL_TYPE_STRING ||
996           new_field->real_type() == MYSQL_TYPE_VARCHAR)
997       {
998         string_count++;
999         string_total_length+= new_field->pack_length();
1000       }
1001       // In order to reduce footprint ask SE to pack variable-length fields.
1002       if (new_field->type() == MYSQL_TYPE_VAR_STRING ||
1003           new_field->type() == MYSQL_TYPE_VARCHAR)
1004         table->s->db_create_options|= HA_OPTION_PACK_RECORD;
1005 
1006       if (item->marker == 4 && item->maybe_null)
1007       {
1008 	group_null_items++;
1009 	new_field->flags|= GROUP_FLAG;
1010       }
1011       new_field->field_index= fieldnr++;
1012       *(reg_field++)= new_field;
1013       /* InnoDB temp table doesn't allow a field with an empty name */
1014       if (!new_field->field_name)
1015         new_field->field_name= create_tmp_table_field_tmp_name(thd, new_field->field_index);
1016     }
1017 
1018 update_hidden:
1019     /*
1020       Calculate length of distinct key. The goal is to decide what to use -
1021       key or unique constraint. As blobs force unique constraint on their
1022       own due to their length, they aren't taken into account.
1023     */
1024     if (distinct && !using_unique_constraint && hidden_field_count <= 0 &&
1025         new_field)
1026     {
1027       if (new_field->flags & BLOB_FLAG)
1028         using_unique_constraint= true;
1029       else
1030         distinct_key_length+= new_field->pack_length();
1031     }
1032     if (!--hidden_field_count)
1033     {
1034       /*
1035         This was the last hidden field; Remember how many hidden fields could
1036         have null
1037       */
1038       hidden_null_count=null_count;
1039       /*
1040 	We need to update hidden_field_count as we may have stored group
1041 	functions with constant arguments
1042       */
1043       param->hidden_field_count= fieldnr;
1044       null_count= 0;
1045       /*
1046         On last hidden field we store uneven bit length in
1047         hidden_uneven_bit_length and proceed calculation of
1048         uneven bits for visible fields into
1049         total_uneven_bit_length variable.
1050       */
1051       hidden_uneven_bit_length= total_uneven_bit_length;
1052       total_uneven_bit_length= 0;
1053     }
1054   }
1055   assert(fieldnr == (uint) (reg_field - table->field));
1056   assert(field_count >= (uint) (reg_field - table->field));
1057   field_count= fieldnr;
1058   *reg_field= 0;
1059   *blob_field= 0;				// End marker
1060   share->fields= field_count;
1061   share->blob_fields= blob_count;
1062 
1063   /* If result table is small; use a heap */
1064   if (select_options & TMP_TABLE_FORCE_MYISAM)
1065   {
1066     share->db_plugin= ha_lock_engine(0, myisam_hton);
1067     table->file= get_new_handler(share, &table->mem_root,
1068                                  share->db_type());
1069   }
1070   else if (blob_count ||
1071            (thd->variables.big_tables &&
1072             !(select_options & SELECT_SMALL_RESULT)))
1073   {
1074     /*
1075      * Except for special conditions, the tmp table engine will be chosen by the user.
1076      */
1077     switch (internal_tmp_disk_storage_engine)
1078     {
1079     case TMP_TABLE_MYISAM:
1080       share->db_plugin= ha_lock_engine(0, myisam_hton);
1081       break;
1082     case TMP_TABLE_INNODB:
1083       share->db_plugin= ha_lock_engine(0, innodb_hton);
1084       break;
1085     default:
1086       assert(0);
1087       share->db_plugin= ha_lock_engine(0, innodb_hton);
1088     }
1089 
1090     table->file= get_new_handler(share, &table->mem_root,
1091                                  share->db_type());
1092   }
1093   else
1094   {
1095     share->db_plugin= ha_lock_engine(0, heap_hton);
1096     table->file= get_new_handler(share, &table->mem_root,
1097                                  share->db_type());
1098   }
1099 
1100   /*
1101     Different temp table engines support different max_key_length
1102     and max_key_part_length values. If the HEAP engine is selected, it may
1103     later be converted into an on-disk engine. We must choose
1104     the minimum of max_key_length and max_key_part_length between the
1105     HEAP engine and the possible on-disk engine to verify whether a unique
1106     constraint is needed, so that the conversion goes well.
1107    */
1108   get_max_key_and_part_length(&max_key_length,
1109                               &max_key_part_length);
1110 
1111   if (!table->file)
1112     goto err;
1113   if (group &&
1114       (param->group_parts > table->file->max_key_parts() ||
1115        param->group_length > max_key_length))
1116     using_unique_constraint= true;
1117   keyinfo= param->keyinfo;
1118   keyinfo->table= table;
1119 
1120   if (group)
1121   {
1122     DBUG_PRINT("info",("Creating group key in temporary table"));
1123     table->group= group;				/* Table is grouped by key */
1124     param->group_buff= group_buff;
1125     share->keys= 1;
1126     // Use the key definition created below only if the key isn't too long.
1127     // Otherwise a dedicated key over a hash value will be created and this
1128     // definition will be used by the server to calculate the hash.
1129     if (!using_unique_constraint)
1130     {
1131       table->key_info= share->key_info= keyinfo;
1132       keyinfo->key_part= key_part_info;
1133       keyinfo->flags= HA_NOSAME;
1134       keyinfo->usable_key_parts= keyinfo->user_defined_key_parts=
1135         param->group_parts;
1136       keyinfo->actual_key_parts= keyinfo->user_defined_key_parts;
1137       keyinfo->rec_per_key= 0;
1138       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
1139       keyinfo->set_rec_per_key_array(NULL, NULL);
1140       keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1141       keyinfo->name= (char*) "<group_key>";
1142       ORDER *cur_group= group;
1143       for (; cur_group ; cur_group= cur_group->next, key_part_info++)
1144       {
1145         Field *field= (*cur_group->item)->get_tmp_table_field();
1146         assert(field->table == table);
1147         key_part_info->init_from_field(field);
1148 
1149         /* In GROUP BY 'a' and 'a ' are equal for VARCHAR fields */
1150         key_part_info->key_part_flag|= HA_END_SPACE_ARE_EQUAL;
1151 
1152         if (key_part_info->store_length > max_key_part_length)
1153         {
1154           using_unique_constraint= true;
1155           break;
1156         }
1157       }
1158       keyinfo->actual_flags= keyinfo->flags;
1159     }
1160   }
1161 
1162   if (distinct && field_count != param->hidden_field_count)
1163   {
1164     /*
1165       Create a unique key or a unique constraint over all columns
1166       that should be in the result.  In the temporary table, there are
1167       'param->hidden_field_count' extra columns, whose null bits are stored
1168       in the first 'hidden_null_pack_length' bytes of the row.
1169     */
1170     DBUG_PRINT("info",("hidden_field_count: %d", param->hidden_field_count));
1171     share->keys= 1;
1172     table->distinct= 1;
1173     if (!using_unique_constraint)
1174     {
1175       Field **reg_field;
1176       keyinfo->user_defined_key_parts= field_count-param->hidden_field_count;
1177       keyinfo->actual_key_parts= keyinfo->user_defined_key_parts;
1178       if (!(key_part_info= (KEY_PART_INFO*)
1179             alloc_root(&table->mem_root,
1180                        keyinfo->user_defined_key_parts * sizeof(KEY_PART_INFO))))
1181         goto err;
1182       memset(key_part_info, 0, keyinfo->user_defined_key_parts *
1183              sizeof(KEY_PART_INFO));
1184       table->key_info= share->key_info= keyinfo;
1185       keyinfo->key_part= key_part_info;
1186       keyinfo->actual_flags= keyinfo->flags= HA_NOSAME | HA_NULL_ARE_EQUAL;
1187       // TODO rename to <distinct_key>
1188       keyinfo->name= (char*) "<auto_key>";
1189       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
1190       keyinfo->set_rec_per_key_array(NULL, NULL);
1191       keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1192       /* Create a distinct key over the columns we are going to return */
1193       for (i=param->hidden_field_count, reg_field=table->field + i ;
1194            i < field_count;
1195            i++, reg_field++, key_part_info++)
1196       {
1197         key_part_info->init_from_field(*reg_field);
1198         if (key_part_info->store_length > max_key_part_length)
1199         {
1200           using_unique_constraint= true;
1201           break;
1202         }
1203       }
1204     }
1205   }
1206 
1207   /*
1208     To enforce unique constraint we need to add a field to hold key's hash
1209     1) already detected unique constraint
1210     2) distinct key is too long
1211     3) number of keyparts in distinct key is too big
1212   */
1213   if (using_unique_constraint ||                              // 1
1214       distinct_key_length > max_key_length ||                 // 2
1215       (distinct &&                                            // 3
1216        (fieldnr - param->hidden_field_count) > table->file->max_key_parts()))
1217   {
1218     using_unique_constraint= true;
1219     Field_longlong *field= new(&table->mem_root)
1220           Field_longlong(sizeof(ulonglong), false, "<hash_field>", true);
1221     if (!field)
1222     {
1223       assert(thd->is_fatal_error);
1224       goto err;   // Got OOM
1225     }
1226 
1227     // Mark hash_field as NOT NULL
1228     field->flags &= NOT_NULL_FLAG;
1229     // Register hash_field as a hidden field.
1230     register_hidden_field(table, default_field,
1231                           from_field, share->blob_field, field);
1232     // Repoint arrays
1233     table->field--;
1234     default_field--;
1235     from_field--;
1236     reclength+= field->pack_length();
1237     field_count= ++fieldnr;
1238     param->hidden_field_count++;
1239     share->fields= field_count;
1240     table->hash_field= field;
1241   }
1242 
1243   // Update the handler with information about the table object
1244   table->file->change_table_ptr(table, share);
1245   table->hidden_field_count= param->hidden_field_count;
1246 
1247   if (table->file->set_ha_share_ref(&share->ha_share))
1248   {
1249     delete table->file;
1250     goto err;
1251   }
1252 
1253   // Initialize cost model for this table
1254   table->init_cost_model(thd->cost_model());
1255 
1256   if (!using_unique_constraint)
1257     reclength+= group_null_items;	// null flag is stored separately
1258 
1259   if (blob_count == 0)
1260   {
1261     /* We need to ensure that first byte is not 0 for the delete link */
1262     if (param->hidden_field_count)
1263       hidden_null_count++;
1264     else
1265       null_count++;
1266   }
1267   hidden_null_pack_length= (hidden_null_count + 7 +
1268                             hidden_uneven_bit_length) / 8;
1269   null_pack_length= (hidden_null_pack_length +
1270                      (null_count + total_uneven_bit_length + 7) / 8);
1271   reclength+=null_pack_length;
1272   if (!reclength)
1273     reclength=1;				// Dummy select
1274   /* Use packed rows if there are blobs or a lot of space to gain */
1275   if (blob_count ||
1276       (string_total_length >= STRING_TOTAL_LENGTH_TO_PACK_ROWS &&
1277       (reclength / string_total_length <= RATIO_TO_PACK_ROWS ||
1278        string_total_length / string_count >= AVG_STRING_LENGTH_TO_PACK_ROWS)))
1279     use_packed_rows= true;
1280 
1281   if (!use_packed_rows)
1282     share->db_create_options&= ~HA_OPTION_PACK_RECORD;
1283 
1284   share->reclength= reclength;
1285   {
1286     uint alloc_length=ALIGN_SIZE(reclength+MI_UNIQUE_HASH_LENGTH+1);
1287     share->rec_buff_length= alloc_length;
1288     if (!(table->record[0]= (uchar*)
1289           alloc_root(&table->mem_root, alloc_length*3)))
1290       goto err;
1291     table->record[1]= table->record[0]+alloc_length;
1292     share->default_values= table->record[1]+alloc_length;
1293   }
1294   param->func_count= copy_func->size();
1295   assert(param->func_count <= copy_func_count); // Used <= allocated
1296 
1297   setup_tmp_table_column_bitmaps(table, bitmaps);
1298 
1299   recinfo=param->start_recinfo;
1300   null_flags= table->record[0];
1301   pos= table->record[0] + null_pack_length;
1302   if (null_pack_length)
1303   {
1304     memset(recinfo, 0, sizeof(*recinfo));
1305     recinfo->type=FIELD_NORMAL;
1306     recinfo->length=null_pack_length;
1307     recinfo++;
1308     memset(null_flags, 255, null_pack_length);	// Set null fields
1309 
1310     table->null_flags= table->record[0];
1311     share->null_fields= null_count+ hidden_null_count;
1312     share->null_bytes= null_pack_length;
1313   }
1314   null_count= (blob_count == 0) ? 1 : 0;
1315   hidden_field_count=param->hidden_field_count;
1316   assert((uint)hidden_field_count <= field_count);
1317   for (i=0,reg_field=table->field; i < field_count; i++,reg_field++,recinfo++)
1318   {
1319     Field *field= *reg_field;
1320     uint length;
1321     memset(recinfo, 0, sizeof(*recinfo));
1322 
1323     if (!(field->flags & NOT_NULL_FLAG))
1324     {
1325       if (field->flags & GROUP_FLAG && !using_unique_constraint)
1326       {
1327 	/*
1328 	  We have to reserve one byte here for NULL bits,
1329 	  as this is updated by 'end_update()'
1330 	*/
1331 	*pos++=0;				// Null is stored here
1332 	recinfo->length=1;
1333 	recinfo->type=FIELD_NORMAL;
1334 	recinfo++;
1335 	memset(recinfo, 0, sizeof(*recinfo));
1336       }
1337       else
1338       {
1339 	recinfo->null_bit= (uint8)1 << (null_count & 7);
1340 	recinfo->null_pos= null_count/8;
1341       }
1342       field->move_field(pos,null_flags+null_count/8,
1343 			(uint8)1 << (null_count & 7));
1344       null_count++;
1345     }
1346     else
1347       field->move_field(pos,(uchar*) 0,0);
1348     if (field->type() == MYSQL_TYPE_BIT)
1349     {
1350       /* We have to reserve place for extra bits among null bits */
1351       ((Field_bit*) field)->set_bit_ptr(null_flags + null_count / 8,
1352                                         null_count & 7);
1353       null_count+= (field->field_length & 7);
1354     }
1355     field->reset();
1356 
1357     /*
1358       Test if there is a default field value. The test for ->ptr is to skip
1359       'offset' fields generated by initialize_tables.
1360     */
1361     if (default_field[i] && default_field[i]->ptr)
1362     {
1363       /*
1364          default_field[i] is set only in the cases  when 'field' can
1365          inherit the default value that is defined for the field referred
1366          by the Item_field object from which 'field' has been created.
1367       */
1368       Field *orig_field= default_field[i];
1369       /* Get the value from default_values */
1370       my_ptrdiff_t diff= orig_field->table->default_values_offset();
1371       orig_field->move_field_offset(diff);      // Points now at default_values
1372       if (orig_field->is_real_null())
1373         field->set_null();
1374       else
1375       {
1376         field->set_notnull();
1377         memcpy(field->ptr, orig_field->ptr, field->pack_length());
1378       }
1379       orig_field->move_field_offset(-diff);     // Back to record[0]
1380     }
1381 
1382     if (from_field[i])
1383     {						/* Not a table Item */
1384       copy->set(field,from_field[i],save_sum_fields);
1385       copy++;
1386     }
1387     length=field->pack_length();
1388     pos+= length;
1389 
1390     /* Make entry for create table */
1391     recinfo->length=length;
1392     if (field->flags & BLOB_FLAG)
1393       recinfo->type= (int) FIELD_BLOB;
1394     else if (use_packed_rows &&
1395              field->real_type() == MYSQL_TYPE_STRING &&
1396 	     length >= MIN_STRING_LENGTH_TO_PACK_ROWS)
1397       recinfo->type=FIELD_SKIP_ENDSPACE;
1398     else if (use_packed_rows &&
1399              field->real_type() == MYSQL_TYPE_VARCHAR &&
1400              length >= MIN_STRING_LENGTH_TO_PACK_ROWS)
1401       recinfo->type= FIELD_VARCHAR;
1402     else
1403       recinfo->type=FIELD_NORMAL;
1404     if (!--hidden_field_count)
1405       null_count=(null_count+7) & ~7;		// move to next byte
1406 
1407     // fix table name in field entry
1408     field->table_name= &table->alias;
1409   }
1410 
1411   param->copy_field_end=copy;
1412   param->recinfo=recinfo;
1413   store_record(table,s->default_values);        // Make empty default record
1414 
1415   if (thd->variables.tmp_table_size == ~ (ulonglong) 0)		// No limit
1416     share->max_rows= ~(ha_rows) 0;
1417   else
1418     share->max_rows= (ha_rows) (((share->db_type() == heap_hton) ?
1419                                  min(thd->variables.tmp_table_size,
1420                                      thd->variables.max_heap_table_size) :
1421                                  thd->variables.tmp_table_size) /
1422 			         share->reclength);
1423   set_if_bigger(share->max_rows,1);		// For dummy start options
1424   /*
1425     Push the LIMIT clause to the temporary table creation, so that we
1426     materialize only up to 'rows_limit' records instead of all result records.
1427   */
1428   set_if_smaller(share->max_rows, rows_limit);
1429   param->end_write_records= rows_limit;
1430 
1431   if (group && !using_unique_constraint)
1432   {
1433     ORDER *cur_group= group;
1434     key_part_info= keyinfo->key_part;
1435     if (param->can_use_pk_for_unique)
1436       share->primary_key= 0;
1437     keyinfo->key_length= 0;  // Will compute the sum of the parts below.
1438     /*
1439       Here, we have to make the group fields point to the right record
1440       position.
1441     */
1442     for (; cur_group ; cur_group= cur_group->next, key_part_info++)
1443     {
1444       Field *field= (*cur_group->item)->get_tmp_table_field();
1445       assert(field->table == table);
1446       bool maybe_null= (*cur_group->item)->maybe_null;
1447       key_part_info->init_from_field(field);
1448       keyinfo->key_length+= key_part_info->store_length;
1449 
1450       cur_group->buff= (char*) group_buff;
1451       cur_group->field= field->new_key_field(thd->mem_root, table,
1452                                              group_buff + MY_TEST(maybe_null));
1453 
1454       if (!cur_group->field)
1455         goto err; /* purecov: inspected */
1456 
1457       if (maybe_null)
1458       {
1459         /*
1460           To be able to group on NULL, we reserved place in group_buff
1461           for the NULL flag just before the column. (see above).
1462           The field data is after this flag.
1463           The NULL flag is updated in 'end_update()' and 'end_write()'
1464         */
1465         keyinfo->flags|= HA_NULL_ARE_EQUAL;	// def. that NULL == NULL
1466         cur_group->buff++;                        // Pointer to field data
1467         group_buff++;                         // Skip null flag
1468       }
1469       group_buff+= cur_group->field->pack_length();
1470     }
1471   }
1472 
1473   if (distinct && field_count != param->hidden_field_count &&
1474       !using_unique_constraint)
1475   {
1476     null_pack_length-=hidden_null_pack_length;
1477     key_part_info= keyinfo->key_part;
1478     if (param->can_use_pk_for_unique)
1479       share->primary_key= 0;
1480     keyinfo->key_length= 0;  // Will compute the sum of the parts below.
1481     /*
1482       Here, we have to make the key fields point to the right record
1483       position.
1484     */
1485     for (i=param->hidden_field_count, reg_field=table->field + i ;
1486          i < field_count;
1487          i++, reg_field++, key_part_info++)
1488     {
1489       key_part_info->init_from_field(*reg_field);
1490       keyinfo->key_length+= key_part_info->store_length;
1491     }
1492   }
1493 
1494   // Create a key over hash_field to enforce unique constraint
1495   if (using_unique_constraint)
1496   {
1497     KEY *hash_key;
1498     KEY_PART_INFO *hash_kpi;
1499 
1500     if (!multi_alloc_root(&table->mem_root,
1501                           &hash_key, sizeof(*hash_key),
1502                           &hash_kpi, sizeof(*hash_kpi), // Only one key part
1503                           NullS))
1504       goto err;
1505     table->key_info= share->key_info= hash_key;
1506     hash_key->table= table;
1507     hash_key->key_part= hash_kpi;
1508     hash_key->actual_flags= hash_key->flags= HA_NULL_ARE_EQUAL;
1509     hash_key->actual_key_parts= hash_key->usable_key_parts= 1;
1510     hash_key->user_defined_key_parts= 1;
1511     hash_key->set_rec_per_key_array(NULL, NULL);
1512     hash_key->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1513     hash_key->algorithm= HA_KEY_ALG_UNDEF;
1514     if (distinct)
1515       hash_key->name= (char*) "<hash_distinct_key>";
1516     else
1517       hash_key->name= (char*) "<hash_group_key>";
1518     hash_kpi->init_from_field(table->hash_field);
1519     hash_key->key_length= hash_kpi->store_length;
1520     param->keyinfo= hash_key;
1521   }
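  /*
    Note: with a hash-based unique constraint there is no real unique index;
    duplicate elimination is expected to happen when rows are written, by
    looking up the hash value through this key and then comparing the actual
    column values to resolve hash collisions (see check_unique_constraint()
    in sql_executor.cc).
  */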
1522 
1523   if (thd->is_fatal_error)				// If end of memory
1524     goto err;					 /* purecov: inspected */
1525   share->db_record_offset= 1;
1526   if (!param->skip_create_table)
1527   {
1528     if (instantiate_tmp_table(table, param->keyinfo, param->start_recinfo,
1529                               &param->recinfo, select_options,
1530                               thd->variables.big_tables, &thd->opt_trace))
1531       goto err;
1532   }
1533 
1534   thd->mem_root= mem_root_save;
1535 
1536   DEBUG_SYNC(thd, "tmp_table_created");
1537 
1538   DBUG_RETURN(table);
1539 
1540 err:
1541   thd->mem_root= mem_root_save;
1542   free_tmp_table(thd,table);                    /* purecov: inspected */
1543   DBUG_RETURN(NULL);				/* purecov: inspected */
1544 }
1545 
1546 /*
1547   Create a temporary table to weed out duplicate rowid combinations
1548 
1549   SYNOPSIS
1550 
1551     create_duplicate_weedout_tmp_table()
1552       thd                    Thread handle
1553       uniq_tuple_length_arg  Length of the table's column
1554       sjtbl                  Update sjtbl->[start_]recinfo values which
1555                              will be needed if we'll need to convert the
1556                              created temptable from HEAP to MyISAM/Maria.
1557 
1558   DESCRIPTION
1559     Create a temporary table to weed out duplicate rowid combinations. The
1560     table has a single column that is a concatenation of all rowids in the
1561     combination.
1562 
1563     Depending on the needed length, there are two cases:
1564 
1565     1. When the length of the column < max_key_length:
1566 
1567       CREATE TABLE tmp (col VARBINARY(n) NOT NULL, UNIQUE KEY(col));
1568 
1569     2. Otherwise (not valid SQL syntax, but supported internally):
1570 
1571       CREATE TABLE tmp (col VARBINARY NOT NULL, UNIQUE CONSTRAINT(col));
1572 
1573     The code in this function was produced by extraction of relevant parts
1574     from create_tmp_table().
1575 
1576   RETURN
1577     created table
1578     NULL on error
1579 */
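
/*
  Illustrative call sketch (not lifted verbatim from the server source; the
  semijoin code that owns 'sjtbl' and computes the rowid length is assumed):

    SJ_TMP_TABLE *sjtbl= ...;      // built during semijoin optimization
    uint rowid_len= ...;           // length of the concatenated rowids
    TABLE *weedout= create_duplicate_weedout_tmp_table(thd, rowid_len, sjtbl);
    if (weedout == NULL)
      return true;                 // error has already been reported
*/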
1580 
1581 TABLE *create_duplicate_weedout_tmp_table(THD *thd,
1582                                           uint uniq_tuple_length_arg,
1583                                           SJ_TMP_TABLE *sjtbl)
1584 {
1585   MEM_ROOT *mem_root_save, own_root;
1586   TABLE *table;
1587   TABLE_SHARE *share;
1588   uint  temp_pool_slot=MY_BIT_NONE;
1589   char	*tmpname,path[FN_REFLEN];
1590   Field **reg_field;
1591   KEY_PART_INFO *key_part_info;
1592   KEY *keyinfo;
1593   uchar *group_buff;
1594   uchar *bitmaps;
1595   uint *blob_field;
1596   MI_COLUMNDEF *recinfo, *start_recinfo;
1597   bool using_unique_constraint=false;
1598   Field *field, *key_field;
1599   uint null_pack_length;
1600   uchar *null_flags;
1601   uchar	*pos;
1602   uint i;
1603 
1604   DBUG_ENTER("create_duplicate_weedout_tmp_table");
1605   assert(!sjtbl->is_confluent);
1606   /*
1607     STEP 1: Get temporary table name
1608   */
1609   thd->inc_status_created_tmp_tables();
1610   if (use_temp_pool && !(test_flags & TEST_KEEP_TMP_TABLES))
1611     temp_pool_slot = bitmap_lock_set_next(&temp_pool);
1612 
1613   if (temp_pool_slot != MY_BIT_NONE) // we got a slot
1614     sprintf(path, "%s_%lx_%i", tmp_file_prefix,
1615 	    current_pid, temp_pool_slot);
1616   else
1617   {
1618     /* if we run out of slots or we are not using the temp pool */
1619     assert(sizeof(my_thread_id) == 4);
1620     sprintf(path,"%s%lx_%x_%x", tmp_file_prefix,current_pid,
1621             thd->thread_id(), thd->tmp_table++);
1622   }
1623   fn_format(path, path, mysql_tmpdir, "", MY_REPLACE_EXT|MY_UNPACK_FILENAME);
1624 
1625   /* STEP 2: Figure out whether we'll be using a key or blob+constraint */
1626   if (uniq_tuple_length_arg > CONVERT_IF_BIGGER_TO_BLOB)
1627     using_unique_constraint= true;
1628 
1629   /* STEP 3: Allocate memory for temptable description */
1630   init_sql_alloc(key_memory_TABLE, &own_root, TABLE_ALLOC_BLOCK_SIZE, 0);
1631   if (!multi_alloc_root(&own_root,
1632                         &table, sizeof(*table),
1633                         &share, sizeof(*share),
1634                         &reg_field, sizeof(Field*) * (1+2),
1635                         &blob_field, sizeof(uint)*3,
1636                         &keyinfo, sizeof(*keyinfo),
1637                         &key_part_info, sizeof(*key_part_info) * 2,
1638                         &start_recinfo,
1639                         sizeof(*recinfo)*(1*2+2),
1640                         &tmpname, strlen(path)+1,
1641                         &group_buff, (!using_unique_constraint ?
1642                                       uniq_tuple_length_arg : 0),
1643                         &bitmaps, bitmap_buffer_size(1) * 3,
1644                         NullS))
1645   {
1646     if (temp_pool_slot != MY_BIT_NONE)
1647       bitmap_lock_clear_bit(&temp_pool, temp_pool_slot);
1648     DBUG_RETURN(NULL);
1649   }
1650   my_stpcpy(tmpname,path);
1651 
1652   /* STEP 4: Create TABLE description */
1653   new (table) TABLE;
1654   memset(reg_field, 0, sizeof(Field*) * 3);
1655 
1656   table->mem_root= own_root;
1657   mem_root_save= thd->mem_root;
1658   thd->mem_root= &table->mem_root;
1659 
1660   table->field=reg_field;
1661   table->alias= "weedout-tmp";
1662   table->reginfo.lock_type=TL_WRITE;	/* Will be updated */
1663   table->db_stat=HA_OPEN_KEYFILE+HA_OPEN_RNDFILE;
1664   table->temp_pool_slot = temp_pool_slot;
1665   table->copy_blobs= 1;
1666   table->in_use= thd;
1667   table->quick_keys.init();
1668   table->possible_quick_keys.init();
1669   table->covering_keys.init();
1670   table->keys_in_use_for_query.init();
1671 
1672   table->s= share;
1673   init_tmp_table_share(thd, share, "", 0, tmpname, tmpname);
1674   share->blob_field= blob_field;
1675   share->db_low_byte_first=1;                // True for HEAP and MyISAM
1676   share->table_charset= NULL;
1677   share->primary_key= MAX_KEY;               // Indicate no primary key
1678   share->keys_for_keyread.init();
1679   share->keys_in_use.init();
1680 
1681   uint reclength= 0;
1682   uint null_count= 0;
1683 
1684   /* Create the field */
1685   if (using_unique_constraint)
1686   {
1687     Field_longlong *field= new(&table->mem_root)
1688           Field_longlong(sizeof(ulonglong), false, "<hash_field>", true);
1689     if (!field)
1690     {
1691       assert(thd->is_fatal_error);
1692       goto err;				// Got OOM
1693     }
1694     // Mark hash_field as NOT NULL
1695     field->flags &= NOT_NULL_FLAG;
1696     *(reg_field++)= sjtbl->hash_field= field;
1697     table->hash_field= field;
1698     field->table= field->orig_table= table;
1699     share->fields++;
1700     field->field_index= 0;
1701     reclength= field->pack_length();
1702     table->hidden_field_count++;
1703   }
1704   {
1705     /*
1706       For the sake of uniformity, always use Field_varstring (although we could
1707       use Field_string for shorter keys)
1708     */
1709     field= new Field_varstring(uniq_tuple_length_arg, FALSE, "rowids", share,
1710                                &my_charset_bin);
1711     if (!field)
1712       DBUG_RETURN(0);
1713     field->table= table;
1714     field->unireg_check= Field::NONE;
1715     field->flags= (NOT_NULL_FLAG | BINARY_FLAG | NO_DEFAULT_VALUE_FLAG);
1716     field->reset_fields();
1717     field->init(table);
1718     field->orig_table= NULL;
1719     *(reg_field++)= field;
1720     *blob_field= 0;
1721     *reg_field= 0;
1722 
1723     field->field_index= share->fields;
1724     share->fields++;
1725     share->blob_fields= 0;
1726     reclength+= field->pack_length();
1727     null_count++;
1728   }
1729 
1730   if (using_unique_constraint)
1731   {
1732     switch (internal_tmp_disk_storage_engine)
1733     {
1734     case TMP_TABLE_MYISAM:
1735       share->db_plugin= ha_lock_engine(0, myisam_hton);
1736       break;
1737     case TMP_TABLE_INNODB:
1738       share->db_plugin= ha_lock_engine(0, innodb_hton);
1739       break;
1740     default:
1741       assert(0);
1742       share->db_plugin= ha_lock_engine(0, innodb_hton);
1743     }
1744     table->file= get_new_handler(share, &table->mem_root,
1745                                  share->db_type());
1746   }
1747   else
1748   {
1749     share->db_plugin= ha_lock_engine(0, heap_hton);
1750     table->file= get_new_handler(share, &table->mem_root,
1751                                  share->db_type());
1752   }
1753 
1754 
1755   if (!table->file)
1756     goto err;
1757 
1758   if (table->file->set_ha_share_ref(&share->ha_share))
1759   {
1760     delete table->file;
1761     goto err;
1762   }
1763 
1764   null_pack_length= 1;
1765   reclength+= null_pack_length;
1766 
1767   share->reclength= reclength;
1768   {
1769     uint alloc_length= ALIGN_SIZE(share->reclength + MI_UNIQUE_HASH_LENGTH+1);
1770     share->rec_buff_length= alloc_length;
1771     if (!(table->record[0]= (uchar*)
1772                             alloc_root(&table->mem_root, alloc_length * 3)))
1773       goto err;
1774     table->record[1]= table->record[0] + alloc_length;
1775     share->default_values= table->record[1] + alloc_length;
1776   }
1777   setup_tmp_table_column_bitmaps(table, bitmaps);
1778 
1779   recinfo= start_recinfo;
1780   null_flags= table->record[0];
1781 
1782   pos= table->record[0] + null_pack_length;
1783   if (null_pack_length)
1784   {
1785     memset(recinfo, 0, sizeof(*recinfo));
1786     recinfo->type= FIELD_NORMAL;
1787     recinfo->length= null_pack_length;
1788     recinfo++;
1789     memset(null_flags, 255, null_pack_length);	// Set null fields
1790 
1791     table->null_flags= table->record[0];
1792     share->null_fields= null_count;
1793     share->null_bytes= null_pack_length;
1794   }
1795   null_count=1;
1796   for (i=0, reg_field=table->field; i < share->fields;
1797        i++, reg_field++, recinfo++)
1798   {
1799     Field *field= *reg_field;
1800     uint length;
1801     /* Table description for the concatenated rowid column */
1802     memset(recinfo, 0, sizeof(*recinfo));
1803 
1804     if (!(field->flags & NOT_NULL_FLAG))
1805     {
1806       if (field->flags & GROUP_FLAG && !using_unique_constraint)
1807       {
1808         /*
1809           We have to reserve one byte here for NULL bits,
1810           as this is updated by 'end_update()'
1811         */
1812         *pos++= 0;				// Null is stored here
1813         recinfo->length= 1;
1814         recinfo->type= FIELD_NORMAL;
1815         recinfo++;
1816         memset(recinfo, 0, sizeof(*recinfo));
1817       }
1818       else
1819       {
1820         recinfo->null_bit= (uint8)1 << (null_count & 7);
1821         recinfo->null_pos= null_count/8;
1822       }
1823       field->move_field(pos,null_flags+null_count/8,
1824                         (uint8)1 << (null_count & 7));
1825       null_count++;
1826     }
1827     else
1828       field->move_field(pos,(uchar*) 0, 0);
1829     if (field->type() == MYSQL_TYPE_BIT)
1830     {
1831       /* We have to reserve place for extra bits among null bits */
1832       ((Field_bit*) field)->set_bit_ptr(null_flags + null_count / 8,
1833                                         null_count & 7);
1834       null_count+= (field->field_length & 7);
1835     }
1836     field->reset();
1837 
1838     length= field->pack_length();
1839     pos+= length;
1840 
1841     /*
1842       Don't care about packing the VARCHAR since it's only a
1843       concatenation of rowids. @see create_tmp_table() for how
1844       packed VARCHARs can be achieved
1845     */
1846     recinfo->length= length;
1847     recinfo->type= FIELD_NORMAL;
1848 
1849     // fix table name in field entry
1850     field->table_name= &table->alias;
1851   }
1852 
1853   if (thd->variables.tmp_table_size == ~ (ulonglong) 0)		// No limit
1854     share->max_rows= ~(ha_rows) 0;
1855   else
1856     share->max_rows= (ha_rows) (((share->db_type() == heap_hton) ?
1857                                  min(thd->variables.tmp_table_size,
1858                                      thd->variables.max_heap_table_size) :
1859                                  thd->variables.tmp_table_size) /
1860 			         share->reclength);
1861   set_if_bigger(share->max_rows,1);		// For dummy start options
1862 
1863 
1864   // Create a key over param->hash_field to enforce unique constraint
1865   if (using_unique_constraint)
1866   {
1867     KEY *hash_key= keyinfo;
1868     KEY_PART_INFO *hash_kpi= key_part_info;
1869 
1870     share->keys= 1;
1871     table->key_info= share->key_info= hash_key;
1872     hash_key->table= table;
1873     hash_key->key_part= hash_kpi;
1874     hash_key->actual_flags= hash_key->flags= HA_NULL_ARE_EQUAL;
1875     hash_kpi->init_from_field(sjtbl->hash_field);
1876     hash_key->key_length= hash_kpi->store_length;
1877   }
1878   else
1879   {
1880     DBUG_PRINT("info",("Creating group key in temporary table"));
1881     share->keys=1;
1882     table->key_info= table->s->key_info= keyinfo;
1883     keyinfo->key_part=key_part_info;
1884     keyinfo->actual_flags= keyinfo->flags= HA_NOSAME;
1885     keyinfo->key_length=0;
1886     {
1887       key_part_info->init_from_field(field);
1888       assert(key_part_info->key_type == FIELDFLAG_BINARY);
1889 
1890       key_field= field->new_key_field(thd->mem_root, table, group_buff);
1891       if (!key_field)
1892         goto err;
1893       key_part_info->key_part_flag|= HA_END_SPACE_ARE_EQUAL; //todo need this?
1894       keyinfo->key_length+=  key_part_info->length;
1895     }
1896   }
1897   {
1898     table->key_info->user_defined_key_parts= 1;
1899     table->key_info->usable_key_parts= 1;
1900     table->key_info->actual_key_parts= table->key_info->user_defined_key_parts;
1901     table->key_info->set_rec_per_key_array(NULL, NULL);
1902     table->key_info->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1903     table->key_info->algorithm= HA_KEY_ALG_UNDEF;
1904     table->key_info->name= (char*) "weedout_key";
1905   }
1906 
1907   if (thd->is_fatal_error)				// If end of memory
1908     goto err;
1909   share->db_record_offset= 1;
1910   if (instantiate_tmp_table(table, table->key_info, start_recinfo, &recinfo,
1911                             0, 0, &thd->opt_trace))
1912     goto err;
1913 
1914   sjtbl->start_recinfo= start_recinfo;
1915   sjtbl->recinfo=       recinfo;
1916 
1917   thd->mem_root= mem_root_save;
1918   DBUG_RETURN(table);
1919 
1920 err:
1921   thd->mem_root= mem_root_save;
1922   table->file->ha_index_or_rnd_end();
1923   free_tmp_table(thd,table);                    /* purecov: inspected */
1924   DBUG_RETURN(NULL);				/* purecov: inspected */
1925 }
1926 
1927 
1928 /****************************************************************************/
1929 
1930 /**
1931   Create a reduced TABLE object with properly set up Field list from a
1932   list of field definitions.
1933 
1934     The created table doesn't have a table handler associated with
1935     it, has no keys, no group/distinct, no copy_funcs array.
1936     The sole purpose of this TABLE object is to use the power of Field
1937     class to read/write data to/from table->record[0]. Then one can store
1938     the record in any container (RB tree, hash, etc).
1939     The table is created in THD mem_root, so are the table's fields.
1940     Consequently, if you don't have BLOB fields, you don't need to free it.
1941 
1942   @param thd         connection handle
1943   @param field_list  list of column definitions
1944 
1945   @return
1946     0 if out of memory, TABLE object in case of success
1947 */
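
/*
  Illustrative usage sketch (the column list here is hypothetical; real
  callers, e.g. the stored-program runtime, build the Create_field list from
  their own metadata):

    List<Create_field> defs;
    // ... append Create_field definitions describing the columns ...
    TABLE *vtable= create_virtual_tmp_table(thd, defs);
    if (vtable == NULL)
      return NULL;                 // OOM
    // Store and read values through vtable->field[i] against vtable->record[0].
*/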
1948 
1949 TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list)
1950 {
1951   uint field_count= field_list.elements;
1952   uint blob_count= 0;
1953   Field **field;
1954   Create_field *cdef;                           /* column definition */
1955   uint record_length= 0;
1956   uint null_count= 0;                 /* number of columns which may be null */
1957   uint null_pack_length;              /* NULL representation array length */
1958   uint *blob_field;
1959   uchar *bitmaps;
1960   TABLE *table;
1961   TABLE_SHARE *share;
1962 
1963   if (!multi_alloc_root(thd->mem_root,
1964                         &table, sizeof(*table),
1965                         &share, sizeof(*share),
1966                         &field, (field_count + 1) * sizeof(Field*),
1967                         &blob_field, (field_count+1) *sizeof(uint),
1968                         &bitmaps, bitmap_buffer_size(field_count) * 3,
1969                         NullS))
1970     return 0;
1971 
1972   new (table) TABLE;
1973   new (share) TABLE_SHARE;
1974   table->field= field;
1975   table->s= share;
1976   table->temp_pool_slot= MY_BIT_NONE;
1977   share->blob_field= blob_field;
1978   share->fields= field_count;
1979   share->db_low_byte_first=1;                // True for HEAP and MyISAM
1980   setup_tmp_table_column_bitmaps(table, bitmaps);
1981 
1982   /* Create all fields and calculate the total length of record */
1983   List_iterator_fast<Create_field> it(field_list);
1984   while ((cdef= it++))
1985   {
1986     *field= make_field(share, 0, cdef->length,
1987                        (uchar*) (f_maybe_null(cdef->pack_flag) ? "" : 0),
1988                        f_maybe_null(cdef->pack_flag) ? 1 : 0,
1989                        cdef->pack_flag, cdef->sql_type, cdef->charset,
1990                        cdef->geom_type, cdef->unireg_check,
1991                        cdef->interval, cdef->field_name);
1992     if (!*field)
1993       goto error;
1994     (*field)->init(table);
1995     record_length+= (*field)->pack_length();
1996     if (! ((*field)->flags & NOT_NULL_FLAG))
1997       null_count++;
1998 
1999     if ((*field)->flags & BLOB_FLAG)
2000       share->blob_field[blob_count++]= (uint) (field - table->field);
2001 
2002     field++;
2003   }
2004   *field= NULL;                             /* mark the end of the list */
2005   share->blob_field[blob_count]= 0;            /* mark the end of the list */
2006   share->blob_fields= blob_count;
2007 
2008   null_pack_length= (null_count + 7)/8;
2009   share->reclength= record_length + null_pack_length;
2010   share->rec_buff_length= ALIGN_SIZE(share->reclength + 1);
2011   table->record[0]= (uchar*) thd->alloc(share->rec_buff_length);
2012   if (!table->record[0])
2013     goto error;
2014 
2015   if (null_pack_length)
2016   {
2017     table->null_flags= table->record[0];
2018     share->null_fields= null_count;
2019     share->null_bytes= null_pack_length;
2020   }
2021 
2022   table->in_use= thd;           /* field->reset() may access table->in_use */
2023   {
2024     /* Set up field pointers */
2025     uchar *null_pos= table->record[0];
2026     uchar *field_pos= null_pos + share->null_bytes;
2027     uint null_bit= 1;
2028 
2029     for (field= table->field; *field; ++field)
2030     {
2031       Field *cur_field= *field;
2032       if ((cur_field->flags & NOT_NULL_FLAG))
2033         cur_field->move_field(field_pos);
2034       else
2035       {
2036         cur_field->move_field(field_pos, null_pos, null_bit);
2037         null_bit<<= 1;
2038         if (null_bit == (uint8)1 << 8)
2039         {
2040           ++null_pos;
2041           null_bit= 1;
2042         }
2043       }
2044       if (cur_field->type() == MYSQL_TYPE_BIT &&
2045           cur_field->key_type() == HA_KEYTYPE_BIT)
2046       {
2047         /* This is a Field_bit since key_type is HA_KEYTYPE_BIT */
2048         static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos, null_bit);
2049         null_bit+= cur_field->field_length & 7;
2050         if (null_bit > 7)
2051         {
2052           null_pos++;
2053           null_bit-= 8;
2054         }
2055       }
2056       cur_field->reset();
2057 
2058       field_pos+= cur_field->pack_length();
2059     }
2060   }
2061   return table;
2062 error:
2063   for (field= table->field; *field; ++field)
2064     delete *field;                         /* just invokes field destructor */
2065   return 0;
2066 }
2067 
2068 
2069 bool open_tmp_table(TABLE *table)
2070 {
2071   int error;
2072   if ((error=table->file->ha_open(table, table->s->table_name.str,O_RDWR,
2073                                   HA_OPEN_TMP_TABLE | HA_OPEN_INTERNAL_TABLE)))
2074   {
2075     table->file->print_error(error,MYF(0)); /* purecov: inspected */
2076     table->db_stat=0;
2077     return(1);
2078   }
2079   (void) table->file->extra(HA_EXTRA_QUICK);		/* Faster */
2080 
2081   table->set_created();
2082 
2083   return false;
2084 }
2085 
2086 
2087 /*
2088   Create MyISAM temporary table
2089 
2090   SYNOPSIS
2091     create_myisam_tmp_table()
2092       table           Table object that describes the table to be created
2093       keyinfo         Description of the index (there is always one index)
2094       start_recinfo   MyISAM's column descriptions
2095       recinfo INOUT   End of MyISAM's column descriptions
2096       options         Option bits
2097 
2098   DESCRIPTION
2099     Create a MyISAM temporary table according to the passed description. The
2100     table is assumed to have at most one unique index or constraint.
2101 
2102     The passed array of MI_COLUMNDEF structures must have this form:
2103 
2104       1. A leading column (apparently for the 'deleted' flag; note that it
2105          may be longer than 1 byte when there are many nullable columns)
2106       2. Table columns
2107       3. One free MI_COLUMNDEF element (*recinfo points here)
2108 
2109     This function may use the free element to create a hash column for the
2110     unique constraint.
2111 
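  EXAMPLE
    A typical layout of the array (illustrative only; the actual lengths
    depend on the columns of the temporary table):

      start_recinfo[0]    FIELD_NORMAL, length 1   -- NULL-bits/flag byte
      start_recinfo[1..]  one element per table column
      *recinfo            first free element; may become the hash column
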
2112    RETURN
2113      FALSE - OK
2114      TRUE  - Error
2115 */
2116 
2117 bool create_myisam_tmp_table(TABLE *table, KEY *keyinfo,
2118                              MI_COLUMNDEF *start_recinfo,
2119                              MI_COLUMNDEF **recinfo,
2120                              ulonglong options, my_bool big_tables)
2121 {
2122   int error;
2123   MI_KEYDEF keydef;
2124   MI_UNIQUEDEF uniquedef;
2125   TABLE_SHARE *share= table->s;
2126   DBUG_ENTER("create_myisam_tmp_table");
2127 
2128   if (share->keys)
2129   {						// Get keys for mi_create
2130     if (share->keys > 1)
2131     {
2132       assert(0); // This code can't handle more than 1 key
2133       share->keys= 1;
2134     }
2135     HA_KEYSEG *seg= (HA_KEYSEG*) alloc_root(&table->mem_root,
2136                                             sizeof(*seg) *
2137                                             keyinfo->user_defined_key_parts);
2138     if (!seg)
2139       goto err;
2140 
2141     memset(seg, 0, sizeof(*seg) * keyinfo->user_defined_key_parts);
2142 
2143     /* Create a unique key */
2144     memset(&keydef, 0, sizeof(keydef));
2145     keydef.flag= static_cast<uint16>(keyinfo->flags);
2146     keydef.keysegs=  keyinfo->user_defined_key_parts;
2147     keydef.seg= seg;
2148 
2149     for (uint i=0; i < keyinfo->user_defined_key_parts ; i++,seg++)
2150     {
2151       Field *field=keyinfo->key_part[i].field;
2152       seg->flag=     0;
2153       seg->language= field->charset()->number;
2154       seg->length=   keyinfo->key_part[i].length;
2155       seg->start=    keyinfo->key_part[i].offset;
2156       if (field->flags & BLOB_FLAG)
2157       {
2158 	seg->type=
2159 	((keyinfo->key_part[i].key_type & FIELDFLAG_BINARY) ?
2160 	 HA_KEYTYPE_VARBINARY2 : HA_KEYTYPE_VARTEXT2);
2161 	seg->bit_start= (uint8)(field->pack_length() -
2162                                 portable_sizeof_char_ptr);
2163 	seg->flag= HA_BLOB_PART;
2164 	seg->length=0;			// Whole blob in unique constraint
2165       }
2166       else
2167       {
2168 	seg->type= keyinfo->key_part[i].type;
2169         /* Tell the handler whether it can do suffix space compression */
2170 	if (field->real_type() == MYSQL_TYPE_STRING &&
2171 	    keyinfo->key_part[i].length > 4)
2172 	  seg->flag|= HA_SPACE_PACK;
2173       }
2174       if (!(field->flags & NOT_NULL_FLAG))
2175       {
2176 	seg->null_bit= field->null_bit;
2177 	seg->null_pos= field->null_offset();
2178       }
2179     }
2180   }
2181   MI_CREATE_INFO create_info;
2182   memset(&create_info, 0, sizeof(create_info));
2183 
2184   if (big_tables && !(options & SELECT_SMALL_RESULT))
2185     create_info.data_file_length= ~(ulonglong) 0;
2186 
2187   if ((error=mi_create(share->table_name.str, share->keys, &keydef,
2188                        (uint) (*recinfo - start_recinfo),
2189                        start_recinfo,
2190                        0, &uniquedef,
2191                        &create_info,
2192                        HA_CREATE_TMP_TABLE | HA_CREATE_INTERNAL_TABLE |
2193                        ((share->db_create_options & HA_OPTION_PACK_RECORD) ?
2194                         HA_PACK_RECORD : 0)
2195                        )))
2196   {
2197     table->file->print_error(error,MYF(0));	/* purecov: inspected */
2198     /*
2199       The table name which was allocated from the temp pool is already
2200       occupied in the SE. We probably hit a bug in the server or a problem
2201       with the system configuration. Prevent the problem from re-occurring
2202       by marking the temp-pool slot for this name as permanently busy; to
2203       do this we only need to set TABLE::temp_pool_slot to MY_BIT_NONE so
2204       that it is not freed in free_tmp_table().
2205     */
2206     if (error == EEXIST)
2207       table->temp_pool_slot= MY_BIT_NONE;
2208 
2209     table->db_stat=0;
2210     goto err;
2211   }
2212   table->in_use->inc_status_created_tmp_disk_tables();
2213   share->db_record_offset= 1;
2214   DBUG_RETURN(0);
2215  err:
2216   DBUG_RETURN(1);
2217 }
2218 
2219 /*
2220   Create InnoDB temporary table
2221 
2222   SYNOPSIS
2223     create_innodb_tmp_table()
2224       table           Table object that describes the table to be created
2225       keyinfo         Description of the index (there is always one index)
2226 
2227   DESCRIPTION
2228     Create an InnoDB temporary table according to the passed description.
2229     The table is assumed to have at most one unique index or constraint.
2230 
2231     Unlike create_myisam_tmp_table(), this function does not use
2232     MI_COLUMNDEF column descriptions: the table is created directly from
2233     the TABLE and TABLE_SHARE objects through the handler's create()
2234     method.
2240 
2241    RETURN
2242      FALSE - OK
2243      TRUE  - Error
2244 */
2245 bool create_innodb_tmp_table(TABLE *table, KEY *keyinfo)
2246 {
2247   TABLE_SHARE *share= table->s;
2248 
2249   DBUG_ENTER("create_innodb_tmp_table");
2250 
2251   HA_CREATE_INFO create_info;
2252 
2253   create_info.db_type= table->s->db_type();
2254   create_info.row_type= table->s->row_type;
2255   create_info.options|= HA_LEX_CREATE_TMP_TABLE |
2256                         HA_LEX_CREATE_INTERNAL_TMP_TABLE;
2257   /*
2258     InnoDB's fixed-length column size is restricted to 1024 bytes. Exceeding
2259     this can result in incorrect behavior.
2260   */
2261   if (table->s->db_type() == innodb_hton)
2262   {
2263     for (Field **field= table->field; *field; ++field)
2264     {
2265       if ((*field)->type() == MYSQL_TYPE_STRING &&
2266           (*field)->key_length() > 1024)
2267       {
2268         my_error(ER_TOO_LONG_KEY, MYF(0), 1024);
2269         DBUG_RETURN(true);
2270       }
2271     }
2272   }
2273 
2274   int error;
2275   if ((error= table->file->create(share->table_name.str, table, &create_info)))
2276   {
2277     table->file->print_error(error,MYF(0));    /* purecov: inspected */
2278     /*
2279       The table name which was allocated from the temp pool is already
2280       occupied in the SE. We probably hit a bug in the server or a problem
2281       with the system configuration. Prevent the problem from re-occurring
2282       by marking the temp-pool slot for this name as permanently busy; to
2283       do this we only need to set TABLE::temp_pool_slot to MY_BIT_NONE so
2284       that it is not freed in free_tmp_table().
2285 
2286       Note that currently InnoDB never reports an error in this case but
2287       instead aborts on a failed assertion. The if-statement below is here
2288       mostly to keep the code future-proof and consistent with MyISAM.
2289     */
2290 
2291     if (error == HA_ERR_FOUND_DUPP_KEY || error == HA_ERR_TABLESPACE_EXISTS ||
2292         error == HA_ERR_TABLE_EXIST)
2293       table->temp_pool_slot= MY_BIT_NONE;
2294     table->db_stat= 0;
2295     DBUG_RETURN(true);
2296   }
2297   else
2298   {
2299     table->in_use->inc_status_created_tmp_disk_tables();
2300     share->db_record_offset= 1;
2301     DBUG_RETURN(false);
2302   }
2303 }
2304 
2305 static void trace_tmp_table(Opt_trace_context *trace, const TABLE *table)
2306 {
2307   Opt_trace_object trace_tmp(trace, "tmp_table_info");
2308   if (strlen(table->alias) != 0)
2309     trace_tmp.add_utf8_table(table->pos_in_table_list);
2310   else
2311     trace_tmp.add_alnum("table", "intermediate_tmp_table");
2312 
2313   trace_tmp.add("row_length",table->s->reclength).
2314     add("key_length", table->s->key_info ?
2315         table->s->key_info->key_length : 0).
2316     add("unique_constraint", table->hash_field ? true : false);
2317 
2318   if (table->s->db_type() == myisam_hton)
2319   {
2320     trace_tmp.add_alnum("location", "disk (MyISAM)");
2321     if (table->s->db_create_options & HA_OPTION_PACK_RECORD)
2322       trace_tmp.add_alnum("record_format", "packed");
2323     else
2324       trace_tmp.add_alnum("record_format", "fixed");
2325   }
2326   else if (table->s->db_type() == innodb_hton)
2327   {
2328     trace_tmp.add_alnum("location", "disk (InnoDB)");
2329     if (table->s->db_create_options & HA_OPTION_PACK_RECORD)
2330       trace_tmp.add_alnum("record_format", "packed");
2331     else
2332       trace_tmp.add_alnum("record_format", "fixed");
2333   }
2334   else
2335   {
2336     assert(table->s->db_type() == heap_hton);
2337     trace_tmp.add_alnum("location", "memory (heap)").
2338       add("row_limit_estimate", table->s->max_rows);
2339   }
2340 }
2341 
2342 /**
2343   @brief
2344   Instantiates temporary table
2345 
2346   @param  table           Table object that describes the table to be
2347                           instantiated
2348   @param  keyinfo         Description of the index (there is always one index)
2349   @param  start_recinfo   Column descriptions
2350   @param  recinfo INOUT   End of column descriptions
2351   @param  options         Option bits
2352   @param  trace           Optimizer trace to write info to
2353 
2354   @details
2355     Creates tmp table and opens it.
2356 
2357   @return
2358      FALSE - OK
2359      TRUE  - Error
2360 */
2361 
2362 bool instantiate_tmp_table(TABLE *table, KEY *keyinfo,
2363                            MI_COLUMNDEF *start_recinfo,
2364                            MI_COLUMNDEF **recinfo,
2365                            ulonglong options, my_bool big_tables,
2366                            Opt_trace_context *trace)
2367 {
2368 #ifndef NDEBUG
2369   for (uint i= 0; i < table->s->fields; i++)
2370     assert(table->field[i]->gcol_info== NULL && table->field[i]->stored_in_db);
2371 #endif
2372 
2373   if (table->s->db_type() == innodb_hton)
2374   {
2375     if (create_innodb_tmp_table(table, keyinfo))
2376       return TRUE;
2377     // Make empty record so random data is not written to disk
2378     empty_record(table);
2379   }
2380   else if (table->s->db_type() == myisam_hton)
2381   {
2382     if (create_myisam_tmp_table(table, keyinfo, start_recinfo, recinfo,
2383                                 options, big_tables))
2384       return TRUE;
2385     // Make empty record so random data is not written to disk
2386     empty_record(table);
2387   }
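  /*
    For HEAP (MEMORY) tables no explicit create step is needed here: the
    memory engine creates the table implicitly when open_tmp_table() opens
    it below.
  */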
2388 
2389   if (open_tmp_table(table))
2390   {
2391     table->file->ha_delete_table(table->s->table_name.str);
2392     return TRUE;
2393   }
2394 
2395   if (unlikely(trace->is_started()))
2396   {
2397     Opt_trace_object wrapper(trace);
2398     Opt_trace_object convert(trace, "creating_tmp_table");
2399     trace_tmp_table(trace, table);
2400   }
2401   return FALSE;
2402 }
2403 
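/**
  Free an internal temporary table.

  If the table was instantiated, drop or delete it in the storage engine and
  destroy the handler. Also free per-field buffers (blobs), release the
  temp-pool slot and the storage engine plugin reference, and free the
  table's own MEM_ROOT.

  @param thd    Thread handler
  @param entry  Temporary table to free
*/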
2404 void
2405 free_tmp_table(THD *thd, TABLE *entry)
2406 {
2407   MEM_ROOT own_root= entry->mem_root;
2408   const char *save_proc_info;
2409   DBUG_ENTER("free_tmp_table");
2410   DBUG_PRINT("enter",("table: %s",entry->alias));
2411 
2412   save_proc_info=thd->proc_info;
2413   THD_STAGE_INFO(thd, stage_removing_tmp_table);
2414 
2415   // Release latches since this can take a long time
2416   ha_release_temporary_latches(thd);
2417 
2418   filesort_free_buffers(entry, true);
2419 
2420   if (entry->is_created())
2421   {
2422     if (entry->db_stat)
2423       entry->file->ha_drop_table(entry->s->table_name.str);
2424     else
2425       entry->file->ha_delete_table(entry->s->table_name.str);
2426     delete entry->file;
2427     entry->file= NULL;
2428 
2429     entry->set_deleted();
2430   }
2431   /* free blobs */
2432   for (Field **ptr=entry->field ; *ptr ; ptr++)
2433     (*ptr)->mem_free();
2434   free_io_cache(entry);
2435 
2436   if (entry->temp_pool_slot != MY_BIT_NONE)
2437     bitmap_lock_clear_bit(&temp_pool, entry->temp_pool_slot);
2438 
2439   plugin_unlock(0, entry->s->db_plugin);
2440 
2441   free_root(&own_root, MYF(0)); /* the table is allocated in its own root */
2442   thd_proc_info(thd, save_proc_info);
2443 
2444   DBUG_VOID_RETURN;
2445 }
2446 
2447 /**
2448   If a MEMORY table gets full, create a disk-based table and copy all rows
2449   to this.
2450 
2451   @param thd             THD reference
2452   @param table           Table reference
2453   @param start_recinfo   Engine's column descriptions
2454   @param recinfo[in,out] End of engine's column descriptions
2455   @param error           Reason why inserting into MEMORY table failed.
2456   @param ignore_last_dup If true, ignore duplicate key error for last
2457                          inserted key (see detailed description below).
2458   @param is_duplicate[out] if non-NULL and ignore_last_dup is TRUE,
2459                          return TRUE if last key was a duplicate,
2460                          and FALSE otherwise.
2461 
2462   @details
2463     The function can be called with any error code, but only
2464     HA_ERR_RECORD_FILE_FULL is handled; other errors raise a fatal error.
2465     The function creates a disk-based temporary table, copies all records
2466     from the MEMORY table into this new table, deletes the old table and
2467     switches to use the new table within the table handle.
2468     The function uses table->record[1] as a temporary buffer while copying.
2469 
2470     The function assumes that table->record[0] contains the row that caused
2471     the error when inserting into the MEMORY table (the "last row").
2472     After all existing rows have been copied to the new table, the last row
2473     is attempted to be inserted as well. If ignore_last_dup is true,
2474     this row can be a duplicate of an existing row without throwing an error.
2475     If is_duplicate is non-NULL, an indication of whether the last row was
2476     a duplicate is returned.
2477 
2478   @note Any index/scan access initialized on the MEMORY table is not
2479   replicated to the on-disk table; re-initializing it is the caller's responsibility.
2480 */
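
/*
  Illustrative call-site sketch (the surrounding materialization code and
  'param' are assumed; real callers typically also check
  handler::is_ignorable_error() before converting):

    int error= table->file->ha_write_row(table->record[0]);
    if (error)
    {
      if (create_ondisk_from_heap(thd, table, param->start_recinfo,
                                  &param->recinfo, error, TRUE, NULL))
        return true;               // fatal error, already reported
      // 'table' now refers to the on-disk table; continue writing rows.
    }
*/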
2481 
2482 bool create_ondisk_from_heap(THD *thd, TABLE *table,
2483                              MI_COLUMNDEF *start_recinfo,
2484                              MI_COLUMNDEF **recinfo,
2485 			     int error, bool ignore_last_dup,
2486                              bool *is_duplicate)
2487 {
2488   TABLE new_table;
2489   TABLE_SHARE share;
2490   const char *save_proc_info;
2491   int write_err;
2492   DBUG_ENTER("create_ondisk_from_heap");
2493 
2494   if (table->s->db_type() != heap_hton ||
2495       error != HA_ERR_RECORD_FILE_FULL)
2496   {
2497     /*
2498       We don't want this error to be converted to a warning, e.g. in case of
2499       INSERT IGNORE ... SELECT.
2500     */
2501     table->file->print_error(error, MYF(ME_FATALERROR));
2502     DBUG_RETURN(1);
2503   }
2504 
2505   // Release latches since this can take a long time
2506   ha_release_temporary_latches(thd);
2507 
2508   new_table= *table;
2509   share= *table->s;
2510   share.ha_share= NULL;
2511   new_table.s= &share;
2512   switch (internal_tmp_disk_storage_engine)
2513   {
2514   case TMP_TABLE_MYISAM:
2515     new_table.s->db_plugin= ha_lock_engine(thd, myisam_hton);
2516     break;
2517   case TMP_TABLE_INNODB:
2518     new_table.s->db_plugin= ha_lock_engine(thd, innodb_hton);
2519     break;
2520   default:
2521     assert(0);
2522     new_table.s->db_plugin= ha_lock_engine(thd, innodb_hton);
2523   }
2524 
2525   if (!(new_table.file= get_new_handler(&share, &new_table.mem_root,
2526                                         new_table.s->db_type())))
2527     DBUG_RETURN(1);				// End of memory
2528   if (new_table.file->set_ha_share_ref(&share.ha_share))
2529   {
2530     delete new_table.file;
2531     DBUG_RETURN(1);
2532   }
2533   save_proc_info=thd->proc_info;
2534   THD_STAGE_INFO(thd, stage_converting_heap_to_ondisk);
2535 
2536   if (share.db_type() == myisam_hton)
2537   {
2538     if (create_myisam_tmp_table(&new_table, table->s->key_info,
2539                                 start_recinfo, recinfo,
2540                                 thd->lex->select_lex->active_options(),
2541                                 thd->variables.big_tables))
2542       goto err2;
2543   }
2544   else if (share.db_type() == innodb_hton)
2545   {
2546     if (create_innodb_tmp_table(&new_table, table->s->key_info))
2547       goto err2;
2548   }
2549 
2550   if (open_tmp_table(&new_table))
2551     goto err1;
2552 
2553 
2554   if (unlikely(thd->opt_trace.is_started()))
2555   {
2556     Opt_trace_context * trace= &thd->opt_trace;
2557     Opt_trace_object wrapper(trace);
2558     Opt_trace_object convert(trace, "converting_tmp_table_to_ondisk");
2559     assert(error == HA_ERR_RECORD_FILE_FULL);
2560     convert.add_alnum("cause", "memory_table_size_exceeded");
2561     trace_tmp_table(trace, &new_table);
2562   }
2563 
2564   if (table->file->indexes_are_disabled())
2565     new_table.file->ha_disable_indexes(HA_KEY_SWITCH_ALL);
2566   table->file->ha_index_or_rnd_end();
2567   if ((write_err= table->file->ha_rnd_init(1)))
2568   {
2569     table->file->print_error(write_err, MYF(ME_FATALERROR));
2570     write_err= 0;
2571     goto err;
2572   }
2573   if (table->no_rows)
2574   {
2575     new_table.file->extra(HA_EXTRA_NO_ROWS);
2576     new_table.no_rows=1;
2577   }
2578 
2579   /* HA_EXTRA_WRITE_CACHE can stay until close, no need to disable it */
2580   new_table.file->extra(HA_EXTRA_WRITE_CACHE);
2581 
2582   /*
2583     copy all old rows from heap table to on-disk table
2584     This is the only code that uses record[1] to read/write but this
2585     is safe as this is a temporary on-disk table without timestamp/
2586     autoincrement or partitioning.
2587   */
2588   while (!table->file->ha_rnd_next(new_table.record[1]))
2589   {
2590     write_err= new_table.file->ha_write_row(new_table.record[1]);
2591     DBUG_EXECUTE_IF("raise_error", write_err= HA_ERR_FOUND_DUPP_KEY ;);
2592     if (write_err)
2593       goto err;
2594   }
2595   /* copy row that filled HEAP table */
2596   if ((write_err=new_table.file->ha_write_row(table->record[0])))
2597   {
2598     if (!new_table.file->is_ignorable_error(write_err) ||
2599 	!ignore_last_dup)
2600       goto err;
2601     if (is_duplicate)
2602       *is_duplicate= TRUE;
2603   }
2604   else
2605   {
2606     if (is_duplicate)
2607       *is_duplicate= FALSE;
2608   }
2609 
2610   /* remove heap table and change to use on-disk table */
2611   (void) table->file->ha_rnd_end();
2612   (void) table->file->ha_close();              // This deletes the table !
2613   delete table->file;
2614   table->file=0;
2615   plugin_unlock(0, table->s->db_plugin);
2616   share.db_plugin= my_plugin_lock(0, &share.db_plugin);
2617   new_table.s= table->s;                       // Keep old share
2618   *table= new_table;
2619   *table->s= share;
2620   /* Update quick select, if any. */
2621   {
2622     QEP_TAB *tab= table->reginfo.qep_tab;
2623     assert(tab || !table->reginfo.join_tab);
2624     if (tab && tab->quick())
2625     {
2626       /*
2627         This could happen only with result of derived table/view
2628         materialization.
2629       */
2630       assert(tab->table_ref && tab->table_ref->uses_materialization());
2631       tab->quick()->set_handler(table->file);
2632     }
2633   }
2634   table->file->change_table_ptr(table, table->s);
2635   table->use_all_columns();
2636   if (save_proc_info)
2637     thd_proc_info(thd, (!strcmp(save_proc_info,"Copying to tmp table") ?
2638                   "Copying to tmp table on disk" : save_proc_info));
2639   DBUG_RETURN(0);
2640 
2641  err:
2642   if (write_err)
2643   {
2644     DBUG_PRINT("error",("Got error: %d",write_err));
2645     new_table.file->print_error(write_err, MYF(0));
2646   }
2647   if (table->file->inited)
2648     (void) table->file->ha_rnd_end();
2649   (void) new_table.file->ha_close();
2650  err1:
2651   new_table.file->ha_delete_table(new_table.s->table_name.str);
2652  err2:
2653   delete new_table.file;
2654   thd_proc_info(thd, save_proc_info);
2655   table->mem_root= new_table.mem_root;
2656   DBUG_RETURN(1);
2657 }
2658