1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*****************************************************************************
19  * $Id: we_brmreporter.cpp 4731 2013-08-09 22:37:44Z wweeks $
20  *
21  ****************************************************************************/
22 
23 /** @file
24  * Implementation of the BRMReporter class
25  */
26 
27 #include "we_brmreporter.h"
28 
29 #include <cerrno>
30 #include <set>
31 
32 #include "we_brm.h"
33 #include "we_convertor.h"
34 #include "we_log.h"
35 #include "cacheutils.h"
36 #include "IDBPolicy.h"
37 
38 namespace WriteEngine
39 {
40 
41 //------------------------------------------------------------------------------
42 // Constructor
43 //------------------------------------------------------------------------------
BRMReporter(Log * logger,const std::string & tableName)44 BRMReporter::BRMReporter(Log* logger, const std::string& tableName) :
45     fLog( logger ),
46     fTableName(tableName )
47 {
48 }
49 
50 //------------------------------------------------------------------------------
51 // Destructor
52 //------------------------------------------------------------------------------
~BRMReporter()53 BRMReporter::~BRMReporter( )
54 {
55 }
56 
57 //------------------------------------------------------------------------------
58 // Add a casual partition update to "this" BRMReporter's collection
59 //------------------------------------------------------------------------------
addToCPInfo(const BRM::CPInfoMerge & cpEntry)60 void BRMReporter::addToCPInfo(const BRM::CPInfoMerge& cpEntry)
61 {
62     fCPInfo.push_back( cpEntry );
63 }
64 
65 //------------------------------------------------------------------------------
66 // Add an HWM update to "this" BRMReporter's collection
67 //------------------------------------------------------------------------------
addToHWMInfo(const BRM::BulkSetHWMArg & hwmEntry)68 void BRMReporter::addToHWMInfo(const BRM::BulkSetHWMArg& hwmEntry)
69 {
70     fHWMInfo.push_back( hwmEntry );
71 }
72 
73 //------------------------------------------------------------------------------
// Add column file information to "this" BRMReporter's collection
75 //------------------------------------------------------------------------------
addToFileInfo(const BRM::FileInfo & fileEntry)76 void BRMReporter::addToFileInfo(const BRM::FileInfo& fileEntry)
77 {
78     fFileInfo.push_back( fileEntry );
79 }
80 
81 //------------------------------------------------------------------------------
// Add dictionary file information to "this" BRMReporter's collection
83 //------------------------------------------------------------------------------
addToDctnryFileInfo(const BRM::FileInfo & fileEntry)84 void BRMReporter::addToDctnryFileInfo(const BRM::FileInfo& fileEntry)
85 {
86     fDctnryFileInfo.push_back( fileEntry );
87 }
88 
89 //------------------------------------------------------------------------------
90 // Add Critical Error Message
91 //------------------------------------------------------------------------------
addToErrMsgEntry(const std::string & errCritMsg)92 void BRMReporter::addToErrMsgEntry(const std::string& errCritMsg)
93 {
94     fCritErrMsgs.push_back(errCritMsg);
95 }
96 
97 //------------------------------------------------------------------------------
// Append the collected critical error messages (MERR lines) to the BRM report file
99 //------------------------------------------------------------------------------
sendErrMsgToFile(const std::string & rptFileName)100 void BRMReporter::sendErrMsgToFile(const std::string& rptFileName)
101 {
102     if ((!rptFileName.empty()) && (fRptFileName.empty()))
103         fRptFileName = rptFileName;
104 
105     if ((!fRptFileName.empty()) && (fCritErrMsgs.size()))
106     {
107         fRptFile.open( fRptFileName.c_str(), std::ios_base::app);
108 
109         if ( fRptFile.good() )
110         {
111             for (unsigned int i = 0; i < fCritErrMsgs.size(); i++)
112             {
113                 fRptFile << "MERR: " << fCritErrMsgs[i] << std::endl;
114                 //std::cout <<"**********" << fCritErrMsgs[i] << std::endl;
115             }
116         }
117     }
118 }
119 
120 //------------------------------------------------------------------------------
121 // Send collection information (Casual Partition and HWM) to applicable
122 // destination.  If file name given, then data is saved to the file, else
123 // the data is sent directly to BRM.
124 //
125 // On HDFS system, this function also notifies PrimProc to flush certain file
126 // descriptors (for columns and dictionary store), and blocks (for dictionary
127 // store).  Any DB file changes should have been "confirmed" prior to calling
128 // sendBRMInfo().  Once PrimProc cache is flushed, we can send the BRM updates.
129 //------------------------------------------------------------------------------
sendBRMInfo(const std::string & rptFileName,const std::vector<std::string> & errFiles,const std::vector<std::string> & badFiles)130 int BRMReporter::sendBRMInfo(const std::string& rptFileName,
131                              const std::vector<std::string>& errFiles,
132                              const std::vector<std::string>& badFiles)
133 {
134     int rc = NO_ERROR;
135 
136     // For HDFS, we need to flush PrimProc cache since we modify HDFS files
137     // by rewriting the files.
138     if (idbdatafile::IDBPolicy::useHdfs())
139     {
140         std::vector<BRM::FileInfo> allFileInfo;
141 
142         if ( fFileInfo.size() > 0 )
143         {
144             for (unsigned k = 0; k < fFileInfo.size(); k++)
145             {
146                 allFileInfo.push_back( fFileInfo[k] );
147             }
148         }
149 
150         std::vector<BRM::OID_t> oidsToFlush;
151         std::set<BRM::OID_t>    oidSet;
152 
153         if (fDctnryFileInfo.size() > 0)
154         {
155             for (unsigned k = 0; k < fDctnryFileInfo.size(); k++)
156             {
157                 allFileInfo.push_back( fDctnryFileInfo[k] );
158                 oidSet.insert( fDctnryFileInfo[k].oid );
159             }
160 
161             // Store dictionary oids in std::set first, to eliminate duplicates
162             if (oidSet.size() > 0)
163             {
164                 for (std::set<BRM::OID_t>::const_iterator iter = oidSet.begin();
165                         iter != oidSet.end();
166                         ++iter)
167                 {
168                     oidsToFlush.push_back( *iter );
169                 }
170             }
171         }
172 
173         // Flush PrimProc FD cache
174         if (allFileInfo.size() > 0)
175         {
176             cacheutils::purgePrimProcFdCache(allFileInfo,
177                                              Config::getLocalModuleID());
178         }
179 
180         // Flush PrimProc block cache
181         if (oidsToFlush.size() > 0)
182             cacheutils::flushOIDsFromCache(oidsToFlush);
183     }
184 
185     // After flushing cache (for HDFS), now we can update BRM
186     if (rptFileName.empty())
187     {
188         // Set Casual Partition (CP) info for BRM for this column.  Be sure to
189         // do this before we set the HWM.  Updating HWM 1st could cause a race
190         // condition resulting in a query being based on temporary outdated CP
191         // info.
192 
193         rc = sendHWMandCPToBRM( );
194 
195         // If HWM error occurs, we fail the job.
196         if (rc != NO_ERROR)
197         {
198             return rc;
199         }
200     }
201     else
202     {
203         fRptFileName = rptFileName;
204 
205         rc = openRptFile( );
206 
207         if (rc != NO_ERROR)
208         {
209             return rc;
210         }
211 
212         sendCPToFile ( );
213         sendHWMToFile( );
214 
215         // Log the list of *.err and *.bad files
216         for (unsigned k = 0; k < errFiles.size(); k++)
217         {
218             fRptFile << "ERR: " << errFiles[k] << std::endl;
219         }
220 
221         for (unsigned k = 0; k < badFiles.size(); k++)
222         {
223             fRptFile << "BAD: " << badFiles[k] << std::endl;
224         }
225 
226     }
227 
228     return rc;
229 }
230 
231 //------------------------------------------------------------------------------
232 // Send HWM and CP update information to BRM
233 //------------------------------------------------------------------------------
sendHWMandCPToBRM()234 int BRMReporter::sendHWMandCPToBRM( )
235 {
236     int rc = NO_ERROR;
237 
238     if (fHWMInfo.size() > 0)
239     {
240         std::ostringstream oss;
241         oss << "Committing " << fHWMInfo.size() << " HWM update(s) for table " <<
242             fTableName << " to BRM";
243         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
244     }
245 
246     if (fCPInfo.size() > 0)
247     {
248         std::ostringstream oss;
249         oss << "Committing " << fCPInfo.size() << " CP update(s) for table " <<
250             fTableName << " to BRM";
251         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
252     }
253 
254     if ((fHWMInfo.size() > 0) || (fCPInfo.size() > 0))
255     {
256         rc = BRMWrapper::getInstance()->bulkSetHWMAndCP( fHWMInfo, fCPInfo );
257 
258         if (rc != NO_ERROR)
259         {
260             WErrorCodes ec;
261             std::ostringstream oss;
262             oss << "Error updating BRM with HWM and CP data for table " <<
263                 fTableName << "; " << ec.errorString(rc);
264             fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
265             return rc;
266         }
267     }
268 
269     return rc;
270 }
271 
272 //------------------------------------------------------------------------------
273 // Save HWM update information to a file
274 //------------------------------------------------------------------------------
sendHWMToFile()275 void BRMReporter::sendHWMToFile( )
276 {
277     if (fHWMInfo.size() > 0)
278     {
279         std::ostringstream oss;
280         oss << "Writing " << fHWMInfo.size() << " HWM update(s) for table " <<
281             fTableName << " to report file " << fRptFileName;
282         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
283 
284         for (unsigned int i = 0; i < fHWMInfo.size(); i++)
285         {
286             fRptFile << "HWM: " << fHWMInfo[i].oid     << ' ' <<
287                      fHWMInfo[i].partNum << ' ' <<
288                      fHWMInfo[i].segNum  << ' ' <<
289                      fHWMInfo[i].hwm     << std::endl;
290         }
291     }
292 }
293 
294 //------------------------------------------------------------------------------
// Save Casual Partition update information to a file
296 //------------------------------------------------------------------------------
sendCPToFile()297 void BRMReporter::sendCPToFile( )
298 {
299     if (fCPInfo.size() > 0)
300     {
301         std::ostringstream oss;
302         oss << "Writing " << fCPInfo.size() << " CP updates for table " <<
303             fTableName << " to report file " << fRptFileName;
304         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
305 
306         for (unsigned int i = 0; i < fCPInfo.size(); i++)
307         {
308             fRptFile << "CP: " << fCPInfo[i].startLbid << ' ' <<
309                      fCPInfo[i].max       << ' ' <<
310                      fCPInfo[i].min       << ' ' <<
311                      fCPInfo[i].seqNum    << ' ' <<
312                      fCPInfo[i].type      << ' ' <<
313                      fCPInfo[i].newExtent << std::endl;
314         }
315     }
316 }
317 
318 //------------------------------------------------------------------------------
319 // Report Summary totals; only applicable if we are generating a report file.
320 //------------------------------------------------------------------------------
reportTotals(uint64_t totalReadRows,uint64_t totalInsertedRows,const std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType,uint64_t,uint64_t>> & satCounts)321 void BRMReporter::reportTotals(
322     uint64_t totalReadRows,
323     uint64_t totalInsertedRows,
324     const std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> >& satCounts)
325 {
326     if (fRptFile.is_open())
327     {
328         fRptFile << "ROWS: " << totalReadRows << ' ' <<
329                  totalInsertedRows << std::endl;
330 
331         for (unsigned k = 0; k < satCounts.size(); k++)
332         {
333             if (boost::get<0>(satCounts[k]) > 0)
334                 fRptFile << "DATA: " << k << ' ' << boost::get<0>(satCounts[k]) << ' ' <<
335                          boost::get<1>(satCounts[k]) << ' ' << boost::get<2>(satCounts[k]) << std::endl;
336         }
337 
338         closeRptFile();
339     }
340 }
341 
342 //------------------------------------------------------------------------------
343 // Generate report file indicating that user's import exceeded allowable error
344 // limit.
345 //------------------------------------------------------------------------------
rptMaxErrJob(const std::string & rptFileName,const std::vector<std::string> & errFiles,const std::vector<std::string> & badFiles)346 void BRMReporter::rptMaxErrJob(const std::string& rptFileName,
347                                const std::vector<std::string>& errFiles,
348                                const std::vector<std::string>& badFiles )
349 {
350     // We only write out information if we are generating a report file.
351     if (!rptFileName.empty())
352     {
353         fRptFileName = rptFileName;
354 
355         int rc = openRptFile();
356 
357         // No need to return bad return code; we are already in a job that
358         // is aborting.  openRptFile() at least logged the error.
359         if (rc != NO_ERROR)
360         {
361             return;
362         }
363 
364         // Log the list of *.err and *.bad files
365         for (unsigned k = 0; k < errFiles.size(); k++)
366         {
367             fRptFile << "ERR: " << errFiles[k] << std::endl;
368         }
369 
370         for (unsigned k = 0; k < badFiles.size(); k++)
371         {
372             fRptFile << "BAD: " << badFiles[k] << std::endl;
373         }
374 
375         closeRptFile();
376     }
377 }
378 
379 //------------------------------------------------------------------------------
380 // Open BRM report file
381 //------------------------------------------------------------------------------
openRptFile()382 int BRMReporter::openRptFile( )
383 {
384     fRptFile.open( fRptFileName.c_str() );
385 
386     if ( fRptFile.fail() )
387     {
388         int errRc = errno;
389         std::ostringstream oss;
390         std::string eMsg;
391         Convertor::mapErrnoToString(errRc, eMsg);
392         oss << "Error opening BRM report file " << fRptFileName << "; " << eMsg;
393         int rc = ERR_FILE_OPEN;
394         fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
395         return rc;
396     }
397 
398     fRptFile << "#CP:   startLBID max min seqnum type newExtent" << std::endl;
399     fRptFile << "#HWM:  oid partition segment hwm"               << std::endl;
400     fRptFile << "#ROWS: numRowsRead numRowsInserted"             << std::endl;
401     fRptFile << "#DATA: columNum columnType columnOid numOutOfRangeValues"   << std::endl;
402     fRptFile << "#ERR:  error message file"                      << std::endl;
403     fRptFile << "#BAD:  bad data file, with rejected rows"       << std::endl;
404     fRptFile << "#MERR: critical error messages in cpimport.bin" << std::endl;
405 
406     return NO_ERROR;
407 }
408 
409 //------------------------------------------------------------------------------
410 // Close BRM report file
411 //------------------------------------------------------------------------------
closeRptFile()412 void BRMReporter::closeRptFile( )
413 {
414     fRptFile.close();
415 }
416 
417 }
418