1 /*
2    Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "restore.hpp"
26 #include <signaldata/FsRef.hpp>
27 #include <signaldata/FsConf.hpp>
28 #include <signaldata/FsOpenReq.hpp>
29 #include <signaldata/FsCloseReq.hpp>
30 #include <signaldata/FsReadWriteReq.hpp>
31 #include <signaldata/RestoreImpl.hpp>
32 #include <signaldata/DictTabInfo.hpp>
33 #include <signaldata/KeyInfo.hpp>
34 #include <signaldata/AttrInfo.hpp>
35 #include <signaldata/LqhKey.hpp>
36 #include <AttributeHeader.hpp>
37 #include <md5_hash.hpp>
38 #include <dblqh/Dblqh.hpp>
39 #include <dbtup/Dbtup.hpp>
40 #include <KeyDescriptor.hpp>
41 
42 #define PAGES LCP_RESTORE_BUFFER
43 
Restore(Block_context & ctx,Uint32 instanceNumber)44 Restore::Restore(Block_context& ctx, Uint32 instanceNumber) :
45   SimulatedBlock(RESTORE, ctx, instanceNumber),
46   m_file_list(m_file_pool),
47   m_file_hash(m_file_pool)
48 {
49   BLOCK_CONSTRUCTOR(Restore);
50 
51   // Add received signals
52   addRecSignal(GSN_STTOR, &Restore::execSTTOR);
53   addRecSignal(GSN_DUMP_STATE_ORD, &Restore::execDUMP_STATE_ORD);
54   addRecSignal(GSN_CONTINUEB, &Restore::execCONTINUEB);
55   addRecSignal(GSN_READ_CONFIG_REQ, &Restore::execREAD_CONFIG_REQ, true);
56 
57   addRecSignal(GSN_RESTORE_LCP_REQ, &Restore::execRESTORE_LCP_REQ);
58 
59   addRecSignal(GSN_FSOPENREF, &Restore::execFSOPENREF, true);
60   addRecSignal(GSN_FSOPENCONF, &Restore::execFSOPENCONF);
61   addRecSignal(GSN_FSREADREF, &Restore::execFSREADREF, true);
62   addRecSignal(GSN_FSREADCONF, &Restore::execFSREADCONF);
63   addRecSignal(GSN_FSCLOSEREF, &Restore::execFSCLOSEREF, true);
64   addRecSignal(GSN_FSCLOSECONF, &Restore::execFSCLOSECONF);
65 
66   addRecSignal(GSN_LQHKEYREF, &Restore::execLQHKEYREF);
67   addRecSignal(GSN_LQHKEYCONF, &Restore::execLQHKEYCONF);
68 
69   ndbrequire(sizeof(Column) == 8);
70 }
71 
~Restore()72 Restore::~Restore()
73 {
74 }
75 
BLOCK_FUNCTIONS(Restore)76 BLOCK_FUNCTIONS(Restore)
77 
78 void
79 Restore::execSTTOR(Signal* signal)
80 {
81   jamEntry();
82 
83   c_lqh = (Dblqh*)globalData.getBlock(DBLQH, instance());
84   c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
85   ndbrequire(c_lqh != 0 && c_tup != 0);
86   sendSTTORRY(signal);
87 
88   return;
89 }//Restore::execNDB_STTOR()
90 
91 void
execREAD_CONFIG_REQ(Signal * signal)92 Restore::execREAD_CONFIG_REQ(Signal* signal)
93 {
94   jamEntry();
95   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
96   Uint32 ref = req->senderRef;
97   Uint32 senderData = req->senderData;
98   ndbrequire(req->noOfParameters == 0);
99 
100   const ndb_mgm_configuration_iterator * p =
101     m_ctx.m_config.getOwnConfigIterator();
102   ndbrequire(p != 0);
103 
104 #if 0
105   Uint32 noBackups = 0, noTables = 0, noAttribs = 0;
106   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &m_diskless));
107   ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_BACKUPS, &noBackups);
108   //  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));
109   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables));
110   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, &noAttribs));
111 
112   noAttribs++; //RT 527 bug fix
113 
114   c_backupPool.setSize(noBackups);
115   c_backupFilePool.setSize(3 * noBackups);
116   c_tablePool.setSize(noBackups * noTables);
117   c_attributePool.setSize(noBackups * noAttribs);
118   c_triggerPool.setSize(noBackups * 3 * noTables);
119 
120   // 2 = no of replicas
121   c_fragmentPool.setSize(noBackups * NO_OF_FRAG_PER_NODE * noTables);
122 
123   Uint32 szMem = 0;
124   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
125   Uint32 noPages = (szMem + sizeof(Page32) - 1) / sizeof(Page32);
126   // We need to allocate an additional of 2 pages. 1 page because of a bug in
127   // ArrayPool and another one for DICTTAINFO.
128   c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2);
129 
130   Uint32 szDataBuf = (2 * 1024 * 1024);
131   Uint32 szLogBuf = (2 * 1024 * 1024);
132   Uint32 szWrite = 32768;
133   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
134   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
135   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
136 
137   c_defaults.m_logBufferSize = szLogBuf;
138   c_defaults.m_dataBufferSize = szDataBuf;
139   c_defaults.m_minWriteSize = szWrite;
140   c_defaults.m_maxWriteSize = szWrite;
141 
142   { // Init all tables
143     ArrayList<Table> tables(c_tablePool);
144     TablePtr ptr;
145     while(tables.seize(ptr)){
146       new (ptr.p) Table(c_attributePool, c_fragmentPool);
147     }
148     tables.release();
149   }
150 
151   {
152     ArrayList<BackupFile> ops(c_backupFilePool);
153     BackupFilePtr ptr;
154     while(ops.seize(ptr)){
155       new (ptr.p) BackupFile(* this, c_pagePool);
156     }
157     ops.release();
158   }
159 
160   {
161     ArrayList<BackupRecord> recs(c_backupPool);
162     BackupRecordPtr ptr;
163     while(recs.seize(ptr)){
164       new (ptr.p) BackupRecord(* this, c_pagePool, c_tablePool,
165 			       c_backupFilePool, c_triggerPool);
166     }
167     recs.release();
168   }
169 
170   // Initialize BAT for interface to file system
171   {
172     Page32Ptr p;
173     ndbrequire(c_pagePool.seizeId(p, 0));
174     c_startOfPages = (Uint32 *)p.p;
175     c_pagePool.release(p);
176 
177     NewVARIABLE* bat = allocateBat(1);
178     bat[0].WA = c_startOfPages;
179     bat[0].nrr = c_pagePool.getSize()*sizeof(Page32)/sizeof(Uint32);
180   }
181 #endif
182   m_file_pool.setSize(1);
183   Uint32 cnt = 2*MAX_ATTRIBUTES_IN_TABLE;
184   cnt += PAGES;
185   cnt += List::getSegmentSize()-1;
186   cnt /= List::getSegmentSize();
187   cnt += 2;
188   m_databuffer_pool.setSize(cnt);
189 
190   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
191   conf->senderRef = reference();
192   conf->senderData = senderData;
193   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
194 	     ReadConfigConf::SignalLength, JBB);
195 }
196 
197 void
sendSTTORRY(Signal * signal)198 Restore::sendSTTORRY(Signal* signal){
199   signal->theData[0] = 0;
200   signal->theData[3] = 1;
201   signal->theData[4] = 3;
202   signal->theData[5] = 255; // No more start phases from missra
203   BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : RESTORE_REF;
204   sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
205 }
206 
207 void
execCONTINUEB(Signal * signal)208 Restore::execCONTINUEB(Signal* signal){
209   jamEntry();
210 
211   switch(signal->theData[0]){
212   case RestoreContinueB::RESTORE_NEXT:
213   {
214     FilePtr file_ptr;
215     m_file_pool.getPtr(file_ptr, signal->theData[1]);
216     restore_next(signal, file_ptr);
217     return;
218   }
219   case RestoreContinueB::READ_FILE:
220   {
221     FilePtr file_ptr;
222     m_file_pool.getPtr(file_ptr, signal->theData[1]);
223     read_file(signal, file_ptr);
224     return;
225   }
226   default:
227     ndbrequire(false);
228   }
229 }
230 
231 void
execDUMP_STATE_ORD(Signal * signal)232 Restore::execDUMP_STATE_ORD(Signal* signal){
233   jamEntry();
234 }
235 
236 void
execRESTORE_LCP_REQ(Signal * signal)237 Restore::execRESTORE_LCP_REQ(Signal* signal){
238   jamEntry();
239 
240   Uint32 err= 0;
241   RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtr();
242   Uint32 senderRef= req->senderRef;
243   Uint32 senderData= req->senderData;
244   do
245   {
246     FilePtr file_ptr;
247     if(!m_file_list.seize(file_ptr))
248     {
249       err= RestoreLcpRef::NoFileRecord;
250       break;
251     }
252 
253     if((err= init_file(req, file_ptr)))
254     {
255       break;
256     }
257 
258     open_file(signal, file_ptr);
259     return;
260   } while(0);
261 
262   RestoreLcpRef* ref= (RestoreLcpRef*)signal->getDataPtrSend();
263   ref->senderData= senderData;
264   ref->senderRef= reference();
265   ref->errorCode = err;
266   sendSignal(senderRef, GSN_RESTORE_LCP_REF, signal,
267 	     RestoreLcpRef::SignalLength, JBB);
268 }
269 
270 Uint32
init_file(const RestoreLcpReq * req,FilePtr file_ptr)271 Restore::init_file(const RestoreLcpReq* req, FilePtr file_ptr)
272 {
273   new (file_ptr.p) File();
274   file_ptr.p->m_sender_ref = req->senderRef;
275   file_ptr.p->m_sender_data = req->senderData;
276 
277   file_ptr.p->m_fd = RNIL;
278   file_ptr.p->m_file_type = BackupFormat::LCP_FILE;
279   file_ptr.p->m_status = File::FIRST_READ;
280 
281   file_ptr.p->m_lcp_no = req->lcpNo;
282   file_ptr.p->m_table_id = req->tableId;
283   file_ptr.p->m_fragment_id = req->fragmentId;
284   file_ptr.p->m_table_version = RNIL;
285 
286   file_ptr.p->m_bytes_left = 0; // Bytes read from FS
287   file_ptr.p->m_current_page_ptr_i = RNIL;
288   file_ptr.p->m_current_page_pos = 0;
289   file_ptr.p->m_current_page_index = 0;
290   file_ptr.p->m_current_file_page = 0;
291   file_ptr.p->m_outstanding_reads = 0;
292   file_ptr.p->m_outstanding_operations = 0;
293   file_ptr.p->m_rows_restored = 0;
294   LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
295   LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
296 
297   ndbassert(columns.isEmpty());
298   columns.release();
299 
300   ndbassert(pages.isEmpty());
301   pages.release();
302 
303   Uint32 buf_size= PAGES*GLOBAL_PAGE_SIZE;
304   Uint32 page_count= (buf_size+GLOBAL_PAGE_SIZE-1)/GLOBAL_PAGE_SIZE;
305   if(!pages.seize(page_count))
306   {
307     return RestoreLcpRef::OutOfDataBuffer;
308   }
309 
310   List::Iterator it;
311   for(pages.first(it); !it.isNull(); pages.next(it))
312   {
313     * it.data = RNIL;
314   }
315 
316   Uint32 err= 0;
317   for(pages.first(it); !it.isNull(); pages.next(it))
318   {
319     Ptr<GlobalPage> page_ptr;
320     if(!m_global_page_pool.seize(page_ptr))
321     {
322       err= RestoreLcpRef::OutOfReadBufferPages;
323       break;
324     }
325     * it.data = page_ptr.i;
326   }
327 
328   if(err)
329   {
330     for(pages.first(it); !it.isNull(); pages.next(it))
331     {
332       if(* it.data == RNIL)
333 	break;
334       m_global_page_pool.release(* it.data);
335     }
336   }
337   else
338   {
339     pages.first(it);
340     file_ptr.p->m_current_page_ptr_i = *it.data;
341   }
342   return err;
343 }
344 
345 void
release_file(FilePtr file_ptr)346 Restore::release_file(FilePtr file_ptr)
347 {
348   LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
349   LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
350 
351   List::Iterator it;
352   for(pages.first(it); !it.isNull(); pages.next(it))
353   {
354     if(* it.data == RNIL)
355       continue;
356     m_global_page_pool.release(* it.data);
357   }
358 
359   ndbout_c("RESTORE table: %d %lld rows applied",
360 	   file_ptr.p->m_table_id,
361 	   file_ptr.p->m_rows_restored);
362 
363   columns.release();
364   pages.release();
365   m_file_list.release(file_ptr);
366 }
367 
368 void
open_file(Signal * signal,FilePtr file_ptr)369 Restore::open_file(Signal* signal, FilePtr file_ptr)
370 {
371   signal->theData[0] = NDB_LE_StartReadLCP;
372   signal->theData[1] = file_ptr.p->m_table_id;
373   signal->theData[2] = file_ptr.p->m_fragment_id;
374   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
375 
376   FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
377   req->userReference = reference();
378   req->fileFlags = FsOpenReq::OM_READONLY | FsOpenReq::OM_GZ;
379   req->userPointer = file_ptr.i;
380 
381   FsOpenReq::setVersion(req->fileNumber, 5);
382   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
383   FsOpenReq::v5_setLcpNo(req->fileNumber, file_ptr.p->m_lcp_no);
384   FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
385   FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
386   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
387 }
388 
389 void
execFSOPENREF(Signal * signal)390 Restore::execFSOPENREF(Signal* signal)
391 {
392   FsRef* ref= (FsRef*)signal->getDataPtr();
393   FilePtr file_ptr;
394   jamEntry();
395   m_file_pool.getPtr(file_ptr, ref->userPointer);
396 
397   Uint32 errCode= ref->errorCode;
398   Uint32 osError= ref->osErrorCode;
399 
400   RestoreLcpRef* rep= (RestoreLcpRef*)signal->getDataPtrSend();
401   rep->senderData= file_ptr.p->m_sender_data;
402   rep->errorCode = errCode;
403   rep->extra[0] = osError;
404   sendSignal(file_ptr.p->m_sender_ref,
405 	     GSN_RESTORE_LCP_REF, signal, RestoreLcpRef::SignalLength+1, JBB);
406 
407   release_file(file_ptr);
408 }
409 
410 void
execFSOPENCONF(Signal * signal)411 Restore::execFSOPENCONF(Signal* signal)
412 {
413   jamEntry();
414   FilePtr file_ptr;
415   FsConf* conf= (FsConf*)signal->getDataPtr();
416   m_file_pool.getPtr(file_ptr, conf->userPointer);
417 
418   file_ptr.p->m_fd = conf->filePointer;
419 
420   /**
421    * Start thread's
422    */
423 
424   file_ptr.p->m_status |= File::FILE_THREAD_RUNNING;
425   signal->theData[0] = RestoreContinueB::READ_FILE;
426   signal->theData[1] = file_ptr.i;
427   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
428 
429   file_ptr.p->m_status |= File::RESTORE_THREAD_RUNNING;
430   signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
431   signal->theData[1] = file_ptr.i;
432   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
433 }
434 
435 void
restore_next(Signal * signal,FilePtr file_ptr)436 Restore::restore_next(Signal* signal, FilePtr file_ptr)
437 {
438   Uint32 *data, len= 0;
439   Uint32 status = file_ptr.p->m_status;
440   Uint32 page_count = file_ptr.p->m_pages.getSize();
441   do
442   {
443     Uint32 left= file_ptr.p->m_bytes_left;
444     if(left < 8)
445     {
446       /**
447        * Not enought bytes to read header
448        */
449       break;
450     }
451     Ptr<GlobalPage> page_ptr, next_page_ptr = { 0, 0 };
452     m_global_page_pool.getPtr(page_ptr, file_ptr.p->m_current_page_ptr_i);
453     List::Iterator it;
454 
455     Uint32 pos= file_ptr.p->m_current_page_pos;
456     if(status & File::READING_RECORDS)
457     {
458       /**
459        * We are reading records
460        */
461       len= ntohl(* (page_ptr.p->data + pos)) + 1;
462     }
463     else
464     {
465       /**
466        * Section length is in 2 word
467        */
468       if(pos + 1 == GLOBAL_PAGE_SIZE_WORDS)
469       {
470 	/**
471 	 * But that's stored on next page...
472 	 *   and since we have atleast 8 bytes left in buffer
473 	 *   we can be sure that that's in buffer
474 	 */
475 	LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
476 	Uint32 next_page = file_ptr.p->m_current_page_index + 1;
477 	pages.position(it, next_page % page_count);
478 	m_global_page_pool.getPtr(next_page_ptr, * it.data);
479 	len= ntohl(* next_page_ptr.p->data);
480       }
481       else
482       {
483 	len= ntohl(* (page_ptr.p->data + pos + 1));
484       }
485     }
486 
487     if(file_ptr.p->m_status & File::FIRST_READ)
488     {
489       len= 3;
490       file_ptr.p->m_status &= ~(Uint32)File::FIRST_READ;
491     }
492 
493     if(4 * len > left)
494     {
495       /**
496        * Not enought bytes to read "record"
497        */
498       ndbout_c("records: %d len: %x left: %d",
499 	       status & File::READING_RECORDS, 4*len, left);
500 
501       if (unlikely((status & File:: FILE_THREAD_RUNNING) == 0))
502       {
503         crash_during_restore(file_ptr, __LINE__, 0);
504       }
505       len= 0;
506       break;
507     }
508 
509     /**
510      * Entire record is in buffer
511      */
512 
513     if(pos + len >= GLOBAL_PAGE_SIZE_WORDS)
514     {
515       /**
516        * But it's split over pages
517        */
518       if(next_page_ptr.p == 0)
519       {
520 	LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
521 	Uint32 next_page = file_ptr.p->m_current_page_index + 1;
522 	pages.position(it, next_page % page_count);
523 	m_global_page_pool.getPtr(next_page_ptr, * it.data);
524       }
525       file_ptr.p->m_current_page_ptr_i = next_page_ptr.i;
526       file_ptr.p->m_current_page_pos = (pos + len) - GLOBAL_PAGE_SIZE_WORDS;
527       file_ptr.p->m_current_page_index =
528 	(file_ptr.p->m_current_page_index + 1) % page_count;
529 
530       Uint32 first = (GLOBAL_PAGE_SIZE_WORDS - pos);
531       // wl4391_todo removing valgrind overlap warning for now
532       memmove(page_ptr.p, page_ptr.p->data+pos, 4 * first);
533       memcpy(page_ptr.p->data+first, next_page_ptr.p, 4 * (len - first));
534       data= page_ptr.p->data;
535     }
536     else
537     {
538       file_ptr.p->m_current_page_pos = pos + len;
539       data= page_ptr.p->data+pos;
540     }
541 
542     file_ptr.p->m_bytes_left -= 4*len;
543 
544     if(status & File::READING_RECORDS)
545     {
546       if(len == 1)
547       {
548 	file_ptr.p->m_status = status & ~(Uint32)File::READING_RECORDS;
549       }
550       else
551       {
552 	parse_record(signal, file_ptr, data, len);
553       }
554     }
555     else
556     {
557       switch(ntohl(* data)){
558       case BackupFormat::FILE_HEADER:
559 	parse_file_header(signal, file_ptr, data-3, len+3);
560 	break;
561       case BackupFormat::FRAGMENT_HEADER:
562 	file_ptr.p->m_status = status | File::READING_RECORDS;
563 	parse_fragment_header(signal, file_ptr, data, len);
564 	break;
565       case BackupFormat::FRAGMENT_FOOTER:
566 	parse_fragment_footer(signal, file_ptr, data, len);
567 	break;
568       case BackupFormat::TABLE_LIST:
569 	parse_table_list(signal, file_ptr, data, len);
570 	break;
571       case BackupFormat::TABLE_DESCRIPTION:
572 	parse_table_description(signal, file_ptr, data, len);
573 	break;
574       case BackupFormat::GCP_ENTRY:
575 	parse_gcp_entry(signal, file_ptr, data, len);
576 	break;
577       case BackupFormat::EMPTY_ENTRY:
578         // skip
579         break;
580       case 0x4e444242: // 'NDBB'
581 	if (check_file_version(signal, ntohl(* (data+2))) == 0)
582 	{
583 	  break;
584 	}
585       default:
586 	parse_error(signal, file_ptr, __LINE__, ntohl(* data));
587       }
588     }
589   } while(0);
590 
591   if(file_ptr.p->m_bytes_left == 0 && status & File::FILE_EOF)
592   {
593     file_ptr.p->m_status &= ~(Uint32)File::RESTORE_THREAD_RUNNING;
594     /**
595      * File is finished...
596      */
597     close_file(signal, file_ptr);
598     return;
599   }
600 
601   signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
602   signal->theData[1] = file_ptr.i;
603 
604   if(len)
605     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
606   else
607     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
608 }
609 
610 void
read_file(Signal * signal,FilePtr file_ptr)611 Restore::read_file(Signal* signal, FilePtr file_ptr)
612 {
613   Uint32 left= file_ptr.p->m_bytes_left;
614   Uint32 page_count = file_ptr.p->m_pages.getSize();
615   Uint32 free= GLOBAL_PAGE_SIZE * page_count - left;
616   Uint32 read_count= free/GLOBAL_PAGE_SIZE;
617 
618   if(read_count <= file_ptr.p->m_outstanding_reads)
619   {
620     signal->theData[0] = RestoreContinueB::READ_FILE;
621     signal->theData[1] = file_ptr.i;
622     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
623     return;
624   }
625 
626   read_count -= file_ptr.p->m_outstanding_reads;
627   Uint32 curr_page= file_ptr.p->m_current_page_index;
628   LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
629 
630   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
631   req->filePointer = file_ptr.p->m_fd;
632   req->userReference = reference();
633   req->userPointer = file_ptr.i;
634   req->numberOfPages = 1;
635   req->operationFlag = 0;
636   FsReadWriteReq::setFormatFlag(req->operationFlag,
637 				FsReadWriteReq::fsFormatGlobalPage);
638   FsReadWriteReq::setPartialReadFlag(req->operationFlag, 1);
639 
640   Uint32 start= (curr_page + page_count - read_count) % page_count;
641 
642   List::Iterator it;
643   pages.position(it, start);
644   do
645   {
646     file_ptr.p->m_outstanding_reads++;
647     req->varIndex = file_ptr.p->m_current_file_page++;
648     req->data.pageData[0] = *it.data;
649     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
650 	       FsReadWriteReq::FixedLength + 1, JBB);
651 
652     start++;
653     if(start == page_count)
654     {
655       start= 0;
656       pages.position(it, start);
657     }
658     else
659     {
660       pages.next(it);
661     }
662   } while(start != curr_page);
663 }
664 
665 void
execFSREADREF(Signal * signal)666 Restore::execFSREADREF(Signal * signal)
667 {
668   jamEntry();
669   SimulatedBlock::execFSREADREF(signal);
670   ndbrequire(false);
671 }
672 
673 void
execFSREADCONF(Signal * signal)674 Restore::execFSREADCONF(Signal * signal)
675 {
676   jamEntry();
677   FilePtr file_ptr;
678   FsConf* conf= (FsConf*)signal->getDataPtr();
679   m_file_pool.getPtr(file_ptr, conf->userPointer);
680 
681   file_ptr.p->m_bytes_left += conf->bytes_read;
682 
683   ndbassert(file_ptr.p->m_outstanding_reads);
684   file_ptr.p->m_outstanding_reads--;
685 
686   if(file_ptr.p->m_outstanding_reads == 0)
687   {
688     ndbassert(conf->bytes_read <= GLOBAL_PAGE_SIZE);
689     if(conf->bytes_read == GLOBAL_PAGE_SIZE)
690     {
691       read_file(signal, file_ptr);
692     }
693     else
694     {
695       file_ptr.p->m_status |= File::FILE_EOF;
696       file_ptr.p->m_status &= ~(Uint32)File::FILE_THREAD_RUNNING;
697     }
698   }
699 }
700 
701 void
close_file(Signal * signal,FilePtr file_ptr)702 Restore::close_file(Signal* signal, FilePtr file_ptr)
703 {
704   FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
705   req->filePointer = file_ptr.p->m_fd;
706   req->userPointer = file_ptr.i;
707   req->userReference = reference();
708   req->fileFlag = 0;
709   sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
710 }
711 
712 void
execFSCLOSEREF(Signal * signal)713 Restore::execFSCLOSEREF(Signal * signal)
714 {
715   jamEntry();
716   SimulatedBlock::execFSCLOSEREF(signal);
717   ndbrequire(false);
718 }
719 
720 void
execFSCLOSECONF(Signal * signal)721 Restore::execFSCLOSECONF(Signal * signal)
722 {
723   jamEntry();
724   FilePtr file_ptr;
725   FsConf* conf= (FsConf*)signal->getDataPtr();
726   m_file_pool.getPtr(file_ptr, conf->userPointer);
727 
728   file_ptr.p->m_fd = RNIL;
729 
730   if(file_ptr.p->m_outstanding_operations == 0)
731   {
732     jam();
733     restore_lcp_conf(signal, file_ptr);
734     return;
735   }
736 }
737 
738 void
parse_file_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)739 Restore::parse_file_header(Signal* signal,
740 			   FilePtr file_ptr,
741 			   const Uint32* data, Uint32 len)
742 {
743   const BackupFormat::FileHeader* fh= (BackupFormat::FileHeader*)data;
744 
745   if(memcmp(fh->Magic, "NDBBCKUP", 8) != 0)
746   {
747     parse_error(signal, file_ptr, __LINE__, *data);
748     return;
749   }
750 
751   file_ptr.p->m_lcp_version = ntohl(fh->BackupVersion);
752   if (check_file_version(signal, ntohl(fh->BackupVersion)))
753   {
754     parse_error(signal, file_ptr, __LINE__, ntohl(fh->NdbVersion));
755     return;
756   }
757   ndbassert(ntohl(fh->SectionType) == BackupFormat::FILE_HEADER);
758 
759   if(ntohl(fh->SectionLength) != len-3)
760   {
761     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
762     return;
763   }
764 
765   if(ntohl(fh->FileType) != BackupFormat::LCP_FILE)
766   {
767     parse_error(signal, file_ptr, __LINE__, ntohl(fh->FileType));
768     return;
769   }
770 
771   if(fh->ByteOrder != 0x12345678)
772   {
773     parse_error(signal, file_ptr, __LINE__, fh->ByteOrder);
774     return;
775   }
776 }
777 
778 void
parse_table_list(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)779 Restore::parse_table_list(Signal* signal, FilePtr file_ptr,
780 			  const Uint32 *data, Uint32 len)
781 {
782   const BackupFormat::CtlFile::TableList* fh=
783     (BackupFormat::CtlFile::TableList*)data;
784 
785   if(ntohl(fh->TableIds[0]) != file_ptr.p->m_table_id)
786   {
787     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableIds[0]));
788     return;
789   }
790 }
791 
792 void
parse_table_description(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)793 Restore::parse_table_description(Signal* signal, FilePtr file_ptr,
794 				 const Uint32 *data, Uint32 len)
795 {
796   bool lcp = file_ptr.p->is_lcp();
797   Uint32 disk= 0;
798   const BackupFormat::CtlFile::TableDescription* fh=
799     (BackupFormat::CtlFile::TableDescription*)data;
800 
801   LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
802 
803   SimplePropertiesLinearReader it(fh->DictTabInfo, len);
804   it.first();
805 
806   DictTabInfo::Table tmpTab; tmpTab.init();
807   SimpleProperties::UnpackStatus stat;
808   stat = SimpleProperties::unpack(it, &tmpTab,
809 				  DictTabInfo::TableMapping,
810 				  DictTabInfo::TableMappingSize,
811 				  true, true);
812   ndbrequire(stat == SimpleProperties::Break);
813 
814   if(tmpTab.TableId != file_ptr.p->m_table_id)
815   {
816     parse_error(signal, file_ptr, __LINE__, tmpTab.TableId);
817     return;
818   }
819 
820   Column c;
821   Uint32 colstore[sizeof(Column)/sizeof(Uint32)];
822 
823   for(Uint32 i = 0; i<tmpTab.NoOfAttributes; i++) {
824     jam();
825     DictTabInfo::Attribute tmp; tmp.init();
826     stat = SimpleProperties::unpack(it, &tmp,
827 				    DictTabInfo::AttributeMapping,
828 				    DictTabInfo::AttributeMappingSize,
829 				    true, true);
830 
831     ndbrequire(stat == SimpleProperties::Break);
832     it.next(); // Move Past EndOfAttribute
833 
834     const Uint32 arr = tmp.AttributeArraySize;
835     const Uint32 sz = 1 << tmp.AttributeSize;
836     const Uint32 sz32 = (sz * arr + 31) >> 5;
837     const bool varsize = tmp.AttributeArrayType != NDB_ARRAYTYPE_FIXED;
838 
839     c.m_id = tmp.AttributeId;
840     c.m_size = sz32;
841     c.m_flags = (tmp.AttributeKeyFlag ? Column::COL_KEY : 0);
842     c.m_flags |= (tmp.AttributeStorageType == NDB_STORAGETYPE_DISK ?
843 		  Column::COL_DISK : 0);
844 
845     if(lcp && (c.m_flags & Column::COL_DISK))
846     {
847       /**
848        * Restore does not currently handle disk attributes
849        *   which is fine as restore LCP shouldn't
850        */
851       disk++;
852       continue;
853     }
854 
855     if(!tmp.AttributeNullableFlag && !varsize)
856     {
857     }
858     else if (true) // null mask dropped in 5.1
859     {
860       c.m_flags |= (varsize ? Column::COL_VAR : 0);
861       c.m_flags |= (tmp.AttributeNullableFlag ? Column::COL_NULL : 0);
862     }
863 
864     memcpy(colstore, &c, sizeof(Column));
865     if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
866     {
867       parse_error(signal, file_ptr, __LINE__, i);
868       return;
869     }
870   }
871 
872   if(lcp)
873   {
874     if (disk)
875     {
876       c.m_id = AttributeHeader::DISK_REF;
877       c.m_size = 2;
878       c.m_flags = 0;
879       memcpy(colstore, &c, sizeof(Column));
880       if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
881       {
882 	parse_error(signal, file_ptr, __LINE__, 0);
883 	return;
884       }
885     }
886 
887     {
888       c.m_id = AttributeHeader::ROWID;
889       c.m_size = 2;
890       c.m_flags = 0;
891       memcpy(colstore, &c, sizeof(Column));
892       if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
893       {
894 	parse_error(signal, file_ptr, __LINE__, 0);
895 	return;
896       }
897     }
898 
899     if (tmpTab.RowGCIFlag)
900     {
901       c.m_id = AttributeHeader::ROW_GCI;
902       c.m_size = 2;
903       c.m_flags = 0;
904       memcpy(colstore, &c, sizeof(Column));
905       if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
906       {
907 	parse_error(signal, file_ptr, __LINE__, 0);
908 	return;
909       }
910     }
911   }
912 
913   file_ptr.p->m_table_version = tmpTab.TableVersion;
914 }
915 
916 void
parse_fragment_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)917 Restore::parse_fragment_header(Signal* signal, FilePtr file_ptr,
918 			       const Uint32 *data, Uint32 len)
919 {
920   const BackupFormat::DataFile::FragmentHeader* fh=
921     (BackupFormat::DataFile::FragmentHeader*)data;
922   if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
923   {
924     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
925     return;
926   }
927 
928   if(ntohl(fh->ChecksumType) != 0)
929   {
930     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
931     return;
932   }
933 
934   file_ptr.p->m_fragment_id = ntohl(fh->FragmentNo);
935 
936   if(file_ptr.p->is_lcp())
937   {
938     /**
939      * Temporary reset DBTUP's #disk attributes on table
940      */
941     c_tup->start_restore_lcp(file_ptr.p->m_table_id,
942 			     file_ptr.p->m_fragment_id);
943   }
944 }
945 
946 void
parse_record(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)947 Restore::parse_record(Signal* signal, FilePtr file_ptr,
948 		      const Uint32 *data, Uint32 len)
949 {
950   List::Iterator it;
951   LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
952 
953   Uint32 * const key_start = signal->getDataPtrSend()+24;
954   Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
955 
956   data += 1;
957   const Uint32* const dataStart = data;
958 
959   bool disk = false;
960   bool rowid = false;
961   bool gci = false;
962   Uint32 keyLen;
963   Uint32 attrLen;
964   Local_key rowid_val;
965   Uint64 gci_val;
966   Uint32 tableId = file_ptr.p->m_table_id;
967   const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
968 
969   if (likely(file_ptr.p->m_lcp_version >= NDBD_RAW_LCP))
970   {
971     rowid = true;
972     rowid_val.m_page_no = data[0];
973     rowid_val.m_page_idx = data[1];
974     keyLen = c_tup->read_lcp_keys(tableId, data+2, len - 3, key_start);
975 
976     AttributeHeader::init(attr_start, AttributeHeader::READ_LCP, 4*(len - 3));
977     memcpy(attr_start + 1, data + 2, 4 * (len - 3));
978     attrLen = 1 + len - 3;
979   }
980   else
981   {
982     Uint32 *keyData = key_start;
983     Uint32 *attrData = attr_start;
984     union {
985       Column c;
986       Uint32 _align[sizeof(Column)/sizeof(Uint32)];
987     };
988 
989     columns.first(it);
990     while(!it.isNull())
991     {
992       _align[0] = *it.data; ndbrequire(columns.next(it));
993       _align[1] = *it.data; columns.next(it);
994 
995       if (c.m_id == AttributeHeader::ROWID)
996       {
997         rowid_val.m_page_no = data[0];
998         rowid_val.m_page_idx = data[1];
999         data += 2;
1000         rowid = true;
1001         continue;
1002       }
1003 
1004       if (c.m_id == AttributeHeader::ROW_GCI)
1005       {
1006         memcpy(&gci_val, data, 8);
1007         data += 2;
1008         gci = true;
1009         continue;
1010       }
1011 
1012       if (! (c.m_flags & (Column::COL_VAR | Column::COL_NULL)))
1013       {
1014         ndbrequire(data < dataStart + len);
1015 
1016         if(c.m_flags & Column::COL_KEY)
1017         {
1018           memcpy(keyData, data, 4*c.m_size);
1019           keyData += c.m_size;
1020         }
1021 
1022         AttributeHeader::init(attrData++, c.m_id, c.m_size << 2);
1023         memcpy(attrData, data, 4*c.m_size);
1024         attrData += c.m_size;
1025         data += c.m_size;
1026       }
1027 
1028       if(c.m_flags & Column::COL_DISK)
1029         disk= true;
1030     }
1031 
1032     // second part is data driven
1033     while (data + 2 < dataStart + len) {
1034       Uint32 sz= ntohl(*data); data++;
1035       Uint32 id= ntohl(*data); data++; // column_no
1036 
1037       ndbrequire(columns.position(it, 2 * id));
1038 
1039       _align[0] = *it.data; ndbrequire(columns.next(it));
1040       _align[1] = *it.data;
1041 
1042       Uint32 sz32 = (sz + 3) >> 2;
1043       ndbassert(c.m_flags & (Column::COL_VAR | Column::COL_NULL));
1044       if (c.m_flags & Column::COL_KEY)
1045       {
1046         memcpy(keyData, data, 4 * sz32);
1047         keyData += sz32;
1048       }
1049 
1050       AttributeHeader::init(attrData++, c.m_id, sz);
1051       memcpy(attrData, data, sz);
1052 
1053       attrData += sz32;
1054       data += sz32;
1055     }
1056 
1057     ndbrequire(data == dataStart + len - 1);
1058 
1059     ndbrequire(disk == false); // Not supported...
1060     ndbrequire(rowid == true);
1061     keyLen = Uint32(keyData - key_start);
1062     attrLen = Uint32(attrData - attr_start);
1063     if (desc->noOfKeyAttr != desc->noOfVarKeys)
1064     {
1065       reorder_key(desc, key_start, keyLen);
1066     }
1067   }
1068 
1069   LqhKeyReq * req = (LqhKeyReq *)signal->getDataPtrSend();
1070 
1071   Uint32 hashValue;
1072   if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
1073     hashValue = calulate_hash(tableId, key_start);
1074   else
1075     hashValue = md5_hash((Uint64*)key_start, keyLen);
1076 
1077   Uint32 tmp= 0;
1078   LqhKeyReq::setAttrLen(tmp, attrLen);
1079   req->attrLen = tmp;
1080 
1081   tmp= 0;
1082   LqhKeyReq::setKeyLen(tmp, keyLen);
1083   LqhKeyReq::setLastReplicaNo(tmp, 0);
1084   /* ---------------------------------------------------------------------- */
1085   // Indicate Application Reference is present in bit 15
1086   /* ---------------------------------------------------------------------- */
1087   LqhKeyReq::setApplicationAddressFlag(tmp, 0);
1088   LqhKeyReq::setDirtyFlag(tmp, 1);
1089   LqhKeyReq::setSimpleFlag(tmp, 1);
1090   LqhKeyReq::setOperation(tmp, ZINSERT);
1091   LqhKeyReq::setSameClientAndTcFlag(tmp, 0);
1092   LqhKeyReq::setAIInLqhKeyReq(tmp, 0);
1093   LqhKeyReq::setNoDiskFlag(tmp, disk ? 0 : 1);
1094   LqhKeyReq::setRowidFlag(tmp, 1);
1095   LqhKeyReq::setGCIFlag(tmp, gci);
1096   req->clientConnectPtr = file_ptr.i;
1097   req->hashValue = hashValue;
1098   req->requestInfo = tmp;
1099   req->tcBlockref = reference();
1100   req->savePointId = 0;
1101   req->tableSchemaVersion = file_ptr.p->m_table_id +
1102     (file_ptr.p->m_table_version << 16);
1103   req->fragmentData = file_ptr.p->m_fragment_id;
1104   req->transId1 = 0;
1105   req->transId2 = 0;
1106   req->scanInfo = 0;
1107   memcpy(req->variableData, key_start, 16);
1108   Uint32 pos = keyLen > 4 ? 4 : keyLen;
1109   req->variableData[pos++] = rowid_val.m_page_no;
1110   req->variableData[pos++] = rowid_val.m_page_idx;
1111   if (gci)
1112     req->variableData[pos++] = (Uint32)gci_val;
1113   file_ptr.p->m_outstanding_operations++;
1114   EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal,
1115 		 LqhKeyReq::FixedSignalLength+pos);
1116 
1117   if(keyLen > 4)
1118   {
1119     c_lqh->receive_keyinfo(signal,
1120 			   key_start + 4,
1121 			   keyLen - 4);
1122   }
1123 
1124   c_lqh->receive_attrinfo(signal, attr_start, attrLen);
1125 }
1126 
1127 void
reorder_key(const KeyDescriptor * desc,Uint32 * data,Uint32 len)1128 Restore::reorder_key(const KeyDescriptor* desc,
1129 		     Uint32 *data, Uint32 len)
1130 {
1131   Uint32 i;
1132   Uint32 *var= data;
1133   Uint32 Tmp[MAX_KEY_SIZE_IN_WORDS];
1134   for(i = 0; i<desc->noOfKeyAttr; i++)
1135   {
1136     Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1137     switch(AttributeDescriptor::getArrayType(attr)){
1138     case NDB_ARRAYTYPE_FIXED:
1139       var += AttributeDescriptor::getSizeInWords(attr);
1140     }
1141   }
1142 
1143   Uint32 *dst = Tmp;
1144   Uint32 *src = data;
1145   for(i = 0; i<desc->noOfKeyAttr; i++)
1146   {
1147     Uint32 sz;
1148     Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1149     switch(AttributeDescriptor::getArrayType(attr)){
1150     case NDB_ARRAYTYPE_FIXED:
1151       sz = AttributeDescriptor::getSizeInWords(attr);
1152       memcpy(dst, src, 4 * sz);
1153       src += sz;
1154       break;
1155     case NDB_ARRAYTYPE_SHORT_VAR:
1156       sz = (1 + ((Uint8*)var)[0] + 3) >> 2;
1157       memcpy(dst, var, 4 * sz);
1158       var += sz;
1159       break;
1160     case NDB_ARRAYTYPE_MEDIUM_VAR:
1161       sz = (2 + ((Uint8*)var)[0] +  256*((Uint8*)var)[1] + 3) >> 2;
1162       memcpy(dst, var, 4 * sz);
1163       var += sz;
1164       break;
1165     default:
1166       ndbrequire(false);
1167       sz = 0;
1168     }
1169     dst += sz;
1170   }
1171   ndbassert((Uint32) (dst - Tmp) == len);
1172   memcpy(data, Tmp, 4*len);
1173 }
1174 
1175 Uint32
calulate_hash(Uint32 tableId,const Uint32 * src)1176 Restore::calulate_hash(Uint32 tableId, const Uint32 *src)
1177 {
1178   jam();
1179   Uint64 Tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
1180   Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
1181   Uint32 keyLen = xfrm_key(tableId, src, (Uint32*)Tmp, sizeof(Tmp) >> 2,
1182 			   keyPartLen);
1183   ndbrequire(keyLen);
1184 
1185   return md5_hash(Tmp, keyLen);
1186 }
1187 
1188 void
execLQHKEYREF(Signal * signal)1189 Restore::execLQHKEYREF(Signal* signal)
1190 {
1191   FilePtr file_ptr;
1192   LqhKeyRef* ref = (LqhKeyRef*)signal->getDataPtr();
1193   m_file_pool.getPtr(file_ptr, ref->connectPtr);
1194 
1195   crash_during_restore(file_ptr, __LINE__, ref->errorCode);
1196   ndbrequire(false);
1197 }
1198 
1199 void
crash_during_restore(FilePtr file_ptr,Uint32 line,Uint32 errCode)1200 Restore::crash_during_restore(FilePtr file_ptr, Uint32 line, Uint32 errCode)
1201 {
1202   char buf[255], name[100];
1203   BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1204 		       file_ptr.p->m_lcp_no,
1205 		       file_ptr.p->m_table_id,
1206 		       file_ptr.p->m_fragment_id);
1207 
1208   if (errCode)
1209   {
1210     BaseString::snprintf(buf, sizeof(buf),
1211                          "Error %d (line: %u) during restore of  %s",
1212                          errCode, line, name);
1213   }
1214   else
1215   {
1216     BaseString::snprintf(buf, sizeof(buf),
1217                          "Error (line %u) during restore of  %s",
1218                          line, name);
1219   }
1220   progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
1221 }
1222 
1223 void
execLQHKEYCONF(Signal * signal)1224 Restore::execLQHKEYCONF(Signal* signal)
1225 {
1226   FilePtr file_ptr;
1227   LqhKeyConf * conf = (LqhKeyConf *)signal->getDataPtr();
1228   m_file_pool.getPtr(file_ptr, conf->opPtr);
1229 
1230   ndbassert(file_ptr.p->m_outstanding_operations);
1231   file_ptr.p->m_outstanding_operations--;
1232   file_ptr.p->m_rows_restored++;
1233   if(file_ptr.p->m_outstanding_operations == 0 && file_ptr.p->m_fd == RNIL)
1234   {
1235     jam();
1236     restore_lcp_conf(signal, file_ptr);
1237     return;
1238   }
1239 }
1240 
1241 void
restore_lcp_conf(Signal * signal,FilePtr file_ptr)1242 Restore::restore_lcp_conf(Signal* signal, FilePtr file_ptr)
1243 {
1244   RestoreLcpConf* rep= (RestoreLcpConf*)signal->getDataPtrSend();
1245   rep->senderData= file_ptr.p->m_sender_data;
1246   if(file_ptr.p->is_lcp())
1247   {
1248     /**
1249      * Temporary reset DBTUP's #disk attributes on table
1250      *
1251      * TUP will send RESTORE_LCP_CONF
1252      */
1253     c_tup->complete_restore_lcp(signal,
1254                                 file_ptr.p->m_sender_ref,
1255                                 file_ptr.p->m_sender_data,
1256                                 file_ptr.p->m_table_id,
1257 				file_ptr.p->m_fragment_id);
1258   }
1259   else
1260   {
1261     sendSignal(file_ptr.p->m_sender_ref,
1262                GSN_RESTORE_LCP_CONF, signal,
1263                RestoreLcpConf::SignalLength, JBB);
1264   }
1265 
1266   signal->theData[0] = NDB_LE_ReadLCPComplete;
1267   signal->theData[1] = file_ptr.p->m_table_id;
1268   signal->theData[2] = file_ptr.p->m_fragment_id;
1269   signal->theData[3] = Uint32(file_ptr.p->m_rows_restored >> 32);
1270   signal->theData[4] = Uint32(file_ptr.p->m_rows_restored);
1271   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
1272 
1273   release_file(file_ptr);
1274 }
1275 
1276 void
parse_fragment_footer(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)1277 Restore::parse_fragment_footer(Signal* signal, FilePtr file_ptr,
1278 			       const Uint32 *data, Uint32 len)
1279 {
1280   const BackupFormat::DataFile::FragmentFooter* fh=
1281     (BackupFormat::DataFile::FragmentFooter*)data;
1282   if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
1283   {
1284     parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
1285     return;
1286   }
1287 
1288   if(ntohl(fh->Checksum) != 0)
1289   {
1290     parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
1291     return;
1292   }
1293 }
1294 
1295 void
parse_gcp_entry(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)1296 Restore::parse_gcp_entry(Signal* signal, FilePtr file_ptr,
1297 			 const Uint32 *data, Uint32 len)
1298 {
1299 
1300 }
1301 
1302 void
parse_error(Signal * signal,FilePtr file_ptr,Uint32 line,Uint32 extra)1303 Restore::parse_error(Signal* signal,
1304 		     FilePtr file_ptr, Uint32 line, Uint32 extra)
1305 {
1306   char buf[255], name[100];
1307   BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1308 		       file_ptr.p->m_lcp_no,
1309 		       file_ptr.p->m_table_id,
1310 		       file_ptr.p->m_fragment_id);
1311 
1312   BaseString::snprintf(buf, sizeof(buf),
1313 		       "Parse error in file: %s, extra: %d",
1314 		       name, extra);
1315 
1316   progError(line, NDBD_EXIT_INVALID_LCP_FILE, buf);
1317   ndbrequire(false);
1318 }
1319 
1320 NdbOut&
operator <<(NdbOut & ndbout,const Restore::Column & col)1321 operator << (NdbOut& ndbout, const Restore::Column& col)
1322 {
1323   ndbout << "[ Col: id: " << col.m_id
1324 	 << " size: " << col.m_size
1325 	 << " key: " << (Uint32)(col.m_flags & Restore::Column::COL_KEY)
1326 	 << " variable: " << (Uint32)(col.m_flags & Restore::Column::COL_VAR)
1327 	 << " null: " << (Uint32)(col.m_flags & Restore::Column::COL_NULL)
1328 	 << " disk: " << (Uint32)(col.m_flags & Restore::Column::COL_DISK)
1329 	 << "]";
1330 
1331   return ndbout;
1332 }
1333 
1334 int
check_file_version(Signal * signal,Uint32 file_version)1335 Restore::check_file_version(Signal* signal, Uint32 file_version)
1336 {
1337   if (file_version < MAKE_VERSION(5,1,6))
1338   {
1339     char buf[255];
1340     char verbuf[255];
1341     ndbGetVersionString(file_version, 0, 0, verbuf, sizeof(verbuf));
1342     BaseString::snprintf(buf, sizeof(buf),
1343 			 "Unsupported version of LCP files found on disk, "
1344 			 " found: %s", verbuf);
1345 
1346     progError(__LINE__,
1347 	      NDBD_EXIT_SR_RESTARTCONFLICT,
1348 	      buf);
1349     return -1;
1350   }
1351   return 0;
1352 }
1353