1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 #include "we_confirmhdfsdbfile.h"
19 
20 #include <cerrno>
21 #include <cstring>
22 #include <iostream>
23 #include <sstream>
24 #include <vector>
25 
26 #include <boost/scoped_array.hpp>
27 #include <boost/scoped_ptr.hpp>
28 
29 #include "we_define.h"
30 #include "we_config.h"
31 #include "we_fileop.h"
32 #include "we_rbmetawriter.h"
33 #include "IDBPolicy.h"
34 #include "IDBDataFile.h"
35 
36 namespace
37 {
38 const int BUF_SIZE = 1024;  // size of buffer used to read meta data records
39 }
40 
41 namespace WriteEngine
42 {
43 
44 //------------------------------------------------------------------------------
45 // Constructor
46 // This class should typically only be used on an HDFS system, so we could
47 // hardcode this to pass HDFS to getFsi(); but it comes in handy for testing,
48 // to be able to execute this class on a non-HDFS stack as well.  So I rely
49 // on useHdfs() to tell me which FileSystem reference to get.
50 //------------------------------------------------------------------------------
ConfirmHdfsDbFile()51 ConfirmHdfsDbFile::ConfirmHdfsDbFile() :
52     fFs(idbdatafile::IDBPolicy::useHdfs() ?
53         idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::HDFS) :
54         idbdatafile::IDBPolicy::useCloud() ?
55             idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::CLOUD) :
56             idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::BUFFERED))
57 {
58 }
59 
60 //------------------------------------------------------------------------------
61 // Destructor
62 //------------------------------------------------------------------------------
~ConfirmHdfsDbFile()63 ConfirmHdfsDbFile::~ConfirmHdfsDbFile()
64 {
65 }
66 
67 //------------------------------------------------------------------------------
68 // Backup a cdf file and replace it with the updated tmp file.
69 //------------------------------------------------------------------------------
confirmDbFileChange(const std::string & backUpFileType,const std::string & filename,std::string & errMsg) const70 int ConfirmHdfsDbFile::confirmDbFileChange(
71     const std::string& backUpFileType,
72     const std::string& filename,
73     std::string& errMsg) const
74 {
75     // return value
76     int rc = NO_ERROR;
77 
78     // This rlc file should be renamed if success, just skip it
79     if (backUpFileType.compare("rlc") == 0)
80     {
81         return rc;
82     }
83 
84     if (backUpFileType.compare("tmp") != 0 )
85     {
86         std::ostringstream oss;
87         oss << backUpFileType << " is a bad type to confirm DbFile change: " <<
88             filename;
89         errMsg = oss.str();
90         rc = ERR_HDFS_BACKUP;
91 
92         return rc;
93     }
94 
95     // add safety checks, just in case
96     std::string tmp(filename + ".tmp");
97 
98     if (!fFs.exists(tmp.c_str()))  // file already swapped
99         return rc;
100 
101     if (fFs.size(tmp.c_str()) <= 0)
102     {
103         std::ostringstream oss;
104         oss << "tmp file " << tmp << " has bad size" << fFs.size(tmp.c_str());
105         errMsg = oss.str();
106         rc = ERR_COMP_RENAME_FILE;
107 
108         return rc;
109     }
110 
111     // remove the old orig if exists
112     std::string orig(filename + ".orig");
113     errno = 0;
114 
115     if ((fFs.exists(orig.c_str())) &&
116             (fFs.remove(orig.c_str())) != 0)
117     {
118         int errNum = errno;
119         std::ostringstream oss;
120         oss << "remove old " << orig << " failed: " << strerror(errNum);
121         errMsg = oss.str();
122         rc = ERR_COMP_REMOVE_FILE;
123 
124         return rc;
125     }
126 
127     // backup the original
128     errno = 0;
129 
130     if (fFs.rename(filename.c_str(), orig.c_str()) != 0)
131     {
132         int errNum = errno;
133         std::ostringstream oss;
134         oss << "rename " << filename << " to " << orig << " failed: " <<
135             strerror(errNum);
136         errMsg = oss.str();
137         rc = ERR_COMP_RENAME_FILE;
138 
139         return rc;
140     }
141 
142     // rename the new file
143     errno = 0;
144 
145     if (fFs.rename(tmp.c_str(), filename.c_str()) != 0)
146     {
147         int errNum = errno;
148         std::ostringstream oss;
149         oss << "rename " << tmp << " to " << filename << " failed: " <<
150             strerror(errNum);
151         errMsg = oss.str();
152         rc = ERR_COMP_RENAME_FILE;
153 
154         return rc;
155     }
156 
157     return rc;
158 }
159 
160 //------------------------------------------------------------------------------
161 // Finalize the changes to a db file.
162 // If success flag is true, then remove the orig
163 // otherwise, move the orig back to cdf
164 //------------------------------------------------------------------------------
endDbFileChange(const std::string & backUpFileType,const std::string & filename,bool success,std::string & errMsg) const165 int ConfirmHdfsDbFile::endDbFileChange(
166     const std::string& backUpFileType,
167     const std::string& filename,
168     bool  success,
169     std::string& errMsg) const
170 {
171     // return value
172     int rc = NO_ERROR;
173 
174     // This rlc file should be renamed if success, it is useless if failed.
175     if (backUpFileType.compare("rlc") == 0)
176     {
177         std::string rlc(filename + ".rlc");
178 
179         if (fFs.exists(rlc.c_str()))
180             fFs.remove(rlc.c_str()); // TBD-okay to ignore failed removal?
181 
182         return rc;
183     }
184 
185     if (backUpFileType.compare("tmp") != 0)
186     {
187         std::ostringstream oss;
188         oss << backUpFileType << " is a bad type to finalize DbFile change: " <<
189             filename;
190         errMsg = oss.str();
191         rc = ERR_HDFS_BACKUP;
192 
193         return rc;
194     }
195 
196     std::string orig(filename + ".orig");
197 
198     if (success)
199     {
200         // remove the orig file
201         errno = 0;
202 
203         if ((fFs.exists(orig.c_str())) &&
204                 (fFs.remove(orig.c_str())) != 0)
205         {
206             int errNum = errno;
207             std::ostringstream oss;
208             oss << "remove " << orig << " failed: " << strerror(errNum);
209             errMsg = oss.str();
210             rc = ERR_COMP_REMOVE_FILE;
211 
212             return rc;
213         }
214     }
215     else
216     {
217         // restore the orig file
218         if (fFs.exists(orig.c_str()))
219         {
220             errno = 0;
221 
222             // Try to remove file only if it exists
223             if ((fFs.exists(filename.c_str())) &&
224                     (fFs.remove(filename.c_str()) != 0))
225             {
226                 int errNum = errno;
227                 std::ostringstream oss;
228                 oss << "failed restore; remove " << filename << " failed: " <<
229                     strerror(errNum);
230                 errMsg = oss.str();
231                 rc = ERR_COMP_REMOVE_FILE;
232 
233                 return rc;
234             }
235 
236             errno = 0;
237 
238             if (fFs.rename(orig.c_str(), filename.c_str()) != 0)
239             {
240                 int errNum = errno;
241                 std::ostringstream oss;
242                 oss << "failed restore; rename " << orig << " failed: " <<
243                     strerror(errNum);
244                 errMsg = oss.str();
245                 rc = ERR_COMP_RENAME_FILE;
246 
247                 return rc;
248             }
249         }
250 
251         // remove the tmp file
252         std::string tmp(filename + ".tmp");
253         errno = 0;
254 
255         if ((fFs.exists(tmp.c_str())) &&
256                 (fFs.remove(tmp.c_str())) != 0)
257         {
258             int errNum = errno;
259             std::ostringstream oss;
260             oss << "failed restore; remove " << tmp << " failed: " <<
261                 strerror(errNum);
262             errMsg = oss.str();
263             rc = ERR_COMP_REMOVE_FILE;
264 
265             return rc;
266         }
267 
268         // remove the chunk shifting helper
269         std::string rlc(filename + ".rlc");
270         errno = 0;
271 
272         if ((fFs.exists(rlc.c_str())) &&
273                 (fFs.remove(rlc.c_str())) != 0)
274         {
275             int errNum = errno;
276             std::ostringstream oss;
277             oss << "failed restore; remove " << rlc << " failed: " <<
278                 strerror(errNum);
279             errMsg = oss.str();
280             rc = ERR_COMP_REMOVE_FILE;
281 
282             return rc;
283         }
284     }
285 
286     return rc;
287 }
288 
289 //------------------------------------------------------------------------------
290 // Confirm the changes to the hwm DB files listed in the bulk rollback meta
291 // data file corresponding to the specified table OID.
292 //------------------------------------------------------------------------------
confirmDbFileListFromMetaFile(OID tableOID,std::string & errMsg)293 int ConfirmHdfsDbFile::confirmDbFileListFromMetaFile(
294     OID tableOID,
295     std::string& errMsg)
296 {
297     int rc = NO_ERROR;
298 
299     try
300     {
301         std::vector<uint16_t> dbRoots;
302         Config::getRootIdList( dbRoots );
303 
304         for (unsigned m = 0; m < dbRoots.size(); m++)
305         {
306             std::istringstream metaDataStream;
307             openMetaDataFile ( tableOID,
308                                dbRoots[m], metaDataStream );
309 
310             confirmDbFiles( metaDataStream );
311         }
312     }
313     catch (WeException& ex)
314     {
315         std::ostringstream oss;
316         oss << "Error confirming changes to table " << tableOID <<
317             "; " << ex.what();
318         errMsg = oss.str();
319         rc = ex.errorCode();
320     }
321     catch (std::exception& ex)
322     {
323         std::ostringstream oss;
324         oss << "Error confirming changes to table " << tableOID <<
325             "; " << ex.what();
326         errMsg = oss.str();
327         rc = ERR_UNKNOWN;
328     }
329 
330     return rc;
331 }
332 
333 //------------------------------------------------------------------------------
334 // Confirm the changes to the hwm DB files listed in the bulk rollback meta
335 // data file stream stored in metaDataStream.
336 //------------------------------------------------------------------------------
confirmDbFiles(std::istringstream & metaDataStream) const337 void ConfirmHdfsDbFile::confirmDbFiles(std::istringstream& metaDataStream) const
338 {
339     char inBuf[ BUF_SIZE ];
340 
341     // Loop through the records in the meta-data file
342     while (metaDataStream.getline( inBuf, BUF_SIZE ))
343     {
344         // Restore Files for current DBRoot
345         if (RBMetaWriter::verifyColumn1Rec(inBuf))
346         {
347             confirmColumnDbFile(inBuf);
348         }
349         else if (RBMetaWriter::verifyDStore1Rec(inBuf))
350         {
351             confirmDctnryStoreDbFile(inBuf);
352         }
353     }
354 }
355 
356 //------------------------------------------------------------------------------
357 // Confirm the changes to the hwm column DB file described in the bulk
358 // rollback meta data file record stored in inBuf.
359 //------------------------------------------------------------------------------
confirmColumnDbFile(const char * inBuf) const360 void ConfirmHdfsDbFile::confirmColumnDbFile(const char* inBuf) const
361 {
362     char        recType[100];
363     OID         columnOID;
364     uint32_t    dbRootHwm;
365     uint32_t    partNumHwm;
366     uint32_t    segNumHwm;
367     HWM         lastLocalHwm;
368     int         colTypeInt;
369     char        colTypeName[100];
370     uint32_t    colWidth;
371     int         compressionType = 0; // optional parameter
372 
373     // Read meta-data record
374     int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d",
375                            recType, &columnOID,
376                            &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm,
377                            &colTypeInt, colTypeName, &colWidth, &compressionType );
378 
379     if (numFields < 9) // compressionType is optional
380     {
381         std::ostringstream oss;
382         oss << "Invalid COLUM1 record in meta-data file " <<
383             fMetaFileName << "; record-<" << inBuf << ">";
384 
385         throw WeException( oss.str(), ERR_INVALID_PARAM );
386     }
387 
388     // Construct the DB file name
389     char dbFileName[FILE_NAME_SIZE];
390     FileOp dbFile(false);
391     int rc = dbFile.getFileName( columnOID,
392                                  dbFileName,
393                                  dbRootHwm,
394                                  partNumHwm,
395                                  segNumHwm );
396 
397     if (rc != NO_ERROR)
398     {
399         WErrorCodes ec;
400         std::ostringstream oss;
401         oss << "Error constructing column filename to confirm changes" <<
402             "; columnOID-" << columnOID  <<
403             "; dbRoot-"    << dbRootHwm  <<
404             "; partNum-"   << partNumHwm <<
405             "; segNum-"    << segNumHwm  <<
406             "; "           << ec.errorString(rc);
407 
408         throw WeException( oss.str(), rc );
409     }
410 
411     // Confirm the changes to the DB file name
412     std::string errMsg;
413     rc = confirmDbFileChange( std::string("tmp"),
414                               dbFileName,
415                               errMsg );
416 
417     if (rc != NO_ERROR)
418     {
419         throw WeException( errMsg, rc );
420     }
421 }
422 
423 //------------------------------------------------------------------------------
424 // Confirm the changes to the hwm dctnry store DB file described in the bulk
425 // rollback meta data file record stored in inBuf.
426 //------------------------------------------------------------------------------
confirmDctnryStoreDbFile(const char * inBuf) const427 void ConfirmHdfsDbFile::confirmDctnryStoreDbFile(const char* inBuf) const
428 {
429     char      recType[100];
430     OID       dColumnOID;
431     OID       dStoreOID;
432     uint32_t  dbRootHwm;
433     uint32_t  partNumHwm;
434     uint32_t  segNumHwm;
435     HWM       localHwm;
436     int       compressionType = 0; // optional parameter
437 
438     // Read meta-data record
439     int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d",
440                            recType, &dColumnOID, &dStoreOID,
441                            &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType );
442 
443     if (numFields < 7) // compressionType optional
444     {
445         std::ostringstream oss;
446         oss << "Invalid DSTOR1 record in meta-data file " <<
447             fMetaFileName << "; record-<" << inBuf << ">";
448 
449         throw WeException( oss.str(), ERR_INVALID_PARAM );
450     }
451 
452     // Construct the DB file name
453     char dbFileName[FILE_NAME_SIZE];
454     FileOp dbFile(false);
455     int rc = dbFile.getFileName( dStoreOID,
456                                  dbFileName,
457                                  dbRootHwm,
458                                  partNumHwm,
459                                  segNumHwm );
460 
461     if (rc != NO_ERROR)
462     {
463         WErrorCodes ec;
464         std::ostringstream oss;
465         oss << "Error constructing dictionary store filename to confirm changes" <<
466             "; columnOID-" << dStoreOID  <<
467             "; dbRoot-"    << dbRootHwm  <<
468             "; partNum-"   << partNumHwm <<
469             "; segNum-"    << segNumHwm  <<
470             "; "           << ec.errorString(rc);
471 
472         throw WeException( oss.str(), rc );
473     }
474 
475     // Confirm the changes to the DB file name
476     std::string errMsg;
477     rc = confirmDbFileChange( std::string("tmp"),
478                               dbFileName,
479                               errMsg );
480 
481     if (rc != NO_ERROR)
482     {
483         throw WeException( errMsg, rc );
484     }
485 }
486 
487 //------------------------------------------------------------------------------
488 // End the changes to the hwm DB files listed in the bulk rollback meta
489 // data file corresponding to the specified table OID.  Delete temp files.
490 //------------------------------------------------------------------------------
endDbFileListFromMetaFile(OID tableOID,bool success,std::string & errMsg)491 int ConfirmHdfsDbFile::endDbFileListFromMetaFile(
492     OID tableOID,
493     bool success,
494     std::string& errMsg)
495 {
496     int rc = NO_ERROR;
497     errMsg.clear();
498 
499     std::vector<uint16_t> dbRoots;
500     Config::getRootIdList( dbRoots );
501 
502     for (unsigned m = 0; m < dbRoots.size(); m++)
503     {
504         std::istringstream metaDataStream;
505 
506         try
507         {
508             std::istringstream metaDataStream;
509             openMetaDataFile ( tableOID,
510                                dbRoots[m], metaDataStream );
511 
512             endDbFiles( metaDataStream, success );
513         }
514         // We catch any errors, but not deleting a temp file is not fatal,
515         // so we capture the error msg and keep going if a problem occurs.
516         // We return a concatenated list of error msgs if multiple errors
517         // take place.
518         catch (WeException& ex)
519         {
520             if (errMsg.size() == 0)
521             {
522                 std::ostringstream oss;
523                 oss << "Error deleting temp files for table " << tableOID <<
524                     "; " << ex.what();
525                 errMsg = oss.str();
526                 rc = ex.errorCode();
527             }
528             else
529             {
530                 errMsg += "; ";
531                 errMsg += ex.what();
532             }
533         }
534         catch (std::exception& ex)
535         {
536             if (errMsg.size() == 0)
537             {
538                 std::ostringstream oss;
539                 oss << "Error deleting temp files for table " << tableOID <<
540                     "; " << ex.what();
541                 errMsg = oss.str();
542                 rc = ERR_UNKNOWN;
543             }
544             else
545             {
546                 errMsg += "; ";
547                 errMsg += ex.what();
548             }
549         }
550     }
551 
552     return rc;
553 }
554 
555 //------------------------------------------------------------------------------
556 // End the changes to the hwm DB files listed in the bulk rollback meta
557 // data file stream stored in metaDataStream.  Delete temp files.
558 //------------------------------------------------------------------------------
endDbFiles(std::istringstream & metaDataStream,bool success) const559 void ConfirmHdfsDbFile::endDbFiles(
560     std::istringstream& metaDataStream,
561     bool success) const
562 {
563     char inBuf[ BUF_SIZE ];
564     std::string errMsg;
565     int rc = NO_ERROR;
566 
567     // Loop through the records in the meta-data file
568     while (metaDataStream.getline( inBuf, BUF_SIZE ))
569     {
570         try
571         {
572             // Delete Temp Files for current DBRoot
573             if (RBMetaWriter::verifyColumn1Rec(inBuf))
574             {
575                 endColumnDbFile(inBuf, success);
576             }
577             else if (RBMetaWriter::verifyDStore1Rec(inBuf))
578             {
579                 endDctnryStoreDbFile(inBuf, success);
580             }
581         }
582         // We catch any errors, but not deleting a temp file is not fatal,
583         // so we capture the error msg and keep going if a problem occurs.
584         // We return a concatenated list of error msgs if multiple errors
585         // take place.
586         catch (WeException& ex)
587         {
588             if (errMsg.size() == 0)
589             {
590                 rc = ex.errorCode();
591             }
592             else
593             {
594                 errMsg += "; ";
595             }
596 
597             errMsg += ex.what();
598         }
599         catch (std::exception& ex)
600         {
601             if (errMsg.size() == 0)
602             {
603                 rc = ERR_UNKNOWN;
604             }
605             else
606             {
607                 errMsg += "; ";
608             }
609 
610             errMsg += ex.what();
611         }
612     }
613 
614     // Throw exception with cumulative list of any error msgs
615     if (errMsg.size() > 0)
616     {
617         throw WeException( errMsg, rc );
618     }
619 }
620 
621 //------------------------------------------------------------------------------
622 // End the changes to the hwm column DB file described in the bulk
623 // rollback meta data file record stored in inBuf.  Delete the temp file.
624 //------------------------------------------------------------------------------
endColumnDbFile(const char * inBuf,bool success) const625 void ConfirmHdfsDbFile::endColumnDbFile(
626     const char* inBuf,
627     bool success) const
628 {
629     char        recType[100];
630     OID         columnOID;
631     uint32_t    dbRootHwm;
632     uint32_t    partNumHwm;
633     uint32_t    segNumHwm;
634     HWM         lastLocalHwm;
635     int         colTypeInt;
636     char        colTypeName[100];
637     uint32_t    colWidth;
638     int         compressionType = 0; // optional parameter
639 
640     // Read meta-data record
641     int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d",
642                            recType, &columnOID,
643                            &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm,
644                            &colTypeInt, colTypeName, &colWidth, &compressionType );
645 
646     if (numFields < 9) // compressionType is optional
647     {
648         std::ostringstream oss;
649         oss << "Invalid COLUM1 record in meta-data file " <<
650             fMetaFileName << "; record-<" << inBuf << ">";
651 
652         throw WeException( oss.str(), ERR_INVALID_PARAM );
653     }
654 
655     // Construct the DB file name
656     char dbFileName[FILE_NAME_SIZE];
657     FileOp dbFile(false);
658     int rc = dbFile.getFileName( columnOID,
659                                  dbFileName,
660                                  dbRootHwm,
661                                  partNumHwm,
662                                  segNumHwm );
663 
664     if (rc != NO_ERROR)
665     {
666         WErrorCodes ec;
667         std::ostringstream oss;
668         oss << "Error constructing column filename to end changes" <<
669             "; columnOID-" << columnOID  <<
670             "; dbRoot-"    << dbRootHwm  <<
671             "; partNum-"   << partNumHwm <<
672             "; segNum-"    << segNumHwm  <<
673             "; "           << ec.errorString(rc);
674 
675         throw WeException( oss.str(), rc );
676     }
677 
678     // Confirm the changes to the DB file name
679     std::string errMsg;
680     rc = endDbFileChange( std::string("tmp"),
681                           dbFileName,
682                           success,
683                           errMsg );
684 
685     if (rc != NO_ERROR)
686     {
687         throw WeException( errMsg, rc );
688     }
689 }
690 
691 //------------------------------------------------------------------------------
692 // End the changes to the hwm dctnry store DB file described in the bulk
693 // rollback meta data file record stored in inBuf.  Delete the temp file.
694 //------------------------------------------------------------------------------
endDctnryStoreDbFile(const char * inBuf,bool success) const695 void ConfirmHdfsDbFile::endDctnryStoreDbFile(
696     const char* inBuf,
697     bool success) const
698 {
699     char      recType[100];
700     OID       dColumnOID;
701     OID       dStoreOID;
702     uint32_t  dbRootHwm;
703     uint32_t  partNumHwm;
704     uint32_t  segNumHwm;
705     HWM       localHwm;
706     int       compressionType = 0; // optional parameter
707 
708     // Read meta-data record
709     int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d",
710                            recType, &dColumnOID, &dStoreOID,
711                            &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType );
712 
713     if (numFields < 7) // compressionType optional
714     {
715         std::ostringstream oss;
716         oss << "Invalid DSTOR1 record in meta-data file " <<
717             fMetaFileName << "; record-<" << inBuf << ">";
718 
719         throw WeException( oss.str(), ERR_INVALID_PARAM );
720     }
721 
722     // Construct the DB file name
723     char dbFileName[FILE_NAME_SIZE];
724     FileOp dbFile(false);
725     int rc = dbFile.getFileName( dStoreOID,
726                                  dbFileName,
727                                  dbRootHwm,
728                                  partNumHwm,
729                                  segNumHwm );
730 
731     if (rc != NO_ERROR)
732     {
733         WErrorCodes ec;
734         std::ostringstream oss;
735         oss << "Error constructing dictionary store filename to end changes" <<
736             "; columnOID-" << dStoreOID  <<
737             "; dbRoot-"    << dbRootHwm  <<
738             "; partNum-"   << partNumHwm <<
739             "; segNum-"    << segNumHwm  <<
740             "; "           << ec.errorString(rc);
741 
742         throw WeException( oss.str(), rc );
743     }
744 
745     // Confirm the changes to the DB file name
746     std::string errMsg;
747     rc = endDbFileChange( std::string("tmp"),
748                           dbFileName,
749                           success,
750                           errMsg );
751 
752     if (rc != NO_ERROR)
753     {
754         throw WeException( errMsg, rc );
755     }
756 }
757 
758 //------------------------------------------------------------------------------
759 // Open and read the bulk rollback metadata file for the specified table OID
760 // and DBRoot.  The contents of the metadata file is returned in the meta-
761 // DataStream argument.
762 //------------------------------------------------------------------------------
openMetaDataFile(OID tableOID,uint16_t dbRoot,std::istringstream & metaDataStream)763 void ConfirmHdfsDbFile::openMetaDataFile(OID tableOID,
764         uint16_t dbRoot,
765         std::istringstream& metaDataStream)
766 {
767     std::string bulkRollbackPath( Config::getDBRootByNum( dbRoot ) );
768 
769     // Construct file name and check for it's existence
770     std::ostringstream ossFileName;
771     ossFileName << '/' << DBROOT_BULK_ROLLBACK_SUBDIR << '/' << tableOID;
772     fMetaFileName  = bulkRollbackPath;
773     fMetaFileName += ossFileName.str();
774 
775     // Return if the meta-data file does not exist.
776     if ( !fFs.exists( fMetaFileName.c_str() ) )
777     {
778         std::ostringstream oss;
779         oss << "Bulk rollback meta-data file " <<
780             fMetaFileName << " does not exist.";
781 
782         throw WeException( oss.str(), ERR_FILE_NOT_EXIST );
783     }
784 
785     // Open the file
786     boost::scoped_ptr<IDBDataFile> metaFile;
787     errno = 0;
788     metaFile.reset(idbdatafile::IDBDataFile::open(
789                        idbdatafile::IDBPolicy::getType(fMetaFileName.c_str(),
790                                idbdatafile::IDBPolicy::WRITEENG),
791                        fMetaFileName.c_str(), "rb", 0) );
792 
793     if ( !metaFile )
794     {
795         int errRc = errno;
796         std::ostringstream oss;
797         oss << "Error opening bulk rollback meta-data file " <<
798             fMetaFileName << "; err-" <<
799             errRc << "; " << strerror( errRc );
800 
801         throw WeException( oss.str(), ERR_FILE_OPEN );
802     }
803 
804     // First record in the file must be a Version record.
805     char inBuf[ BUF_SIZE ];
806     ssize_t metaFileSize = fFs.size( fMetaFileName.c_str() );
807     boost::scoped_array<char> buf( new char[ metaFileSize ] );
808     // retry 10 times for partial reads, just in case
809     ssize_t readSofar = 0; // bytes read so far
810     ssize_t bytes = 0;    // bytes read by one pread
811     char* p = buf.get();
812 
813     for (int i = 0; i < 10 && readSofar < metaFileSize; i++)
814     {
815         errno = 0;
816         bytes = metaFile->pread( p + readSofar,
817                                  readSofar,
818                                  metaFileSize - readSofar);
819 
820         if (bytes < 0)
821             break;
822 
823         readSofar += bytes;
824     }
825 
826     if ( readSofar != metaFileSize )
827     {
828         int errRc = errno;
829         std::ostringstream oss;
830         oss << "Error reading bulk rollback meta-data file "
831             << fMetaFileName << "; read/expect:" << readSofar << "/"
832             << metaFileSize
833             << "; err-" << errRc << "; " << strerror( errRc );
834 
835         throw WeException( oss.str(), ERR_FILE_READ );
836     }
837 
838     // put the data in a string stream
839     metaDataStream.str( std::string( p, metaFileSize ) );
840     buf.reset();
841 
842     // read data
843     metaDataStream.getline( inBuf, BUF_SIZE );
844 
845     if (!RBMetaWriter::verifyVersion4(inBuf))
846     {
847         std::ostringstream oss;
848         oss << "Invalid version record in meta-data file " << fMetaFileName
849             << "; record-<" << inBuf << ">";
850 
851         throw WeException( oss.str(), ERR_INVALID_PARAM );
852     }
853 }
854 
855 } // end of namespace
856