1 /*
2 Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "restore.hpp"
26 #include <signaldata/FsRef.hpp>
27 #include <signaldata/FsConf.hpp>
28 #include <signaldata/FsOpenReq.hpp>
29 #include <signaldata/FsCloseReq.hpp>
30 #include <signaldata/FsReadWriteReq.hpp>
31 #include <signaldata/RestoreImpl.hpp>
32 #include <signaldata/DictTabInfo.hpp>
33 #include <signaldata/KeyInfo.hpp>
34 #include <signaldata/AttrInfo.hpp>
35 #include <signaldata/LqhKey.hpp>
36 #include <AttributeHeader.hpp>
37 #include <md5_hash.hpp>
38 #include <dblqh/Dblqh.hpp>
39 #include <dbtup/Dbtup.hpp>
40 #include <KeyDescriptor.hpp>
41
42 #define PAGES LCP_RESTORE_BUFFER
43
Restore(Block_context & ctx,Uint32 instanceNumber)44 Restore::Restore(Block_context& ctx, Uint32 instanceNumber) :
45 SimulatedBlock(RESTORE, ctx, instanceNumber),
46 m_file_list(m_file_pool),
47 m_file_hash(m_file_pool)
48 {
49 BLOCK_CONSTRUCTOR(Restore);
50
51 // Add received signals
52 addRecSignal(GSN_STTOR, &Restore::execSTTOR);
53 addRecSignal(GSN_DUMP_STATE_ORD, &Restore::execDUMP_STATE_ORD);
54 addRecSignal(GSN_CONTINUEB, &Restore::execCONTINUEB);
55 addRecSignal(GSN_READ_CONFIG_REQ, &Restore::execREAD_CONFIG_REQ, true);
56
57 addRecSignal(GSN_RESTORE_LCP_REQ, &Restore::execRESTORE_LCP_REQ);
58
59 addRecSignal(GSN_FSOPENREF, &Restore::execFSOPENREF, true);
60 addRecSignal(GSN_FSOPENCONF, &Restore::execFSOPENCONF);
61 addRecSignal(GSN_FSREADREF, &Restore::execFSREADREF, true);
62 addRecSignal(GSN_FSREADCONF, &Restore::execFSREADCONF);
63 addRecSignal(GSN_FSCLOSEREF, &Restore::execFSCLOSEREF, true);
64 addRecSignal(GSN_FSCLOSECONF, &Restore::execFSCLOSECONF);
65
66 addRecSignal(GSN_LQHKEYREF, &Restore::execLQHKEYREF);
67 addRecSignal(GSN_LQHKEYCONF, &Restore::execLQHKEYCONF);
68
69 ndbrequire(sizeof(Column) == 8);
70 }
71
~Restore()72 Restore::~Restore()
73 {
74 }
75
BLOCK_FUNCTIONS(Restore)76 BLOCK_FUNCTIONS(Restore)
77
78 void
79 Restore::execSTTOR(Signal* signal)
80 {
81 jamEntry();
82
83 c_lqh = (Dblqh*)globalData.getBlock(DBLQH, instance());
84 c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
85 ndbrequire(c_lqh != 0 && c_tup != 0);
86 sendSTTORRY(signal);
87
88 return;
89 }//Restore::execNDB_STTOR()
90
91 void
execREAD_CONFIG_REQ(Signal * signal)92 Restore::execREAD_CONFIG_REQ(Signal* signal)
93 {
94 jamEntry();
95 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
96 Uint32 ref = req->senderRef;
97 Uint32 senderData = req->senderData;
98 ndbrequire(req->noOfParameters == 0);
99
100 const ndb_mgm_configuration_iterator * p =
101 m_ctx.m_config.getOwnConfigIterator();
102 ndbrequire(p != 0);
103
104 #if 0
105 Uint32 noBackups = 0, noTables = 0, noAttribs = 0;
106 ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &m_diskless));
107 ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_BACKUPS, &noBackups);
108 // ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));
109 ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables));
110 ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, &noAttribs));
111
112 noAttribs++; //RT 527 bug fix
113
114 c_backupPool.setSize(noBackups);
115 c_backupFilePool.setSize(3 * noBackups);
116 c_tablePool.setSize(noBackups * noTables);
117 c_attributePool.setSize(noBackups * noAttribs);
118 c_triggerPool.setSize(noBackups * 3 * noTables);
119
120 // 2 = no of replicas
121 c_fragmentPool.setSize(noBackups * NO_OF_FRAG_PER_NODE * noTables);
122
123 Uint32 szMem = 0;
124 ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
125 Uint32 noPages = (szMem + sizeof(Page32) - 1) / sizeof(Page32);
126 // We need to allocate an additional of 2 pages. 1 page because of a bug in
127 // ArrayPool and another one for DICTTAINFO.
128 c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2);
129
130 Uint32 szDataBuf = (2 * 1024 * 1024);
131 Uint32 szLogBuf = (2 * 1024 * 1024);
132 Uint32 szWrite = 32768;
133 ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
134 ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
135 ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
136
137 c_defaults.m_logBufferSize = szLogBuf;
138 c_defaults.m_dataBufferSize = szDataBuf;
139 c_defaults.m_minWriteSize = szWrite;
140 c_defaults.m_maxWriteSize = szWrite;
141
142 { // Init all tables
143 ArrayList<Table> tables(c_tablePool);
144 TablePtr ptr;
145 while(tables.seize(ptr)){
146 new (ptr.p) Table(c_attributePool, c_fragmentPool);
147 }
148 tables.release();
149 }
150
151 {
152 ArrayList<BackupFile> ops(c_backupFilePool);
153 BackupFilePtr ptr;
154 while(ops.seize(ptr)){
155 new (ptr.p) BackupFile(* this, c_pagePool);
156 }
157 ops.release();
158 }
159
160 {
161 ArrayList<BackupRecord> recs(c_backupPool);
162 BackupRecordPtr ptr;
163 while(recs.seize(ptr)){
164 new (ptr.p) BackupRecord(* this, c_pagePool, c_tablePool,
165 c_backupFilePool, c_triggerPool);
166 }
167 recs.release();
168 }
169
170 // Initialize BAT for interface to file system
171 {
172 Page32Ptr p;
173 ndbrequire(c_pagePool.seizeId(p, 0));
174 c_startOfPages = (Uint32 *)p.p;
175 c_pagePool.release(p);
176
177 NewVARIABLE* bat = allocateBat(1);
178 bat[0].WA = c_startOfPages;
179 bat[0].nrr = c_pagePool.getSize()*sizeof(Page32)/sizeof(Uint32);
180 }
181 #endif
182 m_file_pool.setSize(1);
183 Uint32 cnt = 2*MAX_ATTRIBUTES_IN_TABLE;
184 cnt += PAGES;
185 cnt += List::getSegmentSize()-1;
186 cnt /= List::getSegmentSize();
187 cnt += 2;
188 m_databuffer_pool.setSize(cnt);
189
190 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
191 conf->senderRef = reference();
192 conf->senderData = senderData;
193 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
194 ReadConfigConf::SignalLength, JBB);
195 }
196
197 void
sendSTTORRY(Signal * signal)198 Restore::sendSTTORRY(Signal* signal){
199 signal->theData[0] = 0;
200 signal->theData[3] = 1;
201 signal->theData[4] = 3;
202 signal->theData[5] = 255; // No more start phases from missra
203 BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : RESTORE_REF;
204 sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
205 }
206
207 void
execCONTINUEB(Signal * signal)208 Restore::execCONTINUEB(Signal* signal){
209 jamEntry();
210
211 switch(signal->theData[0]){
212 case RestoreContinueB::RESTORE_NEXT:
213 {
214 FilePtr file_ptr;
215 m_file_pool.getPtr(file_ptr, signal->theData[1]);
216 restore_next(signal, file_ptr);
217 return;
218 }
219 case RestoreContinueB::READ_FILE:
220 {
221 FilePtr file_ptr;
222 m_file_pool.getPtr(file_ptr, signal->theData[1]);
223 read_file(signal, file_ptr);
224 return;
225 }
226 default:
227 ndbrequire(false);
228 }
229 }
230
231 void
execDUMP_STATE_ORD(Signal * signal)232 Restore::execDUMP_STATE_ORD(Signal* signal){
233 jamEntry();
234 }
235
236 void
execRESTORE_LCP_REQ(Signal * signal)237 Restore::execRESTORE_LCP_REQ(Signal* signal){
238 jamEntry();
239
240 Uint32 err= 0;
241 RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtr();
242 Uint32 senderRef= req->senderRef;
243 Uint32 senderData= req->senderData;
244 do
245 {
246 FilePtr file_ptr;
247 if(!m_file_list.seize(file_ptr))
248 {
249 err= RestoreLcpRef::NoFileRecord;
250 break;
251 }
252
253 if((err= init_file(req, file_ptr)))
254 {
255 break;
256 }
257
258 open_file(signal, file_ptr);
259 return;
260 } while(0);
261
262 RestoreLcpRef* ref= (RestoreLcpRef*)signal->getDataPtrSend();
263 ref->senderData= senderData;
264 ref->senderRef= reference();
265 ref->errorCode = err;
266 sendSignal(senderRef, GSN_RESTORE_LCP_REF, signal,
267 RestoreLcpRef::SignalLength, JBB);
268 }
269
270 Uint32
init_file(const RestoreLcpReq * req,FilePtr file_ptr)271 Restore::init_file(const RestoreLcpReq* req, FilePtr file_ptr)
272 {
273 new (file_ptr.p) File();
274 file_ptr.p->m_sender_ref = req->senderRef;
275 file_ptr.p->m_sender_data = req->senderData;
276
277 file_ptr.p->m_fd = RNIL;
278 file_ptr.p->m_file_type = BackupFormat::LCP_FILE;
279 file_ptr.p->m_status = File::FIRST_READ;
280
281 file_ptr.p->m_lcp_no = req->lcpNo;
282 file_ptr.p->m_table_id = req->tableId;
283 file_ptr.p->m_fragment_id = req->fragmentId;
284 file_ptr.p->m_table_version = RNIL;
285
286 file_ptr.p->m_bytes_left = 0; // Bytes read from FS
287 file_ptr.p->m_current_page_ptr_i = RNIL;
288 file_ptr.p->m_current_page_pos = 0;
289 file_ptr.p->m_current_page_index = 0;
290 file_ptr.p->m_current_file_page = 0;
291 file_ptr.p->m_outstanding_reads = 0;
292 file_ptr.p->m_outstanding_operations = 0;
293 file_ptr.p->m_rows_restored = 0;
294 LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
295 LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
296
297 ndbassert(columns.isEmpty());
298 columns.release();
299
300 ndbassert(pages.isEmpty());
301 pages.release();
302
303 Uint32 buf_size= PAGES*GLOBAL_PAGE_SIZE;
304 Uint32 page_count= (buf_size+GLOBAL_PAGE_SIZE-1)/GLOBAL_PAGE_SIZE;
305 if(!pages.seize(page_count))
306 {
307 return RestoreLcpRef::OutOfDataBuffer;
308 }
309
310 List::Iterator it;
311 for(pages.first(it); !it.isNull(); pages.next(it))
312 {
313 * it.data = RNIL;
314 }
315
316 Uint32 err= 0;
317 for(pages.first(it); !it.isNull(); pages.next(it))
318 {
319 Ptr<GlobalPage> page_ptr;
320 if(!m_global_page_pool.seize(page_ptr))
321 {
322 err= RestoreLcpRef::OutOfReadBufferPages;
323 break;
324 }
325 * it.data = page_ptr.i;
326 }
327
328 if(err)
329 {
330 for(pages.first(it); !it.isNull(); pages.next(it))
331 {
332 if(* it.data == RNIL)
333 break;
334 m_global_page_pool.release(* it.data);
335 }
336 }
337 else
338 {
339 pages.first(it);
340 file_ptr.p->m_current_page_ptr_i = *it.data;
341 }
342 return err;
343 }
344
345 void
release_file(FilePtr file_ptr)346 Restore::release_file(FilePtr file_ptr)
347 {
348 LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
349 LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
350
351 List::Iterator it;
352 for(pages.first(it); !it.isNull(); pages.next(it))
353 {
354 if(* it.data == RNIL)
355 continue;
356 m_global_page_pool.release(* it.data);
357 }
358
359 ndbout_c("RESTORE table: %d %lld rows applied",
360 file_ptr.p->m_table_id,
361 file_ptr.p->m_rows_restored);
362
363 columns.release();
364 pages.release();
365 m_file_list.release(file_ptr);
366 }
367
368 void
open_file(Signal * signal,FilePtr file_ptr)369 Restore::open_file(Signal* signal, FilePtr file_ptr)
370 {
371 signal->theData[0] = NDB_LE_StartReadLCP;
372 signal->theData[1] = file_ptr.p->m_table_id;
373 signal->theData[2] = file_ptr.p->m_fragment_id;
374 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
375
376 FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
377 req->userReference = reference();
378 req->fileFlags = FsOpenReq::OM_READONLY | FsOpenReq::OM_GZ;
379 req->userPointer = file_ptr.i;
380
381 FsOpenReq::setVersion(req->fileNumber, 5);
382 FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
383 FsOpenReq::v5_setLcpNo(req->fileNumber, file_ptr.p->m_lcp_no);
384 FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
385 FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
386 sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
387 }
388
389 void
execFSOPENREF(Signal * signal)390 Restore::execFSOPENREF(Signal* signal)
391 {
392 FsRef* ref= (FsRef*)signal->getDataPtr();
393 FilePtr file_ptr;
394 jamEntry();
395 m_file_pool.getPtr(file_ptr, ref->userPointer);
396
397 Uint32 errCode= ref->errorCode;
398 Uint32 osError= ref->osErrorCode;
399
400 RestoreLcpRef* rep= (RestoreLcpRef*)signal->getDataPtrSend();
401 rep->senderData= file_ptr.p->m_sender_data;
402 rep->errorCode = errCode;
403 rep->extra[0] = osError;
404 sendSignal(file_ptr.p->m_sender_ref,
405 GSN_RESTORE_LCP_REF, signal, RestoreLcpRef::SignalLength+1, JBB);
406
407 release_file(file_ptr);
408 }
409
410 void
execFSOPENCONF(Signal * signal)411 Restore::execFSOPENCONF(Signal* signal)
412 {
413 jamEntry();
414 FilePtr file_ptr;
415 FsConf* conf= (FsConf*)signal->getDataPtr();
416 m_file_pool.getPtr(file_ptr, conf->userPointer);
417
418 file_ptr.p->m_fd = conf->filePointer;
419
420 /**
421 * Start thread's
422 */
423
424 file_ptr.p->m_status |= File::FILE_THREAD_RUNNING;
425 signal->theData[0] = RestoreContinueB::READ_FILE;
426 signal->theData[1] = file_ptr.i;
427 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
428
429 file_ptr.p->m_status |= File::RESTORE_THREAD_RUNNING;
430 signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
431 signal->theData[1] = file_ptr.i;
432 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
433 }
434
435 void
restore_next(Signal * signal,FilePtr file_ptr)436 Restore::restore_next(Signal* signal, FilePtr file_ptr)
437 {
438 Uint32 *data, len= 0;
439 Uint32 status = file_ptr.p->m_status;
440 Uint32 page_count = file_ptr.p->m_pages.getSize();
441 do
442 {
443 Uint32 left= file_ptr.p->m_bytes_left;
444 if(left < 8)
445 {
446 /**
447 * Not enought bytes to read header
448 */
449 break;
450 }
451 Ptr<GlobalPage> page_ptr, next_page_ptr = { 0, 0 };
452 m_global_page_pool.getPtr(page_ptr, file_ptr.p->m_current_page_ptr_i);
453 List::Iterator it;
454
455 Uint32 pos= file_ptr.p->m_current_page_pos;
456 if(status & File::READING_RECORDS)
457 {
458 /**
459 * We are reading records
460 */
461 len= ntohl(* (page_ptr.p->data + pos)) + 1;
462 }
463 else
464 {
465 /**
466 * Section length is in 2 word
467 */
468 if(pos + 1 == GLOBAL_PAGE_SIZE_WORDS)
469 {
470 /**
471 * But that's stored on next page...
472 * and since we have atleast 8 bytes left in buffer
473 * we can be sure that that's in buffer
474 */
475 LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
476 Uint32 next_page = file_ptr.p->m_current_page_index + 1;
477 pages.position(it, next_page % page_count);
478 m_global_page_pool.getPtr(next_page_ptr, * it.data);
479 len= ntohl(* next_page_ptr.p->data);
480 }
481 else
482 {
483 len= ntohl(* (page_ptr.p->data + pos + 1));
484 }
485 }
486
487 if(file_ptr.p->m_status & File::FIRST_READ)
488 {
489 len= 3;
490 file_ptr.p->m_status &= ~(Uint32)File::FIRST_READ;
491 }
492
493 if(4 * len > left)
494 {
495 /**
496 * Not enought bytes to read "record"
497 */
498 ndbout_c("records: %d len: %x left: %d",
499 status & File::READING_RECORDS, 4*len, left);
500
501 if (unlikely((status & File:: FILE_THREAD_RUNNING) == 0))
502 {
503 crash_during_restore(file_ptr, __LINE__, 0);
504 }
505 len= 0;
506 break;
507 }
508
509 /**
510 * Entire record is in buffer
511 */
512
513 if(pos + len >= GLOBAL_PAGE_SIZE_WORDS)
514 {
515 /**
516 * But it's split over pages
517 */
518 if(next_page_ptr.p == 0)
519 {
520 LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
521 Uint32 next_page = file_ptr.p->m_current_page_index + 1;
522 pages.position(it, next_page % page_count);
523 m_global_page_pool.getPtr(next_page_ptr, * it.data);
524 }
525 file_ptr.p->m_current_page_ptr_i = next_page_ptr.i;
526 file_ptr.p->m_current_page_pos = (pos + len) - GLOBAL_PAGE_SIZE_WORDS;
527 file_ptr.p->m_current_page_index =
528 (file_ptr.p->m_current_page_index + 1) % page_count;
529
530 Uint32 first = (GLOBAL_PAGE_SIZE_WORDS - pos);
531 // wl4391_todo removing valgrind overlap warning for now
532 memmove(page_ptr.p, page_ptr.p->data+pos, 4 * first);
533 memcpy(page_ptr.p->data+first, next_page_ptr.p, 4 * (len - first));
534 data= page_ptr.p->data;
535 }
536 else
537 {
538 file_ptr.p->m_current_page_pos = pos + len;
539 data= page_ptr.p->data+pos;
540 }
541
542 file_ptr.p->m_bytes_left -= 4*len;
543
544 if(status & File::READING_RECORDS)
545 {
546 if(len == 1)
547 {
548 file_ptr.p->m_status = status & ~(Uint32)File::READING_RECORDS;
549 }
550 else
551 {
552 parse_record(signal, file_ptr, data, len);
553 }
554 }
555 else
556 {
557 switch(ntohl(* data)){
558 case BackupFormat::FILE_HEADER:
559 parse_file_header(signal, file_ptr, data-3, len+3);
560 break;
561 case BackupFormat::FRAGMENT_HEADER:
562 file_ptr.p->m_status = status | File::READING_RECORDS;
563 parse_fragment_header(signal, file_ptr, data, len);
564 break;
565 case BackupFormat::FRAGMENT_FOOTER:
566 parse_fragment_footer(signal, file_ptr, data, len);
567 break;
568 case BackupFormat::TABLE_LIST:
569 parse_table_list(signal, file_ptr, data, len);
570 break;
571 case BackupFormat::TABLE_DESCRIPTION:
572 parse_table_description(signal, file_ptr, data, len);
573 break;
574 case BackupFormat::GCP_ENTRY:
575 parse_gcp_entry(signal, file_ptr, data, len);
576 break;
577 case BackupFormat::EMPTY_ENTRY:
578 // skip
579 break;
580 case 0x4e444242: // 'NDBB'
581 if (check_file_version(signal, ntohl(* (data+2))) == 0)
582 {
583 break;
584 }
585 default:
586 parse_error(signal, file_ptr, __LINE__, ntohl(* data));
587 }
588 }
589 } while(0);
590
591 if(file_ptr.p->m_bytes_left == 0 && status & File::FILE_EOF)
592 {
593 file_ptr.p->m_status &= ~(Uint32)File::RESTORE_THREAD_RUNNING;
594 /**
595 * File is finished...
596 */
597 close_file(signal, file_ptr);
598 return;
599 }
600
601 signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
602 signal->theData[1] = file_ptr.i;
603
604 if(len)
605 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
606 else
607 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
608 }
609
610 void
read_file(Signal * signal,FilePtr file_ptr)611 Restore::read_file(Signal* signal, FilePtr file_ptr)
612 {
613 Uint32 left= file_ptr.p->m_bytes_left;
614 Uint32 page_count = file_ptr.p->m_pages.getSize();
615 Uint32 free= GLOBAL_PAGE_SIZE * page_count - left;
616 Uint32 read_count= free/GLOBAL_PAGE_SIZE;
617
618 if(read_count <= file_ptr.p->m_outstanding_reads)
619 {
620 signal->theData[0] = RestoreContinueB::READ_FILE;
621 signal->theData[1] = file_ptr.i;
622 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
623 return;
624 }
625
626 read_count -= file_ptr.p->m_outstanding_reads;
627 Uint32 curr_page= file_ptr.p->m_current_page_index;
628 LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
629
630 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
631 req->filePointer = file_ptr.p->m_fd;
632 req->userReference = reference();
633 req->userPointer = file_ptr.i;
634 req->numberOfPages = 1;
635 req->operationFlag = 0;
636 FsReadWriteReq::setFormatFlag(req->operationFlag,
637 FsReadWriteReq::fsFormatGlobalPage);
638 FsReadWriteReq::setPartialReadFlag(req->operationFlag, 1);
639
640 Uint32 start= (curr_page + page_count - read_count) % page_count;
641
642 List::Iterator it;
643 pages.position(it, start);
644 do
645 {
646 file_ptr.p->m_outstanding_reads++;
647 req->varIndex = file_ptr.p->m_current_file_page++;
648 req->data.pageData[0] = *it.data;
649 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
650 FsReadWriteReq::FixedLength + 1, JBB);
651
652 start++;
653 if(start == page_count)
654 {
655 start= 0;
656 pages.position(it, start);
657 }
658 else
659 {
660 pages.next(it);
661 }
662 } while(start != curr_page);
663 }
664
665 void
execFSREADREF(Signal * signal)666 Restore::execFSREADREF(Signal * signal)
667 {
668 jamEntry();
669 SimulatedBlock::execFSREADREF(signal);
670 ndbrequire(false);
671 }
672
673 void
execFSREADCONF(Signal * signal)674 Restore::execFSREADCONF(Signal * signal)
675 {
676 jamEntry();
677 FilePtr file_ptr;
678 FsConf* conf= (FsConf*)signal->getDataPtr();
679 m_file_pool.getPtr(file_ptr, conf->userPointer);
680
681 file_ptr.p->m_bytes_left += conf->bytes_read;
682
683 ndbassert(file_ptr.p->m_outstanding_reads);
684 file_ptr.p->m_outstanding_reads--;
685
686 if(file_ptr.p->m_outstanding_reads == 0)
687 {
688 ndbassert(conf->bytes_read <= GLOBAL_PAGE_SIZE);
689 if(conf->bytes_read == GLOBAL_PAGE_SIZE)
690 {
691 read_file(signal, file_ptr);
692 }
693 else
694 {
695 file_ptr.p->m_status |= File::FILE_EOF;
696 file_ptr.p->m_status &= ~(Uint32)File::FILE_THREAD_RUNNING;
697 }
698 }
699 }
700
701 void
close_file(Signal * signal,FilePtr file_ptr)702 Restore::close_file(Signal* signal, FilePtr file_ptr)
703 {
704 FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
705 req->filePointer = file_ptr.p->m_fd;
706 req->userPointer = file_ptr.i;
707 req->userReference = reference();
708 req->fileFlag = 0;
709 sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
710 }
711
712 void
execFSCLOSEREF(Signal * signal)713 Restore::execFSCLOSEREF(Signal * signal)
714 {
715 jamEntry();
716 SimulatedBlock::execFSCLOSEREF(signal);
717 ndbrequire(false);
718 }
719
720 void
execFSCLOSECONF(Signal * signal)721 Restore::execFSCLOSECONF(Signal * signal)
722 {
723 jamEntry();
724 FilePtr file_ptr;
725 FsConf* conf= (FsConf*)signal->getDataPtr();
726 m_file_pool.getPtr(file_ptr, conf->userPointer);
727
728 file_ptr.p->m_fd = RNIL;
729
730 if(file_ptr.p->m_outstanding_operations == 0)
731 {
732 jam();
733 restore_lcp_conf(signal, file_ptr);
734 return;
735 }
736 }
737
738 void
parse_file_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)739 Restore::parse_file_header(Signal* signal,
740 FilePtr file_ptr,
741 const Uint32* data, Uint32 len)
742 {
743 const BackupFormat::FileHeader* fh= (BackupFormat::FileHeader*)data;
744
745 if(memcmp(fh->Magic, "NDBBCKUP", 8) != 0)
746 {
747 parse_error(signal, file_ptr, __LINE__, *data);
748 return;
749 }
750
751 file_ptr.p->m_lcp_version = ntohl(fh->BackupVersion);
752 if (check_file_version(signal, ntohl(fh->BackupVersion)))
753 {
754 parse_error(signal, file_ptr, __LINE__, ntohl(fh->NdbVersion));
755 return;
756 }
757 ndbassert(ntohl(fh->SectionType) == BackupFormat::FILE_HEADER);
758
759 if(ntohl(fh->SectionLength) != len-3)
760 {
761 parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
762 return;
763 }
764
765 if(ntohl(fh->FileType) != BackupFormat::LCP_FILE)
766 {
767 parse_error(signal, file_ptr, __LINE__, ntohl(fh->FileType));
768 return;
769 }
770
771 if(fh->ByteOrder != 0x12345678)
772 {
773 parse_error(signal, file_ptr, __LINE__, fh->ByteOrder);
774 return;
775 }
776 }
777
778 void
parse_table_list(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)779 Restore::parse_table_list(Signal* signal, FilePtr file_ptr,
780 const Uint32 *data, Uint32 len)
781 {
782 const BackupFormat::CtlFile::TableList* fh=
783 (BackupFormat::CtlFile::TableList*)data;
784
785 if(ntohl(fh->TableIds[0]) != file_ptr.p->m_table_id)
786 {
787 parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableIds[0]));
788 return;
789 }
790 }
791
792 void
parse_table_description(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)793 Restore::parse_table_description(Signal* signal, FilePtr file_ptr,
794 const Uint32 *data, Uint32 len)
795 {
796 bool lcp = file_ptr.p->is_lcp();
797 Uint32 disk= 0;
798 const BackupFormat::CtlFile::TableDescription* fh=
799 (BackupFormat::CtlFile::TableDescription*)data;
800
801 LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
802
803 SimplePropertiesLinearReader it(fh->DictTabInfo, len);
804 it.first();
805
806 DictTabInfo::Table tmpTab; tmpTab.init();
807 SimpleProperties::UnpackStatus stat;
808 stat = SimpleProperties::unpack(it, &tmpTab,
809 DictTabInfo::TableMapping,
810 DictTabInfo::TableMappingSize,
811 true, true);
812 ndbrequire(stat == SimpleProperties::Break);
813
814 if(tmpTab.TableId != file_ptr.p->m_table_id)
815 {
816 parse_error(signal, file_ptr, __LINE__, tmpTab.TableId);
817 return;
818 }
819
820 Column c;
821 Uint32 colstore[sizeof(Column)/sizeof(Uint32)];
822
823 for(Uint32 i = 0; i<tmpTab.NoOfAttributes; i++) {
824 jam();
825 DictTabInfo::Attribute tmp; tmp.init();
826 stat = SimpleProperties::unpack(it, &tmp,
827 DictTabInfo::AttributeMapping,
828 DictTabInfo::AttributeMappingSize,
829 true, true);
830
831 ndbrequire(stat == SimpleProperties::Break);
832 it.next(); // Move Past EndOfAttribute
833
834 const Uint32 arr = tmp.AttributeArraySize;
835 const Uint32 sz = 1 << tmp.AttributeSize;
836 const Uint32 sz32 = (sz * arr + 31) >> 5;
837 const bool varsize = tmp.AttributeArrayType != NDB_ARRAYTYPE_FIXED;
838
839 c.m_id = tmp.AttributeId;
840 c.m_size = sz32;
841 c.m_flags = (tmp.AttributeKeyFlag ? Column::COL_KEY : 0);
842 c.m_flags |= (tmp.AttributeStorageType == NDB_STORAGETYPE_DISK ?
843 Column::COL_DISK : 0);
844
845 if(lcp && (c.m_flags & Column::COL_DISK))
846 {
847 /**
848 * Restore does not currently handle disk attributes
849 * which is fine as restore LCP shouldn't
850 */
851 disk++;
852 continue;
853 }
854
855 if(!tmp.AttributeNullableFlag && !varsize)
856 {
857 }
858 else if (true) // null mask dropped in 5.1
859 {
860 c.m_flags |= (varsize ? Column::COL_VAR : 0);
861 c.m_flags |= (tmp.AttributeNullableFlag ? Column::COL_NULL : 0);
862 }
863
864 memcpy(colstore, &c, sizeof(Column));
865 if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
866 {
867 parse_error(signal, file_ptr, __LINE__, i);
868 return;
869 }
870 }
871
872 if(lcp)
873 {
874 if (disk)
875 {
876 c.m_id = AttributeHeader::DISK_REF;
877 c.m_size = 2;
878 c.m_flags = 0;
879 memcpy(colstore, &c, sizeof(Column));
880 if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
881 {
882 parse_error(signal, file_ptr, __LINE__, 0);
883 return;
884 }
885 }
886
887 {
888 c.m_id = AttributeHeader::ROWID;
889 c.m_size = 2;
890 c.m_flags = 0;
891 memcpy(colstore, &c, sizeof(Column));
892 if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
893 {
894 parse_error(signal, file_ptr, __LINE__, 0);
895 return;
896 }
897 }
898
899 if (tmpTab.RowGCIFlag)
900 {
901 c.m_id = AttributeHeader::ROW_GCI;
902 c.m_size = 2;
903 c.m_flags = 0;
904 memcpy(colstore, &c, sizeof(Column));
905 if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
906 {
907 parse_error(signal, file_ptr, __LINE__, 0);
908 return;
909 }
910 }
911 }
912
913 file_ptr.p->m_table_version = tmpTab.TableVersion;
914 }
915
916 void
parse_fragment_header(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)917 Restore::parse_fragment_header(Signal* signal, FilePtr file_ptr,
918 const Uint32 *data, Uint32 len)
919 {
920 const BackupFormat::DataFile::FragmentHeader* fh=
921 (BackupFormat::DataFile::FragmentHeader*)data;
922 if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
923 {
924 parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
925 return;
926 }
927
928 if(ntohl(fh->ChecksumType) != 0)
929 {
930 parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
931 return;
932 }
933
934 file_ptr.p->m_fragment_id = ntohl(fh->FragmentNo);
935
936 if(file_ptr.p->is_lcp())
937 {
938 /**
939 * Temporary reset DBTUP's #disk attributes on table
940 */
941 c_tup->start_restore_lcp(file_ptr.p->m_table_id,
942 file_ptr.p->m_fragment_id);
943 }
944 }
945
946 void
parse_record(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)947 Restore::parse_record(Signal* signal, FilePtr file_ptr,
948 const Uint32 *data, Uint32 len)
949 {
950 List::Iterator it;
951 LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
952
953 Uint32 * const key_start = signal->getDataPtrSend()+24;
954 Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
955
956 data += 1;
957 const Uint32* const dataStart = data;
958
959 bool disk = false;
960 bool rowid = false;
961 bool gci = false;
962 Uint32 keyLen;
963 Uint32 attrLen;
964 Local_key rowid_val;
965 Uint64 gci_val;
966 Uint32 tableId = file_ptr.p->m_table_id;
967 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
968
969 if (likely(file_ptr.p->m_lcp_version >= NDBD_RAW_LCP))
970 {
971 rowid = true;
972 rowid_val.m_page_no = data[0];
973 rowid_val.m_page_idx = data[1];
974 keyLen = c_tup->read_lcp_keys(tableId, data+2, len - 3, key_start);
975
976 AttributeHeader::init(attr_start, AttributeHeader::READ_LCP, 4*(len - 3));
977 memcpy(attr_start + 1, data + 2, 4 * (len - 3));
978 attrLen = 1 + len - 3;
979 }
980 else
981 {
982 Uint32 *keyData = key_start;
983 Uint32 *attrData = attr_start;
984 union {
985 Column c;
986 Uint32 _align[sizeof(Column)/sizeof(Uint32)];
987 };
988
989 columns.first(it);
990 while(!it.isNull())
991 {
992 _align[0] = *it.data; ndbrequire(columns.next(it));
993 _align[1] = *it.data; columns.next(it);
994
995 if (c.m_id == AttributeHeader::ROWID)
996 {
997 rowid_val.m_page_no = data[0];
998 rowid_val.m_page_idx = data[1];
999 data += 2;
1000 rowid = true;
1001 continue;
1002 }
1003
1004 if (c.m_id == AttributeHeader::ROW_GCI)
1005 {
1006 memcpy(&gci_val, data, 8);
1007 data += 2;
1008 gci = true;
1009 continue;
1010 }
1011
1012 if (! (c.m_flags & (Column::COL_VAR | Column::COL_NULL)))
1013 {
1014 ndbrequire(data < dataStart + len);
1015
1016 if(c.m_flags & Column::COL_KEY)
1017 {
1018 memcpy(keyData, data, 4*c.m_size);
1019 keyData += c.m_size;
1020 }
1021
1022 AttributeHeader::init(attrData++, c.m_id, c.m_size << 2);
1023 memcpy(attrData, data, 4*c.m_size);
1024 attrData += c.m_size;
1025 data += c.m_size;
1026 }
1027
1028 if(c.m_flags & Column::COL_DISK)
1029 disk= true;
1030 }
1031
1032 // second part is data driven
1033 while (data + 2 < dataStart + len) {
1034 Uint32 sz= ntohl(*data); data++;
1035 Uint32 id= ntohl(*data); data++; // column_no
1036
1037 ndbrequire(columns.position(it, 2 * id));
1038
1039 _align[0] = *it.data; ndbrequire(columns.next(it));
1040 _align[1] = *it.data;
1041
1042 Uint32 sz32 = (sz + 3) >> 2;
1043 ndbassert(c.m_flags & (Column::COL_VAR | Column::COL_NULL));
1044 if (c.m_flags & Column::COL_KEY)
1045 {
1046 memcpy(keyData, data, 4 * sz32);
1047 keyData += sz32;
1048 }
1049
1050 AttributeHeader::init(attrData++, c.m_id, sz);
1051 memcpy(attrData, data, sz);
1052
1053 attrData += sz32;
1054 data += sz32;
1055 }
1056
1057 ndbrequire(data == dataStart + len - 1);
1058
1059 ndbrequire(disk == false); // Not supported...
1060 ndbrequire(rowid == true);
1061 keyLen = Uint32(keyData - key_start);
1062 attrLen = Uint32(attrData - attr_start);
1063 if (desc->noOfKeyAttr != desc->noOfVarKeys)
1064 {
1065 reorder_key(desc, key_start, keyLen);
1066 }
1067 }
1068
1069 LqhKeyReq * req = (LqhKeyReq *)signal->getDataPtrSend();
1070
1071 Uint32 hashValue;
1072 if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
1073 hashValue = calulate_hash(tableId, key_start);
1074 else
1075 hashValue = md5_hash((Uint64*)key_start, keyLen);
1076
1077 Uint32 tmp= 0;
1078 LqhKeyReq::setAttrLen(tmp, attrLen);
1079 req->attrLen = tmp;
1080
1081 tmp= 0;
1082 LqhKeyReq::setKeyLen(tmp, keyLen);
1083 LqhKeyReq::setLastReplicaNo(tmp, 0);
1084 /* ---------------------------------------------------------------------- */
1085 // Indicate Application Reference is present in bit 15
1086 /* ---------------------------------------------------------------------- */
1087 LqhKeyReq::setApplicationAddressFlag(tmp, 0);
1088 LqhKeyReq::setDirtyFlag(tmp, 1);
1089 LqhKeyReq::setSimpleFlag(tmp, 1);
1090 LqhKeyReq::setOperation(tmp, ZINSERT);
1091 LqhKeyReq::setSameClientAndTcFlag(tmp, 0);
1092 LqhKeyReq::setAIInLqhKeyReq(tmp, 0);
1093 LqhKeyReq::setNoDiskFlag(tmp, disk ? 0 : 1);
1094 LqhKeyReq::setRowidFlag(tmp, 1);
1095 LqhKeyReq::setGCIFlag(tmp, gci);
1096 req->clientConnectPtr = file_ptr.i;
1097 req->hashValue = hashValue;
1098 req->requestInfo = tmp;
1099 req->tcBlockref = reference();
1100 req->savePointId = 0;
1101 req->tableSchemaVersion = file_ptr.p->m_table_id +
1102 (file_ptr.p->m_table_version << 16);
1103 req->fragmentData = file_ptr.p->m_fragment_id;
1104 req->transId1 = 0;
1105 req->transId2 = 0;
1106 req->scanInfo = 0;
1107 memcpy(req->variableData, key_start, 16);
1108 Uint32 pos = keyLen > 4 ? 4 : keyLen;
1109 req->variableData[pos++] = rowid_val.m_page_no;
1110 req->variableData[pos++] = rowid_val.m_page_idx;
1111 if (gci)
1112 req->variableData[pos++] = (Uint32)gci_val;
1113 file_ptr.p->m_outstanding_operations++;
1114 EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal,
1115 LqhKeyReq::FixedSignalLength+pos);
1116
1117 if(keyLen > 4)
1118 {
1119 c_lqh->receive_keyinfo(signal,
1120 key_start + 4,
1121 keyLen - 4);
1122 }
1123
1124 c_lqh->receive_attrinfo(signal, attr_start, attrLen);
1125 }
1126
1127 void
reorder_key(const KeyDescriptor * desc,Uint32 * data,Uint32 len)1128 Restore::reorder_key(const KeyDescriptor* desc,
1129 Uint32 *data, Uint32 len)
1130 {
1131 Uint32 i;
1132 Uint32 *var= data;
1133 Uint32 Tmp[MAX_KEY_SIZE_IN_WORDS];
1134 for(i = 0; i<desc->noOfKeyAttr; i++)
1135 {
1136 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1137 switch(AttributeDescriptor::getArrayType(attr)){
1138 case NDB_ARRAYTYPE_FIXED:
1139 var += AttributeDescriptor::getSizeInWords(attr);
1140 }
1141 }
1142
1143 Uint32 *dst = Tmp;
1144 Uint32 *src = data;
1145 for(i = 0; i<desc->noOfKeyAttr; i++)
1146 {
1147 Uint32 sz;
1148 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1149 switch(AttributeDescriptor::getArrayType(attr)){
1150 case NDB_ARRAYTYPE_FIXED:
1151 sz = AttributeDescriptor::getSizeInWords(attr);
1152 memcpy(dst, src, 4 * sz);
1153 src += sz;
1154 break;
1155 case NDB_ARRAYTYPE_SHORT_VAR:
1156 sz = (1 + ((Uint8*)var)[0] + 3) >> 2;
1157 memcpy(dst, var, 4 * sz);
1158 var += sz;
1159 break;
1160 case NDB_ARRAYTYPE_MEDIUM_VAR:
1161 sz = (2 + ((Uint8*)var)[0] + 256*((Uint8*)var)[1] + 3) >> 2;
1162 memcpy(dst, var, 4 * sz);
1163 var += sz;
1164 break;
1165 default:
1166 ndbrequire(false);
1167 sz = 0;
1168 }
1169 dst += sz;
1170 }
1171 ndbassert((Uint32) (dst - Tmp) == len);
1172 memcpy(data, Tmp, 4*len);
1173 }
1174
1175 Uint32
calulate_hash(Uint32 tableId,const Uint32 * src)1176 Restore::calulate_hash(Uint32 tableId, const Uint32 *src)
1177 {
1178 jam();
1179 Uint64 Tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
1180 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
1181 Uint32 keyLen = xfrm_key(tableId, src, (Uint32*)Tmp, sizeof(Tmp) >> 2,
1182 keyPartLen);
1183 ndbrequire(keyLen);
1184
1185 return md5_hash(Tmp, keyLen);
1186 }
1187
1188 void
execLQHKEYREF(Signal * signal)1189 Restore::execLQHKEYREF(Signal* signal)
1190 {
1191 FilePtr file_ptr;
1192 LqhKeyRef* ref = (LqhKeyRef*)signal->getDataPtr();
1193 m_file_pool.getPtr(file_ptr, ref->connectPtr);
1194
1195 crash_during_restore(file_ptr, __LINE__, ref->errorCode);
1196 ndbrequire(false);
1197 }
1198
1199 void
crash_during_restore(FilePtr file_ptr,Uint32 line,Uint32 errCode)1200 Restore::crash_during_restore(FilePtr file_ptr, Uint32 line, Uint32 errCode)
1201 {
1202 char buf[255], name[100];
1203 BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1204 file_ptr.p->m_lcp_no,
1205 file_ptr.p->m_table_id,
1206 file_ptr.p->m_fragment_id);
1207
1208 if (errCode)
1209 {
1210 BaseString::snprintf(buf, sizeof(buf),
1211 "Error %d (line: %u) during restore of %s",
1212 errCode, line, name);
1213 }
1214 else
1215 {
1216 BaseString::snprintf(buf, sizeof(buf),
1217 "Error (line %u) during restore of %s",
1218 line, name);
1219 }
1220 progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
1221 }
1222
1223 void
execLQHKEYCONF(Signal * signal)1224 Restore::execLQHKEYCONF(Signal* signal)
1225 {
1226 FilePtr file_ptr;
1227 LqhKeyConf * conf = (LqhKeyConf *)signal->getDataPtr();
1228 m_file_pool.getPtr(file_ptr, conf->opPtr);
1229
1230 ndbassert(file_ptr.p->m_outstanding_operations);
1231 file_ptr.p->m_outstanding_operations--;
1232 file_ptr.p->m_rows_restored++;
1233 if(file_ptr.p->m_outstanding_operations == 0 && file_ptr.p->m_fd == RNIL)
1234 {
1235 jam();
1236 restore_lcp_conf(signal, file_ptr);
1237 return;
1238 }
1239 }
1240
1241 void
restore_lcp_conf(Signal * signal,FilePtr file_ptr)1242 Restore::restore_lcp_conf(Signal* signal, FilePtr file_ptr)
1243 {
1244 RestoreLcpConf* rep= (RestoreLcpConf*)signal->getDataPtrSend();
1245 rep->senderData= file_ptr.p->m_sender_data;
1246 if(file_ptr.p->is_lcp())
1247 {
1248 /**
1249 * Temporary reset DBTUP's #disk attributes on table
1250 *
1251 * TUP will send RESTORE_LCP_CONF
1252 */
1253 c_tup->complete_restore_lcp(signal,
1254 file_ptr.p->m_sender_ref,
1255 file_ptr.p->m_sender_data,
1256 file_ptr.p->m_table_id,
1257 file_ptr.p->m_fragment_id);
1258 }
1259 else
1260 {
1261 sendSignal(file_ptr.p->m_sender_ref,
1262 GSN_RESTORE_LCP_CONF, signal,
1263 RestoreLcpConf::SignalLength, JBB);
1264 }
1265
1266 signal->theData[0] = NDB_LE_ReadLCPComplete;
1267 signal->theData[1] = file_ptr.p->m_table_id;
1268 signal->theData[2] = file_ptr.p->m_fragment_id;
1269 signal->theData[3] = Uint32(file_ptr.p->m_rows_restored >> 32);
1270 signal->theData[4] = Uint32(file_ptr.p->m_rows_restored);
1271 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
1272
1273 release_file(file_ptr);
1274 }
1275
1276 void
parse_fragment_footer(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)1277 Restore::parse_fragment_footer(Signal* signal, FilePtr file_ptr,
1278 const Uint32 *data, Uint32 len)
1279 {
1280 const BackupFormat::DataFile::FragmentFooter* fh=
1281 (BackupFormat::DataFile::FragmentFooter*)data;
1282 if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
1283 {
1284 parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
1285 return;
1286 }
1287
1288 if(ntohl(fh->Checksum) != 0)
1289 {
1290 parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
1291 return;
1292 }
1293 }
1294
1295 void
parse_gcp_entry(Signal * signal,FilePtr file_ptr,const Uint32 * data,Uint32 len)1296 Restore::parse_gcp_entry(Signal* signal, FilePtr file_ptr,
1297 const Uint32 *data, Uint32 len)
1298 {
1299
1300 }
1301
1302 void
parse_error(Signal * signal,FilePtr file_ptr,Uint32 line,Uint32 extra)1303 Restore::parse_error(Signal* signal,
1304 FilePtr file_ptr, Uint32 line, Uint32 extra)
1305 {
1306 char buf[255], name[100];
1307 BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1308 file_ptr.p->m_lcp_no,
1309 file_ptr.p->m_table_id,
1310 file_ptr.p->m_fragment_id);
1311
1312 BaseString::snprintf(buf, sizeof(buf),
1313 "Parse error in file: %s, extra: %d",
1314 name, extra);
1315
1316 progError(line, NDBD_EXIT_INVALID_LCP_FILE, buf);
1317 ndbrequire(false);
1318 }
1319
1320 NdbOut&
operator <<(NdbOut & ndbout,const Restore::Column & col)1321 operator << (NdbOut& ndbout, const Restore::Column& col)
1322 {
1323 ndbout << "[ Col: id: " << col.m_id
1324 << " size: " << col.m_size
1325 << " key: " << (Uint32)(col.m_flags & Restore::Column::COL_KEY)
1326 << " variable: " << (Uint32)(col.m_flags & Restore::Column::COL_VAR)
1327 << " null: " << (Uint32)(col.m_flags & Restore::Column::COL_NULL)
1328 << " disk: " << (Uint32)(col.m_flags & Restore::Column::COL_DISK)
1329 << "]";
1330
1331 return ndbout;
1332 }
1333
1334 int
check_file_version(Signal * signal,Uint32 file_version)1335 Restore::check_file_version(Signal* signal, Uint32 file_version)
1336 {
1337 if (file_version < MAKE_VERSION(5,1,6))
1338 {
1339 char buf[255];
1340 char verbuf[255];
1341 ndbGetVersionString(file_version, 0, 0, verbuf, sizeof(verbuf));
1342 BaseString::snprintf(buf, sizeof(buf),
1343 "Unsupported version of LCP files found on disk, "
1344 " found: %s", verbuf);
1345
1346 progError(__LINE__,
1347 NDBD_EXIT_SR_RESTARTCONFLICT,
1348 buf);
1349 return -1;
1350 }
1351 return 0;
1352 }
1353