1 /*
2    Copyright (c) 2004, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "consumer_restore.hpp"
26 #include <kernel/ndb_limits.h>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 #include <NdbToolsProgramExitCodes.hpp>
30 #include <Properties.hpp>
31 #include <NdbTypesUtil.hpp>
32 #include <ndb_rand.h>
33 
34 #include <ndb_internal.hpp>
35 #include <ndb_logevent.h>
36 #include "../src/ndbapi/NdbDictionaryImpl.hpp"
37 #include "../ndb_lib_move_data.hpp"
38 
39 #define NDB_ANYVALUE_FOR_NOLOGGING 0x8000007f
40 
41 /**
42  * The PK mapping index has a well-known name, so
43  * multiple ndb_restore instances can share a single index.
44  */
45 static const char* PK_MAPPING_IDX_NAME = "NDB$RESTORE_PK_MAPPING";
46 
47 extern FilteredNdbOut err;
48 extern FilteredNdbOut info;
49 extern FilteredNdbOut debug;
50 extern RestoreLogger restoreLogger;
51 
52 static void callback(int, NdbTransaction*, void*);
53 static Uint32 get_part_id(const NdbDictionary::Table *table,
54                           Uint32 hash_value);
55 
56 extern BaseString g_options;
57 extern unsigned int opt_no_binlog;
58 extern bool ga_skip_broken_objects;
59 
60 extern Properties g_rewrite_databases;
61 
62 bool BackupRestore::m_preserve_trailing_spaces = false;
63 
64 // ----------------------------------------------------------------------
65 // conversion handlers
66 // ----------------------------------------------------------------------
67 
68 void *
69 BackupRestore::convert_bitset(const void *source,
70                               void *target,
71                               bool &truncated)
72 {
73   if (!source || !target)
74     return NULL;
75 
76   // shortcuts
77   const unsigned char * const s = (const unsigned char *)source;
78   char_n_padding_struct * const t = (char_n_padding_struct *)target;
79 
80   // write data
81   if (t->n_new >= t->n_old)
82   {
83     // clear all bits
84     memset(t->new_row, 0, t->n_new);
85 
86     memcpy(t->new_row, s, t->n_old);
87     truncated = false;
88   } else {
89     // set all bits, for parity with replication's demotion semantics
90     memset(t->new_row, 0xFF, t->n_new);
91     truncated = true;
92   }
93 
94   return t->new_row;
95 }
96 
97 template< typename S, typename T >
98 void *
99 BackupRestore::convert_array(const void * source,
100                              void * target,
101                              bool & truncated)
102 {
103   if (!source || !target)
104     return NULL;
105 
106   // shortcuts (note that all S::... and T::... are compile-time expr)
107   const unsigned char * const s = (const unsigned char *)source;
108   char_n_padding_struct * const t = (char_n_padding_struct *)target;
109   const Uint32 s_prefix_length = S::lengthPrefixSize();
110   const Uint32 t_prefix_length = T::lengthPrefixSize();
111 
112   // read and adjust length
113   Uint32 length = (S::isFixedSized() ? t->n_old : S::readLengthPrefix(s));
114   const Uint32 max_length = t->n_new - t_prefix_length;
115   if (S::isFixedSized() && !m_preserve_trailing_spaces) {
116     const char s_padding_char = (S::isBinary() ? 0x00 : ' ');
117     // ignore padding chars for data copying or truncation reporting
118     while (length > 0 && s[length - 1] == s_padding_char) {
119       length--;
120     }
121   }
122   if (length <= max_length) {
123     truncated = false;
124   } else {
125     length = max_length;
126     truncated = true;
127   }
128 
129   // write length prefix
130   if (!T::isFixedSized()) {
131     T::writeLengthPrefix(t->new_row, length);
132   }
133 
134   // write data
135   memcpy(t->new_row + t_prefix_length, s + s_prefix_length, length);
136 
137   // write padding
138   if (T::isFixedSized()) {
139     const char t_padding_char = (T::isBinary() ? 0x00 : ' ');
140     const Uint32 l = max_length - length;
141     memset(t->new_row + t_prefix_length + length, t_padding_char, l);
142   }
143 
144   return t->new_row;
145 }
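/*
  Illustration of the array conversion above (not part of the restore path;
  the column sizes and values here are assumptions, not taken from any
  backup):
    - Promoting a space-padded Char source holding "ab      " into a
      Varchar-style target: trailing padding is stripped first (unless
      trailing spaces are preserved), so only "ab" is copied after the
      target's length prefix and truncated stays false.
    - Demoting a Varchar source holding "abcdefgh" into a shorter Varchar
      target with room for 4 bytes: the data is cut to "abcd", the length
      prefix is written as 4, and truncated is set so the caller can report
      the lossy conversion.
*/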
146 
147 template< typename S, typename T >
148 void *
149 BackupRestore::convert_integral(const void * source,
150                                 void * target,
151                                 bool & truncated)
152 {
153   if (!source || !target)
154     return NULL;
155 
156   // read the source value
157   typename S::DomainT s;
158   S::load(&s, (char *)source);
159 
160   // Note: important to correctly handle mixed signedness comparisons.
161   //
162   // The problem: A straight-forward approach to convert value 's' into
163   // type 'T' might be to check into which of these subranges 's' falls
164   //    ... < T's lower bound <= ... <= T's upper bound < ...
165   // However, this approach is _incorrect_ when applied to generic code
166   //    if (s < T::lowest()) ... else if (s > T::highest()) ... else ...
167   // since 'S' and 'T' may be types of different signedness.
168   //
169   // Under ansi (and even more K&R) C promotion rules, if 'T' is unsigned
170   // and if there's no larger signed type available, the value 's' gets
171   // promoted to unsigned; then, a negative value of 's' becomes (large)
172   // positive -- with a wrong comparison outcome.
173   //
174   // Furthermore, the code should not trigger compiler warnings for any
175   // selection of integral types 'S', 'T' ("mixed signedness comparison",
176   // "comparison of unsigned expression <0 / >=0 is always false/true").
177   //
178   // The correct approach: do lower bound comparisons on signed types and
179   // upper bound comparisons on unsigned types only; this requires casts.
180   // For the casts to be safe, compare the value against the zero literal
181   //    if (s <= 0) { check as signed } else { check as unsigned }
182   // which is a valid + nontrivial test for signed and unsigned types.
183   //
184   // This implies that correct, generic conversion code must test into
185   // which of these _four_ subranges value 's' falls
186   //    ... < T's lower bound <= ... <= 0 < ... <= T's upper bound < ...
187   // while handling 's' as signed/unsigned where less-equal/greater zero.
188   //
189   // Obviously, simplifications are possible if 'S' is unsigned or known
190   // to be a subset of 'T'.  This can be accomplished by a few additional
191   // compile-time expression tests, which allow code optimization to
192   // issue fewer checks for certain specializations of types 'S' and 'T'.
193 
194   // write the target value
195   typename T::DomainT t;
196   if (s <= 0) {
197 
198     // check value against lower bound as _signed_, safe since all <= 0
199     assert(S::lowest() <= 0 && T::lowest() <= 0 && s <= 0);
200     const typename S::SignedT s_l_s = S::asSigned(S::lowest());
201     const typename T::SignedT t_l_s = T::asSigned(T::lowest());
202     const typename S::SignedT s_s = S::asSigned(s);
203     if ((s_l_s < t_l_s)      // compile-time expr
204         && (s_s < t_l_s)) {  // lower bound check
205       t = T::lowest();
206       truncated = true;
207     } else {                 // within both bounds
208       t = static_cast< typename T::DomainT >(s);
209       truncated = false;
210     }
211 
212   } else { // (s > 0)
213 
214     // check value against upper bound as _unsigned_, safe since all > 0
215     assert(S::highest() > 0 && T::highest() > 0 && s > 0);
216     const typename S::UnsignedT s_h_u = S::asUnsigned(S::highest());
217     const typename T::UnsignedT t_h_u = T::asUnsigned(T::highest());
218     const typename S::UnsignedT s_u = S::asUnsigned(s);
219     if ((s_h_u > t_h_u)      // compile-time expr
220         && (s_u > t_h_u)) {  // upper bound check
221       t = T::highest();
222       truncated = true;
223     } else {                 // within both bounds
224       t = static_cast< typename T::DomainT >(s);
225       truncated = false;
226     }
227 
228   }
229   T::store((char *)target, &t);
230 
231   return target;
232 }
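/*
  Worked example of the clamping above (an Int -> Tinyint demotion, i.e.
  convert_integral< Hint32, Hint8 >; the input values are illustrative):
     1000 >  127  => target =  127, truncated = true
    -1000 < -128  => target = -128, truncated = true
       42         => target =   42, truncated = false
*/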
233 
234 static uint
235 truncate_fraction(uint f, uint n_old, uint n_new, bool& truncated)
236 {
237   static const uint pow10[1 + 6] = {
238     1, 10, 100, 1000, 10000, 100000, 1000000
239   };
240   assert(n_old <= 6 && n_new <= 6);
241   if (n_old <= n_new)
242     return f;
243   uint k = n_old - n_new;
244   uint n = pow10[k];
245   uint g = f / n;
246   if (g * n != f)
247     truncated = true;
248   return g;
249 }
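/*
  Example of the fraction arithmetic (illustrative numbers): truncating
  f = 987654 from n_old = 6 digits (microseconds) to n_new = 3 digits gives
  k = 3, n = 1000, g = 987; since 987 * 1000 != 987654 the discarded digits
  were non-zero, so truncated is set and 987 is returned.  Truncating
  f = 987000 the same way also returns 987 but leaves truncated untouched.
*/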
250 
251 void *
252 BackupRestore::convert_time_time2(const void * source,
253                                   void * target,
254                                   bool & truncated)
255 {
256   if (!source || !target)
257     return NULL;
258 
259   const uchar* s = (const uchar*)source;
260   char_n_padding_struct* t = (char_n_padding_struct*)target;
261   assert(t->n_old == 0 && t->n_new <= 6);
262 
263   NdbSqlUtil::Time ss;
264   NdbSqlUtil::Time2 ts;
265   truncated = false;
266 
267   NdbSqlUtil::unpack_time(ss, s);
268 
269   ts.sign = ss.sign;
270   ts.interval = 0;
271   ts.hour = ss.hour;
272   ts.minute = ss.minute;
273   ts.second = ss.second;
274   ts.fraction = 0;
275   NdbSqlUtil::pack_time2(ts, (uchar*)t->new_row, t->n_new);
276 
277   return t->new_row;
278 }
279 
280 void *
281 BackupRestore::convert_time2_time(const void * source,
282                                   void * target,
283                                   bool & truncated)
284 {
285   if (!source || !target)
286     return NULL;
287 
288   const uchar* s = (const uchar*)source;
289   char_n_padding_struct* t = (char_n_padding_struct*)target;
290   assert(t->n_old <= 6 && t->n_new == 0);
291 
292   NdbSqlUtil::Time2 ss;
293   NdbSqlUtil::Time ts;
294   truncated = false;
295 
296   NdbSqlUtil::unpack_time2(ss, s, t->n_old);
297   if (ss.fraction != 0)
298     truncated = true;
299 
300   ts.sign = ss.sign;
301   ts.hour = ss.hour;
302   ts.minute = ss.minute;
303   ts.second = ss.second;
304   NdbSqlUtil::pack_time(ts, (uchar*)t->new_row);
305 
306   return t->new_row;
307 }
308 
309 void *
310 BackupRestore::convert_time2_time2(const void * source,
311                                    void * target,
312                                    bool & truncated)
313 {
314   if (!source || !target)
315     return NULL;
316 
317   const uchar* s = (const uchar*)source;
318   char_n_padding_struct* t = (char_n_padding_struct*)target;
319   assert(t->n_old <= 6 && t->n_new <= 6);
320 
321   NdbSqlUtil::Time2 ss;
322   NdbSqlUtil::Time2 ts;
323   truncated = false;
324 
325   NdbSqlUtil::unpack_time2(ss, s, t->n_old);
326   uint fraction = truncate_fraction(ss.fraction,
327                                     t->n_old, t->n_new, truncated);
328 
329   ts.sign = ss.sign;
330   ts.interval = ss.interval;
331   ts.hour = ss.hour;
332   ts.minute = ss.minute;
333   ts.second = ss.second;
334   ts.fraction = fraction;
335   NdbSqlUtil::pack_time2(ts, (uchar*)t->new_row, t->n_new);
336 
337   return t->new_row;
338 }
339 
340 void *
341 BackupRestore::convert_datetime_datetime2(const void * source,
342                                           void * target,
343                                           bool & truncated)
344 {
345   if (!source || !target)
346     return NULL;
347 
348   const uchar* s = (const uchar*)source;
349   char_n_padding_struct* t = (char_n_padding_struct*)target;
350   assert(t->n_old == 0 && t->n_new <= 6);
351 
352   NdbSqlUtil::Datetime ss;
353   NdbSqlUtil::Datetime2 ts;
354   truncated = false;
355 
356   NdbSqlUtil::unpack_datetime(ss, s);
357 
358   ts.sign = 1;
359   ts.year = ss.year;
360   ts.month = ss.month;
361   ts.day = ss.day;
362   ts.hour = ss.hour;
363   ts.minute = ss.minute;
364   ts.second = ss.second;
365   ts.fraction = 0;
366   NdbSqlUtil::pack_datetime2(ts, (uchar*)t->new_row, t->n_new);
367 
368   return t->new_row;
369 }
370 
371 void *
372 BackupRestore::convert_datetime2_datetime(const void * source,
373                                           void * target,
374                                           bool & truncated)
375 {
376   if (!source || !target)
377     return NULL;
378 
379   const uchar* s = (const uchar*)source;
380   char_n_padding_struct* t = (char_n_padding_struct*)target;
381   assert(t->n_old <= 6 && t->n_new == 0);
382 
383   NdbSqlUtil::Datetime2 ss;
384   NdbSqlUtil::Datetime ts;
385   truncated = false;
386 
387   NdbSqlUtil::unpack_datetime2(ss, s, t->n_old);
388   if (ss.fraction != 0)
389     truncated = true;
390   if (ss.sign != 1) // should not happen
391     truncated = true;
392 
393   ts.year = ss.year;
394   ts.month = ss.month;
395   ts.day = ss.day;
396   ts.hour = ss.hour;
397   ts.minute = ss.minute;
398   ts.second = ss.second;
399   NdbSqlUtil::pack_datetime(ts, (uchar*)t->new_row);
400 
401   return t->new_row;
402 }
403 
404 void *
405 BackupRestore::convert_datetime2_datetime2(const void * source,
406                                            void * target,
407                                            bool & truncated)
408 {
409   if (!source || !target)
410     return NULL;
411 
412   const uchar* s = (const uchar*)source;
413   char_n_padding_struct* t = (char_n_padding_struct*)target;
414   assert(t->n_old <= 6 && t->n_new <= 6);
415 
416   NdbSqlUtil::Datetime2 ss;
417   NdbSqlUtil::Datetime2 ts;
418   truncated = false;
419 
420   NdbSqlUtil::unpack_datetime2(ss, s, t->n_old);
421   uint fraction = truncate_fraction(ss.fraction,
422                                     t->n_old, t->n_new, truncated);
423 
424   ts.sign = ss.sign;
425   ts.year = ss.year;
426   ts.month = ss.month;
427   ts.day = ss.day;
428   ts.hour = ss.hour;
429   ts.minute = ss.minute;
430   ts.second = ss.second;
431   ts.fraction = fraction;
432   NdbSqlUtil::pack_datetime2(ts, (uchar*)t->new_row, t->n_new);
433 
434   return t->new_row;
435 }
436 
437 void *
438 BackupRestore::convert_timestamp_timestamp2(const void * source,
439                                             void * target,
440                                             bool & truncated)
441 {
442   if (!source || !target)
443     return NULL;
444 
445   const uchar* s = (const uchar*)source;
446   char_n_padding_struct* t = (char_n_padding_struct*)target;
447   assert(t->n_old == 0 && t->n_new <= 6);
448 
449   NdbSqlUtil::Timestamp ss;
450   NdbSqlUtil::Timestamp2 ts;
451   truncated = false;
452 
453   NdbSqlUtil::unpack_timestamp(ss, s);
454 
455   ts.second = ss.second;
456   ts.fraction = 0;
457   NdbSqlUtil::pack_timestamp2(ts, (uchar*)t->new_row, t->n_new);
458 
459   return t->new_row;
460 }
461 
462 void *
463 BackupRestore::convert_timestamp2_timestamp(const void * source,
464                                             void * target,
465                                             bool & truncated)
466 {
467   if (!source || !target)
468     return NULL;
469 
470   const uchar* s = (const uchar*)source;
471   char_n_padding_struct* t = (char_n_padding_struct*)target;
472   assert(t->n_old <= 6 && t->n_new == 0);
473 
474   NdbSqlUtil::Timestamp2 ss;
475   NdbSqlUtil::Timestamp ts;
476   truncated = false;
477 
478   NdbSqlUtil::unpack_timestamp2(ss, s, t->n_old);
479   if (ss.fraction != 0)
480     truncated = true;
481 
482   ts.second = ss.second;
483   NdbSqlUtil::pack_timestamp(ts, (uchar*)t->new_row);
484 
485   return t->new_row;
486 }
487 
488 void *
489 BackupRestore::convert_timestamp2_timestamp2(const void * source,
490                                              void * target,
491                                              bool & truncated)
492 {
493   if (!source || !target)
494     return NULL;
495 
496   const uchar* s = (const uchar*)source;
497   char_n_padding_struct* t = (char_n_padding_struct*)target;
498   assert(t->n_old <= 6 && t->n_new <= 6);
499 
500   NdbSqlUtil::Timestamp2 ss;
501   NdbSqlUtil::Timestamp2 ts;
502   truncated = false;
503 
504   NdbSqlUtil::unpack_timestamp2(ss, s, t->n_old);
505   uint fraction = truncate_fraction(ss.fraction,
506                                     t->n_old, t->n_new, truncated);
507 
508   ts.second = ss.second;
509   ts.fraction = fraction;
510   NdbSqlUtil::pack_timestamp2(ts, (uchar*)t->new_row, t->n_new);
511 
512   return t->new_row;
513 }
514 
515 // ----------------------------------------------------------------------
516 // conversion rules
517 // ----------------------------------------------------------------------
518 
519 const PromotionRules
520 BackupRestore::m_allowed_promotion_attrs[] = {
521   // bitset promotions/demotions
522   {NDBCOL::Bit,            NDBCOL::Bit,            check_compat_sizes,
523    convert_bitset},
524 
525   // char array promotions/demotions
526   {NDBCOL::Char,           NDBCOL::Char,           check_compat_sizes,
527    convert_array< Hchar, Hchar >},
528   {NDBCOL::Char,           NDBCOL::Varchar,        check_compat_sizes,
529    convert_array< Hchar, Hvarchar >},
530   {NDBCOL::Char,           NDBCOL::Longvarchar,    check_compat_sizes,
531    convert_array< Hchar, Hlongvarchar >},
532   {NDBCOL::Varchar,        NDBCOL::Char,           check_compat_sizes,
533    convert_array< Hvarchar, Hchar >},
534   {NDBCOL::Varchar,        NDBCOL::Varchar,        check_compat_sizes,
535    convert_array< Hvarchar, Hvarchar >},
536   {NDBCOL::Varchar,        NDBCOL::Longvarchar,    check_compat_sizes,
537    convert_array< Hvarchar, Hlongvarchar >},
538   {NDBCOL::Longvarchar,    NDBCOL::Char,           check_compat_sizes,
539    convert_array< Hlongvarchar, Hchar >},
540   {NDBCOL::Longvarchar,    NDBCOL::Varchar,        check_compat_sizes,
541    convert_array< Hlongvarchar, Hvarchar >},
542   {NDBCOL::Longvarchar,    NDBCOL::Longvarchar,    check_compat_sizes,
543    convert_array< Hlongvarchar, Hlongvarchar >},
544 
545   // binary array promotions/demotions
546   {NDBCOL::Binary,         NDBCOL::Binary,         check_compat_sizes,
547    convert_array< Hbinary, Hbinary >},
548   {NDBCOL::Binary,         NDBCOL::Varbinary,      check_compat_sizes,
549    convert_array< Hbinary, Hvarbinary >},
550   {NDBCOL::Binary,         NDBCOL::Longvarbinary,  check_compat_sizes,
551    convert_array< Hbinary, Hlongvarbinary >},
552   {NDBCOL::Varbinary,      NDBCOL::Binary,         check_compat_sizes,
553    convert_array< Hvarbinary, Hbinary >},
554   {NDBCOL::Varbinary,      NDBCOL::Varbinary,      check_compat_sizes,
555    convert_array< Hvarbinary, Hvarbinary >},
556   {NDBCOL::Varbinary,      NDBCOL::Longvarbinary,  check_compat_sizes,
557    convert_array< Hvarbinary, Hlongvarbinary >},
558   {NDBCOL::Longvarbinary,  NDBCOL::Binary,         check_compat_sizes,
559    convert_array< Hlongvarbinary, Hbinary >},
560   {NDBCOL::Longvarbinary,  NDBCOL::Varbinary,      check_compat_sizes,
561    convert_array< Hlongvarbinary, Hvarbinary >},
562   {NDBCOL::Longvarbinary,  NDBCOL::Longvarbinary,  check_compat_sizes,
563    convert_array< Hlongvarbinary, Hlongvarbinary >},
564 
565   // char to binary promotions/demotions
566   {NDBCOL::Char,           NDBCOL::Binary,         check_compat_char_binary,
567    convert_array< Hchar, Hbinary >},
568   {NDBCOL::Char,           NDBCOL::Varbinary,      check_compat_char_binary,
569    convert_array< Hchar, Hvarbinary >},
570   {NDBCOL::Char,           NDBCOL::Longvarbinary,  check_compat_char_binary,
571    convert_array< Hchar, Hlongvarbinary >},
572   {NDBCOL::Varchar,        NDBCOL::Binary,         check_compat_char_binary,
573    convert_array< Hvarchar, Hbinary >},
574   {NDBCOL::Varchar,        NDBCOL::Varbinary,      check_compat_char_binary,
575    convert_array< Hvarchar, Hvarbinary >},
576   {NDBCOL::Varchar,        NDBCOL::Longvarbinary,  check_compat_char_binary,
577    convert_array< Hvarchar, Hlongvarbinary >},
578   {NDBCOL::Longvarchar,    NDBCOL::Binary,         check_compat_char_binary,
579    convert_array< Hlongvarchar, Hbinary >},
580   {NDBCOL::Longvarchar,    NDBCOL::Varbinary,      check_compat_char_binary,
581    convert_array< Hlongvarchar, Hvarbinary >},
582   {NDBCOL::Longvarchar,    NDBCOL::Longvarbinary,  check_compat_char_binary,
583    convert_array< Hlongvarchar, Hlongvarbinary >},
584 
585   // binary to char promotions/demotions
586   {NDBCOL::Binary,         NDBCOL::Char,           check_compat_char_binary,
587    convert_array< Hbinary, Hchar >},
588   {NDBCOL::Binary,         NDBCOL::Varchar,        check_compat_char_binary,
589    convert_array< Hbinary, Hvarchar >},
590   {NDBCOL::Binary,         NDBCOL::Longvarchar,    check_compat_char_binary,
591    convert_array< Hbinary, Hlongvarchar >},
592   {NDBCOL::Varbinary,      NDBCOL::Char,           check_compat_char_binary,
593    convert_array< Hvarbinary, Hchar >},
594   {NDBCOL::Varbinary,      NDBCOL::Varchar,        check_compat_char_binary,
595    convert_array< Hvarbinary, Hvarchar >},
596   {NDBCOL::Varbinary,      NDBCOL::Longvarchar,    check_compat_char_binary,
597    convert_array< Hvarbinary, Hlongvarchar >},
598   {NDBCOL::Longvarbinary,  NDBCOL::Char,           check_compat_char_binary,
599    convert_array< Hlongvarbinary, Hchar >},
600   {NDBCOL::Longvarbinary,  NDBCOL::Varchar,        check_compat_char_binary,
601    convert_array< Hlongvarbinary, Hvarchar >},
602   {NDBCOL::Longvarbinary,  NDBCOL::Longvarchar,    check_compat_char_binary,
603    convert_array< Hlongvarbinary, Hlongvarchar >},
604 
605   // char to text promotions (uses staging table)
606   {NDBCOL::Char,           NDBCOL::Text,           check_compat_char_to_text,
607    NULL},
608   {NDBCOL::Varchar,        NDBCOL::Text,           check_compat_char_to_text,
609    NULL},
610   {NDBCOL::Longvarchar,    NDBCOL::Text,           check_compat_char_to_text,
611    NULL},
612 
613   // text to char promotions (uses staging table)
614   {NDBCOL::Text,           NDBCOL::Char,           check_compat_text_to_char,
615    NULL},
616   {NDBCOL::Text,           NDBCOL::Varchar,        check_compat_text_to_char,
617    NULL},
618   {NDBCOL::Text,           NDBCOL::Longvarchar,    check_compat_text_to_char,
619    NULL},
620 
621   // text to text promotions (uses staging table)
622   // required when part lengths of text columns are not equal
623   {NDBCOL::Text,           NDBCOL::Text,           check_compat_text_to_text,
624    NULL},
625 
626   // text to blob promotions (uses staging table)
627   // blobs use the BINARY charset, while texts use charsets like UTF8
628   // ignore charset diffs by using check_compat_blob_to_blob
629   {NDBCOL::Text,           NDBCOL::Blob, check_compat_blob_to_blob,
630    NULL},
631 
632   // binary to blob promotions (uses staging table)
633   {NDBCOL::Binary,         NDBCOL::Blob,           check_compat_binary_to_blob,
634    NULL},
635   {NDBCOL::Varbinary,      NDBCOL::Blob,           check_compat_binary_to_blob,
636    NULL},
637   {NDBCOL::Longvarbinary,  NDBCOL::Blob,           check_compat_binary_to_blob,
638    NULL},
639 
640   // blob to binary promotions (uses staging table)
641   {NDBCOL::Blob,           NDBCOL::Binary,         check_compat_blob_to_binary,
642    NULL},
643   {NDBCOL::Blob,           NDBCOL::Varbinary,      check_compat_blob_to_binary,
644    NULL},
645   {NDBCOL::Blob,           NDBCOL::Longvarbinary,  check_compat_blob_to_binary,
646    NULL},
647 
648   // blob to blob promotions (uses staging table)
649   // required when part lengths of blob columns are not equal
650   {NDBCOL::Blob,           NDBCOL::Blob,           check_compat_blob_to_blob,
651    NULL},
652 
653   // blob to text promotions (uses staging table)
654   // blobs use the BINARY charset, while texts use charsets like UTF8
655   // ignore charset diffs by using check_compat_blob_to_blob
656   {NDBCOL::Blob,           NDBCOL::Text, check_compat_blob_to_blob,
657    NULL},
658 
659   // integral promotions
660   {NDBCOL::Tinyint,        NDBCOL::Smallint,       check_compat_promotion,
661    convert_integral< Hint8, Hint16>},
662   {NDBCOL::Tinyint,        NDBCOL::Mediumint,      check_compat_promotion,
663    convert_integral< Hint8, Hint24>},
664   {NDBCOL::Tinyint,        NDBCOL::Int,            check_compat_promotion,
665    convert_integral< Hint8, Hint32>},
666   {NDBCOL::Tinyint,        NDBCOL::Bigint,         check_compat_promotion,
667    convert_integral< Hint8, Hint64>},
668   {NDBCOL::Smallint,       NDBCOL::Mediumint,      check_compat_promotion,
669    convert_integral< Hint16, Hint24>},
670   {NDBCOL::Smallint,       NDBCOL::Int,            check_compat_promotion,
671    convert_integral< Hint16, Hint32>},
672   {NDBCOL::Smallint,       NDBCOL::Bigint,         check_compat_promotion,
673    convert_integral< Hint16, Hint64>},
674   {NDBCOL::Mediumint,      NDBCOL::Int,            check_compat_promotion,
675    convert_integral< Hint24, Hint32>},
676   {NDBCOL::Mediumint,      NDBCOL::Bigint,         check_compat_promotion,
677    convert_integral< Hint24, Hint64>},
678   {NDBCOL::Int,            NDBCOL::Bigint,         check_compat_promotion,
679    convert_integral< Hint32, Hint64>},
680   {NDBCOL::Tinyunsigned,   NDBCOL::Smallunsigned,  check_compat_promotion,
681    convert_integral< Huint8, Huint16>},
682   {NDBCOL::Tinyunsigned,   NDBCOL::Mediumunsigned, check_compat_promotion,
683    convert_integral< Huint8, Huint24>},
684   {NDBCOL::Tinyunsigned,   NDBCOL::Unsigned,       check_compat_promotion,
685    convert_integral< Huint8, Huint32>},
686   {NDBCOL::Tinyunsigned,   NDBCOL::Bigunsigned,    check_compat_promotion,
687    convert_integral< Huint8, Huint64>},
688   {NDBCOL::Smallunsigned,  NDBCOL::Mediumunsigned, check_compat_promotion,
689    convert_integral< Huint16, Huint24>},
690   {NDBCOL::Smallunsigned,  NDBCOL::Unsigned,       check_compat_promotion,
691    convert_integral< Huint16, Huint32>},
692   {NDBCOL::Smallunsigned,  NDBCOL::Bigunsigned,    check_compat_promotion,
693    convert_integral< Huint16, Huint64>},
694   {NDBCOL::Mediumunsigned, NDBCOL::Unsigned,       check_compat_promotion,
695    convert_integral< Huint24, Huint32>},
696   {NDBCOL::Mediumunsigned, NDBCOL::Bigunsigned,    check_compat_promotion,
697    convert_integral< Huint24, Huint64>},
698   {NDBCOL::Unsigned,       NDBCOL::Bigunsigned,    check_compat_promotion,
699    convert_integral< Huint32, Huint64>},
700 
701   // integral demotions
702   {NDBCOL::Smallint,       NDBCOL::Tinyint,        check_compat_lossy,
703    convert_integral< Hint16, Hint8>},
704   {NDBCOL::Mediumint,      NDBCOL::Tinyint,        check_compat_lossy,
705    convert_integral< Hint24, Hint8>},
706   {NDBCOL::Mediumint,      NDBCOL::Smallint,       check_compat_lossy,
707    convert_integral< Hint24, Hint16>},
708   {NDBCOL::Int,            NDBCOL::Tinyint,        check_compat_lossy,
709    convert_integral< Hint32, Hint8>},
710   {NDBCOL::Int,            NDBCOL::Smallint,       check_compat_lossy,
711    convert_integral< Hint32, Hint16>},
712   {NDBCOL::Int,            NDBCOL::Mediumint,      check_compat_lossy,
713    convert_integral< Hint32, Hint24>},
714   {NDBCOL::Bigint,         NDBCOL::Tinyint,        check_compat_lossy,
715    convert_integral< Hint64, Hint8>},
716   {NDBCOL::Bigint,         NDBCOL::Smallint,       check_compat_lossy,
717    convert_integral< Hint64, Hint16>},
718   {NDBCOL::Bigint,         NDBCOL::Mediumint,      check_compat_lossy,
719    convert_integral< Hint64, Hint24>},
720   {NDBCOL::Bigint,         NDBCOL::Int,            check_compat_lossy,
721    convert_integral< Hint64, Hint32>},
722   {NDBCOL::Smallunsigned,  NDBCOL::Tinyunsigned,   check_compat_lossy,
723    convert_integral< Huint16, Huint8>},
724   {NDBCOL::Mediumunsigned, NDBCOL::Tinyunsigned,   check_compat_lossy,
725    convert_integral< Huint24, Huint8>},
726   {NDBCOL::Mediumunsigned, NDBCOL::Smallunsigned,  check_compat_lossy,
727    convert_integral< Huint24, Huint16>},
728   {NDBCOL::Unsigned,       NDBCOL::Tinyunsigned,   check_compat_lossy,
729    convert_integral< Huint32, Huint8>},
730   {NDBCOL::Unsigned,       NDBCOL::Smallunsigned,  check_compat_lossy,
731    convert_integral< Huint32, Huint16>},
732   {NDBCOL::Unsigned,       NDBCOL::Mediumunsigned, check_compat_lossy,
733    convert_integral< Huint32, Huint24>},
734   {NDBCOL::Bigunsigned,    NDBCOL::Tinyunsigned,   check_compat_lossy,
735    convert_integral< Huint64, Huint8>},
736   {NDBCOL::Bigunsigned,    NDBCOL::Smallunsigned,  check_compat_lossy,
737    convert_integral< Huint64, Huint16>},
738   {NDBCOL::Bigunsigned,    NDBCOL::Mediumunsigned, check_compat_lossy,
739    convert_integral< Huint64, Huint24>},
740   {NDBCOL::Bigunsigned,    NDBCOL::Unsigned,       check_compat_lossy,
741    convert_integral< Huint64, Huint32>},
742 
743   // integral signedness conversions
744   {NDBCOL::Tinyint,        NDBCOL::Tinyunsigned,   check_compat_lossy,
745    convert_integral< Hint8, Huint8>},
746   {NDBCOL::Smallint,       NDBCOL::Smallunsigned,  check_compat_lossy,
747    convert_integral< Hint16, Huint16>},
748   {NDBCOL::Mediumint,      NDBCOL::Mediumunsigned, check_compat_lossy,
749    convert_integral< Hint24, Huint24>},
750   {NDBCOL::Int,            NDBCOL::Unsigned,       check_compat_lossy,
751    convert_integral< Hint32, Huint32>},
752   {NDBCOL::Bigint,         NDBCOL::Bigunsigned,    check_compat_lossy,
753    convert_integral< Hint64, Huint64>},
754   {NDBCOL::Tinyunsigned,   NDBCOL::Tinyint,        check_compat_lossy,
755    convert_integral< Huint8, Hint8>},
756   {NDBCOL::Smallunsigned,  NDBCOL::Smallint,       check_compat_lossy,
757    convert_integral< Huint16, Hint16>},
758   {NDBCOL::Mediumunsigned, NDBCOL::Mediumint,      check_compat_lossy,
759    convert_integral< Huint24, Hint24>},
760   {NDBCOL::Unsigned,       NDBCOL::Int,            check_compat_lossy,
761    convert_integral< Huint32, Hint32>},
762   {NDBCOL::Bigunsigned,    NDBCOL::Bigint,         check_compat_lossy,
763    convert_integral< Huint64, Hint64>},
764 
765   // integral signedness+promotion conversions
766   {NDBCOL::Tinyint,        NDBCOL::Smallunsigned,  check_compat_lossy,
767    convert_integral< Hint8, Huint16>},
768   {NDBCOL::Tinyint,        NDBCOL::Mediumunsigned, check_compat_lossy,
769    convert_integral< Hint8, Huint24>},
770   {NDBCOL::Tinyint,        NDBCOL::Unsigned,       check_compat_lossy,
771    convert_integral< Hint8, Huint32>},
772   {NDBCOL::Tinyint,        NDBCOL::Bigunsigned,    check_compat_lossy,
773    convert_integral< Hint8, Huint64>},
774   {NDBCOL::Smallint,       NDBCOL::Mediumunsigned, check_compat_lossy,
775    convert_integral< Hint16, Huint24>},
776   {NDBCOL::Smallint,       NDBCOL::Unsigned,       check_compat_lossy,
777    convert_integral< Hint16, Huint32>},
778   {NDBCOL::Smallint,       NDBCOL::Bigunsigned,    check_compat_lossy,
779    convert_integral< Hint16, Huint64>},
780   {NDBCOL::Mediumint,      NDBCOL::Unsigned,       check_compat_lossy,
781    convert_integral< Hint24, Huint32>},
782   {NDBCOL::Mediumint,      NDBCOL::Bigunsigned,    check_compat_lossy,
783    convert_integral< Hint24, Huint64>},
784   {NDBCOL::Int,            NDBCOL::Bigunsigned,    check_compat_lossy,
785    convert_integral< Hint32, Huint64>},
786   {NDBCOL::Tinyunsigned,   NDBCOL::Smallint,       check_compat_lossy,
787    convert_integral< Huint8, Hint16>},
788   {NDBCOL::Tinyunsigned,   NDBCOL::Mediumint,      check_compat_lossy,
789    convert_integral< Huint8, Hint24>},
790   {NDBCOL::Tinyunsigned,   NDBCOL::Int,            check_compat_lossy,
791    convert_integral< Huint8, Hint32>},
792   {NDBCOL::Tinyunsigned,   NDBCOL::Bigint,         check_compat_lossy,
793    convert_integral< Huint8, Hint64>},
794   {NDBCOL::Smallunsigned,  NDBCOL::Mediumint,      check_compat_lossy,
795    convert_integral< Huint16, Hint24>},
796   {NDBCOL::Smallunsigned,  NDBCOL::Int,            check_compat_lossy,
797    convert_integral< Huint16, Hint32>},
798   {NDBCOL::Smallunsigned,  NDBCOL::Bigint,         check_compat_lossy,
799    convert_integral< Huint16, Hint64>},
800   {NDBCOL::Mediumunsigned, NDBCOL::Int,            check_compat_lossy,
801    convert_integral< Huint24, Hint32>},
802   {NDBCOL::Mediumunsigned, NDBCOL::Bigint,         check_compat_lossy,
803    convert_integral< Huint24, Hint64>},
804   {NDBCOL::Unsigned,       NDBCOL::Bigint,         check_compat_lossy,
805    convert_integral< Huint32, Hint64>},
806 
807   // integral signedness+demotion conversions
808   {NDBCOL::Smallint,       NDBCOL::Tinyunsigned,   check_compat_lossy,
809    convert_integral< Hint16, Huint8>},
810   {NDBCOL::Mediumint,      NDBCOL::Tinyunsigned,   check_compat_lossy,
811    convert_integral< Hint24, Huint8>},
812   {NDBCOL::Mediumint,      NDBCOL::Smallunsigned,  check_compat_lossy,
813    convert_integral< Hint24, Huint16>},
814   {NDBCOL::Int,            NDBCOL::Tinyunsigned,   check_compat_lossy,
815    convert_integral< Hint32, Huint8>},
816   {NDBCOL::Int,            NDBCOL::Smallunsigned,  check_compat_lossy,
817    convert_integral< Hint32, Huint16>},
818   {NDBCOL::Int,            NDBCOL::Mediumunsigned, check_compat_lossy,
819    convert_integral< Hint32, Huint24>},
820   {NDBCOL::Bigint,         NDBCOL::Tinyunsigned,   check_compat_lossy,
821    convert_integral< Hint64, Huint8>},
822   {NDBCOL::Bigint,         NDBCOL::Smallunsigned,  check_compat_lossy,
823    convert_integral< Hint64, Huint16>},
824   {NDBCOL::Bigint,         NDBCOL::Mediumunsigned, check_compat_lossy,
825    convert_integral< Hint64, Huint24>},
826   {NDBCOL::Bigint,         NDBCOL::Unsigned,       check_compat_lossy,
827    convert_integral< Hint64, Huint32>},
828   {NDBCOL::Smallunsigned,  NDBCOL::Tinyint,        check_compat_lossy,
829    convert_integral< Huint16, Hint8>},
830   {NDBCOL::Mediumunsigned, NDBCOL::Tinyint,        check_compat_lossy,
831    convert_integral< Huint24, Hint8>},
832   {NDBCOL::Mediumunsigned, NDBCOL::Smallint,       check_compat_lossy,
833    convert_integral< Huint24, Hint16>},
834   {NDBCOL::Unsigned,       NDBCOL::Tinyint,        check_compat_lossy,
835    convert_integral< Huint32, Hint8>},
836   {NDBCOL::Unsigned,       NDBCOL::Smallint,       check_compat_lossy,
837    convert_integral< Huint32, Hint16>},
838   {NDBCOL::Unsigned,       NDBCOL::Mediumint,      check_compat_lossy,
839    convert_integral< Huint32, Hint24>},
840   {NDBCOL::Bigunsigned,    NDBCOL::Tinyint,        check_compat_lossy,
841    convert_integral< Huint64, Hint8>},
842   {NDBCOL::Bigunsigned,    NDBCOL::Smallint,       check_compat_lossy,
843    convert_integral< Huint64, Hint16>},
844   {NDBCOL::Bigunsigned,    NDBCOL::Mediumint,      check_compat_lossy,
845    convert_integral< Huint64, Hint24>},
846   {NDBCOL::Bigunsigned,    NDBCOL::Int,            check_compat_lossy,
847    convert_integral< Huint64, Hint32>},
848 
849   // times with fractional seconds
850   {NDBCOL::Time,           NDBCOL::Time2,          check_compat_precision,
851    convert_time_time2},
852   {NDBCOL::Time2,          NDBCOL::Time,           check_compat_precision,
853    convert_time2_time},
854   {NDBCOL::Time2,          NDBCOL::Time2,          check_compat_precision,
855    convert_time2_time2},
856   {NDBCOL::Datetime,       NDBCOL::Datetime2,      check_compat_precision,
857    convert_datetime_datetime2},
858   {NDBCOL::Datetime2,      NDBCOL::Datetime,       check_compat_precision,
859    convert_datetime2_datetime},
860   {NDBCOL::Datetime2,      NDBCOL::Datetime2,      check_compat_precision,
861    convert_datetime2_datetime2},
862   {NDBCOL::Timestamp,      NDBCOL::Timestamp2,     check_compat_precision,
863    convert_timestamp_timestamp2},
864   {NDBCOL::Timestamp2,     NDBCOL::Timestamp,      check_compat_precision,
865    convert_timestamp2_timestamp},
866   {NDBCOL::Timestamp2,     NDBCOL::Timestamp2,     check_compat_precision,
867    convert_timestamp2_timestamp2},
868 
869   {NDBCOL::Undefined,      NDBCOL::Undefined,      NULL,                  NULL}
870 };
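/*
  How a rule from the table above is applied (a sketch, not the lookup code
  itself): if a column is Int in the backup but Bigint in the target
  cluster, the {Int, Bigint} entry is selected, check_compat_promotion
  decides whether the pair is acceptable, and
  convert_integral< Hint32, Hint64 > is installed as the per-value
  conversion function for that attribute.  Entries whose conversion
  function is NULL (the text/blob cases) cannot be converted value by
  value and are instead handled through a staging table.
*/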
871 
872 bool
873 BackupRestore::init(Uint32 tableChangesMask)
874 {
875   release();
876 
877   if (!m_restore && !m_metadata_work_requested && !m_restore_epoch_requested)
878     return true;
879 
880   m_tableChangesMask = tableChangesMask;
881 
882   m_ndb = new Ndb(m_cluster_connection);
883   if (m_ndb == NULL)
884     return false;
885 
886   m_ndb->init(1024);
887   if (m_ndb->waitUntilReady(30) != 0)
888   {
889     restoreLogger.log_error("Failed to connect to ndb!!");
890     return false;
891   }
892   restoreLogger.log_info("Connected to ndb!!");
893 
894   m_callback = new restore_callback_t[m_parallelism];
895 
896   if (m_callback == 0)
897   {
898     restoreLogger.log_error("Failed to allocate callback structs");
899     return false;
900   }
901 
902   m_free_callback= m_callback;
903   for (Uint32 i= 0; i < m_parallelism; i++) {
904     m_callback[i].restore= this;
905     m_callback[i].connection= 0;
906     if (i > 0)
907       m_callback[i-1].next= &(m_callback[i]);
908   }
909   m_callback[m_parallelism-1].next = 0;
910 
911   return true;
912 }
913 
914 void BackupRestore::release()
915 {
916   for (unsigned i = 0; i < m_index_per_table.size(); i++)
917   {
918     Vector<NdbDictionary::Index*> & list = m_index_per_table[i];
919     for (unsigned j = 0; j < list.size(); j++)
920       delete list[j];
921     list.clear();
922   }
923   m_index_per_table.clear();
924 
925   for (unsigned i = 0; i < m_tablespaces.size(); i++)
926   {
927     delete m_tablespaces[i];
928   }
929   m_tablespaces.clear();
930 
931   for (unsigned i = 0; i < m_logfilegroups.size(); i++)
932   {
933     delete m_logfilegroups[i];
934   }
935   m_logfilegroups.clear();
936 
937   for (unsigned i = 0; i < m_hashmaps.size(); i++)
938   {
939     delete m_hashmaps[i];
940   }
941   m_hashmaps.clear();
942 
943   if (m_ndb)
944   {
945     delete m_ndb;
946     m_ndb= 0;
947   }
948 
949   if (m_callback)
950   {
951     delete [] m_callback;
952     m_callback= 0;
953   }
954 }
955 
956 BackupRestore::~BackupRestore()
957 {
958   release();
959 }
960 
961 static
962 int
963 match_blob(const char * name){
964   int cnt, id1, id2;
965   char buf[256];
966   if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
967     return id1;
968   }
969 
970   return -1;
971 }
972 
973 /**
974  * Extracts the database, schema, and table name from an internal table name;
975  * prints an error message and returns false in case of a format violation.
976  */
977 static
978 bool
979 dissect_table_name(const char * qualified_table_name,
980                    BaseString & db_name,
981                    BaseString & schema_name,
982                    BaseString & table_name) {
983   Vector<BaseString> split;
984   BaseString tmp(qualified_table_name);
985   if (tmp.split(split, "/") != 3) {
986     restoreLogger.log_error("Invalid table name format `%s`",
987                             qualified_table_name);
988     return false;
989   }
990   db_name = split[0];
991   schema_name = split[1];
992   table_name = split[2];
993   return true;
994 }
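/*
  Example (hypothetical name): "test_db/def/t1" splits into
  db_name = "test_db", schema_name = "def" and table_name = "t1";
  a name with more or fewer than three components is rejected.
*/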
995 
996 /**
997  * Similar helper for index names; only the last component is the index name.
998  */
999 static
1000 bool
1001 dissect_index_name(const char * qualified_index_name,
1002                    BaseString & db_name,
1003                    BaseString & schema_name,
1004                    BaseString & index_name) {
1005   Vector<BaseString> split;
1006   BaseString tmp(qualified_index_name);
1007   if (tmp.split(split, "/") != 4) {
1008     restoreLogger.log_error("Invalid index name format `%s`",
1009                             qualified_index_name);
1010     return false;
1011   }
1012   db_name = split[0];
1013   schema_name = split[1];
1014   index_name = split[3];
1015   return true;
1016 }
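/*
  Example (hypothetical name): an internal index name such as
  "test_db/def/42/idx1" has four components; db_name and schema_name come
  from the first two and index_name from the last, while the third
  component (the id of the base table) is not needed here.
*/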
1017 
1018 /**
1019  * Rewrites the database name in place, if and only if a rewrite rule exists for it.
1020  */
1021 static
1022 void
1023 check_rewrite_database(BaseString & db_name) {
1024   const char * new_db_name;
1025   if (g_rewrite_databases.get(db_name.c_str(), &new_db_name))
1026     db_name.assign(new_db_name);
1027 }
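/*
  Example: if ndb_restore was started with --rewrite-database=olddb,newdb,
  g_rewrite_databases maps "olddb" to "newdb", so a db_name of "olddb" is
  rewritten here while any other database name is left unchanged.
*/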
1028 
1029 const NdbDictionary::Table*
1030 BackupRestore::get_table(const TableS & tableS){
1031   const NdbDictionary::Table * tab = tableS.m_dictTable;
1032   if(m_cache.m_old_table == tab)
1033     return m_cache.m_new_table;
1034   m_cache.m_old_table = tab;
1035 
1036   int cnt, id1, id2;
1037   char db[256], schema[256];
1038   if (strcmp(tab->getName(), "SYSTAB_0") == 0 ||
1039       strcmp(tab->getName(), "sys/def/SYSTAB_0") == 0) {
1040     /*
1041       Restore SYSTAB_0 to itself
1042     */
1043     m_cache.m_new_table = tab;
1044   }
1045   else if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
1046                         db, schema, &id1, &id2)) == 4){
1047     m_ndb->setDatabaseName(db);
1048     m_ndb->setSchemaName(schema);
1049 
1050     assert(tableS.getMainTable() != NULL);
1051     const TableS & mainTableS = *tableS.getMainTable();
1052 
1053     int mainColumnId = (int)tableS.getMainColumnId();
1054     assert(mainColumnId >= 0 && mainColumnId < mainTableS.getNoOfAttributes());
1055 
1056     const AttributeDesc & attr_desc =
1057       *mainTableS.getAttributeDesc(mainColumnId);
1058 
1059     BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
1060 			 m_new_tables[id1]->getTableId(), attr_desc.attrId);
1061 
1062     m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
1063 
1064   } else {
1065     m_cache.m_new_table = m_new_tables[tab->getTableId()];
1066   }
1067   assert(m_cache.m_new_table);
1068   return m_cache.m_new_table;
1069 }
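/*
  Example of the blob part-table mapping above (the ids are illustrative):
  a backup table named "test_db/def/NDB$BLOB_17_3" holds the blob parts of
  column 3 of backup table id 17; if that main table was re-created in the
  target cluster with table id 42, the lookup is redirected to
  "NDB$BLOB_42_3".
*/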
1070 
1071 // create staging table
1072 bool
1073 BackupRestore::prepare_staging(const TableS & tableS)
1074 {
1075   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
1076 
1077   NdbDictionary::Table* stagingTable = tableS.m_stagingTable;
1078   const BaseString& stagingName = tableS.m_stagingName;
1079 
1080   const char* tablename = stagingName.c_str();
1081   BaseString db_name, schema_name, table_name;
1082   if (!dissect_table_name(tablename, db_name, schema_name, table_name)) {
1083     return false;
1084   }
1085   stagingTable->setName(table_name.c_str());
1086 
1087   // using defaults
1088   const Ndb_move_data::Opts::Tries ot;
1089   int createtries = 0;
1090   int createdelay = 0;
1091   while (1)
1092   {
1093     if (!(ot.maxtries == 0 || createtries < ot.maxtries))
1094     {
1095       restoreLogger.log_error("Create table %s "
1096           ": too many temporary errors: %u", tablename, createtries);
1097       return false;
1098     }
1099     createtries++;
1100 
1101     m_ndb->setDatabaseName(db_name.c_str());
1102     m_ndb->setSchemaName(schema_name.c_str());
1103     if (dict->createTable(*stagingTable) != 0)
1104     {
1105       const NdbError& error = dict->getNdbError();
1106       if (error.status != NdbError::TemporaryError)
1107       {
1108         restoreLogger.log_error("Error: Failed to create staging source %s: %u: %s",
1109                                  tablename, error.code, error.message);
1110         return false;
1111       }
1112       restoreLogger.log_error("Temporary: Failed to create staging source %s: %u: %s",
1113                                  tablename, error.code, error.message);
1114 
1115       createdelay *= 2;
1116       if (createdelay < ot.mindelay)
1117         createdelay = ot.mindelay;
1118       if (createdelay > ot.maxdelay)
1119         createdelay = ot.maxdelay;
1120 
1121       restoreLogger.log_info("Sleeping %u ms", createdelay);
1122       NdbSleep_MilliSleep(createdelay);
1123       continue;
1124     }
1125     restoreLogger.log_info("Created staging source %s", tablename);
1126     break;
1127   }
1128 
1129   const NdbDictionary::Table* tab = dict->getTable(table_name.c_str());
1130   if (tab == NULL)
1131   {
1132     restoreLogger.log_error("Unable to find table '%s' error: %u: %s",
1133                             tablename, dict->getNdbError().code, dict->getNdbError().message);
    return false;
1134   }
1135 
1136   /* Replace real target table with staging table in m_new_tables */
1137   const Uint32 orig_table_id = tableS.m_dictTable->getTableId();
1138   assert(m_new_tables[orig_table_id] != NULL);
1139 
1140   m_new_tables[orig_table_id] = tab;
1141   m_cache.m_old_table = NULL;
1142 
1143   return true;
1144 }
1145 
1146 // move rows from staging to real and drop staging
1147 bool
1148 BackupRestore::finalize_staging(const TableS & tableS)
1149 {
1150   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
1151 
1152   const NdbDictionary::Table* source = 0;
1153   const NdbDictionary::Table* target = 0;
1154 
1155   const char* stablename = tableS.m_stagingName.c_str();
1156   BaseString sdb_name, sschema_name, stable_name;
1157   if (!dissect_table_name(stablename, sdb_name, sschema_name, stable_name)) {
1158     return false;
1159   }
1160   m_ndb->setDatabaseName(sdb_name.c_str());
1161   m_ndb->setSchemaName(sschema_name.c_str());
1162   source = dict->getTable(stable_name.c_str());
1163   if (source == 0)
1164   {
1165     restoreLogger.log_error("Failed to find staging source %s: %u: %s",
1166                             stablename, dict->getNdbError().code, dict->getNdbError().message);
1167     return false;
1168   }
1169 
1170   const char* ttablename = tableS.getTableName();
1171   BaseString tdb_name, tschema_name, ttable_name;
1172   if (!dissect_table_name(ttablename, tdb_name, tschema_name, ttable_name)) {
1173     return false;
1174   }
1175   m_ndb->setDatabaseName(tdb_name.c_str());
1176   m_ndb->setSchemaName(tschema_name.c_str());
1177   target = dict->getTable(ttable_name.c_str());
1178   if (target == 0)
1179   {
1180     restoreLogger.log_error("Failed to find staging target %s: %u: %s",
1181                             ttablename, dict->getNdbError().code, dict->getNdbError().message);
1182     return false;
1183   }
1184 
1185   Ndb_move_data md;
1186   const Ndb_move_data::Stat& stat = md.get_stat();
1187 
1188   if (md.init(source, target) != 0)
1189   {
1190     const Ndb_move_data::Error& error = md.get_error();
1191     restoreLogger.log_error("Move data %s to %s : %u %s", stablename, ttablename, error.code, error.message);
1192     return false;
1193   }
1194 
1195   md.set_opts_flags(tableS.m_stagingFlags);
1196 
1197   // using defaults
1198   const Ndb_move_data::Opts::Tries ot;
1199   int tries = 0;
1200   int delay = 0;
1201   while (1)
1202   {
1203     if (!(ot.maxtries == 0 || tries < ot.maxtries))
1204     {
1205       restoreLogger.log_error("Move data %s to %s: too many temporary errors: %u",
1206                               stablename, ttablename, tries);
1207       return false;
1208     }
1209     tries++;
1210 
1211     if (md.move_data(m_ndb) != 0)
1212     {
1213       const Ndb_move_data::Error& error = md.get_error();
1214 
1215       restoreLogger.log_error("Move data %s to %s %s at try %u at rows moved %llu total %llu error %u %s",
1216          stablename, ttablename,
1217          (error.is_temporary() ? "temporary error" : "permanent error"),
1218          tries, // default is no limit
1219          stat.rows_moved, stat.rows_total, error.code, error.message);
1220 
1221       if (!error.is_temporary())
1222         return false;
1223 
1224       if (stat.rows_moved == 0) // this try
1225         delay *= 2;
1226       else
1227         delay /= 2;
1228       if (delay < ot.mindelay)
1229         delay = ot.mindelay;
1230       if (delay > ot.maxdelay)
1231         delay = ot.maxdelay;
1232 
1233       restoreLogger.log_info("Sleeping %u ms", delay);
1234       NdbSleep_MilliSleep(delay);
1235       continue;
1236     }
1237 
1238     restoreLogger.log_info("Successfully staged %s, moved all %llu rows",
1239         ttablename, stat.rows_total);
1240     if ((tableS.m_stagingFlags & Ndb_move_data::Opts::MD_ATTRIBUTE_DEMOTION)
1241         || stat.truncated != 0) // just in case
1242       restoreLogger.log_info("Truncated %llu attribute values", stat.truncated);
1243     break;
1244   }
1245 
1246   int droptries = 0;
1247   int dropdelay = 0;
1248   while (1)
1249   {
1250     if (!(ot.maxtries == 0 || droptries < ot.maxtries))
1251     {
1252       restoreLogger.log_error("Drop table %s: too many temporary errors: %u",
1253           stablename, droptries);
1254       return false;
1255     }
1256     droptries++;
1257 
1258     // dropTable(const Table&) is not defined ...
1259     m_ndb->setDatabaseName(sdb_name.c_str());
1260     m_ndb->setSchemaName(sschema_name.c_str());
1261     if (dict->dropTable(stable_name.c_str()) != 0)
1262     {
1263       const NdbError& error = dict->getNdbError();
1264       if (error.status != NdbError::TemporaryError)
1265       {
1266         restoreLogger.log_error("Error: Failed to drop staging source %s: %u: %s",
1267             stablename, error.code, error.message);
1268         return false;
1269       }
1270       restoreLogger.log_error("Temporary: Failed to drop staging source %s: %u: %s",
1271             stablename, error.code, error.message);
1272 
1273       dropdelay *= 2;
1274       if (dropdelay < ot.mindelay)
1275         dropdelay = ot.mindelay;
1276       if (dropdelay > ot.maxdelay)
1277         dropdelay = ot.maxdelay;
1278 
1279       restoreLogger.log_info("Sleeping %u ms", dropdelay);
1280       NdbSleep_MilliSleep(dropdelay);
1281       continue;
1282     }
1283     restoreLogger.log_info("Dropped staging source %s", stablename);
1284     break;
1285   }
1286 
1287   /* Replace staging table with real target table in m_new_tables */
1288   const Uint32 orig_table_id = tableS.m_dictTable->getTableId();
1289   assert(m_new_tables[orig_table_id] == source);
1290 
1291   m_new_tables[orig_table_id] = target;
1292   m_cache.m_old_table = NULL;
1293 
1294   return true;
1295 }
1296 
1297 bool
1298 BackupRestore::finalize_table(const TableS & table){
1299   bool ret= true;
1300   if (!m_restore && !m_restore_meta)
1301     return ret;
1302   if (!table.have_auto_inc())
1303     return ret;
1304 
1305   const Uint32 orig_table_id = table.m_dictTable->getTableId();
1306   const Uint64 restore_next_val = m_auto_values[orig_table_id];
1307   do
1308   {
1309     Uint64 db_next_val = ~(Uint64)0;
1310     int r= m_ndb->readAutoIncrementValue(get_table(table), db_next_val);
1311     if (r == -1)
1312     {
1313       if (m_ndb->getNdbError().status == NdbError::TemporaryError)
1314       {
1315         NdbSleep_MilliSleep(50);
1316         continue; // retry
1317       }
1318       restoreLogger.log_error("Finalize_table failed to read auto increment "
1319                               "value for table %s.  Error : %u %s",
1320                               get_table(table)->getName(),
1321                               m_ndb->getNdbError().code,
1322                               m_ndb->getNdbError().message);
1323       return false;
1324     }
1325     if (restore_next_val > db_next_val)
1326     {
1327       Ndb::TupleIdRange emptyRange;
1328       emptyRange.reset();
1329 
1330       r= m_ndb->setAutoIncrementValue(get_table(table),
1331                                       emptyRange,
1332                                       restore_next_val,
1333                                       true);
1334       if (r == -1 &&
1335             m_ndb->getNdbError().status == NdbError::TemporaryError)
1336       {
1337         NdbSleep_MilliSleep(50);
1338         continue; // retry
1339       }
1340       ret = (r == 0);
1341     }
1342     return (ret);
1343   } while (1);
1344 }
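/*
  Example of the auto-increment adjustment above (values are illustrative):
  if the backup recorded a next auto-increment value of 5001 for the table
  while the cluster currently holds 501, setAutoIncrementValue() raises the
  cluster value to 5001; if the cluster value is already higher, nothing is
  changed.
*/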
1345 
1346 bool
1347 BackupRestore::rebuild_indexes(const TableS& table)
1348 {
1349   if (!m_rebuild_indexes)
1350      return true;
1351 
1352   const char *tablename = table.getTableName();
1353 
1354   const NdbDictionary::Table * tab = get_table(table);
1355   Uint32 id = tab->getObjectId();
1356   if (m_index_per_table.size() <= id)
1357     return true;
1358 
1359   BaseString db_name, schema_name, table_name;
1360   if (!dissect_table_name(tablename, db_name, schema_name, table_name)) {
1361     return false;
1362   }
1363   check_rewrite_database(db_name);
1364 
1365   m_ndb->setDatabaseName(db_name.c_str());
1366   m_ndb->setSchemaName(schema_name.c_str());
1367   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
1368 
1369   /* First drop any support indexes */
1370   if (!dropPkMappingIndex(&table))
1371   {
1372     return false;
1373   }
1374 
1375   Vector<NdbDictionary::Index*> & indexes = m_index_per_table[id];
1376   for(unsigned i = 0; i<indexes.size(); i++)
1377   {
1378     const NdbDictionary::Index * const idx = indexes[i];
1379     const char * const idx_name = idx->getName();
1380     const char * const tab_name = idx->getTable();
1381     const NDB_TICKS start = NdbTick_getCurrentTicks();
1382     restoreLogger.log_info("Rebuilding index `%s` on table `%s` ...",
1383         idx_name, tab_name);
1384     bool done = false;
1385     for(int retries = 0; retries<11; retries++)
1386     {
1387       if ((dict->getIndex(idx_name, tab_name) == NULL)
1388           && (dict->createIndex(* idx, 1) != 0))
1389       {
1390         if(dict->getNdbError().status == NdbError::TemporaryError)
1391         {
1392           restoreLogger.log_error("retry sleep 50 ms on error %u",
1393                       dict->getNdbError().code);
1394           NdbSleep_MilliSleep(50);
1395           continue;  // retry on temporary error
1396         }
1397         else
1398         {
1399           break;
1400         }
1401       }
1402       else
1403       {
1404         done = true;
1405         break;
1406       }
1407     }
1408     if(!done)
1409     {
1410       restoreLogger.log_info("FAIL!");
1411       restoreLogger.log_error("Rebuilding index `%s` on table `%s` failed: %u: %s",
1412           idx_name, tab_name, dict->getNdbError().code, dict->getNdbError().message);
1413       return false;
1414     }
1415     const NDB_TICKS stop = NdbTick_getCurrentTicks();
1416     const Uint64 elapsed = NdbTick_Elapsed(start,stop).seconds();
1417     restoreLogger.log_info("OK (%llu s)", elapsed);
1418   }
1419 
1420   return true;
1421 }
1422 
1423 #ifdef NOT_USED
1424 static bool default_nodegroups(NdbDictionary::Table *table)
1425 {
1426   Uint16 *node_groups = (Uint16*)table->getFragmentData();
1427   Uint32 no_parts = table->getFragmentDataLen() >> 1;
1428   Uint32 i;
1429 
1430   if (node_groups[0] != 0)
1431     return false;
1432   for (i = 1; i < no_parts; i++)
1433   {
1434     if (node_groups[i] != NDB_UNDEF_NODEGROUP)
1435       return false;
1436   }
1437   return true;
1438 }
1439 #endif
1440 
1441 
1442 static Uint32 get_no_fragments(Uint64 max_rows, Uint32 no_nodes)
1443 {
1444   Uint32 i = 0;
1445   Uint32 acc_row_size = 27;
1446   Uint32 acc_fragment_size = 512*1024*1024;
1447   Uint32 no_parts= Uint32((max_rows*acc_row_size)/acc_fragment_size + 1);
1448   Uint32 reported_parts = no_nodes;
1449   while (reported_parts < no_parts && ++i < 4 &&
1450          (reported_parts + no_parts) < MAX_NDB_PARTITIONS)
1451     reported_parts+= no_nodes;
1452   if (reported_parts < no_parts)
1453   {
1454     restoreLogger.log_error("Table will be restored but will not be able to handle the maximum"
1455                             " number of rows requested");
1456   }
1457   return reported_parts;
1458 }
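/*
  Worked example of the sizing heuristic above (the constants are the ones
  hard-coded in get_no_fragments(), shown here only for illustration):

    max_rows = 100,000,000 and no_nodes = 4
    no_parts       = (100,000,000 * 27) / (512 * 1024 * 1024) + 1 = 6
    reported_parts = 4 -> 8   (one loop iteration; 8 >= 6, so the loop stops)

  i.e. 8 fragments are requested, rounded up to a multiple of the node count.
*/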
1459 
1460 
1461 static void set_default_nodegroups(NdbDictionary::Table *table)
1462 {
1463   Uint32 no_parts = table->getFragmentCount();
1464   Uint32 node_group[MAX_NDB_PARTITIONS];
1465   Uint32 i;
1466 
1467   node_group[0] = 0;
1468   for (i = 1; i < no_parts; i++)
1469   {
1470     node_group[i] = NDB_UNDEF_NODEGROUP;
1471   }
1472   table->setFragmentData(node_group, no_parts);
1473 }
1474 
1475 Uint32 BackupRestore::map_ng(Uint32 ng) const
1476 {
1477   NODE_GROUP_MAP *ng_map = m_nodegroup_map;
1478 
1479   if (ng == NDB_UNDEF_NODEGROUP ||
1480       ng_map[ng].map_array[0] == NDB_UNDEF_NODEGROUP)
1481   {
1482     return ng;
1483   }
1484   else
1485   {
1486     Uint32 new_ng;
1487     Uint32 curr_inx = ng_map[ng].curr_index;
1488     Uint32 new_curr_inx = curr_inx + 1;
1489 
1490     assert(ng < MAX_NDB_PARTITIONS);
1491     assert(curr_inx < MAX_MAPS_PER_NODE_GROUP);
1492     assert(new_curr_inx < MAX_MAPS_PER_NODE_GROUP);
1493 
1494     if (new_curr_inx >= MAX_MAPS_PER_NODE_GROUP)
1495       new_curr_inx = 0;
1496     else if (ng_map[ng].map_array[new_curr_inx] == NDB_UNDEF_NODEGROUP)
1497       new_curr_inx = 0;
1498     new_ng = ng_map[ng].map_array[curr_inx];
1499     ng_map[ng].curr_index = new_curr_inx;
1500     return new_ng;
1501   }
1502 }
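/*
  Illustration of the round-robin mapping above (example values only):
  if old nodegroup 0 is mapped to the two target nodegroups {1, 2}, i.e.
  m_nodegroup_map[0].map_array = {1, 2, NDB_UNDEF_NODEGROUP, ...}, then
  successive calls give

    map_ng(0) -> 1   (curr_index advances from 0 to 1)
    map_ng(0) -> 2   (next slot is undefined, so curr_index wraps to 0)
    map_ng(0) -> 1   ...

  spreading the mapped fragments across the target nodegroups.
*/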
1503 
1504 
1505 bool BackupRestore::map_nodegroups(Uint32 *ng_array, Uint32 no_parts) const
1506 {
1507   Uint32 i;
1508   bool mapped = FALSE;
1509   DBUG_ENTER("map_nodegroups");
1510 
1511   assert(no_parts < MAX_NDB_PARTITIONS);
1512   for (i = 0; i < no_parts; i++)
1513   {
1514     Uint32 ng;
1515     ng = map_ng(ng_array[i]);
1516     if (ng != ng_array[i])
1517       mapped = TRUE;
1518     ng_array[i] = ng;
1519   }
1520   DBUG_RETURN(mapped);
1521 }
1522 
1523 
1524 static void copy_byte(const char **data, char **new_data, uint *len)
1525 {
1526   **new_data = **data;
1527   (*data)++;
1528   (*new_data)++;
1529   (*len)++;
1530 }
1531 
1532 
1533 bool BackupRestore::search_replace(char *search_str, char **new_data,
1534                                    const char **data, const char *end_data,
1535                                    uint *new_data_len) const
1536 {
1537   uint search_str_len = (uint)strlen(search_str);
1538   uint inx = 0;
1539   bool in_delimiters = FALSE;
1540   bool escape_char = FALSE;
1541   char start_delimiter = 0;
1542   DBUG_ENTER("search_replace");
1543 
1544   do
1545   {
1546     char c = **data;
1547     copy_byte(data, new_data, new_data_len);
1548     if (escape_char)
1549     {
1550       escape_char = FALSE;
1551     }
1552     else if (in_delimiters)
1553     {
1554       if (c == start_delimiter)
1555         in_delimiters = FALSE;
1556     }
1557     else if (c == '\'' || c == '\"')
1558     {
1559       in_delimiters = TRUE;
1560       start_delimiter = c;
1561     }
1562     else if (c == '\\')
1563     {
1564       escape_char = TRUE;
1565     }
1566     else if (c == search_str[inx])
1567     {
1568       inx++;
1569       if (inx == search_str_len)
1570       {
1571         bool found = FALSE;
1572         uint number = 0;
1573         while (*data != end_data)
1574         {
1575           if (isdigit(**data))
1576           {
1577             found = TRUE;
1578             number = (10 * number) + (**data - '0');
1579             if (number > MAX_NDB_NODES)
1580               break;
1581           }
1582           else if (found)
1583           {
1584             /*
1585                After long and tedious preparations we have actually found
1586                a node group identifier to convert. We'll use the mapping
1587                table created for node groups and then insert the new number
1588                instead of the old number.
1589             */
1590             uint temp = map_ng(number);
1591             int no_digits = 0;
1592             char digits[10];
1593             do
1594             {
1595               digits[no_digits] = (char)('0' + (temp % 10));
1596               no_digits++;
1597               temp/=10;
1598             } while (temp != 0);
1599             for (no_digits--; no_digits >= 0; no_digits--)
1600             {
1601               **new_data = digits[no_digits];
1602               *new_data_len+=1;
1603             }
1604             DBUG_RETURN(FALSE);
1605           }
1606           else
1607             break;
1608           (*data)++;
1609         }
1610         DBUG_RETURN(TRUE);
1611       }
1612     }
1613     else
1614       inx = 0;
1615   } while (*data < end_data);
1616   DBUG_RETURN(FALSE);
1617 }
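/*
  Example of what search_replace() does inside the partition info section of
  a version 1 frm blob (illustrative text, not an exact frm dump): with a
  nodegroup mapping of 3 -> 12, the fragment

    ... ENGINE = NDB NODEGROUP = 3 ...

  is copied byte for byte until the search string " NODEGROUP = " has been
  matched, after which the digits that follow ("3") are consumed and the
  mapped value ("12") is emitted in their place.  Quoted strings and
  backslash-escaped characters are copied unchanged, so literals that happen
  to contain the search text are not rewritten.
*/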
1618 
1619 bool BackupRestore::map_in_frm(char *new_data, const char *data,
1620                                        uint data_len, uint *new_data_len) const
1621 {
1622   const char *end_data= data + data_len;
1623   const char *end_part_data;
1624   const char *part_data;
1625   char *extra_ptr;
1626   uint start_key_definition_len = uint2korr(data + 6);
1627   uint key_definition_len = uint4korr(data + 47);
1628   uint part_info_len;
1629   DBUG_ENTER("map_in_frm");
1630 
1631   if (data_len < 4096) goto error;
1632   extra_ptr = (char*)data + start_key_definition_len + key_definition_len;
1633   if ((int)data_len < ((extra_ptr - data) + 2)) goto error;
1634   extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr);
1635   if ((int)data_len < ((extra_ptr - data) + 2)) goto error;
1636   extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr);
1637   if ((int)data_len < ((extra_ptr - data) + 4)) goto error;
1638   part_info_len = uint4korr(extra_ptr);
1639   part_data = extra_ptr + 4;
1640   if ((int)data_len < ((part_data + part_info_len) - data)) goto error;
1641 
1642   do
1643   {
1644     copy_byte(&data, &new_data, new_data_len);
1645   } while (data < part_data);
1646   end_part_data = part_data + part_info_len;
1647   do
1648   {
1649     if (search_replace((char*)" NODEGROUP = ", &new_data, &data,
1650                        end_part_data, new_data_len))
1651       goto error;
1652   } while (data != end_part_data);
1653   do
1654   {
1655     copy_byte(&data, &new_data, new_data_len);
1656   } while (data < end_data);
1657   DBUG_RETURN(FALSE);
1658 error:
1659   DBUG_RETURN(TRUE);
1660 }
1661 
1662 
1663 bool BackupRestore::translate_frm(NdbDictionary::Table *table) const
1664 {
1665   uchar *data;
1666   char *new_data;
1667   uint new_data_len = 0;
1668   size_t data_len;
1669   DBUG_ENTER("translate_frm");
1670 
1671   {
1672     // Extract extra metadata for this table, check for version 1
1673     Uint32 version;
1674     void* unpacked_data;
1675     Uint32 unpacked_len;
1676     const int get_result =
1677         table->getExtraMetadata(version,
1678                                 &unpacked_data, &unpacked_len);
1679     if (get_result != 0)
1680     {
1681       DBUG_RETURN(true);
1682     }
1683 
1684     if (version != 1)
1685     {
1686       free(unpacked_data);
1687       DBUG_RETURN(true);
1688     }
1689 
1690     data = (uchar*)unpacked_data;
1691     data_len = unpacked_len;
1692   }
1693 
1694   /*
1695     Add max 4 characters per partition to handle worst case
1696     of mapping from single digit to 5-digit number.
1697     Fairly future-proof, ok up to 99999 node groups.
1698   */
1699   const uint no_parts = table->getFragmentCount();
1700   const uint extra_growth = no_parts * 4;
1701   if ((new_data = (char*) malloc(data_len + extra_growth)) == NULL)
1702   {
1703     DBUG_RETURN(TRUE);
1704   }
1705   if (map_in_frm(new_data, (const char*)data, (uint)data_len, &new_data_len))
1706   {
1707     free(new_data);
1708     DBUG_RETURN(TRUE);
1709   }
1710   const int set_result =
1711       table->setExtraMetadata(1, // version 1 for frm
1712                               new_data, (Uint32)new_data_len);
1713   if (set_result != 0)
1714   {
1715     free(new_data);
1716     DBUG_RETURN(TRUE);
1717   }
1718 
1719   // NOTE! the memory allocated in 'new_data' is not released here
1720   // NOTE! the memory returned in 'data' from getExtraMetadata() is not
1721   // released here (and a few error places above)
1722   // NOTE! the usage of this function and its functionality is described in
1723   // BUG25449055 NDB_RESTORE TRANSLATE FRM FOR USERDEFINED PARTITIOING TABLES
1724 
1725   DBUG_RETURN(FALSE);
1726 }
1727 
1728 #include <signaldata/DictTabInfo.hpp>
1729 
1730 bool
1731 BackupRestore::object(Uint32 type, const void * ptr)
1732 {
1733   if (!m_restore_meta)
1734     return true;
1735 
1736   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
1737   switch(type){
1738   case DictTabInfo::Tablespace:
1739   {
1740     NdbDictionary::Tablespace old(*(NdbDictionary::Tablespace*)ptr);
1741 
1742     Uint32 id = old.getObjectId();
1743 
1744     if (!m_no_restore_disk)
1745     {
1746       NdbDictionary::LogfileGroup * lg = m_logfilegroups[old.getDefaultLogfileGroupId()];
1747       old.setDefaultLogfileGroup(* lg);
1748       restoreLogger.log_info("Creating tablespace: %s...", old.getName());
1749       int ret = dict->createTablespace(old);
1750       if (ret)
1751       {
1752 	NdbError errobj= dict->getNdbError();
1753 	restoreLogger.log_info("FAILED");
1754         restoreLogger.log_error("Create tablespace failed: %s: %u: %s",
1755             old.getName(), errobj.code, errobj.message);
1756 	return false;
1757       }
1758       restoreLogger.log_info("done");
1759     }
1760 
1761     NdbDictionary::Tablespace curr = dict->getTablespace(old.getName());
1762     NdbError errobj = dict->getNdbError();
1763     if ((int) errobj.classification == (int) ndberror_cl_none)
1764     {
1765       NdbDictionary::Tablespace* currptr = new NdbDictionary::Tablespace(curr);
1766       NdbDictionary::Tablespace * null = 0;
1767       m_tablespaces.set(currptr, id, null);
1768       restoreLogger.log_debug("Retrieved tablespace: %s oldid: %u newid: %u"
1769           " %p", currptr->getName(), id, currptr->getObjectId(),
1770 	 (void*)currptr);
1771       m_n_tablespace++;
1772       return true;
1773     }
1774 
1775     restoreLogger.log_error("Failed to retrieve tablespace \"%s\": %u: %s",
1776         old.getName(), errobj.code, errobj.message);
1777 
1778     return false;
1779     break;
1780   }
1781   case DictTabInfo::LogfileGroup:
1782   {
1783     NdbDictionary::LogfileGroup old(*(NdbDictionary::LogfileGroup*)ptr);
1784 
1785     Uint32 id = old.getObjectId();
1786 
1787     if (!m_no_restore_disk)
1788     {
1789       restoreLogger.log_info("Creating logfile group: %s...", old.getName());
1790       int ret = dict->createLogfileGroup(old);
1791       if (ret)
1792       {
1793 	NdbError errobj= dict->getNdbError();
1794 	restoreLogger.log_info("FAILED");
1795         restoreLogger.log_error("Create logfile group failed: %s: %u: %s",
1796             old.getName(), errobj.code, errobj.message);
1797 	return false;
1798       }
1799       restoreLogger.log_info("done");
1800     }
1801 
1802     NdbDictionary::LogfileGroup curr = dict->getLogfileGroup(old.getName());
1803     NdbError errobj = dict->getNdbError();
1804     if ((int) errobj.classification == (int) ndberror_cl_none)
1805     {
1806       NdbDictionary::LogfileGroup* currptr =
1807 	new NdbDictionary::LogfileGroup(curr);
1808       NdbDictionary::LogfileGroup * null = 0;
1809       m_logfilegroups.set(currptr, id, null);
1810       restoreLogger.log_debug("Retrieved logfile group: %s oldid: %u newid: %u"
1811             " %p", currptr->getName(), id, currptr->getObjectId(),
1812             (void*)currptr);
1813       m_n_logfilegroup++;
1814       return true;
1815     }
1816 
1817     restoreLogger.log_error("Failed to retrieve logfile group \"%s\": %u: %s",
1818         old.getName(), errobj.code, errobj.message);
1819 
1820     return false;
1821     break;
1822   }
1823   case DictTabInfo::Datafile:
1824   {
1825     if (!m_no_restore_disk)
1826     {
1827       NdbDictionary::Datafile old(*(NdbDictionary::Datafile*)ptr);
1828       NdbDictionary::ObjectId objid;
1829       old.getTablespaceId(&objid);
1830       NdbDictionary::Tablespace * ts = m_tablespaces[objid.getObjectId()];
1831       restoreLogger.log_debug("Connecting datafile %s to tablespace:"
1832                               " oldid: %u newid: %u",
1833                               old.getPath(), objid.getObjectId(),
1834                               ts->getObjectId());
1835       old.setTablespace(* ts);
1836       restoreLogger.log_info("Creating datafile \"%s\"...", old.getPath());
1837       if (dict->createDatafile(old))
1838       {
1839 	NdbError errobj= dict->getNdbError();
1840 	restoreLogger.log_info("FAILED");
1841         restoreLogger.log_error("Create datafile failed: %s: %u: %s",
1842             old.getPath(), errobj.code, errobj.message);
1843 	return false;
1844       }
1845       restoreLogger.log_info("done");
1846       m_n_datafile++;
1847     }
1848     return true;
1849     break;
1850   }
1851   case DictTabInfo::Undofile:
1852   {
1853     if (!m_no_restore_disk)
1854     {
1855       NdbDictionary::Undofile old(*(NdbDictionary::Undofile*)ptr);
1856       NdbDictionary::ObjectId objid;
1857       old.getLogfileGroupId(&objid);
1858       NdbDictionary::LogfileGroup * lg = m_logfilegroups[objid.getObjectId()];
1859       restoreLogger.log_debug("Connecting undofile %s to logfile group: oldid:"
1860           " %u newid: %u %p", old.getPath(), objid.getObjectId(),
1861           lg->getObjectId(), (void*)lg);
1862       old.setLogfileGroup(* lg);
1863       restoreLogger.log_info("Creating undofile \"%s\"...", old.getPath());
1864       if (dict->createUndofile(old))
1865       {
1866 	NdbError errobj= dict->getNdbError();
1867 	restoreLogger.log_info("FAILED");
1868         restoreLogger.log_error("Create undofile failed: %s: %u: %s",
1869             old.getPath(), errobj.code, errobj.message);
1870 	return false;
1871       }
1872       restoreLogger.log_info("done");
1873       m_n_undofile++;
1874     }
1875     return true;
1876     break;
1877   }
1878   case DictTabInfo::HashMap:
1879   {
1880     NdbDictionary::HashMap old(*(NdbDictionary::HashMap*)ptr);
1881 
1882     Uint32 id = old.getObjectId();
1883 
1884     if (m_restore_meta)
1885     {
1886       int ret = dict->createHashMap(old);
1887       if (ret == 0)
1888       {
1889         restoreLogger.log_info("Created hashmap: %s", old.getName());
1890       }
1891       else
1892       {
1893         NdbError errobj = dict->getNdbError();
1894         // We ignore schema already exists, this is fine
1895         if (errobj.code != 721)
1896         {
1897           restoreLogger.log_error("Could not create hashmap \"%s\": %u: %s",
1898               old.getName(), errobj.code, errobj.message);
1899           return false;
1900         }
1901       }
1902     }
1903 
1904     NdbDictionary::HashMap curr;
1905     if (dict->getHashMap(curr, old.getName()) == 0)
1906     {
1907       NdbDictionary::HashMap* currptr =
1908         new NdbDictionary::HashMap(curr);
1909       NdbDictionary::HashMap * null = 0;
1910       m_hashmaps.set(currptr, id, null);
1911       restoreLogger.log_debug("Retrieved hashmap: %s oldid %u newid %u %p",
1912           currptr->getName(), id, currptr->getObjectId(), (void*)currptr);
1913       return true;
1914     }
1915 
1916     NdbError errobj = dict->getNdbError();
1917     restoreLogger.log_error("Failed to retrieve hashmap \"%s\": %u: %s",
1918         old.getName(), errobj.code, errobj.message);
1919 
1920     return false;
1921   }
1922   case DictTabInfo::ForeignKey: // done after tables
1923   {
1924     return true;
1925   }
1926   default:
1927   {
1928     restoreLogger.log_error("Unknown object type: %u", type);
1929     break;
1930   }
1931   }
1932   return true;
1933 }
1934 
1935 bool
1936 BackupRestore::has_temp_error(){
1937   return m_temp_error;
1938 }
1939 
1940 struct TransGuard
1941 {
1942   NdbTransaction* pTrans;
1943   TransGuard(NdbTransaction* p) : pTrans(p) {}
1944   ~TransGuard() { if (pTrans) pTrans->close();}
1945 };
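/*
  TransGuard is a small RAII helper: constructing one from the transaction
  returned by Ndb::startTransaction() guarantees NdbTransaction::close() on
  every exit path.  Minimal usage sketch (mirrors update_apply_status()
  below; error handling omitted):

    NdbTransaction *trans = m_ndb->startTransaction();
    TransGuard g(trans);                   // closed automatically on return
    NdbOperation *op = trans->getNdbOperation(ndbtab);
    ...
    trans->execute(NdbTransaction::Commit);
    return true;                           // g's destructor closes trans
*/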
1946 
1947 bool
1948 BackupRestore::update_apply_status(const RestoreMetaData &metaData, bool snapshotstart)
1949 {
1950   if (!m_restore_epoch)
1951     return true;
1952 
1953   bool result= false;
1954   unsigned apply_table_format= 0;
1955 
1956   m_ndb->setDatabaseName(NDB_REP_DB);
1957   m_ndb->setSchemaName("def");
1958 
1959   NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
1960   const NdbDictionary::Table *ndbtab= dict->getTable(NDB_APPLY_TABLE);
1961   if (!ndbtab)
1962   {
1963     restoreLogger.log_error("%s: %u: %s", NDB_APPLY_TABLE, dict->getNdbError().code, dict->getNdbError().message);
1964     return false;
1965   }
1966   if (ndbtab->getColumn(0)->getType() == NdbDictionary::Column::Unsigned &&
1967       ndbtab->getColumn(1)->getType() == NdbDictionary::Column::Bigunsigned)
1968   {
1969     if (ndbtab->getNoOfColumns() == 2)
1970     {
1971       apply_table_format= 1;
1972     }
1973     else if
1974       (ndbtab->getColumn(2)->getType() == NdbDictionary::Column::Varchar &&
1975        ndbtab->getColumn(3)->getType() == NdbDictionary::Column::Bigunsigned &&
1976        ndbtab->getColumn(4)->getType() == NdbDictionary::Column::Bigunsigned)
1977     {
1978       apply_table_format= 2;
1979     }
1980   }
1981   if (apply_table_format == 0)
1982   {
1983     restoreLogger.log_error("%s has wrong format\n", NDB_APPLY_TABLE);
1984     return false;
1985   }
1986 
1987   Uint32 server_id= 0;
1988   Uint32 version= metaData.getNdbVersion();
1989 
1990   Uint64 epoch= 0;
1991   if (snapshotstart)
1992   {
1993     epoch = Uint64(metaData.getStartGCP());
1994   }
1995   else
1996   {
1997     epoch = Uint64(metaData.getStopGCP());
1998   }
1999 
2000   if (version >= NDBD_MICRO_GCP_63 ||
2001       (version >= NDBD_MICRO_GCP_62 && getMinor(version) == 2))
2002   {
2003     epoch<<= 32; // Only gci_hi is saved...
2004 
2005     /**
2006      * Backup contains all epochs with those top bits,
2007      * so we indicate that with max setting
2008      */
2009     epoch += (Uint64(1) << 32) - 1;
2010   }
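  /*
    Example of the adjustment above: if the backup stop GCP is 1000, only
    gci_hi = 1000 is available, so the stored value becomes

      epoch = (Uint64(1000) << 32) + 0xFFFFFFFF

    i.e. the last possible gci_lo within GCI 1000, so that every epoch
    contained in the backup compares as already applied.
  */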
2011 
2012   Uint64 zero= 0;
2013   char empty_string[1];
2014   empty_string[0]= 0;
2015 
2016   int retries;
2017   for (retries = 0; retries <10; retries++)
2018   {
2019     if (retries > 0)
2020     {
2021       NdbSleep_MilliSleep(100 + (retries - 1) * 100);
2022     }
2023     NdbTransaction * trans= m_ndb->startTransaction();
2024     if (!trans)
2025     {
2026       restoreLogger.log_error("%s : failed to get transaction in --restore-epoch: %u:%s",
2027           NDB_APPLY_TABLE, m_ndb->getNdbError().code, m_ndb->getNdbError().message);
2028       if (m_ndb->getNdbError().status == NdbError::TemporaryError)
2029         continue;
2030       // Give up on a permanent failure to start a transaction
2031       return false;
2032     }
2033 
2034     TransGuard g(trans);
2035     NdbOperation * op= trans->getNdbOperation(ndbtab);
2036     if (!op)
2037     {
2038       restoreLogger.log_error("%s : failed to get operation in --restore-epoch: %u:%s",
2039           NDB_APPLY_TABLE, trans->getNdbError().code, trans->getNdbError().message);
2040       if (trans->getNdbError().status == NdbError::TemporaryError)
2041       {
2042         continue;
2043       }
2044       return false;
2045     }
2046     if (op->writeTuple() ||
2047         op->equal(0u, (const char *)&server_id, sizeof(server_id)) ||
2048         op->setValue(1u, (const char *)&epoch, sizeof(epoch)))
2049     {
2050       restoreLogger.log_error("%s : failed to set epoch value in --restore-epoch: %u:%s",
2051           NDB_APPLY_TABLE, op->getNdbError().code, op->getNdbError().message);
2052       return false;
2053     }
2054     if ((apply_table_format == 2) &&
2055         (op->setValue(2u, (const char *)&empty_string, 1) ||
2056          op->setValue(3u, (const char *)&zero, sizeof(zero)) ||
2057          op->setValue(4u, (const char *)&zero, sizeof(zero))))
2058     {
2059       restoreLogger.log_error("%s : failed to set values in --restore-epoch: %u:%s",
2060           NDB_APPLY_TABLE, op->getNdbError().code, op->getNdbError().message);
2061       return false;
2062     }
2063 
2064     int res = trans->execute(NdbTransaction::Commit);
2065     if (res != 0)
2066     {
2067       restoreLogger.log_error("%s : failed to commit transaction in --restore-epoch: %u:%s",
2068           NDB_APPLY_TABLE, trans->getNdbError().code, trans->getNdbError().message);
2069       if (trans->getNdbError().status == NdbError::TemporaryError)
2070       {
2071         continue;
2072       }
2073       return false;
2074     }
2075     else
2076     {
2077       result= true;
2078       break;
2079     }
2080   }
2081   if (result &&
2082       retries > 0)
2083     restoreLogger.log_error("--restore-epoch completed successfully "
2084                             "after retries");
2085 
2086   return result;
2087 }
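/*
  On success the code above leaves a row like the following in
  mysql.ndb_apply_status (apply_table_format 2 shown; the epoch value is the
  computed one, other values are what this function writes):

    server_id | epoch                       | log_name | start_pos | end_pos
    ----------+-----------------------------+----------+-----------+---------
            0 | (gci_hi << 32) + 0xFFFFFFFF | ''       | 0         | 0

  which downstream replication tooling reads to find the restore point.
*/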
2088 
2089 bool
2090 BackupRestore::report_started(unsigned backup_id, unsigned node_id)
2091 {
2092   if (m_ndb)
2093   {
2094     Uint32 data[3];
2095     data[0]= NDB_LE_RestoreStarted;
2096     data[1]= backup_id;
2097     data[2]= node_id;
2098     Ndb_internal::send_event_report(false /* has lock */, m_ndb, data, 3);
2099   }
2100   return true;
2101 }
2102 
2103 bool
2104 BackupRestore::report_meta_data(unsigned backup_id, unsigned node_id)
2105 {
2106   if (m_ndb)
2107   {
2108     Uint32 data[8];
2109     data[0]= NDB_LE_RestoreMetaData;
2110     data[1]= backup_id;
2111     data[2]= node_id;
2112     data[3]= m_n_tables;
2113     data[4]= m_n_tablespace;
2114     data[5]= m_n_logfilegroup;
2115     data[6]= m_n_datafile;
2116     data[7]= m_n_undofile;
2117     Ndb_internal::send_event_report(false /* has lock */, m_ndb, data, 8);
2118   }
2119   return true;
2120 }
2121 bool
2122 BackupRestore::report_data(unsigned backup_id, unsigned node_id)
2123 {
2124   if (m_ndb)
2125   {
2126     Uint32 data[7];
2127     data[0]= NDB_LE_RestoreData;
2128     data[1]= backup_id;
2129     data[2]= node_id;
2130     data[3]= m_dataCount & 0xFFFFFFFF;
2131     data[4]= 0;
2132     data[5]= (Uint32)(m_dataBytes & 0xFFFFFFFF);
2133     data[6]= (Uint32)((m_dataBytes >> 32) & 0xFFFFFFFF);
2134     Ndb_internal::send_event_report(false /* has lock */, m_ndb, data, 7);
2135   }
2136   return true;
2137 }
2138 
2139 bool
2140 BackupRestore::report_log(unsigned backup_id, unsigned node_id)
2141 {
2142   if (m_ndb)
2143   {
2144     Uint32 data[7];
2145     data[0]= NDB_LE_RestoreLog;
2146     data[1]= backup_id;
2147     data[2]= node_id;
2148     data[3]= m_logCount & 0xFFFFFFFF;
2149     data[4]= 0;
2150     data[5]= (Uint32)(m_logBytes & 0xFFFFFFFF);
2151     data[6]= (Uint32)((m_logBytes >> 32) & 0xFFFFFFFF);
2152     Ndb_internal::send_event_report(false /* has lock */, m_ndb, data, 7);
2153   }
2154   return true;
2155 }
2156 
2157 bool
2158 BackupRestore::report_completed(unsigned backup_id, unsigned node_id)
2159 {
2160   if (m_ndb)
2161   {
2162     Uint32 data[3];
2163     data[0]= NDB_LE_RestoreCompleted;
2164     data[1]= backup_id;
2165     data[2]= node_id;
2166     Ndb_internal::send_event_report(false /* has lock */, m_ndb, data, 3);
2167   }
2168   return true;
2169 }
2170 
2171 bool
2172 BackupRestore::column_compatible_check(const char* tableName,
2173                                        const NDBCOL* backupCol,
2174                                        const NDBCOL* dbCol)
2175 {
2176   if (backupCol->equal(*dbCol))
2177     return true;
2178 
2179   /* Something is different between the columns, but some differences don't
2180    * matter.
2181    * Investigate which parts are different, and inform user
2182    */
2183   bool similarEnough = true;
2184 
2185   /* We check similar things to NdbColumnImpl::equal() here */
2186   if (strcmp(backupCol->getName(), dbCol->getName()) != 0)
2187   {
2188     restoreLogger.log_info("Column %s.%s "
2189         "has different name in DB(%s)",
2190         tableName, backupCol->getName(), dbCol->getName());
2191     similarEnough = false;
2192   }
2193 
2194   if (backupCol->getType() != dbCol->getType())
2195   {
2196     restoreLogger.log_info("Column %s.%s "
2197         "%s has different type in DB; promotion or lossy type conversion"
2198         " (demotion, signed/unsigned) may be required.",
2199         tableName, backupCol->getName(), dbCol->getName());
2200 
2201     similarEnough = false;
2202   }
2203 
2204   if (backupCol->getPrimaryKey() != dbCol->getPrimaryKey())
2205   {
2206     restoreLogger.log_info("Column %s.%s "
2207         "%s a primary key in the DB", tableName, backupCol->getName(),
2208         (dbCol->getPrimaryKey()?" is":" is not"));
2209     /* If --allow-pk-changes is set, this may be ok */
2210   }
2211   else
2212   {
2213     if (backupCol->getPrimaryKey())
2214     {
2215       if (backupCol->getDistributionKey() != dbCol->getDistributionKey())
2216       {
2217         restoreLogger.log_info("Column %s.%s "
2218             "%s a distribution key in the DB", tableName, backupCol->getName(),
2219             (dbCol->getDistributionKey()?" is":" is not"));
2220         /* Not a problem for restore though */
2221       }
2222     }
2223   }
2224 
2225   if (backupCol->getNullable() != dbCol->getNullable())
2226   {
2227     restoreLogger.log_info("Column %s.%s "
2228         "%s nullable in the DB", tableName, backupCol->getName(),
2229         (dbCol->getNullable()?" is":" is not"));
2230     similarEnough = false;
2231   }
2232 
2233   if (backupCol->getPrecision() != dbCol->getPrecision())
2234   {
2235     restoreLogger.log_info("Column %s.%s "
2236         "precision is different in the DB",
2237         tableName, backupCol->getName());
2238     similarEnough = false;
2239   }
2240 
2241   if (backupCol->getScale() != dbCol->getScale())
2242   {
2243     restoreLogger.log_info("Column %s.%s "
2244         "scale is different in the DB",
2245         tableName, backupCol->getName());
2246     similarEnough = false;
2247   }
2248 
2249   if (backupCol->getLength() != dbCol->getLength())
2250   {
2251     restoreLogger.log_info("Column %s.%s "
2252         "length is different in the DB",
2253         tableName, backupCol->getName());
2254     similarEnough = false;
2255   }
2256 
2257   if (backupCol->getCharset() != dbCol->getCharset())
2258   {
2259     restoreLogger.log_info("Column %s.%s "
2260         "charset is different in the DB",
2261         tableName, backupCol->getName());
2262     similarEnough = false;
2263   }
2264 
2265   if (backupCol->getAutoIncrement() != dbCol->getAutoIncrement())
2266   {
2267     restoreLogger.log_info("Column %s.%s "
2268         "%s AutoIncrementing in the DB", tableName, backupCol->getName(),
2269         (dbCol->getAutoIncrement()?" is":" is not"));
2270     /* TODO : Can this be ignored? */
2271     similarEnough = false;
2272   }
2273 
2274   {
2275     unsigned int backupDefaultLen, dbDefaultLen;
2276     const void *backupDefaultPtr, *dbDefaultPtr;
2277     backupDefaultPtr = backupCol->getDefaultValue(&backupDefaultLen);
2278     dbDefaultPtr = dbCol->getDefaultValue(&dbDefaultLen);
2279 
2280     if ((backupDefaultLen != dbDefaultLen) ||
2281         (memcmp(backupDefaultPtr, dbDefaultPtr, backupDefaultLen) != 0))
2282     {
2283       restoreLogger.log_info("Column %s.%s "
2284           "Default value is different in the DB",
2285           tableName, backupCol->getName());
2286       /* This doesn't matter */
2287     }
2288   }
2289 
2290   if (backupCol->getArrayType() != dbCol->getArrayType())
2291   {
2292     restoreLogger.log_info("Column %s.%s "
2293         "ArrayType is different in the DB",
2294         tableName, backupCol->getName());
2295     similarEnough = false;
2296   }
2297 
2298   if (backupCol->getStorageType() != dbCol->getStorageType())
2299   {
2300     restoreLogger.log_info("Column %s.%s "
2301         "Storagetype is different in the DB",
2302         tableName, backupCol->getName());
2303     /* This doesn't matter */
2304   }
2305 
2306   if (backupCol->getBlobVersion() != dbCol->getBlobVersion())
2307   {
2308     restoreLogger.log_info("Column %s.%s "
2309         "Blob version is different in the DB",
2310         tableName, backupCol->getName());
2311     similarEnough = false;
2312   }
2313 
2314   if (backupCol->getDynamic() != dbCol->getDynamic())
2315   {
2316     restoreLogger.log_info("Column %s.%s "
2317         "%s Dynamic in the DB", tableName, backupCol->getName(),
2318         (dbCol->getDynamic()?" is":" is not"));
2319     /* This doesn't matter */
2320   }
2321 
2322   if (similarEnough)
2323     restoreLogger.log_info("  Difference(s) will be ignored during restore.");
2324   else
2325     restoreLogger.log_info("  Difference(s) cannot be ignored.  Column requires conversion to restore.");
2326 
2327   return similarEnough;
2328 }
2329 
2330 bool is_array(NDBCOL::Type type)
2331 {
2332   if (type == NDBCOL::Char ||
2333       type == NDBCOL::Binary ||
2334       type == NDBCOL::Varchar ||
2335       type == NDBCOL::Varbinary ||
2336       type == NDBCOL::Longvarchar ||
2337       type == NDBCOL::Longvarbinary)
2338   {
2339     return true;
2340   }
2341   return false;
2342 
2343 }
2344 
2345 bool
2346 BackupRestore::check_blobs(TableS & tableS)
2347 {
2348    /**
2349    * Nothing to check when printing data
2350    */
2351   if (!m_restore) {
2352     return true;
2353   }
2354 
2355   /**
2356    * For blob tables, check if there is a conversion on any PK of the main table.
2357    * If there is, the blob table PK needs the same conversion as the main table PK.
2358    * Copy the conversion to the blob table.
2359    * If a staging table is used, there may only be a partial conversion done
2360    * during data + log restore
2361    */
2362   if(match_blob(tableS.getTableName()) == -1)
2363     return true;
2364 
2365   int mainColumnId = tableS.getMainColumnId();
2366   const TableS *mainTableS = tableS.getMainTable();
2367   if(mainTableS->m_dictTable->getColumn(mainColumnId)->getBlobVersion() == NDB_BLOB_V1)
2368     return true; /* only to make old ndb_restore_compat* tests on v1 blobs pass */
2369 
2370   /**
2371    * Loop over columns in Backup schema for Blob parts table.
2372    * v2 Blobs have e.g. <Main table PK col(s)>, NDB$PART, NDB$PKID, NDB$DATA
2373    */
2374   for(int i=0; i<tableS.m_dictTable->getNoOfColumns(); i++)
2375   {
2376     NDBCOL *col = tableS.m_dictTable->getColumn(i);
2377     AttributeDesc *attrDesc = tableS.getAttributeDesc(col->getAttrId());
2378 
2379     /* get corresponding pk column in main table, backup + kernel versions */
2380     NDBCOL *backupMainCol = mainTableS->m_dictTable->getColumn(col->getName());
2381     const NdbDictionary::Table* ndbMainTab = get_table(*mainTableS);
2382     const NdbDictionary::Column* ndbMainCol = ndbMainTab->getColumn(col->getName());
2383     const NdbDictionary::Table* ndbTab = get_table(tableS);
2384     const NdbDictionary::Column* ndbCol = ndbTab->getColumn(col->getName());
2385 
2386     if(!backupMainCol || !backupMainCol->getPrimaryKey())
2387     {
2388       /* Finished with Blob part table's pk columns shared with main table
2389        * (Blob parts table always has main table PKs first)
2390        * Now just setting attrId values to match kernel table
2391        */
2392       assert(ndbCol != NULL);
2393       attrDesc->attrId = ndbCol->getColumnNo();
2394       continue;
2395     }
2396 
2397     int mainTableAttrId = backupMainCol->getAttrId();
2398     AttributeDesc *mainTableAttrDesc = mainTableS->getAttributeDesc(mainTableAttrId);
2399 
2400     if (mainTableAttrDesc->m_exclude)
2401     {
2402       /**
2403        * This column is gone from the main table pk, remove it from the
2404        * Blob part table pk here
2405        */
2406       restoreLogger.log_debug("Column excluded from main table, "
2407                               "exclude from blob parts pk");
2408       attrDesc->m_exclude = true;
2409       continue;
2410     }
2411 
2412     /* Column is part of main table pk in backup, check DB */
2413     if (!ndbMainCol->getPrimaryKey())
2414     {
2415       /* This column is still in the main table, but no longer
2416        * as part of the primary key
2417        */
2418       restoreLogger.log_debug("Column moved from pk in main table, "
2419                               "exclude from blob parts pk");
2420       attrDesc->m_exclude = true;
2421       continue;
2422     }
2423 
2424     attrDesc->attrId = ndbCol->getColumnNo();
2425 
2426     if(mainTableAttrDesc->convertFunc)
2427     {
2428       /* copy convertFunc from main table PK to blob table PK */
2429       attrDesc->convertFunc = mainTableAttrDesc->convertFunc;
2430       attrDesc->parameter = malloc(mainTableAttrDesc->parameterSz);
2431       memcpy(attrDesc->parameter, mainTableAttrDesc->parameter, mainTableAttrDesc->parameterSz);
2432     }
2433   }
2434   return true;
2435 }
2436 
2437 bool
2438 BackupRestore::table_compatible_check(TableS & tableS)
2439 {
2440   if (!m_restore)
2441     return true;
2442 
2443   const char *tablename = tableS.getTableName();
2444 
2445   if(tableS.m_dictTable == NULL){
2446     restoreLogger.log_error("Table %s has no m_dictTable", tablename);
2447     return false;
2448   }
2449   /**
2450    * Ignore blob tables
2451    */
2452   if(match_blob(tablename) >= 0)
2453     return true;
2454 
2455   const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* tableS.m_dictTable);
2456   if ((int) tmptab.m_indexType != (int) NdbDictionary::Index::Undefined)
2457   {
2458     if((int) tmptab.m_indexType == (int) NdbDictionary::Index::UniqueHashIndex)
2459     {
2460       BaseString dummy1, dummy2, indexname;
2461       dissect_index_name(tablename, dummy1, dummy2, indexname);
2462       restoreLogger.log_error( "WARNING: Table %s contains unique index %s. "
2463            "This can cause ndb_restore failures with duplicate key errors "
2464            "while restoring data. To avoid duplicate key errors, use "
2465            "--disable-indexes before restoring data and --rebuild-indexes "
2466            "after data is restored.",
2467            tmptab.m_primaryTable.c_str(), indexname.c_str());
2468     }
2469     return true;
2470   }
2471 
2472   BaseString db_name, schema_name, table_name;
2473   if (!dissect_table_name(tablename, db_name, schema_name, table_name)) {
2474     restoreLogger.log_error("Failed to dissect table name %s", tablename);
2475     return false;
2476   }
2477   check_rewrite_database(db_name);
2478 
2479   m_ndb->setDatabaseName(db_name.c_str());
2480   m_ndb->setSchemaName(schema_name.c_str());
2481 
2482   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
2483   const NdbDictionary::Table* tab = dict->getTable(table_name.c_str());
2484   if(tab == 0){
2485     restoreLogger.log_error("Unable to find table: %s error: %u: %s",
2486         table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
2487     return false;
2488   }
2489 
2490   /**
2491    * Allowed primary key modifications
2492    *
2493    * Extend pk
2494    *   a) Using existing non-pk non-nullable column(s)
2495    *   b) NOT SUPPORTED Using new defaulted columns
2496    *
2497    * Contract pk
2498    *   c) Leaving columns in the table
2499    *   d) Removing columns entirely
2500    *
2501    * b) not currently supported as
2502    *   - NdbApi does not represent default-valued pk
2503    *     columns
2504    *   - NdbApi does not have a concept of a default-init
2505    *     value for a type like MySQLD
2506    *   In future these concepts could be added to NdbApi
2507    *   or even to ndb_restore.
2508    *   An autoincrement column could also be considered a
2509    *   type of defaulted column in a future extension.
2510    *
2511    * Note that
2512    *   a) + c) are symmetric
2513    *   b) + d) are symmetric
2514    *
2515    * Since b) is not supported, d) must be used with care
2516    * as it is not 'reversible' in e.g. a rollback / replication
2517    * use case.
2518    *
2519    * Reducing or demoting the pk columns has the risk that
2520    * the reduced pk is no longer unique across the set of
2521    * key values in the backup.
2522    * This is a user responsibility to avoid, as it is today
2523    * when a pk column undergoes a lossy type demotion.
2524    *
2525    * When INSERTing rows (from .Data or .Log), all column
2526    * values are present, so support is trivial.
2527    *
2528    * PK mapping index
2529    *
2530    * For UPDATE and DELETE, c) and d) are trivial, but
2531    * a) requires some way to identify which row to
2532    * update or delete.  This is managed using an optional
2533    * secondary index on the old primary key column(s).
2534    *
2535    * Changes to PK columns in log
2536    *
2537    * For case a), it is possible that a backup log contains
2538    * UPDATEs to the columns which are becoming part
2539    * of the primary key.  When applying those to the new
2540    * table schema, they are mapped to separate DELETE + INSERT
2541    * operations.
2542    *
2543    * Blobs
2544    *
2545    * Blob columns have part tables which share the primary key of
2546    * the main table, but do not have all of the other columns.
2547    *
2548    * For a), this would require that a column from the main table row
2549    * is found and used when inserting/updating/deleting a part table
2550    * row.
2551    *
2552    * This is not practical for ndb_restore to do inline in a single
2553    * pass, so for pk changes to tables with Blobs, we require the
2554    * use of a staging table to achieve this transform.
2555    */
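  /**
   * Concrete example of case a) (extend pk using an existing column):
   * the backup has t1(a PK, b NOT NULL, c) and the DB has t1(a PK, b PK, c).
   * INSERTs from .Data / .Log restore unchanged, since every column value is
   * present in the backup row; UPDATE and DELETE events locate the row via
   * the PK mapping index built on the old key (a), as described above.
   */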
2556   bool full_pk_present_in_kernel = true;
2557   bool pk_extended_in_kernel = false;
2558   bool table_has_blob_parts = false;
2559 
2560 
2561   /**
2562    * remap column(s) based on column-names
2563    * Loop over columns recorded in the Backup
2564    */
2565   for (int i = 0; i<tableS.m_dictTable->getNoOfColumns(); i++)
2566   {
2567     AttributeDesc * attr_desc = tableS.getAttributeDesc(i);
2568     const NDBCOL * col_in_backup = tableS.m_dictTable->getColumn(i);
2569     const NDBCOL * col_in_kernel = tab->getColumn(col_in_backup->getName());
2570     const bool col_in_backup_pk = col_in_backup->getPrimaryKey();
2571 
2572     if (col_in_kernel == 0)
2573     {
2574       /* Col in backup does not exist in kernel */
2575 
2576       if ((m_tableChangesMask & TCM_EXCLUDE_MISSING_COLUMNS) == 0)
2577       {
2578         restoreLogger.log_error( "Missing column(%s.%s) in DB and "
2579             "exclude-missing-columns not specified",
2580             tableS.m_dictTable->getName(), col_in_backup->getName());
2581         return false;
2582       }
2583 
2584       restoreLogger.log_info("Column in backup (%s.%s) missing in DB."
2585           " Excluding column from restore.",
2586           tableS.m_dictTable->getName(), col_in_backup->getName());
2587 
2588       attr_desc->m_exclude = true;
2589 
2590       if (col_in_backup_pk)
2591       {
2592         restoreLogger.log_info("  Missing column (%s.%s) in DB was "
2593                                "part of primary key in Backup.  "
2594                                "Risk of row loss or merge if remaining "
2595                                "key(s) not unique.",
2596                                tableS.m_dictTable->getName(),
2597                                col_in_backup->getName());
2598 
2599         full_pk_present_in_kernel = false;
2600       }
2601     }
2602     else
2603     {
2604       /* Col in backup exists in kernel */
2605       attr_desc->attrId = col_in_kernel->getColumnNo();
2606 
2607       {
2608         const bool col_in_kernel_pk = col_in_kernel->getPrimaryKey();
2609 
2610         if (col_in_backup_pk)
2611         {
2612           if (!col_in_kernel_pk)
2613           {
2614             restoreLogger.log_info("Column (%s.%s) is part of "
2615                                    "primary key in Backup but "
2616                                    "not part of primary key in DB. "
2617                                    " Risk of row loss or merge if remaining "
2618                                    " key(s) not unique.",
2619                                    tableS.m_dictTable->getName(),
2620                                    col_in_backup->getName());
2621 
2622             full_pk_present_in_kernel = false;
2623           }
2624         }
2625         else
2626         {
2627           if (col_in_kernel_pk)
2628           {
2629             restoreLogger.log_info("Column (%s.%s) is not part of "
2630                                    "primary key in Backup but "
2631                                    "changed to be part of primary "
2632                                    "key in DB.",
2633                                    tableS.m_dictTable->getName(),
2634                                    col_in_backup->getName());
2635 
2636             pk_extended_in_kernel = true;
2637           }
2638         }
2639 
2640         /* Check for blobs with part tables */
2641         switch (col_in_kernel->getType())
2642         {
2643         case NDB_TYPE_BLOB:
2644         case NDB_TYPE_TEXT:
2645           if (col_in_kernel->getPartSize() > 0)
2646           {
2647             table_has_blob_parts = true;
2648           }
2649         default:
2650           break;
2651         }
2652       }
2653     }
2654   }
2655 
2656   /* Loop over columns present in the DB */
2657   for (int i = 0; i<tab->getNoOfColumns(); i++)
2658   {
2659     const NDBCOL * col_in_kernel = tab->getColumn(i);
2660     const NDBCOL * col_in_backup =
2661       tableS.m_dictTable->getColumn(col_in_kernel->getName());
2662 
2663     if (col_in_backup == 0)
2664     {
2665       /* New column in database */
2666       if ((m_tableChangesMask & TCM_EXCLUDE_MISSING_COLUMNS) == 0)
2667       {
2668         restoreLogger.log_error( "Missing column(%s.%s) in backup and "
2669             "exclude-missing-columns not specified",
2670              tableS.m_dictTable->getName(), col_in_kernel->getName());
2671         return false;
2672       }
2673 
2674       /**
2675        * only nullable or defaulted non primary key columns can be missing from backup
2676        *
2677        */
2678       if (col_in_kernel->getPrimaryKey() ||
2679           ((col_in_kernel->getNullable() == false) &&
2680            (col_in_kernel->getDefaultValue() == NULL)))
2681       {
2682         restoreLogger.log_error( "Missing column(%s.%s) in backup "
2683             " is primary key or not nullable or defaulted in DB",
2684             tableS.m_dictTable->getName(), col_in_kernel->getName());
2685         return false;
2686       }
2687 
2688       restoreLogger.log_info("Column in DB (%s.%s) missing in Backup."
2689           " Will be set to %s.",
2690           tableS.m_dictTable->getName(), col_in_kernel->getName(),
2691            ((col_in_kernel->getDefaultValue() == NULL) ?
2692                                             "Null" : "Default value"));
2693     }
2694   }
2695 
2696   /* Check pk changes against flags */
2697 
2698   if (pk_extended_in_kernel)
2699   {
2700     if ((m_tableChangesMask & TCM_ALLOW_PK_CHANGES) == 0)
2701     {
2702       restoreLogger.log_error("Error : Primary key extended in DB without "
2703                               "allow-pk-changes.");
2704       return false;
2705     }
2706 
2707     if (m_restore && !m_disable_indexes)
2708     {
2709       /**
2710        * Prefer to use disable_indexes here as it supports safer use of
2711        * a single shared mapping index rather than per
2712        * ndb_restore / slice / thread indices
2713        */
2714       restoreLogger.log_info("Warning : Primary key extended in DB with "
2715                              "allow-pk-changes, and --restore-data but without "
2716                              "--disable-indexes.  A final --rebuild-indexes step "
2717                              "is required to drop any mapping indices created.");
2718       /**
2719        * This could be a hard error (requiring --disable-indexes), but
2720        * for now it is a warning, allowing serialised use of ndb_restore
2721        * without --disable-indexes and --rebuild-indexes
2722        */
2723       //return false;
2724     }
2725 
2726     if (table_has_blob_parts)
2727     {
2728       /**
2729        * Problem as the blob parts tables will not have the
2730        * non-pk column(s) required to do a 1-pass reformat.
2731        * This requires staging tables.
2732        */
2733       restoreLogger.log_info("Table %s has Blob/Text columns with part tables "
2734                              "and an extended primary key.  This requires "
2735                              "staging.", tableS.getTableName());
2736       tableS.m_staging = true;
2737     }
2738   }
2739 
2740   if (!full_pk_present_in_kernel)
2741   {
2742     if ((m_tableChangesMask & TCM_ALLOW_PK_CHANGES) == 0)
2743     {
2744       restoreLogger.log_error("Error : Primary key reduced in DB without "
2745                               "allow-pk-changes.");
2746       return false;
2747     }
2748     if ((m_tableChangesMask & TCM_ATTRIBUTE_DEMOTION) == 0)
2749     {
2750       restoreLogger.log_error("Error : Primary key reduced in DB without "
2751                               "lossy-conversions.");
2752       return false;
2753     }
2754   }
2755 
2756   if (pk_extended_in_kernel ||
2757       !full_pk_present_in_kernel)
2758   {
2759     if (tab->getFragmentType() == NdbDictionary::Object::UserDefined)
2760     {
2761       /**
2762        * Note
2763        *
2764        * 1.  Type promotion/demotion on distribution keys may also
2765        *     affect stored hash for user defined partitioning
2766        *     As we don't know the function mapping we cannot allow
2767        *     this.
2768        *
2769        * 2.  Could allow changes to non-distribution primary keys
2770        *     if there are any, but not for now.
2771        */
2772       restoreLogger.log_error("Error : Primary key changes to table with "
2773                               "user-defined partitioning not supported as "
2774                               "new value of stored distribution keys "
2775                               "potentially unknown.");
2776       return false;
2777     }
2778   }
2779 
2780   tableS.m_pk_extended = pk_extended_in_kernel;
2781 
2782   AttrCheckCompatFunc attrCheckCompatFunc = NULL;
2783   for(int i = 0; i<tableS.m_dictTable->getNoOfColumns(); i++)
2784   {
2785     AttributeDesc * attr_desc = tableS.getAttributeDesc(i);
2786     attr_desc->staging = false;
2787     if (attr_desc->m_exclude)
2788       continue;
2789 
2790     const NDBCOL * col_in_kernel = tab->getColumn(attr_desc->attrId);
2791     const NDBCOL * col_in_backup = tableS.m_dictTable->getColumn(i);
2792 
2793     if(column_compatible_check(tablename,
2794                                col_in_backup,
2795                                col_in_kernel))
2796     {
2797       continue;
2798     }
2799 
2800     NDBCOL::Type type_in_backup = col_in_backup->getType();
2801     NDBCOL::Type type_in_kernel = col_in_kernel->getType();
2802     const bool col_in_kernel_pk = col_in_kernel->getPrimaryKey();
2803     attrCheckCompatFunc = get_attr_check_compatability(type_in_backup,
2804                                                        type_in_kernel);
2805     AttrConvType compat
2806       = (attrCheckCompatFunc == NULL ? ACT_UNSUPPORTED
2807          : attrCheckCompatFunc(*col_in_backup, *col_in_kernel));
2808     switch (compat) {
2809     case ACT_UNSUPPORTED:
2810       {
2811         restoreLogger.log_error("Table: %s column: %s"
2812             " incompatible with kernel's definition. "
2813             "Conversion not possible",
2814             tablename, col_in_backup->getName());
2815         return false;
2816       }
2817     case ACT_PRESERVING:
2818       if ((m_tableChangesMask & TCM_ATTRIBUTE_PROMOTION) == 0)
2819       {
2820         restoreLogger.log_error("Table: %s column: %s"
2821             " promotable to kernel's definition but option"
2822             " promote-attributes not specified",
2823             tablename, col_in_backup->getName());
2824         return false;
2825       }
2826       break;
2827     case ACT_LOSSY:
2828       if ((m_tableChangesMask & TCM_ATTRIBUTE_DEMOTION) == 0)
2829       {
2830         restoreLogger.log_error("Table: %s column: %s"
2831             " convertible to kernel's definition but option"
2832             " lossy-conversions not specified",
2833             tablename, col_in_backup->getName());
2834         return false;
2835       }
2836       if (col_in_kernel_pk)
2837       {
2838         restoreLogger.log_info("Warning : Table: %s column: %s "
2839                                "is part of primary key and involves "
2840                                "a lossy conversion.  Risk of row loss "
2841                                "or merge if demoted key(s) not unique.",
2842                                tablename, col_in_backup->getName());
2843       }
2844       break;
2845     case ACT_STAGING_PRESERVING:
2846       if ((m_tableChangesMask & TCM_ATTRIBUTE_PROMOTION) == 0)
2847       {
2848         restoreLogger.log_error("Table: %s column: %s"
2849             " promotable to kernel's definition via staging but option"
2850             " promote-attributes not specified",
2851             tablename, col_in_backup->getName());
2852         return false;
2853       }
2854       /**
2855        * Staging lossy conversions should be safe w.r.t pk uniqueness
2856        * as staging conversion rejects duplicate keys
2857        */
2858       attr_desc->staging = true;
2859       tableS.m_staging = true;
2860       tableS.m_stagingFlags |= Ndb_move_data::Opts::MD_ATTRIBUTE_PROMOTION;
2861       break;
2862     case ACT_STAGING_LOSSY:
2863       if ((m_tableChangesMask & TCM_ATTRIBUTE_DEMOTION) == 0)
2864       {
2865         restoreLogger.log_error("Table: %s column: %s"
2866            " convertible to kernel's definition via staging but option"
2867            " lossy-conversions not specified",
2868             tablename, col_in_backup->getName());
2869         return false;
2870       }
2871       attr_desc->staging = true;
2872       tableS.m_staging = true;
2873       tableS.m_stagingFlags |= Ndb_move_data::Opts::MD_ATTRIBUTE_DEMOTION;
2874       break;
2875     default:
2876       restoreLogger.log_error("internal error: illegal value of compat = %u", compat);
2877       assert(false);
2878       return false;
2879     };
2880 
2881     attr_desc->convertFunc = get_convert_func(type_in_backup,
2882                                               type_in_kernel);
2883     Uint32 m_attrSize = NdbColumnImpl::getImpl(*col_in_kernel).m_attrSize;
2884     Uint32 m_arraySize = NdbColumnImpl::getImpl(*col_in_kernel).m_arraySize;
2885 
2886     // use a char_n_padding_struct to pass length information to convert()
2887     if (type_in_backup == NDBCOL::Char ||
2888         type_in_backup == NDBCOL::Binary ||
2889         type_in_backup == NDBCOL::Bit ||
2890         type_in_backup == NDBCOL::Varchar ||
2891         type_in_backup == NDBCOL::Longvarchar ||
2892         type_in_backup == NDBCOL::Varbinary ||
2893         type_in_backup == NDBCOL::Longvarbinary)
2894     {
2895       unsigned int size = sizeof(struct char_n_padding_struct) +
2896         m_attrSize * m_arraySize;
2897       struct char_n_padding_struct *s = (struct char_n_padding_struct *)
2898         malloc(size +2);
2899       if (!s)
2900       {
2901         restoreLogger.log_error("No more memory available!");
2902         exitHandler();
2903       }
2904       s->n_old = (attr_desc->size * attr_desc->arraySize) / 8;
2905       s->n_new = m_attrSize * m_arraySize;
2906       memset(s->new_row, 0 , m_attrSize * m_arraySize + 2);
2907       attr_desc->parameter = s;
2908       attr_desc->parameterSz = size + 2;
2909     }
2910     else if (type_in_backup == NDBCOL::Time ||
2911              type_in_backup == NDBCOL::Datetime ||
2912              type_in_backup == NDBCOL::Timestamp ||
2913              type_in_backup == NDBCOL::Time2 ||
2914              type_in_backup == NDBCOL::Datetime2 ||
2915              type_in_backup == NDBCOL::Timestamp2)
2916     {
2917       const unsigned int maxdata = 8;
2918       unsigned int size = sizeof(struct char_n_padding_struct) + maxdata;
2919       struct char_n_padding_struct *s = (struct char_n_padding_struct *)
2920         malloc(size);
2921       if (!s)
2922       {
2923         restoreLogger.log_error("No more memory available!");
2924         exitHandler();
2925       }
2926       s->n_old = col_in_backup->getPrecision();
2927       s->n_new = col_in_kernel->getPrecision();
2928       memset(s->new_row, 0 , maxdata);
2929       attr_desc->parameter = s;
2930     }
2931     else
2932     {
2933       unsigned int size = m_attrSize * m_arraySize;
2934       attr_desc->parameter = malloc(size + 2);
2935       if (!attr_desc->parameter)
2936       {
2937         restoreLogger.log_error("No more memory available!");
2938         exitHandler();
2939       }
2940       memset(attr_desc->parameter, 0, size + 2);
2941       attr_desc->parameterSz = size + 2;
2942     }
2943 
2944     restoreLogger.log_info("Data for column %s.%s"
2945         " will be converted from Backup type into DB type.",
2946         tablename, col_in_backup->getName());
2947   }
2948 
2949   if (tableS.m_staging)
2950   {
2951     // fully qualified name, dissected at createTable()
2952     // For mt-restore, each thread creates its own staging table.
2953     // To ensure that each thread has a unique staging table name,
2954     // the tablename contains m_instance_name=nodeID.threadID
2955     BaseString& stagingName = tableS.m_stagingName;
2956     stagingName.assfmt("%s%s%s", tableS.getTableName(),
2957                        NDB_RESTORE_STAGING_SUFFIX, m_instance_name);
2958     NdbDictionary::Table* stagingTable = new NdbDictionary::Table;
2959 
2960     // handle very many rows
2961     stagingTable->setFragmentType(tab->getFragmentType());
2962     // XXX not sure about this
2963     if (tab->getFragmentType() == NdbDictionary::Table::HashMapPartition &&
2964         !tab->getDefaultNoPartitionsFlag())
2965     {
2966       stagingTable->setDefaultNoPartitionsFlag(false);
2967       stagingTable->setFragmentCount(tab->getFragmentCount());
2968       stagingTable->setFragmentData(0, 0);
2969     }
2970 
2971     // if kernel is DD, staging will be too
2972     bool kernel_is_dd = false;
2973     Uint32 ts_id = ~(Uint32)0;
2974     if (tab->getTablespace(&ts_id))
2975     {
2976       // must be an initialization
2977       NdbDictionary::Tablespace ts = dict->getTablespace(ts_id);
2978       const char* ts_name = ts.getName();
2979       // how to detect error?
2980       if (strlen(ts_name) == 0)
2981       {
2982         restoreLogger.log_error("Kernel table %s: "
2983             "Failed to fetch tablespace id=%u: %u:%s",
2984             tablename, ts_id, dict->getNdbError().code, dict->getNdbError().message);
2985         return false;
2986       }
2987       restoreLogger.log_info("Kernel table %s tablespace %s",
2988           tablename, ts_name);
2989       stagingTable->setTablespaceName(ts_name);
2990       kernel_is_dd = true;
2991     }
2992 
2993     /*
2994      * Staging table is the table in backup, omit excluded columns.
2995      * Reset column mappings and convert methods.
2996      */
2997     int j = 0;
2998     for (int i = 0; i < tableS.m_dictTable->getNoOfColumns(); i++)
2999     {
3000       AttributeDesc * attr_desc = tableS.getAttributeDesc(i);
3001       const NDBCOL * col_in_backup = tableS.m_dictTable->getColumn(i);
3002       if (attr_desc->m_exclude)
3003         continue;
3004       attr_desc->attrId = (uint32)(j++);
3005       if(attr_desc->convertFunc)
3006       {
3007         const NDBCOL * col_in_kernel = tab->getColumn(col_in_backup->getName());
3008 
3009         // Skip built-in conversions from smaller array types
3010         // to larger array types so that they are handled by staging.
3011         // This prevents the staging table from growing too large and
3012         // causing ndb_restore to fail with error 738: record too big.
3013         NDBCOL::Type type_in_backup = col_in_backup->getType();
3014         NDBCOL::Type type_in_kernel = col_in_kernel->getType();
3015         if(is_array(type_in_backup) && is_array(type_in_kernel) &&
3016            col_in_kernel->getLength() > col_in_backup->getLength())
3017         {
3018           stagingTable->addColumn(*col_in_backup);
3019           attr_desc->convertFunc = NULL;
3020           attr_desc->staging = true;
3021           tableS.m_stagingFlags |= Ndb_move_data::Opts::MD_ATTRIBUTE_PROMOTION;
3022         }
3023         else
3024         {
3025           // Add column of destination type to staging table so that
3026           // built-in conversion is done while loading data into
3027           // staging table.
3028           stagingTable->addColumn(*col_in_kernel);
3029         }
3030       }
3031       else
3032       {
3033         stagingTable->addColumn(*col_in_backup);
3034         attr_desc->convertFunc = NULL;
3035       }
3036     }
3037 
3038     if (m_tableChangesMask & TCM_EXCLUDE_MISSING_COLUMNS)
3039       tableS.m_stagingFlags |= Ndb_move_data::Opts::MD_EXCLUDE_MISSING_COLUMNS;
3040 
3041     tableS.m_stagingTable = stagingTable;
3042   }
3043 
3044   return true;
3045 }
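/*
 * Illustrative sketch (not part of the build): how the staging table name
 * above is composed. The concrete values are assumptions for the example
 * only; the suffix comes from NDB_RESTORE_STAGING_SUFFIX and the instance
 * name is "<nodeId>.<threadId>" as noted in the comment above.
 *
 *   BaseString name;
 *   // e.g. table "test/def/t1", restored by node 3, restore thread 2
 *   name.assfmt("%s%s%s", "test/def/t1", NDB_RESTORE_STAGING_SUFFIX, "3.2");
 *   // yields "test/def/t1<suffix>3.2", unique per restore thread
 */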
3046 
3047 bool
3048 BackupRestore::createSystable(const TableS & tables){
3049   if (!m_restore && !m_metadata_work_requested)
3050     return true;
3051   const char *tablename = tables.getTableName();
3052 
3053   if( strcmp(tablename, NDB_REP_DB "/def/" NDB_APPLY_TABLE) != 0 &&
3054       strcmp(tablename, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE) != 0 )
3055   {
3056     // Don't restore any system table other than those listed above
3057     return true;
3058   }
3059 
3060   BaseString db_name, schema_name, table_name;
3061   if (!dissect_table_name(tablename, db_name, schema_name, table_name)) {
3062     return false;
3063   }
3064   // do not rewrite database for system tables:
3065   // check_rewrite_database(db_name);
3066 
3067   m_ndb->setDatabaseName(db_name.c_str());
3068   m_ndb->setSchemaName(schema_name.c_str());
3069 
3070   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3071   if( dict->getTable(table_name.c_str()) != NULL ){
3072     return true;
3073   }
3074   return table(tables);
3075 }
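/*
 * Example (hedged): with the usual macro values (NDB_REP_DB = "mysql",
 * NDB_APPLY_TABLE = "ndb_apply_status", NDB_SCHEMA_TABLE = "ndb_schema",
 * as defined elsewhere in the NDB headers), the only system tables accepted
 * above are the internal names
 *
 *   "mysql/def/ndb_apply_status"
 *   "mysql/def/ndb_schema"
 *
 * and each is created only if dict->getTable() shows it does not already
 * exist in the target cluster.
 */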
3076 
3077 bool
3078 BackupRestore::table(const TableS & table){
3079   if (!m_restore && !m_metadata_work_requested)
3080     return true;
3081 
3082   const char * name = table.getTableName();
3083 
3084   /**
3085    * Ignore blob tables
3086    */
3087   if(match_blob(name) >= 0)
3088     return true;
3089 
3090   const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
3091   if ((int) tmptab.m_indexType != (int) NdbDictionary::Index::Undefined){
3092     m_indexes.push_back(table.m_dictTable);
3093     return true;
3094   }
3095 
3096   BaseString db_name, schema_name, table_name;
3097   if (!dissect_table_name(name, db_name, schema_name, table_name)) {
3098     return false;
3099   }
3100   check_rewrite_database(db_name);
3101 
3102   m_ndb->setDatabaseName(db_name.c_str());
3103   m_ndb->setSchemaName(schema_name.c_str());
3104 
3105   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3106   if(m_restore_meta)
3107   {
3108     NdbDictionary::Table* tab = table.m_dictTable;
3109     NdbDictionary::Table copy(*tab);
3110 
3111     copy.setName(table_name.c_str());
3112     Uint32 id;
3113     if (copy.getTablespace(&id))
3114     {
3115       NdbDictionary::Tablespace* ts = m_tablespaces[id];
3116       restoreLogger.log_debug("Connecting %s to tablespace oldid: %u newid: %u",
3117                               name, id, ts->getObjectId());
3118       copy.setTablespace(* ts);
3119     }
3120 
3121     NdbDictionary::Object::PartitionBalance part_bal;
3122     part_bal = copy.getPartitionBalance();
3123     assert(part_bal != 0);
3124     if (part_bal == NdbDictionary::Object::PartitionBalance_ForRPByLDM)
3125     {
3126       /**
3127        * For backups created by versions prior to the introduction of
3128        * PartitionBalance, we may have picked up the default partition
3129        * balance member, but we should have a specific setting.
3130        */
3131       if (!copy.getDefaultNoPartitionsFlag())
3132       {
3133         /* This is actually a specifically partitioned table, check that
3134          * it has a specific fragment count we can reuse
3135          */
3136         assert(copy.getFragmentCount() != 0);
3137         part_bal = NdbDictionary::Object::PartitionBalance_Specific;
3138         copy.setPartitionBalance(part_bal);
3139         restoreLogger.log_info("Setting %s to specific partition balance with "
3140                                "%u fragments.",
3141                                name, copy.getFragmentCount());
3142       }
3143     }
3144     if (part_bal != NdbDictionary::Object::PartitionBalance_Specific)
3145     {
3146       /* Let the partition balance decide partition count */
3147       copy.setFragmentCount(0);
3148     }
3149     if (copy.getFragmentType() == NdbDictionary::Object::HashMapPartition)
3150     {
3151       /**
3152        * The only specific information we have in specific hash map
3153        * partitions is really the number of fragments. Other than
3154        * that we can use a new hash map. We won't be able to restore
3155        * in exactly the same distribution anyways. So we set the
3156        * hash map to be non-existing and thus it will be created
3157        * as part of creating the table. The fragment count is already
3158        * set in the copy object.
3159        *
3160        * Use the PartitionBalance to resize table for this cluster...
3161        *   set "null" hashmap
3162        */
3163       NdbDictionary::HashMap nullMap;
3164       assert(Uint32(nullMap.getObjectId()) == RNIL);
3165       assert(Uint32(nullMap.getObjectVersion()) == ~Uint32(0));
3166       copy.setHashMap(nullMap);
3167     }
3168     else if (copy.getDefaultNoPartitionsFlag())
3169     {
3170       /*
3171         Table was defined with default number of partitions. We can restore
3172         it with whatever is the default in this cluster.
3173         We use the max_rows parameter in calculating the default number.
3174       */
3175       Uint32 no_nodes = m_cluster_connection->no_db_nodes();
3176       copy.setFragmentCount(get_no_fragments(copy.getMaxRows(),
3177                             no_nodes));
3178       set_default_nodegroups(&copy);
3179     }
3180     else
3181     {
3182       /*
3183         Table was defined with specific number of partitions. It should be
3184         restored with the same number of partitions. It will either be
3185         restored in the same node groups as when backup was taken or by
3186         using a node group map supplied to the ndb_restore program.
3187       */
3188       Vector<Uint32> new_array;
3189       Uint16 no_parts = copy.getFragmentCount();
3190       new_array.assign(copy.getFragmentData(), no_parts);
3191       if (map_nodegroups(new_array.getBase(), no_parts))
3192       {
3193         if (translate_frm(&copy))
3194         {
3195           restoreLogger.log_error("Create table %s failed\n"
3196               "Translate frm error", table.getTableName());
3197           return false;
3198         }
3199       }
3200       copy.setFragmentData(new_array.getBase(), no_parts);
3201     }
3202 
3203     /**
3204      * Force of varpart was introduced in 5.1.18, telco 6.1.7 and 6.2.1
3205      * Since default from mysqld is to add force of varpart (disable with
3206      * ROW_FORMAT=FIXED) we force varpart onto tables when they are restored
3207      * from backups taken with older versions. This will be wrong if
3208      * ROW_FORMAT=FIXED was used on original table, however the likelyhood of
3209      * ROW_FORMAT=FIXED was used on the original table, however the likelihood of
3210      */
3211 
3212     if (table.getBackupVersion() < MAKE_VERSION(5,1,18))
3213       copy.setForceVarPart(true);
3214     else if (getMajor(table.getBackupVersion()) == 6 &&
3215              (table.getBackupVersion() < MAKE_VERSION(6,1,7) ||
3216               table.getBackupVersion() == MAKE_VERSION(6,2,0)))
3217       copy.setForceVarPart(true);
3218 
3219     /*
3220       update min and max rows to reflect the table, this to
3221       ensure that memory is allocated properly in the ndb kernel
3222     */
3223     copy.setMinRows(table.getNoOfRecords());
3224     if (tab->getMaxRows() != 0 &&
3225         table.getNoOfRecords() > copy.getMaxRows())
3226     {
3227       copy.setMaxRows(table.getNoOfRecords());
3228     }
3229 
3230     NdbTableImpl &tableImpl = NdbTableImpl::getImpl(copy);
3231     if (table.getBackupVersion() < MAKE_VERSION(5,1,0) && !m_no_upgrade){
3232       for(int i= 0; i < copy.getNoOfColumns(); i++)
3233       {
3234         NdbDictionary::Column::Type t = copy.getColumn(i)->getType();
3235 
3236         if (t == NdbDictionary::Column::Varchar ||
3237           t == NdbDictionary::Column::Varbinary)
3238           tableImpl.getColumn(i)->setArrayType(NdbDictionary::Column::ArrayTypeShortVar);
3239         if (t == NdbDictionary::Column::Longvarchar ||
3240           t == NdbDictionary::Column::Longvarbinary)
3241           tableImpl.getColumn(i)->setArrayType(NdbDictionary::Column::ArrayTypeMediumVar);
3242       }
3243     }
3244 
3245     if (dict->createTable(copy) == -1)
3246     {
3247       restoreLogger.log_error("Create table `%s` failed: %u: %s",
3248           table.getTableName(), dict->getNdbError().code, dict->getNdbError().message);
3249       if (dict->getNdbError().code == 771)
3250       {
3251         /*
3252           The user on the cluster where the backup was created had specified
3253           specific node groups for partitions. Some of these node groups
3254           didn't exist on this cluster. We will warn the user of this and
3255           inform him of his option.
3256         */
3257         restoreLogger.log_error("The node groups defined in the table didn't exist in this"
3258             " cluster.\nThere is an option to use the"
3259             " parameter ndb-nodegroup-map to define a mapping from"
3260             " the old nodegroups to new nodegroups");
3261       }
3262       return false;
3263     }
3264     info.setLevel(254);
3265     restoreLogger.log_info("Successfully restored table `%s`",
3266         table.getTableName());
3267   }
3268 
3269   // In mt-restore, many restore-threads may be querying DICT for the
3270   // same table at one time, which could result in failures. Add retries.
3271   const NdbDictionary::Table* tab = 0;
3272   for (int retries = 0; retries < 10; retries++)
3273   {
3274     tab = dict->getTable(table_name.c_str());
3275     if (tab)
3276       break;
3277     else
3278     {
3279       const NdbError& error = dict->getNdbError();
3280       if (error.status != NdbError::TemporaryError)
3281         NdbSleep_MilliSleep((ndb_rand() % 10) * 10);
3282       else
3283         break;
3284     }
3285   }
3286   if(tab == 0)
3287   {
3288     restoreLogger.log_error("Unable to find table: `%s` error : %u: %s",
3289         table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3290     return false;
3291   }
3292   if (m_restore_meta)
3293   {
3294     if (tab->getNoOfAutoIncrementColumns())
3295     {
3296       // Ensure that auto-inc metadata is created in database
3297       Uint32 retries = 10;
3298       while (retries--)
3299       {
3300         int res = m_ndb->setAutoIncrementValue(tab,
3301                                                Uint64(1),
3302                                                false);
3303         if (res == 0)
3304         {
3305           break;
3306         }
3307 
3308         if (m_ndb->getNdbError().status == NdbError::TemporaryError)
3309         {
3310           NdbSleep_MilliSleep(50);
3311           continue;
3312         }
3313         restoreLogger.log_error("Failed to create auto increment value "
3314                                 "for table : %s error : %u %s.",
3315                                 table_name.c_str(),
3316                                 m_ndb->getNdbError().code,
3317                                 m_ndb->getNdbError().message);
3318         return false;
3319       }
3320     }
3321   }
3322   const Uint32 orig_table_id = table.m_dictTable->getTableId();
3323   const NdbDictionary::Table* null = 0;
3324   m_new_tables.fill(orig_table_id, null);
3325   m_new_tables[orig_table_id] = tab;
3326   Uint64 zeroAutoVal = 0;
3327   m_auto_values.fill(orig_table_id, zeroAutoVal);
3328 
3329   m_n_tables++;
3330 
3331   return true;
3332 }
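/*
 * Hedged sketch (illustration only, not used by ndb_restore): the minimal
 * NdbDictionary calls behind the createTable() path above, assuming an
 * already-connected Ndb object "ndb". The table and column names here are
 * invented for the example.
 *
 *   NdbDictionary::Table tab("t1_example");
 *   NdbDictionary::Column col("pk");
 *   col.setType(NdbDictionary::Column::Unsigned);
 *   col.setPrimaryKey(true);
 *   tab.addColumn(col);
 *   NdbDictionary::Dictionary* dict = ndb->getDictionary();
 *   if (dict->createTable(tab) == -1)
 *     ;  // inspect dict->getNdbError().code / .message, as done above
 */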
3333 
3334 bool
3335 BackupRestore::fk(Uint32 type, const void * ptr)
3336 {
3337   if (!m_restore_meta && !m_rebuild_indexes && !m_disable_indexes)
3338     return true;
3339 
3340   // only record FKs, create in endOfTables()
3341   switch (type){
3342   case DictTabInfo::ForeignKey:
3343   {
3344     const NdbDictionary::ForeignKey* fk_ptr =
3345       (const NdbDictionary::ForeignKey*)ptr;
3346     const NdbDictionary::Table *child = NULL, *parent=NULL;
3347     BaseString db_name, dummy, table_name;
3348     //check if the child table is a part of the restoration
3349     if (!dissect_table_name(fk_ptr->getChildTable(),
3350                        db_name, dummy, table_name))
3351       return false;
3352     for(unsigned i = 0; i < m_new_tables.size(); i++)
3353     {
3354       if(m_new_tables[i] == NULL)
3355         continue;
3356       BaseString new_table_name(m_new_tables[i]->getMysqlName());
3357       //table name in format db-name/table-name
3358       Vector<BaseString> split;
3359       if (new_table_name.split(split, "/") != 2) {
3360         continue;
3361       }
3362       if(db_name == split[0] && table_name == split[1])
3363       {
3364         child = m_new_tables[i];
3365         break;
3366       }
3367     }
3368     if(child)
3369     {
3370       //check if parent exists
3371       if (!dissect_table_name(fk_ptr->getParentTable(),
3372                               db_name, dummy, table_name))
3373         return false;
3374       m_ndb->setDatabaseName(db_name.c_str());
3375       NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3376       parent = dict->getTable(table_name.c_str());
3377       if (parent == 0)
3378       {
3379         restoreLogger.log_error("Foreign key %s "
3380             "parent table %s.%s not found: %u: %s",
3381             fk_ptr->getName(),
3382             db_name.c_str(),
3383             table_name.c_str(),
3384             dict->getNdbError().code, dict->getNdbError().message);
3385         return false;
3386       }
3387       m_fks.push_back(fk_ptr);
3388       restoreLogger.log_info("Save FK %s", fk_ptr->getName());
3389     }
3390     return true;
3391     break;
3392   }
3393   default:
3394   {
3395     break;
3396   }
3397   }
3398   return true;
3399 }
3400 
3401 bool
3402 BackupRestore::endOfTables(){
3403   if(!m_restore_meta && !m_rebuild_indexes && !m_disable_indexes)
3404     return true;
3405 
3406   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3407   for(unsigned i = 0; i<m_indexes.size(); i++){
3408     NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
3409 
3410     BaseString db_name, schema_name, table_name;
3411     if (!dissect_table_name(indtab.m_primaryTable.c_str(),
3412                             db_name, schema_name, table_name)) {
3413       return false;
3414     }
3415     check_rewrite_database(db_name);
3416 
3417     m_ndb->setDatabaseName(db_name.c_str());
3418     m_ndb->setSchemaName(schema_name.c_str());
3419 
3420     const NdbDictionary::Table * prim = dict->getTable(table_name.c_str());
3421     if(prim == 0){
3422       restoreLogger.log_error("Unable to find base table `%s` for index `%s`",
3423           table_name.c_str(), indtab.getName());
3424       if (ga_skip_broken_objects)
3425       {
3426         continue;
3427       }
3428       return false;
3429     }
3430     NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
3431     NdbIndexImpl* idx;
3432     Vector<BaseString> split_idx;
3433     {
3434       BaseString tmp(indtab.getName());
3435       if (tmp.split(split_idx, "/") != 4)
3436       {
3437         restoreLogger.log_error("Invalid index name format `%s`",
3438             indtab.getName());
3439         return false;
3440       }
3441     }
3442     if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
3443     {
3444       restoreLogger.log_error("Failed to create index `%s` on `%s`",
3445           split_idx[3].c_str(), table_name.c_str());
3446       return false;
3447     }
3448     idx->setName(split_idx[3].c_str());
3449     if (m_restore_meta && !m_disable_indexes && !m_rebuild_indexes)
3450     {
3451       bool done = false;
3452       for(unsigned int retries = 0; retries < 11; retries++)
3453       {
3454         if(dict->createIndex(* idx) == 0)
3455         {
3456           done = true;  // success
3457           break;
3458         }
3459         else if(dict->getNdbError().status == NdbError::TemporaryError)
3460         {
3461           restoreLogger.log_error("retry sleep 50 ms on error %u",
3462                       dict->getNdbError().code);
3463           NdbSleep_MilliSleep(50);
3464           continue;  // retry on temporary error
3465         }
3466         else
3467         {
3468           break; // error out on permanent error
3469         }
3470       }
3471       if(!done)
3472       {
3473         delete idx;
3474         restoreLogger.log_error("Failed to create index `%s` on `%s`: %u: %s",
3475             split_idx[3].c_str(), table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3476         return false;
3477       }
3478       restoreLogger.log_info("Successfully created index `%s` on `%s`",
3479             split_idx[3].c_str(), table_name.c_str());
3480     }
3481     else if (m_disable_indexes)
3482     {
3483       int res = dict->dropIndex(idx->getName(), prim->getName());
3484       if (res == 0)
3485       {
3486         restoreLogger.log_info("Dropped index `%s` on `%s`",
3487                                split_idx[3].c_str(), table_name.c_str());
3488       }
3489     }
3490     Uint32 id = prim->getObjectId();
3491     if (m_index_per_table.size() <= id)
3492     {
3493       Vector<NdbDictionary::Index*> tmp;
3494       m_index_per_table.fill(id + 1, tmp);
3495     }
3496     Vector<NdbDictionary::Index*> & list = m_index_per_table[id];
3497     list.push_back(idx);
3498   }
3499   return true;
3500 }
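/*
 * Note on the 4-way split above: the backup carries each index under a
 * 4-part, '/'-separated internal name, e.g. something like "db/def/13/idx_a"
 * (the exact leading components are an assumption here); only the last
 * component, split_idx[3], is the plain index name passed to createIndex()
 * or dropIndex() against the base table resolved via indtab.m_primaryTable.
 */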
3501 
3502 bool
3503 BackupRestore::endOfTablesFK()
3504 {
3505   if (!m_restore_meta && !m_rebuild_indexes && !m_disable_indexes)
3506     return true;
3507 
3508   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3509   restoreLogger.log_info("Create foreign keys");
3510   for (unsigned i = 0; i < m_fks.size(); i++)
3511   {
3512     const NdbDictionary::ForeignKey& fkinfo = *m_fks[i];
3513 
3514     // full name is e.g. 10/14/fk1 where 10,14 are old table ids
3515     const char* fkname = 0;
3516     Vector<BaseString> splitname;
3517     BaseString tmpname(fkinfo.getName());
3518     int n = tmpname.split(splitname, "/");
3519     // may get these from ndbapi-created FKs prior to bug#18824753
3520     if (n == 1)
3521       fkname = splitname[0].c_str();
3522     else if (n == 3)
3523       fkname = splitname[2].c_str();
3524     else
3525     {
3526       restoreLogger.log_error("Invalid foreign key name %s",tmpname.c_str());
3527       return false;
3528     }
3529 
3530     // retrieve fk parent and child
3531     const NdbDictionary::Table* pTab = 0;
3532     const NdbDictionary::Index* pInd = 0;
3533     const NdbDictionary::Table* cTab = 0;
3534     const NdbDictionary::Index* cInd = 0;
3535     // parent and child info - db.table.index
3536     char pInfo[512] = "?";
3537     char cInfo[512] = "?";
3538     {
3539       BaseString db_name, dummy2, table_name;
3540       if (!dissect_table_name(fkinfo.getParentTable(),
3541                               db_name, dummy2, table_name))
3542         return false;
3543       m_ndb->setDatabaseName(db_name.c_str());
3544       pTab = dict->getTable(table_name.c_str());
3545       if (pTab == 0)
3546       {
3547         restoreLogger.log_error("Foreign key %s"
3548             " parent table %s.%s not found: %u: %s",
3549             fkname, db_name.c_str(), table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3550         return false;
3551       }
3552       if (fkinfo.getParentIndex() != 0)
3553       {
3554         BaseString dummy1, dummy2, index_name;
3555         if (!dissect_index_name(fkinfo.getParentIndex(),
3556                                 dummy1, dummy2, index_name))
3557           return false;
3558         pInd = dict->getIndex(index_name.c_str(), table_name.c_str());
3559         if (pInd == 0)
3560         {
3561           restoreLogger.log_error("Foreign key %s"
3562               " parent index %s.%s not found: %u: %s",
3563               fkname, db_name.c_str(), table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3564           return false;
3565         }
3566       }
3567       BaseString::snprintf(pInfo, sizeof(pInfo), "%s.%s.%s",
3568           db_name.c_str(), table_name.c_str(),
3569           pInd ? pInd->getName() : "PK");
3570     }
3571     {
3572       BaseString db_name, dummy2, table_name;
3573       if (!dissect_table_name(fkinfo.getChildTable(),
3574                               db_name, dummy2, table_name))
3575         return false;
3576       m_ndb->setDatabaseName(db_name.c_str());
3577       cTab = dict->getTable(table_name.c_str());
3578       if (cTab == 0)
3579       {
3580         restoreLogger.log_error("Foreign key %s"
3581             " child table %s.%s not found: %u: %s",
3582             fkname, db_name.c_str(), table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3583         return false;
3584       }
3585       if (fkinfo.getChildIndex() != 0)
3586       {
3587         BaseString dummy1, dummy2, index_name;
3588         if (!dissect_index_name(fkinfo.getChildIndex(),
3589                                 dummy1, dummy2, index_name))
3590           return false;
3591         cInd = dict->getIndex(index_name.c_str(), table_name.c_str());
3592         if (cInd == 0)
3593         {
3594           restoreLogger.log_error("Foreign key %s"
3595               " child index %s.%s not found: %u: %s",
3596               fkname, db_name.c_str(), table_name.c_str(), dict->getNdbError().code, dict->getNdbError().message);
3597           return false;
3598         }
3599       }
3600       BaseString::snprintf(cInfo, sizeof(cInfo), "%s.%s.%s",
3601           db_name.c_str(), table_name.c_str(),
3602           cInd ? cInd->getName() : "PK");
3603     }
3604 
3605     // define the fk
3606     NdbDictionary::ForeignKey fk;
3607     fk.setName(fkname);
3608     static const int MaxAttrs = MAX_ATTRIBUTES_IN_INDEX;
3609     {
3610       const NdbDictionary::Column* cols[MaxAttrs+1]; // NULL terminated
3611       const int n = fkinfo.getParentColumnCount();
3612       int i = 0;
3613       while (i < n)
3614       {
3615         int j = fkinfo.getParentColumnNo(i);
3616         const NdbDictionary::Column* pCol = pTab->getColumn(j);
3617         if (pCol == 0)
3618         {
3619           restoreLogger.log_error("Foreign key %s fk column %u"
3620               " parent column %u out of range",
3621               fkname, i, j);
3622           return false;
3623         }
3624         cols[i++] = pCol;
3625       }
3626       cols[i] = 0;
3627       fk.setParent(*pTab, pInd, cols);
3628     }
3629     {
3630       const NdbDictionary::Column* cols[MaxAttrs+1]; // NULL terminated
3631       const int n = fkinfo.getChildColumnCount();
3632       int i = 0;
3633       while (i < n)
3634       {
3635         int j = fkinfo.getChildColumnNo(i);
3636         const NdbDictionary::Column* cCol = cTab->getColumn(j);
3637         if (cCol == 0)
3638         {
3639           restoreLogger.log_error("Foreign key %s fk column %u"
3640               " child column %u out of range",
3641               fkname, i, j);
3642           return false;
3643         }
3644         cols[i++] = cCol;
3645       }
3646       cols[i] = 0;
3647       fk.setChild(*cTab, cInd, cols);
3648     }
3649     fk.setOnUpdateAction(fkinfo.getOnUpdateAction());
3650     fk.setOnDeleteAction(fkinfo.getOnDeleteAction());
3651 
3652     // create
3653     if (dict->createForeignKey(fk) != 0)
3654     {
3655       restoreLogger.log_error("Failed to create foreign key %s"
3656           " parent %s child %s : %u: %s",
3657           fkname, pInfo, cInfo, dict->getNdbError().code, dict->getNdbError().message);
3658       return false;
3659     }
3660     restoreLogger.log_info("Successfully created foreign key %s"
3661           " parent %s child %s",
3662           fkname, pInfo, cInfo);
3663   }
3664   restoreLogger.log_info("Create foreign keys done");
3665   return true;
3666 }
3667 
3668 static Uint64 extract_auto_val(const char *data,
3669                                int size,
3670                                NdbDictionary::Column::Type type)
3671 {
3672   union {
3673     Int8  i8;
3674     Int16 i16;
3675     Int32 i32;
3676   } val;
3677   Int64 v; /* Get sign-extension on assignment */
3678   switch(size){
3679   case 64:
3680     memcpy(&v,data,8);
3681     break;
3682   case 32:
3683     memcpy(&val.i32,data,4);
3684     v= val.i32;
3685     break;
3686   case 24:
3687     v= sint3korr((unsigned char*)data);
3688     break;
3689   case 16:
3690     memcpy(&val.i16,data,2);
3691     v= val.i16;
3692     break;
3693   case 8:
3694     memcpy(&val.i8,data,1);
3695     v= val.i8;
3696     break;
3697   default:
3698     return 0;
3699   };
3700 
3701   /* Don't return negative signed values */
3702   if (unlikely(v & 0x80000000))
3703   {
3704     if (type == NdbDictionary::Column::Bigint ||
3705         type == NdbDictionary::Column::Int ||
3706         type == NdbDictionary::Column::Mediumint ||
3707         type == NdbDictionary::Column::Smallint ||
3708         type == NdbDictionary::Column::Tinyint)
3709     {
3710       /* Negative signed value */
3711       v = 0;
3712     }
3713   }
3714 
3715   return (Uint64) v;
3716 }
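/*
 * Worked example for extract_auto_val() (values invented for illustration):
 * for a 32-bit Int auto-increment column holding 41, the bytes are copied
 * into val.i32 and 41 is returned; if the column held -5 (0xFFFFFFFB, bit 31
 * set) the signed-type check above zeroes it, so 0 is returned and the next
 * auto-increment value is never pushed backwards by a negative value.
 */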
3717 
3718 void
3719 BackupRestore::update_next_auto_val(Uint32 orig_table_id,
3720                                     Uint64 next_val)
3721 {
3722   if (orig_table_id < m_auto_values.size())
3723   {
3724     if (next_val > m_auto_values[orig_table_id])
3725     {
3726       m_auto_values[orig_table_id] = next_val;
3727     }
3728   }
3729 }
3730 
3731 void BackupRestore::tuple(const TupleS & tup, Uint32 fragmentId)
3732 {
3733   const TableS * tab = tup.getTable();
3734 
3735   if (!m_restore)
3736     return;
3737 
3738   while (m_free_callback == 0)
3739   {
3740     assert(m_transactions == m_parallelism);
3741     // send-poll all transactions
3742     // close transaction is done in callback
3743     m_ndb->sendPollNdb(3000, 1);
3744   }
3745 
3746   restore_callback_t * cb = m_free_callback;
3747 
3748   if (cb == 0)
3749     assert(false);
3750 
3751   cb->retries = 0;
3752   cb->fragId = fragmentId;
3753   cb->tup = tup; // must do copy!
3754 
3755   if (tab->isSYSTAB_0())
3756   {
3757     tuple_SYSTAB_0(cb, *tab);
3758     return;
3759   }
3760 
3761   m_free_callback = cb->next;
3762 
3763   tuple_a(cb);
3764 }
3765 
3766 void BackupRestore::tuple_a(restore_callback_t *cb)
3767 {
3768   Uint32 partition_id = cb->fragId;
3769   Uint32 n_bytes;
3770   while (cb->retries < 10)
3771   {
3772     /**
3773      * start transactions
3774      */
3775     cb->connection = m_ndb->startTransaction();
3776     if (cb->connection == NULL)
3777     {
3778       if (errorHandler(cb))
3779       {
3780 	m_ndb->sendPollNdb(3000, 1);
3781 	continue;
3782       }
3783       restoreLogger.log_error("Cannot start transaction");
3784       exitHandler();
3785     } // if
3786 
3787     const TupleS &tup = cb->tup;
3788     const NdbDictionary::Table * table = get_table(*tup.getTable());
3789 
3790     NdbOperation * op = cb->connection->getNdbOperation(table);
3791 
3792     if (op == NULL)
3793     {
3794       if (errorHandler(cb))
3795 	continue;
3796       restoreLogger.log_error("Cannot get operation: %u: %s", cb->connection->getNdbError().code, cb->connection->getNdbError().message);
3797       exitHandler();
3798     } // if
3799 
3800     if (op->writeTuple() == -1)
3801     {
3802       if (errorHandler(cb))
3803 	continue;
3804       restoreLogger.log_error("Error defining op: %u: %s", cb->connection->getNdbError().code, cb->connection->getNdbError().message);
3805       exitHandler();
3806     } // if
3807 
3808     // XXX until NdbRecord is used
3809     op->set_disable_fk();
3810 
3811     n_bytes= 0;
3812 
3813     if (table->getFragmentType() == NdbDictionary::Object::UserDefined)
3814     {
3815       if (table->getDefaultNoPartitionsFlag())
3816       {
3817         /*
3818           This can only happen for HASH partitioning with
3819           user defined hash function where user hasn't
3820           specified the number of partitions and we
3821           have to calculate it. We use the hash value
3822           stored in the record to calculate the partition
3823           to use.
3824         */
3825         int i = tup.getNoOfAttributes() - 1;
3826 	const AttributeData  *attr_data = tup.getData(i);
3827         Uint32 hash_value =  *attr_data->u_int32_value;
3828         op->setPartitionId(get_part_id(table, hash_value));
3829       }
3830       else
3831       {
3832         /*
3833           Either RANGE or LIST (with or without subparts)
3834           OR HASH partitioning with user defined hash
3835           function but with fixed set of partitions.
3836         */
3837         op->setPartitionId(partition_id);
3838       }
3839     }
3840     int ret = 0;
3841     for (int j = 0; j < 2; j++)
3842     {
3843       for (int i = 0; i < tup.getNoOfAttributes(); i++)
3844       {
3845 	AttributeDesc * attr_desc = tup.getDesc(i);
3846 	const AttributeData * attr_data = tup.getData(i);
3847 	int size = attr_desc->size;
3848 	int arraySize = attr_desc->arraySize;
3849 	char * dataPtr = attr_data->string_value;
3850 	Uint32 length = 0;
3851 
3852         if (attr_desc->m_exclude)
3853           continue;
3854 
3855         if (!attr_data->null)
3856         {
3857           const unsigned char * src = (const unsigned char *)dataPtr;
3858           switch(attr_desc->m_column->getType()){
3859           case NdbDictionary::Column::Varchar:
3860           case NdbDictionary::Column::Varbinary:
3861             length = src[0] + 1;
3862             break;
3863           case NdbDictionary::Column::Longvarchar:
3864           case NdbDictionary::Column::Longvarbinary:
3865             length = src[0] + (src[1] << 8) + 2;
3866             break;
3867           default:
3868             length = attr_data->size;
3869             break;
3870           }
3871         }
3872 	if (j == 0 && tup.getTable()->have_auto_inc(i))
3873         {
3874           Uint64 usedAutoVal = extract_auto_val(dataPtr,
3875                                                 size * arraySize,
3876                                                 attr_desc->m_column->getType());
3877           Uint32 orig_table_id = tup.getTable()->m_dictTable->getTableId();
3878           update_next_auto_val(orig_table_id, usedAutoVal + 1);
3879         }
3880 
3881         /* Use column's DB pk status to decide whether it is a key or data */
3882         const bool col_pk_in_kernel =
3883           table->getColumn(attr_desc->attrId)->getPrimaryKey();
3884 
3885         if (attr_desc->convertFunc)
3886         {
3887           if ((col_pk_in_kernel && j == 0) ||
3888               (j == 1 && !attr_data->null))
3889           {
3890             bool truncated = true; // assume data truncation until overridden
3891             dataPtr = (char*)attr_desc->convertFunc(dataPtr,
3892                                                     attr_desc->parameter,
3893                                                     truncated);
3894             if (!dataPtr)
3895             {
3896               const char* tabname = tup.getTable()->m_dictTable->getName();
3897               restoreLogger.log_error("Error: Convert data failed when restoring tuples!"
3898                  " Data part, table %s", tabname);
3899               exitHandler();
3900             }
3901             if (truncated)
3902             {
3903               // wl5421: option to report data truncation per tuple if desired
3904               //restoreLogger.log_error("======  data truncation detected for column: "
3905               //    << attr_desc->m_column->getName());
3906               attr_desc->truncation_detected = true;
3907             }
3908           }
3909         }
3910 
3911 	if (col_pk_in_kernel)
3912 	{
3913 	  if (j == 1) continue;
3914 	  ret = op->equal(attr_desc->attrId, dataPtr, length);
3915 	}
3916 	else
3917 	{
3918 	  if (j == 0) continue;
3919 	  if (attr_data->null)
3920 	    ret = op->setValue(attr_desc->attrId, NULL, 0);
3921 	  else
3922 	    ret = op->setValue(attr_desc->attrId, dataPtr, length);
3923 	}
3924 	if (ret < 0) {
3925 	  ndbout_c("Column: %d type %d %d %d %d",i,
3926 		   attr_desc->m_column->getType(),
3927 		   size, arraySize, length);
3928 	  break;
3929 	}
3930         n_bytes+= length;
3931       }
3932       if (ret < 0)
3933 	break;
3934     }
3935     if (ret < 0)
3936     {
3937       if (errorHandler(cb))
3938 	continue;
3939       restoreLogger.log_error("Error defining op: %u: %s", cb->connection->getNdbError().code, cb->connection->getNdbError().message);
3940       exitHandler();
3941     }
3942 
3943     if (opt_no_binlog)
3944     {
3945       op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
3946     }
3947 
3948     // Prepare transaction (the transaction is NOT yet sent to NDB)
3949     cb->n_bytes= n_bytes;
3950     cb->connection->executeAsynchPrepare(NdbTransaction::Commit,
3951 					 &callback, cb);
3952     m_transactions++;
3953     return;
3954   }
3955   restoreLogger.log_error("Retried transaction %u times.\nLast error %u %s"
3956       "...Unable to recover from errors. Exiting...",
3957       cb->retries, m_ndb->getNdbError(cb->error_code).code, m_ndb->getNdbError(cb->error_code).message);
3958   exitHandler();
3959 }
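/*
 * Hedged sketch of the asynchronous pattern used by tuple()/tuple_a()/cback()
 * above (illustration only; "my_callback" and the row details are invented):
 *
 *   NdbTransaction* trans = ndb->startTransaction();
 *   NdbOperation* op = trans->getNdbOperation(table);
 *   op->writeTuple();
 *   // ... op->equal() for key columns, op->setValue() for data columns ...
 *   trans->executeAsynchPrepare(NdbTransaction::Commit, my_callback, cb);
 *   // later, prepared transactions (up to m_parallelism) are flushed with
 *   ndb->sendPollNdb(3000, 1);  // completions invoke my_callback
 */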
3960 
3961 void BackupRestore::tuple_SYSTAB_0(restore_callback_t *cb,
3962                                    const TableS & tab)
3963 {
3964   const TupleS & tup = cb->tup;
3965   Uint32 syskey;
3966   Uint64 nextid;
3967 
3968   if (tab.get_auto_data(tup, &syskey, &nextid))
3969   {
3970     /*
3971       We found a valid auto_increment value in SYSTAB_0
3972       where syskey is a table_id and nextid is next auto_increment
3973       value.
3974       Update next auto val metadata
3975      */
3976     update_next_auto_val(syskey, nextid);
3977   }
3978 }
3979 
3980 bool BackupRestore::isMissingTable(const TableS& table)
3981 {
3982   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
3983   const char* tablename = table.getTableName();
3984   BaseString db_name, schema_name, table_name;
3985   Vector<BaseString> split;
3986   BaseString tmp(tablename);
3987   if (tmp.split(split, "/") != 3) {
3988     return false;
3989   }
3990   db_name = split[0];
3991   schema_name = split[1];
3992   table_name = split[2];
3993   check_rewrite_database(db_name);
3994   m_ndb->setDatabaseName(db_name.c_str());
3995   m_ndb->setSchemaName(schema_name.c_str());
3996 
3997   const NdbDictionary::Table* tab = dict->getTable(table_name.c_str());
3998 
3999   /* 723 == NoSuchTableExisted */
4000   return ((tab == NULL) && (dict->getNdbError().code == 723));
4001 }
4002 
4003 void BackupRestore::cback(int result, restore_callback_t *cb)
4004 {
4005   m_transactions--;
4006 
4007   if (result < 0)
4008   {
4009     /**
4010      * Error. temporary or permanent?
4011      */
4012     if (errorHandler(cb))
4013       tuple_a(cb); // retry
4014     else
4015     {
4016       restoreLogger.log_error("Restore: Failed to restore data due to an unrecoverable error. Exiting...");
4017       exitHandler();
4018     }
4019   }
4020   else
4021   {
4022     /**
4023      * OK! close transaction
4024      */
4025     m_ndb->closeTransaction(cb->connection);
4026     cb->connection= 0;
4027     cb->next= m_free_callback;
4028     m_free_callback= cb;
4029     m_dataBytes+= cb->n_bytes;
4030     m_dataCount++;
4031   }
4032 }
4033 
4034 /**
4035  * Returns true if the error is recoverable,
4036  * false if it is an error that generates an abort.
4037  * Error handling based on hugo.
4038  */
4039 bool BackupRestore::errorHandler(restore_callback_t *cb)
4040 {
4041   NdbError error;
4042   if(cb->connection)
4043   {
4044     error= cb->connection->getNdbError();
4045     m_ndb->closeTransaction(cb->connection);
4046     cb->connection= 0;
4047   }
4048   else
4049   {
4050     error= m_ndb->getNdbError();
4051   }
4052 
4053   Uint32 sleepTime = 100 + cb->retries * 300;
4054 
4055   cb->retries++;
4056   cb->error_code = error.code;
4057 
4058   switch(error.status)
4059   {
4060   case NdbError::Success:
4061     restoreLogger.log_error("Success error: %u %s", error.code, error.message);
4062     return false;
4063     // ERROR!
4064 
4065   case NdbError::TemporaryError:
4066     restoreLogger.log_error("Temporary error: %u %s", error.code, error.message);
4067     m_temp_error = true;
4068     NdbSleep_MilliSleep(sleepTime);
4069     return true;
4070     // RETRY
4071 
4072   case NdbError::UnknownResult:
4073     restoreLogger.log_error("Unknown: %u %s", error.code, error.message);
4074     return false;
4075     // ERROR!
4076 
4077   default:
4078   case NdbError::PermanentError:
4079     //ERROR
4080     restoreLogger.log_error("Permanent: %u %s", error.code, error.message);
4081     return false;
4082   }
4083   restoreLogger.log_error("No error status");
4084   return false;
4085 }
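/*
 * Worked example of the backoff above: sleepTime = 100 + retries * 300 ms,
 * so successive temporary-error retries sleep 100, 400, 700, ... ms before
 * tuple_a() attempts the transaction again (up to its 10-retry cap).
 */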
4086 
4087 void BackupRestore::exitHandler()
4088 {
4089   release();
4090   _exit(NdbToolsProgramExitCode::FAILED);
4091 }
4092 
4093 
4094 void
4095 BackupRestore::tuple_free()
4096 {
4097   if (!m_restore)
4098     return;
4099 
4100   // Poll all transactions
4101   while (m_transactions)
4102   {
4103     m_ndb->sendPollNdb(3000);
4104   }
4105 }
4106 
4107 void
4108 BackupRestore::endOfTuples()
4109 {
4110   tuple_free();
4111 }
4112 
4113 bool
4114 BackupRestore::tryCreatePkMappingIndex(TableS* table,
4115                                        const char* short_table_name)
4116 {
4117   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
4118   const NdbDictionary::Table* ndbtab = dict->getTable(short_table_name);
4119 
4120   if (ndbtab == NULL)
4121   {
4122     restoreLogger.log_error("Failed to find table %s in DB.  Error : %u %s.",
4123                             table->getTableName(),
4124                             dict->getNdbError().code,
4125                             dict->getNdbError().message);
4126     return false;
4127   }
4128   NdbDictionary::Index idx(PK_MAPPING_IDX_NAME);
4129 
4130   if (idx.setTable(short_table_name) != 0)
4131   {
4132     restoreLogger.log_error("Error in idx::setTable.");
4133     return false;
4134   }
4135 
4136   idx.setType(NdbDictionary::Index::UniqueHashIndex);
4137   idx.setLogging(false); /* Save on redo + lcp */
4138 
4139   Uint32 oldPkColsAvailable = 0;
4140 
4141   for (int i=0; i<table->getNoOfAttributes(); i++)
4142   {
4143     const AttributeDesc* attrDesc = table->getAttributeDesc(i);
4144     if (attrDesc->m_column->getPrimaryKey())
4145     {
4146       /* This was a primary key before.
4147        * If it's still in the table then add as
4148        * an index key
4149        */
4150       const NdbDictionary::Column* col =
4151         ndbtab->getColumn(attrDesc->m_column->getName());
4152 
4153       if (col != NULL)
4154       {
4155         restoreLogger.log_info("Adding column (%s) DB(%s) to "
4156                                "PK mapping index for table %s.",
4157                                attrDesc->m_column->getName(),
4158                                col->getName(),
4159                                table->getTableName());
4160 
4161         if (idx.addColumn(*col) != 0)
4162         {
4163           restoreLogger.log_error("Problem adding column %s to index",
4164                                   col->getName());
4165           return false;
4166         }
4167 
4168         oldPkColsAvailable++;
4169       }
4170       else
4171       {
4172         restoreLogger.log_info("Warning : Table %s primary key column %s "
4173                                "no longer exists in DB.",
4174                                table->getTableName(),
4175                                attrDesc->m_column->getName());
4176       }
4177     }
4178   }
4179 
4180   if (oldPkColsAvailable == 0)
4181   {
4182     restoreLogger.log_error("Table %s has update or delete backup log "
4183                             "entries and no columns from the old "
4184                             "primary key are available. "
4185                             "Restore using backup schema then ALTER to "
4186                             "new schema.",
4187                             table->getTableName());
4188     return false;
4189   }
4190 
4191   if (dict->createIndex(idx) == 0)
4192   {
4193     restoreLogger.log_info("Built PK mapping index on table %s.",
4194                            table->getTableName());
4195 
4196     restoreLogger.log_info("Remember to run ndb_restore --rebuild-indexes "
4197                            "after all ndb_restore --restore-data steps as this "
4198                            "will also drop this PK mapping index.");
4199     return true;
4200   }
4201 
4202 
4203   /* Potential errors :
4204      - Index now exists - someone else created it
4205      - System busy with other operation
4206      - Temp error
4207      - Permanent error
4208   */
4209   NdbError createError = dict->getNdbError();
4210 
4211   if (createError.code == 721)
4212   {
4213     /* Index now exists - we will use it */
4214     return true;
4215   } else if (createError.code == 701)
4216   {
4217     /**
4218      * System busy with other (schema) operation
4219      *
4220      * This could be e.g. another ndb_restore instance building
4221      * the index, or something else
4222      */
4223     restoreLogger.log_info("Build PK mapping index : System busy with "
4224                            "other schema operation, retrying.");
4225     NdbSleep_MilliSleep(1000);
4226     return true;
4227   }
4228   else if (createError.status == NdbError::TemporaryError)
4229   {
4230     NdbSleep_MilliSleep(500);
4231     return true;
4232   }
4233   else
4234   {
4235     restoreLogger.log_error("Failed to create pk mapping index on "
4236                             "table %s %u %s.",
4237                             table->getTableName(),
4238                             createError.code,
4239                             createError.message);
4240     return false;
4241   }
4242 }
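/*
 * Design note: returning true on error 721 (index already exists), on 701
 * (busy with another schema operation) and on temporary errors lets the
 * caller simply re-run the getIndex() lookup, so several ndb_restore
 * instances can race to create the shared NDB$RESTORE_PK_MAPPING index and
 * all end up using the same one.
 */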
4243 
4244 bool
4245 BackupRestore::getPkMappingIndex(TableS* table)
4246 {
4247   /**
4248    * A table can have more pk columns in the DB than
4249    * in the Backup.
4250    * For UPDATE and DELETE log events, where the full
4251    * DB pk is not available, we need some means to
4252    * identify which row to modify.
4253    * This is done using a PkMappingIndex, on the
4254    * available primary keys from the Backup schema.
4255    *
4256    * Optimisations :
4257    *  - A mapping index is only built if needed
4258    *    (e.g. pk extension + UPDATE/DELETE log
4259    *    event must be applied)
4260    *  - A mapping index can be shared between
4261    *    multiple ndb_restore instances
4262    *    - It is created when the first
4263    *      ndb_restore instance to need one
4264    *      creates one
4265    *    - It is dropped as part of the
4266    *      --rebuild-indexes step
4267    */
4268   const NdbDictionary::Index* dbIdx = NULL;
4269   const Uint32 Max_Retries = 20;
4270   Uint32 retry_count = 0;
4271 
4272   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
4273 
4274   /* Set database, schema */
4275   BaseString db_name, schema_name, table_name;
4276   if (!dissect_table_name(table->getTableName(),
4277                           db_name, schema_name, table_name))
4278   {
4279     restoreLogger.log_error("Failed to dissect table name : %s",
4280                             table->getTableName());
4281     return false;
4282   }
4283 
4284   check_rewrite_database(db_name);
4285   m_ndb->setDatabaseName(db_name.c_str());
4286   m_ndb->setSchemaName(schema_name.c_str());
4287   const char* short_table_name = table_name.c_str();
4288 
4289   do
4290   {
4291     dbIdx = dict->getIndex(PK_MAPPING_IDX_NAME,
4292                            short_table_name);
4293 
4294     if (dbIdx)
4295     {
4296       /* Found index, use it */
4297       table->m_pk_index = dbIdx;
4298       return true;
4299     }
4300     else
4301     {
4302       NdbError getErr = dict->getNdbError();
4303 
4304       if (getErr.code == 701)
4305       {
4306         /**
4307          * System busy with other (schema) operation
4308          *
4309          * This could be e.g. another ndb_restore instance building
4310          * the index, or some other DDL.
4311          */
4312         restoreLogger.log_info("Build PK mapping index : System busy with "
4313                                "other schema operation, retrying.");
4314         NdbSleep_MilliSleep(1000);
4315         continue;
4316       }
4317 
4318       if (getErr.code == 4243)
4319       {
4320         /**
4321          * Index not found
4322          * Let's try to create it
4323          */
4324         if (!tryCreatePkMappingIndex(table,
4325                                      short_table_name))
4326         {
4327           /* Hard failure */
4328           return false;
4329         }
4330         retry_count = 0;
4331 
4332         /* Retry lookup */
4333         continue;
4334       }
4335       else if (getErr.status == NdbError::TemporaryError)
4336       {
4337         NdbSleep_MilliSleep(500);
4338 
4339         /* Retry lookup */
4340         continue;
4341       }
4342       else
4343       {
4344         restoreLogger.log_error("Failure looking up PK mapping index on "
4345                                 "table %s %u %s.",
4346                                 table->getTableName(),
4347                                 getErr.code,
4348                                 getErr.message);
4349         return false;
4350       }
4351     }
4352   } while (retry_count++ < Max_Retries);
4353 
4354   restoreLogger.log_error("Failure to lookup / create PK mapping "
4355                           "index after %u attempts.",
4356                           Max_Retries);
4357   return false;
4358 }
4359 
4360 bool
4361 BackupRestore::dropPkMappingIndex(const TableS* table)
4362 {
4363   const char *tablename = table->getTableName();
4364 
4365   BaseString db_name, schema_name, table_name;
4366   if (!dissect_table_name(tablename, db_name, schema_name, table_name)) {
4367     return false;
4368   }
4369   check_rewrite_database(db_name);
4370 
4371   m_ndb->setDatabaseName(db_name.c_str());
4372   m_ndb->setSchemaName(schema_name.c_str());
4373   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
4374 
4375   /* Drop any support indexes */
4376   bool dropped = false;
4377   int attempts = 11;
4378   while (!dropped && attempts--)
4379   {
4380     dict->dropIndex(PK_MAPPING_IDX_NAME,
4381                     table_name.c_str());
4382     const NdbError dropErr = dict->getNdbError();
4383     switch (dropErr.status)
4384     {
4385     case NdbError::Success:
4386       restoreLogger.log_info("Dropped PK mapping index on %s.",
4387                              tablename);
4388       dropped = true;
4389       break;
4390     case NdbError::TemporaryError:
4391       restoreLogger.log_error("Temporary error: %u %s.",
4392                               dropErr.code,
4393                               dropErr.message);
4394       NdbSleep_MilliSleep(500);
4395       continue;
4396     case NdbError::PermanentError:
4397       if (dropErr.code == 723 ||
4398           dropErr.code == 4243)
4399       {
4400         // No such table exists
4401         dropped = true;
4402         break;
4403       }
4404       /* Fall through */
4405     default:
4406       restoreLogger.log_error("Error dropping mapping index on %s %u %s",
4407                               tablename,
4408                               dropErr.code,
4409                               dropErr.message);
4410       return false;
4411     }
4412   }
4413 
4414   return dropped;
4415 }
4416 
4417 #ifdef NOT_USED
4418 static bool use_part_id(const NdbDictionary::Table *table)
4419 {
4420   if (table->getDefaultNoPartitionsFlag() &&
4421       (table->getFragmentType() == NdbDictionary::Object::UserDefined))
4422     return false;
4423   else
4424     return true;
4425 }
4426 #endif
4427 
4428 static Uint32 get_part_id(const NdbDictionary::Table *table,
4429                           Uint32 hash_value)
4430 {
4431   Uint32 no_frags = table->getFragmentCount();
4432 
4433   if (table->getLinearFlag())
4434   {
4435     Uint32 part_id;
4436     Uint32 mask = 1;
4437     while (no_frags > mask) mask <<= 1;
4438     mask--;
4439     part_id = hash_value & mask;
4440     if (part_id >= no_frags)
4441       part_id = hash_value & (mask >> 1);
4442     return part_id;
4443   }
4444   else
4445     return (hash_value % no_frags);
4446 }
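/*
 * Worked example for get_part_id() with the linear flag set (numbers chosen
 * for illustration): with 6 fragments the mask grows 1 -> 2 -> 4 -> 8 and
 * becomes 7; hash 13 gives 13 & 7 = 5, a valid fragment; hash 14 gives
 * 14 & 7 = 6, which is >= 6, so the halved mask is used and the partition
 * is 14 & 3 = 2. Without the linear flag the result is simply hash % 6.
 */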
4447 
4448 void
4449 BackupRestore::logEntry(const LogEntry & tup)
4450 {
4451   if (!m_restore)
4452     return;
4453 
4454   bool use_mapping_idx = false;
4455 
4456   if (unlikely((tup.m_table->m_pk_extended) &&
4457                (tup.m_type != LogEntry::LE_INSERT) &&
4458                (!tup.m_table->m_staging)))
4459   {
4460     /**
4461      * We will need to find a row to operate on, using
4462      * a secondary unique index on the remains of the
4463      * old PK
4464      */
4465     if (unlikely(tup.m_table->m_pk_index == NULL))
4466     {
4467       /* Need to get/build an index for this purpose */
4468       if (!getPkMappingIndex(tup.m_table))
4469       {
4470         restoreLogger.log_error("Build of PK mapping index failed "
4471                                 "on table %s.",
4472                                 tup.m_table->getTableName());
4473         exitHandler();
4474       }
4475       assert(tup.m_table->m_pk_index != NULL);
4476 
4477       restoreLogger.log_info("Using PK mapping index on table %s.",
4478                              tup.m_table->getTableName());
4479     }
4480 
4481     use_mapping_idx = true;
4482   }
4483 
4484   if (tup.m_table->isSYSTAB_0())
4485   {
4486     /* We don't restore from SYSTAB_0 log entries */
4487     return;
4488   }
4489 
4490   Uint32 retries = 0;
4491   NdbError errobj;
4492 retry:
4493   Uint32 mapping_idx_key_count = 0;
4494 
4495   if (retries == 11)
4496   {
4497     restoreLogger.log_error("execute failed: %u", errobj.code);
4498     exitHandler();
4499   }
4500   else if (retries > 0)
4501   {
4502     NdbSleep_MilliSleep(100 + (retries - 1) * 100);
4503   }
4504 
4505   retries++;
4506 
4507   NdbTransaction * trans = m_ndb->startTransaction();
4508   if (trans == NULL)
4509   {
4510     errobj = m_ndb->getNdbError();
4511     if (errobj.status == NdbError::TemporaryError)
4512     {
4513       goto retry;
4514     }
4515     restoreLogger.log_error("Cannot start transaction: %u: %s", errobj.code, errobj.message);
4516     exitHandler();
4517   } // if
4518 
4519   TransGuard g(trans);
4520   const NdbDictionary::Table * table = get_table(*tup.m_table);
4521   NdbOperation * op = NULL;
4522 
4523   if (unlikely(use_mapping_idx))
4524   {
4525     /* UI access */
4526     op = trans->getNdbIndexOperation(tup.m_table->m_pk_index,
4527                                      table);
4528   }
4529   else
4530   {
4531     /* Normal pk access */
4532     op = trans->getNdbOperation(table);
4533   }
4534   if (op == NULL)
4535   {
4536     restoreLogger.log_error("Cannot get operation: %u: %s", trans->getNdbError().code, trans->getNdbError().message);
4537     exitHandler();
4538   } // if
4539 
4540   int check = 0;
4541   switch(tup.m_type)
4542   {
4543   case LogEntry::LE_INSERT:
4544     check = op->insertTuple();
4545     break;
4546   case LogEntry::LE_UPDATE:
4547     check = op->updateTuple();
4548     break;
4549   case LogEntry::LE_DELETE:
4550     check = op->deleteTuple();
4551     break;
4552   default:
4553     restoreLogger.log_error("Log entry has wrong operation type."
4554 	  " Exiting...");
4555     exitHandler();
4556   }
4557 
4558   if (check != 0)
4559   {
4560     restoreLogger.log_error("Error defining op: %u: %s",trans->getNdbError().code, trans->getNdbError().message);
4561     exitHandler();
4562   } // if
4563 
4564   op->set_disable_fk();
4565 
4566   if (table->getFragmentType() == NdbDictionary::Object::UserDefined)
4567   {
4568     if (table->getDefaultNoPartitionsFlag())
4569     {
4570       const AttributeS * attr = tup[tup.size()-1];
4571       Uint32 hash_value = *(Uint32*)attr->Data.string_value;
4572       op->setPartitionId(get_part_id(table, hash_value));
4573     }
4574     else
4575       op->setPartitionId(tup.m_frag_id);
4576   }
4577 
4578   Bitmask<4096> keys;
4579   Uint32 n_bytes= 0;
4580   for (Uint32 pass= 0; pass < 2; pass++)  // Keys then Values
4581   {
4582     for (Uint32 i= 0; i < tup.size(); i++)
4583     {
4584       const AttributeS * attr = tup[i];
4585       int size = attr->Desc->size;
4586       int arraySize = attr->Desc->arraySize;
4587       const char * dataPtr = attr->Data.string_value;
4588       const bool col_pk_in_backup = attr->Desc->m_column->getPrimaryKey();
4589 
4590       if (attr->Desc->m_exclude)
4591         continue;
4592 
4593       const bool col_pk_in_kernel =
4594         table->getColumn(attr->Desc->attrId)->getPrimaryKey();
4595       bool col_is_key = col_pk_in_kernel;
4596       Uint32 keyAttrId = attr->Desc->attrId;
4597 
4598       if (unlikely(use_mapping_idx))
4599       {
4600         if (col_pk_in_backup)
4601         {
4602           /* Using a secondary UI to map non-excluded
4603            * backup keys to kernel rows.
4604            * Backup pks are UI keys, using key
4605            * AttrIds in declaration order.
4606            * Therefore we set the attrId here.
4607            */
4608           col_is_key = true;
4609           keyAttrId = mapping_idx_key_count++;
4610         }
4611         else
4612         {
4613           col_is_key = false;
4614         }
4615       }
4616 
4617       if ((!col_is_key && pass == 0) ||  // Keys
4618           (col_is_key && pass == 1))     // Values
4619       {
4620         continue;
4621       }
4622 
4623       /* Check for unsupported PK update */
4624       if (unlikely(!col_pk_in_backup && col_pk_in_kernel))
4625       {
4626         if (unlikely(tup.m_type == LogEntry::LE_UPDATE))
4627         {
4628           if ((m_tableChangesMask & TCM_IGNORE_EXTENDED_PK_UPDATES) != 0)
4629           {
4630             /* Ignore it as requested */
4631             m_pk_update_warning_count++;
4632             continue;
4633           }
4634           else
4635           {
4636             /**
4637              * Problem as a non-pk column has become part of
4638              * the table's primary key, but is updated in
4639              * the backup - which would require DELETE + INSERT
4640              * to represent
4641              */
4642             restoreLogger.log_error("Error : Primary key remapping failed "
4643                                     "during log apply for table %s which "
4644                                     "UPDATEs column(s) now included in the "
4645                                     "table's primary key.  "
4646                                     "Perhaps the --ignore-extended-pk-updates "
4647                                     "switch is missing?",
4648                                     tup.m_table->m_dictTable->getName());
4649             exitHandler();
4650           }
4651         }
4652       }
4653       if (tup.m_table->have_auto_inc(attr->Desc->attrId))
4654       {
4655         Uint64 usedAutoVal = extract_auto_val(dataPtr,
4656                                               size * arraySize,
4657                                               attr->Desc->m_column->getType());
4658         Uint32 orig_table_id = tup.m_table->m_dictTable->getTableId();
4659         update_next_auto_val(orig_table_id, usedAutoVal + 1);
4660       }
4661 
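      // Desc->size is the element size in bits (hence the / 8) and
      // arraySize the element count, so 'length' is the attribute's byte
      // length as stored in the backup.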
4662       const Uint32 length = (size / 8) * arraySize;
4663       n_bytes+= length;
4664 
4665       if (attr->Desc->convertFunc &&
4666           dataPtr != NULL) // NULL will not be converted
4667       {
4668         bool truncated = true; // assume data truncation until overridden
4669         dataPtr = (char*)attr->Desc->convertFunc(dataPtr,
4670                                                  attr->Desc->parameter,
4671                                                  truncated);
4672         if (!dataPtr)
4673         {
4674           const char* tabname = tup.m_table->m_dictTable->getName();
4675           restoreLogger.log_error("Error: Convert data failed when restoring tuples! "
4676                                   "Log part, table %s, entry type %u.",
4677                                   tabname, tup.m_type);
4678           exitHandler();
4679         }
4680         if (truncated)
4681         {
4682           // wl5421: option to report data truncation at the tuple level if desired
4683           //err << "******  data truncation detected for column: "
4684           //    << attr->Desc->m_column->getName() << endl;
4685           attr->Desc->truncation_detected = true;
4686         }
4687       }
4688 
4689       if (col_is_key)
4690       {
4691         assert(pass == 0);
4692 
4693         if(!keys.get(keyAttrId))
4694         {
4695           keys.set(keyAttrId);
4696           check= op->equal(keyAttrId, dataPtr, length);
4697         }
4698       }
4699       else
4700       {
4701         assert(pass == 1);
4702         if (tup.m_type != LogEntry::LE_DELETE)
4703         {
4704           check= op->setValue(attr->Desc->attrId, dataPtr, length);
4705         }
4706       }
4707 
4708       if (check != 0)
4709       {
4710         restoreLogger.log_error("Error defining log op: %u %s.",
4711                                 trans->getNdbError().code,
4712                                 trans->getNdbError().message);
4713         exitHandler();
4714       } // if
4715     }
4716   }
4717 
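  /*
   * --no-binlog support: tagging the operation with the reserved
   * NDB_ANYVALUE_FOR_NOLOGGING value (defined at the top of this file)
   * asks the binlog injector on attached SQL nodes not to write this
   * change to the binary log.
   */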
4718   if (opt_no_binlog)
4719   {
4720     op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
4721   }
4722   const int ret = trans->execute(NdbTransaction::Commit);
4723 
4724 #ifndef DBUG_OFF
4725   /* Test retry path */
4726   if ((m_logCount % 100000) == 3)
4727   {
4728     if (retries < 3)
4729     {
4730       restoreLogger.log_info("Testing log retry path");
4731       goto retry;
4732     }
4733   }
4734 #endif
4735 
4736   if (ret != 0)
4737   {
4738     // Insert, update and delete can all fail benignly while the log is being
4739     // applied (e.g. duplicate key on insert, no row found on update/delete).
4740     bool ok= false;
4741     errobj= trans->getNdbError();
4742     if (errobj.status == NdbError::TemporaryError)
4743       goto retry;
4744 
4745     switch(tup.m_type)
4746     {
4747     case LogEntry::LE_INSERT:
4748       if (errobj.status == NdbError::PermanentError &&
4749           errobj.classification == NdbError::ConstraintViolation)
4750         ok = true;
4751       break;
4752     case LogEntry::LE_UPDATE:
4753     case LogEntry::LE_DELETE:
4754       if (errobj.status == NdbError::PermanentError &&
4755           errobj.classification == NdbError::NoDataFound)
4756         ok = true;
4757       break;
4758     }
4759     if (!ok)
4760     {
4761       restoreLogger.log_error("execute failed: %u: %s", errobj.code, errobj.message);
4762       exitHandler();
4763     }
4764   }
4765 
4766   m_logBytes+= n_bytes;
4767   m_logCount++;
4768 }
4769 
4770 void
4771 BackupRestore::endOfLogEntrys()
4772 {
4773   if (!m_restore)
4774     return;
4775 
4776   if (m_pk_update_warning_count > 0)
4777   {
4778     restoreLogger.log_info("Warning : --ignore-extended-pk-updates resulted in %llu "
4779                            "modifications to extended primary key columns being "
4780                            "ignored.",
4781                            m_pk_update_warning_count);
4782   }
4783 
4784   info.setLevel(254);
4785   restoreLogger.log_info("Restored %u tuples and "
4786       "%u log entries", m_dataCount, m_logCount);
4787 }
4788 
4789 /*
4790  *   callback : Called when an asynchronously executed transaction completes and is polled
4791  *
4792  *   (This function must have three arguments:
4793  *   - The result of the transaction,
4794  *   - The NdbTransaction object, and
4795  *   - A pointer to an arbitrary object.)
4796  */
4797 
4798 static void
4799 callback(int result, NdbTransaction* trans, void* aObject)
4800 {
4801   restore_callback_t *cb = (restore_callback_t *)aObject;
4802   (cb->restore)->cback(result, cb);
4803 }
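/*
 * Illustration only (a minimal sketch, not part of the synchronous log-apply
 * path above): an asynchronous transaction registers this callback roughly
 * as follows, given a prepared restore_callback_t 'cb' whose cb->restore
 * points at the owning BackupRestore instance:
 *
 *   trans->executeAsynchPrepare(NdbTransaction::Commit, &callback, cb);
 *   m_ndb->sendPollNdb(3000, 1);   // callback() runs from inside the poll
 */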
4804 
4805 
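/*
 * Promotion/conversion lookup: m_allowed_promotion_attrs is a table of
 * { old_type, new_type, attr_check_compatability, attr_convert } entries
 * terminated by an entry whose old_type is NDBCOL::Undefined.  The two
 * helpers below scan it linearly and return the matching function pointer,
 * or NULL when the (old_type, new_type) pair is not listed.
 */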
4806 AttrCheckCompatFunc
4807 BackupRestore::get_attr_check_compatability(const NDBCOL::Type &old_type,
4808                                             const NDBCOL::Type &new_type)
4809 {
4810   int i = 0;
4811   NDBCOL::Type first_item = m_allowed_promotion_attrs[0].old_type;
4812   NDBCOL::Type second_item = m_allowed_promotion_attrs[0].new_type;
4813 
4814   while (first_item != old_type || second_item != new_type)
4815   {
4816     if (first_item == NDBCOL::Undefined)
4817       break;
4818 
4819     i++;
4820     first_item = m_allowed_promotion_attrs[i].old_type;
4821     second_item = m_allowed_promotion_attrs[i].new_type;
4822   }
4823   if (first_item == old_type && second_item == new_type)
4824     return m_allowed_promotion_attrs[i].attr_check_compatability;
4825   return NULL;
4826 }
4827 
4828 AttrConvertFunc
4829 BackupRestore::get_convert_func(const NDBCOL::Type &old_type,
4830                                 const NDBCOL::Type &new_type)
4831 {
4832   int i = 0;
4833   NDBCOL::Type first_item = m_allowed_promotion_attrs[0].old_type;
4834   NDBCOL::Type second_item = m_allowed_promotion_attrs[0].new_type;
4835 
4836   while (first_item != old_type || second_item != new_type)
4837   {
4838     if (first_item == NDBCOL::Undefined)
4839       break;
4840     i++;
4841     first_item = m_allowed_promotion_attrs[i].old_type;
4842     second_item = m_allowed_promotion_attrs[i].new_type;
4843   }
4844   if (first_item == old_type && second_item == new_type)
4845     return m_allowed_promotion_attrs[i].attr_convert;
4846 
4847   return NULL;
4848 
4849 }
4850 
4851 AttrConvType
4852 BackupRestore::check_compat_promotion(const NDBCOL &old_col,
4853                                       const NDBCOL &new_col)
4854 {
4855   return ACT_PRESERVING;
4856 }
4857 
4858 AttrConvType
4859 BackupRestore::check_compat_lossy(const NDBCOL &old_col,
4860                                   const NDBCOL &new_col)
4861 {
4862   return ACT_LOSSY;
4863 }
4864 
4865 AttrConvType
4866 BackupRestore::check_compat_sizes(const NDBCOL &old_col,
4867                                   const NDBCOL &new_col)
4868 {
4869   // the size (width) of the element type
4870   Uint32 new_size = new_col.getSize();
4871   Uint32 old_size = old_col.getSize();
4872   // the fixed/max array length (1 for scalars)
4873   Uint32 new_length = new_col.getLength();
4874   Uint32 old_length = old_col.getLength();
4875 
4876   // identity conversions have been handled by column_compatible_check()
4877   assert(new_size != old_size
4878          || new_length != old_length
4879          || new_col.getArrayType() != old_col.getArrayType());
4880 
4881   // test for loss of element width or array length
4882   if (new_size < old_size || new_length < old_length) {
4883     return ACT_LOSSY;
4884   }
4885 
4886   // not tested: conversions varying in both array length and element width
4887   if (new_size != old_size && new_length != old_length) {
4888     return ACT_UNSUPPORTED;
4889   }
4890 
4891   assert(new_size >= old_size && new_length >= old_length);
4892   return ACT_PRESERVING;
4893 }
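/*
 * Worked example for check_compat_sizes(): growing only one dimension,
 * e.g. old (size = 2, length = 1) to new (size = 4, length = 1), is
 * ACT_PRESERVING; shrinking either dimension is ACT_LOSSY; and changing
 * element width and array length at the same time is ACT_UNSUPPORTED.
 */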
4894 
4895 AttrConvType
4896 BackupRestore::check_compat_precision(const NDBCOL &old_col,
4897                                       const NDBCOL &new_col)
4898 {
4899   Uint32 new_prec = new_col.getPrecision();
4900   Uint32 old_prec = old_col.getPrecision();
4901 
4902   if (new_prec < old_prec)
4903     return ACT_LOSSY;
4904   return ACT_PRESERVING;
4905 }
4906 
4907 AttrConvType
4908 BackupRestore::check_compat_char_binary(const NDBCOL &old_col,
4909                                            const NDBCOL &new_col)
4910 {
4911   // as in check_compat_sizes
4912   assert(old_col.getSize() == 1 && new_col.getSize() == 1);
4913   Uint32 new_length = new_col.getLength();
4914   Uint32 old_length = old_col.getLength();
4915 
4916   if (new_length < old_length) {
4917     return ACT_LOSSY;
4918   }
4919   return ACT_PRESERVING;
4920 }
4921 
4922 AttrConvType
4923 BackupRestore::check_compat_char_to_text(const NDBCOL &old_col,
4924                                          const NDBCOL &new_col)
4925 {
4926   if (new_col.getPrimaryKey()) {
4927     // staging will refuse this so detect early
4928     restoreLogger.log_info("conversion to TEXT is not supported for primary key columns");
4929     return ACT_UNSUPPORTED;
4930   }
4931   return ACT_STAGING_PRESERVING;
4932 }
4933 
4934 AttrConvType
4935 BackupRestore::check_compat_text_to_char(const NDBCOL &old_col,
4936                                          const NDBCOL &new_col)
4937 {
4938   if (old_col.getPrimaryKey()) {
4939     // staging will refuse this so detect early
4940     restoreLogger.log_info("conversion from TEXT is not supported for primary key columns");
4941     return ACT_UNSUPPORTED;
4942   }
4943   return ACT_STAGING_LOSSY;
4944 }
4945 
4946 AttrConvType
4947 BackupRestore::check_compat_text_to_text(const NDBCOL &old_col,
4948                                          const NDBCOL &new_col)
4949 {
4950   if(old_col.getCharset() != new_col.getCharset())
4951   {
4952     restoreLogger.log_info("convert to field with different charset not supported");
4953     return ACT_UNSUPPORTED;
4954   }
4955   if(old_col.getPartSize() > new_col.getPartSize())
4956   {
4957     // TEXT/MEDIUMTEXT/LONGTEXT to TINYTEXT conversion is potentially lossy at
4958     // the Ndb level because there is a hard limit on the TINYTEXT size.
4959     // Demoting between the larger TEXT types (e.g. LONGTEXT to TEXT) is not
4960     // lossy at the Ndb level, but can be at the MySQL level.
4961     // Both conversions require the lossy switch, but they are not lossy in the same way.
4962     return ACT_STAGING_LOSSY;
4963   }
4964   return ACT_STAGING_PRESERVING;
4965 }
4966 
4967 AttrConvType
4968 BackupRestore::check_compat_binary_to_blob(const NDBCOL &old_col,
4969                                            const NDBCOL &new_col)
4970 {
4971   return ACT_STAGING_PRESERVING;
4972 }
4973 
4974 AttrConvType
4975 BackupRestore::check_compat_blob_to_binary(const NDBCOL &old_col,
4976                                            const NDBCOL &new_col)
4977 {
4978   return ACT_STAGING_LOSSY;
4979 }
4980 
4981 AttrConvType
4982 BackupRestore::check_compat_blob_to_blob(const NDBCOL &old_col,
4983                                          const NDBCOL &new_col)
4984 {
4985   if(old_col.getPartSize() > new_col.getPartSize())
4986   {
4987     // BLOB/MEDIUMBLOB/LONGBLOB to TINYBLOB conversion is potentially lossy at
4988     // the Ndb level because there is a hard limit on the TINYBLOB size.
4989     // Demoting between the larger BLOB types (e.g. LONGBLOB to BLOB) is not
4990     // lossy at the Ndb level, but can be at the MySQL level.
4991     // Both conversions require the lossy switch, but they are not lossy in the same way.
4992     return ACT_STAGING_LOSSY;
4993   }
4994   return ACT_STAGING_PRESERVING;
4995 }
4996 
4997 
4998 // ----------------------------------------------------------------------
4999 // explicit template instantiations
5000 // ----------------------------------------------------------------------
5001 
5002 template class Vector<NdbDictionary::Table*>;
5003 template class Vector<const NdbDictionary::Table*>;
5004 template class Vector<NdbDictionary::Tablespace*>;
5005 template class Vector<NdbDictionary::LogfileGroup*>;
5006 template class Vector<NdbDictionary::HashMap*>;
5007 template class Vector<NdbDictionary::Index*>;
5008 template class Vector<Vector<NdbDictionary::Index*> >;
5009 
5010 // char array promotions/demotions
5011 template void * BackupRestore::convert_array< Hchar, Hchar >(const void *, void *, bool &);
5012 template void * BackupRestore::convert_array< Hchar, Hvarchar >(const void *, void *, bool &);
5013 template void * BackupRestore::convert_array< Hchar, Hlongvarchar >(const void *, void *, bool &);
5014 template void * BackupRestore::convert_array< Hvarchar, Hchar >(const void *, void *, bool &);
5015 template void * BackupRestore::convert_array< Hvarchar, Hvarchar >(const void *, void *, bool &);
5016 template void * BackupRestore::convert_array< Hvarchar, Hlongvarchar >(const void *, void *, bool &);
5017 template void * BackupRestore::convert_array< Hlongvarchar, Hchar >(const void *, void *, bool &);
5018 template void * BackupRestore::convert_array< Hlongvarchar, Hvarchar >(const void *, void *, bool &);
5019 template void * BackupRestore::convert_array< Hlongvarchar, Hlongvarchar >(const void *, void *, bool &);
5020 
5021 // binary array promotions/demotions
5022 template void * BackupRestore::convert_array< Hbinary, Hbinary >(const void *, void *, bool &);
5023 template void * BackupRestore::convert_array< Hbinary, Hvarbinary >(const void *, void *, bool &);
5024 template void * BackupRestore::convert_array< Hbinary, Hlongvarbinary >(const void *, void *, bool &);
5025 template void * BackupRestore::convert_array< Hvarbinary, Hbinary >(const void *, void *, bool &);
5026 template void * BackupRestore::convert_array< Hvarbinary, Hvarbinary >(const void *, void *, bool &);
5027 template void * BackupRestore::convert_array< Hvarbinary, Hlongvarbinary >(const void *, void *, bool &);
5028 template void * BackupRestore::convert_array< Hlongvarbinary, Hbinary >(const void *, void *, bool &);
5029 template void * BackupRestore::convert_array< Hlongvarbinary, Hvarbinary >(const void *, void *, bool &);
5030 template void * BackupRestore::convert_array< Hlongvarbinary, Hlongvarbinary >(const void *, void *, bool &);
5031 
5032 // char to binary promotions/demotions
5033 template void * BackupRestore::convert_array< Hchar, Hbinary >(const void *, void *, bool &);
5034 template void * BackupRestore::convert_array< Hchar, Hvarbinary >(const void *, void *, bool &);
5035 template void * BackupRestore::convert_array< Hchar, Hlongvarbinary >(const void *, void *, bool &);
5036 template void * BackupRestore::convert_array< Hvarchar, Hbinary >(const void *, void *, bool &);
5037 template void * BackupRestore::convert_array< Hvarchar, Hvarbinary >(const void *, void *, bool &);
5038 template void * BackupRestore::convert_array< Hvarchar, Hlongvarbinary >(const void *, void *, bool &);
5039 template void * BackupRestore::convert_array< Hlongvarchar, Hbinary >(const void *, void *, bool &);
5040 template void * BackupRestore::convert_array< Hlongvarchar, Hvarbinary >(const void *, void *, bool &);
5041 template void * BackupRestore::convert_array< Hlongvarchar, Hlongvarbinary >(const void *, void *, bool &);
5042 
5043 // binary array to char array promotions/demotions
5044 template void * BackupRestore::convert_array< Hbinary, Hchar >(const void *, void *, bool &);
5045 template void * BackupRestore::convert_array< Hbinary, Hvarchar >(const void *, void *, bool &);
5046 template void * BackupRestore::convert_array< Hbinary, Hlongvarchar >(const void *, void *, bool &);
5047 template void * BackupRestore::convert_array< Hvarbinary, Hchar >(const void *, void *, bool &);
5048 template void * BackupRestore::convert_array< Hvarbinary, Hvarchar >(const void *, void *, bool &);
5049 template void * BackupRestore::convert_array< Hvarbinary, Hlongvarchar >(const void *, void *, bool &);
5050 template void * BackupRestore::convert_array< Hlongvarbinary, Hchar >(const void *, void *, bool &);
5051 template void * BackupRestore::convert_array< Hlongvarbinary, Hvarchar >(const void *, void *, bool &);
5052 template void * BackupRestore::convert_array< Hlongvarbinary, Hlongvarchar >(const void *, void *, bool &);
5053 
5054 // integral promotions
5055 template void * BackupRestore::convert_integral<Hint8, Hint16>(const void *, void *, bool &);
5056 template void * BackupRestore::convert_integral<Hint8, Hint24>(const void *, void *, bool &);
5057 template void * BackupRestore::convert_integral<Hint8, Hint32>(const void *, void *, bool &);
5058 template void * BackupRestore::convert_integral<Hint8, Hint64>(const void *, void *, bool &);
5059 template void * BackupRestore::convert_integral<Hint16, Hint24>(const void *, void *, bool &);
5060 template void * BackupRestore::convert_integral<Hint16, Hint32>(const void *, void *, bool &);
5061 template void * BackupRestore::convert_integral<Hint16, Hint64>(const void *, void *, bool &);
5062 template void * BackupRestore::convert_integral<Hint24, Hint32>(const void *, void *, bool &);
5063 template void * BackupRestore::convert_integral<Hint24, Hint64>(const void *, void *, bool &);
5064 template void * BackupRestore::convert_integral<Hint32, Hint64>(const void *, void *, bool &);
5065 template void * BackupRestore::convert_integral<Huint8, Huint16>(const void *, void *, bool &);
5066 template void * BackupRestore::convert_integral<Huint8, Huint24>(const void *, void *, bool &);
5067 template void * BackupRestore::convert_integral<Huint8, Huint32>(const void *, void *, bool &);
5068 template void * BackupRestore::convert_integral<Huint8, Huint64>(const void *, void *, bool &);
5069 template void * BackupRestore::convert_integral<Huint16, Huint24>(const void *, void *, bool &);
5070 template void * BackupRestore::convert_integral<Huint16, Huint32>(const void *, void *, bool &);
5071 template void * BackupRestore::convert_integral<Huint16, Huint64>(const void *, void *, bool &);
5072 template void * BackupRestore::convert_integral<Huint24, Huint32>(const void *, void *, bool &);
5073 template void * BackupRestore::convert_integral<Huint24, Huint64>(const void *, void *, bool &);
5074 template void * BackupRestore::convert_integral<Huint32, Huint64>(const void *, void *, bool &);
5075 
5076 // integral demotions
5077 template void * BackupRestore::convert_integral<Hint16, Hint8>(const void *, void *, bool &);
5078 template void * BackupRestore::convert_integral<Hint24, Hint8>(const void *, void *, bool &);
5079 template void * BackupRestore::convert_integral<Hint24, Hint16>(const void *, void *, bool &);
5080 template void * BackupRestore::convert_integral<Hint32, Hint8>(const void *, void *, bool &);
5081 template void * BackupRestore::convert_integral<Hint32, Hint16>(const void *, void *, bool &);
5082 template void * BackupRestore::convert_integral<Hint32, Hint24>(const void *, void *, bool &);
5083 template void * BackupRestore::convert_integral<Hint64, Hint8>(const void *, void *, bool &);
5084 template void * BackupRestore::convert_integral<Hint64, Hint16>(const void *, void *, bool &);
5085 template void * BackupRestore::convert_integral<Hint64, Hint24>(const void *, void *, bool &);
5086 template void * BackupRestore::convert_integral<Hint64, Hint32>(const void *, void *, bool &);
5087 template void * BackupRestore::convert_integral<Huint16, Huint8>(const void *, void *, bool &);
5088 template void * BackupRestore::convert_integral<Huint24, Huint8>(const void *, void *, bool &);
5089 template void * BackupRestore::convert_integral<Huint24, Huint16>(const void *, void *, bool &);
5090 template void * BackupRestore::convert_integral<Huint32, Huint8>(const void *, void *, bool &);
5091 template void * BackupRestore::convert_integral<Huint32, Huint16>(const void *, void *, bool &);
5092 template void * BackupRestore::convert_integral<Huint32, Huint24>(const void *, void *, bool &);
5093 template void * BackupRestore::convert_integral<Huint64, Huint8>(const void *, void *, bool &);
5094 template void * BackupRestore::convert_integral<Huint64, Huint16>(const void *, void *, bool &);
5095 template void * BackupRestore::convert_integral<Huint64, Huint24>(const void *, void *, bool &);
5096 template void * BackupRestore::convert_integral<Huint64, Huint32>(const void *, void *, bool &);
5097 
5098 // integral signedness conversions
5099 template void * BackupRestore::convert_integral<Hint8, Huint8>(const void *, void *, bool &);
5100 template void * BackupRestore::convert_integral<Hint16, Huint16>(const void *, void *, bool &);
5101 template void * BackupRestore::convert_integral<Hint24, Huint24>(const void *, void *, bool &);
5102 template void * BackupRestore::convert_integral<Hint32, Huint32>(const void *, void *, bool &);
5103 template void * BackupRestore::convert_integral<Hint64, Huint64>(const void *, void *, bool &);
5104 template void * BackupRestore::convert_integral<Huint8, Hint8>(const void *, void *, bool &);
5105 template void * BackupRestore::convert_integral<Huint16, Hint16>(const void *, void *, bool &);
5106 template void * BackupRestore::convert_integral<Huint24, Hint24>(const void *, void *, bool &);
5107 template void * BackupRestore::convert_integral<Huint32, Hint32>(const void *, void *, bool &);
5108 template void * BackupRestore::convert_integral<Huint64, Hint64>(const void *, void *, bool &);
5109 
5110 // integral signedness+promotion conversions
5111 template void * BackupRestore::convert_integral<Hint8, Huint16>(const void *, void *, bool &);
5112 template void * BackupRestore::convert_integral<Hint8, Huint24>(const void *, void *, bool &);
5113 template void * BackupRestore::convert_integral<Hint8, Huint32>(const void *, void *, bool &);
5114 template void * BackupRestore::convert_integral<Hint8, Huint64>(const void *, void *, bool &);
5115 template void * BackupRestore::convert_integral<Hint16, Huint24>(const void *, void *, bool &);
5116 template void * BackupRestore::convert_integral<Hint16, Huint32>(const void *, void *, bool &);
5117 template void * BackupRestore::convert_integral<Hint16, Huint64>(const void *, void *, bool &);
5118 template void * BackupRestore::convert_integral<Hint24, Huint32>(const void *, void *, bool &);
5119 template void * BackupRestore::convert_integral<Hint24, Huint64>(const void *, void *, bool &);
5120 template void * BackupRestore::convert_integral<Hint32, Huint64>(const void *, void *, bool &);
5121 template void * BackupRestore::convert_integral<Huint8, Hint16>(const void *, void *, bool &);
5122 template void * BackupRestore::convert_integral<Huint8, Hint24>(const void *, void *, bool &);
5123 template void * BackupRestore::convert_integral<Huint8, Hint32>(const void *, void *, bool &);
5124 template void * BackupRestore::convert_integral<Huint8, Hint64>(const void *, void *, bool &);
5125 template void * BackupRestore::convert_integral<Huint16, Hint24>(const void *, void *, bool &);
5126 template void * BackupRestore::convert_integral<Huint16, Hint32>(const void *, void *, bool &);
5127 template void * BackupRestore::convert_integral<Huint16, Hint64>(const void *, void *, bool &);
5128 template void * BackupRestore::convert_integral<Huint24, Hint32>(const void *, void *, bool &);
5129 template void * BackupRestore::convert_integral<Huint24, Hint64>(const void *, void *, bool &);
5130 template void * BackupRestore::convert_integral<Huint32, Hint64>(const void *, void *, bool &);
5131 
5132 // integral signedness+demotion conversions
5133 template void * BackupRestore::convert_integral<Hint16, Huint8>(const void *, void *, bool &);
5134 template void * BackupRestore::convert_integral<Hint24, Huint8>(const void *, void *, bool &);
5135 template void * BackupRestore::convert_integral<Hint24, Huint16>(const void *, void *, bool &);
5136 template void * BackupRestore::convert_integral<Hint32, Huint8>(const void *, void *, bool &);
5137 template void * BackupRestore::convert_integral<Hint32, Huint16>(const void *, void *, bool &);
5138 template void * BackupRestore::convert_integral<Hint32, Huint24>(const void *, void *, bool &);
5139 template void * BackupRestore::convert_integral<Hint64, Huint8>(const void *, void *, bool &);
5140 template void * BackupRestore::convert_integral<Hint64, Huint16>(const void *, void *, bool &);
5141 template void * BackupRestore::convert_integral<Hint64, Huint24>(const void *, void *, bool &);
5142 template void * BackupRestore::convert_integral<Hint64, Huint32>(const void *, void *, bool &);
5143 template void * BackupRestore::convert_integral<Huint16, Hint8>(const void *, void *, bool &);
5144 template void * BackupRestore::convert_integral<Huint24, Hint8>(const void *, void *, bool &);
5145 template void * BackupRestore::convert_integral<Huint24, Hint16>(const void *, void *, bool &);
5146 template void * BackupRestore::convert_integral<Huint32, Hint8>(const void *, void *, bool &);
5147 template void * BackupRestore::convert_integral<Huint32, Hint16>(const void *, void *, bool &);
5148 template void * BackupRestore::convert_integral<Huint32, Hint24>(const void *, void *, bool &);
5149 template void * BackupRestore::convert_integral<Huint64, Hint8>(const void *, void *, bool &);
5150 template void * BackupRestore::convert_integral<Huint64, Hint16>(const void *, void *, bool &);
5151 template void * BackupRestore::convert_integral<Huint64, Hint24>(const void *, void *, bool &);
5152 template void * BackupRestore::convert_integral<Huint64, Hint32>(const void *, void *, bool &);
5153