1 /*
2    Copyright (c) 2005, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "restore.hpp"
26 #include <signaldata/FsRef.hpp>
27 #include <signaldata/FsConf.hpp>
28 #include <signaldata/FsOpenReq.hpp>
29 #include <signaldata/FsCloseReq.hpp>
30 #include <signaldata/FsReadWriteReq.hpp>
31 #include <signaldata/FsRemoveReq.hpp>
32 #include <signaldata/RestoreImpl.hpp>
33 #include <signaldata/DictTabInfo.hpp>
34 #include <signaldata/KeyInfo.hpp>
35 #include <signaldata/AttrInfo.hpp>
36 #include <signaldata/LqhKey.hpp>
37 #include <AttributeHeader.hpp>
38 #include <md5_hash.hpp>
39 #include <backup/Backup.hpp>
40 #include <dblqh/Dblqh.hpp>
41 #include <dbtup/Dbtup.hpp>
42 #include <KeyDescriptor.hpp>
43 #include <signaldata/DumpStateOrd.hpp>
44 
45 #include <NdbTick.h>
46 #include <EventLogger.hpp>
47 extern EventLogger * g_eventLogger;
48 
#define JAM_FILE_ID 453

/*
 * Debug trace switches. Uncomment one or more of the defines below to turn
 * on the matching DEB_* macro. The switches are only reachable in builds
 * that define VM_TRACE or ERROR_INSERT.
 */
#if (defined(VM_TRACE) || defined(ERROR_INSERT))
//#define DEBUG_RES 1
//#define DEBUG_RES_OPEN 1
//#define DEBUG_RES_PARTS 1
//#define DEBUG_RES_STAT 1
//#define DEBUG_RES_STAT_EXTRA 1
//#define DEBUG_RES_DEL 1
//#define DEBUG_HIGH_RES 1
#endif

/*
 * Every DEB_* macro takes one argument: a complete, parenthesised argument
 * list, e.g. DEB_RES(("text %u", value)). When the corresponding switch is
 * defined the list is handed to g_eventLogger->info, otherwise the macro
 * expands to an empty statement.
 */
#ifndef DEBUG_RES
#define DEB_RES(arglist) do { } while (0)
#else
#define DEB_RES(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_RES_OPEN
#define DEB_RES_OPEN(arglist) do { } while (0)
#else
#define DEB_RES_OPEN(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_RES_PARTS
#define DEB_RES_PARTS(arglist) do { } while (0)
#else
#define DEB_RES_PARTS(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_RES_STAT
#define DEB_RES_STAT(arglist) do { } while (0)
#else
#define DEB_RES_STAT(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_RES_STAT_EXTRA
#define DEB_RES_STAT_EXTRA(arglist) do { } while (0)
#else
#define DEB_RES_STAT_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_RES_DEL
#define DEB_RES_DEL(arglist) do { } while (0)
#else
#define DEB_RES_DEL(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

#ifndef DEBUG_HIGH_RES
#define DEB_HIGH_RES(arglist) do { } while (0)
#else
#define DEB_HIGH_RES(arglist) do { g_eventLogger->info arglist ; } while (0)
#endif

/**
 * Same error codes used by both DBLQH and DBTC.
 * See Dblqh.hpp and Dbtc.hpp.
 */
#define ZGET_DATAREC_ERROR 418
#define ZGET_ATTRINBUF_ERROR 419

/* Short local name for LCP_RESTORE_BUFFER. */
#define PAGES LCP_RESTORE_BUFFER
111 
Restore(Block_context & ctx,Uint32 instanceNumber)112 Restore::Restore(Block_context& ctx, Uint32 instanceNumber) :
113   SimulatedBlock(RESTORE, ctx, instanceNumber),
114   m_file_list(m_file_pool),
115   m_file_hash(m_file_pool),
116   m_rows_restored(0),
117   m_millis_spent(0),
118   m_frags_restored(0)
119 {
120   BLOCK_CONSTRUCTOR(Restore);
121 
122   // Add received signals
123   addRecSignal(GSN_STTOR, &Restore::execSTTOR);
124   addRecSignal(GSN_DUMP_STATE_ORD, &Restore::execDUMP_STATE_ORD);
125   addRecSignal(GSN_CONTINUEB, &Restore::execCONTINUEB);
126   addRecSignal(GSN_READ_CONFIG_REQ, &Restore::execREAD_CONFIG_REQ, true);
127 
128   addRecSignal(GSN_RESTORE_LCP_REQ, &Restore::execRESTORE_LCP_REQ);
129 
130   addRecSignal(GSN_FSOPENREF, &Restore::execFSOPENREF, true);
131   addRecSignal(GSN_FSOPENCONF, &Restore::execFSOPENCONF);
132   addRecSignal(GSN_FSREADREF, &Restore::execFSREADREF, true);
133   addRecSignal(GSN_FSREADCONF, &Restore::execFSREADCONF);
134   addRecSignal(GSN_FSCLOSEREF, &Restore::execFSCLOSEREF, true);
135   addRecSignal(GSN_FSCLOSECONF, &Restore::execFSCLOSECONF);
136   addRecSignal(GSN_FSREMOVEREF, &Restore::execFSREMOVEREF, true);
137   addRecSignal(GSN_FSREMOVECONF, &Restore::execFSREMOVECONF);
138   addRecSignal(GSN_FSWRITECONF, &Restore::execFSWRITECONF);
139 
140   addRecSignal(GSN_LQHKEYREF, &Restore::execLQHKEYREF);
141   addRecSignal(GSN_LQHKEYCONF, &Restore::execLQHKEYCONF);
142 
143   ndbrequire(sizeof(Column) == 8);
144 }
145 
~Restore()146 Restore::~Restore()
147 {
148 }
149 
BLOCK_FUNCTIONS(Restore)150 BLOCK_FUNCTIONS(Restore)
151 
152 void
153 Restore::execSTTOR(Signal* signal)
154 {
155   jamEntry();
156 
157   c_lqh = (Dblqh*)globalData.getBlock(DBLQH, instance());
158   c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
159   c_backup = (Backup*)globalData.getBlock(BACKUP, instance());
160   ndbrequire(c_lqh != 0 && c_tup != 0 && c_backup != 0);
161   sendSTTORRY(signal);
162   return;
163 }//Restore::execNDB_STTOR()
164 
165 void
execREAD_CONFIG_REQ(Signal * signal)166 Restore::execREAD_CONFIG_REQ(Signal* signal)
167 {
168   jamEntry();
169   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
170   Uint32 ref = req->senderRef;
171   Uint32 senderData = req->senderData;
172   ndbrequire(req->noOfParameters == 0);
173 
174   const ndb_mgm_configuration_iterator * p =
175     m_ctx.m_config.getOwnConfigIterator();
176   ndbrequire(p != 0);
177 
178   m_file_pool.setSize(1);
179   Uint32 cnt = 2*MAX_ATTRIBUTES_IN_TABLE;
180   cnt += PAGES;
181   cnt += List::getSegmentSize()-1;
182   cnt /= List::getSegmentSize();
183   cnt += 2;
184   m_databuffer_pool.setSize(cnt);
185 
186   /**
187    * Set up read and write buffer for LCP control files.
188    * We use 1 buffer of 4k in size. So currently no
189    * parallel reads or writes are supported.
190    */
191   NewVARIABLE *bat = allocateBat(1);
192   bat[0].WA = &m_lcp_ctl_file_data[0][0];
193   bat[0].nrr = 2 * (4 * BackupFormat::LCP_CTL_FILE_BUFFER_SIZE_IN_WORDS);
194 
195   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
196   conf->senderRef = reference();
197   conf->senderData = senderData;
198   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
199 	     ReadConfigConf::SignalLength, JBB);
200 }
201 
202 void
sendSTTORRY(Signal * signal)203 Restore::sendSTTORRY(Signal* signal){
204   signal->theData[0] = 0;
205   signal->theData[3] = 1;
206   signal->theData[4] = 255; // No more start phases from missra
207   BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : RESTORE_REF;
208   sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
209 }
210 
211 void
execCONTINUEB(Signal * signal)212 Restore::execCONTINUEB(Signal* signal){
213   jamEntry();
214 
215   switch(signal->theData[0]){
216   case RestoreContinueB::RESTORE_NEXT:
217   {
218     FilePtr file_ptr;
219     m_file_pool.getPtr(file_ptr, signal->theData[1]);
220     restore_next(signal, file_ptr);
221     return;
222   }
223   case RestoreContinueB::READ_FILE:
224   {
225     FilePtr file_ptr;
226     m_file_pool.getPtr(file_ptr, signal->theData[1]);
227     read_data_file(signal, file_ptr);
228     return;
229   }
230   default:
231     ndbabort();
232   }
233 }
234 
235 void
execDUMP_STATE_ORD(Signal * signal)236 Restore::execDUMP_STATE_ORD(Signal* signal){
237   jamEntry();
238 
239   if (signal->theData[0] == DumpStateOrd::RestoreRates)
240   {
241     jam();
242     Uint64 rate = m_rows_restored * 1000 /
243       (m_millis_spent == 0? 1: m_millis_spent);
244 
245     g_eventLogger->info("LDM instance %u: Restored LCP : %u fragments,"
246                         " %llu rows, "
247                         "%llu millis, %llu rows/s",
248                         instance(),
249                         m_frags_restored,
250                         m_rows_restored,
251                         m_millis_spent,
252                         rate);
253     infoEvent("LDM instance %u: Restored LCP : %u fragments, %llu rows, "
254               "%llu millis, %llu rows/s",
255               instance(),
256               m_frags_restored,
257               m_rows_restored,
258               m_millis_spent,
259               rate);
260   }
261 }
262 
263 /**
264  * MODULE: Restore LCP
265  * -------------------
266  * Restore LCP of a fragment
267  * Starts by receiving RESTORE_LCP_REQ and later responding by RESTORE_LCP_CONF
268  * from DBTUP when done.
269  *
270  * Here is a flow chart of what we perform here.
271  * There are 5 main cases:
272  * Case 1) Only valid LCP control file 0 exists
273  * Case 2) Only valid LCP control file 1 exists
274  *
275  *    Perfectly normal cases and common cases. This LCP was completed
276  *    and the previous one was both completed and removed from disk.
277  *
278  * Case 3) Both LCP control file 0 and 1 exists
279  *
280  *    This case is perfectly normal but unusual. It happens when
281  *    we had a crash before completing the removal of the old
282  *    LCP control file.
283  *
284  *    In this case we can either have two valid
285  *    LCP control files or one valid and one invalid.
286  *
287  *    Invalid LCP control files can happen if a crash occurs after opening
288  *    the LCP control file for a second LCP on a fragment and not
289  *    completing it. It can also happen when the crash occurs in the
290  *    middle of writing the LCP control file (should be extremely
291  *    rare or even never happening).
292  *
293  * Case 4) No LCP control file exists (restore of 7.4 and older LCP).
294  *
295  * This is the normal case for an upgrade case.
296  *
297  * Case 5) Only LCP control file 0 exists, but it still is empty or contains
298  *    invalid data. We could also have two invalid LCP control files here.
299  *
300  *    This case is also valid and can happen when we crash during running
301  *    of the very first LCP on a fragment. It could also happen simply
302  *    since we haven't done our first LCP on the fragment yet. In this
303  *    case we should definitely have received lcpNo == ZNIL from LQH
304  *    since DIH will not know of LCPs that we don't know about ourselves.
305  *
306  *    This case can also happen if we have 1 completed LCP control file
307  *    which is not recoverable. In this case the node crashed just before
308  *    completing the GCP that was necessary to make the LCP recoverable.
309  *    Even DIH could know about this LCP but also knows to not try to use
310  *    it. Either way DIH will send lcpNo equal to ZNIL.
311  *
312  * Variable descriptions:
313  * ----------------------
314  * m_ctl_file_no:
315  * --------------
316  * This represents the number of the CTL file currently being processed.
317  * It is set to 0 when opening the first file and 1 when later opening
318  * the second CTL file. It is initialised to Uint32(~0). When an empty
319  * CTL file is created when no LCP is found it is set to 0.
320  *
321  * m_status:
322  * ---------
323  * This variable represents what we are currently doing.
324  * It is a bitmap, so more than one state is possible at any time.
325  *
326  * Initial state is READ_CTL_FILES, this represents reading both CTL
327  * files to discover the state of the LCP.
328  *
329  * FIRST_READ, FILE_THREAD_RUNNING, RESTORE_THREAD_RUNNING, FILE_EOF and
330  * READING_RECORDS are states used when reading data files.
331  * FIRST_READ is the initial state when starting to open the data file.
332  * FILE_THREAD_RUNNING is an indication that a CONTINUEB thread is running
333  * that reads the data file.
334  * RESTORE_THREAD_RUNNING is an indication that a CONTINUEB thread is
335  * running to restore using the data file.
336  * READING_RECORDS is an indication that we are now reading records of the
337  * data file.
338  * FILE_EOF is an indication that the read of the data file is completed.
339  * It is set when FILE_THREAD_RUNNING is reset.
340  *
341  * CREATE_CTL_FILE is a state used when creating a CTL file at times when
 * no LCP files were found.
343  *
344  * REMOVE_LCP_DATA_FILE is a state used when deleting data files after
345  * reading the CTL files.
346  * REMOVE_LCP_CTL_FILE is a state used when deleting a CTL file after
347  * deleting data files.
348  *
349  * We start in state READ_CTL_FILES, after that we go CREATE_CTL_FILE
350  * if no LCP files were found. If LCP files were found we move to
 * REMOVE_LCP_DATA_FILE if data files to delete were present, next we
352  * move to REMOVE_LCP_CTL_FILE if necessary to remove a CTL file.
353  *
354  * Finally we move to restore using one or more data files. We restore
355  * one file at a time using the state variables described above for
356  * handling the data file.
357  *
358  * m_outstanding_reads:
359  * --------------------
360  * Used during read of data file to keep track of number of outstanding
361  * FSREADREQ's.
362  *
363  * m_outstanding_operations:
364  * -------------------------
365  * It is used during remove files to keep track of number of outstanding
366  * remove data files that are currently outstanding (we can delete multiple
367  * files in parallel).
368  * It is used during restore to keep track of number of outstanding
369  * LQHKEYREQs.
370  *
371  * m_remove_ctl_file_no:
372  * ---------------------
373  * It is initialised to Uint32(~0). If set to this we won't delete any
374  * CTL files.
375  * When we find no CTL files we drop CTL file 0, we also drop all potential
376  * data files from 0 to max file number.
377  * If a CTL file that isn't restorable is found, then this file number is
378  * set in this variable.
379  * If we find that the other file is newer and restorable then we set this
380  * variable to this file number.
381  *
382  * m_used_ctl_file_no:
383  * -------------------
384  * This variable is set to the CTL file we will use for restore. As soon as
385  * we find a possible candidate it is set to the candidate, we might then
386  * find that the other CTL file is an even better candidate and move the
 * variable to this number. As long as no CTL file has been found it
388  * remains set to the initial value Uint32(~0).
389  *
390  * m_current_page_ptr_i:
391  * ---------------------
392  * Set to i-value of page we are currently restoring from. We allocate a set
393  * of pages at start of restore and use those pages when reading from file
394  * into those pages.
395  *
396  * m_current_page_pos:
397  * -------------------
398  * Indicates index position on the current page we are restoring.
399  *
400  * m_current_page_index:
401  * ---------------------
402  * Indicates which of the allocated pages we are currently restoring, used
403  * to find the next page. The allocated pages are in an array. So getting
404  * to the next page can be easily accomplished by adding one to this variable.
405  * We use modulo page_count always when getting the page ptr, so this variable
406  * can be constantly incremented.
407  *
408  * m_current_file_page:
409  * --------------------
410  * Used by read file process, keeps track of which page number was the last
411  * one we issued a read on.
412  *
413  * m_bytes_left:
414  * -------------
415  * Incremented with number of bytes read from disk when FSREADCONF arrives.
416  * Decremented by length of record when restoring from file.
417  * Thus keeps track of number of bytes left already read from disk.
418  *
419  * m_rows_restored:
420  * ----------------
421  * Statistical variable, counts number of rows restored (counts LQHKEYCONF's
422  * received). Used to display various stats about the restore.
423  *
424  * m_restore_start_time:
425  * ---------------------
426  * Current millisecond when restore starts. Used to print stats on restore
427  * performance.
428  *
429  * m_restored_gcp_id:
430  * ------------------
431  * This variable keeps track of the GCI we are restoring, no LCP files that
432  * have a newer GCP written can be used. This is either retrieved from
433  * DIH sysfile or local sysfile (if recovering in a not restorable state).
434  * Can be used for upgrade case where we use it to write a CTL file for
435  * an existing LCP that had no CTL files.
436  *
437  * m_restored_lcp_id:
438  * m_restored_local_lcp_id:
439  * m_max_gci_completed:
440  * m_max_gci_written:
441  * m_max_page_cnt:
442  * ------------------------
443  * These five variables are set from the used CTL file. They are initialised
444  * from the RESTORE_LCP_REQ to be used in the upgrade case. In the upgrade
445  * case we will set MaxPageCnt to Uint32(~0).
 * m_restored_lcp_id and m_restored_local_lcp_id are the ids of the LCP used
 * to write the LCP.
448  * m_max_page_cnt is the number of pages that we have ROW ids for in the file.
449  * m_max_gci_written is the maximum GCI written in this LCP.
450  * m_max_gci_completed is the maximum GCI completed when writing this LCP.
451  * m_max_gci_completed can be bigger than m_max_gci_written.
452  *
453  * m_create_gci:
454  * -------------
455  * CreateGCI from RESTORE_LCP_REQ, not used.
456  *
457  * m_file_id:
458  * ----------
459  * File id as described in used CTL file. When multiple files are to be restored
460  * it starts at first and then moves forward. Is between 0 and
461  * BackupFormat::NDB_MAX_LCP_FILES - 1.
462  *
463  * m_max_parts:
464  * ------------
465  * Set from used CTL file. Set to 1 when performing upgrade variant.
466  *
467  * m_max_files:
468  * ------------
469  * Set from used CTL file, normally set to BackupFormat::NDB_MAX_LCP_FILES but
470  * could be set differently when performing downgrade or upgrade. Indicates
471  * maximum files that could be used, this is necessary to know what the file
472  * name is of the next file.
473  *
474  * m_num_files:
475  * ------------
476  * Set from used CTL file. Set to number of files (also number of part pairs)
477  * to restore in the LCP.
478  *
479  * m_current_file_index:
480  * ---------------------
481  * Number of file currently restored, starts at 0 and goes up to
482  * m_num_files - 1 before we're done.
483  *
484  * m_dih_lcp_no:
485  * -------------
486  * In pre-7.6 this indicates data file number, in 7.6 it indicates rather
487  * which CTL file number that DIH thinks should be restored. If this is set
488  * to ZNIL then DIH knows of no LCPs written for this fragment. In this case
489  * we don't really know anything about what we will find since we can even
490  * have both CTL files restorable in this case if local LCPs was executed
 * as part of restart. However if it is set to 0 or 1, then it should not
 * be possible to find no files at all. So if we find no CTL file in this
 * case it is an upgrade case.
494  *
495  * m_upgrade_case:
496  * ---------------
 * Initialised to true. As soon as we find a CTL file, whether correct or
 * not, we know that it isn't an upgrade from pre-7.6 versions.
499  *
500  * m_double_lcps_found:
501  * --------------------
502  * Both CTL files found and both were found to be restorable.
503  *
504  * m_found_not_restorable:
505  * -----------------------
506  * We have found one CTL file that wasn't restorable if true.
507  *
508  * m_old_max_files:
509  * ----------------
510  * This is the max files read from CTL file NOT used. It is used to
511  * delete LCP data from the old data files. It is possible that
512  * the new and old CTL files have different max files in an upgrade
513  * or downgrade situation.
514  *
515  * m_num_remove_data_files:
516  * ------------------------
517  * Number of data files to remove, calculated after finding new and old
518  * CTL file. If only one CTL file is found then we cleaned up already during
519  * execution of LCP, so no need to clean up. In this case it is set to 0.
520  *
521  * m_table_id, m_fragment_id, m_table_version:
522  * -------------------------------------------
523  * Triplet describing the partition we are restoring. m_table_id and
524  * m_fragment_id came from RESTORE_LCP_REQ, m_table_version read from
525  * data file.
526  *
527  * The flow chart for Case 1) is here:
528  * -----------------------------------
529  * Open LCP control 0 -> Success
530  * Read LCP control 0 -> Success (read important data into File data)
531  * Close LCP control 0 -> Success
532  * Open LCP control 1 -> Fail
533  * Start restore (starts through open_data_file call)
534  *
535  * The flow chart for Case 2) is here
536  * -----------------------------------
537  * Open LCP control 0 -> Fail
538  * Open LCP control 1 -> Success
539  * Read LCP control 1 -> Success (read important data into File data)
540  * Close LCP control 1 -> Success
541  * Start restore
542  *
543  * The flow chart for Case 3) is here
544  * -----------------------------------
545  * Open LCP control 0 -> Success
546  * Read LCP control 0 -> Success (read important data into File data)
 * Close LCP control 0 -> Success
548  * Open LCP control 1 -> Success
549  * Read LCP control 1 -> Success (calculate which LCP control file to use)
550  * Close LCP control 1 -> Success
551  * Assume here X is the LCP control file NOT used (0 or 1)
552  * Assume here Y is the file number of the file for the NOT used LCP
553  * Remove data file Y -> Success
554  * Remove control file X -> Success
555  * Start restore
556  *
557  * The flow chart for Case 4) is here
558  * ----------------------------------
559  * Open LCP control 0 -> Fail
560  * Open LCP control 1 -> Fail
561  * Create LCP control 0 -> Success
562  * Write LCP control 0 -> Success
563  * Close LCP control 0 -> Success
564  * if (lcpNo == ZNIL) then report Done
565  * else
566  * Remove not used data file
567  * Start restore (this is a certain upgrade)
568  *
569  * The flow chart for Case 5) is here
570  * ----------------------------------
571  * Open LCP control 0 -> Success
572  * Read LCP control 0 -> Success
573  * We discover that the LCP control file is readable but not valid
574  * Close LCP control 0 -> Success
575  * Open LCP control 1 -> Fail
576  * Create LCP control 0 -> Success
577  * Write LCP control 0 -> Success
578  * Close LCP control 0 -> Success
579  * In this case lcpNo must be ZNIL since if there is a CTL file
580  * but not completed then this LCP is written using Partial LCP
581  * code.
582  * ndbrequire(lcpNo == ZNIL) then report Done
583  *
 * We will always start with the following steps; the read and close steps
 * are only needed when the open is a success.
586  *
587  * Open LCP control 0
588  * Read LCP control 0
589  * Close LCP control 0
590  * Open LCP control 1
591  * Read LCP control 1
592  * Close LCP control 1
593  *
594  * At this point we know which of the 5 cases we are.
595  * 1) and 2) will simply start the restore
596  * 4) and 5) will create LCP control file 0 and then conditionally restore
597  * 3) needs to remove unneeded LCP control and data file before continuing
598  *
599  * In 7.5 after development of Partial LCPs the LCP files can be in the
600  * following states.
601  *
602  * 1) No files at all
603  *    This state happens immediately after the table has been created and
 *    the first LCP hasn't been started yet.
605  *    This state is covered by Case 4) above and is handled as if the table
606  *    was created in 7.4 or earlier.
607  *
608  * 2) Two empty control files and possibly a not finished data file 0.
609  *    This state happens after the first LCP has started, but not yet
610  *    completed. We could also have only 1 invalid empty control file
611  *    if the crash occurs in the middle of the start of the first LCP.
612  *    In this case there could be a data file 0 which has been created
613  *    but not yet completed.
 *    This is covered by Case 5) above.
615  *
 * 3) One valid LCP control file, in this case only the data files
 *    present in the control file should exist. There could also be an
 *    invalid LCP control file here after the first LCP has been
 *    completed.
620  *    This is Case 1) and 2) above.
621  *
622  * 4) Two valid control files. In this case all the data files present
623  *    in any of the control files can be present. There could however
624  *    be ones missing since we could be in the process of deleting an
625  *    LCP after completion of an LCP.
626  *    This is case 3) above.
627  *
628  * Execution of partial LCPs at restore
629  * ------------------------------------
630  * When we are restoring an LCP that consists of multiple data files this
631  * is the algorithm used.
632  * The LCP control file will cover either all parts or a subset of the parts.
633  * We start with the case where it covers all parts.
634  *
635  * When all parts are covered we could have a case where there is overlap in
636  * the parts. Let's use the following example.
637  * Last part: All of part 801-35 (801-1023 and 0-35).
638  * Last part - 1: All of part 554-800
639  * Last part - 2: All of part 287-553
640  * Last part - 3: All of part 18-286
641  *
 * We need to execute all 4 of those parts (one data file per part). The file
 * number of the last part is given in the control file and the maximum
 * file number is also given in the control file. This means that we can step
645  * backwards and if we step backwards from file number 0 we will step to
646  * file number MaxFileNumbers - 1.
647  *
648  * The above specifies which parts we have all changes for. There will also be
649  * changes present for many other parts in the LCP data file. We will ignore
650  * parts of those.
651  *
652  * We will start here with Last Part - 3. We will ignore everything for parts
653  * 0-35 and 287-1023. We will insert all data pertaining to parts 36-286.
654  * These changes should not contain any deleted rows as these should not be
655  * recorded in parts where we record all rows.
656  *
657  * Next part to restore is Last part - 2. Here we will restore all of parts
658  * 287-553. We will also install all changes related to parts 36-286. We
 * will ignore parts 0-35 and 554-1023.
660  *
661  * Next part to restore is Last part - 1. Here we will restore all of parts
662  * 554-800 and all changes related to parts 36-553. We will ignore parts 0-35
663  * and parts 801-1023.
664  *
665  * Finally we will restore Last part. Here we will restore all of parts 0-35
666  * and parts 801-1023. We will also restore all changes of rows in parts
667  * 36-800.
668  *
669  * Where we restore all parts we will use INSERT since those rows should not
670  * be present yet. We will also reject the restore if we discover a DELETE row
671  * in any of those parts.
672  *
673  * For parts where we restore changes we will use WRITE instead of INSERT since
674  * the row might already exist. In addition we will accept DELETE rows by
675  * row id.
676  *
677  * For parts that we ignore we will simply skip to next row.
678  *
679  * So we effectively divide rows in those parts into 3 separate categories.
680  *
 * When we restore an LCP that was not restorable then we will use exactly the
 * same scheme, the only difference is that we will only have some parts
683  * that are restorable. So this LCP isn't usable in a system restart. It will
684  * still be usable in a node restart however.
685  */
686 void
execFSREMOVEREF(Signal * signal)687 Restore::execFSREMOVEREF(Signal *signal)
688 {
689   jamEntry();
690   FsRef * ref = (FsRef*)signal->getDataPtr();
691   const Uint32 ptrI = ref->userPointer;
692   FsConf * conf = (FsConf*)signal->getDataPtr();
693   conf->userPointer = ptrI;
694   execFSREMOVECONF(signal);
695 }
696 
697 void
execFSREMOVECONF(Signal * signal)698 Restore::execFSREMOVECONF(Signal *signal)
699 {
700   jamEntry();
701   FsConf * conf = (FsConf*)signal->getDataPtr();
702   FilePtr file_ptr;
703   m_file_pool.getPtr(file_ptr, conf->userPointer);
704   lcp_remove_old_file_done(signal, file_ptr);
705 }
706 
707 void
execFSWRITECONF(Signal * signal)708 Restore::execFSWRITECONF(Signal *signal)
709 {
710   jamEntry();
711   FsConf *conf = (FsConf*)signal->getDataPtr();
712   FilePtr file_ptr;
713   m_file_pool.getPtr(file_ptr, conf->userPointer);
714   lcp_create_ctl_done_write(signal, file_ptr);
715 }
716 
717 void
lcp_create_ctl_open(Signal * signal,FilePtr file_ptr)718 Restore::lcp_create_ctl_open(Signal *signal, FilePtr file_ptr)
719 {
720   file_ptr.p->m_ctl_file_no = 0;
721   file_ptr.p->m_status = File::CREATE_CTL_FILE;
722 
723   FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
724   req->userReference = reference();
725   req->fileFlags = FsOpenReq::OM_WRITEONLY | FsOpenReq::OM_CREATE;
726 
727   req->userPointer = file_ptr.i;
728 
729   FsOpenReq::setVersion(req->fileNumber, 5);
730   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_CTL);
731   FsOpenReq::v5_setLcpNo(req->fileNumber, 0);
732   FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
733   FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
734   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
735 }
736 
737 void
lcp_create_ctl_done_open(Signal * signal,FilePtr file_ptr)738 Restore::lcp_create_ctl_done_open(Signal *signal, FilePtr file_ptr)
739 {
740   struct BackupFormat::LCPCtlFile *lcpCtlFilePtr =
741     (struct BackupFormat::LCPCtlFile*)&m_lcp_ctl_file_data[0][0];
742 
743   memcpy(lcpCtlFilePtr->fileHeader.Magic, BACKUP_MAGIC, 8);
744 
745   lcpCtlFilePtr->fileHeader.BackupVersion = NDBD_USE_PARTIAL_LCP_v2;
746   const Uint32 sz = sizeof(BackupFormat::FileHeader) >> 2;
747   lcpCtlFilePtr->fileHeader.SectionType = BackupFormat::FILE_HEADER;
748   lcpCtlFilePtr->fileHeader.SectionLength = sz - 3;
749   lcpCtlFilePtr->fileHeader.FileType = BackupFormat::LCP_CTL_FILE;
750   lcpCtlFilePtr->fileHeader.BackupId = 0;
751   lcpCtlFilePtr->fileHeader.BackupKey_0 = 0;
752   lcpCtlFilePtr->fileHeader.BackupKey_1 = 0;
753   lcpCtlFilePtr->fileHeader.ByteOrder = 0x12345678;
754   lcpCtlFilePtr->fileHeader.NdbVersion = NDB_VERSION_D;
755   lcpCtlFilePtr->fileHeader.MySQLVersion = NDB_MYSQL_VERSION_D;
756 
757   lcpCtlFilePtr->MaxPartPairs = BackupFormat::NDB_MAX_LCP_PARTS;
758   lcpCtlFilePtr->MaxNumberDataFiles = BackupFormat::NDB_MAX_LCP_FILES;
759   lcpCtlFilePtr->ValidFlag = 0;
760   lcpCtlFilePtr->TableId = file_ptr.p->m_table_id;
761   lcpCtlFilePtr->FragmentId = file_ptr.p->m_fragment_id;
762   /**
763    * There are a couple of possibilities here:
764    * 1) DIH knows about the LCP, this is indicated by m_dih_lcp_no set to
765    *    0 or 1. In this case if we come here it means we're doing the
766    *    upgrade case and we can rely on that there is a correct data file
767    *    and we take the opportunity to create a CTL file for this
768    *    fragment here as well.
769    *
770    * 2) DIH knows about no data files, in this case there is no data file
771    *    since by coming here we have concluded that we found no correct
772    *    CTL file, so thus there is no data file both according to DIH
773    *    and according to the non-presence of correct CTL files.
774    */
775   if (file_ptr.p->m_dih_lcp_no == ZNIL ||
776       file_ptr.p->m_used_ctl_file_no == Uint32(~0))
777   {
778     /**
779      * We have no checkpointed data file yet, so we will write an initial
780      * LCP control file. This could be either upgrade case or not.
781      */
782     jam();
783     lcpCtlFilePtr->CreateGci = file_ptr.p->m_create_gci;
784     lcpCtlFilePtr->MaxGciWritten = 0;
785     lcpCtlFilePtr->MaxGciCompleted = 0;
786     lcpCtlFilePtr->LastDataFileNumber = 0;
787     lcpCtlFilePtr->LcpId = 0;
788     lcpCtlFilePtr->LocalLcpId = 0;
789     lcpCtlFilePtr->MaxPageCount = 0;
790   }
791   else
792   {
793     jam();
794     /**
795      * We have the upgrade case where DIH knows about a data file that there
796      * is no CTL file defined for. We create a correct data file before
797      * proceeding.
798      * This is Case 4) above
799      */
800     ndbrequire(file_ptr.p->m_upgrade_case);
801     ndbrequire(file_ptr.p->m_dih_lcp_no == 0 ||
802                file_ptr.p->m_dih_lcp_no == 1);
803     lcpCtlFilePtr->ValidFlag = 1;
804     lcpCtlFilePtr->CreateGci = file_ptr.p->m_create_gci;
805     lcpCtlFilePtr->MaxGciWritten = file_ptr.p->m_restored_gcp_id;
806     lcpCtlFilePtr->MaxGciCompleted = file_ptr.p->m_max_gci_completed;
807     lcpCtlFilePtr->LastDataFileNumber = file_ptr.p->m_dih_lcp_no;
808     lcpCtlFilePtr->LcpId = file_ptr.p->m_restored_lcp_id;
809     lcpCtlFilePtr->LocalLcpId = 0;
810     lcpCtlFilePtr->MaxPageCount = (~0);
811   }
812   struct BackupFormat::PartPair locPartPair;
813   locPartPair.startPart = 0;
814   locPartPair.numParts = BackupFormat::NDB_MAX_LCP_PARTS;
815   lcpCtlFilePtr->partPairs[0] = locPartPair;
816   lcpCtlFilePtr->NumPartPairs = 1;
817 
818   /**
819    * Since the LCP control file will only contain 1 part we are
820    * certain that we will fit in the small LCP control file size.
821    */
822   c_backup->convert_ctl_page_to_network((Uint32*)lcpCtlFilePtr,
823                               BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL);
824   FsReadWriteReq *req = (FsReadWriteReq*)signal->getDataPtrSend();
825 
826   req->userPointer = file_ptr.i;
827   req->filePointer = file_ptr.p->m_fd;
828   req->userReference = reference();
829   req->varIndex = 0;
830   req->numberOfPages = 1;
831   req->operationFlag = 0;
832   FsReadWriteReq::setFormatFlag(req->operationFlag,
833                                 FsReadWriteReq::fsFormatMemAddress);
834   FsReadWriteReq::setSyncFlag(req->operationFlag, 1);
835 
836   /**
837    * Data will be written from m_lcp_ctl_file_data as prepared by Bat */
838   req->data.memoryAddress.memoryOffset = 0;
839   req->data.memoryAddress.fileOffset = 0;
840   req->data.memoryAddress.size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL;
841 
842   sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
843              FsReadWriteReq::FixedLength + 3, JBA);
844 }
845 
846 void
lcp_create_ctl_done_write(Signal * signal,FilePtr file_ptr)847 Restore::lcp_create_ctl_done_write(Signal *signal, FilePtr file_ptr)
848 {
849   close_file(signal, file_ptr);
850 }
851 
852 void
lcp_create_ctl_done_close(Signal * signal,FilePtr file_ptr)853 Restore::lcp_create_ctl_done_close(Signal *signal, FilePtr file_ptr)
854 {
855   if (file_ptr.p->m_dih_lcp_no == ZNIL ||
856       file_ptr.p->m_used_ctl_file_no == Uint32(~0))
857   {
858     /**
859      * We have created an LCP control file, DIH knew not about any
860      * recoverable LCP for this fragment. We have already removed
861      * old LCP files not recoverable, so we're ready to move on
862      * from here.
863      */
864     jam();
865     /**
866      * Done with Case 4) or 5) without upgrade case
867      * --------------------------------------------
868      * We are done, there was no data file to restore, but we have
869      * created an LCP control file, so things should be fine now.
870      * We fake start of restore and end of restore to signal back
871      * the RESTORE_LCP_CONF and other reporting properly done.
872      * We set LCP id and local LCP id to indicate to LQH that no
873      * restorable LCP was found.
874      */
875     c_tup->start_restore_lcp(file_ptr.p->m_table_id,
876                              file_ptr.p->m_fragment_id);
877     jamEntry();
878     ndbrequire(file_ptr.p->m_outstanding_operations == 0);
879     DEB_RES(("(%u)restore_lcp_conf", instance()));
880     file_ptr.p->m_restored_lcp_id = 0;
881     file_ptr.p->m_restored_local_lcp_id = 0;
882     restore_lcp_conf(signal, file_ptr);
883     return;
884   }
885   else if (file_ptr.p->m_dih_lcp_no == 0 ||
886            file_ptr.p->m_dih_lcp_no == 1)
887   {
888     /**
889      * Case 4) Upgrade case
890      * --------------------
891      * We will clean away any old LCP data file that was not reported as
892      * the one to restore. So if we will use 0 to restore we will
893      * remove 1 and vice versa.
894      */
895     jam();
896     ndbrequire(file_ptr.p->m_upgrade_case);
897     file_ptr.p->m_status = File::CREATE_CTL_FILE;
898     lcp_remove_old_file(signal,
899                         file_ptr,
900                         file_ptr.p->m_dih_lcp_no == 0 ? 1 : 0,
901                         false);
902     return;
903   }
904   else
905   {
906     ndbabort();
907   }
908 }
909 
910 void
lcp_remove_old_file(Signal * signal,FilePtr file_ptr,Uint32 file_number,bool is_ctl_file)911 Restore::lcp_remove_old_file(Signal *signal,
912                                   FilePtr file_ptr,
913                                   Uint32 file_number,
914                                   bool is_ctl_file)
915 {
916   file_ptr.p->m_outstanding_operations++;
917   FsRemoveReq * req = (FsRemoveReq*)signal->getDataPtrSend();
918   req->userReference = reference();
919   req->userPointer = file_ptr.i;
920   req->directory = 0;
921   req->ownDirectory = 0;
922   FsOpenReq::setVersion(req->fileNumber, 5);
923   if (is_ctl_file)
924   {
925     jam();
926     FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_CTL);
927     DEB_RES(("(%u)tab(%u,%u) Delete control file number: %u",
928              instance(),
929              file_ptr.p->m_table_id,
930              file_ptr.p->m_fragment_id,
931              file_number));
932   }
933   else
934   {
935     jam();
936     DEB_RES(("tab(%u,%u) Delete data file number: %u",
937              file_ptr.p->m_table_id,
938              file_ptr.p->m_fragment_id,
939              file_number));
940     FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
941   }
942   FsOpenReq::v5_setLcpNo(req->fileNumber, file_number);
943   FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
944   FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
945   sendSignal(NDBFS_REF, GSN_FSREMOVEREQ, signal,
946              FsRemoveReq::SignalLength, JBA);
947 }
948 
949 void
lcp_remove_old_file_done(Signal * signal,FilePtr file_ptr)950 Restore::lcp_remove_old_file_done(Signal *signal, FilePtr file_ptr)
951 {
952   ndbrequire(file_ptr.p->m_outstanding_operations > 0);
953   file_ptr.p->m_outstanding_operations--;
954   if (file_ptr.p->m_outstanding_operations > 0)
955   {
956     jam();
957     return;
958   }
959   switch (file_ptr.p->m_status)
960   {
961     case File::CREATE_CTL_FILE:
962     {
963       /**
964        * END of UPGRADE PATH
965        * -------------------
966        * We are done creating a new LCP control file and removing
967        * any half-written data files still lingering. It is the
968        * normal path for case 4) for upgrades but could also happen
969        * in case 5) where a crash occurred in an early phase of the
970        * fragments lifetime.
971        * Done with Case 4) and 5)
972        * ------------------------
973        * We are now ready to follow the normal path for restoring
974        * a fragment. The information needed to complete the
975        * restore is available now in the File object.
976        */
977       jam();
978       DEB_RES(("(%u)start_restore_lcp_upgrade", instance()));
979       start_restore_lcp_upgrade(signal, file_ptr);
980       return;
981     }
982     case File::REMOVE_LCP_DATA_FILE:
983     {
984       jam();
985       /**
986        * Case 3) completed data file removal
987        * -----------------------------------
988        * We are starting up a normal restore, we found 2 LCP control files,
989        * this is a normal condition, we will always remove any unneeded
990        * LCP files as part of restore. We are now done with data file and
991        * will continue with LCP control file.
992        */
993       DEB_RES(("(%u)Case 3 discovered after remove", instance()));
994       ndbrequire(file_ptr.p->m_num_remove_data_files > 0);
995       file_ptr.p->m_num_remove_data_files--;
996       if (file_ptr.p->m_num_remove_data_files > 0)
997       {
998         jam();
999         if (file_ptr.p->m_remove_data_file_no ==
1000             (file_ptr.p->m_old_max_files - 1))
1001         {
1002           jam();
1003           file_ptr.p->m_remove_data_file_no = 0;
1004         }
1005         else
1006         {
1007           jam();
1008           file_ptr.p->m_remove_data_file_no++;
1009         }
1010         lcp_remove_old_file(signal,
1011                             file_ptr,
1012                             file_ptr.p->m_remove_data_file_no,
1013                             false);
1014       }
1015       else
1016       {
1017         jam();
1018         file_ptr.p->m_status = File::REMOVE_LCP_CTL_FILE;
1019         lcp_remove_old_file(signal,
1020                             file_ptr,
1021                             file_ptr.p->m_remove_ctl_file_no,
1022                             true);
1023       }
1024       return;
1025     }
1026     case File::REMOVE_LCP_CTL_FILE:
1027     {
1028       jam();
1029       /**
1030        * Case 3) is completed or Case 4 or Case 5) completed file removal
1031        * ----------------------------------------------------------------
1032        * Done with removal of both data file and control file of LCP
1033        * not used for restore. We are now ready to start restore for
1034        * Case 3, for Case 5 we will create an empty LCP control file
1035        * 0 first.
1036        */
1037       DEB_RES(("(%u)start_restore_lcp", instance()));
1038       if (file_ptr.p->m_used_ctl_file_no == Uint32(~0))
1039       {
1040         jam();
1041         lcp_create_ctl_open(signal, file_ptr);
1042         return;
1043       }
1044       start_restore_lcp(signal, file_ptr);
1045       return;
1046     }
1047     default:
1048     {
1049       ndbabort();
1050       return;
1051     }
1052   }
1053 }
1054 
1055 void
open_ctl_file(Signal * signal,FilePtr file_ptr,Uint32 lcp_no)1056 Restore::open_ctl_file(Signal *signal, FilePtr file_ptr, Uint32 lcp_no)
1057 {
1058   /* Keep track of which ctl file we're currently dealing with. */
1059   file_ptr.p->m_ctl_file_no = lcp_no;
1060 
1061   FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
1062   req->userReference = reference();
1063   req->fileFlags = FsOpenReq::OM_READONLY;
1064   req->userPointer = file_ptr.i;
1065 
1066   FsOpenReq::setVersion(req->fileNumber, 5);
1067   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_CTL);
1068   FsOpenReq::v5_setLcpNo(req->fileNumber, lcp_no);
1069   FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
1070   FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
1071   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
1072 }
1073 
1074 void
open_ctl_file_done_ref(Signal * signal,FilePtr file_ptr)1075 Restore::open_ctl_file_done_ref(Signal *signal, FilePtr file_ptr)
1076 {
1077   if (file_ptr.p->m_ctl_file_no == 1)
1078   {
1079     if (file_ptr.p->m_used_ctl_file_no == Uint32(~0))
1080     {
1081       jam();
1082       /**
1083        * Case 4) discovered
1084        * ------------------
1085        * UPGRADE PATH when restoring an older MySQL Cluster version
1086        * ----------------------------------------------------------
1087        * We are done reading the LCP control files. If no one was found we will
1088        * assume that this is an LCP produced by an older version without LCP
1089        * control files.
1090        *
1091        * In the new format we always have a control file, even when there is
1092        * no LCP executed yet. We create this control file indicating an empty
1093        * set of LCP files before we continue restoring the data.
1094        *
1095        * We could come here also with a too new LCP completed and we create
1096        * an empty one also in this case since it will overwrite the old one.
1097        *
1098        * We could also come here when we have completed the LCP, but the LCP
1099        * control file is still invalid since we haven't ensured that the
1100        * LCP is safe yet by calling sync_lsn. In this case we can even have
1101        * a case where DIH thinks we have completed an LCP but we haven't
1102        * actually done so yet.
1103        */
1104       if (file_ptr.p->m_upgrade_case)
1105       {
1106         jam();
1107         DEB_RES(("(%u)Case 4 with upgrade discovered", instance()));
1108         lcp_create_ctl_open(signal, file_ptr);
1109       }
1110       else
1111       {
1112         jam();
1113         DEB_RES(("(%u)Case 4 without upgrade discovered", instance()));
1114         file_ptr.p->m_remove_ctl_file_no = 0;
1115         file_ptr.p->m_remove_data_file_no = 0;
1116         file_ptr.p->m_num_remove_data_files = BackupFormat::NDB_MAX_FILES_PER_LCP;
1117         file_ptr.p->m_status = File::REMOVE_LCP_DATA_FILE;
1118         lcp_remove_old_file(signal,
1119                             file_ptr,
1120                             file_ptr.p->m_remove_data_file_no,
1121                             false);
1122       }
1123       return;
1124     }
1125     else
1126     {
1127       /**
1128        * Case 1) discovered
1129        * ------------------
1130        * Normal behaviour, we had no LCP control file 1, but we had an LCP
1131        * control file 0, so we will use this to perform the restore. It is
1132        * already set up and ready to proceed with the restore. In this case
1133        * when there is only one LCP control file then we trust that there is
1134        * no LCP data files not needed. We always remove the data files of an
1135        * LCP before we remove the LCP control file of an LCP. So it is safe
1136        * to continue restoring now, we have 1 LCP control file and 1 set of
1137        * LCP data files that all are needed and described by the LCP control
1138        * file.
1139        */
1140       jam();
1141       DEB_RES(("(%u)Case 1 discovered", instance()));
1142       DEB_RES(("(%u)Use ctl file: 0, 1 not exist, Lcp(%u,%u), GCI_C: %u,"
1143                " GCI_W: %u, MPC: %u",
1144                 instance(),
1145                 file_ptr.p->m_restored_lcp_id,
1146                 file_ptr.p->m_restored_local_lcp_id,
1147                 file_ptr.p->m_max_gci_completed,
1148                 file_ptr.p->m_max_gci_written,
1149                 file_ptr.p->m_max_page_cnt));
1150       ndbrequire(!file_ptr.p->m_found_not_restorable);
1151       start_restore_lcp(signal, file_ptr);
1152       return;
1153     }
1154   }
1155   else
1156   {
1157     jam();
1158     ndbrequire(file_ptr.p->m_ctl_file_no == 0);
1159     /**
1160      * We found no LCP control file 0, this can be normal, so we will now
1161      * instead open LCP control file 1.
1162      */
1163     DEB_RES(("(%u)open_ctl_file( 1 )", instance()));
1164     open_ctl_file(signal, file_ptr, 1);
1165     return;
1166   }
1167 }
1168 
1169 void
calculate_remove_new_data_files(FilePtr file_ptr)1170 Restore::calculate_remove_new_data_files(FilePtr file_ptr)
1171 {
1172   Uint32 new_ctl_no = file_ptr.p->m_remove_ctl_file_no;
1173   Uint32 old_ctl_no = new_ctl_no == 0 ? 1 : 0;
1174 
1175   ndbrequire(new_ctl_no < 2);
1176   BackupFormat::LCPCtlFile *oldLcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1177     &m_lcp_ctl_file_data[old_ctl_no][0];
1178   BackupFormat::LCPCtlFile *newLcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1179     &m_lcp_ctl_file_data[new_ctl_no][0];
1180 
1181   Uint32 old_last_file = oldLcpCtlFilePtr->LastDataFileNumber;
1182   Uint32 new_last_file = newLcpCtlFilePtr->LastDataFileNumber;
1183 
1184   Uint32 new_max_files = newLcpCtlFilePtr->MaxNumberDataFiles;
1185   Uint32 old_max_files = oldLcpCtlFilePtr->MaxNumberDataFiles;
1186 
1187   ndbrequire(new_max_files == old_max_files);
1188   ndbrequire(new_max_files == BackupFormat::NDB_MAX_LCP_FILES);
1189 
1190   /**
1191    * Calculate first file to remove.
1192    */
1193   Uint32 first_remove_file = new_last_file;
1194   Uint32 num_remove_files = 0;
1195   if (new_last_file == old_last_file)
1196   {
1197     /**
1198      * We could end up here after a number of unsuccessful restarts.
1199      * The LCP to remove was possibly changing the GCP written, but it
1200      * didn't contain any real changes to the data, so the same data
1201      * file was used again. We simply return and continue the restart.
1202      */
1203     jam();
1204     return;
1205   }
1206   while (1)
1207   {
1208     Uint32 next_remove_file = first_remove_file;
1209     num_remove_files++;
1210     if (next_remove_file == 0)
1211     {
1212       jam();
1213       next_remove_file = old_max_files - 1;
1214     }
1215     else
1216     {
1217       jam();
1218       next_remove_file--;
1219     }
1220     if (next_remove_file == old_last_file)
1221     {
1222       jam();
1223       break;
1224     }
1225     first_remove_file = next_remove_file;
1226   }
1227   ndbrequire(num_remove_files > 0);
1228   file_ptr.p->m_remove_data_file_no = first_remove_file;
1229   file_ptr.p->m_num_remove_data_files = num_remove_files;
1230   file_ptr.p->m_old_max_files = old_max_files;
1231 }
1232 
1233 void
calculate_remove_old_data_files(FilePtr file_ptr)1234 Restore::calculate_remove_old_data_files(FilePtr file_ptr)
1235 {
1236   Uint32 old_ctl_no = file_ptr.p->m_remove_ctl_file_no;
1237   Uint32 new_ctl_no = old_ctl_no == 0 ? 1 : 0;
1238 
1239   ndbrequire(old_ctl_no < 2);
1240   BackupFormat::LCPCtlFile *oldLcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1241     &m_lcp_ctl_file_data[old_ctl_no][0];
1242   BackupFormat::LCPCtlFile *newLcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1243     &m_lcp_ctl_file_data[new_ctl_no][0];
1244 
1245   Uint32 new_parts = newLcpCtlFilePtr->NumPartPairs;
1246   Uint32 old_parts = oldLcpCtlFilePtr->NumPartPairs;
1247 
1248   Uint32 old_last_file = oldLcpCtlFilePtr->LastDataFileNumber;
1249   Uint32 new_last_file = newLcpCtlFilePtr->LastDataFileNumber;
1250 
1251   Uint32 new_max_files = newLcpCtlFilePtr->MaxNumberDataFiles;
1252   Uint32 old_max_files = oldLcpCtlFilePtr->MaxNumberDataFiles;
1253 
1254   ndbrequire(new_max_files == old_max_files);
1255   ndbrequire(new_max_files == BackupFormat::NDB_MAX_LCP_FILES);
1256   ndbrequire(new_parts > 0);
1257   ndbrequire(old_parts > 0);
1258   /**
1259    * new_parts can never be bigger than old_parts + 1. This happens
1260    * when the LCP adds one more data file, but removes no data file
1261    * from the old LCPs. So when old_parts + 1 = new_parts then we
1262    * should remove 0 data files. When we have removed parts in new
1263    * LCP, then new_parts will be smaller and thus
1264    * old_parts + 1 - new_parts will be the number of parts to remove
1265    * from old LCP.
1266    */
1267   Uint32 new_files = 0;
1268   Uint32 loop_file = new_last_file;
1269   while (loop_file != old_last_file)
1270   {
1271     new_files++;
1272     if (loop_file == 0)
1273     {
1274       jam();
1275       loop_file = old_max_files - 1;
1276     }
1277     else
1278     {
1279       jam();
1280       loop_file--;
1281     }
1282   }
1283   /* new_files can be 0 in cases where new_parts == old_parts */
1284   ndbrequire(new_files != 0 || new_parts == old_parts);
1285   Uint32 remove_parts = (old_parts + new_files) - new_parts;
1286   file_ptr.p->m_num_remove_data_files = remove_parts;
1287 
1288   if (remove_parts == 0)
1289   {
1290     jam();
1291     return;
1292   }
1293 
1294   /**
1295    * Calculate first file to remove.
1296    */
1297   Uint32 first_remove_file = old_last_file;
1298   for (Uint32 i = 0; i < (old_parts - 1); i++)
1299   {
1300     if (first_remove_file == 0)
1301     {
1302       jam();
1303       first_remove_file = old_max_files - 1;
1304     }
1305     else
1306     {
1307       jam();
1308       first_remove_file--;
1309     }
1310   }
1311   file_ptr.p->m_remove_data_file_no = first_remove_file;
1312   file_ptr.p->m_old_max_files = old_max_files;
1313 }
1314 
1315 void
open_ctl_file_done_conf(Signal * signal,FilePtr file_ptr)1316 Restore::open_ctl_file_done_conf(Signal *signal, FilePtr file_ptr)
1317 {
1318   file_ptr.p->m_upgrade_case = false;
1319 
1320   FsReadWriteReq *req = (FsReadWriteReq*)signal->getDataPtrSend();
1321   req->userPointer = file_ptr.i;
1322   req->filePointer = file_ptr.p->m_fd;
1323   req->userReference = reference();
1324   req->varIndex = 0;
1325   req->numberOfPages = 1;
1326   req->operationFlag = 0;
1327   FsReadWriteReq::setFormatFlag(req->operationFlag,
1328                                 FsReadWriteReq::fsFormatMemAddress);
1329   FsReadWriteReq::setPartialReadFlag(req->operationFlag, 1);
1330 
1331   /**
1332    * Data will be written from m_lcp_ctl_file_data as prepared by Bat */
1333   req->data.memoryAddress.memoryOffset =
1334     file_ptr.p->m_ctl_file_no *
1335       (BackupFormat::LCP_CTL_FILE_BUFFER_SIZE_IN_WORDS * 4);
1336   req->data.memoryAddress.fileOffset = 0;
1337   req->data.memoryAddress.size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG;
1338 
1339   sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
1340              FsReadWriteReq::FixedLength + 3, JBA);
1341 }
1342 
1343 void
read_ctl_file_done(Signal * signal,FilePtr file_ptr,Uint32 bytesRead)1344 Restore::read_ctl_file_done(Signal *signal, FilePtr file_ptr, Uint32 bytesRead)
1345 {
1346   /**
1347    * We read the LCP control file, we really want at this point to know
1348    * the following things.
1349    * 1) LCP id of this control file
1350    * 2) GCI completed, this makes it possible to shorten REDO log execution
1351    * 3) GCI written, if this is higher than the restored GCI than the LCP
1352    *    is not useful, in this case we should have an older LCP control file
1353    *    still there, otherwise the system is not restorable.
1354    * 4) Data file number to make sure we read the correct data file.
1355    *
1356    * The remainder of the information is used to verify that it is a correct
1357    * LCP control file and which version that have created it. We will only
1358    * go ahead if the LCP control is correct and we have the ability to
1359    * read it.
1360    *
1361    * We need to read both LCP control files, if one is missing then we use
1362    * the one we found. If both are present then we decide to use the newest
1363    * restorable LCP.
1364    * To handle case 3) we need to record which LCP control file we don't
1365    * use such that we can remove the LCP control file and LCP data file
1366    * belonging to this LCP which we will no longer use.
1367    *
1368    * When we come here the contents of the LCP control file is stored in
1369    * the m_lcp_ctl_file_data variable.
1370    */
1371   ndbrequire(file_ptr.p->m_ctl_file_no < 2);
1372   BackupFormat::LCPCtlFile *lcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1373     &m_lcp_ctl_file_data[file_ptr.p->m_ctl_file_no];
1374 
1375   if (bytesRead != BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL &&
1376       bytesRead != BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG)
1377   {
1378     /**
1379      * Invalid file, probably still no data written. We will remove it
1380      * as we close it.
1381      */
1382     jam();
1383     ndbassert(bytesRead == 0);
1384     ndbrequire(!file_ptr.p->m_found_not_restorable);
1385     close_file(signal, file_ptr, true);
1386     return;
1387   }
1388   if (!c_backup->convert_ctl_page_to_host(lcpCtlFilePtr))
1389   {
1390     /* Invalid file data */
1391     jam();
1392     ndbassert(false);
1393     ndbrequire(!file_ptr.p->m_found_not_restorable);
1394     close_file(signal, file_ptr, true);
1395     return;
1396   }
1397   if (lcpCtlFilePtr->MaxGciWritten == 0 &&
1398       lcpCtlFilePtr->MaxGciCompleted == 0 &&
1399       lcpCtlFilePtr->ValidFlag == 0 &&
1400       lcpCtlFilePtr->LcpId == 0 &&
1401       lcpCtlFilePtr->LocalLcpId == 0 &&
1402       lcpCtlFilePtr->LastDataFileNumber == 0 &&
1403       lcpCtlFilePtr->MaxPageCount == 0)
1404   {
1405     jam();
1406     g_eventLogger->debug("Found empty LCP control file, "
1407                          "must have been created by earlier restart,"
1408                          " tab(%u,%u), CTL file: %u",
1409                          file_ptr.p->m_table_id,
1410                          file_ptr.p->m_fragment_id,
1411                          file_ptr.p->m_ctl_file_no);
1412 
1413     /**
1414      * An empty initialised LCP control file was found, this must have
1415      * been created by previous restart attempt. We will ignore it and
1416      * act as if we didn't see the LCP control file at all.
1417      */
1418     ndbrequire(!file_ptr.p->m_found_not_restorable);
1419     close_file(signal, file_ptr, true);
1420     return;
1421   }
1422 
1423   const Uint32 sz = sizeof(BackupFormat::FileHeader) >> 2;
1424   if ((memcmp(BACKUP_MAGIC, lcpCtlFilePtr->fileHeader.Magic, 8) != 0) ||
1425       ((lcpCtlFilePtr->fileHeader.BackupVersion != NDBD_USE_PARTIAL_LCP_v1) &&
1426        (lcpCtlFilePtr->fileHeader.BackupVersion != NDBD_USE_PARTIAL_LCP_v2)) ||
1427       (lcpCtlFilePtr->fileHeader.SectionType != BackupFormat::FILE_HEADER) ||
1428       (lcpCtlFilePtr->fileHeader.SectionLength != (sz - 3)) ||
1429       (lcpCtlFilePtr->fileHeader.FileType != BackupFormat::LCP_CTL_FILE) ||
1430       (lcpCtlFilePtr->TableId != file_ptr.p->m_table_id) ||
1431       (lcpCtlFilePtr->FragmentId != file_ptr.p->m_fragment_id))
1432   {
1433     jam();
1434     g_eventLogger->debug("LCP Control file inconsistency, tab(%u,%u)"
1435                          ", CTL file: %u",
1436                          file_ptr.p->m_table_id,
1437                          file_ptr.p->m_fragment_id,
1438                          file_ptr.p->m_ctl_file_no);
1439     ndbrequire(!file_ptr.p->m_found_not_restorable);
1440     close_file(signal, file_ptr, true);
1441     return;
1442   }
1443 
1444   /**
1445    * Now we are ready to read the parts of the LCP control file that we need
1446    * to know to handle the restore correctly.
1447    */
1448   Uint32 validFlag = lcpCtlFilePtr->ValidFlag;
1449   Uint32 createGci = lcpCtlFilePtr->CreateGci;
1450   Uint32 maxGciCompleted = lcpCtlFilePtr->MaxGciCompleted;
1451   Uint32 maxGciWritten = lcpCtlFilePtr->MaxGciWritten;
1452   Uint32 lcpId = lcpCtlFilePtr->LcpId;
1453   Uint32 localLcpId = lcpCtlFilePtr->LocalLcpId;
1454   Uint32 maxPageCnt = lcpCtlFilePtr->MaxPageCount;
1455   Uint32 createTableVersion = lcpCtlFilePtr->CreateTableVersion;
1456   Uint32 lcpCtlVersion = lcpCtlFilePtr->fileHeader.BackupVersion;
1457   Uint64 rowCount = Uint64(lcpCtlFilePtr->RowCountLow) +
1458                     (Uint64(lcpCtlFilePtr->RowCountHigh) << 32);
1459 
1460   if (createTableVersion == 0)
1461   {
1462     jam();
1463     /**
1464      * LCP control file was created during table drop, simply set the valid flag
1465      * to 0 and ignore the LCP control file.
1466      */
1467     createTableVersion = c_lqh->getCreateSchemaVersion(file_ptr.p->m_table_id);
1468     validFlag = 0;
1469   }
1470 
1471   if (createTableVersion !=
1472       c_lqh->getCreateSchemaVersion(file_ptr.p->m_table_id))
1473   {
1474     jam();
1475     g_eventLogger->debug("(%u)Found LCP control file from old table"
1476                          ", drop table haven't cleaned up properly"
1477                          ", tab(%u,%u).%u (now %u), createGci:%u,"
1478                          " maxGciCompleted: %u"
1479                          ", maxGciWritten: %u, restored createGci: %u",
1480                          instance(),
1481                          file_ptr.p->m_table_id,
1482                          file_ptr.p->m_fragment_id,
1483                          createTableVersion,
1484                          c_lqh->getCreateSchemaVersion(file_ptr.p->m_table_id),
1485                          createGci,
1486                          maxGciCompleted,
1487                          maxGciWritten,
1488                          file_ptr.p->m_create_gci);
1489     file_ptr.p->m_status = File::DROP_OLD_FILES;
1490     file_ptr.p->m_remove_ctl_file_no = file_ptr.p->m_ctl_file_no == 0 ? 1 : 0;
1491     file_ptr.p->m_remove_data_file_no = 0;
1492     file_ptr.p->m_num_remove_data_files = BackupFormat::NDB_MAX_FILES_PER_LCP;
1493     ndbrequire(file_ptr.p->m_used_ctl_file_no == ~Uint32(0));
1494     close_file(signal, file_ptr, true);
1495     return;
1496   }
1497   else if (maxGciWritten > file_ptr.p->m_restored_gcp_id ||
1498            maxGciCompleted > file_ptr.p->m_restored_gcp_id ||
1499            validFlag == 0)
1500   {
1501     jam();
1502     /**
1503      * This is a fairly normal case, but we will still log it to make sure we
1504      * have sufficient information logged if things turns for the worse. In a
1505      * normal restart we should at most have a few of those.
1506      *
1507      * The LCP contained records that was commited in GCI = maxGciWritten,
1508      * we are restoring a GCI which is smaller, this means that the LCP cannot
1509      * be used for restore since we have no UNDO log for main memory
1510      * data.
1511      *
1512      * This is a perfectly normal case although not so common. The LCP was
1513      * completed but had writes in it that rendered it useless. If this is
1514      * the very first LCP for this table it could even be that this is the
1515      * only LCP control file we have. But this can only happen for file 0.
1516      * If it happens for file 1 and we have no useful CTL file in file 0
1517      * then we are smoked since that is not supposed to be possible.
1518      *
1519      * It is also a normal case where we have written LCP control file
1520      * but not yet had time to sync the LSN for the LCP. This is flagged
1521      * by the validFlag not being set in the LCP control file.
1522      */
1523     g_eventLogger->debug("(%u)LCP Control file ok, but not recoverable,"
1524                          " tab(%u,%u), maxGciWritten: %u, restoredGcpId: %u"
1525                          ", CTL file: %u, validFlag: %u",
1526                          instance(),
1527                          file_ptr.p->m_table_id,
1528                          file_ptr.p->m_fragment_id,
1529                          maxGciWritten,
1530                          file_ptr.p->m_restored_gcp_id,
1531                          file_ptr.p->m_ctl_file_no,
1532                          validFlag);
1533     ndbrequire((file_ptr.p->m_ctl_file_no == 0 ||
1534                file_ptr.p->m_used_ctl_file_no != Uint32(~0)) ||
1535                validFlag == 0);
1536     ndbrequire(!file_ptr.p->m_found_not_restorable);
1537     file_ptr.p->m_found_not_restorable = true;
1538     file_ptr.p->m_remove_ctl_file_no = file_ptr.p->m_ctl_file_no;
1539     if (file_ptr.p->m_ctl_file_no == 1 &&
1540         file_ptr.p->m_used_ctl_file_no != Uint32(~0))
1541     {
1542       jam();
1543       calculate_remove_new_data_files(file_ptr);
1544     }
1545   }
1546   else if (file_ptr.p->m_used_ctl_file_no == Uint32(~0))
1547   {
1548     jam();
1549     /**
1550      * First LCP control file that we read, we simply set things up for
1551      * restore. We want the LCP id to check which LCP to use if there is
1552      * one more, also to report back to DBLQH.
1553      */
1554     file_ptr.p->m_max_gci_completed = maxGciCompleted;
1555     file_ptr.p->m_restored_lcp_id = lcpId;
1556     file_ptr.p->m_restored_local_lcp_id = localLcpId;
1557     file_ptr.p->m_max_page_cnt = maxPageCnt;
1558     file_ptr.p->m_max_gci_written = maxGciWritten;
1559     file_ptr.p->m_used_ctl_file_no = file_ptr.p->m_ctl_file_no;
1560     file_ptr.p->m_lcp_ctl_version = lcpCtlVersion;
1561     file_ptr.p->m_rows_in_lcp = rowCount;
1562     if (file_ptr.p->m_ctl_file_no == 1)
1563     {
1564       jam();
1565       DEB_RES(("(%u)Use ctl file: 1, 0 not exist, Lcp(%u,%u), GCI_C: %u,"
1566                " GCI_W: %u, MPC: %u",
1567                 instance(),
1568                 file_ptr.p->m_restored_lcp_id,
1569                 file_ptr.p->m_restored_local_lcp_id,
1570                 file_ptr.p->m_max_gci_completed,
1571                 file_ptr.p->m_max_gci_written,
1572                 file_ptr.p->m_max_page_cnt));
1573     }
1574     if (file_ptr.p->m_found_not_restorable)
1575     {
1576       jam();
1577       calculate_remove_new_data_files(file_ptr);
1578     }
1579   }
1580   else if (file_ptr.p->m_restored_lcp_id > lcpId)
1581   {
1582     /**
1583      * This file is older than the previous one. We will use the previous
1584      * one.
1585      */
1586     jam();
1587     ndbrequire(file_ptr.p->m_ctl_file_no == 1);
1588     file_ptr.p->m_double_lcps_found = true;
1589     file_ptr.p->m_remove_ctl_file_no = 1;
1590     calculate_remove_old_data_files(file_ptr);
1591     DEB_RES(("(%u)Use ctl file: 0, 1 older, Lcp(%u,%u), GCI_C: %u,"
1592              " GCI_W: %u, MPC: %u",
1593               instance(),
1594               file_ptr.p->m_restored_lcp_id,
1595               file_ptr.p->m_restored_local_lcp_id,
1596               file_ptr.p->m_max_gci_completed,
1597               file_ptr.p->m_max_gci_written,
1598               file_ptr.p->m_max_page_cnt));
1599   }
1600   else if (file_ptr.p->m_restored_lcp_id < lcpId ||
1601            (file_ptr.p->m_restored_lcp_id == lcpId &&
1602             file_ptr.p->m_restored_local_lcp_id < localLcpId))
1603   {
1604     jam();
1605     DEB_RES(("(%u)Use ctl file: 1, 0 older, Lcp(%u,%u), GCI_C: %u,"
1606              " GCI_W: %u, MPC: %u",
1607               instance(),
1608               lcpId,
1609               localLcpId,
1610               maxGciCompleted,
1611               maxGciWritten,
1612               maxPageCnt));
1613     ndbrequire(file_ptr.p->m_ctl_file_no == 1);
1614     ndbrequire(file_ptr.p->m_max_gci_completed <= maxGciCompleted);
1615     file_ptr.p->m_used_ctl_file_no = file_ptr.p->m_ctl_file_no;
1616     file_ptr.p->m_double_lcps_found = true;
1617     file_ptr.p->m_max_gci_completed = maxGciCompleted;
1618     file_ptr.p->m_max_gci_written = maxGciWritten;
1619     file_ptr.p->m_restored_lcp_id = lcpId;
1620     file_ptr.p->m_restored_local_lcp_id = localLcpId;
1621     file_ptr.p->m_max_page_cnt = maxPageCnt;
1622     file_ptr.p->m_remove_ctl_file_no = 0;
1623     file_ptr.p->m_lcp_ctl_version = lcpCtlVersion;
1624     file_ptr.p->m_rows_in_lcp = rowCount;
1625     calculate_remove_old_data_files(file_ptr);
1626   }
1627   else
1628   {
1629     /**
1630      * The LCP id of both LCPs were the same, this can happen when the
1631      * node previously crashed in the middle of an LCP and DIH haven't
1632      * finished it, so it starts the next LCP with the same ID.
1633      * In this case we have added one to the Local LCP id to ensure we
1634      * know which is the most recent one.
1635      * So here we come when CTL file 0 is newer.
1636      */
1637     DEB_RES(("(%u)Use ctl file: 0, 1 older, Lcp(%u,%u), GCI_C: %u,"
1638              " GCI_W: %u, MPC: %u",
1639               instance(),
1640               file_ptr.p->m_restored_lcp_id,
1641               file_ptr.p->m_restored_local_lcp_id,
1642               file_ptr.p->m_max_gci_completed,
1643               file_ptr.p->m_max_gci_written,
1644               file_ptr.p->m_max_page_cnt));
1645     ndbrequire(file_ptr.p->m_ctl_file_no == 1);
1646     ndbrequire(file_ptr.p->m_max_gci_completed >= maxGciCompleted);
1647     file_ptr.p->m_used_ctl_file_no = 0;
1648     file_ptr.p->m_double_lcps_found = true;
1649     file_ptr.p->m_remove_ctl_file_no = 1;
1650     calculate_remove_old_data_files(file_ptr);
1651   }
1652   close_file(signal, file_ptr);
1653 }
1654 
1655 void
lcp_drop_old_files(Signal * signal,FilePtr file_ptr)1656 Restore::lcp_drop_old_files(Signal *signal, FilePtr file_ptr)
1657 {
1658   file_ptr.p->m_status = File::REMOVE_LCP_DATA_FILE;
1659   lcp_remove_old_file(signal,
1660                       file_ptr,
1661                       file_ptr.p->m_remove_data_file_no,
1662                       false);
1663 }
1664 
1665 void
close_ctl_file_done(Signal * signal,FilePtr file_ptr)1666 Restore::close_ctl_file_done(Signal *signal, FilePtr file_ptr)
1667 {
1668   if (file_ptr.p->m_ctl_file_no == 0)
1669   {
1670     /**
1671      * We are done with LCP control file 0, continue with LCP control
1672      * file 1 in the same manner.
1673      */
1674     jam();
1675     open_ctl_file(signal, file_ptr, 1);
1676     return;
1677   }
1678   else
1679   {
1680     ndbrequire(file_ptr.p->m_ctl_file_no == 1);
1681     jam();
1682     if (file_ptr.p->m_used_ctl_file_no == Uint32(~0))
1683     {
1684       /**
1685        * Case 5) discovered
1686        * No valid LCP file was found. We create an LCP control file 0
1687        * which is ok and then continue with the restore if there is
1688        * anything to restore.
1689        */
1690       jam();
1691       ndbrequire(file_ptr.p->m_dih_lcp_no == ZNIL);
1692       DEB_RES(("(%u)Case 5 discovered", instance()));
1693       file_ptr.p->m_remove_data_file_no = 0;
1694       file_ptr.p->m_num_remove_data_files = BackupFormat::NDB_MAX_FILES_PER_LCP;
1695       file_ptr.p->m_status = File::REMOVE_LCP_DATA_FILE;
1696       lcp_remove_old_file(signal,
1697                           file_ptr,
1698                           file_ptr.p->m_remove_data_file_no,
1699                           false);
1700       return;
1701     }
1702     if (file_ptr.p->m_double_lcps_found ||
1703         file_ptr.p->m_found_not_restorable)
1704     {
1705       jam();
1706       /**
1707        * Case 3) discovered
1708        * ------------------
1709        * We start by removing potential data and CTL files still there.
1710        */
1711       DEB_RES(("(%u)Case 3 discovered after close", instance()));
1712       if (file_ptr.p->m_num_remove_data_files > 0)
1713       {
1714         jam();
1715         file_ptr.p->m_status = File::REMOVE_LCP_DATA_FILE;
1716         lcp_remove_old_file(signal,
1717                             file_ptr,
1718                             file_ptr.p->m_remove_data_file_no,
1719                             false);
1720       }
1721       else
1722       {
1723         file_ptr.p->m_status = File::REMOVE_LCP_CTL_FILE;
1724         lcp_remove_old_file(signal,
1725                             file_ptr,
1726                             file_ptr.p->m_remove_ctl_file_no,
1727                             true);
1728       }
1729       return;
1730     }
1731     else
1732     {
1733       jam();
1734       /**
1735        * Case 2) discovered
1736        * ------------------
1737        * LCP control file 1 existed alone, we are ready to execute the restore
1738        * now.
1739        */
1740       DEB_RES(("(%u)Case 2 discovered, start_restore_lcp",
1741               instance()));
1742       start_restore_lcp(signal, file_ptr);
1743       return;
1744     }
1745   }
1746 }
1747 
1748 void
execRESTORE_LCP_REQ(Signal * signal)1749 Restore::execRESTORE_LCP_REQ(Signal* signal)
1750 {
1751   jamEntry();
1752 
1753   Uint32 err= 0;
1754   RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtr();
1755   Uint32 senderRef= req->senderRef;
1756   Uint32 senderData= req->senderData;
1757   do
1758   {
1759     FilePtr file_ptr;
1760     if (!m_file_list.seizeFirst(file_ptr))
1761     {
1762       err= RestoreLcpRef::NoFileRecord;
1763       break;
1764     }
1765 
1766     if((err= init_file(req, file_ptr)))
1767     {
1768       break;
1769     }
1770 
1771     signal->theData[0] = NDB_LE_StartReadLCP;
1772     signal->theData[1] = file_ptr.p->m_table_id;
1773     signal->theData[2] = file_ptr.p->m_fragment_id;
1774     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
1775 
1776     open_ctl_file(signal, file_ptr, 0);
1777     return;
1778   } while(0);
1779 
1780   DEB_RES(("(%u)RESTORE_LCP_REF", instance()));
1781   RestoreLcpRef* ref= (RestoreLcpRef*)signal->getDataPtrSend();
1782   ref->senderData= senderData;
1783   ref->senderRef= reference();
1784   ref->errorCode = err;
1785   sendSignal(senderRef, GSN_RESTORE_LCP_REF, signal,
1786 	     RestoreLcpRef::SignalLength, JBB);
1787 }
1788 
1789 Uint32
init_file(const RestoreLcpReq * req,FilePtr file_ptr)1790 Restore::init_file(const RestoreLcpReq* req, FilePtr file_ptr)
1791 {
1792   new (file_ptr.p) File();
1793   file_ptr.p->m_sender_ref = req->senderRef;
1794   file_ptr.p->m_sender_data = req->senderData;
1795 
1796   file_ptr.p->m_fd = RNIL;
1797   file_ptr.p->m_file_type = BackupFormat::LCP_FILE;
1798   file_ptr.p->m_status = File::READ_CTL_FILES;
1799 
1800   file_ptr.p->m_double_lcps_found = false;
1801   file_ptr.p->m_found_not_restorable = false;
1802   file_ptr.p->m_upgrade_case = true;
1803   file_ptr.p->m_remove_ctl_file_no = Uint32(~0);
1804   file_ptr.p->m_remove_data_file_no = Uint32(~0);
1805   file_ptr.p->m_num_remove_data_files = 0;
1806   file_ptr.p->m_old_max_files = Uint32(~0);
1807 
1808   file_ptr.p->m_dih_lcp_no = req->lcpNo;
1809   file_ptr.p->m_table_id = req->tableId;
1810   file_ptr.p->m_fragment_id = req->fragmentId;
1811   file_ptr.p->m_table_version = RNIL;
1812   file_ptr.p->m_restored_gcp_id = req->restoreGcpId;
1813   file_ptr.p->m_restored_lcp_id = req->lcpId;
1814   file_ptr.p->m_restored_local_lcp_id = 0;
1815   file_ptr.p->m_max_gci_completed = req->maxGciCompleted;
1816   file_ptr.p->m_create_gci = req->createGci;
1817   DEB_RES(("(%u)RESTORE_LCP_REQ tab(%u,%u),"
1818            " GCI: %u, LCP id: %u, LCP no: %u, createGci: %u",
1819            instance(),
1820            req->tableId,
1821            req->fragmentId,
1822            req->restoreGcpId,
1823            req->lcpId,
1824            req->lcpNo,
1825            req->createGci));
1826 
1827   file_ptr.p->m_bytes_left = 0; // Bytes read from FS
1828   file_ptr.p->m_current_page_ptr_i = RNIL;
1829   file_ptr.p->m_current_page_pos = 0;
1830   file_ptr.p->m_current_page_index = 0;
1831   file_ptr.p->m_current_file_page = 0;
1832   file_ptr.p->m_outstanding_reads = 0;
1833   file_ptr.p->m_outstanding_operations = 0;
1834 
1835   file_ptr.p->m_rows_in_lcp = 0;
1836   file_ptr.p->m_rows_restored = 0;
1837   file_ptr.p->m_rows_restored_insert = 0;
1838   file_ptr.p->m_rows_restored_delete = 0;
1839   file_ptr.p->m_rows_restored_delete_failed = 0;
1840   file_ptr.p->m_rows_restored_delete_page = 0;
1841   file_ptr.p->m_rows_restored_write = 0;
1842   file_ptr.p->m_ignored_rows = 0;
1843   file_ptr.p->m_row_operations = 0;
1844 
1845   file_ptr.p->m_file_id = Uint32(~0);
1846   file_ptr.p->m_ctl_file_no = Uint32(~0);
1847   file_ptr.p->m_used_ctl_file_no = Uint32(~0);
1848   file_ptr.p->m_current_file_index = 0;
1849   file_ptr.p->m_num_files = 0;
1850   file_ptr.p->m_max_parts = BackupFormat::NDB_MAX_LCP_PARTS;
1851   file_ptr.p->m_max_files = BackupFormat::NDB_MAX_LCP_FILES;
1852   file_ptr.p->m_restore_start_time = NdbTick_CurrentMillisecond();
1853   Uint32 err = seize_file(file_ptr);
1854   return err;
1855 }
1856 
1857 Uint32
seize_file(FilePtr file_ptr)1858 Restore::seize_file(FilePtr file_ptr)
1859 {
1860   LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
1861 
1862   ndbassert(pages.isEmpty());
1863   pages.release();
1864 
1865   Uint32 buf_size= PAGES*GLOBAL_PAGE_SIZE;
1866   Uint32 page_count= (buf_size+GLOBAL_PAGE_SIZE-1)/GLOBAL_PAGE_SIZE;
1867   if(!pages.seize(page_count))
1868   {
1869     return RestoreLcpRef::OutOfDataBuffer;
1870   }
1871 
1872   List::Iterator it;
1873   for(pages.first(it); !it.isNull(); pages.next(it))
1874   {
1875     * it.data = RNIL;
1876   }
1877 
1878   Uint32 err= 0;
1879   for(pages.first(it); !it.isNull(); pages.next(it))
1880   {
1881     Ptr<GlobalPage> page_ptr;
1882     if(!m_global_page_pool.seize(page_ptr))
1883     {
1884       err= RestoreLcpRef::OutOfReadBufferPages;
1885       break;
1886     }
1887     * it.data = page_ptr.i;
1888   }
1889 
1890   if(err)
1891   {
1892     for(pages.first(it); !it.isNull(); pages.next(it))
1893     {
1894       if(* it.data == RNIL)
1895 	break;
1896       m_global_page_pool.release(* it.data);
1897     }
1898   }
1899   else
1900   {
1901     pages.first(it);
1902     file_ptr.p->m_current_page_ptr_i = *it.data;
1903   }
1904   return err;
1905 }
1906 
1907 void
release_file(FilePtr file_ptr,bool statistics)1908 Restore::release_file(FilePtr file_ptr, bool statistics)
1909 {
1910   LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
1911 
1912   List::Iterator it;
1913   for (pages.first(it); !it.isNull(); pages.next(it))
1914   {
1915     if (* it.data == RNIL)
1916     {
1917       jam();
1918       continue;
1919     }
1920     m_global_page_pool.release(* it.data);
1921   }
1922 
1923   if (statistics)
1924   {
1925     Uint64 millis = NdbTick_CurrentMillisecond() -
1926                    file_ptr.p->m_restore_start_time;
1927     if (millis == 0)
1928       millis = 1;
1929     Uint64 rows_per_sec = (file_ptr.p->m_row_operations *
1930                            Uint64(1000)) / millis;
1931 
1932 
1933     g_eventLogger->info("LDM instance %u: Restored T%dF%u LCP %llu rows, "
1934                         "%llu row operations, "
1935                         "%llu millis, %llu row operations/sec)",
1936                         instance(),
1937                         file_ptr.p->m_table_id,
1938                         file_ptr.p->m_fragment_id,
1939                         file_ptr.p->m_rows_restored,
1940                         file_ptr.p->m_row_operations,
1941                         millis,
1942                         rows_per_sec);
1943 
1944 
1945     m_millis_spent+= millis;
1946     m_rows_restored+= file_ptr.p->m_rows_restored;
1947     m_frags_restored++;
1948 
1949     DEB_RES_STAT(("(%u)Restore tab(%u,%u): file_index: %u"
1950                   ", inserts: %llu, writes: %llu"
1951                   ", deletes: %llu, delete_pages: %llu"
1952                   ", delete_failed: %llu"
1953                   ", ignored rows: %llu",
1954                   instance(),
1955                   file_ptr.p->m_table_id,
1956                   file_ptr.p->m_fragment_id,
1957                   file_ptr.p->m_current_file_index - 1,
1958                   file_ptr.p->m_rows_restored_insert,
1959                   file_ptr.p->m_rows_restored_write,
1960                   file_ptr.p->m_rows_restored_delete,
1961                   file_ptr.p->m_rows_restored_delete_page,
1962                   file_ptr.p->m_rows_restored_delete_failed,
1963                   file_ptr.p->m_ignored_rows));
1964   }
1965   else
1966   {
1967     DEB_RES_STAT_EXTRA((
1968                   "(%u)Restore tab(%u,%u): file_index: %u"
1969                   ", inserts: %llu, writes: %llu"
1970                   ", deletes: %llu, delete_pages: %llu"
1971                   ", delete_failed: %llu"
1972                   ", ignored rows: %llu",
1973                   instance(),
1974                   file_ptr.p->m_table_id,
1975                   file_ptr.p->m_fragment_id,
1976                   file_ptr.p->m_current_file_index - 1,
1977                   file_ptr.p->m_rows_restored_insert,
1978                   file_ptr.p->m_rows_restored_write,
1979                   file_ptr.p->m_rows_restored_delete,
1980                   file_ptr.p->m_rows_restored_delete_page,
1981                   file_ptr.p->m_rows_restored_delete_failed,
1982                   file_ptr.p->m_ignored_rows));
1983   }
1984 
1985   pages.release();
1986   if (statistics)
1987   {
1988     jam();
1989     m_file_list.release(file_ptr);
1990   }
1991 }
1992 
1993 void
prepare_parts_for_execution(Signal * signal,FilePtr file_ptr)1994 Restore::prepare_parts_for_execution(Signal *signal, FilePtr file_ptr)
1995 {
1996   ndbrequire(file_ptr.p->m_used_ctl_file_no < 2);
1997   BackupFormat::LCPCtlFile *lcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
1998     &m_lcp_ctl_file_data[file_ptr.p->m_used_ctl_file_no][0];
1999 
2000   if (file_ptr.p->m_max_parts == 1 &&
2001       file_ptr.p->m_num_files == 1)
2002   {
2003     /**
2004      * UPGRADE CASE, everything is in one file.
2005      */
2006     jam();
2007     file_ptr.p->m_part_state[0] = File::PART_ALL_ROWS;
2008     return;
2009   }
2010   /**
2011    * We set up the part state array in 3 steps.
2012    * The default state is that all parts receives all changes.
2013    *
2014    * For the current file index we have recorded in the LCP control file
2015    * all the parts where all rows exists, so these parts will all have the
2016    * state PART_ALL_ROWS.
2017    *
2018    * Lastly we will go backwards from the last LCP data file to restore and
2019    * set all parts that will be fully restored in this LCP data file to be
2020    * ignored by earlier LCP data files.
2021    *
2022    * We ensure that we have consistent data by ensuring that we don't have
2023    * any files set to PART_IGNORED that was in the array to receive all rows.
2024    */
2025   for (Uint32 i = 0; i < file_ptr.p->m_max_parts; i++)
2026   {
2027     file_ptr.p->m_part_state[i] = File::PART_ALL_CHANGES;
2028   }
2029 
2030   {
2031     struct BackupFormat::PartPair partPair =
2032       lcpCtlFilePtr->partPairs[file_ptr.p->m_current_file_index];
2033 
2034     DEB_RES_PARTS((
2035              "(%u)Prepare ALL parts[%u] = (%u,%u)",
2036              instance(),
2037              file_ptr.p->m_current_file_index,
2038              partPair.startPart,
2039              partPair.numParts));
2040 
2041     Uint32 part_id = partPair.startPart;
2042     for (Uint32 i = 0; i < partPair.numParts; i++)
2043     {
2044       file_ptr.p->m_part_state[part_id] = File::PART_ALL_ROWS;
2045       part_id++;
2046       if (part_id == file_ptr.p->m_max_parts)
2047         part_id = 0;
2048     }
2049   }
2050 
2051   for (Uint32 i = file_ptr.p->m_current_file_index + 1;
2052        i < lcpCtlFilePtr->NumPartPairs;
2053        i++)
2054   {
2055     jam();
2056     struct BackupFormat::PartPair partPair =
2057       lcpCtlFilePtr->partPairs[i];
2058 
2059     DEB_RES_PARTS((
2060              "(%u)Prepare IGNORE parts[%u] = (%u,%u)",
2061              instance(),
2062              i,
2063              partPair.startPart,
2064              partPair.numParts));
2065 
2066     Uint32 part_id = partPair.startPart;
2067     for (Uint32 j = 0; j < partPair.numParts; j++)
2068     {
2069       ndbrequire(file_ptr.p->m_part_state[part_id] == File::PART_ALL_CHANGES);
2070       file_ptr.p->m_part_state[part_id] = File::PART_IGNORED;
2071       part_id++;
2072       if (part_id == file_ptr.p->m_max_parts)
2073         part_id = 0;
2074     }
2075   }
2076 }
2077 
2078 void
start_restore_lcp_upgrade(Signal * signal,FilePtr file_ptr)2079 Restore::start_restore_lcp_upgrade(Signal *signal, FilePtr file_ptr)
2080 {
2081   /**
2082    * In this an LCP existed, but no valid LCP control file, this can
2083    * only occur if the LCP was written by older versions of MySQL
2084    * Cluster.
2085    */
2086   file_ptr.p->m_current_file_index = 0;
2087   file_ptr.p->m_num_files = 1;
2088   file_ptr.p->m_max_parts = 1;
2089   file_ptr.p->m_max_files = 1;
2090   file_ptr.p->m_file_id = file_ptr.p->m_dih_lcp_no;
2091   open_data_file(signal, file_ptr);
2092 }
2093 
2094 void
step_file_number_back(FilePtr file_ptr,Uint32 steps)2095 Restore::step_file_number_back(FilePtr file_ptr, Uint32 steps)
2096 {
2097   for (Uint32 i = 0; i < steps; i++)
2098   {
2099     if (file_ptr.p->m_file_id == 0)
2100     {
2101       jam();
2102       file_ptr.p->m_file_id = file_ptr.p->m_max_files - 1;
2103     }
2104     else
2105     {
2106       jam();
2107       file_ptr.p->m_file_id--;
2108     }
2109   }
2110 }
2111 
2112 void
step_file_number_forward(FilePtr file_ptr)2113 Restore::step_file_number_forward(FilePtr file_ptr)
2114 {
2115   file_ptr.p->m_file_id++;
2116   if (file_ptr.p->m_file_id == file_ptr.p->m_max_files)
2117   {
2118     jam();
2119     file_ptr.p->m_file_id = 0;
2120   }
2121 }
2122 
2123 void
start_restore_lcp(Signal * signal,FilePtr file_ptr)2124 Restore::start_restore_lcp(Signal *signal, FilePtr file_ptr)
2125 {
2126   ndbrequire(file_ptr.p->m_used_ctl_file_no < 2);
2127   BackupFormat::LCPCtlFile *lcpCtlFilePtr = (BackupFormat::LCPCtlFile*)
2128     &m_lcp_ctl_file_data[file_ptr.p->m_used_ctl_file_no][0];
2129 
2130   /**
2131    * Initialise a few variables before starting the first data file
2132    * restore.
2133    */
2134   file_ptr.p->m_current_file_index = 0;
2135   file_ptr.p->m_num_files = lcpCtlFilePtr->NumPartPairs;
2136   file_ptr.p->m_max_parts = lcpCtlFilePtr->MaxPartPairs;
2137   file_ptr.p->m_max_files = lcpCtlFilePtr->MaxNumberDataFiles;
2138   file_ptr.p->m_file_id = lcpCtlFilePtr->LastDataFileNumber;
2139   file_ptr.p->m_table_version = lcpCtlFilePtr->CreateTableVersion;
2140   DEB_RES_OPEN(("(%u) tab(%u,%u), num_files: %u, last_file: %u",
2141                 instance(),
2142                 file_ptr.p->m_table_id,
2143                 file_ptr.p->m_fragment_id,
2144                 file_ptr.p->m_num_files,
2145                 file_ptr.p->m_file_id));
2146   ndbrequire(file_ptr.p->m_num_files > 0);
2147   ndbrequire(file_ptr.p->m_num_files <= BackupFormat::NDB_MAX_LCP_PARTS);
2148   ndbrequire(file_ptr.p->m_file_id <= BackupFormat::NDB_MAX_LCP_FILES);
2149   step_file_number_back(file_ptr, file_ptr.p->m_num_files - 1);
2150   open_data_file(signal, file_ptr);
2151 }
2152 
2153 void
open_data_file(Signal * signal,FilePtr file_ptr)2154 Restore::open_data_file(Signal* signal, FilePtr file_ptr)
2155 {
2156   prepare_parts_for_execution(signal, file_ptr);
2157   file_ptr.p->m_status = File::FIRST_READ;
2158 
2159   FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
2160   req->userReference = reference();
2161   req->fileFlags = FsOpenReq::OM_READONLY | FsOpenReq::OM_GZ;
2162   req->userPointer = file_ptr.i;
2163 
2164   DEB_RES_OPEN(("(%u)tab(%u,%u) open_data_file data file number = %u",
2165                 instance(),
2166                 file_ptr.p->m_table_id,
2167                 file_ptr.p->m_fragment_id,
2168                 file_ptr.p->m_file_id));
2169 
2170   FsOpenReq::setVersion(req->fileNumber, 5);
2171   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
2172   FsOpenReq::v5_setLcpNo(req->fileNumber, file_ptr.p->m_file_id);
2173   FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
2174   FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
2175   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
2176 }
2177 
2178 void
execFSOPENREF(Signal * signal)2179 Restore::execFSOPENREF(Signal* signal)
2180 {
2181   FsRef* ref= (FsRef*)signal->getDataPtr();
2182   FilePtr file_ptr;
2183   jamEntry();
2184   m_file_pool.getPtr(file_ptr, ref->userPointer);
2185 
2186   if (file_ptr.p->m_status == File::READ_CTL_FILES)
2187   {
2188     jam();
2189     open_ctl_file_done_ref(signal, file_ptr);
2190     return;
2191   }
2192   else if (file_ptr.p->m_status == File::CREATE_CTL_FILE)
2193   {
2194     ndbabort();
2195   }
2196   ndbrequire(file_ptr.p->m_status == File::FIRST_READ);
2197 
2198   Uint32 errCode= ref->errorCode;
2199   Uint32 osError= ref->osErrorCode;
2200 
2201   RestoreLcpRef* rep= (RestoreLcpRef*)signal->getDataPtrSend();
2202   rep->senderData= file_ptr.p->m_sender_data;
2203   rep->errorCode = errCode;
2204   rep->extra[0] = osError;
2205   sendSignal(file_ptr.p->m_sender_ref, GSN_RESTORE_LCP_REF, signal,
2206              RestoreLcpRef::SignalLength+1, JBB);
2207   release_file(file_ptr, true);
2208 }
2209 
2210 void
execFSOPENCONF(Signal * signal)2211 Restore::execFSOPENCONF(Signal* signal)
2212 {
2213   jamEntry();
2214   FilePtr file_ptr;
2215   FsConf* conf= (FsConf*)signal->getDataPtr();
2216   m_file_pool.getPtr(file_ptr, conf->userPointer);
2217 
2218   file_ptr.p->m_fd = conf->filePointer;
2219 
2220   if (file_ptr.p->m_status == File::READ_CTL_FILES)
2221   {
2222     jam();
2223     open_ctl_file_done_conf(signal, file_ptr);
2224     return;
2225   }
2226   else if (file_ptr.p->m_status == File::CREATE_CTL_FILE)
2227   {
2228     jam();
2229     lcp_create_ctl_done_open(signal, file_ptr);
2230     return;
2231   }
2232   ndbrequire(file_ptr.p->m_status == File::FIRST_READ);
2233 
2234   /**
2235    * Start thread's
2236    */
2237 
2238   ndbrequire((file_ptr.p->m_status & File::FILE_THREAD_RUNNING) == 0);
2239   ndbrequire((file_ptr.p->m_status & File::RESTORE_THREAD_RUNNING) == 0);
2240   file_ptr.p->m_status |= File::FILE_THREAD_RUNNING;
2241   signal->theData[0] = RestoreContinueB::READ_FILE;
2242   signal->theData[1] = file_ptr.i;
2243   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2244 
2245   file_ptr.p->m_status |= File::RESTORE_THREAD_RUNNING;
2246   signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
2247   signal->theData[1] = file_ptr.i;
2248   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2249 }
2250 
2251 void
restore_next(Signal * signal,FilePtr file_ptr)2252 Restore::restore_next(Signal* signal, FilePtr file_ptr)
2253 {
2254   Uint32 *data, len= 0;
2255   Uint32 status = file_ptr.p->m_status;
2256   Uint32 page_count = file_ptr.p->m_pages.getSize();
2257   BackupFormat::RecordType header_type = BackupFormat::INSERT_TYPE;
2258   do
2259   {
2260     Uint32 left= file_ptr.p->m_bytes_left;
2261     if (left < 8)
2262     {
2263       jam();
2264       /**
2265        * Not enough bytes to read header
2266        */
2267       break;
2268     }
2269     Ptr<GlobalPage> page_ptr(0,0), next_page_ptr(0,0);
2270     m_global_page_pool.getPtr(page_ptr, file_ptr.p->m_current_page_ptr_i);
2271     List::Iterator it;
2272 
2273     Uint32 pos= file_ptr.p->m_current_page_pos;
2274     if(status & File::READING_RECORDS)
2275     {
2276       jam();
2277       /**
2278        * We are reading records
2279        */
2280       len= ntohl(* (page_ptr.p->data + pos)) + 1;
2281       Uint32 type = len >> 16;
2282       len &= 0xFFFF;
2283       ndbrequire(len < GLOBAL_PAGE_SIZE_WORDS);
2284       ndbrequire(header_type < BackupFormat::END_TYPE);
2285       header_type = (BackupFormat::RecordType)type;
2286     }
2287     else
2288     {
2289       jam();
2290       /**
2291        * Section length is in 2 word
2292        */
2293       if(pos + 1 == GLOBAL_PAGE_SIZE_WORDS)
2294       {
2295         jam();
2296 	/**
2297 	 * But that's stored on next page...
2298 	 *   and since we have atleast 8 bytes left in buffer
2299 	 *   we can be sure that that's in buffer
2300 	 */
2301 	LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
2302 	Uint32 next_page = file_ptr.p->m_current_page_index + 1;
2303 	pages.position(it, next_page % page_count);
2304 	m_global_page_pool.getPtr(next_page_ptr, * it.data);
2305 	len= ntohl(* next_page_ptr.p->data);
2306       }
2307       else
2308       {
2309         jam();
2310 	len= ntohl(* (page_ptr.p->data + pos + 1));
2311       }
2312     }
2313 
2314     if (file_ptr.p->m_status & File::FIRST_READ)
2315     {
2316       jam();
2317       len= 3;
2318       file_ptr.p->m_status &= ~(Uint32)File::FIRST_READ;
2319     }
2320 
2321     if (4 * len > left)
2322     {
2323       jam();
2324 
2325       /**
2326        * Not enought bytes to read "record"
2327        */
2328       if (unlikely((status & File:: FILE_THREAD_RUNNING) == 0))
2329       {
2330         crash_during_restore(file_ptr, __LINE__, 0);
2331       }
2332       len= 0;
2333       break;
2334     }
2335 
2336     /**
2337      * Entire record is in buffer
2338      */
2339 
2340     if(pos + len >= GLOBAL_PAGE_SIZE_WORDS)
2341     {
2342       jam();
2343       /**
2344        * But it's split over pages
2345        */
2346       if(next_page_ptr.p == 0)
2347       {
2348 	LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
2349 	Uint32 next_page = file_ptr.p->m_current_page_index + 1;
2350 	pages.position(it, next_page % page_count);
2351 	m_global_page_pool.getPtr(next_page_ptr, * it.data);
2352       }
2353       file_ptr.p->m_current_page_ptr_i = next_page_ptr.i;
2354       file_ptr.p->m_current_page_pos = (pos + len) - GLOBAL_PAGE_SIZE_WORDS;
2355       file_ptr.p->m_current_page_index =
2356 	(file_ptr.p->m_current_page_index + 1) % page_count;
2357 
2358       if (len <= GLOBAL_PAGE_SIZE_WORDS)
2359       {
2360         jam();
2361         Uint32 first = (GLOBAL_PAGE_SIZE_WORDS - pos);
2362         // wl4391_todo removing valgrind overlap warning for now
2363         memmove(page_ptr.p, page_ptr.p->data+pos, 4 * first);
2364         memcpy(page_ptr.p->data+first, next_page_ptr.p, 4 * (len - first));
2365         data= page_ptr.p->data;
2366       }
2367       else
2368       {
2369         jam();
2370         /**
2371          * A table definition can be larger than one page...
2372          * when that happens copy it out to side buffer
2373          *
2374          * First copy part belonging to page_ptr
2375          * Then copy full middle pages (moving forward in page-list)
2376          * Last copy last part
2377          */
2378         Uint32 save = len;
2379         assert(len <= NDB_ARRAY_SIZE(m_table_buf));
2380         Uint32 * dst = m_table_buf;
2381 
2382         /**
2383          * First
2384          */
2385         Uint32 first = (GLOBAL_PAGE_SIZE_WORDS - pos);
2386         memcpy(dst, page_ptr.p->data+pos, 4 * first);
2387         len -= first;
2388         dst += first;
2389 
2390         /**
2391          * Middle
2392          */
2393         while (len > GLOBAL_PAGE_SIZE_WORDS)
2394         {
2395           jam();
2396           memcpy(dst, next_page_ptr.p, 4 * GLOBAL_PAGE_SIZE_WORDS);
2397           len -= GLOBAL_PAGE_SIZE_WORDS;
2398           dst += GLOBAL_PAGE_SIZE_WORDS;
2399 
2400           {
2401             LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
2402             Uint32 next_page = (file_ptr.p->m_current_page_index + 1) % page_count;
2403             pages.position(it, next_page % page_count);
2404             m_global_page_pool.getPtr(next_page_ptr, * it.data);
2405 
2406             file_ptr.p->m_current_page_ptr_i = next_page_ptr.i;
2407             file_ptr.p->m_current_page_index = next_page;
2408           }
2409         }
2410 
2411         /**
2412          * last
2413          */
2414         memcpy(dst, next_page_ptr.p, 4 * len);
2415         file_ptr.p->m_current_page_pos = len;
2416 
2417         /**
2418          * Set pointer and len
2419          */
2420         len = save;
2421         data = m_table_buf;
2422       }
2423     }
2424     else
2425     {
2426       file_ptr.p->m_current_page_pos = pos + len;
2427       data= page_ptr.p->data+pos;
2428     }
2429 
2430     file_ptr.p->m_bytes_left -= 4*len;
2431 
2432     if(status & File::READING_RECORDS)
2433     {
2434       if(len == 1)
2435       {
2436 	file_ptr.p->m_status = status & ~(Uint32)File::READING_RECORDS;
2437       }
2438       else
2439       {
2440 	parse_record(signal, file_ptr, data, len, header_type);
2441       }
2442     }
2443     else
2444     {
2445       switch(ntohl(* data)){
2446       case BackupFormat::FILE_HEADER:
2447 	parse_file_header(signal, file_ptr, data-3, len+3);
2448 	break;
2449       case BackupFormat::FRAGMENT_HEADER:
2450 	file_ptr.p->m_status = status | File::READING_RECORDS;
2451 	parse_fragment_header(signal, file_ptr, data, len);
2452 	break;
2453       case BackupFormat::FRAGMENT_FOOTER:
2454 	parse_fragment_footer(signal, file_ptr, data, len);
2455 	break;
2456       case BackupFormat::TABLE_LIST:
2457 	parse_table_list(signal, file_ptr, data, len);
2458 	break;
2459       case BackupFormat::TABLE_DESCRIPTION:
2460 	parse_table_description(signal, file_ptr, data, len);
2461 	break;
2462       case BackupFormat::GCP_ENTRY:
2463 	parse_gcp_entry(signal, file_ptr, data, len);
2464 	break;
2465       case BackupFormat::EMPTY_ENTRY:
2466         // skip
2467         break;
2468       case 0x4e444242: // 'NDBB'
2469 	if (check_file_version(signal, ntohl(* (data+2))) == 0)
2470 	{
2471 	  break;
2472 	}
2473         // Fall through - on bad version
2474       default:
2475 	parse_error(signal, file_ptr, __LINE__, ntohl(* data));
2476       }
2477     }
2478   } while(0);
2479 
2480   if(file_ptr.p->m_bytes_left == 0 && status & File::FILE_EOF)
2481   {
2482     file_ptr.p->m_status &= ~(Uint32)File::RESTORE_THREAD_RUNNING;
2483     /**
2484      * File is finished...
2485      */
2486     close_file(signal, file_ptr);
2487     return;
2488   }
2489 
2490   /**
2491    * We send an immediate signal to continue the restore, at times this
2492    * could lead to burning some extra CPU since we might still wait for
2493    * input from the disk reading. This code is however only executed
2494    * as part of restarts, so it should be ok to spend some extra CPU
2495    * to ensure that restarts are quick.
2496    */
2497   signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
2498   signal->theData[1] = file_ptr.i;
2499   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2500 }
2501 
2502 void
read_data_file(Signal * signal,FilePtr file_ptr)2503 Restore::read_data_file(Signal* signal, FilePtr file_ptr)
2504 {
2505   Uint32 left= file_ptr.p->m_bytes_left;
2506   Uint32 page_count = file_ptr.p->m_pages.getSize();
2507   Uint32 free= GLOBAL_PAGE_SIZE * page_count - left;
2508   Uint32 read_count= free/GLOBAL_PAGE_SIZE;
2509 
2510   if(read_count <= file_ptr.p->m_outstanding_reads)
2511   {
2512     signal->theData[0] = RestoreContinueB::READ_FILE;
2513     signal->theData[1] = file_ptr.i;
2514     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2515     return;
2516   }
2517 
2518   read_count -= file_ptr.p->m_outstanding_reads;
2519   Uint32 curr_page= file_ptr.p->m_current_page_index;
2520   LocalList pages(m_databuffer_pool, file_ptr.p->m_pages);
2521 
2522   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2523   req->filePointer = file_ptr.p->m_fd;
2524   req->userReference = reference();
2525   req->userPointer = file_ptr.i;
2526   req->numberOfPages = 1;
2527   req->operationFlag = 0;
2528   FsReadWriteReq::setFormatFlag(req->operationFlag,
2529 				FsReadWriteReq::fsFormatGlobalPage);
2530   FsReadWriteReq::setPartialReadFlag(req->operationFlag, 1);
2531 
2532   Uint32 start= (curr_page + page_count - read_count) % page_count;
2533 
2534   List::Iterator it;
2535   pages.position(it, start);
2536   do
2537   {
2538     file_ptr.p->m_outstanding_reads++;
2539     req->varIndex = file_ptr.p->m_current_file_page++;
2540     req->data.pageData[0] = *it.data;
2541     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2542 	       FsReadWriteReq::FixedLength + 1, JBA);
2543 
2544     start++;
2545     if(start == page_count)
2546     {
2547       start= 0;
2548       pages.position(it, start);
2549     }
2550     else
2551     {
2552       pages.next(it);
2553     }
2554   } while(start != curr_page);
2555 }
2556 
2557 void
execFSREADREF(Signal * signal)2558 Restore::execFSREADREF(Signal * signal)
2559 {
2560   jamEntry();
2561   FilePtr file_ptr;
2562   FsRef* ref= (FsRef*)signal->getDataPtr();
2563   m_file_pool.getPtr(file_ptr, ref->userPointer);
2564   if (file_ptr.p->m_status == File::READ_CTL_FILES)
2565   {
2566     jam();
2567     read_ctl_file_done(signal, file_ptr, 0);
2568     return;
2569   }
2570   SimulatedBlock::execFSREADREF(signal);
2571   ndbabort();
2572 }
2573 
2574 void
execFSREADCONF(Signal * signal)2575 Restore::execFSREADCONF(Signal * signal)
2576 {
2577   jamEntry();
2578   FilePtr file_ptr;
2579   FsConf* conf= (FsConf*)signal->getDataPtr();
2580   m_file_pool.getPtr(file_ptr, conf->userPointer);
2581 
2582   if (file_ptr.p->m_status == File::READ_CTL_FILES)
2583   {
2584     jam();
2585     read_ctl_file_done(signal, file_ptr, conf->bytes_read);
2586     return;
2587   }
2588   file_ptr.p->m_bytes_left += conf->bytes_read;
2589 
2590   ndbassert(file_ptr.p->m_outstanding_reads);
2591   file_ptr.p->m_outstanding_reads--;
2592 
2593   if (file_ptr.p->m_outstanding_reads == 0)
2594   {
2595     ndbassert(conf->bytes_read <= GLOBAL_PAGE_SIZE);
2596     if(conf->bytes_read == GLOBAL_PAGE_SIZE)
2597     {
2598       jam();
2599       read_data_file(signal, file_ptr);
2600     }
2601     else
2602     {
2603       jam();
2604       file_ptr.p->m_status |= File::FILE_EOF;
2605       file_ptr.p->m_status &= ~(Uint32)File::FILE_THREAD_RUNNING;
2606     }
2607   }
2608 }
2609 
2610 void
close_file(Signal * signal,FilePtr file_ptr,bool remove_flag)2611 Restore::close_file(Signal* signal, FilePtr file_ptr, bool remove_flag)
2612 {
2613   FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
2614   req->filePointer = file_ptr.p->m_fd;
2615   req->userPointer = file_ptr.i;
2616   req->userReference = reference();
2617   req->fileFlag = 0;
2618   if (remove_flag)
2619   {
2620     jam();
2621     FsCloseReq::setRemoveFileFlag(req->fileFlag, 1);
2622   }
2623   sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
2624 }
2625 
2626 void
execFSCLOSEREF(Signal * signal)2627 Restore::execFSCLOSEREF(Signal * signal)
2628 {
2629   jamEntry();
2630   SimulatedBlock::execFSCLOSEREF(signal);
2631   ndbabort();
2632 }
2633 
2634 void
execFSCLOSECONF(Signal * signal)2635 Restore::execFSCLOSECONF(Signal * signal)
2636 {
2637   jamEntry();
2638   FilePtr file_ptr;
2639   FsConf* conf= (FsConf*)signal->getDataPtr();
2640   m_file_pool.getPtr(file_ptr, conf->userPointer);
2641 
2642   file_ptr.p->m_fd = RNIL;
2643 
2644   if (file_ptr.p->m_status == File::READ_CTL_FILES)
2645   {
2646     jam();
2647     close_ctl_file_done(signal, file_ptr);
2648     return;
2649   }
2650   else if (file_ptr.p->m_status == File::CREATE_CTL_FILE)
2651   {
2652     jam();
2653     lcp_create_ctl_done_close(signal, file_ptr);
2654     return;
2655   }
2656   else if (file_ptr.p->m_status == File::DROP_OLD_FILES)
2657   {
2658     jam();
2659     lcp_drop_old_files(signal, file_ptr);
2660     return;
2661   }
2662 
2663   if(file_ptr.p->m_outstanding_operations == 0)
2664   {
2665     jam();
2666     restore_lcp_conf_after_execute(signal, file_ptr);
2667     return;
2668   }
2669 }
2670 
2671 void
parse_file_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)2672 Restore::parse_file_header(Signal* signal,
2673 			   FilePtr file_ptr,
2674 			   const Uint32* data, Uint32 len)
2675 {
2676   const BackupFormat::FileHeader* fh= (BackupFormat::FileHeader*)data;
2677 
2678   if(memcmp(fh->Magic, "NDBBCKUP", 8) != 0)
2679   {
2680     parse_error(signal, file_ptr, __LINE__, *data);
2681     return;
2682   }
2683 
2684   file_ptr.p->m_lcp_version = ntohl(fh->BackupVersion);
2685   if (check_file_version(signal, ntohl(fh->BackupVersion)))
2686   {
2687     parse_error(signal, file_ptr, __LINE__, ntohl(fh->NdbVersion));
2688     return;
2689   }
2690   ndbassert(ntohl(fh->SectionType) == BackupFormat::FILE_HEADER);
2691 
2692   if(ntohl(fh->SectionLength) != len-3)
2693   {
2694     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
2695     return;
2696   }
2697 
2698   if(ntohl(fh->FileType) != BackupFormat::LCP_FILE)
2699   {
2700     parse_error(signal, file_ptr, __LINE__, ntohl(fh->FileType));
2701     return;
2702   }
2703 
2704   if(fh->ByteOrder != 0x12345678)
2705   {
2706     parse_error(signal, file_ptr, __LINE__, fh->ByteOrder);
2707     return;
2708   }
2709 }
2710 
2711 void
parse_table_list(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)2712 Restore::parse_table_list(Signal* signal, FilePtr file_ptr,
2713 			  const Uint32 *data, Uint32 len)
2714 {
2715   const BackupFormat::CtlFile::TableList* fh=
2716     (BackupFormat::CtlFile::TableList*)data;
2717 
2718   if(ntohl(fh->TableIds[0]) != file_ptr.p->m_table_id)
2719   {
2720     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableIds[0]));
2721     return;
2722   }
2723 }
2724 
2725 void
parse_table_description(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)2726 Restore::parse_table_description(Signal* signal, FilePtr file_ptr,
2727 				 const Uint32 *data, Uint32 len)
2728 {
2729   const BackupFormat::CtlFile::TableDescription* fh=
2730     (BackupFormat::CtlFile::TableDescription*)data;
2731 
2732   SimplePropertiesLinearReader it(fh->DictTabInfo, len);
2733   it.first();
2734 
2735   DictTabInfo::Table tmpTab; tmpTab.init();
2736   SimpleProperties::UnpackStatus stat;
2737   stat = SimpleProperties::unpack(it, &tmpTab,
2738 				  DictTabInfo::TableMapping,
2739 				  DictTabInfo::TableMappingSize);
2740   ndbrequire(stat == SimpleProperties::Break);
2741 
2742   if(tmpTab.TableId != file_ptr.p->m_table_id)
2743   {
2744     parse_error(signal, file_ptr, __LINE__, tmpTab.TableId);
2745     return;
2746   }
2747 
2748   file_ptr.p->m_table_version = tmpTab.TableVersion;
2749 }
2750 
2751 void
parse_fragment_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)2752 Restore::parse_fragment_header(Signal* signal, FilePtr file_ptr,
2753 			       const Uint32 *data, Uint32 len)
2754 {
2755   const BackupFormat::DataFile::FragmentHeader* fh=
2756     (BackupFormat::DataFile::FragmentHeader*)data;
2757   if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
2758   {
2759     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
2760     return;
2761   }
2762 
2763   if (ntohl(fh->ChecksumType) != 0)
2764   {
2765     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
2766     return;
2767   }
2768 
2769   file_ptr.p->m_fragment_id = ntohl(fh->FragmentNo);
2770   if (file_ptr.p->m_current_file_index == 0)
2771   {
2772     jam();
2773     /**
2774      * Temporary reset DBTUP's #disk attributes on table
2775      * Already done when coming for file not being the first.
2776      */
2777     c_tup->start_restore_lcp(file_ptr.p->m_table_id, file_ptr.p->m_fragment_id);
2778   }
2779 }
2780 
2781 const char*
get_state_string(Uint32 part_state)2782 Restore::get_state_string(Uint32 part_state)
2783 {
2784   switch (part_state)
2785   {
2786     case File::PART_IGNORED:
2787       return "IGNORED";
2788     case File::PART_ALL_ROWS:
2789       return "ALL ROWS";
2790     case File::PART_ALL_CHANGES:
2791       return "CHANGED ROWS";
2792     default:
2793       return "Unknown";
2794   }
2795   return NULL;
2796 }
2797 
2798 const char*
get_header_string(Uint32 header_type)2799 Restore::get_header_string(Uint32 header_type)
2800 {
2801   switch (header_type)
2802   {
2803     case BackupFormat::INSERT_TYPE:
2804       return "INSERT_TYPE";
2805     case BackupFormat::WRITE_TYPE:
2806       return "WRITE_TYPE";
2807     case BackupFormat::DELETE_BY_PAGEID_TYPE:
2808       return "DELETE_BY_PAGEID_TYPE";
2809     case BackupFormat::DELETE_BY_ROWID_TYPE:
2810       return "DELETE_BY_ROWID_TYPE";
2811     default:
2812       ndbabort();
2813       return NULL;
2814   }
2815 }
2816 
/**
 * Apply one record read from an LCP data file.
 *
 * The first word of the record is skipped. After the skip the layout
 * used by this function is:
 *  - INSERT_TYPE and WRITE_TYPE: data[0] is the page number, data[1]
 *    the page index, followed by len - 3 words of row data starting at
 *    data[2]. The row data is handed to DBTUP's read_lcp_keys() and is
 *    also copied behind an AttributeHeader of type READ_LCP.
 *  - DELETE_BY_ROWID_TYPE: data[0] and data[1] form the rowid, data[2]
 *    is a GCI; len must be 4.
 *  - DELETE_BY_PAGEID_TYPE: data[0] is the page number, data[1] the
 *    record size; len must be 3.
 *
 * Records with a page number at or beyond m_max_page_cnt are dropped, as
 * are records belonging to a part in state PART_IGNORED. For parts in
 * state PART_ALL_ROWS only INSERT_TYPE is accepted, for parts in state
 * PART_ALL_CHANGES INSERT_TYPE is not accepted.
 *
 * @param signal       Signal object; words from offset 24 onwards of its
 *                     send buffer are used to stage key and attribute
 *                     data.
 * @param file_ptr     File record the record was read from.
 * @param data         Start of the record.
 * @param len          Record length in words.
 * @param header_type  Type of the record.
 */
void
Restore::parse_record(Signal* signal,
                      FilePtr file_ptr,
                      const Uint32 *data,
                      Uint32 len,
                      BackupFormat::RecordType header_type)
{
  /* Read the page number and step past the first word of the record. */
  Uint32 page_no = data[1];
  data += 1;
  file_ptr.p->m_error_code = 0;
  ndbrequire(file_ptr.p->m_lcp_version >= NDBD_RAW_LCP);
  if (page_no >= file_ptr.p->m_max_page_cnt)
  {
    /**
     * Page ignored since it is not part of this LCP.
     * Can happen with multiple files used to restore coming
     * from different LCPs.
     */
    jam();
    return;
  }
  Uint32 part_id = c_backup->hash_lcp_part(page_no);
  ndbrequire(part_id < MAX_LCP_PARTS_SUPPORTED);
  /*
  DEB_HIGH_RES(("(%u)parse_record, page_no: %u, part: %u,"
                " state: %s, header_type: %s",
                instance(),
                page_no,
                part_id,
                get_state_string(Uint32(file_ptr.p->m_part_state[part_id])),
                get_header_string(Uint32(header_type))));
  */
  switch (file_ptr.p->m_part_state[part_id])
  {
    case File::PART_IGNORED:
    {
      jam();
      /**
       * The row is a perfectly ok row, but we will ignore since
       * this part is handled by a later LCP data file.
       */
      file_ptr.p->m_ignored_rows++;
      return;
    }
    case File::PART_ALL_ROWS:
    {
      jam();
      /**
       * The data file contains all rows for this part, it contains no
       * DELETE BY ROWID. This part will be ignored in earlier LCP data
       * files restored, so we can safely use ZINSERT here as op_type.
       */
      ndbrequire(header_type == BackupFormat::INSERT_TYPE);
      break;
    }
    case File::PART_ALL_CHANGES:
    {
      jam();
      /**
       * This is a row that changed during the LCP this data file records.
       * The row could either exist or not dependent on if the operation
       * that changed it was an INSERT or an UPDATE. It could also be a
       * DELETE, in this case we only record the rowid and nothing more
       * to indicate this rowid was deleted. We will discover this below.
       */
      ndbrequire(header_type != BackupFormat::INSERT_TYPE);
      break;
    }
    default:
    {
      jam();
      ndbabort();
      return; /* Silence compiler warnings */
    }
  }
  /**
   * Snapshot of the operation counter, used below to verify that the
   * counter is back at this value after the operations issued here.
   */
  Uint32 outstanding = file_ptr.p->m_outstanding_operations;
  if (header_type == BackupFormat::INSERT_TYPE)
  {
    /**
     * This is a normal INSERT as part of our restore process.
     * We install using a binary image saved in LCP file.
     */
    /* Key and attribute data are staged inside the signal send buffer. */
    Uint32 * const key_start = signal->getDataPtrSend()+24;
    Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
    Local_key rowid_val;
    jam();
    rowid_val.m_page_no = data[0];
    rowid_val.m_page_idx = data[1];
    file_ptr.p->m_rowid_page_no = rowid_val.m_page_no;
    file_ptr.p->m_rowid_page_idx = rowid_val.m_page_idx;
    Uint32 keyLen = c_tup->read_lcp_keys(file_ptr.p->m_table_id,
                                         data+2,
                                         len - 3,
                                         key_start);
    AttributeHeader::init(attr_start,
                          AttributeHeader::READ_LCP, 4*(len - 3));
    /* One word for the AttributeHeader plus the row data itself. */
    Uint32 attrLen = 1 + len - 3;
    file_ptr.p->m_rows_restored_insert++;
    memcpy(attr_start + 1, data+2, 4 * (len - 3));
    DEB_HIGH_RES(("(%u)INSERT_TYPE tab(%u,%u), row(%u,%u),"
                  " keyLen: %u, key[0]: %x",
                  instance(),
                  file_ptr.p->m_table_id,
                  file_ptr.p->m_fragment_id,
                  rowid_val.m_page_no,
                  rowid_val.m_page_idx,
                  keyLen,
                  key_start[0]));

    execute_operation(signal,
                      file_ptr,
                      keyLen,
                      attrLen,
                      ZINSERT,
                      0,
                      Uint32(BackupFormat::INSERT_TYPE),
                      &rowid_val);
    handle_return_execute_operation(signal,
                                    file_ptr,
                                    data,
                                    len,
                                    outstanding);
  }
  else
  {
    if (header_type == BackupFormat::DELETE_BY_ROWID_TYPE ||
        header_type == BackupFormat::WRITE_TYPE)
    {
      Local_key rowid_val;
      rowid_val.m_page_no = data[0];
      rowid_val.m_page_idx = data[1];
      file_ptr.p->m_rowid_page_no = rowid_val.m_page_no;
      file_ptr.p->m_rowid_page_idx = rowid_val.m_page_idx;
      jam();
      Uint32 gci_id = 0;
      Uint32 sent_header_type;
      if (header_type == BackupFormat::DELETE_BY_ROWID_TYPE)
      {
        gci_id = data[2];
        if (gci_id == 0)
        {
          jam();
          /**
           * We didn't have access to the GCI at LCP time, row
           * was in a new page and we didn't know about the GCI of the
           * old row in a previous page incarnation.
           * The DELETE BY ROWID could also have come through a
           * LCP keep list where the GCI isn't transported.
           *
           * The row is deleted at end of this restore and the
           * restore will have at least restored everything up to
           * Max GCI completed, if any changes happened after this
           * they will be in REDO log or need to be fetched from
           * live node.
           *
           * It is important to ensure that it is set to at least
           * this value to ensure that this node can properly
           * delete this row for a node that have been dead for an
           * extended amount of time.
           */
          gci_id = file_ptr.p->m_max_gci_completed;
        }
        sent_header_type = (Uint32)BackupFormat::DELETE_BY_ROWID_TYPE;
        file_ptr.p->m_rows_restored_delete++;
        DEB_HIGH_RES(("(%u)1:DELETE_BY_ROWID tab(%u,%u), row(%u,%u),"
                      " gci=%u",
                       instance(),
                       file_ptr.p->m_table_id,
                       file_ptr.p->m_fragment_id,
                       rowid_val.m_page_no,
                       rowid_val.m_page_idx,
                       gci_id));
      }
      else
      {
        /* WRITE_TYPE: the delete part is sent with its own header type. */
        sent_header_type = (Uint32)BackupFormat::DELETE_BY_ROWID_WRITE_TYPE;
        file_ptr.p->m_rows_restored_write++;
        DEB_HIGH_RES(("(%u)2:DELETE_BY_ROWID tab(%u,%u), row(%u,%u),"
                      " gci=%u",
                       instance(),
                       file_ptr.p->m_table_id,
                       file_ptr.p->m_fragment_id,
                       rowid_val.m_page_no,
                       rowid_val.m_page_idx,
                       gci_id));
      }
      /* Delete by rowid: no key and no attribute data is passed. */
      execute_operation(signal,
                        file_ptr,
                        0,
                        0,
                        ZDELETE,
                        gci_id,
                        sent_header_type,
                        &rowid_val);
      if (header_type == BackupFormat::WRITE_TYPE)
      {
        /**
         * We found a CHANGE record. This is written into the LCP file
         * as part of an LCP where the part only records changes. In
         * this case we might have already inserted the row in a previous
         * LCP file. To simplify code we use a DELETE followed by a
         * normal LCP insert. Otherwise we will have to complicate the
         * TUP code to handle writes of LCP data.
         *
         * Normally there should be a smaller amount of those
         * records, so the performance impact should not be
         * very high.
         */
        DEB_HIGH_RES(("(%u)WRITE_TYPE tab(%u,%u), row(%u,%u), gci=%u",
                       instance(),
                       file_ptr.p->m_table_id,
                       file_ptr.p->m_fragment_id,
                       rowid_val.m_page_no,
                       rowid_val.m_page_idx,
                       gci_id));
        /* Key and attribute data are staged only after the delete above. */
        Uint32 * const key_start = signal->getDataPtrSend()+24;
        Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
        Uint32 keyLen = c_tup->read_lcp_keys(file_ptr.p->m_table_id,
                                             data+2,
                                             len - 3,
                                             key_start);
        AttributeHeader::init(attr_start,
                              AttributeHeader::READ_LCP, 4*(len - 3));
        Uint32 attrLen = 1 + len - 3;
        memcpy(attr_start + 1, data+2, 4 * (len - 3));
        execute_operation(signal,
                          file_ptr,
                          keyLen,
                          attrLen,
                          ZINSERT,
                          gci_id,
                          header_type,
                          &rowid_val);
        handle_return_execute_operation(signal,
                                        file_ptr,
                                        data,
                                        len,
                                        outstanding);
      }
      else
      {
        /**
         * We found a DELETE BY ROWID, this deletes the row in the rowid
         * position, This can happen in parts where we record changes, we might
         * have inserted the row in an earlier LCP data file, so we need to
         * attempt to remove it here.
         *
         * For DELETE by ROWID there is no key and no ATTRINFO to send.
         * The key is instead the rowid which is sent when the row id flag is
         * set.
         */
        DEB_HIGH_RES(("(%u)3:DELETE_BY_ROWID tab(%u,%u), row(%u,%u), gci=%u",
                       instance(),
                       file_ptr.p->m_table_id,
                       file_ptr.p->m_fragment_id,
                       rowid_val.m_page_no,
                       rowid_val.m_page_idx,
                       gci_id));
        ndbrequire(len == (3 + 1));
        ndbrequire(outstanding == file_ptr.p->m_outstanding_operations);
      }
    }
    else
    {
      jam();
      Local_key rowid_val;
      DEB_HIGH_RES(("(%u)DELETE_BY_PAGEID tab(%u,%u), page=%u, record_size=%u",
                     instance(),
                     file_ptr.p->m_table_id,
                     file_ptr.p->m_fragment_id,
                     data[0],
                     data[1]));
      ndbrequire(header_type == BackupFormat::DELETE_BY_PAGEID_TYPE);
      ndbrequire(len == (2 + 1));
      /* DELETE by PAGEID, a loop of DELETE by ROWID */
      rowid_val.m_page_no = data[0];
      rowid_val.m_page_idx = 0;
      Uint32 record_size = data[1];
      /**
       * The operation counter is raised for the duration of the loop and
       * lowered again afterwards.
       * NOTE(review): presumably this keeps the restore from being seen
       * as ready while deletes are still being issued - confirm.
       */
      file_ptr.p->m_outstanding_operations++;
      file_ptr.p->m_rows_restored_delete_page++;
      /* One delete per record slot that fits completely in the page. */
      while ((rowid_val.m_page_idx + record_size) <=
             Tup_fixsize_page::DATA_WORDS)
      {
        jam();
        execute_operation(signal,
                          file_ptr,
                          0,
                          0,
                          ZDELETE,
                          0,
                          header_type,
                          &rowid_val);
        rowid_val.m_page_idx += record_size;
      }
      ndbrequire(file_ptr.p->m_outstanding_operations > 0);
      file_ptr.p->m_outstanding_operations--;
      ndbrequire(outstanding == file_ptr.p->m_outstanding_operations);
      check_restore_ready(signal, file_ptr);
    }
  }
}
3118 
/**
 * Check the outcome of an insert issued through execute_operation() and
 * recover from a "key already exists" failure where that is expected.
 *
 * When m_error_code is 0 nothing is done. Error 630 is handled, provided
 * that more than one file is being restored and the current file is not
 * the first one, by deleting the row by key and then repeating the
 * insert at the rowid given by data[0] and data[1]. Every other error,
 * as well as a failure of the delete or of the repeated insert, is
 * logged and/or stopped by an ndbrequire.
 *
 * @param signal       Signal object; key and attribute data are staged
 *                     again in its send buffer before each operation.
 * @param file_ptr     File record the record was read from.
 * @param data         Record data with the rowid in data[0] and data[1]
 *                     and the row data from data[2]; the callers in
 *                     parse_record() pass the pointer after stepping
 *                     past the first word of the record.
 * @param len          Record length in words as received by
 *                     parse_record(); the row data is len - 3 words.
 * @param outstanding  Value m_outstanding_operations is required to have
 *                     after each operation executed.
 */
void
Restore::handle_return_execute_operation(Signal *signal,
                                         FilePtr file_ptr,
                                         const Uint32 *data,
                                         Uint32 len,
                                         Uint32 outstanding)
{
  ndbrequire(outstanding == file_ptr.p->m_outstanding_operations);
  if (file_ptr.p->m_error_code == 0)
  {
    return; /* Normal path, return */
  }
  Uint32 * const key_start = signal->getDataPtrSend()+24;
  Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
  Local_key rowid_val;
  Uint32 keyLen;
  /* One word for the AttributeHeader plus the row data itself. */
  Uint32 attrLen = 1 + len - 3;

  /* Only error 630 in a second or later file of several is recoverable. */
  if (file_ptr.p->m_error_code != 630 ||
      file_ptr.p->m_num_files == 1 ||
      file_ptr.p->m_current_file_index == 0)
    goto error;

  jam();
  /**
   * 630 means that key already exists. When inserting a row during
   * restore it is normal that the key we're inserting can exist. This
   * key can have been inserted by a previous insert into a different
   * rowid.
   *
   * The rowid where this key previously existed can have a DELETE BY
   * ROWID operation in the LCP files, it could have a WRITE with a
   * different key as well.
   * In both those cases it is possible that the INSERT comes before
   * this DELETE BY ROWID or WRITE operation since these happen in
   * rowid order and not in key order. They can even happen in a
   * different LCP file since one LCP can span multiple LCP files.
   *
   * To ensure consistency we track exactly how many rows we restored
   * during the restore of the LCP files.
   *
   * We need to reinitialise key data and attribute data from data
   * array since signal object isn't safe after executing the
   * LQHKEYREQ signal.
   *
   * This cannot happen with only 1 LCP file and it cannot happen in
   * the first LCP file.
   */

  DEB_RES(("(%u)tab(%u,%u) row(%u,%u) key already existed,"
           " num_files: %u, current_file: %u",
           instance(),
           file_ptr.p->m_table_id,
           file_ptr.p->m_fragment_id,
           file_ptr.p->m_rowid_page_no,
           file_ptr.p->m_rowid_page_idx,
           file_ptr.p->m_num_files,
           file_ptr.p->m_current_file_index));

  /* Delete the existing row by key; no rowid is passed. */
  keyLen = c_tup->read_lcp_keys(file_ptr.p->m_table_id,
                                data+2,
                                len - 3,
                                key_start);
  execute_operation(signal,
                    file_ptr,
                    keyLen,
                    0,
                    ZDELETE,
                    0,
                    BackupFormat::NORMAL_DELETE_TYPE,
                    NULL);

  ndbrequire(outstanding == file_ptr.p->m_outstanding_operations);
  if (file_ptr.p->m_error_code != 0)
    goto error;

  /**
   * Setup key data and attribute data again, since the signal
   * object cannot be regarded as safe, we need to reinitialise
   * this data.
   */
  keyLen = c_tup->read_lcp_keys(file_ptr.p->m_table_id,
                                data+2,
                                len - 3,
                                key_start);
  AttributeHeader::init(attr_start,
                        AttributeHeader::READ_LCP, 4*(len - 3));
  memcpy(attr_start + 1, data+2, 4 * (len - 3));
  rowid_val.m_page_no = data[0];
  rowid_val.m_page_idx = data[1];
  /* Repeat the insert; this time it is required to succeed. */
  execute_operation(signal,
                    file_ptr,
                    keyLen,
                    attrLen,
                    ZINSERT,
                    0,
                    Uint32(BackupFormat::INSERT_TYPE),
                    &rowid_val);
  ndbrequire(outstanding == file_ptr.p->m_outstanding_operations);
  ndbrequire(file_ptr.p->m_error_code == 0);
  return;

error:
  /**
   * Log the failing row and error code. The ndbrequire below cannot hold
   * since every jump to this label is made with a non-zero error code.
   */
  g_eventLogger->info("(%u)tab(%u,%u),row(%u,%u) crash, error: %u",
                      instance(),
                      file_ptr.p->m_table_id,
                      file_ptr.p->m_fragment_id,
                      file_ptr.p->m_rowid_page_no,
                      file_ptr.p->m_rowid_page_idx,
                      file_ptr.p->m_error_code);
  ndbrequire(file_ptr.p->m_error_code == 0);
}
3231 
3232 void
execute_operation(Signal * signal,FilePtr file_ptr,Uint32 keyLen,Uint32 attrLen,Uint32 op_type,Uint32 gci_id,Uint32 header_type,Local_key * rowid_val)3233 Restore::execute_operation(Signal *signal,
3234                            FilePtr file_ptr,
3235                            Uint32 keyLen,
3236                            Uint32 attrLen,
3237                            Uint32 op_type,
3238                            Uint32 gci_id,
3239                            Uint32 header_type,
3240                            Local_key *rowid_val)
3241 {
3242   LqhKeyReq * req = (LqhKeyReq *)signal->getDataPtrSend();
3243   /**
3244    * attrLen is not used for long lqhkeyreq, and should be zero for short
3245    * lqhkeyreq.
3246    */
3247   req->attrLen = 0;
3248 
3249   Uint32 tmp= 0;
3250   const bool short_lqhkeyreq = (keyLen == 0);
3251   /**
3252    * With partital LCP also other operations like delete by rowid will be used.
3253    * In these cases no data is passed, and receiver will interpret signal as a
3254    * short signal, but no KEYINFO or ATTRINFO will be sent or expected.
3255    */
3256   Uint32 * const key_start = signal->getDataPtrSend()+24;
3257   if (short_lqhkeyreq)
3258   {
3259     ndbrequire(attrLen == 0);
3260     ndbassert(keyLen == 0);
3261     LqhKeyReq::setKeyLen(tmp, keyLen);
3262   }
3263   if (!short_lqhkeyreq)
3264   {
3265     LqhKeyReq::setDisableFkConstraints(tmp, 0);
3266     LqhKeyReq::setNoTriggersFlag(tmp, 0);
3267     LqhKeyReq::setUtilFlag(tmp, 0);
3268   }
3269   LqhKeyReq::setLastReplicaNo(tmp, 0);
3270   /* ---------------------------------------------------------------------- */
3271   // Indicate Application Reference is present in bit 15
3272   /* ---------------------------------------------------------------------- */
3273   LqhKeyReq::setApplicationAddressFlag(tmp, 0);
3274   LqhKeyReq::setDirtyFlag(tmp, 1);
3275   LqhKeyReq::setSimpleFlag(tmp, 1);
3276   LqhKeyReq::setOperation(tmp, op_type);
3277   LqhKeyReq::setSameClientAndTcFlag(tmp, 0);
3278   if (short_lqhkeyreq)
3279   {
3280     LqhKeyReq::setAIInLqhKeyReq(tmp, 0);
3281     req->hashValue = 0;
3282   }
3283   else
3284   {
3285     Uint32 tableId = file_ptr.p->m_table_id;
3286     LqhKeyReq::setCorrFactorFlag(tmp, 0);
3287     LqhKeyReq::setNormalProtocolFlag(tmp, 0);
3288     LqhKeyReq::setDeferredConstraints(tmp, 0);
3289 
3290     if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
3291     {
3292       req->hashValue = calculate_hash(tableId, key_start);
3293     }
3294     else
3295     {
3296       req->hashValue = md5_hash((Uint64*)key_start, keyLen);
3297     }
3298   }
3299   LqhKeyReq::setNoDiskFlag(tmp, 1);
3300   LqhKeyReq::setRowidFlag(tmp, (rowid_val != 0));
3301   req->clientConnectPtr = (file_ptr.i + (header_type << 28));
3302   req->tcBlockref = reference();
3303   req->savePointId = 0;
3304   req->tableSchemaVersion = file_ptr.p->m_table_id +
3305     (file_ptr.p->m_table_version << 16);
3306   req->fragmentData = file_ptr.p->m_fragment_id;
3307   req->transId1 = 0;
3308   req->transId2 = 0;
3309   req->scanInfo = 0;
3310   Uint32 pos = 0;
3311   if (op_type != ZDELETE)
3312   {
3313     /**
3314      * Need not set GCI flag here since we restore also the header part of
3315      * the row in this case.
3316      */
3317     req->variableData[pos++] = rowid_val->m_page_no;
3318     req->variableData[pos++] = rowid_val->m_page_idx;
3319     LqhKeyReq::setGCIFlag(tmp, 0);
3320   }
3321   else
3322   {
3323     /**
3324      * We reuse the Node Restart Copy handling to perform
3325      * DELETE by ROWID. In this case we need to set the GCI of the record.
3326      */
3327     if (rowid_val)
3328     {
3329       req->variableData[pos++] = rowid_val->m_page_no;
3330       req->variableData[pos++] = rowid_val->m_page_idx;
3331       LqhKeyReq::setGCIFlag(tmp, 1);
3332       LqhKeyReq::setNrCopyFlag(tmp, 1);
3333       req->variableData[pos++] = gci_id;
3334     }
3335   }
3336   req->requestInfo = tmp;
3337   if (short_lqhkeyreq)
3338   {
3339     file_ptr.p->m_outstanding_operations++;
3340     EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal,
3341                    LqhKeyReq::FixedSignalLength + pos);
3342   }
3343   else
3344   {
3345     bool ok = true;
3346     SectionHandle sections(this);
3347     sections.clear();
3348 
3349     sections.m_ptr[LqhKeyReq::KeyInfoSectionNum].i = RNIL;
3350     ok= appendToSection(sections.m_ptr[LqhKeyReq::KeyInfoSectionNum].i,
3351                         key_start,
3352                         keyLen);
3353     if (unlikely(!ok))
3354     {
3355       jam();
3356       crash_during_restore(file_ptr, __LINE__, ZGET_DATAREC_ERROR);
3357       ndbabort();
3358     }
3359     sections.m_cnt++;
3360 
3361     if (attrLen > 0)
3362     {
3363       Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
3364       sections.m_ptr[LqhKeyReq::AttrInfoSectionNum].i = RNIL;
3365       ok= appendToSection(sections.m_ptr[LqhKeyReq::AttrInfoSectionNum].i,
3366                           attr_start,
3367                           attrLen);
3368 
3369       if (unlikely(!ok))
3370       {
3371         jam();
3372         crash_during_restore(file_ptr, __LINE__, ZGET_ATTRINBUF_ERROR);
3373         ndbabort();
3374       }
3375       sections.m_cnt++;
3376     }
3377     file_ptr.p->m_outstanding_operations++;
3378     EXECUTE_DIRECT_WITH_SECTIONS(DBLQH, GSN_LQHKEYREQ, signal,
3379                                  LqhKeyReq::FixedSignalLength+pos,
3380                                  &sections);
3381   }
3382 }
3383 
3384 Uint32
calculate_hash(Uint32 tableId,const Uint32 * src)3385 Restore::calculate_hash(Uint32 tableId, const Uint32 *src)
3386 {
3387   jam();
3388   Uint64 Tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
3389   Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
3390   Uint32 keyLen = xfrm_key_hash(tableId, src,
3391 				(Uint32*)Tmp, sizeof(Tmp) >> 2,
3392 			        keyPartLen);
3393   ndbrequire(keyLen);
3394 
3395   return md5_hash(Tmp, keyLen);
3396 }
3397 
3398 void
execLQHKEYREF(Signal * signal)3399 Restore::execLQHKEYREF(Signal* signal)
3400 {
3401   FilePtr file_ptr;
3402   LqhKeyRef* ref = (LqhKeyRef*)signal->getDataPtr();
3403   BackupFormat::RecordType header_type =
3404     (BackupFormat::RecordType)(ref->connectPtr >> 28);
3405   m_file_pool.getPtr(file_ptr, (ref->connectPtr & 0x0FFFFFFF));
3406 
3407   ndbrequire(file_ptr.p->m_outstanding_operations > 0);
3408   file_ptr.p->m_outstanding_operations--;
3409   file_ptr.p->m_error_code = 0;
3410   switch (header_type)
3411   {
3412     case BackupFormat::DELETE_BY_ROWID_TYPE:
3413     {
3414       jam();
3415       break;
3416     }
3417     case BackupFormat::DELETE_BY_PAGEID_TYPE:
3418     {
3419       jam();
3420       break;
3421     }
3422     case BackupFormat::DELETE_BY_ROWID_WRITE_TYPE:
3423     {
3424       jam();
3425       break;
3426     }
3427     case BackupFormat::INSERT_TYPE:
3428     case BackupFormat::WRITE_TYPE:
3429     case BackupFormat::NORMAL_DELETE_TYPE:
3430     default:
3431     {
3432       jam();
3433       file_ptr.p->m_error_code = ref->errorCode;
3434       return;
3435     }
3436   }
3437   file_ptr.p->m_rows_restored_delete_failed++;
3438   file_ptr.p->m_row_operations++;
3439   check_restore_ready(signal, file_ptr);
3440 }
3441 
3442 void
crash_during_restore(FilePtr file_ptr,Uint32 line,Uint32 errCode)3443 Restore::crash_during_restore(FilePtr file_ptr, Uint32 line, Uint32 errCode)
3444 {
3445   char buf[255], name[100];
3446   BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
3447 		       file_ptr.p->m_file_id,
3448 		       file_ptr.p->m_table_id,
3449 		       file_ptr.p->m_fragment_id);
3450 
3451   if (errCode)
3452   {
3453     BaseString::snprintf(buf, sizeof(buf),
3454                          "Error %d (line: %u) during restore of  %s",
3455                          errCode, line, name);
3456   }
3457   else
3458   {
3459     BaseString::snprintf(buf, sizeof(buf),
3460                          "Error (line %u) during restore of  %s",
3461                          line, name);
3462   }
3463   progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
3464 }
3465 
3466 void
delete_by_rowid_fail(Uint32 op_ptr)3467 Restore::delete_by_rowid_fail(Uint32 op_ptr)
3468 {
3469   FilePtr file_ptr;
3470   m_file_pool.getPtr(file_ptr, (op_ptr & 0x0FFFFFFF));
3471   DEB_RES_DEL(("(%u)DELETE fail:tab(%u,%u), m_rows_restored = %llu",
3472                instance(),
3473                file_ptr.p->m_table_id,
3474                file_ptr.p->m_fragment_id,
3475                file_ptr.p->m_rows_restored));
3476 }
3477 
3478 void
delete_by_rowid_succ(Uint32 op_ptr)3479 Restore::delete_by_rowid_succ(Uint32 op_ptr)
3480 {
3481   FilePtr file_ptr;
3482   m_file_pool.getPtr(file_ptr, (op_ptr & 0x0FFFFFFF));
3483   ndbrequire(file_ptr.p->m_rows_restored > 0);
3484   file_ptr.p->m_rows_restored--;
3485   DEB_RES_DEL(("(%u)DELETE success:tab(%u,%u), m_rows_restored = %llu",
3486                instance(),
3487                file_ptr.p->m_table_id,
3488                file_ptr.p->m_fragment_id,
3489                file_ptr.p->m_rows_restored));
3490 }
3491 
3492 void
execLQHKEYCONF(Signal * signal)3493 Restore::execLQHKEYCONF(Signal* signal)
3494 {
3495   FilePtr file_ptr;
3496   LqhKeyConf * conf = (LqhKeyConf *)signal->getDataPtr();
3497   BackupFormat::RecordType header_type = (BackupFormat::RecordType)(conf->opPtr >> 28);
3498   m_file_pool.getPtr(file_ptr, (conf->opPtr & 0x0FFFFFFF));
3499 
3500   ndbassert(file_ptr.p->m_outstanding_operations);
3501   file_ptr.p->m_outstanding_operations--;
3502   file_ptr.p->m_error_code = 0;
3503   switch (header_type)
3504   {
3505     case BackupFormat::INSERT_TYPE:
3506       jam();
3507       file_ptr.p->m_rows_restored++;
3508       file_ptr.p->m_row_operations++;
3509       break;
3510     case BackupFormat::WRITE_TYPE:
3511       jam();
3512       file_ptr.p->m_rows_restored++;
3513       file_ptr.p->m_row_operations++;
3514       break;
3515     case BackupFormat::NORMAL_DELETE_TYPE:
3516       jam();
3517       file_ptr.p->m_rows_restored--;
3518       file_ptr.p->m_row_operations++;
3519       break;
3520     case BackupFormat::DELETE_BY_ROWID_TYPE:
3521     case BackupFormat::DELETE_BY_PAGEID_TYPE:
3522     case BackupFormat::DELETE_BY_ROWID_WRITE_TYPE:
3523       jam();
3524       file_ptr.p->m_row_operations++;
3525       break;
3526     default:
3527       ndbabort();
3528   }
3529   check_restore_ready(signal, file_ptr);
3530 }
3531 
3532 void
check_restore_ready(Signal * signal,FilePtr file_ptr)3533 Restore::check_restore_ready(Signal *signal, FilePtr file_ptr)
3534 {
3535   if (file_ptr.p->m_outstanding_operations == 0 && file_ptr.p->m_fd == RNIL)
3536   {
3537     jam();
3538     restore_lcp_conf_after_execute(signal, file_ptr);
3539     return;
3540   }
3541 }
3542 
3543 void
restore_lcp_conf_after_execute(Signal * signal,FilePtr file_ptr)3544 Restore::restore_lcp_conf_after_execute(Signal* signal, FilePtr file_ptr)
3545 {
3546   file_ptr.p->m_current_file_index++;
3547   if (file_ptr.p->m_current_file_index < file_ptr.p->m_num_files)
3548   {
3549     /**
3550      * There are still more data files to apply before restore is complete.
3551      * Handle next file now.
3552      */
3553     jam();
3554     DEB_RES(("(%u)Step forward to next data file", instance()));
3555     step_file_number_forward(file_ptr);
3556     file_ptr.p->m_current_page_pos = 0;
3557     file_ptr.p->m_current_page_index = 0;
3558     file_ptr.p->m_current_file_page = 0;
3559     ndbrequire(file_ptr.p->m_outstanding_reads == 0);
3560     ndbrequire(file_ptr.p->m_outstanding_operations == 0);
3561     ndbrequire(file_ptr.p->m_bytes_left == 0);
3562     release_file(file_ptr, false);
3563     ndbrequire(seize_file(file_ptr) == 0);
3564     open_data_file(signal, file_ptr);
3565     return;
3566   }
3567   restore_lcp_conf(signal, file_ptr);
3568 }
3569 
3570 void
restore_lcp_conf(Signal * signal,FilePtr file_ptr)3571 Restore::restore_lcp_conf(Signal *signal, FilePtr file_ptr)
3572 {
3573   /**
3574    * All LCP data files that are part of restore have been applied
3575    * successfully, this fragment has completed its restore and we're
3576    * ready to continue with the next step.
3577    */
3578 
3579   /**
3580    * Temporary reset DBTUP's #disk attributes on table
3581    *
3582    * TUP will send RESTORE_LCP_CONF
3583    */
3584   DEB_RES(("(%u)Complete restore", instance()));
3585 
3586   if (file_ptr.p->m_lcp_ctl_version == NDBD_USE_PARTIAL_LCP_v2)
3587   {
3588     /**
3589      * Important to verify that number of rows is what we expect.
3590      * Otherwise we could go on with inconsistent database without
3591      * knowing it. So better to crash and specify error.
3592      */
3593     if (file_ptr.p->m_rows_in_lcp != file_ptr.p->m_rows_restored)
3594     {
3595       char buf[512];
3596       BaseString::snprintf(buf, sizeof(buf),
3597                            "Inconsistency in restoring T%uF%u, restored"
3598                            " %llu rows, expected to restore %llu rows"
3599                            "\nInitial node restart is required to recover",
3600                            file_ptr.p->m_table_id,
3601                            file_ptr.p->m_fragment_id,
3602                            file_ptr.p->m_rows_restored,
3603                            file_ptr.p->m_rows_in_lcp);
3604       progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
3605     }
3606   }
3607 
3608   c_tup->complete_restore_lcp(signal,
3609                               file_ptr.p->m_sender_ref,
3610                               file_ptr.p->m_sender_data,
3611                               file_ptr.p->m_restored_lcp_id,
3612                               file_ptr.p->m_restored_local_lcp_id,
3613                               file_ptr.p->m_max_gci_completed,
3614                               file_ptr.p->m_max_gci_written,
3615                               file_ptr.p->m_table_id,
3616                               file_ptr.p->m_fragment_id);
3617   jamEntry();
3618 
3619   if (c_tup->get_restore_row_count(file_ptr.p->m_table_id,
3620                                    file_ptr.p->m_fragment_id) !=
3621       file_ptr.p->m_rows_restored)
3622   {
3623     char buf[512];
3624     BaseString::snprintf(buf, sizeof(buf),
3625                          "Inconsistency in restoring T%uF%u, restored"
3626                          " %llu rows, TUP claims %llu rows"
3627                          "\nInitial node restart is required to recover",
3628                          file_ptr.p->m_table_id,
3629                          file_ptr.p->m_fragment_id,
3630                          file_ptr.p->m_rows_restored,
3631                          c_tup->get_restore_row_count(file_ptr.p->m_table_id,
3632                                              file_ptr.p->m_fragment_id));
3633     progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
3634   }
3635   signal->theData[0] = NDB_LE_ReadLCPComplete;
3636   signal->theData[1] = file_ptr.p->m_table_id;
3637   signal->theData[2] = file_ptr.p->m_fragment_id;
3638   signal->theData[3] = Uint32(file_ptr.p->m_rows_restored >> 32);
3639   signal->theData[4] = Uint32(file_ptr.p->m_rows_restored);
3640   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
3641 
3642   release_file(file_ptr, true);
3643 }
3644 
3645 void
parse_fragment_footer(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)3646 Restore::parse_fragment_footer(Signal* signal, FilePtr file_ptr,
3647 			       const Uint32 *data, Uint32 len)
3648 {
3649   const BackupFormat::DataFile::FragmentFooter* fh=
3650     (BackupFormat::DataFile::FragmentFooter*)data;
3651   if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
3652   {
3653     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
3654     return;
3655   }
3656 
3657   if(ntohl(fh->Checksum) != 0)
3658   {
3659     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
3660     return;
3661   }
3662 }
3663 
3664 void
parse_gcp_entry(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)3665 Restore::parse_gcp_entry(Signal* signal, FilePtr file_ptr,
3666 			 const Uint32 *data, Uint32 len)
3667 {
3668 
3669 }
3670 
3671 void
parse_error(Signal * signal,FilePtr file_ptr,Uint32 line,Uint32 extra)3672 Restore::parse_error(Signal* signal,
3673 		     FilePtr file_ptr, Uint32 line, Uint32 extra)
3674 {
3675   char buf[255], name[100];
3676   BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
3677 		       file_ptr.p->m_file_id,
3678 		       file_ptr.p->m_table_id,
3679 		       file_ptr.p->m_fragment_id);
3680 
3681   BaseString::snprintf(buf, sizeof(buf),
3682 		       "Parse error in file: %s, extra: %d",
3683 		       name, extra);
3684 
3685   progError(line, NDBD_EXIT_INVALID_LCP_FILE, buf);
3686   ndbabort();
3687 }
3688 
3689 NdbOut&
operator <<(NdbOut & ndbout,const Restore::Column & col)3690 operator << (NdbOut& ndbout, const Restore::Column& col)
3691 {
3692   ndbout << "[ Col: id: " << col.m_id
3693 	 << " size: " << col.m_size
3694 	 << " key: " << (Uint32)(col.m_flags & Restore::Column::COL_KEY)
3695 	 << " variable: " << (Uint32)(col.m_flags & Restore::Column::COL_VAR)
3696 	 << " null: " << (Uint32)(col.m_flags & Restore::Column::COL_NULL)
3697 	 << " disk: " << (Uint32)(col.m_flags & Restore::Column::COL_DISK)
3698 	 << "]";
3699 
3700   return ndbout;
3701 }
3702 
3703 int
check_file_version(Signal * signal,Uint32 file_version)3704 Restore::check_file_version(Signal* signal, Uint32 file_version)
3705 {
3706   if (file_version < MAKE_VERSION(5,1,6))
3707   {
3708     char buf[255];
3709     char verbuf[255];
3710     ndbGetVersionString(file_version, 0, 0, verbuf, sizeof(verbuf));
3711     BaseString::snprintf(buf, sizeof(buf),
3712 			 "Unsupported version of LCP files found on disk, "
3713 			 " found: %s", verbuf);
3714 
3715     progError(__LINE__,
3716 	      NDBD_EXIT_SR_RESTARTCONFLICT,
3717 	      buf);
3718     return -1;
3719   }
3720   return 0;
3721 }
3722