1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 #include "we_confirmhdfsdbfile.h"
19
20 #include <cerrno>
21 #include <cstring>
22 #include <iostream>
23 #include <sstream>
24 #include <vector>
25
26 #include <boost/scoped_array.hpp>
27 #include <boost/scoped_ptr.hpp>
28
29 #include "we_define.h"
30 #include "we_config.h"
31 #include "we_fileop.h"
32 #include "we_rbmetawriter.h"
33 #include "IDBPolicy.h"
34 #include "IDBDataFile.h"
35
36 namespace
37 {
38 const int BUF_SIZE = 1024; // size of buffer used to read meta data records
39 }
40
41 namespace WriteEngine
42 {
43
44 //------------------------------------------------------------------------------
45 // Constructor
46 // This class should typically only be used on an HDFS system, so we could
47 // hardcode this to pass HDFS to getFsi(); but it comes in handy for testing,
48 // to be able to execute this class on a non-HDFS stack as well. So I rely
49 // on useHdfs() to tell me which FileSystem reference to get.
50 //------------------------------------------------------------------------------
ConfirmHdfsDbFile()51 ConfirmHdfsDbFile::ConfirmHdfsDbFile() :
52 fFs(idbdatafile::IDBPolicy::useHdfs() ?
53 idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::HDFS) :
54 idbdatafile::IDBPolicy::useCloud() ?
55 idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::CLOUD) :
56 idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::BUFFERED))
57 {
58 }
59
60 //------------------------------------------------------------------------------
61 // Destructor
62 //------------------------------------------------------------------------------
~ConfirmHdfsDbFile()63 ConfirmHdfsDbFile::~ConfirmHdfsDbFile()
64 {
65 }
66
67 //------------------------------------------------------------------------------
68 // Backup a cdf file and replace it with the updated tmp file.
69 //------------------------------------------------------------------------------
confirmDbFileChange(const std::string & backUpFileType,const std::string & filename,std::string & errMsg) const70 int ConfirmHdfsDbFile::confirmDbFileChange(
71 const std::string& backUpFileType,
72 const std::string& filename,
73 std::string& errMsg) const
74 {
75 // return value
76 int rc = NO_ERROR;
77
78 // This rlc file should be renamed if success, just skip it
79 if (backUpFileType.compare("rlc") == 0)
80 {
81 return rc;
82 }
83
84 if (backUpFileType.compare("tmp") != 0 )
85 {
86 std::ostringstream oss;
87 oss << backUpFileType << " is a bad type to confirm DbFile change: " <<
88 filename;
89 errMsg = oss.str();
90 rc = ERR_HDFS_BACKUP;
91
92 return rc;
93 }
94
95 // add safety checks, just in case
96 std::string tmp(filename + ".tmp");
97
98 if (!fFs.exists(tmp.c_str())) // file already swapped
99 return rc;
100
101 if (fFs.size(tmp.c_str()) <= 0)
102 {
103 std::ostringstream oss;
104 oss << "tmp file " << tmp << " has bad size" << fFs.size(tmp.c_str());
105 errMsg = oss.str();
106 rc = ERR_COMP_RENAME_FILE;
107
108 return rc;
109 }
110
111 // remove the old orig if exists
112 std::string orig(filename + ".orig");
113 errno = 0;
114
115 if ((fFs.exists(orig.c_str())) &&
116 (fFs.remove(orig.c_str())) != 0)
117 {
118 int errNum = errno;
119 std::ostringstream oss;
120 oss << "remove old " << orig << " failed: " << strerror(errNum);
121 errMsg = oss.str();
122 rc = ERR_COMP_REMOVE_FILE;
123
124 return rc;
125 }
126
127 // backup the original
128 errno = 0;
129
130 if (fFs.rename(filename.c_str(), orig.c_str()) != 0)
131 {
132 int errNum = errno;
133 std::ostringstream oss;
134 oss << "rename " << filename << " to " << orig << " failed: " <<
135 strerror(errNum);
136 errMsg = oss.str();
137 rc = ERR_COMP_RENAME_FILE;
138
139 return rc;
140 }
141
142 // rename the new file
143 errno = 0;
144
145 if (fFs.rename(tmp.c_str(), filename.c_str()) != 0)
146 {
147 int errNum = errno;
148 std::ostringstream oss;
149 oss << "rename " << tmp << " to " << filename << " failed: " <<
150 strerror(errNum);
151 errMsg = oss.str();
152 rc = ERR_COMP_RENAME_FILE;
153
154 return rc;
155 }
156
157 return rc;
158 }
159
160 //------------------------------------------------------------------------------
161 // Finalize the changes to a db file.
162 // If success flag is true, then remove the orig
163 // otherwise, move the orig back to cdf
164 //------------------------------------------------------------------------------
endDbFileChange(const std::string & backUpFileType,const std::string & filename,bool success,std::string & errMsg) const165 int ConfirmHdfsDbFile::endDbFileChange(
166 const std::string& backUpFileType,
167 const std::string& filename,
168 bool success,
169 std::string& errMsg) const
170 {
171 // return value
172 int rc = NO_ERROR;
173
174 // This rlc file should be renamed if success, it is useless if failed.
175 if (backUpFileType.compare("rlc") == 0)
176 {
177 std::string rlc(filename + ".rlc");
178
179 if (fFs.exists(rlc.c_str()))
180 fFs.remove(rlc.c_str()); // TBD-okay to ignore failed removal?
181
182 return rc;
183 }
184
185 if (backUpFileType.compare("tmp") != 0)
186 {
187 std::ostringstream oss;
188 oss << backUpFileType << " is a bad type to finalize DbFile change: " <<
189 filename;
190 errMsg = oss.str();
191 rc = ERR_HDFS_BACKUP;
192
193 return rc;
194 }
195
196 std::string orig(filename + ".orig");
197
198 if (success)
199 {
200 // remove the orig file
201 errno = 0;
202
203 if ((fFs.exists(orig.c_str())) &&
204 (fFs.remove(orig.c_str())) != 0)
205 {
206 int errNum = errno;
207 std::ostringstream oss;
208 oss << "remove " << orig << " failed: " << strerror(errNum);
209 errMsg = oss.str();
210 rc = ERR_COMP_REMOVE_FILE;
211
212 return rc;
213 }
214 }
215 else
216 {
217 // restore the orig file
218 if (fFs.exists(orig.c_str()))
219 {
220 errno = 0;
221
222 // Try to remove file only if it exists
223 if ((fFs.exists(filename.c_str())) &&
224 (fFs.remove(filename.c_str()) != 0))
225 {
226 int errNum = errno;
227 std::ostringstream oss;
228 oss << "failed restore; remove " << filename << " failed: " <<
229 strerror(errNum);
230 errMsg = oss.str();
231 rc = ERR_COMP_REMOVE_FILE;
232
233 return rc;
234 }
235
236 errno = 0;
237
238 if (fFs.rename(orig.c_str(), filename.c_str()) != 0)
239 {
240 int errNum = errno;
241 std::ostringstream oss;
242 oss << "failed restore; rename " << orig << " failed: " <<
243 strerror(errNum);
244 errMsg = oss.str();
245 rc = ERR_COMP_RENAME_FILE;
246
247 return rc;
248 }
249 }
250
251 // remove the tmp file
252 std::string tmp(filename + ".tmp");
253 errno = 0;
254
255 if ((fFs.exists(tmp.c_str())) &&
256 (fFs.remove(tmp.c_str())) != 0)
257 {
258 int errNum = errno;
259 std::ostringstream oss;
260 oss << "failed restore; remove " << tmp << " failed: " <<
261 strerror(errNum);
262 errMsg = oss.str();
263 rc = ERR_COMP_REMOVE_FILE;
264
265 return rc;
266 }
267
268 // remove the chunk shifting helper
269 std::string rlc(filename + ".rlc");
270 errno = 0;
271
272 if ((fFs.exists(rlc.c_str())) &&
273 (fFs.remove(rlc.c_str())) != 0)
274 {
275 int errNum = errno;
276 std::ostringstream oss;
277 oss << "failed restore; remove " << rlc << " failed: " <<
278 strerror(errNum);
279 errMsg = oss.str();
280 rc = ERR_COMP_REMOVE_FILE;
281
282 return rc;
283 }
284 }
285
286 return rc;
287 }
288
289 //------------------------------------------------------------------------------
290 // Confirm the changes to the hwm DB files listed in the bulk rollback meta
291 // data file corresponding to the specified table OID.
292 //------------------------------------------------------------------------------
confirmDbFileListFromMetaFile(OID tableOID,std::string & errMsg)293 int ConfirmHdfsDbFile::confirmDbFileListFromMetaFile(
294 OID tableOID,
295 std::string& errMsg)
296 {
297 int rc = NO_ERROR;
298
299 try
300 {
301 std::vector<uint16_t> dbRoots;
302 Config::getRootIdList( dbRoots );
303
304 for (unsigned m = 0; m < dbRoots.size(); m++)
305 {
306 std::istringstream metaDataStream;
307 openMetaDataFile ( tableOID,
308 dbRoots[m], metaDataStream );
309
310 confirmDbFiles( metaDataStream );
311 }
312 }
313 catch (WeException& ex)
314 {
315 std::ostringstream oss;
316 oss << "Error confirming changes to table " << tableOID <<
317 "; " << ex.what();
318 errMsg = oss.str();
319 rc = ex.errorCode();
320 }
321 catch (std::exception& ex)
322 {
323 std::ostringstream oss;
324 oss << "Error confirming changes to table " << tableOID <<
325 "; " << ex.what();
326 errMsg = oss.str();
327 rc = ERR_UNKNOWN;
328 }
329
330 return rc;
331 }
332
333 //------------------------------------------------------------------------------
334 // Confirm the changes to the hwm DB files listed in the bulk rollback meta
335 // data file stream stored in metaDataStream.
336 //------------------------------------------------------------------------------
confirmDbFiles(std::istringstream & metaDataStream) const337 void ConfirmHdfsDbFile::confirmDbFiles(std::istringstream& metaDataStream) const
338 {
339 char inBuf[ BUF_SIZE ];
340
341 // Loop through the records in the meta-data file
342 while (metaDataStream.getline( inBuf, BUF_SIZE ))
343 {
344 // Restore Files for current DBRoot
345 if (RBMetaWriter::verifyColumn1Rec(inBuf))
346 {
347 confirmColumnDbFile(inBuf);
348 }
349 else if (RBMetaWriter::verifyDStore1Rec(inBuf))
350 {
351 confirmDctnryStoreDbFile(inBuf);
352 }
353 }
354 }
355
356 //------------------------------------------------------------------------------
357 // Confirm the changes to the hwm column DB file described in the bulk
358 // rollback meta data file record stored in inBuf.
359 //------------------------------------------------------------------------------
confirmColumnDbFile(const char * inBuf) const360 void ConfirmHdfsDbFile::confirmColumnDbFile(const char* inBuf) const
361 {
362 char recType[100];
363 OID columnOID;
364 uint32_t dbRootHwm;
365 uint32_t partNumHwm;
366 uint32_t segNumHwm;
367 HWM lastLocalHwm;
368 int colTypeInt;
369 char colTypeName[100];
370 uint32_t colWidth;
371 int compressionType = 0; // optional parameter
372
373 // Read meta-data record
374 int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d",
375 recType, &columnOID,
376 &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm,
377 &colTypeInt, colTypeName, &colWidth, &compressionType );
378
379 if (numFields < 9) // compressionType is optional
380 {
381 std::ostringstream oss;
382 oss << "Invalid COLUM1 record in meta-data file " <<
383 fMetaFileName << "; record-<" << inBuf << ">";
384
385 throw WeException( oss.str(), ERR_INVALID_PARAM );
386 }
387
388 // Construct the DB file name
389 char dbFileName[FILE_NAME_SIZE];
390 FileOp dbFile(false);
391 int rc = dbFile.getFileName( columnOID,
392 dbFileName,
393 dbRootHwm,
394 partNumHwm,
395 segNumHwm );
396
397 if (rc != NO_ERROR)
398 {
399 WErrorCodes ec;
400 std::ostringstream oss;
401 oss << "Error constructing column filename to confirm changes" <<
402 "; columnOID-" << columnOID <<
403 "; dbRoot-" << dbRootHwm <<
404 "; partNum-" << partNumHwm <<
405 "; segNum-" << segNumHwm <<
406 "; " << ec.errorString(rc);
407
408 throw WeException( oss.str(), rc );
409 }
410
411 // Confirm the changes to the DB file name
412 std::string errMsg;
413 rc = confirmDbFileChange( std::string("tmp"),
414 dbFileName,
415 errMsg );
416
417 if (rc != NO_ERROR)
418 {
419 throw WeException( errMsg, rc );
420 }
421 }
422
423 //------------------------------------------------------------------------------
424 // Confirm the changes to the hwm dctnry store DB file described in the bulk
425 // rollback meta data file record stored in inBuf.
426 //------------------------------------------------------------------------------
confirmDctnryStoreDbFile(const char * inBuf) const427 void ConfirmHdfsDbFile::confirmDctnryStoreDbFile(const char* inBuf) const
428 {
429 char recType[100];
430 OID dColumnOID;
431 OID dStoreOID;
432 uint32_t dbRootHwm;
433 uint32_t partNumHwm;
434 uint32_t segNumHwm;
435 HWM localHwm;
436 int compressionType = 0; // optional parameter
437
438 // Read meta-data record
439 int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d",
440 recType, &dColumnOID, &dStoreOID,
441 &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType );
442
443 if (numFields < 7) // compressionType optional
444 {
445 std::ostringstream oss;
446 oss << "Invalid DSTOR1 record in meta-data file " <<
447 fMetaFileName << "; record-<" << inBuf << ">";
448
449 throw WeException( oss.str(), ERR_INVALID_PARAM );
450 }
451
452 // Construct the DB file name
453 char dbFileName[FILE_NAME_SIZE];
454 FileOp dbFile(false);
455 int rc = dbFile.getFileName( dStoreOID,
456 dbFileName,
457 dbRootHwm,
458 partNumHwm,
459 segNumHwm );
460
461 if (rc != NO_ERROR)
462 {
463 WErrorCodes ec;
464 std::ostringstream oss;
465 oss << "Error constructing dictionary store filename to confirm changes" <<
466 "; columnOID-" << dStoreOID <<
467 "; dbRoot-" << dbRootHwm <<
468 "; partNum-" << partNumHwm <<
469 "; segNum-" << segNumHwm <<
470 "; " << ec.errorString(rc);
471
472 throw WeException( oss.str(), rc );
473 }
474
475 // Confirm the changes to the DB file name
476 std::string errMsg;
477 rc = confirmDbFileChange( std::string("tmp"),
478 dbFileName,
479 errMsg );
480
481 if (rc != NO_ERROR)
482 {
483 throw WeException( errMsg, rc );
484 }
485 }
486
487 //------------------------------------------------------------------------------
488 // End the changes to the hwm DB files listed in the bulk rollback meta
489 // data file corresponding to the specified table OID. Delete temp files.
490 //------------------------------------------------------------------------------
endDbFileListFromMetaFile(OID tableOID,bool success,std::string & errMsg)491 int ConfirmHdfsDbFile::endDbFileListFromMetaFile(
492 OID tableOID,
493 bool success,
494 std::string& errMsg)
495 {
496 int rc = NO_ERROR;
497 errMsg.clear();
498
499 std::vector<uint16_t> dbRoots;
500 Config::getRootIdList( dbRoots );
501
502 for (unsigned m = 0; m < dbRoots.size(); m++)
503 {
504 std::istringstream metaDataStream;
505
506 try
507 {
508 std::istringstream metaDataStream;
509 openMetaDataFile ( tableOID,
510 dbRoots[m], metaDataStream );
511
512 endDbFiles( metaDataStream, success );
513 }
514 // We catch any errors, but not deleting a temp file is not fatal,
515 // so we capture the error msg and keep going if a problem occurs.
516 // We return a concatenated list of error msgs if multiple errors
517 // take place.
518 catch (WeException& ex)
519 {
520 if (errMsg.size() == 0)
521 {
522 std::ostringstream oss;
523 oss << "Error deleting temp files for table " << tableOID <<
524 "; " << ex.what();
525 errMsg = oss.str();
526 rc = ex.errorCode();
527 }
528 else
529 {
530 errMsg += "; ";
531 errMsg += ex.what();
532 }
533 }
534 catch (std::exception& ex)
535 {
536 if (errMsg.size() == 0)
537 {
538 std::ostringstream oss;
539 oss << "Error deleting temp files for table " << tableOID <<
540 "; " << ex.what();
541 errMsg = oss.str();
542 rc = ERR_UNKNOWN;
543 }
544 else
545 {
546 errMsg += "; ";
547 errMsg += ex.what();
548 }
549 }
550 }
551
552 return rc;
553 }
554
555 //------------------------------------------------------------------------------
556 // End the changes to the hwm DB files listed in the bulk rollback meta
557 // data file stream stored in metaDataStream. Delete temp files.
558 //------------------------------------------------------------------------------
endDbFiles(std::istringstream & metaDataStream,bool success) const559 void ConfirmHdfsDbFile::endDbFiles(
560 std::istringstream& metaDataStream,
561 bool success) const
562 {
563 char inBuf[ BUF_SIZE ];
564 std::string errMsg;
565 int rc = NO_ERROR;
566
567 // Loop through the records in the meta-data file
568 while (metaDataStream.getline( inBuf, BUF_SIZE ))
569 {
570 try
571 {
572 // Delete Temp Files for current DBRoot
573 if (RBMetaWriter::verifyColumn1Rec(inBuf))
574 {
575 endColumnDbFile(inBuf, success);
576 }
577 else if (RBMetaWriter::verifyDStore1Rec(inBuf))
578 {
579 endDctnryStoreDbFile(inBuf, success);
580 }
581 }
582 // We catch any errors, but not deleting a temp file is not fatal,
583 // so we capture the error msg and keep going if a problem occurs.
584 // We return a concatenated list of error msgs if multiple errors
585 // take place.
586 catch (WeException& ex)
587 {
588 if (errMsg.size() == 0)
589 {
590 rc = ex.errorCode();
591 }
592 else
593 {
594 errMsg += "; ";
595 }
596
597 errMsg += ex.what();
598 }
599 catch (std::exception& ex)
600 {
601 if (errMsg.size() == 0)
602 {
603 rc = ERR_UNKNOWN;
604 }
605 else
606 {
607 errMsg += "; ";
608 }
609
610 errMsg += ex.what();
611 }
612 }
613
614 // Throw exception with cumulative list of any error msgs
615 if (errMsg.size() > 0)
616 {
617 throw WeException( errMsg, rc );
618 }
619 }
620
621 //------------------------------------------------------------------------------
622 // End the changes to the hwm column DB file described in the bulk
623 // rollback meta data file record stored in inBuf. Delete the temp file.
624 //------------------------------------------------------------------------------
endColumnDbFile(const char * inBuf,bool success) const625 void ConfirmHdfsDbFile::endColumnDbFile(
626 const char* inBuf,
627 bool success) const
628 {
629 char recType[100];
630 OID columnOID;
631 uint32_t dbRootHwm;
632 uint32_t partNumHwm;
633 uint32_t segNumHwm;
634 HWM lastLocalHwm;
635 int colTypeInt;
636 char colTypeName[100];
637 uint32_t colWidth;
638 int compressionType = 0; // optional parameter
639
640 // Read meta-data record
641 int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d",
642 recType, &columnOID,
643 &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm,
644 &colTypeInt, colTypeName, &colWidth, &compressionType );
645
646 if (numFields < 9) // compressionType is optional
647 {
648 std::ostringstream oss;
649 oss << "Invalid COLUM1 record in meta-data file " <<
650 fMetaFileName << "; record-<" << inBuf << ">";
651
652 throw WeException( oss.str(), ERR_INVALID_PARAM );
653 }
654
655 // Construct the DB file name
656 char dbFileName[FILE_NAME_SIZE];
657 FileOp dbFile(false);
658 int rc = dbFile.getFileName( columnOID,
659 dbFileName,
660 dbRootHwm,
661 partNumHwm,
662 segNumHwm );
663
664 if (rc != NO_ERROR)
665 {
666 WErrorCodes ec;
667 std::ostringstream oss;
668 oss << "Error constructing column filename to end changes" <<
669 "; columnOID-" << columnOID <<
670 "; dbRoot-" << dbRootHwm <<
671 "; partNum-" << partNumHwm <<
672 "; segNum-" << segNumHwm <<
673 "; " << ec.errorString(rc);
674
675 throw WeException( oss.str(), rc );
676 }
677
678 // Confirm the changes to the DB file name
679 std::string errMsg;
680 rc = endDbFileChange( std::string("tmp"),
681 dbFileName,
682 success,
683 errMsg );
684
685 if (rc != NO_ERROR)
686 {
687 throw WeException( errMsg, rc );
688 }
689 }
690
691 //------------------------------------------------------------------------------
692 // End the changes to the hwm dctnry store DB file described in the bulk
693 // rollback meta data file record stored in inBuf. Delete the temp file.
694 //------------------------------------------------------------------------------
endDctnryStoreDbFile(const char * inBuf,bool success) const695 void ConfirmHdfsDbFile::endDctnryStoreDbFile(
696 const char* inBuf,
697 bool success) const
698 {
699 char recType[100];
700 OID dColumnOID;
701 OID dStoreOID;
702 uint32_t dbRootHwm;
703 uint32_t partNumHwm;
704 uint32_t segNumHwm;
705 HWM localHwm;
706 int compressionType = 0; // optional parameter
707
708 // Read meta-data record
709 int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d",
710 recType, &dColumnOID, &dStoreOID,
711 &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType );
712
713 if (numFields < 7) // compressionType optional
714 {
715 std::ostringstream oss;
716 oss << "Invalid DSTOR1 record in meta-data file " <<
717 fMetaFileName << "; record-<" << inBuf << ">";
718
719 throw WeException( oss.str(), ERR_INVALID_PARAM );
720 }
721
722 // Construct the DB file name
723 char dbFileName[FILE_NAME_SIZE];
724 FileOp dbFile(false);
725 int rc = dbFile.getFileName( dStoreOID,
726 dbFileName,
727 dbRootHwm,
728 partNumHwm,
729 segNumHwm );
730
731 if (rc != NO_ERROR)
732 {
733 WErrorCodes ec;
734 std::ostringstream oss;
735 oss << "Error constructing dictionary store filename to end changes" <<
736 "; columnOID-" << dStoreOID <<
737 "; dbRoot-" << dbRootHwm <<
738 "; partNum-" << partNumHwm <<
739 "; segNum-" << segNumHwm <<
740 "; " << ec.errorString(rc);
741
742 throw WeException( oss.str(), rc );
743 }
744
745 // Confirm the changes to the DB file name
746 std::string errMsg;
747 rc = endDbFileChange( std::string("tmp"),
748 dbFileName,
749 success,
750 errMsg );
751
752 if (rc != NO_ERROR)
753 {
754 throw WeException( errMsg, rc );
755 }
756 }
757
758 //------------------------------------------------------------------------------
759 // Open and read the bulk rollback metadata file for the specified table OID
760 // and DBRoot. The contents of the metadata file is returned in the meta-
761 // DataStream argument.
762 //------------------------------------------------------------------------------
openMetaDataFile(OID tableOID,uint16_t dbRoot,std::istringstream & metaDataStream)763 void ConfirmHdfsDbFile::openMetaDataFile(OID tableOID,
764 uint16_t dbRoot,
765 std::istringstream& metaDataStream)
766 {
767 std::string bulkRollbackPath( Config::getDBRootByNum( dbRoot ) );
768
769 // Construct file name and check for it's existence
770 std::ostringstream ossFileName;
771 ossFileName << '/' << DBROOT_BULK_ROLLBACK_SUBDIR << '/' << tableOID;
772 fMetaFileName = bulkRollbackPath;
773 fMetaFileName += ossFileName.str();
774
775 // Return if the meta-data file does not exist.
776 if ( !fFs.exists( fMetaFileName.c_str() ) )
777 {
778 std::ostringstream oss;
779 oss << "Bulk rollback meta-data file " <<
780 fMetaFileName << " does not exist.";
781
782 throw WeException( oss.str(), ERR_FILE_NOT_EXIST );
783 }
784
785 // Open the file
786 boost::scoped_ptr<IDBDataFile> metaFile;
787 errno = 0;
788 metaFile.reset(idbdatafile::IDBDataFile::open(
789 idbdatafile::IDBPolicy::getType(fMetaFileName.c_str(),
790 idbdatafile::IDBPolicy::WRITEENG),
791 fMetaFileName.c_str(), "rb", 0) );
792
793 if ( !metaFile )
794 {
795 int errRc = errno;
796 std::ostringstream oss;
797 oss << "Error opening bulk rollback meta-data file " <<
798 fMetaFileName << "; err-" <<
799 errRc << "; " << strerror( errRc );
800
801 throw WeException( oss.str(), ERR_FILE_OPEN );
802 }
803
804 // First record in the file must be a Version record.
805 char inBuf[ BUF_SIZE ];
806 ssize_t metaFileSize = fFs.size( fMetaFileName.c_str() );
807 boost::scoped_array<char> buf( new char[ metaFileSize ] );
808 // retry 10 times for partial reads, just in case
809 ssize_t readSofar = 0; // bytes read so far
810 ssize_t bytes = 0; // bytes read by one pread
811 char* p = buf.get();
812
813 for (int i = 0; i < 10 && readSofar < metaFileSize; i++)
814 {
815 errno = 0;
816 bytes = metaFile->pread( p + readSofar,
817 readSofar,
818 metaFileSize - readSofar);
819
820 if (bytes < 0)
821 break;
822
823 readSofar += bytes;
824 }
825
826 if ( readSofar != metaFileSize )
827 {
828 int errRc = errno;
829 std::ostringstream oss;
830 oss << "Error reading bulk rollback meta-data file "
831 << fMetaFileName << "; read/expect:" << readSofar << "/"
832 << metaFileSize
833 << "; err-" << errRc << "; " << strerror( errRc );
834
835 throw WeException( oss.str(), ERR_FILE_READ );
836 }
837
838 // put the data in a string stream
839 metaDataStream.str( std::string( p, metaFileSize ) );
840 buf.reset();
841
842 // read data
843 metaDataStream.getline( inBuf, BUF_SIZE );
844
845 if (!RBMetaWriter::verifyVersion4(inBuf))
846 {
847 std::ostringstream oss;
848 oss << "Invalid version record in meta-data file " << fMetaFileName
849 << "; record-<" << inBuf << ">";
850
851 throw WeException( oss.str(), ERR_INVALID_PARAM );
852 }
853 }
854
855 } // end of namespace
856