1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: we_chunkmanager.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20 
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24 #include <iostream>
25 #include <cstdio>
26 #include <ctime>
27 //#define NDEBUG
28 #include <cassert>
29 using namespace std;
30 
31 #include <boost/scoped_array.hpp>
32 #include <boost/scoped_ptr.hpp>
33 
34 #include "logger.h"
35 #include "cacheutils.h"
36 
37 #include "we_chunkmanager.h"
38 
39 #include "we_macro.h"
40 #include "we_brm.h"
41 #include "we_config.h"
42 #include "we_confirmhdfsdbfile.h"
43 #include "we_fileop.h"
44 #include "../dictionary/we_dctnry.h"
45 #include "we_stats.h"
46 using namespace execplan;
47 
48 #include "IDBDataFile.h"
49 #include "IDBPolicy.h"
50 #include "cloudio/SMFileSystem.h"
51 using namespace idbdatafile;
52 
53 namespace
54 {
55 
56 // Function to compare 2 ChunkData pointers.
chunkDataPtrLessCompare(WriteEngine::ChunkData * p1,WriteEngine::ChunkData * p2)57 bool chunkDataPtrLessCompare(WriteEngine::ChunkData* p1, WriteEngine::ChunkData* p2)
58 {
59     return (p1->fChunkId) < (p2->fChunkId);
60 }
61 
62 }
63 
64 namespace WriteEngine
65 {
66 
extern int NUM_BLOCKS_PER_INITIAL_EXTENT; // defined in we_dctnry.cpp
extern WErrorCodes ec;                    // defined in we_log.cpp

// Size, in bytes, budgeted for one compressed chunk: the compressor's reported
// maximum compressed size for UNCOMPRESSED_CHUNK_SIZE bytes of input, plus
// fixed slack.  The ChunkManager constructor adds the configured user padding
// to this value to size its compressed I/O buffer.
// NOTE(review): the meaning of the individual 64 + 3 + 8 * 1024 slack terms is
// not documented in this file -- confirm before changing them.
const int COMPRESSED_CHUNK_SIZE = compress::IDBCompressInterface::maxCompressedSize(UNCOMPRESSED_CHUNK_SIZE) + 64 + 3 + 8 * 1024;
71 
72 //------------------------------------------------------------------------------
73 // Search for the specified chunk in fChunkList.
74 //------------------------------------------------------------------------------
findChunk(int64_t id) const75 ChunkData* CompFileData::findChunk(int64_t id) const
76 {
77     ChunkData* pChunkData = NULL;
78 
79     for (list<ChunkData*>::const_iterator lit = fChunkList.begin(); lit != fChunkList.end(); ++lit)
80     {
81         if ((*lit)->fChunkId == id)
82         {
83             pChunkData = *lit;
84             break;
85         }
86     }
87 
88     return pChunkData;
89 }
90 
91 //------------------------------------------------------------------------------
92 // ChunkManager constructor
93 //------------------------------------------------------------------------------
ChunkManager()94 ChunkManager::ChunkManager() : fMaxActiveChunkNum(100), fLenCompressed(0), fIsBulkLoad(false),
95     fDropFdCache(false), fIsInsert(false), fIsHdfs(IDBPolicy::useHdfs()),
96     fFileOp(0), fSysLogger(NULL), fTransId(-1),
97     fLocalModuleId(Config::getLocalModuleID()),
98     fFs(fIsHdfs ?
99         IDBFileSystem::getFs(IDBDataFile::HDFS) :
100         IDBPolicy::useCloud() ?
101             IDBFileSystem::getFs(IDBDataFile::CLOUD) :
102             IDBFileSystem::getFs(IDBDataFile::BUFFERED))
103 {
104     fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
105     fCompressor.numUserPaddingBytes(fUserPaddings);
106     fMaxCompressedBufSize = COMPRESSED_CHUNK_SIZE + fUserPaddings;
107     fBufCompressed = new char[fMaxCompressedBufSize];
108     fSysLogger = new logging::Logger(SUBSYSTEM_ID_WE);
109     logging::MsgMap msgMap;
110     msgMap[logging::M0080] = logging::Message(logging::M0080);
111     fSysLogger->msgMap( msgMap );
112 }
113 
114 //------------------------------------------------------------------------------
115 // ChunkManager destructor
116 //------------------------------------------------------------------------------
~ChunkManager()117 ChunkManager::~ChunkManager()
118 {
119     std::map<FID, FID> columnOids;
120     cleanUp(columnOids);
121 
122     delete [] fBufCompressed;
123     fBufCompressed = NULL;
124 
125     delete fSysLogger;
126     fSysLogger = NULL;
127 }
128 
129 
130 //------------------------------------------------------------------------------
131 // Log a message into the DML recovery log.
132 //------------------------------------------------------------------------------
writeLog(TxnID txnId,string backUpFileType,string filename,string & aDMLLogFileName,int64_t size,int64_t offset) const133 int ChunkManager::writeLog(TxnID txnId, string backUpFileType, string filename,
134                            string& aDMLLogFileName, int64_t size, int64_t offset) const
135 {
136     //Get log file name
137     if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
138         return ERR_DML_LOG_NAME;
139 
140     //Open file
141     boost::scoped_ptr<IDBDataFile> aDMLLogFile;
142 
143     try
144     {
145         aDMLLogFile.reset(IDBDataFile::open(
146                               IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
147                               aDMLLogFileName.c_str(), "a+b", 0));
148 
149         if (!aDMLLogFile)
150         {
151             ostringstream oss;
152             oss << "trans " << txnId << ":File " << aDMLLogFileName
153                 << " can't be opened (no exception thrown)";
154             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
155             return ERR_OPEN_DML_LOG;
156         }
157     }
158     catch (exception& e)
159     {
160         ostringstream oss;
161         oss << "trans " << txnId << ":File " << aDMLLogFileName
162             << " can't be opened: " << e.what();
163         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
164         return ERR_OPEN_DML_LOG;
165     }
166 
167     //Write the log
168     ostringstream entry;
169     entry << backUpFileType << '\n' << filename << '\n' << size << '\n' << offset << '\n';
170     string tmp = entry.str();
171     aDMLLogFile->seek(0, SEEK_END);
172     aDMLLogFile->tell();
173     aDMLLogFile->write(tmp.c_str(), tmp.size());
174 
175     return NO_ERROR;
176 }
177 
removeBackups(TxnID txnId)178 int ChunkManager::removeBackups(TxnID txnId)
179 {
180     // HDFS update/delete is handled differently
181     if (fIsHdfs || fIsBulkLoad)
182         return NO_ERROR;
183 
184     string aDMLLogFileName;
185 
186     if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
187         return ERR_DML_LOG_NAME;
188 
189     if (IDBPolicy::exists(aDMLLogFileName.c_str()))
190     {
191         boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
192                     IDBPolicy::getType(aDMLLogFileName.c_str(),
193                                        IDBPolicy::WRITEENG),
194                     aDMLLogFileName.c_str(), "r", 0));
195 
196         if (aDMLLogFile) //need recover
197         {
198             ssize_t fileSize = aDMLLogFile->size();
199             boost::scoped_array<char> buf(new char[fileSize]);
200 
201             if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
202                 return ERR_FILE_READ;
203 
204             std::istringstream strstream(string(buf.get(), fileSize));
205             std::string backUpFileType;
206             std::string filename;
207             int64_t size;
208             int64_t offset;
209 
210             while (strstream >> backUpFileType >> filename >> size >> offset)
211             {
212                 if (backUpFileType.compare("tmp") == 0 )
213                 {
214                     filename += ".tmp";
215                     IDBPolicy::remove(filename.c_str());
216                 }
217                 else
218                 {
219                     std::string backFileName(filename);
220 
221                     if (backUpFileType.compare("chk") == 0 )
222                         backFileName += ".chk";
223                     else
224                         backFileName += ".hdr";
225 
226                     IDBPolicy::remove(backFileName.c_str());
227                 }
228             }
229 
230             aDMLLogFile.reset();  // closes the file in IDBDataFile destructor.
231 
232             IDBPolicy::remove(aDMLLogFileName.c_str());
233         }
234         else
235         {
236             return ERR_OPEN_DML_LOG;
237         }
238     }
239 
240     return NO_ERROR;
241 }
242 
243 //------------------------------------------------------------------------------
244 // Get/Return IDBDataFile* for specified OID, root, partition, and segment.
245 // Function is to be used to open column files.
246 // If the IDBDataFile* is not found, then a segment file will be opened using the
247 // mode (mode) and I/O buffer size (size) that is given.  Name of the resulting
248 // file is returned in filename.
249 //
250 // For Bulk HDFS usage:
251 //  If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
252 //------------------------------------------------------------------------------
253 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFilePtr(const Column & column,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,bool useTmpSuffix) const254 IDBDataFile* ChunkManager::getFilePtr(const Column& column,
255                                       uint16_t root,
256                                       uint32_t partition,
257                                       uint16_t segment,
258                                       string& filename,
259                                       const char* mode,
260                                       int size,
261                                       bool useTmpSuffix) const
262 {
263     CompFileData* fileData = getFileData(column.dataFile.fid, root, partition, segment,
264                                          filename, mode, size, column.colDataType, column.colWidth, useTmpSuffix);
265     return (fileData ? fileData->fFilePtr : NULL);
266 }
267 
268 //------------------------------------------------------------------------------
269 // Get/Return IDBDataFile* for specified OID, root, partition, and segment.
270 // Function is to be used to open dictionary store files.
271 // If the IDBDataFile* is not found, then a segment file will be opened using the
272 // mode (mode) and I/O buffer size (size) that is given.  Name of the resulting
273 // file is returned in filename.
274 //
275 // For Bulk HDFS usage:
276 //  If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
277 //------------------------------------------------------------------------------
278 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFilePtr(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,bool useTmpSuffix) const279 IDBDataFile* ChunkManager::getFilePtr(const FID& fid,
280                                       uint16_t root,
281                                       uint32_t partition,
282                                       uint16_t segment,
283                                       string& filename,
284                                       const char* mode,
285                                       int size,
286                                       bool useTmpSuffix) const
287 {
288     CompFileData* fileData = getFileData(fid, root, partition, segment, filename, mode, size,
289                                          CalpontSystemCatalog::VARCHAR, 8, useTmpSuffix, true);  // hard code (varchar, 8) are dummy values for dictionary file
290     return (fileData ? fileData->fFilePtr : NULL);
291 }
292 
293 //------------------------------------------------------------------------------
294 // Get/Return CompFileData* for specified column OID, root, partition, and
295 // segment.  If the IDBDataFile* is not found, then a segment file will be opened
296 // using the mode (mode) and I/O buffer size (size) that is given.  Name of
297 // the resulting file is returned in filename.
298 // If the CompFileData* needs to be created, it will also be created and
299 // inserted into the fFileMap and fFilePtrMap for later use.
300 //
301 // For Bulk HDFS usage:
302 //  If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
303 //------------------------------------------------------------------------------
304 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFileData(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,const CalpontSystemCatalog::ColDataType colDataType,int colWidth,bool useTmpSuffix,bool dctnry) const305 CompFileData* ChunkManager::getFileData(const FID& fid,
306                                         uint16_t root,
307                                         uint32_t partition,
308                                         uint16_t segment,
309                                         string& filename,
310                                         const char* mode,
311                                         int size,
312                                         const CalpontSystemCatalog::ColDataType colDataType,
313                                         int colWidth,
314                                         bool useTmpSuffix,
315                                         bool dctnry) const
316 {
317     FileID fileID(fid, root, partition, segment);
318     map<FileID, CompFileData*>::const_iterator mit = fFileMap.find(fileID);
319 
320     WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition
321                 << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ")
322                 << "found." << endl;)
323 
324     // Get CompFileData pointer for existing Column or Dictionary store file
325     if (mit != fFileMap.end())
326     {
327         filename = mit->second->fFileName;
328         return mit->second;
329     }
330 
331     // New CompFileData pointer needs to be created
332     char name[FILE_NAME_SIZE];
333 
334     if (fFileOp->getFileName(fid, name, root, partition, segment) != NO_ERROR)
335         return NULL;
336 
337     CompFileData* fileData = new CompFileData(fileID, fid, colDataType, colWidth);
338     fileData->fFileName = filename = name;
339 
340     if (openFile(fileData, mode, colWidth, useTmpSuffix, __LINE__) != NO_ERROR)
341     {
342         WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;)
343         delete fileData;
344         return NULL;
345     }
346 
347     fileData->fIoBuffer.reset(new char[size]);
348     fileData->fIoBSize = size;
349 // TODO-There is no current way to make this setvbuf call as IDBDataFile only
350 // accepts the USE_VBUF at construction time and then uses a buffer that it manages
351 // Can either propagate an option through the openFile() call above and let
352 // IDBDataFile manage it internally or expose a new setBuffer() option.
353 //  setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, size);
354     fileData->fDctnryCol = dctnry;
355     WE_COMP_DBG(cout << "open file* " << name << endl;)
356 
357     // get the control data in header.
358     if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
359                  COMPRESSED_FILE_HEADER_UNIT, __LINE__) != NO_ERROR)
360     {
361         WE_COMP_DBG(cout << "Failed to read control header." << endl;)
362         delete fileData;
363         return NULL;
364     }
365 
366     // make sure the header is valid
367     if (fCompressor.verifyHdr(fileData->fFileHeader.fControlData) != 0)
368     {
369         WE_COMP_DBG(cout << "Invalid header." << endl;)
370         delete fileData;
371         return NULL;
372     }
373 
374     int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
375     int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
376 
377     if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
378     {
379         // >8K header, dictionary width > 128
380         fileData->fFileHeader.fPtrSection = new char[ptrSecSize];
381         fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
382     }
383 
384     // read in the pointer section in header
385     if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection,
386                  ptrSecSize, __LINE__) != NO_ERROR)
387     {
388         WE_COMP_DBG(cout << "Failed to read pointer header." << endl;)
389         delete fileData;
390         return NULL;
391     }
392 
393     fFileMap.insert(make_pair(fileID, fileData));
394     //cout << "Insert into fFilemap root:partition:seg:fileID = " <<root<<":"<< partition<<":"<< segment<<":"<<fid<<endl;
395     fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
396     return fileData;
397 }
398 
399 //------------------------------------------------------------------------------
400 // Return new IDBDataFile* for specified dictionary OID, root, partition, segment, and
401 // width.  A new segment file will be opened using the mode (mode) and I/O
402 // buffer size (size) that is given.  Name of the resulting file is returned
403 // in filename.
404 // A corresponding CompFileData* is created and inserted into fFileMap and
405 // fFilePtrMap for later use.
406 //------------------------------------------------------------------------------
createDctnryFile(const FID & fid,int64_t width,uint16_t root,uint32_t partition,uint16_t segment,const char * filename,const char * mode,int size)407 IDBDataFile* ChunkManager::createDctnryFile(const FID& fid,
408         int64_t width,
409         uint16_t root,
410         uint32_t partition,
411         uint16_t segment,
412         const char* filename,
413         const char* mode,
414         int size)
415 {
416     FileID fileID(fid, root, partition, segment);
417     CompFileData* fileData = new CompFileData(fileID, fid, CalpontSystemCatalog::VARCHAR, width);
418     fileData->fFileName = filename;
419 
420     if (openFile(fileData, mode, width, false, __LINE__) != NO_ERROR) // @bug 5572 HDFS tmp file
421     {
422         WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;)
423         delete fileData;
424         return NULL;
425     }
426 
427     fileData->fIoBuffer.reset(new char[size]);
428     fileData->fIoBSize = size;
429 //  see TODO- comment above
430 //  setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, size);
431     fileData->fDctnryCol = true;
432     WE_COMP_DBG(cout << "create file* " << filename << endl;)
433     int hdrSize = calculateHeaderSize(width);
434     int ptrSecSize = hdrSize - COMPRESSED_FILE_HEADER_UNIT;
435 
436     if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
437     {
438         // >8K header, dictionary width > 128
439         fileData->fFileHeader.fPtrSection = new char[ptrSecSize];
440         fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
441     }
442 
443     fCompressor.initHdr(fileData->fFileHeader.fControlData, fileData->fFileHeader.fPtrSection,
444                         fFileOp->compressionType(), hdrSize);
445 
446     if (writeHeader(fileData, __LINE__) != NO_ERROR)
447     {
448         WE_COMP_DBG(cout << "Failed to write header." << endl;)
449         delete fileData;
450         return NULL;
451     }
452 
453     //@Bug 4977 remove log file
454     removeBackups(fTransId);
455     fFileMap.insert(make_pair(fileID, fileData));
456     fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
457     return fileData->fFilePtr;
458 }
459 
460 //------------------------------------------------------------------------------
461 // Read the block for the specified fbo, from pFile's applicable chunk, and
462 // into readBuf.
463 //------------------------------------------------------------------------------
readBlock(IDBDataFile * pFile,unsigned char * readBuf,uint64_t fbo)464 int ChunkManager::readBlock(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo)
465 {
466     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
467 
468     if (fpIt == fFilePtrMap.end())
469     {
470         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
471         return ERR_COMP_FILE_NOT_FOUND;
472     }
473 
474     // find the chunk ID and offset in the chunk
475     lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
476     ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
477 
478     WE_COMP_DBG(cout << "fbo:" << fbo << "  chunk id:" << offset.quot << " offset:" << offset.rem
479                 << "  chunkData*:" << chunkData << endl;)
480 
481     int rc = NO_ERROR;
482 
483     // chunk is not already uncompressed
484     if (chunkData == NULL)
485         rc = fetchChunkFromFile(pFile, offset.quot, chunkData);
486 
487     if (rc == NO_ERROR)
488     {
489         // copy the data at fbo to readBuf
490         memcpy(readBuf, chunkData->fBufUnCompressed + offset.rem, BYTE_PER_BLOCK);
491     }
492 
493     return rc;
494 }
495 
496 //------------------------------------------------------------------------------
497 // Write writeBuf to the block for the specified fbo, within pFile's applicable
498 // chunk.
499 //------------------------------------------------------------------------------
saveBlock(IDBDataFile * pFile,const unsigned char * writeBuf,uint64_t fbo)500 int ChunkManager::saveBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
501 {
502     WE_COMP_DBG(cout << "save block fbo:" << fbo << endl;)
503     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
504 
505     if (fpIt == fFilePtrMap.end())
506     {
507         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
508         return ERR_COMP_FILE_NOT_FOUND;
509     }
510 
511     // find the chunk ID and offset in the chunk
512     lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
513     ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
514 
515     int rc = NO_ERROR;
516 
517     // chunk is not already read in
518     if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, offset.quot, chunkData)) != NO_ERROR))
519         return rc;
520 
521     WE_COMP_DBG(cout << "fbo:" << fbo << "  chunk id:" << offset.quot << " offset:" << offset.rem
522                 << " saved @" << (&(chunkData->fBufUnCompressed) + offset.rem) << endl;)
523 
524     memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK);
525     chunkData->fWriteToFile = true;
526 
527 // if the chunk is full for insert, flush it
528 //cout << "current offset.rem/8192 = " << offset.rem/8192 << endl;
529     if (fIsInsert && (offset.rem == MAXOFFSET_PER_CHUNK))
530     {
531         if (((rc = writeChunkToFile(fpIt->second, chunkData)) == NO_ERROR) &&
532                 ((rc = writeHeader(fpIt->second, __LINE__)) == NO_ERROR))
533         {
534             //cout << "saveblock flushed the full chunk"<<endl;
535             pFile->flush();
536 
537             //@Bug 4977 remove log file
538             removeBackups(fTransId);
539         }
540     }
541 
542     return rc;
543 }
544 
545 //------------------------------------------------------------------------------
546 // Flush all pending chunks to their corresponding segment files.
547 //------------------------------------------------------------------------------
flushChunks(int rc,const std::map<FID,FID> & columOids)548 int ChunkManager::flushChunks(int rc, const std::map<FID, FID>& columOids)
549 {
550     // shall fail the the statement if failed here
551     WE_COMP_DBG(cout << "flushChunks." << endl;)
552 
553     int k = fFilePtrMap.size();
554     std::map<FID, FID>::const_iterator it;
555 
556     if ((rc == NO_ERROR) && fIsInsert)
557     {
558         while (k-- > 0 && rc == NO_ERROR)
559         {
560             map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
561             // sort the chunk list first
562             CompFileData* fileData = i->second;
563             it = columOids.find (fileData->fFid);
564 
565             if (it != columOids.end())
566             {
567                 list<ChunkData*>& chunkList = fileData->fChunkList;
568                 chunkList.sort(chunkDataPtrLessCompare);
569                 list<ChunkData*>::iterator j = chunkList.begin();
570 
571                 while (j != chunkList.end())
572                 {
573                     if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
574                         break;
575 
576                     // write chunk to file removes the written chunk from the list
577                     j = chunkList.begin();
578                 }
579 
580                 if (rc != NO_ERROR)
581                     break;
582 
583                 // finally update the header
584                 if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
585                     break;
586 
587                 //@Bug 4977 remove log file
588                 removeBackups(fTransId);
589 
590                 // closeFile invalidates the iterator
591                 closeFile(fileData);
592             }
593         }
594     }
595     else if (rc == NO_ERROR)
596     {
597         while (k-- > 0 && rc == NO_ERROR)
598         {
599             map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
600             // sort the chunk list first
601             CompFileData* fileData = i->second;
602 
603             list<ChunkData*>& chunkList = fileData->fChunkList;
604             chunkList.sort(chunkDataPtrLessCompare);
605             list<ChunkData*>::iterator j = chunkList.begin();
606 
607             while (j != chunkList.end())
608             {
609                 if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
610                     break;
611 
612                 // write chunk to file removes the written chunk from the list
613                 j = chunkList.begin();
614             }
615 
616             if (rc != NO_ERROR)
617                 break;
618 
619             // finally update the header
620             if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
621                 break;
622 
623             //@Bug 4977 remove log file
624             removeBackups(fTransId);
625 
626             // closeFile invalidates the iterator
627             closeFile(fileData);
628         }
629     }
630 
631     if (rc != NO_ERROR)
632     {
633         cleanUp(columOids);
634         return rc;
635     }
636 
637     //fActiveChunks.clear();
638     //fFileMap.clear();
639     //fFilePtrMap.clear();
640 
641     if (fDropFdCache)
642     {
643         cacheutils::dropPrimProcFdCache();
644         fDropFdCache = false;
645     }
646 
647     return NO_ERROR;
648 }
649 
650 //------------------------------------------------------------------------------
651 // Load and uncompress the requested chunk (id) for the specified file (pFile),
652 // into chunkData.
653 // id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
654 // If the active chunk list is already full, then we flush the oldest pending
655 // chunk to disk, to make room for fetching the requested chunk.
656 // If the header ptr for the requested chunk is 0 (or has length 0), then
657 // chunkData is initialized with a new empty chunk.
658 //------------------------------------------------------------------------------
fetchChunkFromFile(IDBDataFile * pFile,int64_t id,ChunkData * & chunkData)659 int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*& chunkData)
660 {
661     // return value
662     int rc = NO_ERROR;
663 
664     // remove the oldest one if the max active chunk number is reached.
665     WE_COMP_DBG(cout << "fActiveChunks.size:" << fActiveChunks.size() << endl;)
666     //cout << "fetchChunkFromFile1: pFile = " << pFile << endl;
667     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
668 
669     if (fpIt == fFilePtrMap.end())
670     {
671         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
672         return ERR_COMP_FILE_NOT_FOUND;
673     }
674 
675     CompFileData* fileData = fpIt->second;
676 
677     if (fActiveChunks.size() >= fMaxActiveChunkNum)
678     {
679         list<std::pair<FileID, ChunkData*> >::iterator lIt = fActiveChunks.begin();
680 
681         if (!fIsBulkLoad && !(fpIt->second->fDctnryCol))
682         {
683             while ((lIt->first == fpIt->second->fFileID) && (lIt != fActiveChunks.end()))
684                 lIt++;
685         }
686 
687         if (lIt != fActiveChunks.end())
688         {
689             map<FileID, CompFileData*>::iterator fIt = fFileMap.find(lIt->first);
690 
691             if (fIt == fFileMap.end())
692             {
693                 logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
694                 return ERR_COMP_FILE_NOT_FOUND;
695             }
696 
697             if ((rc = writeChunkToFile(fIt->second, lIt->second)) != NO_ERROR)
698             {
699                 ostringstream oss;
700                 oss << "write inactive chunk to file failed:" << fIt->second->fFileName << "@"
701                     << __LINE__;
702                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
703                 return rc;
704             }
705 
706             if ((rc = writeHeader(fIt->second, __LINE__)) != NO_ERROR)
707             {
708                 // logged by writeHeader
709                 return rc;
710             }
711 
712             //@Bug 4977 remove the log files
713             removeBackups(fTransId);
714         }
715     }
716 
717 #ifdef PROFILE
718     Stats::startParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
719 #endif
720     // get a new ChunkData object
721     chunkData = new ChunkData(id);
722     pFile = fileData->fFilePtr; //update to get the reopened file ptr.
723     fileData->fChunkList.push_back(chunkData);
724     fActiveChunks.push_back(make_pair(fileData->fFileID, chunkData));
725 
726     // read the compressed chunk from file
727     uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
728 
729     if (ptrs[id] && ptrs[id + 1]) // compressed chunk data exists
730     {
731         // safety check
732         if (ptrs[id] >= ptrs[id + 1])
733         {
734             logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
735             return ERR_COMP_WRONG_PTR;
736         }
737 
738         unsigned int chunkSize = (ptrs[id + 1] - ptrs[id]);
739 
740         if ((rc = setFileOffset(pFile, fileData->fFileName, ptrs[id], __LINE__)) != NO_ERROR ||
741                 (rc = readFile(pFile, fileData->fFileName, fBufCompressed, chunkSize, __LINE__)) !=
742                 NO_ERROR)
743         {
744             // logged by setFileOffset/readFile
745             return rc;
746         }
747 
748         // uncompress the read in buffer
749         unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
750 
751         if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
752                                         (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
753         {
754             if (fIsFix)
755             {
756                 uint64_t blocks = 512;
757 
758                 if (id == 0)
759                 {
760                     char* hdr = fileData->fFileHeader.fControlData;
761 
762                     if (fCompressor.getBlockCount(hdr) < 512)
763                         blocks = 256;
764                 }
765 
766                 dataLen = 8192 * blocks;
767 
768                 // load the uncompressed buffer with empty values.
769                 char* buf = chunkData->fBufUnCompressed;
770                 chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
771 
772                 if (fileData->fDctnryCol)
773                     initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
774                 else
775                     initializeColumnChunk(buf, fileData);
776             }
777             else
778             {
779                 logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
780                 return ERR_COMP_UNCOMPRESS;
781             }
782         }
783 
784 //@bug 3313-Remove validation that incorrectly fails for long string store files
785 //      WE_COMP_DBG(cout << "chunk uncompressed to " << dataLen << endl;)
786 //      if (dataLen < (id+1) * BYTE_PER_BLOCK)
787 //      {
788 //          logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
789 //          return ERR_COMP_UNCOMPRESS;
790 //      }
791 
792         chunkData->fLenUnCompressed = dataLen;
793     }
794     else  // new chunk
795     {
796         if (id == 0 && ptrs[id] == 0) // if the 1st ptr is not set for new extent
797         {
798             ptrs[0] = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
799         }
800 
801         // load the uncompressed buffer with empty values.
802         char* buf = chunkData->fBufUnCompressed;
803         chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
804 
805         if (fileData->fDctnryCol)
806             initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
807         else
808             initializeColumnChunk(buf, fileData);
809     }
810 
811 #ifdef PROFILE
812     Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
813 #endif
814 
815     return NO_ERROR;
816 }
817 
818 //------------------------------------------------------------------------------
819 // Initialize a column based chunk with the applicable empty values.
820 //------------------------------------------------------------------------------
initializeColumnChunk(char * buf,CompFileData * fileData)821 void ChunkManager::initializeColumnChunk(char* buf, CompFileData* fileData)
822 {
823     int size = UNCOMPRESSED_CHUNK_SIZE;
824     uint64_t emptyVal = fFileOp->getEmptyRowValue(fileData->fColDataType, fileData->fColWidth);
825     fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, fileData->fColWidth);
826 }
827 
828 //------------------------------------------------------------------------------
829 // Initialize a dictionary based chunk with empty blocks.
830 //------------------------------------------------------------------------------
initializeDctnryChunk(char * buf,int size)831 void ChunkManager::initializeDctnryChunk(char* buf, int size)
832 {
833     Dctnry* dctnry = dynamic_cast<Dctnry*>(fFileOp);
834     memset(buf, 0, size);
835     char* end = buf + size;
836 
837     while (buf < end)
838     {
839         dctnry->copyDctnryHeader(buf);
840         buf += BYTE_PER_BLOCK;
841     }
842 }
843 
844 //------------------------------------------------------------------------------
845 // Compress and write the requested chunk (id) for fileData, to disk.
846 // id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
847 //------------------------------------------------------------------------------
writeChunkToFile(CompFileData * fileData,int64_t id)848 int ChunkManager::writeChunkToFile(CompFileData* fileData, int64_t id)
849 {
850     ChunkData* chunkData = fileData->findChunk(id);
851 
852     if (!chunkData)
853     {
854         logMessage(ERR_COMP_CHUNK_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
855         return ERR_COMP_CHUNK_NOT_FOUND;
856     }
857 
858     return writeChunkToFile(fileData, chunkData);
859 }
860 
861 //------------------------------------------------------------------------------
862 // Compress and write the given chunk for fileData to disk.
863 // If the chunk has been flagged for writing (fWriteToFile is true),
864 // then subsequent chunks in the file are shifted down as needed, if the com-
865 // pressed chunk will not fit in the currently available embedded free space.
866 //------------------------------------------------------------------------------
writeChunkToFile(CompFileData * fileData,ChunkData * chunkData)867 int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
868 {
869     WE_COMP_DBG(cout << "write chunk id=" << chunkData->fChunkId << " data "
870                 << ((chunkData->fWriteToFile) ? "changed" : "NOT changed") << endl;)
871 
872     int rc = NO_ERROR; // return value
873     bool needReallocateChunks = false;
874     int64_t spaceAvl = 0;
875 
876     if (chunkData->fWriteToFile)
877     {
878 #ifdef PROFILE
879         Stats::startParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
880 #endif
881         // compress the chunk before writing it to file
882         fLenCompressed = fMaxCompressedBufSize;
883 
884         if (fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
885                                       chunkData->fLenUnCompressed,
886                                       (unsigned char*)fBufCompressed,
887                                       fLenCompressed) != 0)
888         {
889             logMessage(ERR_COMP_COMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
890             return ERR_COMP_COMPRESS;
891         }
892 
893         WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to "
894                     << fLenCompressed;)
895 
896         // Removed padding code here, will add padding for the last chunk.
897         // The existing chunks are already correctly aligned, use the padding to absort chunk
898         // size increase when update.  This improves the performance with less chunk shifting.
899 
900 #ifdef PROFILE
901         Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
902 #endif
903 
904         // need more work if the new compressed buffer is larger
905         uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
906         ChunkId chunkId = chunkData->fChunkId;
907 
908         if (ptrs[chunkId + 1] > 0)
909             spaceAvl = (ptrs[chunkId + 1] - ptrs[chunkId]);
910 
911         WE_COMP_DBG(cout << ", available space:" << spaceAvl;)
912 
913         bool lastChunk = true;
914         // usable chunkIds are 0 .. POINTERS_IN_HEADER-2
915         // [chunkId+0] is the start offset of current chunk.
916         // [chunkId+1] is the start offset of next chunk, the offset diff is current chunk size.
917         // [chunkId+2] is 0 or not indicates if the next chunk exists.
918         int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
919         int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
920         int64_t usablePtrIds = (ptrSecSize / sizeof(uint64_t)) - 2;
921 
922         if (chunkId < usablePtrIds) // make sure [chunkId+2] has valid value
923             lastChunk = (ptrs[(chunkId + 2)] == 0);
924 
925         WE_COMP_DBG(cout << ", last chunk:" << (lastChunk ? "true" : "false") << endl;)
926 
927         if (spaceAvl < 0)
928         {
929             logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
930             return ERR_COMP_WRONG_PTR;
931         }
932 
933         if ((int64_t)fLenCompressed <= spaceAvl)
934         {
935             // There is enough sapce.
936             if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
937             {
938                 // log in writeCompressedChunk by setFileOffset and writeFile
939                 return rc;
940             }
941         }
942         else if (lastChunk)
943         {
944             // add padding space if the chunk is written first time
945             if (fCompressor.padCompressedChunks(
946                         (unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize) != 0)
947             {
948                 WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padding failed." << endl;)
949 
950                 logMessage(ERR_COMP_PAD_DATA, logging::LOG_TYPE_ERROR, __LINE__);
951                 return ERR_COMP_PAD_DATA;
952             }
953 
954             WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padded to " << fLenCompressed;)
955 
956             // This is the last chunk, safe to write any length of data.
957             //@Bug 3888. Assign the error code
958             if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
959             {
960                 // log in writeCompressedChunk by setFileOffset and writeFile
961                 return rc;
962             }
963 
964             // Update the current chunk size.
965             ptrs[chunkId + 1] = ptrs[chunkId] + fLenCompressed;
966         }
967         else
968         {
969             needReallocateChunks = true;
970         }
971     }
972 
973     if (!needReallocateChunks)
974     {
975         fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
976         fileData->fChunkList.remove(chunkData);
977         delete chunkData;
978     }
979     else
980     {
981         ostringstream oss;
982         oss << "Compressed data does not fit, caused a chunk shifting @line:" << __LINE__
983             << " filename:" << fileData->fFileName << ", chunkId:" << chunkData->fChunkId
984             << " data size:" << fLenCompressed << "/available:" << spaceAvl << " -- shifting ";
985 
986         if ((rc = reallocateChunks(fileData)) == NO_ERROR)
987         {
988             oss << "SUCCESS";
989             logMessage(oss.str(), logging::LOG_TYPE_INFO);
990         }
991         else
992         {
993             oss << "FAILED";
994             logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
995         }
996     }
997 
998     return rc;
999 }
1000 
1001 //------------------------------------------------------------------------------
1002 // Write the current compressed data in fBufCompressed to the specified segment
1003 // file offset (offset) and file (fileData).  For DML usage, "size" specifies
// how many bytes to backup, for error recovery.  cpimport.bin does its own
1005 // backup and error recovery, so "size" is not applicable for bulk import usage.
1006 //------------------------------------------------------------------------------
writeCompressedChunk(CompFileData * fileData,int64_t offset,int64_t size)1007 int ChunkManager::writeCompressedChunk(CompFileData* fileData, int64_t offset, int64_t size)
1008 {
1009     int rc = NO_ERROR;
1010 
1011     if (!fIsBulkLoad && !fIsHdfs)
1012     {
1013         // backup current chunk to chk file
1014         string chkFileName(fileData->fFileName + ".chk");
1015         string aDMLLogFileName;
1016         unsigned char* buf = new unsigned char[size];
1017 
1018         if (((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__)) ==
1019                 NO_ERROR)
1020                 &&
1021                 ((rc = readFile(fileData->fFilePtr, fileData->fFileName, buf, size, __LINE__)) ==
1022                  NO_ERROR))
1023         {
1024             IDBDataFile*  chkFilePtr = IDBDataFile::open(
1025                                            IDBPolicy::getType(chkFileName.c_str(),
1026                                                    IDBPolicy::WRITEENG),
1027                                            chkFileName.c_str(),
1028                                            "w+b",
1029                                            0 );
1030 
1031             if (chkFilePtr)
1032             {
1033                 rc = writeFile(chkFilePtr, chkFileName, buf, size, __LINE__);
1034                 delete chkFilePtr;
1035             }
1036 
1037             delete [] buf;
1038 
1039             if (rc != NO_ERROR)
1040             {
1041                 IDBPolicy::remove(chkFileName.c_str());
1042                 return rc;
1043             }
1044 
1045             // log the chunk information for recovery
1046             rc = writeLog(fTransId, "chk", fileData->fFileName, aDMLLogFileName, size, offset);
1047 
1048             if (rc != NO_ERROR)
1049             {
1050                 ostringstream oss;
1051                 oss << "log " << fileData->fFileName << ".chk to DML logfile failed.";
1052                 logMessage(oss.str(), logging::LOG_TYPE_INFO);
1053                 return rc;
1054             }
1055         }
1056 
1057         // write out the compressed data + padding
1058         if ((rc == NO_ERROR) && ((rc = writeCompressedChunk_(fileData, offset)) == NO_ERROR))
1059         {
1060             if ((fileData->fFilePtr)->flush() != 0) //@Bug3162.
1061             {
1062                 rc = ERR_FILE_WRITE;
1063                 ostringstream oss;
1064                 oss << "Failed to flush " << fileData->fFileName << " @line: " << __LINE__;
1065                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1066             }
1067         }
1068     }
1069     else
1070     {
1071         // write out the compressed data + padding
1072         rc = writeCompressedChunk_(fileData, offset);
1073     }
1074 
1075     return rc;
1076 }
1077 
1078 //------------------------------------------------------------------------------
1079 // Actually write the current compressed data in fBufCompressed to the specified
1080 // segment file offset (offset) and file (fileData)
1081 //------------------------------------------------------------------------------
writeCompressedChunk_(CompFileData * fileData,int64_t offset)1082 inline int ChunkManager::writeCompressedChunk_(CompFileData* fileData, int64_t offset)
1083 {
1084     int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__);
1085 
1086     if (rc != NO_ERROR)
1087         return rc;
1088 
1089     return writeFile(fileData->fFilePtr, fileData->fFileName,
1090                      fBufCompressed, fLenCompressed, __LINE__);
1091 }
1092 
1093 //------------------------------------------------------------------------------
1094 // Open the specified segment file (fileData) using the given mode.
1095 // ln is the source code line number of the code invoking this operation
1096 // (ex __LINE__); this is used for logging error messages.
1097 //
1098 // useTmpSuffix controls whether HDFS file is opened with USE_TMPFILE bit set.
1099 // Typically set for bulk load, single insert, and batch insert, when adding
1100 // rows to an "existing" file.
1101 // Typically always set for DML update and delete.
1102 //
1103 // @bug 5572 - HDFS usage: add *.tmp file backup flag to API
1104 //------------------------------------------------------------------------------
openFile(CompFileData * fileData,const char * mode,int colWidth,bool useTmpSuffix,int ln) const1105 int ChunkManager::openFile(CompFileData* fileData, const char* mode, int colWidth,
1106                            bool useTmpSuffix, int ln) const
1107 {
1108     int rc = NO_ERROR;
1109     unsigned opts = IDBDataFile::USE_VBUF;
1110 
1111     if (fIsHdfs)
1112     {
1113         if (useTmpSuffix)
1114         {
1115             if (!fIsBulkLoad)
1116             {
1117                 // keep a DML log for confirm or cleanup the .tmp file
1118                 string aDMLLogFileName;
1119 
1120                 if ((rc = writeLog(fTransId, "tmp", fileData->fFileName,
1121                                    aDMLLogFileName, 0)) != NO_ERROR)
1122                 {
1123                     ostringstream oss;
1124                     oss << "Failed to put " << fileData->fFileName << " into DML log.";
1125                     logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1126 
1127                     return rc;
1128                 }
1129             }
1130 
1131             opts |= IDBDataFile::USE_TMPFILE;
1132         }
1133     }
1134 
1135     fileData->fFilePtr = IDBDataFile::open(
1136                              IDBPolicy::getType( fileData->fFileName.c_str(), IDBPolicy::WRITEENG ),
1137                              fileData->fFileName.c_str(), mode, opts, colWidth);
1138 
1139     if (fileData->fFilePtr == NULL)
1140     {
1141         ostringstream oss;
1142         oss << "Failed to open compressed data file " << fileData->fFileName << " @line: " << ln;
1143         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1144         rc = ERR_COMP_OPEN_FILE;
1145     }
1146 
1147     return rc;
1148 }
1149 
1150 //------------------------------------------------------------------------------
1151 // Set the file offset for the specified segment file (fileData) using the
1152 // given offset.
1153 // ln is the source code line number of the code invoking this operation
1154 // (ex __LINE__); this is used for logging error messages.  Likewise, filename
1155 // is used for logging any error message.
1156 //------------------------------------------------------------------------------
setFileOffset(IDBDataFile * pFile,const string & fileName,off64_t offset,int ln) const1157 int ChunkManager::setFileOffset(IDBDataFile* pFile, const string& fileName, off64_t offset, int ln) const
1158 {
1159     int rc = NO_ERROR;
1160 
1161     if (pFile->seek(offset, SEEK_SET) != 0) rc = ERR_COMP_SET_OFFSET;
1162 
1163     if (rc != NO_ERROR)
1164     {
1165         ostringstream oss;
1166         oss << "Failed to set offset in compressed data file " << fileName
1167             << " @line: " << ln << " offset:" << offset;
1168         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1169     }
1170 
1171     return rc;
1172 }
1173 
1174 //------------------------------------------------------------------------------
1175 // Read the requested number of bytes (size) from the specified file pFile.
1176 // ln is the source code line number of the code invoking this operation
1177 // (ex __LINE__); this is used for logging error messages.  Likewise, filename
1178 // is used for logging any error message.
1179 //------------------------------------------------------------------------------
readFile(IDBDataFile * pFile,const string & fileName,void * buf,size_t size,int ln) const1180 int ChunkManager::readFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
1181 {
1182     size_t bytes = pFile->read(buf, size);
1183 
1184     if (bytes != size)
1185     {
1186         ostringstream oss;
1187         oss << "Failed to read from compressed data file " << fileName
1188             << " @line: " << ln << " read/expect:" << bytes << "/" << size;
1189         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1190         return ERR_COMP_READ_FILE;
1191     }
1192 
1193     return NO_ERROR;
1194 }
1195 
1196 //------------------------------------------------------------------------------
1197 // Write the requested number of bytes (size) to the specified file pFile.
1198 // ln is the source code line number of the code invoking this operation
1199 // (ex __LINE__); this is used for logging error messages.  Likewise, filename
1200 // is used for logging any error message.
1201 //------------------------------------------------------------------------------
writeFile(IDBDataFile * pFile,const string & fileName,void * buf,size_t size,int ln) const1202 int ChunkManager::writeFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
1203 {
1204     size_t bytes = pFile->write(buf, size);
1205 
1206     if (bytes != size)
1207     {
1208         ostringstream oss;
1209         oss << "Failed to write to compressed data file " << fileName
1210             << " @line: " << ln << " written/expect:" << bytes << "/" << size;
1211         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1212         return ERR_COMP_WRITE_FILE;
1213     }
1214 
1215     return NO_ERROR;
1216 }
1217 
1218 //------------------------------------------------------------------------------
1219 // Close the specified segment file (fileData), and remove the
1220 // corresponding CompFileData reference from fFileMap and fFilePtrMap.
1221 //------------------------------------------------------------------------------
closeFile(CompFileData * fileData)1222 int ChunkManager::closeFile(CompFileData* fileData)
1223 {
1224     int rc = NO_ERROR;
1225 
1226     WE_COMP_DBG(cout << "closing file:" << fileData->fFileName << endl;)
1227     fFileMap.erase(fileData->fFileID);
1228     fFilePtrMap.erase(fileData->fFilePtr);
1229 
1230     if (fileData->fFilePtr)
1231         delete fileData->fFilePtr;
1232 
1233     delete fileData;
1234     fileData = NULL;
1235 
1236     return rc;
1237 }
1238 
1239 //------------------------------------------------------------------------------
1240 // Write the chunk pointers headers for the specified file (fileData).
1241 // ln is the source code line number of the code invoking this operation
1242 // (ex __LINE__); this is used for logging error messages.  For DML usage,
1243 // backup for recovery is also performed.  This step is skipped for cpimport.bin
1244 // as bulk import performs its own backup and recovery operations.
1245 //------------------------------------------------------------------------------
writeHeader(CompFileData * fileData,int ln)1246 int ChunkManager::writeHeader(CompFileData* fileData, int ln)
1247 {
1248     int rc = NO_ERROR;
1249     int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
1250     int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
1251 
1252     if (!fIsHdfs && !fIsBulkLoad)
1253     {
1254         // write a backup header
1255         string hdrFileName(fileData->fFileName + ".hdr");
1256         string aDMLLogFileName;
1257         IDBDataFile*  hdrFilePtr = IDBDataFile::open(
1258                                        IDBPolicy::getType(hdrFileName.c_str(),
1259                                                IDBPolicy::WRITEENG),
1260                                        hdrFileName.c_str(),
1261                                        "w+b",
1262                                        0,
1263                                        fileData->fColWidth );
1264 
1265         if (hdrFilePtr)
1266         {
1267             rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fControlData,
1268                            COMPRESSED_FILE_HEADER_UNIT, __LINE__);
1269 
1270             if (rc == NO_ERROR)
1271                 rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fPtrSection,
1272                                ptrSecSize, __LINE__);
1273 
1274             delete hdrFilePtr;
1275         }
1276 
1277         if (rc == NO_ERROR)
1278         {
1279             // log the header information for recovery
1280             rc = writeLog(fTransId, "hdr", fileData->fFileName, aDMLLogFileName, headerSize);
1281 
1282             if (rc != NO_ERROR)
1283             {
1284                 ostringstream oss;
1285                 oss << "log " << fileData->fFileName << ".hdr to DML logfile failed.";
1286                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1287             }
1288 
1289             if ((rc == NO_ERROR) && (rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
1290             {
1291                 (fileData->fFilePtr)->flush();
1292             }
1293         }
1294         else
1295         {
1296             IDBPolicy::remove(hdrFileName.c_str());
1297         }
1298     }
1299     else
1300     {
1301         if ((rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
1302         {
1303             (fileData->fFilePtr)->flush();
1304         }
1305     }
1306 
1307     if (rc != NO_ERROR)
1308     {
1309         ostringstream oss;
1310         oss << "write header failed: " << fileData->fFileName << "call from line:" << ln;
1311         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1312     }
1313 
1314     return rc;
1315 }
1316 
1317 //------------------------------------------------------------------------------
// Actually write the file header for the specified file (fileData): the
// control data section followed by the chunk pointer section, starting at
// file offset 0.  ptrSecSize is the number of bytes of the pointer section
// to write.  No backup or recovery logging is performed here; writeHeader()
// handles that for DML usage before calling this function.
1323 //------------------------------------------------------------------------------
writeHeader_(CompFileData * fileData,int ptrSecSize)1324 inline int ChunkManager::writeHeader_(CompFileData* fileData, int ptrSecSize)
1325 {
1326     int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, 0, __LINE__);
1327 
1328     if (rc == NO_ERROR)
1329         rc = writeFile(fileData->fFilePtr, fileData->fFileName,
1330                        fileData->fFileHeader.fControlData,
1331                        COMPRESSED_FILE_HEADER_UNIT, __LINE__);
1332 
1333     if (rc == NO_ERROR)
1334         rc = writeFile(fileData->fFilePtr, fileData->fFileName,
1335                        fileData->fFileHeader.fPtrSection,
1336                        ptrSecSize, __LINE__);
1337 
1338     return rc;
1339 }
1340 
1341 //------------------------------------------------------------------------------
1342 // For the specified segment file (pFile), read in an abbreviated/compressed
1343 // chunk extent, uncompress, and expand to a full chunk for a full extent.
1344 //------------------------------------------------------------------------------
expandAbbrevColumnExtent(IDBDataFile * pFile,uint64_t emptyVal,int width)1345 int ChunkManager::expandAbbrevColumnExtent(IDBDataFile* pFile, uint64_t emptyVal, int width)
1346 {
1347     map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
1348 
1349     if (i == fFilePtrMap.end())
1350     {
1351         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
1352         return ERR_COMP_FILE_NOT_FOUND;
1353     }
1354 
1355     int rc = NO_ERROR;
1356     // fetch the initial chunk if not already done.
1357     ChunkData* chunkData = (i->second)->findChunk(0);
1358 
1359     if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
1360         return rc;
1361 
1362     // buf points to the end of existing data
1363     char* buf = chunkData->fBufUnCompressed + chunkData->fLenUnCompressed;
1364     int   size = UNCOMPRESSED_CHUNK_SIZE - chunkData->fLenUnCompressed;
1365     fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, width);
1366     chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
1367     chunkData->fWriteToFile = true;
1368     //(writeChunkToFile(i->second, chunkData));
1369     //(writeHeader(i->second, __LINE__));
1370     return NO_ERROR;
1371 }
1372 
1373 //------------------------------------------------------------------------------
1374 // For column segment file:
1375 // Increment the block count stored in the chunk header used to track how many
1376 // blocks are allocated to the corresponding segment file.
1377 //------------------------------------------------------------------------------
updateColumnExtent(IDBDataFile * pFile,int addBlockCount)1378 int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount)
1379 {
1380     map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
1381 
1382     if (i == fFilePtrMap.end())
1383     {
1384         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
1385         return ERR_COMP_FILE_NOT_FOUND;
1386     }
1387 
1388     CompFileData* pFileData = i->second;
1389 
1390     if (!pFileData)
1391     {
1392         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
1393         return ERR_COMP_FILE_NOT_FOUND;
1394     }
1395 
1396     int rc = NO_ERROR;
1397     char* hdr = pFileData->fFileHeader.fControlData;
1398     fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
1399     ChunkData* chunkData = (pFileData)->findChunk(0);
1400 
1401     if (chunkData != NULL)
1402     {
1403         if ((rc = writeChunkToFile(pFileData, chunkData)) == NO_ERROR)
1404         {
1405             rc = writeHeader(pFileData, __LINE__);
1406 
1407             if ( rc == NO_ERROR)
1408             {
1409                 //@Bug 4977 remove log files
1410                 removeBackups(fTransId);
1411             }
1412         }
1413         else
1414         {
1415             ostringstream oss;
1416             oss << "write chunk to file failed when updateColumnExtent: " << pFileData->fFileName;
1417             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1418         }
1419     }
1420 
1421     pFile->flush();
1422     return rc;
1423 }
1424 
1425 //------------------------------------------------------------------------------
1426 // For dictionary store segment file:
1427 // Increment the block count stored in the chunk header used to track how many
1428 // blocks are allocated to the corresponding segment file.
1429 //------------------------------------------------------------------------------
updateDctnryExtent(IDBDataFile * pFile,int addBlockCount)1430 int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount)
1431 {
1432     map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
1433 
1434     if (i == fFilePtrMap.end())
1435     {
1436         logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
1437         return ERR_COMP_FILE_NOT_FOUND;
1438     }
1439 
1440     int rc = NO_ERROR;
1441     // fetch the initial chunk if not already done.
1442     ChunkData* chunkData = (i->second)->findChunk(0);
1443 
1444     if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
1445         return rc;  // logged by fetchChunkFromFile
1446 
1447     char* hdr = i->second->fFileHeader.fControlData;
1448     char* uncompressedBuf = chunkData->fBufUnCompressed;
1449     int currentBlockCount = fCompressor.getBlockCount(hdr);
1450 
1451     // Bug 3203, write out the compressed initial extent.
1452     if (currentBlockCount == 0)
1453     {
1454         int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
1455         initializeDctnryChunk(uncompressedBuf, initSize);
1456         chunkData->fWriteToFile = true;
1457 
1458         if ((rc = writeChunkToFile(i->second, chunkData)) == NO_ERROR)
1459         {
1460             rc = writeHeader(i->second, __LINE__);
1461 
1462             if ( rc == NO_ERROR)
1463             {
1464                 //@Bug 4977 remove the log file
1465                 removeBackups(fTransId);
1466             }
1467         }
1468         else
1469         {
1470             ostringstream oss;
1471             oss << "write chunk to file failed when updateDctnryExtent: " << i->second->fFileName;
1472             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1473         }
1474     }
1475     else if (currentBlockCount == NUM_BLOCKS_PER_INITIAL_EXTENT)
1476     {
1477         int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
1478         int incrSize = UNCOMPRESSED_CHUNK_SIZE - initSize;
1479         initializeDctnryChunk(uncompressedBuf + initSize, incrSize);
1480         uint64_t* ptrs = reinterpret_cast<uint64_t*>(i->second->fFileHeader.fPtrSection);
1481         ptrs[1] = 0;  // the compressed chunk size is unknown
1482     }
1483 
1484     if (rc == NO_ERROR)
1485         fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
1486 
1487     return rc;
1488 }
1489 
1490 //------------------------------------------------------------------------------
1491 // Close any open segment files, and free up memory.
1492 //------------------------------------------------------------------------------
cleanUp(const std::map<FID,FID> & columOids)1493 void ChunkManager::cleanUp(const std::map<FID, FID>& columOids)
1494 {
1495     WE_COMP_DBG(cout << "cleanUp with " << fActiveChunks.size() << " active chunk(s)." << endl;)
1496     std::map<FID, FID>::const_iterator it;
1497     map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
1498 
1499     while ( i != fFilePtrMap.end() )
1500     {
1501         CompFileData* fileData = i->second;
1502 
1503         it = columOids.find (fileData->fFid);
1504 
1505         if (fIsInsert && it != columOids.end())
1506         {
1507             list<ChunkData*>& chunks = fileData->fChunkList;
1508 
1509             for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
1510                 delete *j;
1511 
1512             delete fileData->fFilePtr;
1513             fFileMap.erase(fileData->fFileID);
1514             fFilePtrMap.erase(i++);
1515 
1516             delete fileData;
1517 
1518         }
1519         else if (!fIsInsert || (columOids.size() == 0))
1520         {
1521             list<ChunkData*>& chunks = fileData->fChunkList;
1522 
1523             for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
1524                 delete *j;
1525 
1526             delete fileData->fFilePtr;
1527             fFileMap.erase(fileData->fFileID);
1528             fFilePtrMap.erase(i++);
1529 
1530             delete fileData;
1531         }
1532         else
1533         {
1534             i++;
1535         }
1536     }
1537 
1538     if (fDropFdCache)
1539     {
1540         cacheutils::dropPrimProcFdCache();
1541         fDropFdCache = false;
1542     }
1543 }
1544 
1545 //------------------------------------------------------------------------------
1546 // Read "n" blocks from pFile starting at fbo, into readBuf.
1547 //------------------------------------------------------------------------------
readBlocks(IDBDataFile * pFile,unsigned char * readBuf,uint64_t fbo,size_t n)1548 int ChunkManager::readBlocks(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo, size_t n)
1549 {
1550     WE_COMP_DBG(cout << "backup blocks fbo:" << fbo << "  num:" << n << " file:" << pFile << endl;)
1551 
1552     // safety check
1553     if (pFile == NULL || n < 1)
1554     {
1555         return -1;
1556     }
1557 
1558     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
1559 
1560     if (fpIt == fFilePtrMap.end())
1561     {
1562         return -1;
1563     }
1564 
1565     // the n blocks may cross more than one chunk
1566     // find the chunk ID and offset of the 1st fbo
1567     lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
1568     int idx = offset.quot;                      // current chunk id
1569     int rem = offset.rem;                       // offset in current chunk
1570     int num = UNCOMPRESSED_CHUNK_SIZE - rem;    // # of bytes available in current chunk
1571     int left = n * BYTE_PER_BLOCK;              // # of bytest to be read
1572     // # of bytes to be read from current chunk
1573     num = (left > num) ? num : left;
1574 
1575     do
1576     {
1577         ChunkData* chunkData = (fpIt->second)->findChunk(idx);
1578 
1579         WE_COMP_DBG(cout << "id:" << idx << " ofst:" << rem << " num:" << num <<
1580                     " left:" << left << endl;)
1581 
1582         // chunk is not already uncompressed
1583         if (chunkData == NULL)
1584         {
1585             if (fetchChunkFromFile(pFile, idx, chunkData) != NO_ERROR)
1586             {
1587                 return -1;
1588             }
1589         }
1590 
1591         // copy the data at fbo to readBuf
1592         memcpy(readBuf, chunkData->fBufUnCompressed + rem, num);
1593 
1594         // prepare for the next read
1595         readBuf += num;
1596         rem = 0;
1597         left -= num;
1598         num = (left > UNCOMPRESSED_CHUNK_SIZE) ? UNCOMPRESSED_CHUNK_SIZE : left;
1599         idx++;
1600     }
1601     while (left > 0);
1602 
1603     return n;
1604 }
1605 
1606 //------------------------------------------------------------------------------
1607 // Write the a block (writeBuf) into the fbo block of the specified file.
1608 // Updated chunk is not flushed to disk but left pending in the applicable
1609 // CompFileData object.
1610 //------------------------------------------------------------------------------
restoreBlock(IDBDataFile * pFile,const unsigned char * writeBuf,uint64_t fbo)1611 int ChunkManager::restoreBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
1612 {
1613     WE_COMP_DBG(cout << "restore blocks fbo:" << fbo << " file:" << pFile << endl;)
1614 
1615     // safety check
1616     if (pFile == NULL)
1617         return -1;
1618 
1619     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
1620 
1621     if (fpIt == fFilePtrMap.end())
1622         return -1;
1623 
1624     // the n blocks may cross more than one chunk
1625     // find the chunk ID and offset of the 1st fbo
1626     lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
1627     ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
1628     WE_COMP_DBG(cout << "id:" << offset.quot << " ofst:" << offset.rem << endl;)
1629 
1630     // chunk is not already uncompressed
1631     if (chunkData == NULL)
1632     {
1633         if (fetchChunkFromFile(pFile, offset.quot, chunkData) != NO_ERROR)
1634             return -1;
1635     }
1636 
1637     // copy the data to chunk buffer
1638     memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK);
1639     chunkData->fWriteToFile = true;
1640 
1641     return BYTE_PER_BLOCK;
1642 }
1643 
1644 //------------------------------------------------------------------------------
1645 // Get the allocated block count from the header, for the specified file (pFile)
1646 //------------------------------------------------------------------------------
getBlockCount(IDBDataFile * pFile)1647 int ChunkManager::getBlockCount(IDBDataFile* pFile)
1648 {
1649     map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
1650     idbassert(fpIt != fFilePtrMap.end());
1651 
1652     return fCompressor.getBlockCount(fpIt->second->fFileHeader.fControlData);
1653 }
1654 
1655 //------------------------------------------------------------------------------
1656 // Set the FileOp pointer and dictionary flag
1657 //------------------------------------------------------------------------------
fileOp(FileOp * fileOp)1658 void ChunkManager::fileOp(FileOp* fileOp)
1659 {
1660     fFileOp = fileOp;
1661 
1662     if (fileOp)
1663     {
1664         setTransId(fileOp->getTransId());
1665     }
1666 }
1667 
1668 //------------------------------------------------------------------------------
1669 // Calculate and return the size of the chunk pointer header for a column of the
1670 // specified width.
1671 //------------------------------------------------------------------------------
calculateHeaderSize(int width)1672 int ChunkManager::calculateHeaderSize(int width)
1673 {
1674     int headerUnits = 1;
1675 
1676     // dictionary columns may need variable length header
1677     if (width > 8)
1678     {
1679         int extentsPerFile = Config::getExtentsPerSegmentFile();
1680         int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
1681         int rowsPerFile = rowsPerExtent * extentsPerFile;
1682         int stringsPerBlock = 8180 / (width + 2);  // 8180 = 8192 - 12
1683 
1684         // BLOB is 1 string per block
1685         if (stringsPerBlock == 0)
1686             stringsPerBlock = 1;
1687 
1688         int blocksNeeded = rowsPerFile / stringsPerBlock;
1689         int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK;
1690         lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk);
1691         int chunksNeeded = chunks.quot + (chunks.rem ? 1 : 0); // round up
1692         int ptrsNeeded = chunksNeeded + 1; // 1 more ptr for 0 ptr marking end
1693         int ptrsIn4K = (4 * 1024) / sizeof(uint64_t);
1694         lldiv_t hdrs = lldiv(ptrsNeeded, ptrsIn4K);
1695         headerUnits = hdrs.quot + (hdrs.rem ? 1 : 0); // round up
1696 
1697         // Always include odd number of 4K ptr headers, so that when we add the
1698         // single 4K control header, the cumulative header space will be an even
1699         // multiple of an 8K boundary.
1700         if ((headerUnits % 2) == 0)
1701             headerUnits++;
1702     }
1703 
1704     headerUnits++;  // add the control data block
1705     return (headerUnits * COMPRESSED_FILE_HEADER_UNIT);
1706 }
1707 
1708 //------------------------------------------------------------------------------
1709 // Reallocate the chunks in a file to account for an expanding chunk that will
1710 // not fit in the available embedded free space.
1711 //------------------------------------------------------------------------------
reallocateChunks(CompFileData * fileData)1712 int ChunkManager::reallocateChunks(CompFileData* fileData)
1713 {
1714     WE_COMP_DBG(cout << "reallocate chunks in " << fileData->fFileName
1715                 << " (" << fileData->fFilePtr << ")" << endl;)
1716 
1717     // return value
1718     int rc = NO_ERROR;
1719 
1720     // original file info
1721     string origFileName = fileData->fFileName;
1722     IDBDataFile*  origFilePtr = fileData->fFilePtr;
1723     origFilePtr->flush();
1724 
1725     // back out the current pointers
1726     int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
1727     int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
1728     compress::CompChunkPtrList origPtrs;
1729 
1730     if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0)
1731     {
1732         ostringstream oss;
1733         oss << "Chunk shifting failed, file:" << origFileName << " -- invalid header.";
1734         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1735         return ERR_COMP_PARSE_HDRS;
1736     }
1737 
1738     // get the chunks already in memory
1739     list<ChunkData*>& chunkList = fileData->fChunkList;
1740     chunkList.sort(chunkDataPtrLessCompare);
1741     list<ChunkData*>::iterator j = chunkList.begin();
1742     int numOfChunks = origPtrs.size();  // number of chunks that contain user data
1743     vector<ChunkData*> chunksTouched;   // chunk data is being modified, and in memory
1744 
1745     for (int i = 0; i < numOfChunks; i++)
1746         chunksTouched.push_back(NULL);
1747 
1748     // mark touched chunks
1749     while (j != chunkList.end())
1750     {
1751         chunksTouched[(*j)->fChunkId] = *j;
1752         j++;
1753     }
1754 
1755     // new file name and pointer
1756     string rlcFileName(fileData->fFileName + ".rlc");
1757     IDBDataFile*  rlcFilePtr = IDBDataFile::open(
1758                                    IDBPolicy::getType( rlcFileName.c_str(), IDBPolicy::WRITEENG ),
1759                                    rlcFileName.c_str(),
1760                                    "w+b",
1761                                    0,
1762                                    fileData->fColWidth );
1763 
1764     if (!rlcFilePtr)
1765     {
1766         ostringstream oss;
1767         oss << "Chunk shifting failed, file:" << origFileName << " -- cannot open rlc file.";
1768         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1769         return ERR_FILE_OPEN;
1770     }
1771 
1772     // log the recover information here
1773     string aDMLLogFileName;
1774     rc = writeLog(fTransId, "rlc", fileData->fFileName, aDMLLogFileName);
1775 
1776     if (rc != NO_ERROR)
1777     {
1778         ostringstream oss;
1779         oss << "log " << fileData->fFileName << ".rlc to DML logfile failed.";
1780         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1781 
1782         delete rlcFilePtr;
1783         rlcFilePtr = NULL;
1784         IDBPolicy::remove(rlcFileName.c_str());
1785         return rc;
1786     }
1787 
1788     // !!! May conside to use mmap to speed up the copy !!!
1789     // !!! copy the whole file and update the shifting part !!!
1790 
1791     // store updated chunk pointers
1792     uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
1793     ptrs[0] = origPtrs[0].first;    // the first chunk offset has no change.
1794 
1795     // bug3913, file size 0 after reallocate.
1796     // write the header, to be updated later, make sure there is someing in the file
1797     if ((rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData,
1798                         COMPRESSED_FILE_HEADER_UNIT, __LINE__)) == NO_ERROR)
1799         rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection,
1800                        ptrSecSize, __LINE__);
1801 
1802     int k = 0;
1803 
1804     for (; k < numOfChunks && rc == NO_ERROR; k++)
1805     {
1806         uint64_t chunkSize = 0;                // size of current chunk
1807         unsigned char* buf = NULL;             // output buffer
1808 
1809         // Find the current chunk size, and allocate the data -- buf point to the data.
1810         if (chunksTouched[k] == NULL)
1811         {
1812             // Chunks not touched will be copied to new file without being uncompressed first.
1813             //cout << "reallocateChunks: chunk has not been updated" << endl;
1814             chunkSize = origPtrs[k].second;
1815 
1816             // read disk data into compressed data buffer
1817             buf = (unsigned char*)fBufCompressed;
1818 
1819             if ((rc = setFileOffset(origFilePtr, origFileName, origPtrs[k].first, __LINE__)) != NO_ERROR)
1820             {
1821                 ostringstream oss;
1822                 oss << "set file offset failed @line:" << __LINE__ << "with retCode:" << rc
1823                     << " filename:" << origFileName;
1824                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1825                 continue;
1826             }
1827 
1828             if ((rc = readFile(origFilePtr, origFileName, buf, chunkSize, __LINE__)) != NO_ERROR)
1829             {
1830                 ostringstream oss;
1831                 oss << "readfile failed @line:" << __LINE__ << "with retCode:" << rc
1832                     << " filename:" << origFileName;
1833                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1834                 continue;
1835             }
1836         }
1837         else // chunksTouched[k] != NULL
1838         {
1839             // chunk has been updated, and in memory.
1840             //cout << "reallocateChunks: chunk has been updated" << endl;
1841             ChunkData* chunkData = chunksTouched[k];
1842             fLenCompressed = fMaxCompressedBufSize;
1843 
1844             if ((rc = fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
1845                                                 chunkData->fLenUnCompressed,
1846                                                 (unsigned char*)fBufCompressed,
1847                                                 fLenCompressed)) != 0)
1848             {
1849                 ostringstream oss;
1850                 oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
1851                     << " filename:" << rlcFileName;
1852                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1853 
1854                 rc = ERR_COMP_COMPRESS;
1855                 continue;
1856             }
1857 
1858             WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to "
1859                         << fLenCompressed;)
1860 
1861             // shifting chunk, add padding space
1862             if ((rc = fCompressor.padCompressedChunks(
1863                           (unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize)) != 0)
1864             {
1865                 WE_COMP_DBG(cout << ", but padding failed." << endl;)
1866                 ostringstream oss;
1867                 oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
1868                     << " filename:" << rlcFileName;
1869                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1870 
1871                 rc = ERR_COMP_PAD_DATA;
1872                 continue;
1873             }
1874 
1875             WE_COMP_DBG(cout << ", and padded to " << fLenCompressed;)
1876 
1877             buf = (unsigned char*)fBufCompressed;
1878             chunkSize = fLenCompressed;
1879         }
1880 
1881         // write is in sequence, no need to call setFileOffset
1882         //cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << endl;
1883         rc = writeFile(rlcFilePtr, rlcFileName, buf, chunkSize, __LINE__);
1884 
1885         if (rc != NO_ERROR)
1886         {
1887             //cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << " failed" << endl;
1888             ostringstream oss;
1889             oss << "write file failed @line:" << __LINE__ << "with retCode:" << rc
1890                 << " filename:" << rlcFileName;
1891             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1892             continue;
1893         }
1894 
1895         // Update the current chunk size.
1896         ptrs[k + 1] = ptrs[k] + chunkSize;
1897     }
1898 
1899     // up to now, everything OK: rc == NO_ERROR
1900     // remove all written chunks from active chunk list.
1901     j = chunkList.begin();
1902 
1903     while (j != chunkList.end())
1904     {
1905         ChunkData* chunkData = *j;
1906         fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
1907         fileData->fChunkList.remove(chunkData);
1908         delete chunkData;
1909 
1910         j = chunkList.begin();
1911     }
1912 
1913     // finally update the header
1914     if (rc == NO_ERROR)
1915         rc = setFileOffset(rlcFilePtr, rlcFileName, 0, __LINE__);
1916 
1917     if (rc == NO_ERROR)
1918         rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData,
1919                        COMPRESSED_FILE_HEADER_UNIT, __LINE__);
1920 
1921     if (rc == NO_ERROR)
1922         rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection,
1923                        ptrSecSize, __LINE__);
1924 
1925     if (rc != NO_ERROR)
1926     {
1927         struct timeval tv;
1928         gettimeofday(&tv, 0);
1929         struct tm ltm;
1930         localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), &ltm);
1931         char tmText[24];
1932     // this snprintf call causes a compiler warning b/c buffer size is less
1933     // then maximum string size.
1934 #if defined(__GNUC__) && __GNUC__ >= 7
1935 #pragma GCC diagnostic push
1936 #pragma GCC diagnostic ignored "-Wformat-truncation="
1937         snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
1938                  ltm.tm_year + 1900, ltm.tm_mon + 1,
1939                  ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
1940                  ltm.tm_sec, tv.tv_usec);
1941 #pragma GCC diagnostic pop
1942 #else
1943        snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
1944                  ltm.tm_year + 1900, ltm.tm_mon + 1,
1945                  ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
1946                  ltm.tm_sec, tv.tv_usec);
1947 #endif
1948         string dbgFileName(rlcFileName + tmText);
1949 
1950         ostringstream oss;
1951         oss << "Chunk shifting failed, file:" << origFileName;
1952 
1953         if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
1954             oss << ", rlc file is:" << dbgFileName;
1955 
1956         // write out the header for debugging in case the header in rlc file is bad or not updated.
1957         string rlcPtrFileName(dbgFileName + ".ptr");
1958 
1959         IDBDataFile*  rlcPtrFilePtr = IDBDataFile::open(
1960                                           IDBPolicy::getType(rlcPtrFileName.c_str(),
1961                                                   IDBPolicy::WRITEENG),
1962                                           rlcPtrFileName.c_str(),
1963                                           "w+b",
1964                                           0,
1965                                           fileData->fColWidth);
1966 
1967         if (rlcPtrFilePtr &&
1968                 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
1969                            COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
1970                 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection,
1971                            ptrSecSize, __LINE__) == NO_ERROR))
1972         {
1973             oss << ", rlc file header in memory: " << rlcPtrFileName;
1974         }
1975         else
1976         {
1977             oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
1978         }
1979 
1980         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1981 
1982         closeFile(fileData);
1983         delete rlcFilePtr;
1984         rlcFilePtr = NULL;
1985 
1986         if (rlcPtrFilePtr != NULL)
1987         {
1988             delete rlcPtrFilePtr;
1989             rlcPtrFilePtr = NULL;
1990         }
1991 
1992         return rc;
1993     }
1994 
1995     // update the file pointer map w/ new file pointer
1996     //cout << "realloc1: remove ptr = " << fileData->fFilePtr << endl;
1997     fFilePtrMap.erase(fileData->fFilePtr);
1998     delete fileData->fFilePtr;
1999     fileData->fFilePtr = NULL;
2000     delete rlcFilePtr;
2001     rlcFilePtr = NULL;
2002 
2003     // put reallocated file size here for logging purpose.
2004     uint64_t fileSize = 0;
2005 
2006     if (rc == NO_ERROR)
2007     {
2008 #ifdef _MSC_VER
2009         //We need to do this early on so the ::rename() call below will work on Windows
2010         // we'll do it again later on, but that's life...
2011         //FIXME: there's a race here that a query will re-open the files before we can jigger
2012         // them around. We need to make sure PP is opening these files with the right perms
2013         // to allow another process to delete them.
2014         cacheutils::dropPrimProcFdCache();
2015 #endif
2016 
2017         // @bug3913, keep the original file until the new file is properly renamed.
2018         // 1. check the new file size is NOT 0, matching ptr[k].
2019         // 2. mv the current to be backup.
2020         // 3. rename the rlc file.
2021         // 4. check the file size again.
2022         // 5. verify each chunk.
2023         // 5. rm the bak file or mv bak file back.
2024 
2025         // check the new file size using two methods mostly for curiosity on 0 size file.
2026         // They can be removed because all chunks are to be verified after rename.
2027         if ( IDBPolicy::size( rlcFileName.c_str() ) != (int64_t) ptrs[k] )
2028         {
2029             ostringstream oss;
2030             oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
2031                 << ", filename:" << rlcFileName;
2032             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2033             rc = ERR_COMP_RENAME_FILE;
2034         }
2035 
2036         if (rc == NO_ERROR)
2037         {
2038             if (fIsHdfs)
2039                 rc = swapTmpFile(rlcFileName, fileData->fFileName + ".tmp");
2040             else
2041                 rc = swapTmpFile(rlcFileName, fileData->fFileName);
2042         }
2043 
2044         if ((rc == NO_ERROR) &&
2045                 (rc = openFile(fileData, "r+b", fileData->fColWidth, true, __LINE__)) == NO_ERROR) // @bug 5572 HDFS tmp file
2046         {
2047 //          see TODO- above regarding setvbuf
2048 //          setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, fileData->fIoBSize);
2049             fileSize = fileData->fFilePtr->size();
2050 
2051             if (fileSize == ptrs[k])
2052             {
2053                 rc = verifyChunksAfterRealloc(fileData);
2054             }
2055             else
2056             {
2057                 ostringstream oss;
2058                 oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
2059                     << ", filename:" << fileData->fFileName;
2060                 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2061                 rc = ERR_COMP_RENAME_FILE;
2062             }
2063 
2064             if (rc == NO_ERROR)
2065             {
2066                 fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
2067                 //cout << "realloc2: insert ptr = " << fileData->fFilePtr << endl;
2068                 // notify the PrimProc of unlinking original data file
2069                 fDropFdCache = true;
2070             }
2071         }
2072 
2073         if (!fIsHdfs)
2074         {
2075             string bakFileName(fileData->fFileName + ".orig");
2076 
2077             if (rc == NO_ERROR)
2078             {
2079                 // unlink the original file (remove is portable)
2080                 if (fFs.remove(bakFileName.c_str()) != 0)
2081                 {
2082                     ostringstream oss;
2083                     oss << "remove backup file " << bakFileName << " failed: " << strerror(errno);
2084 
2085                     // not much we can do, log an info message for manual cleanup
2086                     logMessage(oss.str(), logging::LOG_TYPE_INFO);
2087                 }
2088             }
2089             else
2090             {
2091                 // keep the bad file for debugging purpose
2092                 if (fFs.rename(fileData->fFileName.c_str(), rlcFileName.c_str()) == 0)
2093                 {
2094                     ostringstream oss;
2095                     oss << "data file after chunk shifting failed verification.";
2096                     logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2097                 }
2098 
2099                 // roll back the bak file
2100                 if (fFs.rename(bakFileName.c_str(), fileData->fFileName.c_str()) != 0)
2101                 {
2102                     ostringstream oss;
2103                     oss << "rename " << bakFileName << " to " << fileData->fFileName << " failed: "
2104                         << strerror(errno);
2105 
2106                     // must manually move it back
2107                     logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2108                 }
2109             }
2110         }
2111     }
2112 
2113     if (!fIsHdfs)
2114     {
2115         if (rc == NO_ERROR)
2116         {
2117             // remove the log file
2118             fFs.remove(aDMLLogFileName.c_str());
2119         }
2120         else
2121         {
2122             struct timeval tv;
2123             gettimeofday(&tv, 0);
2124             struct tm ltm;
2125             localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), &ltm);
2126             char tmText[24];
2127     // this snprintf call causes a compiler warning b/c buffer size is less
2128     // then maximum string size.
2129 #if defined(__GNUC__) && __GNUC__ >= 7
2130 #pragma GCC diagnostic push
2131 #pragma GCC diagnostic ignored "-Wformat-truncation="
2132             snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
2133                      ltm.tm_year + 1900, ltm.tm_mon + 1,
2134                      ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
2135                      ltm.tm_sec, tv.tv_usec);
2136 #pragma GCC diagnostic pop
2137 #else
2138            snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
2139                      ltm.tm_year + 1900, ltm.tm_mon + 1,
2140                      ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
2141                      ltm.tm_sec, tv.tv_usec);
2142 #endif
2143             string dbgFileName(rlcFileName + tmText);
2144 
2145             ostringstream oss;
2146             oss << "Chunk shifting failed, file:" << origFileName;
2147 
2148             if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
2149                 oss << ", rlc file is:" << dbgFileName;
2150 
2151             // write out the header for debugging in case the header in rlc file is bad.
2152             string rlcPtrFileName(dbgFileName + ".hdr");
2153             IDBDataFile*  rlcPtrFilePtr = IDBDataFile::open(
2154                                               IDBPolicy::getType(rlcPtrFileName.c_str(),
2155                                                       IDBPolicy::WRITEENG),
2156                                               rlcPtrFileName.c_str(),
2157                                               "w+b",
2158                                               0);
2159 
2160             if (rlcPtrFilePtr &&
2161                     (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
2162                                COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
2163                     (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection,
2164                                ptrSecSize, __LINE__) == NO_ERROR))
2165             {
2166                 oss << ", rlc file header in memory: " << rlcPtrFileName;
2167             }
2168             else
2169             {
2170                 oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
2171             }
2172 
2173             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2174 
2175             closeFile(fileData);
2176 
2177             if (rlcFilePtr != NULL)
2178             {
2179                 delete rlcFilePtr;
2180                 rlcFilePtr = NULL;
2181             }
2182 
2183             if (rlcPtrFilePtr != NULL)
2184             {
2185                 delete rlcPtrFilePtr;
2186                 rlcPtrFilePtr = NULL;
2187             }
2188         }
2189     }
2190 
2191     return rc;
2192 }
2193 
2194 //------------------------------------------------------------------------------
2195 // Verify chunks can be uncompressed after a chunk shift.
2196 //------------------------------------------------------------------------------
verifyChunksAfterRealloc(CompFileData * fileData)2197 int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
2198 {
2199     int rc = NO_ERROR;
2200 
2201     // read in the header
2202     if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
2203                        COMPRESSED_FILE_HEADER_UNIT, __LINE__)) != NO_ERROR)
2204     {
2205         ostringstream oss;
2206         oss << "Failed to read control header from new " << fileData->fFileName << ", roll back";
2207         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2208 
2209         return rc;
2210     }
2211 
2212     // make sure the header is valid
2213     if ((rc = fCompressor.verifyHdr(fileData->fFileHeader.fControlData)) != 0)
2214     {
2215         ostringstream oss;
2216         oss << "Invalid header in new " << fileData->fFileName << ", roll back";
2217         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2218 
2219         return rc;
2220     }
2221 
2222     int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
2223     int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
2224 
2225     // read in the pointer section in header
2226     if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection,
2227                        ptrSecSize, __LINE__)) != NO_ERROR)
2228     {
2229         ostringstream oss;
2230         oss << "Failed to read pointer header from new " << fileData->fFileName << "@" << __LINE__;
2231         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2232         return rc;
2233     }
2234 
2235     // get pointer list
2236     compress::CompChunkPtrList ptrs;
2237 
2238     if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
2239     {
2240         ostringstream oss;
2241         oss << "Failed to parse pointer list from new " << fileData->fFileName << "@" << __LINE__;
2242         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2243         return ERR_COMP_PARSE_HDRS;
2244     }
2245 
2246     // now verify each chunk
2247     ChunkData chunkData;
2248     int numOfChunks = ptrs.size();  // number of chunks in the file
2249 
2250     for (int i = 0; i < numOfChunks && rc == NO_ERROR; i++)
2251     {
2252         unsigned int chunkSize = ptrs[i].second;
2253 
2254         if ((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, ptrs[i].first, __LINE__)))
2255         {
2256             ostringstream oss;
2257             oss << "Failed to setFileOffset new " << fileData->fFileName << "@" << __LINE__;
2258             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2259             continue;
2260         }
2261 
2262         if ((rc = readFile(fileData->fFilePtr, fileData->fFileName,
2263                            fBufCompressed, chunkSize, __LINE__)))
2264         {
2265             ostringstream oss;
2266             oss << "Failed to read chunk from new " << fileData->fFileName << "@" << __LINE__;
2267             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2268             continue;
2269         }
2270 
2271         // uncompress the read in buffer
2272         unsigned int dataLen = sizeof(chunkData.fBufUnCompressed);
2273 
2274         if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
2275                                         (unsigned char*)chunkData.fBufUnCompressed, dataLen) != 0)
2276         {
2277             ostringstream oss;
2278             oss << "Failed to uncompress chunk new " << fileData->fFileName << "@" << __LINE__;
2279             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2280             rc = ERR_COMP_UNCOMPRESS;
2281             continue;
2282         }
2283     }
2284 
2285     return rc;
2286 }
2287 
2288 //------------------------------------------------------------------------------
2289 // Log an error message for the specified error code, and msg level.
2290 //------------------------------------------------------------------------------
logMessage(int code,int level,int lineNum,int fromLine) const2291 void ChunkManager::logMessage(int code, int level, int lineNum, int fromLine) const
2292 {
2293     ostringstream oss;
2294     oss << ec.errorString(code) << " @line:" << lineNum;
2295 
2296     if (fromLine != -1)
2297         oss << " called from line:" << fromLine;
2298 
2299     logMessage(oss.str(), level);
2300 }
2301 
2302 //------------------------------------------------------------------------------
2303 // Log the requested error message using the specified msg level.
2304 //------------------------------------------------------------------------------
logMessage(const string & msg,int level) const2305 void ChunkManager::logMessage(const string& msg, int level) const
2306 {
2307     logging::Message::Args args;
2308     args.add(msg);
2309 
2310     fSysLogger->logMessage((logging::LOG_TYPE) level, logging::M0080, args,
2311                            //FIXME: store session id in class to pass on to LogginID...
2312                            logging::LoggingID(SUBSYSTEM_ID_WE, 0, fTransId));
2313 }
2314 
2315 //------------------------------------------------------------------------------
2316 // Replace the cdf file with the updated tmp file.
2317 //------------------------------------------------------------------------------
swapTmpFile(const string & src,const string & dest)2318 int ChunkManager::swapTmpFile(const string& src, const string& dest)
2319 {
2320     // return value
2321     int rc = NO_ERROR;
2322 
2323     // if no change to the cdf, the tmp may not exist, no need to swap.
2324     if (!fFs.exists(src.c_str()))
2325         return rc;
2326 
2327     ssize_t srcFileSize = IDBPolicy::size(src.c_str());
2328 
2329     if (srcFileSize <= 0)
2330     {
2331         ostringstream oss;
2332         oss << "swapTmpFile aborted. Source file size = " << srcFileSize;
2333         logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
2334         rc = ERR_COMP_RENAME_FILE;
2335 
2336         return rc;
2337     }
2338 
2339     errno = 0;
2340     // save the original file
2341     string orig(dest + ".orig");
2342     fFs.remove(orig.c_str());  // remove left overs
2343 
2344     if (fFs.rename(dest.c_str(), orig.c_str()) != 0)
2345     {
2346         ostringstream oss;
2347         oss << "rename " << dest << " to " << orig << " failed: " << strerror(errno);
2348         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2349         rc = ERR_COMP_RENAME_FILE;
2350     }
2351 
2352     // rename the new file
2353     if (rc == NO_ERROR && fFs.rename(src.c_str(), dest.c_str()) != 0)
2354     {
2355         ostringstream oss;
2356         oss << "rename " << src << " to " << dest << " failed: " << strerror(errno);
2357         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2358         rc = ERR_COMP_RENAME_FILE;
2359     }
2360 
2361     if (rc == NO_ERROR && fFs.remove(orig.c_str()) != 0)
2362         rc = ERR_COMP_REMOVE_FILE;
2363 
2364     return rc;
2365 }
2366 
2367 
2368 //------------------------------------------------------------------------------
2369 // Construct a DML log file name based on transaction ID, etc.
2370 //------------------------------------------------------------------------------
getDMLLogFileName(string & aDMLLogFileName,const TxnID & txnId) const2371 int ChunkManager::getDMLLogFileName(string& aDMLLogFileName, const TxnID& txnId) const
2372 {
2373     config::Config* config = config::Config::makeConfig();
2374     string prefix = config->getConfig("SystemConfig", "DBRMRoot");
2375 
2376     if (prefix.length() == 0)
2377     {
2378         ostringstream oss;
2379         oss << "trans " << txnId << ":Need a valid DBRMRoot entry in Calpont configuation file";
2380         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2381         return ERR_DML_LOG_NAME;
2382     }
2383 
2384     uint64_t pos =  prefix.find_last_of ("/") ;
2385 
2386     if (pos != string::npos)
2387     {
2388         aDMLLogFileName = prefix.substr(0, pos + 1); //Get the file path
2389     }
2390     else
2391     {
2392         ostringstream oss;
2393         oss << "trans " << txnId << ":Cannot find the dbrm directory ("
2394             << prefix.c_str() << ") for the DML log file";
2395         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2396         return ERR_DML_LOG_NAME;
2397     }
2398 
2399     ostringstream oss;
2400     oss << txnId << "_" << fLocalModuleId;
2401     aDMLLogFileName += "DMLLog_" + oss.str();
2402 
2403     return NO_ERROR;
2404 }
2405 
2406 
2407 //------------------------------------------------------------------------------
2408 // clear the DML log file
2409 //------------------------------------------------------------------------------
startTransaction(const TxnID & txnId) const2410 int ChunkManager::startTransaction(const TxnID& txnId) const
2411 {
2412     // this step is for HDFS update/delete only.
2413     if (!fIsHdfs || fIsBulkLoad)
2414         return NO_ERROR;
2415 
2416     // Construct the DML log file name
2417     string aDMLLogFileName;
2418 
2419     if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
2420         return ERR_DML_LOG_NAME;
2421 
2422     // truncate the existing file
2423     boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
2424                 IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
2425                 aDMLLogFileName.c_str(), "wb", 0));
2426 
2427     if (!aDMLLogFile)
2428     {
2429         ostringstream oss;
2430         oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened.";
2431         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2432         return ERR_OPEN_DML_LOG;
2433     }
2434 
2435     return NO_ERROR;
2436 }
2437 
2438 
2439 //------------------------------------------------------------------------------
// Back up the cdf file and replace it with the updated tmp file.
2441 //------------------------------------------------------------------------------
confirmTransaction(const TxnID & txnId) const2442 int ChunkManager::confirmTransaction(const TxnID& txnId) const
2443 {
2444     // return value
2445     int rc = NO_ERROR;
2446 
2447     // this step is for HDFS update/delete only.
2448     if (!fIsHdfs || fIsBulkLoad)
2449         return rc;
2450 
2451     string aDMLLogFileName;
2452 
2453     if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
2454         return ERR_DML_LOG_NAME;
2455 
2456     //Open log file
2457     boost::scoped_ptr<IDBDataFile> aDMLLogFile;
2458     aDMLLogFile.reset(IDBDataFile::open(
2459                           IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
2460                           aDMLLogFileName.c_str(), "r", 0));
2461 
2462     if (!aDMLLogFile)
2463     {
2464         ostringstream oss;
2465         oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
2466         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2467         return ERR_OPEN_DML_LOG;
2468     }
2469 
2470     ssize_t logSize = fFs.size(aDMLLogFileName.c_str());
2471     boost::scoped_array<char> buf(new char[logSize]);
2472 
2473     if (aDMLLogFile->read(buf.get(), logSize) != logSize)
2474         return ERR_FILE_READ;
2475 
2476     std::istringstream strstream(string(buf.get(), logSize));
2477     std::string backUpFileType;
2478     std::string filename;
2479     int64_t size;
2480     int64_t offset;
2481     ConfirmHdfsDbFile confirmHdfs;
2482 
2483     while (strstream >> backUpFileType >> filename >> size >> offset)
2484     {
2485         std::string confirmErrMsg;
2486         rc = confirmHdfs.confirmDbFileChange( backUpFileType,
2487                                               filename, confirmErrMsg );
2488 
2489         if (rc != NO_ERROR)
2490         {
2491             logMessage(confirmErrMsg, logging::LOG_TYPE_ERROR);
2492             break;
2493         }
2494     }
2495 
2496     return rc;
2497 }
2498 
2499 
2500 //------------------------------------------------------------------------------
// Finalize the changes
2502 // if success, remove the orig
2503 // otherwise,  move the orig back to cdf
2504 //------------------------------------------------------------------------------
endTransaction(const TxnID & txnId,bool success) const2505 int ChunkManager::endTransaction(const TxnID& txnId, bool success) const
2506 {
2507     // return value
2508     int rc = NO_ERROR;
2509 
2510     // this step is for HDFS update/delete only.
2511     if (!fIsHdfs || fIsBulkLoad)
2512         return rc;
2513 
2514     string aDMLLogFileName;
2515 
2516     if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
2517         return ERR_DML_LOG_NAME;
2518 
2519     //Open log file
2520     boost::scoped_ptr<IDBDataFile> aDMLLogFile;
2521     aDMLLogFile.reset(IDBDataFile::open(
2522                           IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
2523                           aDMLLogFileName.c_str(), "r", 0));
2524 
2525     if (!aDMLLogFile)
2526     {
2527         ostringstream oss;
2528         oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
2529         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2530         return ERR_OPEN_DML_LOG;
2531     }
2532 
2533     ssize_t logSize = fFs.size(aDMLLogFileName.c_str());
2534     ssize_t logRead = 0;
2535     boost::scoped_array<char> buf(new char[logSize]);
2536 
2537     if ((logRead = aDMLLogFile->read(buf.get(), logSize)) != logSize)
2538     {
2539         ostringstream oss;
2540         oss << "trans " << txnId << ":File " << aDMLLogFileName << " filed to read: "
2541             << logRead << "/" << logSize;
2542         logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2543         return ERR_FILE_READ;
2544     }
2545 
2546     std::istringstream strstream(string(buf.get(), logSize));
2547     std::string backUpFileType;
2548     std::string filename;
2549     int64_t size;
2550     int64_t offset;
2551     ConfirmHdfsDbFile confirmHdfs;
2552 
2553     while (strstream >> backUpFileType >> filename >> size >> offset)
2554     {
2555         std::string finalizeErrMsg;
2556         rc = confirmHdfs.endDbFileChange( backUpFileType,
2557                                           filename, success, finalizeErrMsg );
2558 
2559         if (rc != NO_ERROR)
2560         {
2561             logMessage(finalizeErrMsg, logging::LOG_TYPE_ERROR);
2562             break;
2563         }
2564     }
2565 
2566     // final clean up or recover
2567     if (rc == NO_ERROR)
2568         rc = fFs.remove(aDMLLogFileName.c_str());
2569 
2570     return rc;
2571 }
2572 
checkFixLastDictChunk(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment)2573 int ChunkManager::checkFixLastDictChunk(const FID& fid,
2574                                         uint16_t root,
2575                                         uint32_t partition,
2576                                         uint16_t segment)
2577 {
2578 
2579     int rc = 0;
2580     //Find the file info
2581     FileID fileID(fid, root, partition, segment);
2582     map<FileID, CompFileData*>::const_iterator mit = fFileMap.find(fileID);
2583 
2584     WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition
2585                 << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ")
2586                 << "found." << endl;)
2587 
2588     // Get CompFileData pointer for existing Dictionary store file mit->second is CompFileData
2589     if (mit != fFileMap.end())
2590     {
2591 
2592         int headerSize = fCompressor.getHdrSize(mit->second->fFileHeader.fControlData);
2593         int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
2594 
2595         // get pointer list
2596         compress::CompChunkPtrList ptrs;
2597 
2598         if (fCompressor.getPtrList(mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
2599         {
2600             ostringstream oss;
2601             oss << "Failed to parse pointer list from new " << mit->second->fFileName << "@" << __LINE__;
2602             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2603             return ERR_COMP_PARSE_HDRS;
2604         }
2605 
2606         // now verify last chunk
2607         ChunkData* chunkData;
2608         int numOfChunks = ptrs.size();  // number of chunks in the file
2609         unsigned int chunkSize = ptrs[numOfChunks - 1].second;
2610 
2611         if ((rc = setFileOffset(mit->second->fFilePtr, mit->second->fFileName, ptrs[numOfChunks - 1].first, __LINE__)))
2612         {
2613             ostringstream oss;
2614             oss << "Failed to setFileOffset new " << mit->second->fFileName << "@" << __LINE__;
2615             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2616             return rc;
2617         }
2618 
2619         if ((rc = readFile(mit->second->fFilePtr, mit->second->fFileName,
2620                            fBufCompressed, chunkSize, __LINE__)))
2621         {
2622             ostringstream oss;
2623             oss << "Failed to read chunk from new " << mit->second->fFileName << "@" << __LINE__;
2624             logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2625             return rc;
2626         }
2627 
2628         // uncompress the read in buffer
2629         chunkData = new ChunkData(numOfChunks - 1);
2630         unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
2631 
2632         if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
2633                                         (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
2634         {
2635             mit->second->fChunkList.push_back(chunkData);
2636             fActiveChunks.push_back(make_pair(mit->second->fFileID, chunkData));
2637             //replace this chunk with empty chunk
2638             uint64_t blocks = 512;
2639 
2640             if ((numOfChunks - 1) == 0)
2641             {
2642                 char* hdr = mit->second->fFileHeader.fControlData;
2643 
2644                 if (fCompressor.getBlockCount(hdr) < 512)
2645                     blocks = 256;
2646             }
2647 
2648             dataLen = 8192 * blocks;
2649 
2650             // load the uncompressed buffer with empty values.
2651             char* buf = chunkData->fBufUnCompressed;
2652             chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
2653             initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
2654             chunkData->fLenUnCompressed = dataLen;
2655             chunkData->fWriteToFile = true;
2656         }
2657     }
2658 
2659     return rc;
2660 }
2661 
2662 }
2663 
2664 // vim:ts=4 sw=4:
2665