1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*****************************************************************************
19 * $Id: we_brmreporter.cpp 4731 2013-08-09 22:37:44Z wweeks $
20 *
21 ****************************************************************************/
22
23 /** @file
24 * Implementation of the BRMReporter class
25 */
26
27 #include "we_brmreporter.h"
28
29 #include <cerrno>
30 #include <set>
31
32 #include "we_brm.h"
33 #include "we_convertor.h"
34 #include "we_log.h"
35 #include "cacheutils.h"
36 #include "IDBPolicy.h"
37
38 namespace WriteEngine
39 {
40
41 //------------------------------------------------------------------------------
42 // Constructor
43 //------------------------------------------------------------------------------
BRMReporter(Log * logger,const std::string & tableName)44 BRMReporter::BRMReporter(Log* logger, const std::string& tableName) :
45 fLog( logger ),
46 fTableName(tableName )
47 {
48 }
49
50 //------------------------------------------------------------------------------
51 // Destructor
52 //------------------------------------------------------------------------------
~BRMReporter()53 BRMReporter::~BRMReporter( )
54 {
55 }
56
57 //------------------------------------------------------------------------------
58 // Add a casual partition update to "this" BRMReporter's collection
59 //------------------------------------------------------------------------------
addToCPInfo(const BRM::CPInfoMerge & cpEntry)60 void BRMReporter::addToCPInfo(const BRM::CPInfoMerge& cpEntry)
61 {
62 fCPInfo.push_back( cpEntry );
63 }
64
65 //------------------------------------------------------------------------------
66 // Add an HWM update to "this" BRMReporter's collection
67 //------------------------------------------------------------------------------
addToHWMInfo(const BRM::BulkSetHWMArg & hwmEntry)68 void BRMReporter::addToHWMInfo(const BRM::BulkSetHWMArg& hwmEntry)
69 {
70 fHWMInfo.push_back( hwmEntry );
71 }
72
73 //------------------------------------------------------------------------------
74 // Add Column File infomation to "this" BRMReporter's collection
75 //------------------------------------------------------------------------------
addToFileInfo(const BRM::FileInfo & fileEntry)76 void BRMReporter::addToFileInfo(const BRM::FileInfo& fileEntry)
77 {
78 fFileInfo.push_back( fileEntry );
79 }
80
81 //------------------------------------------------------------------------------
82 // Add Dictionary File infomation to "this" BRMReporter's collection
83 //------------------------------------------------------------------------------
addToDctnryFileInfo(const BRM::FileInfo & fileEntry)84 void BRMReporter::addToDctnryFileInfo(const BRM::FileInfo& fileEntry)
85 {
86 fDctnryFileInfo.push_back( fileEntry );
87 }
88
89 //------------------------------------------------------------------------------
90 // Add Critical Error Message
91 //------------------------------------------------------------------------------
addToErrMsgEntry(const std::string & errCritMsg)92 void BRMReporter::addToErrMsgEntry(const std::string& errCritMsg)
93 {
94 fCritErrMsgs.push_back(errCritMsg);
95 }
96
97 //------------------------------------------------------------------------------
98 // Send ErrMsg to BRM Rpt File
99 //------------------------------------------------------------------------------
sendErrMsgToFile(const std::string & rptFileName)100 void BRMReporter::sendErrMsgToFile(const std::string& rptFileName)
101 {
102 if ((!rptFileName.empty()) && (fRptFileName.empty()))
103 fRptFileName = rptFileName;
104
105 if ((!fRptFileName.empty()) && (fCritErrMsgs.size()))
106 {
107 fRptFile.open( fRptFileName.c_str(), std::ios_base::app);
108
109 if ( fRptFile.good() )
110 {
111 for (unsigned int i = 0; i < fCritErrMsgs.size(); i++)
112 {
113 fRptFile << "MERR: " << fCritErrMsgs[i] << std::endl;
114 //std::cout <<"**********" << fCritErrMsgs[i] << std::endl;
115 }
116 }
117 }
118 }
119
120 //------------------------------------------------------------------------------
121 // Send collection information (Casual Partition and HWM) to applicable
122 // destination. If file name given, then data is saved to the file, else
123 // the data is sent directly to BRM.
124 //
125 // On HDFS system, this function also notifies PrimProc to flush certain file
126 // descriptors (for columns and dictionary store), and blocks (for dictionary
127 // store). Any DB file changes should have been "confirmed" prior to calling
128 // sendBRMInfo(). Once PrimProc cache is flushed, we can send the BRM updates.
129 //------------------------------------------------------------------------------
sendBRMInfo(const std::string & rptFileName,const std::vector<std::string> & errFiles,const std::vector<std::string> & badFiles)130 int BRMReporter::sendBRMInfo(const std::string& rptFileName,
131 const std::vector<std::string>& errFiles,
132 const std::vector<std::string>& badFiles)
133 {
134 int rc = NO_ERROR;
135
136 // For HDFS, we need to flush PrimProc cache since we modify HDFS files
137 // by rewriting the files.
138 if (idbdatafile::IDBPolicy::useHdfs())
139 {
140 std::vector<BRM::FileInfo> allFileInfo;
141
142 if ( fFileInfo.size() > 0 )
143 {
144 for (unsigned k = 0; k < fFileInfo.size(); k++)
145 {
146 allFileInfo.push_back( fFileInfo[k] );
147 }
148 }
149
150 std::vector<BRM::OID_t> oidsToFlush;
151 std::set<BRM::OID_t> oidSet;
152
153 if (fDctnryFileInfo.size() > 0)
154 {
155 for (unsigned k = 0; k < fDctnryFileInfo.size(); k++)
156 {
157 allFileInfo.push_back( fDctnryFileInfo[k] );
158 oidSet.insert( fDctnryFileInfo[k].oid );
159 }
160
161 // Store dictionary oids in std::set first, to eliminate duplicates
162 if (oidSet.size() > 0)
163 {
164 for (std::set<BRM::OID_t>::const_iterator iter = oidSet.begin();
165 iter != oidSet.end();
166 ++iter)
167 {
168 oidsToFlush.push_back( *iter );
169 }
170 }
171 }
172
173 // Flush PrimProc FD cache
174 if (allFileInfo.size() > 0)
175 {
176 cacheutils::purgePrimProcFdCache(allFileInfo,
177 Config::getLocalModuleID());
178 }
179
180 // Flush PrimProc block cache
181 if (oidsToFlush.size() > 0)
182 cacheutils::flushOIDsFromCache(oidsToFlush);
183 }
184
185 // After flushing cache (for HDFS), now we can update BRM
186 if (rptFileName.empty())
187 {
188 // Set Casual Partition (CP) info for BRM for this column. Be sure to
189 // do this before we set the HWM. Updating HWM 1st could cause a race
190 // condition resulting in a query being based on temporary outdated CP
191 // info.
192
193 rc = sendHWMandCPToBRM( );
194
195 // If HWM error occurs, we fail the job.
196 if (rc != NO_ERROR)
197 {
198 return rc;
199 }
200 }
201 else
202 {
203 fRptFileName = rptFileName;
204
205 rc = openRptFile( );
206
207 if (rc != NO_ERROR)
208 {
209 return rc;
210 }
211
212 sendCPToFile ( );
213 sendHWMToFile( );
214
215 // Log the list of *.err and *.bad files
216 for (unsigned k = 0; k < errFiles.size(); k++)
217 {
218 fRptFile << "ERR: " << errFiles[k] << std::endl;
219 }
220
221 for (unsigned k = 0; k < badFiles.size(); k++)
222 {
223 fRptFile << "BAD: " << badFiles[k] << std::endl;
224 }
225
226 }
227
228 return rc;
229 }
230
231 //------------------------------------------------------------------------------
232 // Send HWM and CP update information to BRM
233 //------------------------------------------------------------------------------
sendHWMandCPToBRM()234 int BRMReporter::sendHWMandCPToBRM( )
235 {
236 int rc = NO_ERROR;
237
238 if (fHWMInfo.size() > 0)
239 {
240 std::ostringstream oss;
241 oss << "Committing " << fHWMInfo.size() << " HWM update(s) for table " <<
242 fTableName << " to BRM";
243 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
244 }
245
246 if (fCPInfo.size() > 0)
247 {
248 std::ostringstream oss;
249 oss << "Committing " << fCPInfo.size() << " CP update(s) for table " <<
250 fTableName << " to BRM";
251 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
252 }
253
254 if ((fHWMInfo.size() > 0) || (fCPInfo.size() > 0))
255 {
256 rc = BRMWrapper::getInstance()->bulkSetHWMAndCP( fHWMInfo, fCPInfo );
257
258 if (rc != NO_ERROR)
259 {
260 WErrorCodes ec;
261 std::ostringstream oss;
262 oss << "Error updating BRM with HWM and CP data for table " <<
263 fTableName << "; " << ec.errorString(rc);
264 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
265 return rc;
266 }
267 }
268
269 return rc;
270 }
271
272 //------------------------------------------------------------------------------
273 // Save HWM update information to a file
274 //------------------------------------------------------------------------------
sendHWMToFile()275 void BRMReporter::sendHWMToFile( )
276 {
277 if (fHWMInfo.size() > 0)
278 {
279 std::ostringstream oss;
280 oss << "Writing " << fHWMInfo.size() << " HWM update(s) for table " <<
281 fTableName << " to report file " << fRptFileName;
282 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
283
284 for (unsigned int i = 0; i < fHWMInfo.size(); i++)
285 {
286 fRptFile << "HWM: " << fHWMInfo[i].oid << ' ' <<
287 fHWMInfo[i].partNum << ' ' <<
288 fHWMInfo[i].segNum << ' ' <<
289 fHWMInfo[i].hwm << std::endl;
290 }
291 }
292 }
293
294 //------------------------------------------------------------------------------
295 // Send Casual Partition update information to BRM
296 //------------------------------------------------------------------------------
sendCPToFile()297 void BRMReporter::sendCPToFile( )
298 {
299 if (fCPInfo.size() > 0)
300 {
301 std::ostringstream oss;
302 oss << "Writing " << fCPInfo.size() << " CP updates for table " <<
303 fTableName << " to report file " << fRptFileName;
304 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
305
306 for (unsigned int i = 0; i < fCPInfo.size(); i++)
307 {
308 fRptFile << "CP: " << fCPInfo[i].startLbid << ' ' <<
309 fCPInfo[i].max << ' ' <<
310 fCPInfo[i].min << ' ' <<
311 fCPInfo[i].seqNum << ' ' <<
312 fCPInfo[i].type << ' ' <<
313 fCPInfo[i].newExtent << std::endl;
314 }
315 }
316 }
317
318 //------------------------------------------------------------------------------
319 // Report Summary totals; only applicable if we are generating a report file.
320 //------------------------------------------------------------------------------
reportTotals(uint64_t totalReadRows,uint64_t totalInsertedRows,const std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType,uint64_t,uint64_t>> & satCounts)321 void BRMReporter::reportTotals(
322 uint64_t totalReadRows,
323 uint64_t totalInsertedRows,
324 const std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> >& satCounts)
325 {
326 if (fRptFile.is_open())
327 {
328 fRptFile << "ROWS: " << totalReadRows << ' ' <<
329 totalInsertedRows << std::endl;
330
331 for (unsigned k = 0; k < satCounts.size(); k++)
332 {
333 if (boost::get<0>(satCounts[k]) > 0)
334 fRptFile << "DATA: " << k << ' ' << boost::get<0>(satCounts[k]) << ' ' <<
335 boost::get<1>(satCounts[k]) << ' ' << boost::get<2>(satCounts[k]) << std::endl;
336 }
337
338 closeRptFile();
339 }
340 }
341
342 //------------------------------------------------------------------------------
343 // Generate report file indicating that user's import exceeded allowable error
344 // limit.
345 //------------------------------------------------------------------------------
rptMaxErrJob(const std::string & rptFileName,const std::vector<std::string> & errFiles,const std::vector<std::string> & badFiles)346 void BRMReporter::rptMaxErrJob(const std::string& rptFileName,
347 const std::vector<std::string>& errFiles,
348 const std::vector<std::string>& badFiles )
349 {
350 // We only write out information if we are generating a report file.
351 if (!rptFileName.empty())
352 {
353 fRptFileName = rptFileName;
354
355 int rc = openRptFile();
356
357 // No need to return bad return code; we are already in a job that
358 // is aborting. openRptFile() at least logged the error.
359 if (rc != NO_ERROR)
360 {
361 return;
362 }
363
364 // Log the list of *.err and *.bad files
365 for (unsigned k = 0; k < errFiles.size(); k++)
366 {
367 fRptFile << "ERR: " << errFiles[k] << std::endl;
368 }
369
370 for (unsigned k = 0; k < badFiles.size(); k++)
371 {
372 fRptFile << "BAD: " << badFiles[k] << std::endl;
373 }
374
375 closeRptFile();
376 }
377 }
378
379 //------------------------------------------------------------------------------
380 // Open BRM report file
381 //------------------------------------------------------------------------------
openRptFile()382 int BRMReporter::openRptFile( )
383 {
384 fRptFile.open( fRptFileName.c_str() );
385
386 if ( fRptFile.fail() )
387 {
388 int errRc = errno;
389 std::ostringstream oss;
390 std::string eMsg;
391 Convertor::mapErrnoToString(errRc, eMsg);
392 oss << "Error opening BRM report file " << fRptFileName << "; " << eMsg;
393 int rc = ERR_FILE_OPEN;
394 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
395 return rc;
396 }
397
398 fRptFile << "#CP: startLBID max min seqnum type newExtent" << std::endl;
399 fRptFile << "#HWM: oid partition segment hwm" << std::endl;
400 fRptFile << "#ROWS: numRowsRead numRowsInserted" << std::endl;
401 fRptFile << "#DATA: columNum columnType columnOid numOutOfRangeValues" << std::endl;
402 fRptFile << "#ERR: error message file" << std::endl;
403 fRptFile << "#BAD: bad data file, with rejected rows" << std::endl;
404 fRptFile << "#MERR: critical error messages in cpimport.bin" << std::endl;
405
406 return NO_ERROR;
407 }
408
409 //------------------------------------------------------------------------------
410 // Close BRM report file
411 //------------------------------------------------------------------------------
closeRptFile()412 void BRMReporter::closeRptFile( )
413 {
414 fRptFile.close();
415 }
416
417 }
418