1 /* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
2    Copyright (c) 2011, 2013, Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1335  USA */
16 
17 #include "mariadb.h"
18 #include <my_bit.h>
19 #include "rpl_utility.h"
20 #include "log_event.h"
21 
22 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
23 #include "rpl_rli.h"
24 #include "sql_select.h"
25 
26 /**
27   Calculate display length for MySQL56 temporal data types from their metadata.
28   It contains fractional precision in the low 16-bit word.
29 */
30 static uint32
max_display_length_for_temporal2_field(uint32 int_display_length,unsigned int metadata)31 max_display_length_for_temporal2_field(uint32 int_display_length,
32                                        unsigned int metadata)
33 {
34   metadata&= 0x00ff;
35   return int_display_length + metadata + (metadata ? 1 : 0);
36 }
37 
38 
39 /**
40    Compute the maximum display length of a field.
41 
42    @param sql_type Type of the field
43    @param metadata The metadata from the master for the field.
44    @return Maximum length of the field in bytes.
45 
46    The precise values calculated by field->max_display_length() and
47    calculated by max_display_length_for_field() can differ (by +1 or -1)
48    for integer data types (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
49    This slight difference is not important here, because we call
50    this function only for two *different* integer data types.
51  */
52 static uint32
max_display_length_for_field(enum_field_types sql_type,unsigned int metadata)53 max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
54 {
55   DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
56   DBUG_ASSERT(metadata >> 16 == 0);
57 
58   switch (sql_type) {
59   case MYSQL_TYPE_NEWDECIMAL:
60     return metadata >> 8;
61 
62   case MYSQL_TYPE_FLOAT:
63     return 12;
64 
65   case MYSQL_TYPE_DOUBLE:
66     return 22;
67 
68   case MYSQL_TYPE_SET:
69   case MYSQL_TYPE_ENUM:
70       return metadata & 0x00ff;
71 
72   case MYSQL_TYPE_STRING:
73   {
74     uchar type= metadata >> 8;
75     if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM)
76       return metadata & 0xff;
77     else
78       /* This is taken from Field_string::unpack. */
79       return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
80   }
81 
82   case MYSQL_TYPE_YEAR:
83   case MYSQL_TYPE_TINY:
84     return 4;
85 
86   case MYSQL_TYPE_SHORT:
87     return 6;
88 
89   case MYSQL_TYPE_INT24:
90     return 9;
91 
92   case MYSQL_TYPE_LONG:
93     return 11;
94 
95 #ifdef HAVE_LONG_LONG
96   case MYSQL_TYPE_LONGLONG:
97     return 20;
98 
99 #endif
100   case MYSQL_TYPE_NULL:
101     return 0;
102 
103   case MYSQL_TYPE_NEWDATE:
104     return 3;
105 
106   case MYSQL_TYPE_DATE:
107     return 3;
108 
109   case MYSQL_TYPE_TIME:
110     return MIN_TIME_WIDTH;
111 
112   case MYSQL_TYPE_TIME2:
113     return max_display_length_for_temporal2_field(MIN_TIME_WIDTH, metadata);
114 
115   case MYSQL_TYPE_TIMESTAMP:
116     return MAX_DATETIME_WIDTH;
117 
118   case MYSQL_TYPE_TIMESTAMP2:
119     return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
120 
121   case MYSQL_TYPE_DATETIME:
122     return MAX_DATETIME_WIDTH;
123 
124   case MYSQL_TYPE_DATETIME2:
125     return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
126 
127   case MYSQL_TYPE_BIT:
128     /*
129       Decode the size of the bit field from the master.
130     */
131     DBUG_ASSERT((metadata & 0xff) <= 7);
132     return 8 * (metadata >> 8U) + (metadata & 0x00ff);
133 
134   case MYSQL_TYPE_VAR_STRING:
135   case MYSQL_TYPE_VARCHAR:
136     return metadata;
137   case MYSQL_TYPE_VARCHAR_COMPRESSED:
138     return metadata - 1;
139 
140     /*
141       The actual length for these types does not really matter since
142       they are used to calc_pack_length, which ignores the given
143       length for these types.
144 
145       Since we want this to be accurate for other uses, we return the
146       maximum size in bytes of these BLOBs.
147     */
148 
149   case MYSQL_TYPE_TINY_BLOB:
150     return (uint32)my_set_bits(1 * 8);
151 
152   case MYSQL_TYPE_MEDIUM_BLOB:
153     return (uint32)my_set_bits(3 * 8);
154 
155   case MYSQL_TYPE_BLOB:
156   case MYSQL_TYPE_BLOB_COMPRESSED:
157     /*
158       For the blob type, Field::real_type() lies and say that all
159       blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
160       at the length instead to decide what the max display size is.
161      */
162     return (uint32)my_set_bits(metadata * 8);
163 
164   case MYSQL_TYPE_LONG_BLOB:
165   case MYSQL_TYPE_GEOMETRY:
166     return (uint32)my_set_bits(4 * 8);
167 
168   default:
169     return ~(uint32) 0;
170   }
171 }
172 
173 
174 /*
175   Compare the pack lengths of a source field (on the master) and a
176   target field (on the slave).
177 
178   @param field    Target field.
179   @param type     Source field type.
180   @param metadata Source field metadata.
181 
182   @retval -1 The length of the source field is smaller than the target field.
183   @retval  0 The length of the source and target fields are the same.
184   @retval  1 The length of the source field is greater than the target field.
185  */
compare_lengths(Field * field,enum_field_types source_type,uint16 metadata)186 int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
187 {
188   DBUG_ENTER("compare_lengths");
189   size_t const source_length=
190     max_display_length_for_field(source_type, metadata);
191   size_t const target_length= field->max_display_length();
192   DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
193                        " target_length: %lu, target_type: %u",
194                        (unsigned long) source_length, source_type,
195                        (unsigned long) target_length, field->real_type()));
196   int result= source_length < target_length ? -1 : source_length > target_length;
197   DBUG_PRINT("result", ("%d", result));
198   DBUG_RETURN(result);
199 }
200 #endif //MYSQL_CLIENT
201 /*********************************************************************
202  *                   table_def member definitions                    *
203  *********************************************************************/
204 
205 /*
206   This function returns the field size in raw bytes based on the type
207   and the encoded field data from the master's raw data.
208 */
calc_field_size(uint col,uchar * master_data) const209 uint32 table_def::calc_field_size(uint col, uchar *master_data) const
210 {
211   uint32 length= 0;
212 
213   switch (type(col)) {
214   case MYSQL_TYPE_NEWDECIMAL:
215     length= my_decimal_get_binary_size(m_field_metadata[col] >> 8,
216                                        m_field_metadata[col] & 0xff);
217     break;
218   case MYSQL_TYPE_DECIMAL:
219   case MYSQL_TYPE_FLOAT:
220   case MYSQL_TYPE_DOUBLE:
221     length= m_field_metadata[col];
222     break;
223   /*
224     The cases for SET and ENUM are include for completeness, however
225     both are mapped to type MYSQL_TYPE_STRING and their real types
226     are encoded in the field metadata.
227   */
228   case MYSQL_TYPE_SET:
229   case MYSQL_TYPE_ENUM:
230   case MYSQL_TYPE_STRING:
231   {
232     uchar type= m_field_metadata[col] >> 8U;
233     if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
234       length= m_field_metadata[col] & 0x00ff;
235     else
236     {
237       /*
238         We are reading the actual size from the master_data record
239         because this field has the actual lengh stored in the first
240         byte.
241       */
242       length= (uint) *master_data + 1;
243       DBUG_ASSERT(length != 0);
244     }
245     break;
246   }
247   case MYSQL_TYPE_YEAR:
248   case MYSQL_TYPE_TINY:
249     length= 1;
250     break;
251   case MYSQL_TYPE_SHORT:
252     length= 2;
253     break;
254   case MYSQL_TYPE_INT24:
255     length= 3;
256     break;
257   case MYSQL_TYPE_LONG:
258     length= 4;
259     break;
260 #ifdef HAVE_LONG_LONG
261   case MYSQL_TYPE_LONGLONG:
262     length= 8;
263     break;
264 #endif
265   case MYSQL_TYPE_NULL:
266     length= 0;
267     break;
268   case MYSQL_TYPE_NEWDATE:
269     length= 3;
270     break;
271   case MYSQL_TYPE_DATE:
272   case MYSQL_TYPE_TIME:
273     length= 3;
274     break;
275   case MYSQL_TYPE_TIME2:
276     length= my_time_binary_length(m_field_metadata[col]);
277     break;
278   case MYSQL_TYPE_TIMESTAMP:
279     length= 4;
280     break;
281   case MYSQL_TYPE_TIMESTAMP2:
282     length= my_timestamp_binary_length(m_field_metadata[col]);
283     break;
284   case MYSQL_TYPE_DATETIME:
285     length= 8;
286     break;
287   case MYSQL_TYPE_DATETIME2:
288     length= my_datetime_binary_length(m_field_metadata[col]);
289     break;
290   case MYSQL_TYPE_BIT:
291   {
292     /*
293       Decode the size of the bit field from the master.
294         from_len is the length in bytes from the master
295         from_bit_len is the number of extra bits stored in the master record
296       If from_bit_len is not 0, add 1 to the length to account for accurate
297       number of bytes needed.
298     */
299     uint from_len= (m_field_metadata[col] >> 8U) & 0x00ff;
300     uint from_bit_len= m_field_metadata[col] & 0x00ff;
301     DBUG_ASSERT(from_bit_len <= 7);
302     length= from_len + ((from_bit_len > 0) ? 1 : 0);
303     break;
304   }
305   case MYSQL_TYPE_VARCHAR:
306   case MYSQL_TYPE_VARCHAR_COMPRESSED:
307   {
308     length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
309     length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
310     break;
311   }
312   case MYSQL_TYPE_TINY_BLOB:
313   case MYSQL_TYPE_MEDIUM_BLOB:
314   case MYSQL_TYPE_LONG_BLOB:
315   case MYSQL_TYPE_BLOB:
316   case MYSQL_TYPE_BLOB_COMPRESSED:
317   case MYSQL_TYPE_GEOMETRY:
318   {
319     /*
320       Compute the length of the data. We cannot use get_length() here
321       since it is dependent on the specific table (and also checks the
322       packlength using the internal 'table' pointer) and replication
323       is using a fixed format for storing data in the binlog.
324     */
325     switch (m_field_metadata[col]) {
326     case 1:
327       length= *master_data;
328       break;
329     case 2:
330       length= uint2korr(master_data);
331       break;
332     case 3:
333       length= uint3korr(master_data);
334       break;
335     case 4:
336       length= uint4korr(master_data);
337       break;
338     default:
339       DBUG_ASSERT(0);		// Should not come here
340       break;
341     }
342 
343     length+= m_field_metadata[col];
344     break;
345   }
346   default:
347     length= ~(uint32) 0;
348   }
349   return length;
350 }
351 
352 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
353 /**
354  */
show_sql_type(enum_field_types type,uint16 metadata,String * str,bool char_with_octets)355 void show_sql_type(enum_field_types type, uint16 metadata, String *str,
356                    bool char_with_octets)
357 {
358   DBUG_ENTER("show_sql_type");
359   DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
360 
361   switch (type)
362   {
363   case MYSQL_TYPE_TINY:
364     str->set_ascii(STRING_WITH_LEN("tinyint"));
365     break;
366 
367   case MYSQL_TYPE_SHORT:
368     str->set_ascii(STRING_WITH_LEN("smallint"));
369     break;
370 
371   case MYSQL_TYPE_LONG:
372     str->set_ascii(STRING_WITH_LEN("int"));
373     break;
374 
375   case MYSQL_TYPE_FLOAT:
376     str->set_ascii(STRING_WITH_LEN("float"));
377     break;
378 
379   case MYSQL_TYPE_DOUBLE:
380     str->set_ascii(STRING_WITH_LEN("double"));
381     break;
382 
383   case MYSQL_TYPE_NULL:
384     str->set_ascii(STRING_WITH_LEN("null"));
385     break;
386 
387   case MYSQL_TYPE_TIMESTAMP:
388   case MYSQL_TYPE_TIMESTAMP2:
389     str->set_ascii(STRING_WITH_LEN("timestamp"));
390     break;
391 
392   case MYSQL_TYPE_LONGLONG:
393     str->set_ascii(STRING_WITH_LEN("bigint"));
394     break;
395 
396   case MYSQL_TYPE_INT24:
397     str->set_ascii(STRING_WITH_LEN("mediumint"));
398     break;
399 
400   case MYSQL_TYPE_NEWDATE:
401   case MYSQL_TYPE_DATE:
402     str->set_ascii(STRING_WITH_LEN("date"));
403     break;
404 
405   case MYSQL_TYPE_TIME:
406   case MYSQL_TYPE_TIME2:
407     str->set_ascii(STRING_WITH_LEN("time"));
408     break;
409 
410   case MYSQL_TYPE_DATETIME:
411   case MYSQL_TYPE_DATETIME2:
412     str->set_ascii(STRING_WITH_LEN("datetime"));
413     break;
414 
415   case MYSQL_TYPE_YEAR:
416     str->set_ascii(STRING_WITH_LEN("year"));
417     break;
418 
419   case MYSQL_TYPE_VAR_STRING:
420   case MYSQL_TYPE_VARCHAR:
421   case MYSQL_TYPE_VARCHAR_COMPRESSED:
422     {
423       CHARSET_INFO *cs= str->charset();
424       size_t length=0;
425       if (char_with_octets)
426         length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
427                                    "varchar(%u octets)", metadata);
428       else
429         length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
430                                    "varbinary(%u)", metadata);
431       str->length(length);
432     }
433     break;
434 
435   case MYSQL_TYPE_BIT:
436     {
437       CHARSET_INFO *cs= str->charset();
438       int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF);
439       size_t length=
440         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
441                            "bit(%d)", bit_length);
442       str->length(length);
443     }
444     break;
445 
446   case MYSQL_TYPE_DECIMAL:
447     {
448       CHARSET_INFO *cs= str->charset();
449       size_t length=
450         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
451                            "decimal(%d,?)/*old*/", metadata);
452       str->length(length);
453     }
454     break;
455 
456   case MYSQL_TYPE_NEWDECIMAL:
457     {
458       CHARSET_INFO *cs= str->charset();
459       size_t length=
460         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
461                            "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
462       str->length(length);
463     }
464     break;
465 
466   case MYSQL_TYPE_ENUM:
467     str->set_ascii(STRING_WITH_LEN("enum"));
468     break;
469 
470   case MYSQL_TYPE_SET:
471     str->set_ascii(STRING_WITH_LEN("set"));
472     break;
473 
474   case MYSQL_TYPE_BLOB:
475   case MYSQL_TYPE_BLOB_COMPRESSED:
476     /*
477       Field::real_type() lies regarding the actual type of a BLOB, so
478       it is necessary to check the pack length to figure out what kind
479       of blob it really is.
480      */
481     switch (metadata)
482     {
483     case 1:
484       str->set_ascii(STRING_WITH_LEN("tinyblob"));
485       break;
486 
487     case 2:
488       str->set_ascii(STRING_WITH_LEN("blob"));
489       break;
490 
491     case 3:
492       str->set_ascii(STRING_WITH_LEN("mediumblob"));
493       break;
494 
495     case 4:
496       str->set_ascii(STRING_WITH_LEN("longblob"));
497       break;
498 
499     default:
500       DBUG_ASSERT(0);
501       break;
502     }
503 
504     if (type == MYSQL_TYPE_BLOB_COMPRESSED)
505       str->append(STRING_WITH_LEN(" compressed"));
506     break;
507 
508   case MYSQL_TYPE_STRING:
509     {
510       /*
511         This is taken from Field_string::unpack.
512       */
513       CHARSET_INFO *cs= str->charset();
514       uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
515       size_t length=0;
516       if (char_with_octets)
517         length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
518                                    "char(%u octets)", bytes);
519       else
520         length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
521                                    "binary(%u)", bytes);
522       str->length(length);
523     }
524     break;
525 
526   case MYSQL_TYPE_GEOMETRY:
527     str->set_ascii(STRING_WITH_LEN("geometry"));
528     break;
529 
530   default:
531     str->set_ascii(STRING_WITH_LEN("<unknown type>"));
532   }
533   DBUG_VOID_RETURN;
534 }
535 
536 
537 /**
538    Check the order variable and print errors if the order is not
539    acceptable according to the current settings.
540 
541    @param order  The computed order of the conversion needed.
542    @param rli    The relay log info data structure: for error reporting.
543  */
is_conversion_ok(int order,Relay_log_info * rli)544 bool is_conversion_ok(int order, Relay_log_info *rli)
545 {
546   DBUG_ENTER("is_conversion_ok");
547   bool allow_non_lossy, allow_lossy;
548 
549   allow_non_lossy = slave_type_conversions_options &
550                     (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
551   allow_lossy= slave_type_conversions_options &
552                (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
553 
554   DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
555                        allow_non_lossy ? " ALL_NON_LOSSY" : "",
556                        allow_lossy ? " ALL_LOSSY" : ""));
557   if (order < 0 && !allow_non_lossy)
558   {
559     /* !!! Add error message saying that non-lossy conversions need to be allowed. */
560     DBUG_RETURN(false);
561   }
562 
563   if (order > 0 && !allow_lossy)
564   {
565     /* !!! Add error message saying that lossy conversions need to be allowed. */
566     DBUG_RETURN(false);
567   }
568 
569   DBUG_RETURN(true);
570 }
571 
572 
573 /**
574    Can a type potentially be converted to another type?
575 
576    This function check if the types are convertible and what
577    conversion is required.
578 
579    If conversion is not possible, and error is printed.
580 
581    If conversion is possible:
582 
583    - *order will be set to -1 if source type is smaller than target
584      type and a non-lossy conversion can be required. This includes
585      the case where the field types are different but types could
586      actually be converted in either direction.
587 
588    - *order will be set to 0 if no conversion is required.
589 
590    - *order will be set to 1 if the source type is strictly larger
591       than the target type and that conversion is potentially lossy.
592 
593    @param[in] field    Target field
594    @param[in] type     Source field type
595    @param[in] metadata Source field metadata
596    @param[in] rli      Relay log info (for error reporting)
597    @param[in] mflags   Flags from the table map event
598    @param[out] order   Order between source field and target field
599 
600    @return @c true if conversion is possible according to the current
601    settings, @c false if conversion is not possible according to the
602    current setting.
603  */
604 static bool
can_convert_field_to(Field * field,enum_field_types source_type,uint16 metadata,Relay_log_info * rli,uint16 mflags,int * order_var)605 can_convert_field_to(Field *field,
606                      enum_field_types source_type, uint16 metadata,
607                      Relay_log_info *rli, uint16 mflags,
608                      int *order_var)
609 {
610   DBUG_ENTER("can_convert_field_to");
611   bool same_type;
612 #ifndef DBUG_OFF
613   char field_type_buf[MAX_FIELD_WIDTH];
614   String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
615   field->sql_type(field_type);
616   DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
617                        field_type.c_ptr_safe(), field->real_type(), source_type, metadata));
618 #endif
619   /**
620     @todo
621       Implement Field_varstring_cmopressed::real_type() and
622       Field_blob_compressed::real_type() properly. All occurencies
623       of Field::real_type() have to be inspected and adjusted if needed.
624 
625       Until it is not ready we have to compare source_type against
626       binlog_type() when replicating from or to compressed data types.
627 
628       @sa Comment for Field::binlog_type()
629   */
630   if (source_type == MYSQL_TYPE_VARCHAR_COMPRESSED ||
631       source_type == MYSQL_TYPE_BLOB_COMPRESSED ||
632       field->binlog_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
633       field->binlog_type() == MYSQL_TYPE_BLOB_COMPRESSED)
634     same_type= field->binlog_type() == source_type;
635   else
636     same_type= field->real_type() == source_type;
637 
638   /*
639     If the real type is the same, we need to check the metadata to
640     decide if conversions are allowed.
641    */
642   if (same_type)
643   {
644     if (metadata == 0) // Metadata can only be zero if no metadata was provided
645     {
646       /*
647         If there is no metadata, we either have an old event where no
648         metadata were supplied, or a type that does not require any
649         metadata. In either case, conversion can be done but no
650         conversion table is necessary.
651        */
652       DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
653       *order_var= 0;
654       DBUG_RETURN(true);
655     }
656 
657     DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
658     if (field->compatible_field_size(metadata, rli, mflags, order_var))
659       DBUG_RETURN(is_conversion_ok(*order_var, rli));
660     else
661       DBUG_RETURN(false);
662   }
663   else if (
664             /*
665               Conversion from MariaDB TIMESTAMP(0), TIME(0), DATETIME(0)
666               to the corresponding MySQL56 types is non-lossy.
667             */
668            (metadata == 0 &&
669             ((field->real_type() == MYSQL_TYPE_TIMESTAMP2 &&
670               source_type == MYSQL_TYPE_TIMESTAMP) ||
671              (field->real_type() == MYSQL_TYPE_TIME2 &&
672               source_type == MYSQL_TYPE_TIME) ||
673              (field->real_type() == MYSQL_TYPE_DATETIME2 &&
674               source_type == MYSQL_TYPE_DATETIME))) ||
675             /*
676               Conversion from MySQL56 TIMESTAMP(N), TIME(N), DATETIME(N)
677               to the corresponding MariaDB or MySQL55 types is non-lossy.
678             */
679             (metadata == field->decimals() &&
680              ((field->real_type() == MYSQL_TYPE_TIMESTAMP &&
681               source_type == MYSQL_TYPE_TIMESTAMP2) ||
682              (field->real_type() == MYSQL_TYPE_TIME &&
683               source_type == MYSQL_TYPE_TIME2) ||
684              (field->real_type() == MYSQL_TYPE_DATETIME &&
685               source_type == MYSQL_TYPE_DATETIME2))))
686   {
687     /*
688       TS-TODO: conversion from FSP1>FSP2.
689     */
690     *order_var= -1;
691     DBUG_RETURN(true);
692   }
693   else if (!slave_type_conversions_options)
694     DBUG_RETURN(false);
695 
696   /*
697     Here, from and to will always be different. Since the types are
698     different, we cannot use the compatible_field_size() function, but
699     have to rely on hard-coded max-sizes for fields.
700   */
701 
702   DBUG_PRINT("debug", ("Base types are different, checking conversion"));
703   switch (source_type)                      // Source type (on master)
704   {
705   case MYSQL_TYPE_DECIMAL:
706   case MYSQL_TYPE_NEWDECIMAL:
707   case MYSQL_TYPE_FLOAT:
708   case MYSQL_TYPE_DOUBLE:
709     switch (field->real_type())
710     {
711     case MYSQL_TYPE_NEWDECIMAL:
712       /*
713         Then the other type is either FLOAT, DOUBLE, or old style
714         DECIMAL, so we require lossy conversion.
715       */
716       *order_var= 1;
717       DBUG_RETURN(is_conversion_ok(*order_var, rli));
718 
719     case MYSQL_TYPE_DECIMAL:
720     case MYSQL_TYPE_FLOAT:
721     case MYSQL_TYPE_DOUBLE:
722     {
723       if (source_type == MYSQL_TYPE_NEWDECIMAL ||
724           source_type == MYSQL_TYPE_DECIMAL)
725         *order_var = 1;                         // Always require lossy conversions
726       else
727         *order_var= compare_lengths(field, source_type, metadata);
728       DBUG_ASSERT(*order_var != 0);
729       DBUG_RETURN(is_conversion_ok(*order_var, rli));
730     }
731 
732     default:
733       DBUG_RETURN(false);
734     }
735     break;
736 
737   /*
738     The length comparison check will do the correct job of comparing
739     the field lengths (in bytes) of two integer types.
740   */
741   case MYSQL_TYPE_TINY:
742   case MYSQL_TYPE_SHORT:
743   case MYSQL_TYPE_INT24:
744   case MYSQL_TYPE_LONG:
745   case MYSQL_TYPE_LONGLONG:
746     switch (field->real_type())
747     {
748     case MYSQL_TYPE_TINY:
749     case MYSQL_TYPE_SHORT:
750     case MYSQL_TYPE_INT24:
751     case MYSQL_TYPE_LONG:
752     case MYSQL_TYPE_LONGLONG:
753       /*
754         max_display_length_for_field() is not fully precise for the integer
755         data types. So its result cannot be compared to the result of
756         field->max_dispay_length() when the table field and the binlog field
757         are of the same type.
758         This code should eventually be rewritten not to use
759         compare_lengths(), to detect subtype/supetype relations
760         just using the type codes.
761       */
762       DBUG_ASSERT(source_type != field->real_type());
763       *order_var= compare_lengths(field, source_type, metadata);
764       DBUG_ASSERT(*order_var != 0);
765       DBUG_RETURN(is_conversion_ok(*order_var, rli));
766 
767     default:
768       DBUG_RETURN(false);
769     }
770     break;
771 
772   /*
773     Since source and target type is different, and it is not possible
774     to convert bit types to anything else, this will return false.
775    */
776   case MYSQL_TYPE_BIT:
777     DBUG_RETURN(false);
778 
779   /*
780     If all conversions are disabled, it is not allowed to convert
781     between these types. Since the TEXT vs. BINARY is distinguished by
782     the charset, and the charset is not replicated, we cannot
783     currently distinguish between , e.g., TEXT and BLOB.
784    */
785   case MYSQL_TYPE_TINY_BLOB:
786   case MYSQL_TYPE_MEDIUM_BLOB:
787   case MYSQL_TYPE_LONG_BLOB:
788   case MYSQL_TYPE_BLOB:
789   case MYSQL_TYPE_BLOB_COMPRESSED:
790   case MYSQL_TYPE_STRING:
791   case MYSQL_TYPE_VAR_STRING:
792   case MYSQL_TYPE_VARCHAR:
793   case MYSQL_TYPE_VARCHAR_COMPRESSED:
794     switch (field->real_type())
795     {
796     case MYSQL_TYPE_TINY_BLOB:
797     case MYSQL_TYPE_MEDIUM_BLOB:
798     case MYSQL_TYPE_LONG_BLOB:
799     case MYSQL_TYPE_BLOB:
800     case MYSQL_TYPE_BLOB_COMPRESSED:
801     case MYSQL_TYPE_STRING:
802     case MYSQL_TYPE_VAR_STRING:
803     case MYSQL_TYPE_VARCHAR:
804     case MYSQL_TYPE_VARCHAR_COMPRESSED:
805       *order_var= compare_lengths(field, source_type, metadata);
806       /*
807         Here we know that the types are different, so if the order
808         gives that they do not require any conversion, we still need
809         to have non-lossy conversion enabled to allow conversion
810         between different (string) types of the same length.
811        */
812       if (*order_var == 0)
813         *order_var= -1;
814       DBUG_RETURN(is_conversion_ok(*order_var, rli));
815 
816     default:
817       DBUG_RETURN(false);
818     }
819     break;
820 
821   case MYSQL_TYPE_GEOMETRY:
822   case MYSQL_TYPE_TIMESTAMP:
823   case MYSQL_TYPE_DATE:
824   case MYSQL_TYPE_TIME:
825   case MYSQL_TYPE_DATETIME:
826   case MYSQL_TYPE_YEAR:
827   case MYSQL_TYPE_NULL:
828   case MYSQL_TYPE_ENUM:
829   case MYSQL_TYPE_SET:
830   case MYSQL_TYPE_TIMESTAMP2:
831   case MYSQL_TYPE_TIME2:
832     DBUG_RETURN(false);
833   case MYSQL_TYPE_NEWDATE:
834     {
835       if (field->real_type() == MYSQL_TYPE_DATETIME2 ||
836           field->real_type() == MYSQL_TYPE_DATETIME)
837       {
838         *order_var= -1;
839         DBUG_RETURN(is_conversion_ok(*order_var, rli));
840       }
841       else
842       {
843         DBUG_RETURN(false);
844       }
845     }
846     break;
847 
848   //case MYSQL_TYPE_DATETIME: TODO: fix MDEV-17394 and uncomment.
849   //
850   //The "old" type does not specify the fraction part size which is required
851   //for correct conversion.
852   case MYSQL_TYPE_DATETIME2:
853     {
854       if (field->real_type() == MYSQL_TYPE_NEWDATE)
855       {
856         *order_var= 1;
857         DBUG_RETURN(is_conversion_ok(*order_var, rli));
858       }
859       else
860       {
861         DBUG_RETURN(false);
862       }
863     }
864     break;
865   }
866   DBUG_RETURN(false);                                 // To keep GCC happy
867 }
868 
869 
870 /**
871   Is the definition compatible with a table?
872 
873   This function will compare the master table with an existing table
874   on the slave and see if they are compatible with respect to the
875   current settings of @c SLAVE_TYPE_CONVERSIONS.
876 
877   If the tables are compatible and conversions are required, @c
878   *tmp_table_var will be set to a virtual temporary table with field
879   pointers for the fields that require conversions.  This allow simple
880   checking of whether a conversion are to be applied or not.
881 
882   If tables are compatible, but no conversions are necessary, @c
883   *tmp_table_var will be set to NULL.
884 
885   @param rli_arg[in]
886   Relay log info, for error reporting.
887 
888   @param table[in]
889   Table to compare with
890 
891   @param tmp_table_var[out]
892   Virtual temporary table for performing conversions, if necessary.
893 
894   @retval true Master table is compatible with slave table.
895   @retval false Master table is not compatible with slave table.
896 */
897 bool
compatible_with(THD * thd,rpl_group_info * rgi,TABLE * table,TABLE ** conv_table_var) const898 table_def::compatible_with(THD *thd, rpl_group_info *rgi,
899                            TABLE *table, TABLE **conv_table_var)
900   const
901 {
902   /*
903     We only check the initial columns for the tables.
904   */
905   uint const cols_to_check= MY_MIN(table->s->fields, size());
906   Relay_log_info *rli= rgi->rli;
907   TABLE *tmp_table= NULL;
908 
909   for (uint col= 0 ; col < cols_to_check ; ++col)
910   {
911     Field *const field= table->field[col];
912     int order;
913     if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
914     {
915       DBUG_PRINT("debug", ("Checking column %d -"
916                            " field '%s' can be converted - order: %d",
917                            col, field->field_name.str, order));
918       DBUG_ASSERT(order >= -1 && order <= 1);
919 
920       /*
921         If order is not 0, a conversion is required, so we need to set
922         up the conversion table.
923        */
924       if (order != 0 && tmp_table == NULL)
925       {
926         /*
927           This will create the full table with all fields. This is
928           necessary to ge the correct field lengths for the record.
929         */
930         tmp_table= create_conversion_table(thd, rgi, table);
931         if (tmp_table == NULL)
932             return false;
933         /*
934           Clear all fields up to, but not including, this column.
935         */
936         for (unsigned int i= 0; i < col; ++i)
937           tmp_table->field[i]= NULL;
938       }
939 
940       if (order == 0 && tmp_table != NULL)
941         tmp_table->field[col]= NULL;
942     }
943     else
944     {
945       DBUG_PRINT("debug", ("Checking column %d -"
946                            " field '%s' can not be converted",
947                            col, field->field_name.str));
948       DBUG_ASSERT(col < size() && col < table->s->fields);
949       DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
950       DBUG_ASSERT(table->in_use);
951       const char *db_name= table->s->db.str;
952       const char *tbl_name= table->s->table_name.str;
953       char source_buf[MAX_FIELD_WIDTH];
954       char target_buf[MAX_FIELD_WIDTH];
955       String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
956       String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
957       THD *thd= table->in_use;
958       bool char_with_octets= field->cmp_type() == STRING_RESULT ?
959                              field->has_charset() : true;
960 
961       show_sql_type(type(col), field_metadata(col), &source_type,
962                     char_with_octets);
963       field->sql_rpl_type(&target_type);
964 
965       rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, rgi->gtid_info(),
966                   ER_THD(thd, ER_SLAVE_CONVERSION_FAILED),
967                   col, db_name, tbl_name,
968                   source_type.c_ptr_safe(), target_type.c_ptr_safe());
969       return false;
970     }
971   }
972 
973 #ifndef DBUG_OFF
974   if (tmp_table)
975   {
976     for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
977       if (tmp_table->field[col])
978       {
979         char source_buf[MAX_FIELD_WIDTH];
980         char target_buf[MAX_FIELD_WIDTH];
981         String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
982         String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
983         tmp_table->field[col]->sql_type(source_type);
984         table->field[col]->sql_type(target_type);
985         DBUG_PRINT("debug", ("Field %s - conversion required."
986                              " Source type: '%s', Target type: '%s'",
987                              tmp_table->field[col]->field_name.str,
988                              source_type.c_ptr_safe(), target_type.c_ptr_safe()));
989       }
990   }
991 #endif
992 
993   *conv_table_var= tmp_table;
994   return true;
995 }
996 
997 
998 /**
999   A wrapper to Virtual_tmp_table, to get access to its constructor,
1000   which is protected for safety purposes (against illegal use on stack).
1001 */
1002 class Virtual_conversion_table: public Virtual_tmp_table
1003 {
1004 public:
Virtual_conversion_table(THD * thd)1005   Virtual_conversion_table(THD *thd) :Virtual_tmp_table(thd) { }
1006   /**
1007     Add a new field into the virtual table.
1008     @param sql_type     - The real_type of the field.
1009     @param metadata     - The RBR binary log metadata for this field.
1010     @param target_field - The field from the target table, to get extra
1011                           attributes from (e.g. typelib in case of ENUM).
1012   */
add(enum_field_types sql_type,uint16 metadata,const Field * target_field)1013   bool add(enum_field_types sql_type,
1014            uint16 metadata, const Field *target_field)
1015   {
1016     const Type_handler *handler= Type_handler::get_handler_by_real_type(sql_type);
1017     if (!handler)
1018     {
1019       sql_print_error("In RBR mode, Slave received unknown field type field %d "
1020                       " for column Name: %s.%s.%s.",
1021                       (int) sql_type,
1022                       target_field->table->s->db.str,
1023                       target_field->table->s->table_name.str,
1024                       target_field->field_name.str);
1025       return true;
1026     }
1027     Field *tmp= handler->make_conversion_table_field(this, metadata,
1028                                                      target_field);
1029     if (!tmp)
1030       return true;
1031     Virtual_tmp_table::add(tmp);
1032     DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
1033                          " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
1034                          sql_type, target_field->field_name.str,
1035                          tmp->field_length, tmp->decimals(), TRUE,
1036                          tmp->flags, tmp->pack_length()));
1037     return false;
1038   }
1039 };
1040 
1041 
1042 /**
1043   Create a conversion table.
1044 
1045   If the function is unable to create the conversion table, an error
1046   will be printed and NULL will be returned.
1047 
1048   @return Pointer to conversion table, or NULL if unable to create
1049   conversion table.
1050  */
1051 
create_conversion_table(THD * thd,rpl_group_info * rgi,TABLE * target_table) const1052 TABLE *table_def::create_conversion_table(THD *thd, rpl_group_info *rgi,
1053                                           TABLE *target_table) const
1054 {
1055   DBUG_ENTER("table_def::create_conversion_table");
1056 
1057   Virtual_conversion_table *conv_table;
1058   Relay_log_info *rli= rgi->rli;
1059   /*
1060     At slave, columns may differ. So we should create
1061     MY_MIN(columns@master, columns@slave) columns in the
1062     conversion table.
1063   */
1064   uint const cols_to_create= MY_MIN(target_table->s->fields, size());
1065   if (!(conv_table= new(thd) Virtual_conversion_table(thd)) ||
1066       conv_table->init(cols_to_create))
1067     goto err;
1068   for (uint col= 0 ; col < cols_to_create; ++col)
1069   {
1070     if (conv_table->add(type(col), field_metadata(col),
1071                         target_table->field[col]))
1072     {
1073       DBUG_PRINT("debug", ("binlog_type: %d, metadata: %04X, target_field: '%s'"
1074                            " make_conversion_table_field() failed",
1075                            binlog_type(col), field_metadata(col),
1076                            target_table->field[col]->field_name.str));
1077       goto err;
1078     }
1079   }
1080 
1081   if (conv_table->open())
1082     goto err; // Could not allocate record buffer?
1083 
1084   DBUG_RETURN(conv_table);
1085 
1086 err:
1087   if (conv_table)
1088     delete conv_table;
1089   rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, rgi->gtid_info(),
1090               ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
1091               target_table->s->db.str,
1092               target_table->s->table_name.str);
1093   DBUG_RETURN(NULL);
1094 }
1095 #endif /* MYSQL_CLIENT */
1096 
table_def(unsigned char * types,ulong size,uchar * field_metadata,int metadata_size,uchar * null_bitmap,uint16 flags)1097 table_def::table_def(unsigned char *types, ulong size,
1098                      uchar *field_metadata, int metadata_size,
1099                      uchar *null_bitmap, uint16 flags)
1100   : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
1101     m_field_metadata(0), m_null_bits(0), m_flags(flags),
1102     m_memory(NULL)
1103 {
1104   m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
1105                                      &m_type, size,
1106                                      &m_field_metadata,
1107                                      size * sizeof(uint16),
1108                                      &m_null_bits, (size + 7) / 8,
1109                                      NULL);
1110 
1111   bzero(m_field_metadata, size * sizeof(uint16));
1112 
1113   if (m_type)
1114     memcpy(m_type, types, size);
1115   else
1116     m_size= 0;
1117   /*
1118     Extract the data from the table map into the field metadata array
1119     iff there is field metadata. The variable metadata_size will be
1120     0 if we are replicating from an older version server since no field
1121     metadata was written to the table map. This can also happen if
1122     there were no fields in the master that needed extra metadata.
1123   */
1124   if (m_size && metadata_size)
1125   {
1126     int index= 0;
1127     for (unsigned int i= 0; i < m_size; i++)
1128     {
1129       switch (binlog_type(i)) {
1130       case MYSQL_TYPE_TINY_BLOB:
1131       case MYSQL_TYPE_BLOB:
1132       case MYSQL_TYPE_BLOB_COMPRESSED:
1133       case MYSQL_TYPE_MEDIUM_BLOB:
1134       case MYSQL_TYPE_LONG_BLOB:
1135       case MYSQL_TYPE_DOUBLE:
1136       case MYSQL_TYPE_FLOAT:
1137       case MYSQL_TYPE_GEOMETRY:
1138       {
1139         /*
1140           These types store a single byte.
1141         */
1142         m_field_metadata[i]= field_metadata[index];
1143         index++;
1144         break;
1145       }
1146       case MYSQL_TYPE_SET:
1147       case MYSQL_TYPE_ENUM:
1148       case MYSQL_TYPE_STRING:
1149       {
1150         uint16 x= field_metadata[index++] << 8U; // real_type
1151         x+= field_metadata[index++];            // pack or field length
1152         m_field_metadata[i]= x;
1153         break;
1154       }
1155       case MYSQL_TYPE_BIT:
1156       {
1157         uint16 x= field_metadata[index++];
1158         x = x + (field_metadata[index++] << 8U);
1159         m_field_metadata[i]= x;
1160         break;
1161       }
1162       case MYSQL_TYPE_VARCHAR:
1163       case MYSQL_TYPE_VARCHAR_COMPRESSED:
1164       {
1165         /*
1166           These types store two bytes.
1167         */
1168         char *ptr= (char *)&field_metadata[index];
1169         m_field_metadata[i]= uint2korr(ptr);
1170         index= index + 2;
1171         break;
1172       }
1173       case MYSQL_TYPE_NEWDECIMAL:
1174       {
1175         uint16 x= field_metadata[index++] << 8U; // precision
1176         x+= field_metadata[index++];            // decimals
1177         m_field_metadata[i]= x;
1178         break;
1179       }
1180       case MYSQL_TYPE_TIME2:
1181       case MYSQL_TYPE_DATETIME2:
1182       case MYSQL_TYPE_TIMESTAMP2:
1183         m_field_metadata[i]= field_metadata[index++];
1184         break;
1185       default:
1186         m_field_metadata[i]= 0;
1187         break;
1188       }
1189     }
1190   }
1191   if (m_size && null_bitmap)
1192     memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
1193 }
1194 
1195 
~table_def()1196 table_def::~table_def()
1197 {
1198   my_free(m_memory);
1199 #ifndef DBUG_OFF
1200   m_type= 0;
1201   m_size= 0;
1202 #endif
1203 }
1204 
1205 
1206 /**
1207    @param   even_buf    point to the buffer containing serialized event
1208    @param   event_len   length of the event accounting possible checksum alg
1209 
1210    @return  TRUE        if test fails
1211             FALSE       as success
1212 */
event_checksum_test(uchar * event_buf,ulong event_len,enum enum_binlog_checksum_alg alg)1213 bool event_checksum_test(uchar *event_buf, ulong event_len, enum enum_binlog_checksum_alg alg)
1214 {
1215   bool res= FALSE;
1216   uint16 flags= 0; // to store in FD's buffer flags orig value
1217 
1218   if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
1219   {
1220     ha_checksum incoming;
1221     ha_checksum computed;
1222 
1223     if (event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
1224     {
1225 #ifdef DBUG_ASSERT_EXISTS
1226       int8 fd_alg= event_buf[event_len - BINLOG_CHECKSUM_LEN -
1227                              BINLOG_CHECKSUM_ALG_DESC_LEN];
1228 #endif
1229       /*
1230         FD event is checksummed and therefore verified w/o the binlog-in-use flag
1231       */
1232       flags= uint2korr(event_buf + FLAGS_OFFSET);
1233       if (flags & LOG_EVENT_BINLOG_IN_USE_F)
1234         event_buf[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1235       /*
1236          The only algorithm currently is CRC32. Zero indicates
1237          the binlog file is checksum-free *except* the FD-event.
1238       */
1239       DBUG_ASSERT(fd_alg == BINLOG_CHECKSUM_ALG_CRC32 || fd_alg == 0);
1240       DBUG_ASSERT(alg == BINLOG_CHECKSUM_ALG_CRC32);
1241       /*
1242         Complile time guard to watch over  the max number of alg
1243       */
1244       compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
1245     }
1246     incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
1247     /* checksum the event content without the checksum part itself */
1248     computed= my_checksum(0, event_buf, event_len - BINLOG_CHECKSUM_LEN);
1249     if (flags != 0)
1250     {
1251       /* restoring the orig value of flags of FD */
1252       DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
1253       event_buf[FLAGS_OFFSET]= (uchar) flags;
1254     }
1255     res= DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, computed != incoming);
1256   }
1257   return res;
1258 }
1259 
1260 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1261 
Deferred_log_events(Relay_log_info * rli)1262 Deferred_log_events::Deferred_log_events(Relay_log_info *rli) : last_added(NULL)
1263 {
1264   my_init_dynamic_array(&array, sizeof(Log_event *), 32, 16, MYF(0));
1265 }
1266 
~Deferred_log_events()1267 Deferred_log_events::~Deferred_log_events()
1268 {
1269   delete_dynamic(&array);
1270 }
1271 
add(Log_event * ev)1272 int Deferred_log_events::add(Log_event *ev)
1273 {
1274   last_added= ev;
1275   insert_dynamic(&array, (uchar*) &ev);
1276   return 0;
1277 }
1278 
is_empty()1279 bool Deferred_log_events::is_empty()
1280 {
1281   return array.elements == 0;
1282 }
1283 
execute(rpl_group_info * rgi)1284 bool Deferred_log_events::execute(rpl_group_info *rgi)
1285 {
1286   bool res= false;
1287   DBUG_ENTER("Deferred_log_events::execute");
1288   DBUG_ASSERT(rgi->deferred_events_collecting);
1289 
1290   rgi->deferred_events_collecting= false;
1291   for (uint i=  0; !res && i < array.elements; i++)
1292   {
1293     Log_event *ev= (* (Log_event **)
1294                     dynamic_array_ptr(&array, i));
1295     res= ev->apply_event(rgi);
1296   }
1297   rgi->deferred_events_collecting= true;
1298   DBUG_RETURN(res);
1299 }
1300 
rewind()1301 void Deferred_log_events::rewind()
1302 {
1303   /*
1304     Reset preceding Query log event events which execution was
1305     deferred because of slave side filtering.
1306   */
1307   if (!is_empty())
1308   {
1309     for (uint i=  0; i < array.elements; i++)
1310     {
1311       Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1312       delete ev;
1313     }
1314     last_added= NULL;
1315     if (array.elements > array.max_element)
1316       freeze_size(&array);
1317     reset_dynamic(&array);
1318   }
1319   last_added= NULL;
1320 }
1321 
1322 #endif
1323