1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: we_chunkmanager.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24 #include <iostream>
25 #include <cstdio>
26 #include <ctime>
27 //#define NDEBUG
28 #include <cassert>
29 using namespace std;
30
31 #include <boost/scoped_array.hpp>
32 #include <boost/scoped_ptr.hpp>
33
34 #include "logger.h"
35 #include "cacheutils.h"
36
37 #include "we_chunkmanager.h"
38
39 #include "we_macro.h"
40 #include "we_brm.h"
41 #include "we_config.h"
42 #include "we_confirmhdfsdbfile.h"
43 #include "we_fileop.h"
44 #include "../dictionary/we_dctnry.h"
45 #include "we_stats.h"
46 using namespace execplan;
47
48 #include "IDBDataFile.h"
49 #include "IDBPolicy.h"
50 #include "cloudio/SMFileSystem.h"
51 using namespace idbdatafile;
52
53 namespace
54 {
55
56 // Function to compare 2 ChunkData pointers.
chunkDataPtrLessCompare(WriteEngine::ChunkData * p1,WriteEngine::ChunkData * p2)57 bool chunkDataPtrLessCompare(WriteEngine::ChunkData* p1, WriteEngine::ChunkData* p2)
58 {
59 return (p1->fChunkId) < (p2->fChunkId);
60 }
61
62 }
63
64 namespace WriteEngine
65 {
66
extern int NUM_BLOCKS_PER_INITIAL_EXTENT; // defined in we_dctnry.cpp
extern WErrorCodes ec; // defined in we_log.cpp

// Worst-case on-disk size of one compressed chunk: the compressor's maximum
// expansion of an UNCOMPRESSED_CHUNK_SIZE input plus fixed slack
// (64 + 3 bytes, plus an extra 8KB safety margin).
const int COMPRESSED_CHUNK_SIZE = compress::IDBCompressInterface::maxCompressedSize(UNCOMPRESSED_CHUNK_SIZE) + 64 + 3 + 8 * 1024;
71
72 //------------------------------------------------------------------------------
73 // Search for the specified chunk in fChunkList.
74 //------------------------------------------------------------------------------
findChunk(int64_t id) const75 ChunkData* CompFileData::findChunk(int64_t id) const
76 {
77 ChunkData* pChunkData = NULL;
78
79 for (list<ChunkData*>::const_iterator lit = fChunkList.begin(); lit != fChunkList.end(); ++lit)
80 {
81 if ((*lit)->fChunkId == id)
82 {
83 pChunkData = *lit;
84 break;
85 }
86 }
87
88 return pChunkData;
89 }
90
91 //------------------------------------------------------------------------------
92 // ChunkManager constructor
93 //------------------------------------------------------------------------------
ChunkManager()94 ChunkManager::ChunkManager() : fMaxActiveChunkNum(100), fLenCompressed(0), fIsBulkLoad(false),
95 fDropFdCache(false), fIsInsert(false), fIsHdfs(IDBPolicy::useHdfs()),
96 fFileOp(0), fSysLogger(NULL), fTransId(-1),
97 fLocalModuleId(Config::getLocalModuleID()),
98 fFs(fIsHdfs ?
99 IDBFileSystem::getFs(IDBDataFile::HDFS) :
100 IDBPolicy::useCloud() ?
101 IDBFileSystem::getFs(IDBDataFile::CLOUD) :
102 IDBFileSystem::getFs(IDBDataFile::BUFFERED))
103 {
104 fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
105 fCompressor.numUserPaddingBytes(fUserPaddings);
106 fMaxCompressedBufSize = COMPRESSED_CHUNK_SIZE + fUserPaddings;
107 fBufCompressed = new char[fMaxCompressedBufSize];
108 fSysLogger = new logging::Logger(SUBSYSTEM_ID_WE);
109 logging::MsgMap msgMap;
110 msgMap[logging::M0080] = logging::Message(logging::M0080);
111 fSysLogger->msgMap( msgMap );
112 }
113
114 //------------------------------------------------------------------------------
115 // ChunkManager destructor
116 //------------------------------------------------------------------------------
~ChunkManager()117 ChunkManager::~ChunkManager()
118 {
119 std::map<FID, FID> columnOids;
120 cleanUp(columnOids);
121
122 delete [] fBufCompressed;
123 fBufCompressed = NULL;
124
125 delete fSysLogger;
126 fSysLogger = NULL;
127 }
128
129
130 //------------------------------------------------------------------------------
131 // Log a message into the DML recovery log.
132 //------------------------------------------------------------------------------
writeLog(TxnID txnId,string backUpFileType,string filename,string & aDMLLogFileName,int64_t size,int64_t offset) const133 int ChunkManager::writeLog(TxnID txnId, string backUpFileType, string filename,
134 string& aDMLLogFileName, int64_t size, int64_t offset) const
135 {
136 //Get log file name
137 if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
138 return ERR_DML_LOG_NAME;
139
140 //Open file
141 boost::scoped_ptr<IDBDataFile> aDMLLogFile;
142
143 try
144 {
145 aDMLLogFile.reset(IDBDataFile::open(
146 IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
147 aDMLLogFileName.c_str(), "a+b", 0));
148
149 if (!aDMLLogFile)
150 {
151 ostringstream oss;
152 oss << "trans " << txnId << ":File " << aDMLLogFileName
153 << " can't be opened (no exception thrown)";
154 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
155 return ERR_OPEN_DML_LOG;
156 }
157 }
158 catch (exception& e)
159 {
160 ostringstream oss;
161 oss << "trans " << txnId << ":File " << aDMLLogFileName
162 << " can't be opened: " << e.what();
163 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
164 return ERR_OPEN_DML_LOG;
165 }
166
167 //Write the log
168 ostringstream entry;
169 entry << backUpFileType << '\n' << filename << '\n' << size << '\n' << offset << '\n';
170 string tmp = entry.str();
171 aDMLLogFile->seek(0, SEEK_END);
172 aDMLLogFile->tell();
173 aDMLLogFile->write(tmp.c_str(), tmp.size());
174
175 return NO_ERROR;
176 }
177
//------------------------------------------------------------------------------
// Remove the backup files (*.tmp, *.chk, *.hdr) recorded in the DML recovery
// log for transaction txnId, then remove the log itself.
// No-op for HDFS and bulk load, which handle recovery differently.
// Returns NO_ERROR, or ERR_DML_LOG_NAME / ERR_FILE_READ / ERR_OPEN_DML_LOG.
//------------------------------------------------------------------------------
int ChunkManager::removeBackups(TxnID txnId)
{
    // HDFS update/delete is handled differently
    if (fIsHdfs || fIsBulkLoad)
        return NO_ERROR;

    string aDMLLogFileName;

    if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
        return ERR_DML_LOG_NAME;

    if (IDBPolicy::exists(aDMLLogFileName.c_str()))
    {
        boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
                    IDBPolicy::getType(aDMLLogFileName.c_str(),
                                       IDBPolicy::WRITEENG),
                    aDMLLogFileName.c_str(), "r", 0));

        if (aDMLLogFile) //need recover
        {
            ssize_t fileSize = aDMLLogFile->size();

            // Guard against a failed size() call: allocating and reading with a
            // negative size is undefined behavior.
            if (fileSize < 0)
                return ERR_FILE_READ;

            boost::scoped_array<char> buf(new char[fileSize]);

            if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
                return ERR_FILE_READ;

            std::istringstream strstream(string(buf.get(), fileSize));
            std::string backUpFileType;
            std::string filename;
            int64_t size;
            int64_t offset;

            // Each log entry is: <type> <filename> <size> <offset>
            while (strstream >> backUpFileType >> filename >> size >> offset)
            {
                if (backUpFileType.compare("tmp") == 0 )
                {
                    // temporary data file
                    filename += ".tmp";
                    IDBPolicy::remove(filename.c_str());
                }
                else
                {
                    // backup of a chunk ("chk") or header (anything else) file
                    std::string backFileName(filename);

                    if (backUpFileType.compare("chk") == 0 )
                        backFileName += ".chk";
                    else
                        backFileName += ".hdr";

                    IDBPolicy::remove(backFileName.c_str());
                }
            }

            aDMLLogFile.reset(); // closes the file in IDBDataFile destructor.

            IDBPolicy::remove(aDMLLogFileName.c_str());
        }
        else
        {
            return ERR_OPEN_DML_LOG;
        }
    }

    return NO_ERROR;
}
242
243 //------------------------------------------------------------------------------
244 // Get/Return IDBDataFile* for specified OID, root, partition, and segment.
245 // Function is to be used to open column files.
246 // If the IDBDataFile* is not found, then a segment file will be opened using the
247 // mode (mode) and I/O buffer size (size) that is given. Name of the resulting
248 // file is returned in filename.
249 //
250 // For Bulk HDFS usage:
251 // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
252 //------------------------------------------------------------------------------
253 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFilePtr(const Column & column,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,bool useTmpSuffix) const254 IDBDataFile* ChunkManager::getFilePtr(const Column& column,
255 uint16_t root,
256 uint32_t partition,
257 uint16_t segment,
258 string& filename,
259 const char* mode,
260 int size,
261 bool useTmpSuffix) const
262 {
263 CompFileData* fileData = getFileData(column.dataFile.fid, root, partition, segment,
264 filename, mode, size, column.colDataType, column.colWidth, useTmpSuffix);
265 return (fileData ? fileData->fFilePtr : NULL);
266 }
267
268 //------------------------------------------------------------------------------
269 // Get/Return IDBDataFile* for specified OID, root, partition, and segment.
270 // Function is to be used to open dictionary store files.
271 // If the IDBDataFile* is not found, then a segment file will be opened using the
272 // mode (mode) and I/O buffer size (size) that is given. Name of the resulting
273 // file is returned in filename.
274 //
275 // For Bulk HDFS usage:
276 // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
277 //------------------------------------------------------------------------------
278 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFilePtr(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,bool useTmpSuffix) const279 IDBDataFile* ChunkManager::getFilePtr(const FID& fid,
280 uint16_t root,
281 uint32_t partition,
282 uint16_t segment,
283 string& filename,
284 const char* mode,
285 int size,
286 bool useTmpSuffix) const
287 {
288 CompFileData* fileData = getFileData(fid, root, partition, segment, filename, mode, size,
289 CalpontSystemCatalog::VARCHAR, 8, useTmpSuffix, true); // hard code (varchar, 8) are dummy values for dictionary file
290 return (fileData ? fileData->fFilePtr : NULL);
291 }
292
293 //------------------------------------------------------------------------------
294 // Get/Return CompFileData* for specified column OID, root, partition, and
295 // segment. If the IDBDataFile* is not found, then a segment file will be opened
296 // using the mode (mode) and I/O buffer size (size) that is given. Name of
297 // the resulting file is returned in filename.
298 // If the CompFileData* needs to be created, it will also be created and
299 // inserted into the fFileMap and fFilePtrMap for later use.
300 //
301 // For Bulk HDFS usage:
302 // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
303 //------------------------------------------------------------------------------
304 // @bug 5572 - HDFS usage: add *.tmp file backup flag
getFileData(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment,string & filename,const char * mode,int size,const CalpontSystemCatalog::ColDataType colDataType,int colWidth,bool useTmpSuffix,bool dctnry) const305 CompFileData* ChunkManager::getFileData(const FID& fid,
306 uint16_t root,
307 uint32_t partition,
308 uint16_t segment,
309 string& filename,
310 const char* mode,
311 int size,
312 const CalpontSystemCatalog::ColDataType colDataType,
313 int colWidth,
314 bool useTmpSuffix,
315 bool dctnry) const
316 {
317 FileID fileID(fid, root, partition, segment);
318 map<FileID, CompFileData*>::const_iterator mit = fFileMap.find(fileID);
319
320 WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition
321 << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ")
322 << "found." << endl;)
323
324 // Get CompFileData pointer for existing Column or Dictionary store file
325 if (mit != fFileMap.end())
326 {
327 filename = mit->second->fFileName;
328 return mit->second;
329 }
330
331 // New CompFileData pointer needs to be created
332 char name[FILE_NAME_SIZE];
333
334 if (fFileOp->getFileName(fid, name, root, partition, segment) != NO_ERROR)
335 return NULL;
336
337 CompFileData* fileData = new CompFileData(fileID, fid, colDataType, colWidth);
338 fileData->fFileName = filename = name;
339
340 if (openFile(fileData, mode, colWidth, useTmpSuffix, __LINE__) != NO_ERROR)
341 {
342 WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;)
343 delete fileData;
344 return NULL;
345 }
346
347 fileData->fIoBuffer.reset(new char[size]);
348 fileData->fIoBSize = size;
349 // TODO-There is no current way to make this setvbuf call as IDBDataFile only
350 // accepts the USE_VBUF at construction time and then uses a buffer that it manages
351 // Can either propagate an option through the openFile() call above and let
352 // IDBDataFile manage it internally or expose a new setBuffer() option.
353 // setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, size);
354 fileData->fDctnryCol = dctnry;
355 WE_COMP_DBG(cout << "open file* " << name << endl;)
356
357 // get the control data in header.
358 if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
359 COMPRESSED_FILE_HEADER_UNIT, __LINE__) != NO_ERROR)
360 {
361 WE_COMP_DBG(cout << "Failed to read control header." << endl;)
362 delete fileData;
363 return NULL;
364 }
365
366 // make sure the header is valid
367 if (fCompressor.verifyHdr(fileData->fFileHeader.fControlData) != 0)
368 {
369 WE_COMP_DBG(cout << "Invalid header." << endl;)
370 delete fileData;
371 return NULL;
372 }
373
374 int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
375 int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
376
377 if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
378 {
379 // >8K header, dictionary width > 128
380 fileData->fFileHeader.fPtrSection = new char[ptrSecSize];
381 fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
382 }
383
384 // read in the pointer section in header
385 if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection,
386 ptrSecSize, __LINE__) != NO_ERROR)
387 {
388 WE_COMP_DBG(cout << "Failed to read pointer header." << endl;)
389 delete fileData;
390 return NULL;
391 }
392
393 fFileMap.insert(make_pair(fileID, fileData));
394 //cout << "Insert into fFilemap root:partition:seg:fileID = " <<root<<":"<< partition<<":"<< segment<<":"<<fid<<endl;
395 fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
396 return fileData;
397 }
398
399 //------------------------------------------------------------------------------
400 // Return new IDBDataFile* for specified dictionary OID, root, partition, segment, and
401 // width. A new segment file will be opened using the mode (mode) and I/O
402 // buffer size (size) that is given. Name of the resulting file is returned
403 // in filename.
404 // A corresponding CompFileData* is created and inserted into fFileMap and
405 // fFilePtrMap for later use.
406 //------------------------------------------------------------------------------
createDctnryFile(const FID & fid,int64_t width,uint16_t root,uint32_t partition,uint16_t segment,const char * filename,const char * mode,int size)407 IDBDataFile* ChunkManager::createDctnryFile(const FID& fid,
408 int64_t width,
409 uint16_t root,
410 uint32_t partition,
411 uint16_t segment,
412 const char* filename,
413 const char* mode,
414 int size)
415 {
416 FileID fileID(fid, root, partition, segment);
417 CompFileData* fileData = new CompFileData(fileID, fid, CalpontSystemCatalog::VARCHAR, width);
418 fileData->fFileName = filename;
419
420 if (openFile(fileData, mode, width, false, __LINE__) != NO_ERROR) // @bug 5572 HDFS tmp file
421 {
422 WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;)
423 delete fileData;
424 return NULL;
425 }
426
427 fileData->fIoBuffer.reset(new char[size]);
428 fileData->fIoBSize = size;
429 // see TODO- comment above
430 // setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, size);
431 fileData->fDctnryCol = true;
432 WE_COMP_DBG(cout << "create file* " << filename << endl;)
433 int hdrSize = calculateHeaderSize(width);
434 int ptrSecSize = hdrSize - COMPRESSED_FILE_HEADER_UNIT;
435
436 if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
437 {
438 // >8K header, dictionary width > 128
439 fileData->fFileHeader.fPtrSection = new char[ptrSecSize];
440 fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
441 }
442
443 fCompressor.initHdr(fileData->fFileHeader.fControlData, fileData->fFileHeader.fPtrSection,
444 fFileOp->compressionType(), hdrSize);
445
446 if (writeHeader(fileData, __LINE__) != NO_ERROR)
447 {
448 WE_COMP_DBG(cout << "Failed to write header." << endl;)
449 delete fileData;
450 return NULL;
451 }
452
453 //@Bug 4977 remove log file
454 removeBackups(fTransId);
455 fFileMap.insert(make_pair(fileID, fileData));
456 fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
457 return fileData->fFilePtr;
458 }
459
460 //------------------------------------------------------------------------------
461 // Read the block for the specified fbo, from pFile's applicable chunk, and
462 // into readBuf.
463 //------------------------------------------------------------------------------
readBlock(IDBDataFile * pFile,unsigned char * readBuf,uint64_t fbo)464 int ChunkManager::readBlock(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo)
465 {
466 map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
467
468 if (fpIt == fFilePtrMap.end())
469 {
470 logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
471 return ERR_COMP_FILE_NOT_FOUND;
472 }
473
474 // find the chunk ID and offset in the chunk
475 lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
476 ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
477
478 WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << offset.quot << " offset:" << offset.rem
479 << " chunkData*:" << chunkData << endl;)
480
481 int rc = NO_ERROR;
482
483 // chunk is not already uncompressed
484 if (chunkData == NULL)
485 rc = fetchChunkFromFile(pFile, offset.quot, chunkData);
486
487 if (rc == NO_ERROR)
488 {
489 // copy the data at fbo to readBuf
490 memcpy(readBuf, chunkData->fBufUnCompressed + offset.rem, BYTE_PER_BLOCK);
491 }
492
493 return rc;
494 }
495
496 //------------------------------------------------------------------------------
497 // Write writeBuf to the block for the specified fbo, within pFile's applicable
498 // chunk.
499 //------------------------------------------------------------------------------
saveBlock(IDBDataFile * pFile,const unsigned char * writeBuf,uint64_t fbo)500 int ChunkManager::saveBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
501 {
502 WE_COMP_DBG(cout << "save block fbo:" << fbo << endl;)
503 map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
504
505 if (fpIt == fFilePtrMap.end())
506 {
507 logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
508 return ERR_COMP_FILE_NOT_FOUND;
509 }
510
511 // find the chunk ID and offset in the chunk
512 lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
513 ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
514
515 int rc = NO_ERROR;
516
517 // chunk is not already read in
518 if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, offset.quot, chunkData)) != NO_ERROR))
519 return rc;
520
521 WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << offset.quot << " offset:" << offset.rem
522 << " saved @" << (&(chunkData->fBufUnCompressed) + offset.rem) << endl;)
523
524 memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK);
525 chunkData->fWriteToFile = true;
526
527 // if the chunk is full for insert, flush it
528 //cout << "current offset.rem/8192 = " << offset.rem/8192 << endl;
529 if (fIsInsert && (offset.rem == MAXOFFSET_PER_CHUNK))
530 {
531 if (((rc = writeChunkToFile(fpIt->second, chunkData)) == NO_ERROR) &&
532 ((rc = writeHeader(fpIt->second, __LINE__)) == NO_ERROR))
533 {
534 //cout << "saveblock flushed the full chunk"<<endl;
535 pFile->flush();
536
537 //@Bug 4977 remove log file
538 removeBackups(fTransId);
539 }
540 }
541
542 return rc;
543 }
544
545 //------------------------------------------------------------------------------
546 // Flush all pending chunks to their corresponding segment files.
547 //------------------------------------------------------------------------------
flushChunks(int rc,const std::map<FID,FID> & columOids)548 int ChunkManager::flushChunks(int rc, const std::map<FID, FID>& columOids)
549 {
550 // shall fail the the statement if failed here
551 WE_COMP_DBG(cout << "flushChunks." << endl;)
552
553 int k = fFilePtrMap.size();
554 std::map<FID, FID>::const_iterator it;
555
556 if ((rc == NO_ERROR) && fIsInsert)
557 {
558 while (k-- > 0 && rc == NO_ERROR)
559 {
560 map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
561 // sort the chunk list first
562 CompFileData* fileData = i->second;
563 it = columOids.find (fileData->fFid);
564
565 if (it != columOids.end())
566 {
567 list<ChunkData*>& chunkList = fileData->fChunkList;
568 chunkList.sort(chunkDataPtrLessCompare);
569 list<ChunkData*>::iterator j = chunkList.begin();
570
571 while (j != chunkList.end())
572 {
573 if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
574 break;
575
576 // write chunk to file removes the written chunk from the list
577 j = chunkList.begin();
578 }
579
580 if (rc != NO_ERROR)
581 break;
582
583 // finally update the header
584 if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
585 break;
586
587 //@Bug 4977 remove log file
588 removeBackups(fTransId);
589
590 // closeFile invalidates the iterator
591 closeFile(fileData);
592 }
593 }
594 }
595 else if (rc == NO_ERROR)
596 {
597 while (k-- > 0 && rc == NO_ERROR)
598 {
599 map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
600 // sort the chunk list first
601 CompFileData* fileData = i->second;
602
603 list<ChunkData*>& chunkList = fileData->fChunkList;
604 chunkList.sort(chunkDataPtrLessCompare);
605 list<ChunkData*>::iterator j = chunkList.begin();
606
607 while (j != chunkList.end())
608 {
609 if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
610 break;
611
612 // write chunk to file removes the written chunk from the list
613 j = chunkList.begin();
614 }
615
616 if (rc != NO_ERROR)
617 break;
618
619 // finally update the header
620 if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
621 break;
622
623 //@Bug 4977 remove log file
624 removeBackups(fTransId);
625
626 // closeFile invalidates the iterator
627 closeFile(fileData);
628 }
629 }
630
631 if (rc != NO_ERROR)
632 {
633 cleanUp(columOids);
634 return rc;
635 }
636
637 //fActiveChunks.clear();
638 //fFileMap.clear();
639 //fFilePtrMap.clear();
640
641 if (fDropFdCache)
642 {
643 cacheutils::dropPrimProcFdCache();
644 fDropFdCache = false;
645 }
646
647 return NO_ERROR;
648 }
649
650 //------------------------------------------------------------------------------
651 // Load and uncompress the requested chunk (id) for the specified file (pFile),
652 // into chunkData.
653 // id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
654 // If the active chunk list is already full, then we flush the oldest pending
655 // chunk to disk, to make room for fetching the requested chunk.
656 // If the header ptr for the requested chunk is 0 (or has length 0), then
657 // chunkData is initialized with a new empty chunk.
658 //------------------------------------------------------------------------------
fetchChunkFromFile(IDBDataFile * pFile,int64_t id,ChunkData * & chunkData)659 int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*& chunkData)
660 {
661 // return value
662 int rc = NO_ERROR;
663
664 // remove the oldest one if the max active chunk number is reached.
665 WE_COMP_DBG(cout << "fActiveChunks.size:" << fActiveChunks.size() << endl;)
666 //cout << "fetchChunkFromFile1: pFile = " << pFile << endl;
667 map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
668
669 if (fpIt == fFilePtrMap.end())
670 {
671 logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
672 return ERR_COMP_FILE_NOT_FOUND;
673 }
674
675 CompFileData* fileData = fpIt->second;
676
677 if (fActiveChunks.size() >= fMaxActiveChunkNum)
678 {
679 list<std::pair<FileID, ChunkData*> >::iterator lIt = fActiveChunks.begin();
680
681 if (!fIsBulkLoad && !(fpIt->second->fDctnryCol))
682 {
683 while ((lIt->first == fpIt->second->fFileID) && (lIt != fActiveChunks.end()))
684 lIt++;
685 }
686
687 if (lIt != fActiveChunks.end())
688 {
689 map<FileID, CompFileData*>::iterator fIt = fFileMap.find(lIt->first);
690
691 if (fIt == fFileMap.end())
692 {
693 logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
694 return ERR_COMP_FILE_NOT_FOUND;
695 }
696
697 if ((rc = writeChunkToFile(fIt->second, lIt->second)) != NO_ERROR)
698 {
699 ostringstream oss;
700 oss << "write inactive chunk to file failed:" << fIt->second->fFileName << "@"
701 << __LINE__;
702 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
703 return rc;
704 }
705
706 if ((rc = writeHeader(fIt->second, __LINE__)) != NO_ERROR)
707 {
708 // logged by writeHeader
709 return rc;
710 }
711
712 //@Bug 4977 remove the log files
713 removeBackups(fTransId);
714 }
715 }
716
717 #ifdef PROFILE
718 Stats::startParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
719 #endif
720 // get a new ChunkData object
721 chunkData = new ChunkData(id);
722 pFile = fileData->fFilePtr; //update to get the reopened file ptr.
723 fileData->fChunkList.push_back(chunkData);
724 fActiveChunks.push_back(make_pair(fileData->fFileID, chunkData));
725
726 // read the compressed chunk from file
727 uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
728
729 if (ptrs[id] && ptrs[id + 1]) // compressed chunk data exists
730 {
731 // safety check
732 if (ptrs[id] >= ptrs[id + 1])
733 {
734 logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
735 return ERR_COMP_WRONG_PTR;
736 }
737
738 unsigned int chunkSize = (ptrs[id + 1] - ptrs[id]);
739
740 if ((rc = setFileOffset(pFile, fileData->fFileName, ptrs[id], __LINE__)) != NO_ERROR ||
741 (rc = readFile(pFile, fileData->fFileName, fBufCompressed, chunkSize, __LINE__)) !=
742 NO_ERROR)
743 {
744 // logged by setFileOffset/readFile
745 return rc;
746 }
747
748 // uncompress the read in buffer
749 unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
750
751 if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
752 (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
753 {
754 if (fIsFix)
755 {
756 uint64_t blocks = 512;
757
758 if (id == 0)
759 {
760 char* hdr = fileData->fFileHeader.fControlData;
761
762 if (fCompressor.getBlockCount(hdr) < 512)
763 blocks = 256;
764 }
765
766 dataLen = 8192 * blocks;
767
768 // load the uncompressed buffer with empty values.
769 char* buf = chunkData->fBufUnCompressed;
770 chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
771
772 if (fileData->fDctnryCol)
773 initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
774 else
775 initializeColumnChunk(buf, fileData);
776 }
777 else
778 {
779 logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
780 return ERR_COMP_UNCOMPRESS;
781 }
782 }
783
784 //@bug 3313-Remove validation that incorrectly fails for long string store files
785 // WE_COMP_DBG(cout << "chunk uncompressed to " << dataLen << endl;)
786 // if (dataLen < (id+1) * BYTE_PER_BLOCK)
787 // {
788 // logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
789 // return ERR_COMP_UNCOMPRESS;
790 // }
791
792 chunkData->fLenUnCompressed = dataLen;
793 }
794 else // new chunk
795 {
796 if (id == 0 && ptrs[id] == 0) // if the 1st ptr is not set for new extent
797 {
798 ptrs[0] = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
799 }
800
801 // load the uncompressed buffer with empty values.
802 char* buf = chunkData->fBufUnCompressed;
803 chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
804
805 if (fileData->fDctnryCol)
806 initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
807 else
808 initializeColumnChunk(buf, fileData);
809 }
810
811 #ifdef PROFILE
812 Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
813 #endif
814
815 return NO_ERROR;
816 }
817
818 //------------------------------------------------------------------------------
819 // Initialize a column based chunk with the applicable empty values.
820 //------------------------------------------------------------------------------
initializeColumnChunk(char * buf,CompFileData * fileData)821 void ChunkManager::initializeColumnChunk(char* buf, CompFileData* fileData)
822 {
823 int size = UNCOMPRESSED_CHUNK_SIZE;
824 uint64_t emptyVal = fFileOp->getEmptyRowValue(fileData->fColDataType, fileData->fColWidth);
825 fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, fileData->fColWidth);
826 }
827
828 //------------------------------------------------------------------------------
829 // Initialize a dictionary based chunk with empty blocks.
830 //------------------------------------------------------------------------------
initializeDctnryChunk(char * buf,int size)831 void ChunkManager::initializeDctnryChunk(char* buf, int size)
832 {
833 Dctnry* dctnry = dynamic_cast<Dctnry*>(fFileOp);
834 memset(buf, 0, size);
835 char* end = buf + size;
836
837 while (buf < end)
838 {
839 dctnry->copyDctnryHeader(buf);
840 buf += BYTE_PER_BLOCK;
841 }
842 }
843
844 //------------------------------------------------------------------------------
845 // Compress and write the requested chunk (id) for fileData, to disk.
846 // id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
847 //------------------------------------------------------------------------------
writeChunkToFile(CompFileData * fileData,int64_t id)848 int ChunkManager::writeChunkToFile(CompFileData* fileData, int64_t id)
849 {
850 ChunkData* chunkData = fileData->findChunk(id);
851
852 if (!chunkData)
853 {
854 logMessage(ERR_COMP_CHUNK_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
855 return ERR_COMP_CHUNK_NOT_FOUND;
856 }
857
858 return writeChunkToFile(fileData, chunkData);
859 }
860
//------------------------------------------------------------------------------
// Compress and write the given chunk for fileData to disk.
// If the chunk has been flagged for writing (fWriteToFile is true),
// then subsequent chunks in the file are shifted down as needed, if the com-
// pressed chunk will not fit in the currently available embedded free space.
//
// On success the chunk is removed from fActiveChunks and from
// fileData->fChunkList and its memory is released.  Returns NO_ERROR, or
// ERR_COMP_COMPRESS / ERR_COMP_WRONG_PTR / ERR_COMP_PAD_DATA, or an error
// propagated from writeCompressedChunk() / reallocateChunks().
//------------------------------------------------------------------------------
int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
{
    WE_COMP_DBG(cout << "write chunk id=" << chunkData->fChunkId << " data "
                << ((chunkData->fWriteToFile) ? "changed" : "NOT changed") << endl;)

    int rc = NO_ERROR;  // return value
    bool needReallocateChunks = false;
    int64_t spaceAvl = 0;

    if (chunkData->fWriteToFile)
    {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
#endif
        // compress the chunk before writing it to file;
        // fLenCompressed is in/out: capacity in, actual compressed length out
        fLenCompressed = fMaxCompressedBufSize;

        if (fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
                                      chunkData->fLenUnCompressed,
                                      (unsigned char*)fBufCompressed,
                                      fLenCompressed) != 0)
        {
            logMessage(ERR_COMP_COMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
            return ERR_COMP_COMPRESS;
        }

        WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to "
                    << fLenCompressed;)

        // Removed padding code here, will add padding for the last chunk.
        // The existing chunks are already correctly aligned, use the padding to absorb chunk
        // size increase when update. This improves the performance with less chunk shifting.

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
#endif

        // need more work if the new compressed buffer is larger
        uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
        ChunkId chunkId = chunkData->fChunkId;

        // spaceAvl stays 0 when ptrs[chunkId+1] is 0, i.e. this chunk has no
        // recorded end offset yet (never been written before).
        if (ptrs[chunkId + 1] > 0)
            spaceAvl = (ptrs[chunkId + 1] - ptrs[chunkId]);

        WE_COMP_DBG(cout << ", available space:" << spaceAvl;)

        bool lastChunk = true;
        // usable chunkIds are 0 .. POINTERS_IN_HEADER-2
        // [chunkId+0] is the start offset of current chunk.
        // [chunkId+1] is the start offset of next chunk, the offset diff is current chunk size.
        // [chunkId+2] is 0 or not indicates if the next chunk exists.
        int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
        int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
        int64_t usablePtrIds = (ptrSecSize / sizeof(uint64_t)) - 2;

        if (chunkId < usablePtrIds) // make sure [chunkId+2] has valid value
            lastChunk = (ptrs[(chunkId + 2)] == 0);

        WE_COMP_DBG(cout << ", last chunk:" << (lastChunk ? "true" : "false") << endl;)

        // a negative size means the header pointers are inconsistent
        if (spaceAvl < 0)
        {
            logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
            return ERR_COMP_WRONG_PTR;
        }

        if ((int64_t)fLenCompressed <= spaceAvl)
        {
            // There is enough space: overwrite the chunk in place.
            if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
            {
                // log in writeCompressedChunk by setFileOffset and writeFile
                return rc;
            }
        }
        else if (lastChunk)
        {
            // add padding space if the chunk is written first time
            if (fCompressor.padCompressedChunks(
                        (unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize) != 0)
            {
                WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padding failed." << endl;)

                logMessage(ERR_COMP_PAD_DATA, logging::LOG_TYPE_ERROR, __LINE__);
                return ERR_COMP_PAD_DATA;
            }

            WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padded to " << fLenCompressed;)

            // This is the last chunk, safe to write any length of data.
            //@Bug 3888. Assign the error code
            if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
            {
                // log in writeCompressedChunk by setFileOffset and writeFile
                return rc;
            }

            // Update the current chunk size (end offset of this chunk).
            ptrs[chunkId + 1] = ptrs[chunkId] + fLenCompressed;
        }
        else
        {
            // Compressed data grew and this is not the last chunk; the file
            // must be rewritten with subsequent chunks shifted down.
            needReallocateChunks = true;
        }
    }

    if (!needReallocateChunks)
    {
        // chunk is safely on disk (or did not need writing) -- release it
        fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
        fileData->fChunkList.remove(chunkData);
        delete chunkData;
    }
    else
    {
        ostringstream oss;
        oss << "Compressed data does not fit, caused a chunk shifting @line:" << __LINE__
            << " filename:" << fileData->fFileName << ", chunkId:" << chunkData->fChunkId
            << " data size:" << fLenCompressed << "/available:" << spaceAvl << " -- shifting ";

        if ((rc = reallocateChunks(fileData)) == NO_ERROR)
        {
            oss << "SUCCESS";
            logMessage(oss.str(), logging::LOG_TYPE_INFO);
        }
        else
        {
            oss << "FAILED";
            logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
        }
    }

    return rc;
}
1000
//------------------------------------------------------------------------------
// Write the current compressed data in fBufCompressed to the specified segment
// file offset (offset) and file (fileData). For DML usage, "size" specifies
// how many bytes to backup, for error recovery. cpimport.bin does it's own
// backup and error recovery, so "size" is not applicable for bulk import usage.
//
// Returns NO_ERROR on success; otherwise the error code from the failing
// read/seek/write step (already logged by the helper that detected it).
//------------------------------------------------------------------------------
int ChunkManager::writeCompressedChunk(CompFileData* fileData, int64_t offset, int64_t size)
{
    int rc = NO_ERROR;

    if (!fIsBulkLoad && !fIsHdfs)
    {
        // backup current chunk to chk file
        string chkFileName(fileData->fFileName + ".chk");
        string aDMLLogFileName;

        // Fix: use scoped_array so the backup buffer is released on every
        // path; the previous raw new[] leaked when setFileOffset/readFile
        // failed and the function returned without reaching delete[].
        boost::scoped_array<unsigned char> buf(new unsigned char[size]);

        if (((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__)) ==
                NO_ERROR)
                &&
                ((rc = readFile(fileData->fFilePtr, fileData->fFileName, buf.get(), size, __LINE__)) ==
                 NO_ERROR))
        {
            IDBDataFile* chkFilePtr = IDBDataFile::open(
                                          IDBPolicy::getType(chkFileName.c_str(),
                                                             IDBPolicy::WRITEENG),
                                          chkFileName.c_str(),
                                          "w+b",
                                          0 );

            if (chkFilePtr)
            {
                rc = writeFile(chkFilePtr, chkFileName, buf.get(), size, __LINE__);
                delete chkFilePtr;
            }

            if (rc != NO_ERROR)
            {
                // backup is unusable -- remove it so recovery won't trust it
                IDBPolicy::remove(chkFileName.c_str());
                return rc;
            }

            // log the chunk information for recovery
            rc = writeLog(fTransId, "chk", fileData->fFileName, aDMLLogFileName, size, offset);

            if (rc != NO_ERROR)
            {
                ostringstream oss;
                oss << "log " << fileData->fFileName << ".chk to DML logfile failed.";
                logMessage(oss.str(), logging::LOG_TYPE_INFO);
                return rc;
            }
        }

        // write out the compressed data + padding
        if ((rc == NO_ERROR) && ((rc = writeCompressedChunk_(fileData, offset)) == NO_ERROR))
        {
            if ((fileData->fFilePtr)->flush() != 0) //@Bug3162.
            {
                rc = ERR_FILE_WRITE;
                ostringstream oss;
                oss << "Failed to flush " << fileData->fFileName << " @line: " << __LINE__;
                logMessage(oss.str(), logging::LOG_TYPE_ERROR);
            }
        }
    }
    else
    {
        // bulk load / HDFS does its own backup -- just write the data + padding
        rc = writeCompressedChunk_(fileData, offset);
    }

    return rc;
}
1077
1078 //------------------------------------------------------------------------------
1079 // Actually write the current compressed data in fBufCompressed to the specified
1080 // segment file offset (offset) and file (fileData)
1081 //------------------------------------------------------------------------------
writeCompressedChunk_(CompFileData * fileData,int64_t offset)1082 inline int ChunkManager::writeCompressedChunk_(CompFileData* fileData, int64_t offset)
1083 {
1084 int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__);
1085
1086 if (rc != NO_ERROR)
1087 return rc;
1088
1089 return writeFile(fileData->fFilePtr, fileData->fFileName,
1090 fBufCompressed, fLenCompressed, __LINE__);
1091 }
1092
1093 //------------------------------------------------------------------------------
1094 // Open the specified segment file (fileData) using the given mode.
1095 // ln is the source code line number of the code invoking this operation
1096 // (ex __LINE__); this is used for logging error messages.
1097 //
1098 // useTmpSuffix controls whether HDFS file is opened with USE_TMPFILE bit set.
1099 // Typically set for bulk load, single insert, and batch insert, when adding
1100 // rows to an "existing" file.
1101 // Typically always set for DML update and delete.
1102 //
1103 // @bug 5572 - HDFS usage: add *.tmp file backup flag to API
1104 //------------------------------------------------------------------------------
openFile(CompFileData * fileData,const char * mode,int colWidth,bool useTmpSuffix,int ln) const1105 int ChunkManager::openFile(CompFileData* fileData, const char* mode, int colWidth,
1106 bool useTmpSuffix, int ln) const
1107 {
1108 int rc = NO_ERROR;
1109 unsigned opts = IDBDataFile::USE_VBUF;
1110
1111 if (fIsHdfs)
1112 {
1113 if (useTmpSuffix)
1114 {
1115 if (!fIsBulkLoad)
1116 {
1117 // keep a DML log for confirm or cleanup the .tmp file
1118 string aDMLLogFileName;
1119
1120 if ((rc = writeLog(fTransId, "tmp", fileData->fFileName,
1121 aDMLLogFileName, 0)) != NO_ERROR)
1122 {
1123 ostringstream oss;
1124 oss << "Failed to put " << fileData->fFileName << " into DML log.";
1125 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1126
1127 return rc;
1128 }
1129 }
1130
1131 opts |= IDBDataFile::USE_TMPFILE;
1132 }
1133 }
1134
1135 fileData->fFilePtr = IDBDataFile::open(
1136 IDBPolicy::getType( fileData->fFileName.c_str(), IDBPolicy::WRITEENG ),
1137 fileData->fFileName.c_str(), mode, opts, colWidth);
1138
1139 if (fileData->fFilePtr == NULL)
1140 {
1141 ostringstream oss;
1142 oss << "Failed to open compressed data file " << fileData->fFileName << " @line: " << ln;
1143 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1144 rc = ERR_COMP_OPEN_FILE;
1145 }
1146
1147 return rc;
1148 }
1149
1150 //------------------------------------------------------------------------------
1151 // Set the file offset for the specified segment file (fileData) using the
1152 // given offset.
1153 // ln is the source code line number of the code invoking this operation
1154 // (ex __LINE__); this is used for logging error messages. Likewise, filename
1155 // is used for logging any error message.
1156 //------------------------------------------------------------------------------
setFileOffset(IDBDataFile * pFile,const string & fileName,off64_t offset,int ln) const1157 int ChunkManager::setFileOffset(IDBDataFile* pFile, const string& fileName, off64_t offset, int ln) const
1158 {
1159 int rc = NO_ERROR;
1160
1161 if (pFile->seek(offset, SEEK_SET) != 0) rc = ERR_COMP_SET_OFFSET;
1162
1163 if (rc != NO_ERROR)
1164 {
1165 ostringstream oss;
1166 oss << "Failed to set offset in compressed data file " << fileName
1167 << " @line: " << ln << " offset:" << offset;
1168 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1169 }
1170
1171 return rc;
1172 }
1173
1174 //------------------------------------------------------------------------------
1175 // Read the requested number of bytes (size) from the specified file pFile.
1176 // ln is the source code line number of the code invoking this operation
1177 // (ex __LINE__); this is used for logging error messages. Likewise, filename
1178 // is used for logging any error message.
1179 //------------------------------------------------------------------------------
readFile(IDBDataFile * pFile,const string & fileName,void * buf,size_t size,int ln) const1180 int ChunkManager::readFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
1181 {
1182 size_t bytes = pFile->read(buf, size);
1183
1184 if (bytes != size)
1185 {
1186 ostringstream oss;
1187 oss << "Failed to read from compressed data file " << fileName
1188 << " @line: " << ln << " read/expect:" << bytes << "/" << size;
1189 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1190 return ERR_COMP_READ_FILE;
1191 }
1192
1193 return NO_ERROR;
1194 }
1195
1196 //------------------------------------------------------------------------------
1197 // Write the requested number of bytes (size) to the specified file pFile.
1198 // ln is the source code line number of the code invoking this operation
1199 // (ex __LINE__); this is used for logging error messages. Likewise, filename
1200 // is used for logging any error message.
1201 //------------------------------------------------------------------------------
writeFile(IDBDataFile * pFile,const string & fileName,void * buf,size_t size,int ln) const1202 int ChunkManager::writeFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
1203 {
1204 size_t bytes = pFile->write(buf, size);
1205
1206 if (bytes != size)
1207 {
1208 ostringstream oss;
1209 oss << "Failed to write to compressed data file " << fileName
1210 << " @line: " << ln << " written/expect:" << bytes << "/" << size;
1211 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1212 return ERR_COMP_WRITE_FILE;
1213 }
1214
1215 return NO_ERROR;
1216 }
1217
1218 //------------------------------------------------------------------------------
1219 // Close the specified segment file (fileData), and remove the
1220 // corresponding CompFileData reference from fFileMap and fFilePtrMap.
1221 //------------------------------------------------------------------------------
closeFile(CompFileData * fileData)1222 int ChunkManager::closeFile(CompFileData* fileData)
1223 {
1224 int rc = NO_ERROR;
1225
1226 WE_COMP_DBG(cout << "closing file:" << fileData->fFileName << endl;)
1227 fFileMap.erase(fileData->fFileID);
1228 fFilePtrMap.erase(fileData->fFilePtr);
1229
1230 if (fileData->fFilePtr)
1231 delete fileData->fFilePtr;
1232
1233 delete fileData;
1234 fileData = NULL;
1235
1236 return rc;
1237 }
1238
//------------------------------------------------------------------------------
// Write the chunk pointers headers for the specified file (fileData).
// ln is the source code line number of the code invoking this operation
// (ex __LINE__); this is used for logging error messages. For DML usage,
// backup for recovery is also performed. This step is skipped for cpimport.bin
// as bulk import performs its own backup and recovery operations.
//
// Sequence for the DML (non-HDFS, non-bulk) path:
//   1. save a copy of the current header to <file>.hdr
//   2. record the .hdr file in the DML log so recovery can find it
//   3. only then overwrite the real header via writeHeader_() and flush
// If the backup cannot be written, the .hdr file is removed and the real
// header is left untouched.
//------------------------------------------------------------------------------
int ChunkManager::writeHeader(CompFileData* fileData, int ln)
{
    int rc = NO_ERROR;
    // total header = one control unit + pointer section
    int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
    int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;

    if (!fIsHdfs && !fIsBulkLoad)
    {
        // write a backup header
        string hdrFileName(fileData->fFileName + ".hdr");
        string aDMLLogFileName;
        IDBDataFile* hdrFilePtr = IDBDataFile::open(
                                      IDBPolicy::getType(hdrFileName.c_str(),
                                                         IDBPolicy::WRITEENG),
                                      hdrFileName.c_str(),
                                      "w+b",
                                      0,
                                      fileData->fColWidth );

        if (hdrFilePtr)
        {
            // backup = control data unit followed by the pointer section
            rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fControlData,
                           COMPRESSED_FILE_HEADER_UNIT, __LINE__);

            if (rc == NO_ERROR)
                rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fPtrSection,
                               ptrSecSize, __LINE__);

            delete hdrFilePtr;
        }

        if (rc == NO_ERROR)
        {
            // log the header information for recovery
            rc = writeLog(fTransId, "hdr", fileData->fFileName, aDMLLogFileName, headerSize);

            if (rc != NO_ERROR)
            {
                ostringstream oss;
                oss << "log " << fileData->fFileName << ".hdr to DML logfile failed.";
                logMessage(oss.str(), logging::LOG_TYPE_ERROR);
            }

            // backup and log succeeded -- now update the real header
            if ((rc == NO_ERROR) && (rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
            {
                (fileData->fFilePtr)->flush();
            }
        }
        else
        {
            // incomplete backup must not survive for recovery to pick up
            IDBPolicy::remove(hdrFileName.c_str());
        }
    }
    else
    {
        // bulk load / HDFS: no .hdr backup, write the header directly
        if ((rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
        {
            (fileData->fFilePtr)->flush();
        }
    }

    if (rc != NO_ERROR)
    {
        ostringstream oss;
        oss << "write header failed: " << fileData->fFileName << "call from line:" << ln;
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
    }

    return rc;
}
1316
1317 //------------------------------------------------------------------------------
1318 // Write the chunk pointers headers for the specified file (fileData).
1319 // ln is the source code line number of the code invoking this operation
1320 // (ex __LINE__); this is used for logging error messages. For DML usage,
1321 // backup for recovery is also performed. This step is skipped for cpimport.bin
1322 // as bulk import performs its own backup and recovery operations.
1323 //------------------------------------------------------------------------------
writeHeader_(CompFileData * fileData,int ptrSecSize)1324 inline int ChunkManager::writeHeader_(CompFileData* fileData, int ptrSecSize)
1325 {
1326 int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, 0, __LINE__);
1327
1328 if (rc == NO_ERROR)
1329 rc = writeFile(fileData->fFilePtr, fileData->fFileName,
1330 fileData->fFileHeader.fControlData,
1331 COMPRESSED_FILE_HEADER_UNIT, __LINE__);
1332
1333 if (rc == NO_ERROR)
1334 rc = writeFile(fileData->fFilePtr, fileData->fFileName,
1335 fileData->fFileHeader.fPtrSection,
1336 ptrSecSize, __LINE__);
1337
1338 return rc;
1339 }
1340
//------------------------------------------------------------------------------
// For the specified segment file (pFile), read in an abbreviated/compressed
// chunk extent, uncompress, and expand to a full chunk for a full extent.
//
// The expansion happens only in memory: the tail of chunk 0 is filled with
// emptyVal entries of the given width and the chunk is marked dirty
// (fWriteToFile); the actual disk write is deferred to a later
// writeChunkToFile()/writeHeader() call.
//------------------------------------------------------------------------------
int ChunkManager::expandAbbrevColumnExtent(IDBDataFile* pFile, uint64_t emptyVal, int width)
{
    map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);

    if (i == fFilePtrMap.end())
    {
        logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_FILE_NOT_FOUND;
    }

    int rc = NO_ERROR;
    // fetch the initial chunk if not already done.
    ChunkData* chunkData = (i->second)->findChunk(0);

    if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
        return rc;

    // buf points to the end of existing data
    char* buf = chunkData->fBufUnCompressed + chunkData->fLenUnCompressed;
    int size = UNCOMPRESSED_CHUNK_SIZE - chunkData->fLenUnCompressed;
    // fill the remainder of the chunk with "empty" entries
    fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, width);
    chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
    chunkData->fWriteToFile = true;
    //(writeChunkToFile(i->second, chunkData));
    //(writeHeader(i->second, __LINE__));
    return NO_ERROR;
}
1372
//------------------------------------------------------------------------------
// For column segment file:
// Increment the block count stored in the chunk header used to track how many
// blocks are allocated to the corresponding segment file.
//
// If chunk 0 is cached, it is flushed to disk along with the updated header,
// and DML backup files are removed on success.  If chunk 0 is not cached,
// only the in-memory header block count is changed here.
//------------------------------------------------------------------------------
int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount)
{
    map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);

    if (i == fFilePtrMap.end())
    {
        logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_FILE_NOT_FOUND;
    }

    CompFileData* pFileData = i->second;

    // defensive: map entry exists but holds a null CompFileData
    if (!pFileData)
    {
        logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_FILE_NOT_FOUND;
    }

    int rc = NO_ERROR;
    // bump the allocated-block count in the control header
    char* hdr = pFileData->fFileHeader.fControlData;
    fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
    ChunkData* chunkData = (pFileData)->findChunk(0);

    if (chunkData != NULL)
    {
        if ((rc = writeChunkToFile(pFileData, chunkData)) == NO_ERROR)
        {
            rc = writeHeader(pFileData, __LINE__);

            if ( rc == NO_ERROR)
            {
                //@Bug 4977 remove log files
                removeBackups(fTransId);
            }
        }
        else
        {
            ostringstream oss;
            oss << "write chunk to file failed when updateColumnExtent: " << pFileData->fFileName;
            logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        }
    }

    pFile->flush();
    return rc;
}
1424
//------------------------------------------------------------------------------
// For dictionary store segment file:
// Increment the block count stored in the chunk header used to track how many
// blocks are allocated to the corresponding segment file.
//
// Two special cases before the count is bumped:
// - block count 0: brand new file; initialize chunk 0 to the abbreviated
//   (initial-extent) size and write it out with the header (Bug 3203).
// - block count == NUM_BLOCKS_PER_INITIAL_EXTENT: the abbreviated extent is
//   being expanded to a full chunk; initialize the added region in memory and
//   zero ptrs[1] because the compressed size of chunk 0 is no longer known.
//------------------------------------------------------------------------------
int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount)
{
    map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);

    if (i == fFilePtrMap.end())
    {
        logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_FILE_NOT_FOUND;
    }

    int rc = NO_ERROR;
    // fetch the initial chunk if not already done.
    ChunkData* chunkData = (i->second)->findChunk(0);

    if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
        return rc; // logged by fetchChunkFromFile

    char* hdr = i->second->fFileHeader.fControlData;
    char* uncompressedBuf = chunkData->fBufUnCompressed;
    int currentBlockCount = fCompressor.getBlockCount(hdr);

    // Bug 3203, write out the compressed initial extent.
    if (currentBlockCount == 0)
    {
        // new file: set up the abbreviated initial extent in chunk 0
        int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
        initializeDctnryChunk(uncompressedBuf, initSize);
        chunkData->fWriteToFile = true;

        if ((rc = writeChunkToFile(i->second, chunkData)) == NO_ERROR)
        {
            rc = writeHeader(i->second, __LINE__);

            if ( rc == NO_ERROR)
            {
                //@Bug 4977 remove the log file
                removeBackups(fTransId);
            }
        }
        else
        {
            ostringstream oss;
            oss << "write chunk to file failed when updateDctnryExtent: " << i->second->fFileName;
            logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        }
    }
    else if (currentBlockCount == NUM_BLOCKS_PER_INITIAL_EXTENT)
    {
        // expanding the abbreviated extent: initialize the rest of chunk 0
        int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
        int incrSize = UNCOMPRESSED_CHUNK_SIZE - initSize;
        initializeDctnryChunk(uncompressedBuf + initSize, incrSize);
        uint64_t* ptrs = reinterpret_cast<uint64_t*>(i->second->fFileHeader.fPtrSection);
        ptrs[1] = 0; // the compressed chunk size is unknown
    }

    if (rc == NO_ERROR)
        fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);

    return rc;
}
1489
1490 //------------------------------------------------------------------------------
1491 // Close any open segment files, and free up memory.
1492 //------------------------------------------------------------------------------
cleanUp(const std::map<FID,FID> & columOids)1493 void ChunkManager::cleanUp(const std::map<FID, FID>& columOids)
1494 {
1495 WE_COMP_DBG(cout << "cleanUp with " << fActiveChunks.size() << " active chunk(s)." << endl;)
1496 std::map<FID, FID>::const_iterator it;
1497 map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
1498
1499 while ( i != fFilePtrMap.end() )
1500 {
1501 CompFileData* fileData = i->second;
1502
1503 it = columOids.find (fileData->fFid);
1504
1505 if (fIsInsert && it != columOids.end())
1506 {
1507 list<ChunkData*>& chunks = fileData->fChunkList;
1508
1509 for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
1510 delete *j;
1511
1512 delete fileData->fFilePtr;
1513 fFileMap.erase(fileData->fFileID);
1514 fFilePtrMap.erase(i++);
1515
1516 delete fileData;
1517
1518 }
1519 else if (!fIsInsert || (columOids.size() == 0))
1520 {
1521 list<ChunkData*>& chunks = fileData->fChunkList;
1522
1523 for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
1524 delete *j;
1525
1526 delete fileData->fFilePtr;
1527 fFileMap.erase(fileData->fFileID);
1528 fFilePtrMap.erase(i++);
1529
1530 delete fileData;
1531 }
1532 else
1533 {
1534 i++;
1535 }
1536 }
1537
1538 if (fDropFdCache)
1539 {
1540 cacheutils::dropPrimProcFdCache();
1541 fDropFdCache = false;
1542 }
1543 }
1544
//------------------------------------------------------------------------------
// Read "n" blocks from pFile starting at fbo, into readBuf.
// The requested range may span multiple chunks; each needed chunk is located
// in (or fetched into) the cache and the relevant bytes are memcpy'd out.
// Returns n on success, -1 on any failure.
//------------------------------------------------------------------------------
int ChunkManager::readBlocks(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo, size_t n)
{
    WE_COMP_DBG(cout << "backup blocks fbo:" << fbo << " num:" << n << " file:" << pFile << endl;)

    // safety check
    if (pFile == NULL || n < 1)
    {
        return -1;
    }

    map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);

    if (fpIt == fFilePtrMap.end())
    {
        return -1;
    }

    // the n blocks may cross more than one chunk
    // find the chunk ID and offset of the 1st fbo
    lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
    int idx = offset.quot;                   // current chunk id
    int rem = offset.rem;                    // offset in current chunk
    int num = UNCOMPRESSED_CHUNK_SIZE - rem; // # of bytes available in current chunk
    int left = n * BYTE_PER_BLOCK;           // # of bytes to be read
    // # of bytes to be read from current chunk
    num = (left > num) ? num : left;

    do
    {
        ChunkData* chunkData = (fpIt->second)->findChunk(idx);

        WE_COMP_DBG(cout << "id:" << idx << " ofst:" << rem << " num:" << num <<
                    " left:" << left << endl;)

        // chunk is not already uncompressed
        if (chunkData == NULL)
        {
            if (fetchChunkFromFile(pFile, idx, chunkData) != NO_ERROR)
            {
                return -1;
            }
        }

        // copy the data at fbo to readBuf
        memcpy(readBuf, chunkData->fBufUnCompressed + rem, num);

        // prepare for the next read: subsequent chunks are read from their
        // start (rem = 0), a full chunk at a time until "left" is exhausted
        readBuf += num;
        rem = 0;
        left -= num;
        num = (left > UNCOMPRESSED_CHUNK_SIZE) ? UNCOMPRESSED_CHUNK_SIZE : left;
        idx++;
    }
    while (left > 0);

    return n;
}
1605
1606 //------------------------------------------------------------------------------
1607 // Write the a block (writeBuf) into the fbo block of the specified file.
1608 // Updated chunk is not flushed to disk but left pending in the applicable
1609 // CompFileData object.
1610 //------------------------------------------------------------------------------
restoreBlock(IDBDataFile * pFile,const unsigned char * writeBuf,uint64_t fbo)1611 int ChunkManager::restoreBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
1612 {
1613 WE_COMP_DBG(cout << "restore blocks fbo:" << fbo << " file:" << pFile << endl;)
1614
1615 // safety check
1616 if (pFile == NULL)
1617 return -1;
1618
1619 map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
1620
1621 if (fpIt == fFilePtrMap.end())
1622 return -1;
1623
1624 // the n blocks may cross more than one chunk
1625 // find the chunk ID and offset of the 1st fbo
1626 lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
1627 ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
1628 WE_COMP_DBG(cout << "id:" << offset.quot << " ofst:" << offset.rem << endl;)
1629
1630 // chunk is not already uncompressed
1631 if (chunkData == NULL)
1632 {
1633 if (fetchChunkFromFile(pFile, offset.quot, chunkData) != NO_ERROR)
1634 return -1;
1635 }
1636
1637 // copy the data to chunk buffer
1638 memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK);
1639 chunkData->fWriteToFile = true;
1640
1641 return BYTE_PER_BLOCK;
1642 }
1643
1644 //------------------------------------------------------------------------------
1645 // Get the allocated block count from the header, for the specified file (pFile)
1646 //------------------------------------------------------------------------------
getBlockCount(IDBDataFile * pFile)1647 int ChunkManager::getBlockCount(IDBDataFile* pFile)
1648 {
1649 map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
1650 idbassert(fpIt != fFilePtrMap.end());
1651
1652 return fCompressor.getBlockCount(fpIt->second->fFileHeader.fControlData);
1653 }
1654
1655 //------------------------------------------------------------------------------
1656 // Set the FileOp pointer and dictionary flag
1657 //------------------------------------------------------------------------------
fileOp(FileOp * fileOp)1658 void ChunkManager::fileOp(FileOp* fileOp)
1659 {
1660 fFileOp = fileOp;
1661
1662 if (fileOp)
1663 {
1664 setTransId(fileOp->getTransId());
1665 }
1666 }
1667
1668 //------------------------------------------------------------------------------
1669 // Calculate and return the size of the chunk pointer header for a column of the
1670 // specified width.
1671 //------------------------------------------------------------------------------
calculateHeaderSize(int width)1672 int ChunkManager::calculateHeaderSize(int width)
1673 {
1674 int headerUnits = 1;
1675
1676 // dictionary columns may need variable length header
1677 if (width > 8)
1678 {
1679 int extentsPerFile = Config::getExtentsPerSegmentFile();
1680 int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
1681 int rowsPerFile = rowsPerExtent * extentsPerFile;
1682 int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12
1683
1684 // BLOB is 1 string per block
1685 if (stringsPerBlock == 0)
1686 stringsPerBlock = 1;
1687
1688 int blocksNeeded = rowsPerFile / stringsPerBlock;
1689 int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK;
1690 lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk);
1691 int chunksNeeded = chunks.quot + (chunks.rem ? 1 : 0); // round up
1692 int ptrsNeeded = chunksNeeded + 1; // 1 more ptr for 0 ptr marking end
1693 int ptrsIn4K = (4 * 1024) / sizeof(uint64_t);
1694 lldiv_t hdrs = lldiv(ptrsNeeded, ptrsIn4K);
1695 headerUnits = hdrs.quot + (hdrs.rem ? 1 : 0); // round up
1696
1697 // Always include odd number of 4K ptr headers, so that when we add the
1698 // single 4K control header, the cumulative header space will be an even
1699 // multiple of an 8K boundary.
1700 if ((headerUnits % 2) == 0)
1701 headerUnits++;
1702 }
1703
1704 headerUnits++; // add the control data block
1705 return (headerUnits * COMPRESSED_FILE_HEADER_UNIT);
1706 }
1707
1708 //------------------------------------------------------------------------------
1709 // Reallocate the chunks in a file to account for an expanding chunk that will
1710 // not fit in the available embedded free space.
1711 //------------------------------------------------------------------------------
reallocateChunks(CompFileData * fileData)1712 int ChunkManager::reallocateChunks(CompFileData* fileData)
1713 {
1714 WE_COMP_DBG(cout << "reallocate chunks in " << fileData->fFileName
1715 << " (" << fileData->fFilePtr << ")" << endl;)
1716
1717 // return value
1718 int rc = NO_ERROR;
1719
1720 // original file info
1721 string origFileName = fileData->fFileName;
1722 IDBDataFile* origFilePtr = fileData->fFilePtr;
1723 origFilePtr->flush();
1724
1725 // back out the current pointers
1726 int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
1727 int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
1728 compress::CompChunkPtrList origPtrs;
1729
1730 if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0)
1731 {
1732 ostringstream oss;
1733 oss << "Chunk shifting failed, file:" << origFileName << " -- invalid header.";
1734 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1735 return ERR_COMP_PARSE_HDRS;
1736 }
1737
1738 // get the chunks already in memory
1739 list<ChunkData*>& chunkList = fileData->fChunkList;
1740 chunkList.sort(chunkDataPtrLessCompare);
1741 list<ChunkData*>::iterator j = chunkList.begin();
1742 int numOfChunks = origPtrs.size(); // number of chunks that contain user data
1743 vector<ChunkData*> chunksTouched; // chunk data is being modified, and in memory
1744
1745 for (int i = 0; i < numOfChunks; i++)
1746 chunksTouched.push_back(NULL);
1747
1748 // mark touched chunks
1749 while (j != chunkList.end())
1750 {
1751 chunksTouched[(*j)->fChunkId] = *j;
1752 j++;
1753 }
1754
1755 // new file name and pointer
1756 string rlcFileName(fileData->fFileName + ".rlc");
1757 IDBDataFile* rlcFilePtr = IDBDataFile::open(
1758 IDBPolicy::getType( rlcFileName.c_str(), IDBPolicy::WRITEENG ),
1759 rlcFileName.c_str(),
1760 "w+b",
1761 0,
1762 fileData->fColWidth );
1763
1764 if (!rlcFilePtr)
1765 {
1766 ostringstream oss;
1767 oss << "Chunk shifting failed, file:" << origFileName << " -- cannot open rlc file.";
1768 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1769 return ERR_FILE_OPEN;
1770 }
1771
1772 // log the recover information here
1773 string aDMLLogFileName;
1774 rc = writeLog(fTransId, "rlc", fileData->fFileName, aDMLLogFileName);
1775
1776 if (rc != NO_ERROR)
1777 {
1778 ostringstream oss;
1779 oss << "log " << fileData->fFileName << ".rlc to DML logfile failed.";
1780 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1781
1782 delete rlcFilePtr;
1783 rlcFilePtr = NULL;
1784 IDBPolicy::remove(rlcFileName.c_str());
1785 return rc;
1786 }
1787
1788 // !!! May conside to use mmap to speed up the copy !!!
1789 // !!! copy the whole file and update the shifting part !!!
1790
1791 // store updated chunk pointers
1792 uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
1793 ptrs[0] = origPtrs[0].first; // the first chunk offset has no change.
1794
1795 // bug3913, file size 0 after reallocate.
1796 // write the header, to be updated later, make sure there is someing in the file
1797 if ((rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData,
1798 COMPRESSED_FILE_HEADER_UNIT, __LINE__)) == NO_ERROR)
1799 rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection,
1800 ptrSecSize, __LINE__);
1801
1802 int k = 0;
1803
1804 for (; k < numOfChunks && rc == NO_ERROR; k++)
1805 {
1806 uint64_t chunkSize = 0; // size of current chunk
1807 unsigned char* buf = NULL; // output buffer
1808
1809 // Find the current chunk size, and allocate the data -- buf point to the data.
1810 if (chunksTouched[k] == NULL)
1811 {
1812 // Chunks not touched will be copied to new file without being uncompressed first.
1813 //cout << "reallocateChunks: chunk has not been updated" << endl;
1814 chunkSize = origPtrs[k].second;
1815
1816 // read disk data into compressed data buffer
1817 buf = (unsigned char*)fBufCompressed;
1818
1819 if ((rc = setFileOffset(origFilePtr, origFileName, origPtrs[k].first, __LINE__)) != NO_ERROR)
1820 {
1821 ostringstream oss;
1822 oss << "set file offset failed @line:" << __LINE__ << "with retCode:" << rc
1823 << " filename:" << origFileName;
1824 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1825 continue;
1826 }
1827
1828 if ((rc = readFile(origFilePtr, origFileName, buf, chunkSize, __LINE__)) != NO_ERROR)
1829 {
1830 ostringstream oss;
1831 oss << "readfile failed @line:" << __LINE__ << "with retCode:" << rc
1832 << " filename:" << origFileName;
1833 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1834 continue;
1835 }
1836 }
1837 else // chunksTouched[k] != NULL
1838 {
1839 // chunk has been updated, and in memory.
1840 //cout << "reallocateChunks: chunk has been updated" << endl;
1841 ChunkData* chunkData = chunksTouched[k];
1842 fLenCompressed = fMaxCompressedBufSize;
1843
1844 if ((rc = fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
1845 chunkData->fLenUnCompressed,
1846 (unsigned char*)fBufCompressed,
1847 fLenCompressed)) != 0)
1848 {
1849 ostringstream oss;
1850 oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
1851 << " filename:" << rlcFileName;
1852 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1853
1854 rc = ERR_COMP_COMPRESS;
1855 continue;
1856 }
1857
1858 WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to "
1859 << fLenCompressed;)
1860
1861 // shifting chunk, add padding space
1862 if ((rc = fCompressor.padCompressedChunks(
1863 (unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize)) != 0)
1864 {
1865 WE_COMP_DBG(cout << ", but padding failed." << endl;)
1866 ostringstream oss;
1867 oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
1868 << " filename:" << rlcFileName;
1869 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1870
1871 rc = ERR_COMP_PAD_DATA;
1872 continue;
1873 }
1874
1875 WE_COMP_DBG(cout << ", and padded to " << fLenCompressed;)
1876
1877 buf = (unsigned char*)fBufCompressed;
1878 chunkSize = fLenCompressed;
1879 }
1880
1881 // write is in sequence, no need to call setFileOffset
1882 //cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << endl;
1883 rc = writeFile(rlcFilePtr, rlcFileName, buf, chunkSize, __LINE__);
1884
1885 if (rc != NO_ERROR)
1886 {
1887 //cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << " failed" << endl;
1888 ostringstream oss;
1889 oss << "write file failed @line:" << __LINE__ << "with retCode:" << rc
1890 << " filename:" << rlcFileName;
1891 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1892 continue;
1893 }
1894
1895 // Update the current chunk size.
1896 ptrs[k + 1] = ptrs[k] + chunkSize;
1897 }
1898
1899 // up to now, everything OK: rc == NO_ERROR
1900 // remove all written chunks from active chunk list.
1901 j = chunkList.begin();
1902
1903 while (j != chunkList.end())
1904 {
1905 ChunkData* chunkData = *j;
1906 fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
1907 fileData->fChunkList.remove(chunkData);
1908 delete chunkData;
1909
1910 j = chunkList.begin();
1911 }
1912
1913 // finally update the header
1914 if (rc == NO_ERROR)
1915 rc = setFileOffset(rlcFilePtr, rlcFileName, 0, __LINE__);
1916
1917 if (rc == NO_ERROR)
1918 rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData,
1919 COMPRESSED_FILE_HEADER_UNIT, __LINE__);
1920
1921 if (rc == NO_ERROR)
1922 rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection,
1923 ptrSecSize, __LINE__);
1924
1925 if (rc != NO_ERROR)
1926 {
1927 struct timeval tv;
1928 gettimeofday(&tv, 0);
1929 struct tm ltm;
1930 localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), <m);
1931 char tmText[24];
1932 // this snprintf call causes a compiler warning b/c buffer size is less
1933 // then maximum string size.
1934 #if defined(__GNUC__) && __GNUC__ >= 7
1935 #pragma GCC diagnostic push
1936 #pragma GCC diagnostic ignored "-Wformat-truncation="
1937 snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
1938 ltm.tm_year + 1900, ltm.tm_mon + 1,
1939 ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
1940 ltm.tm_sec, tv.tv_usec);
1941 #pragma GCC diagnostic pop
1942 #else
1943 snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
1944 ltm.tm_year + 1900, ltm.tm_mon + 1,
1945 ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
1946 ltm.tm_sec, tv.tv_usec);
1947 #endif
1948 string dbgFileName(rlcFileName + tmText);
1949
1950 ostringstream oss;
1951 oss << "Chunk shifting failed, file:" << origFileName;
1952
1953 if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
1954 oss << ", rlc file is:" << dbgFileName;
1955
1956 // write out the header for debugging in case the header in rlc file is bad or not updated.
1957 string rlcPtrFileName(dbgFileName + ".ptr");
1958
1959 IDBDataFile* rlcPtrFilePtr = IDBDataFile::open(
1960 IDBPolicy::getType(rlcPtrFileName.c_str(),
1961 IDBPolicy::WRITEENG),
1962 rlcPtrFileName.c_str(),
1963 "w+b",
1964 0,
1965 fileData->fColWidth);
1966
1967 if (rlcPtrFilePtr &&
1968 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
1969 COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
1970 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection,
1971 ptrSecSize, __LINE__) == NO_ERROR))
1972 {
1973 oss << ", rlc file header in memory: " << rlcPtrFileName;
1974 }
1975 else
1976 {
1977 oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
1978 }
1979
1980 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
1981
1982 closeFile(fileData);
1983 delete rlcFilePtr;
1984 rlcFilePtr = NULL;
1985
1986 if (rlcPtrFilePtr != NULL)
1987 {
1988 delete rlcPtrFilePtr;
1989 rlcPtrFilePtr = NULL;
1990 }
1991
1992 return rc;
1993 }
1994
1995 // update the file pointer map w/ new file pointer
1996 //cout << "realloc1: remove ptr = " << fileData->fFilePtr << endl;
1997 fFilePtrMap.erase(fileData->fFilePtr);
1998 delete fileData->fFilePtr;
1999 fileData->fFilePtr = NULL;
2000 delete rlcFilePtr;
2001 rlcFilePtr = NULL;
2002
2003 // put reallocated file size here for logging purpose.
2004 uint64_t fileSize = 0;
2005
2006 if (rc == NO_ERROR)
2007 {
2008 #ifdef _MSC_VER
2009 //We need to do this early on so the ::rename() call below will work on Windows
2010 // we'll do it again later on, but that's life...
2011 //FIXME: there's a race here that a query will re-open the files before we can jigger
2012 // them around. We need to make sure PP is opening these files with the right perms
2013 // to allow another process to delete them.
2014 cacheutils::dropPrimProcFdCache();
2015 #endif
2016
2017 // @bug3913, keep the original file until the new file is properly renamed.
2018 // 1. check the new file size is NOT 0, matching ptr[k].
2019 // 2. mv the current to be backup.
2020 // 3. rename the rlc file.
2021 // 4. check the file size again.
2022 // 5. verify each chunk.
2023 // 5. rm the bak file or mv bak file back.
2024
2025 // check the new file size using two methods mostly for curiosity on 0 size file.
2026 // They can be removed because all chunks are to be verified after rename.
2027 if ( IDBPolicy::size( rlcFileName.c_str() ) != (int64_t) ptrs[k] )
2028 {
2029 ostringstream oss;
2030 oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
2031 << ", filename:" << rlcFileName;
2032 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2033 rc = ERR_COMP_RENAME_FILE;
2034 }
2035
2036 if (rc == NO_ERROR)
2037 {
2038 if (fIsHdfs)
2039 rc = swapTmpFile(rlcFileName, fileData->fFileName + ".tmp");
2040 else
2041 rc = swapTmpFile(rlcFileName, fileData->fFileName);
2042 }
2043
2044 if ((rc == NO_ERROR) &&
2045 (rc = openFile(fileData, "r+b", fileData->fColWidth, true, __LINE__)) == NO_ERROR) // @bug 5572 HDFS tmp file
2046 {
2047 // see TODO- above regarding setvbuf
2048 // setvbuf(fileData->fFilePtr, fileData->fIoBuffer.get(), _IOFBF, fileData->fIoBSize);
2049 fileSize = fileData->fFilePtr->size();
2050
2051 if (fileSize == ptrs[k])
2052 {
2053 rc = verifyChunksAfterRealloc(fileData);
2054 }
2055 else
2056 {
2057 ostringstream oss;
2058 oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
2059 << ", filename:" << fileData->fFileName;
2060 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2061 rc = ERR_COMP_RENAME_FILE;
2062 }
2063
2064 if (rc == NO_ERROR)
2065 {
2066 fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
2067 //cout << "realloc2: insert ptr = " << fileData->fFilePtr << endl;
2068 // notify the PrimProc of unlinking original data file
2069 fDropFdCache = true;
2070 }
2071 }
2072
2073 if (!fIsHdfs)
2074 {
2075 string bakFileName(fileData->fFileName + ".orig");
2076
2077 if (rc == NO_ERROR)
2078 {
2079 // unlink the original file (remove is portable)
2080 if (fFs.remove(bakFileName.c_str()) != 0)
2081 {
2082 ostringstream oss;
2083 oss << "remove backup file " << bakFileName << " failed: " << strerror(errno);
2084
2085 // not much we can do, log an info message for manual cleanup
2086 logMessage(oss.str(), logging::LOG_TYPE_INFO);
2087 }
2088 }
2089 else
2090 {
2091 // keep the bad file for debugging purpose
2092 if (fFs.rename(fileData->fFileName.c_str(), rlcFileName.c_str()) == 0)
2093 {
2094 ostringstream oss;
2095 oss << "data file after chunk shifting failed verification.";
2096 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2097 }
2098
2099 // roll back the bak file
2100 if (fFs.rename(bakFileName.c_str(), fileData->fFileName.c_str()) != 0)
2101 {
2102 ostringstream oss;
2103 oss << "rename " << bakFileName << " to " << fileData->fFileName << " failed: "
2104 << strerror(errno);
2105
2106 // must manually move it back
2107 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2108 }
2109 }
2110 }
2111 }
2112
2113 if (!fIsHdfs)
2114 {
2115 if (rc == NO_ERROR)
2116 {
2117 // remove the log file
2118 fFs.remove(aDMLLogFileName.c_str());
2119 }
2120 else
2121 {
2122 struct timeval tv;
2123 gettimeofday(&tv, 0);
2124 struct tm ltm;
2125 localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), <m);
2126 char tmText[24];
2127 // this snprintf call causes a compiler warning b/c buffer size is less
2128 // then maximum string size.
2129 #if defined(__GNUC__) && __GNUC__ >= 7
2130 #pragma GCC diagnostic push
2131 #pragma GCC diagnostic ignored "-Wformat-truncation="
2132 snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
2133 ltm.tm_year + 1900, ltm.tm_mon + 1,
2134 ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
2135 ltm.tm_sec, tv.tv_usec);
2136 #pragma GCC diagnostic pop
2137 #else
2138 snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld",
2139 ltm.tm_year + 1900, ltm.tm_mon + 1,
2140 ltm.tm_mday, ltm.tm_hour, ltm.tm_min,
2141 ltm.tm_sec, tv.tv_usec);
2142 #endif
2143 string dbgFileName(rlcFileName + tmText);
2144
2145 ostringstream oss;
2146 oss << "Chunk shifting failed, file:" << origFileName;
2147
2148 if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
2149 oss << ", rlc file is:" << dbgFileName;
2150
2151 // write out the header for debugging in case the header in rlc file is bad.
2152 string rlcPtrFileName(dbgFileName + ".hdr");
2153 IDBDataFile* rlcPtrFilePtr = IDBDataFile::open(
2154 IDBPolicy::getType(rlcPtrFileName.c_str(),
2155 IDBPolicy::WRITEENG),
2156 rlcPtrFileName.c_str(),
2157 "w+b",
2158 0);
2159
2160 if (rlcPtrFilePtr &&
2161 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
2162 COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
2163 (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection,
2164 ptrSecSize, __LINE__) == NO_ERROR))
2165 {
2166 oss << ", rlc file header in memory: " << rlcPtrFileName;
2167 }
2168 else
2169 {
2170 oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
2171 }
2172
2173 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2174
2175 closeFile(fileData);
2176
2177 if (rlcFilePtr != NULL)
2178 {
2179 delete rlcFilePtr;
2180 rlcFilePtr = NULL;
2181 }
2182
2183 if (rlcPtrFilePtr != NULL)
2184 {
2185 delete rlcPtrFilePtr;
2186 rlcPtrFilePtr = NULL;
2187 }
2188 }
2189 }
2190
2191 return rc;
2192 }
2193
2194 //------------------------------------------------------------------------------
2195 // Verify chunks can be uncompressed after a chunk shift.
2196 //------------------------------------------------------------------------------
verifyChunksAfterRealloc(CompFileData * fileData)2197 int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
2198 {
2199 int rc = NO_ERROR;
2200
2201 // read in the header
2202 if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
2203 COMPRESSED_FILE_HEADER_UNIT, __LINE__)) != NO_ERROR)
2204 {
2205 ostringstream oss;
2206 oss << "Failed to read control header from new " << fileData->fFileName << ", roll back";
2207 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2208
2209 return rc;
2210 }
2211
2212 // make sure the header is valid
2213 if ((rc = fCompressor.verifyHdr(fileData->fFileHeader.fControlData)) != 0)
2214 {
2215 ostringstream oss;
2216 oss << "Invalid header in new " << fileData->fFileName << ", roll back";
2217 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2218
2219 return rc;
2220 }
2221
2222 int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
2223 int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
2224
2225 // read in the pointer section in header
2226 if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection,
2227 ptrSecSize, __LINE__)) != NO_ERROR)
2228 {
2229 ostringstream oss;
2230 oss << "Failed to read pointer header from new " << fileData->fFileName << "@" << __LINE__;
2231 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2232 return rc;
2233 }
2234
2235 // get pointer list
2236 compress::CompChunkPtrList ptrs;
2237
2238 if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
2239 {
2240 ostringstream oss;
2241 oss << "Failed to parse pointer list from new " << fileData->fFileName << "@" << __LINE__;
2242 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2243 return ERR_COMP_PARSE_HDRS;
2244 }
2245
2246 // now verify each chunk
2247 ChunkData chunkData;
2248 int numOfChunks = ptrs.size(); // number of chunks in the file
2249
2250 for (int i = 0; i < numOfChunks && rc == NO_ERROR; i++)
2251 {
2252 unsigned int chunkSize = ptrs[i].second;
2253
2254 if ((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, ptrs[i].first, __LINE__)))
2255 {
2256 ostringstream oss;
2257 oss << "Failed to setFileOffset new " << fileData->fFileName << "@" << __LINE__;
2258 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2259 continue;
2260 }
2261
2262 if ((rc = readFile(fileData->fFilePtr, fileData->fFileName,
2263 fBufCompressed, chunkSize, __LINE__)))
2264 {
2265 ostringstream oss;
2266 oss << "Failed to read chunk from new " << fileData->fFileName << "@" << __LINE__;
2267 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2268 continue;
2269 }
2270
2271 // uncompress the read in buffer
2272 unsigned int dataLen = sizeof(chunkData.fBufUnCompressed);
2273
2274 if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
2275 (unsigned char*)chunkData.fBufUnCompressed, dataLen) != 0)
2276 {
2277 ostringstream oss;
2278 oss << "Failed to uncompress chunk new " << fileData->fFileName << "@" << __LINE__;
2279 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2280 rc = ERR_COMP_UNCOMPRESS;
2281 continue;
2282 }
2283 }
2284
2285 return rc;
2286 }
2287
2288 //------------------------------------------------------------------------------
2289 // Log an error message for the specified error code, and msg level.
2290 //------------------------------------------------------------------------------
logMessage(int code,int level,int lineNum,int fromLine) const2291 void ChunkManager::logMessage(int code, int level, int lineNum, int fromLine) const
2292 {
2293 ostringstream oss;
2294 oss << ec.errorString(code) << " @line:" << lineNum;
2295
2296 if (fromLine != -1)
2297 oss << " called from line:" << fromLine;
2298
2299 logMessage(oss.str(), level);
2300 }
2301
2302 //------------------------------------------------------------------------------
2303 // Log the requested error message using the specified msg level.
2304 //------------------------------------------------------------------------------
logMessage(const string & msg,int level) const2305 void ChunkManager::logMessage(const string& msg, int level) const
2306 {
2307 logging::Message::Args args;
2308 args.add(msg);
2309
2310 fSysLogger->logMessage((logging::LOG_TYPE) level, logging::M0080, args,
2311 //FIXME: store session id in class to pass on to LogginID...
2312 logging::LoggingID(SUBSYSTEM_ID_WE, 0, fTransId));
2313 }
2314
2315 //------------------------------------------------------------------------------
2316 // Replace the cdf file with the updated tmp file.
2317 //------------------------------------------------------------------------------
swapTmpFile(const string & src,const string & dest)2318 int ChunkManager::swapTmpFile(const string& src, const string& dest)
2319 {
2320 // return value
2321 int rc = NO_ERROR;
2322
2323 // if no change to the cdf, the tmp may not exist, no need to swap.
2324 if (!fFs.exists(src.c_str()))
2325 return rc;
2326
2327 ssize_t srcFileSize = IDBPolicy::size(src.c_str());
2328
2329 if (srcFileSize <= 0)
2330 {
2331 ostringstream oss;
2332 oss << "swapTmpFile aborted. Source file size = " << srcFileSize;
2333 logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
2334 rc = ERR_COMP_RENAME_FILE;
2335
2336 return rc;
2337 }
2338
2339 errno = 0;
2340 // save the original file
2341 string orig(dest + ".orig");
2342 fFs.remove(orig.c_str()); // remove left overs
2343
2344 if (fFs.rename(dest.c_str(), orig.c_str()) != 0)
2345 {
2346 ostringstream oss;
2347 oss << "rename " << dest << " to " << orig << " failed: " << strerror(errno);
2348 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2349 rc = ERR_COMP_RENAME_FILE;
2350 }
2351
2352 // rename the new file
2353 if (rc == NO_ERROR && fFs.rename(src.c_str(), dest.c_str()) != 0)
2354 {
2355 ostringstream oss;
2356 oss << "rename " << src << " to " << dest << " failed: " << strerror(errno);
2357 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2358 rc = ERR_COMP_RENAME_FILE;
2359 }
2360
2361 if (rc == NO_ERROR && fFs.remove(orig.c_str()) != 0)
2362 rc = ERR_COMP_REMOVE_FILE;
2363
2364 return rc;
2365 }
2366
2367
2368 //------------------------------------------------------------------------------
2369 // Construct a DML log file name based on transaction ID, etc.
2370 //------------------------------------------------------------------------------
getDMLLogFileName(string & aDMLLogFileName,const TxnID & txnId) const2371 int ChunkManager::getDMLLogFileName(string& aDMLLogFileName, const TxnID& txnId) const
2372 {
2373 config::Config* config = config::Config::makeConfig();
2374 string prefix = config->getConfig("SystemConfig", "DBRMRoot");
2375
2376 if (prefix.length() == 0)
2377 {
2378 ostringstream oss;
2379 oss << "trans " << txnId << ":Need a valid DBRMRoot entry in Calpont configuation file";
2380 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2381 return ERR_DML_LOG_NAME;
2382 }
2383
2384 uint64_t pos = prefix.find_last_of ("/") ;
2385
2386 if (pos != string::npos)
2387 {
2388 aDMLLogFileName = prefix.substr(0, pos + 1); //Get the file path
2389 }
2390 else
2391 {
2392 ostringstream oss;
2393 oss << "trans " << txnId << ":Cannot find the dbrm directory ("
2394 << prefix.c_str() << ") for the DML log file";
2395 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2396 return ERR_DML_LOG_NAME;
2397 }
2398
2399 ostringstream oss;
2400 oss << txnId << "_" << fLocalModuleId;
2401 aDMLLogFileName += "DMLLog_" + oss.str();
2402
2403 return NO_ERROR;
2404 }
2405
2406
2407 //------------------------------------------------------------------------------
2408 // clear the DML log file
2409 //------------------------------------------------------------------------------
// Begin a transaction by truncating its DML log file.  Applies only to
// HDFS update/delete; a no-op for non-HDFS builds and bulk loads.
int ChunkManager::startTransaction(const TxnID& txnId) const
{
    // this step is for HDFS update/delete only.
    if (!fIsHdfs || fIsBulkLoad)
        return NO_ERROR;

    // Construct the DML log file name
    string logName;

    if (getDMLLogFileName(logName, txnId) != NO_ERROR)
        return ERR_DML_LOG_NAME;

    // opening with "wb" truncates any existing log for this transaction
    boost::scoped_ptr<IDBDataFile> logFile(IDBDataFile::open(
            IDBPolicy::getType(logName.c_str(), IDBPolicy::WRITEENG),
            logName.c_str(), "wb", 0));

    if (!logFile)
    {
        ostringstream oss;
        oss << "trans " << txnId << ":File " << logName << " can't be opened.";
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_OPEN_DML_LOG;
    }

    return NO_ERROR;
}
2437
2438
2439 //------------------------------------------------------------------------------
// Back up the cdf file and replace it with the updated tmp file.
2441 //------------------------------------------------------------------------------
// Confirm an HDFS transaction: replay the DML log and confirm each recorded
// db-file change via ConfirmHdfsDbFile.  A no-op for non-HDFS builds and
// bulk loads.  Returns NO_ERROR on success, or the first failing code.
int ChunkManager::confirmTransaction(const TxnID& txnId) const
{
    // return value
    int rc = NO_ERROR;

    // this step is for HDFS update/delete only.
    if (!fIsHdfs || fIsBulkLoad)
        return rc;

    string aDMLLogFileName;

    if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
        return ERR_DML_LOG_NAME;

    //Open log file
    boost::scoped_ptr<IDBDataFile> aDMLLogFile;
    aDMLLogFile.reset(IDBDataFile::open(
                          IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
                          aDMLLogFileName.c_str(), "r", 0));

    if (!aDMLLogFile)
    {
        ostringstream oss;
        oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_OPEN_DML_LOG;
    }

    ssize_t logSize = fFs.size(aDMLLogFileName.c_str());

    // Guard against a missing/empty log: a non-positive size would make the
    // buffer allocation below invalid (negative) or useless (zero).
    if (logSize <= 0)
    {
        ostringstream oss;
        oss << "trans " << txnId << ":File " << aDMLLogFileName << " has invalid size: " << logSize;
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_FILE_READ;
    }

    boost::scoped_array<char> buf(new char[logSize]);

    if (aDMLLogFile->read(buf.get(), logSize) != logSize)
        return ERR_FILE_READ;

    // Each log record is: <backupFileType> <filename> <size> <offset>
    std::istringstream strstream(string(buf.get(), logSize));
    std::string backUpFileType;
    std::string filename;
    int64_t size;
    int64_t offset;
    ConfirmHdfsDbFile confirmHdfs;

    while (strstream >> backUpFileType >> filename >> size >> offset)
    {
        std::string confirmErrMsg;
        rc = confirmHdfs.confirmDbFileChange( backUpFileType,
                                              filename, confirmErrMsg );

        if (rc != NO_ERROR)
        {
            logMessage(confirmErrMsg, logging::LOG_TYPE_ERROR);
            break;
        }
    }

    return rc;
}
2498
2499
2500 //------------------------------------------------------------------------------
// Finalize the changes
2502 // if success, remove the orig
2503 // otherwise, move the orig back to cdf
2504 //------------------------------------------------------------------------------
// End an HDFS transaction: replay the DML log and finalize each recorded
// db-file change -- on success the backups are removed, on failure the
// originals are restored.  The log file itself is removed when everything
// succeeds.  A no-op for non-HDFS builds and bulk loads.
int ChunkManager::endTransaction(const TxnID& txnId, bool success) const
{
    // return value
    int rc = NO_ERROR;

    // this step is for HDFS update/delete only.
    if (!fIsHdfs || fIsBulkLoad)
        return rc;

    string aDMLLogFileName;

    if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
        return ERR_DML_LOG_NAME;

    //Open log file
    boost::scoped_ptr<IDBDataFile> aDMLLogFile;
    aDMLLogFile.reset(IDBDataFile::open(
                          IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
                          aDMLLogFileName.c_str(), "r", 0));

    if (!aDMLLogFile)
    {
        ostringstream oss;
        oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_OPEN_DML_LOG;
    }

    ssize_t logSize = fFs.size(aDMLLogFileName.c_str());

    // Guard against a missing/empty log: a non-positive size would make the
    // buffer allocation below invalid (negative) or useless (zero).
    if (logSize <= 0)
    {
        ostringstream oss;
        oss << "trans " << txnId << ":File " << aDMLLogFileName << " has invalid size: " << logSize;
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_FILE_READ;
    }

    ssize_t logRead = 0;
    boost::scoped_array<char> buf(new char[logSize]);

    if ((logRead = aDMLLogFile->read(buf.get(), logSize)) != logSize)
    {
        ostringstream oss;
        // fixed typo in logged message: "filed" -> "failed"
        oss << "trans " << txnId << ":File " << aDMLLogFileName << " failed to read: "
            << logRead << "/" << logSize;
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return ERR_FILE_READ;
    }

    // Each log record is: <backupFileType> <filename> <size> <offset>
    std::istringstream strstream(string(buf.get(), logSize));
    std::string backUpFileType;
    std::string filename;
    int64_t size;
    int64_t offset;
    ConfirmHdfsDbFile confirmHdfs;

    while (strstream >> backUpFileType >> filename >> size >> offset)
    {
        std::string finalizeErrMsg;
        rc = confirmHdfs.endDbFileChange( backUpFileType,
                                          filename, success, finalizeErrMsg );

        if (rc != NO_ERROR)
        {
            logMessage(finalizeErrMsg, logging::LOG_TYPE_ERROR);
            break;
        }
    }

    // final clean up or recover
    if (rc == NO_ERROR)
        rc = fFs.remove(aDMLLogFileName.c_str());

    return rc;
}
2572
checkFixLastDictChunk(const FID & fid,uint16_t root,uint32_t partition,uint16_t segment)2573 int ChunkManager::checkFixLastDictChunk(const FID& fid,
2574 uint16_t root,
2575 uint32_t partition,
2576 uint16_t segment)
2577 {
2578
2579 int rc = 0;
2580 //Find the file info
2581 FileID fileID(fid, root, partition, segment);
2582 map<FileID, CompFileData*>::const_iterator mit = fFileMap.find(fileID);
2583
2584 WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition
2585 << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ")
2586 << "found." << endl;)
2587
2588 // Get CompFileData pointer for existing Dictionary store file mit->second is CompFileData
2589 if (mit != fFileMap.end())
2590 {
2591
2592 int headerSize = fCompressor.getHdrSize(mit->second->fFileHeader.fControlData);
2593 int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
2594
2595 // get pointer list
2596 compress::CompChunkPtrList ptrs;
2597
2598 if (fCompressor.getPtrList(mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
2599 {
2600 ostringstream oss;
2601 oss << "Failed to parse pointer list from new " << mit->second->fFileName << "@" << __LINE__;
2602 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2603 return ERR_COMP_PARSE_HDRS;
2604 }
2605
2606 // now verify last chunk
2607 ChunkData* chunkData;
2608 int numOfChunks = ptrs.size(); // number of chunks in the file
2609 unsigned int chunkSize = ptrs[numOfChunks - 1].second;
2610
2611 if ((rc = setFileOffset(mit->second->fFilePtr, mit->second->fFileName, ptrs[numOfChunks - 1].first, __LINE__)))
2612 {
2613 ostringstream oss;
2614 oss << "Failed to setFileOffset new " << mit->second->fFileName << "@" << __LINE__;
2615 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2616 return rc;
2617 }
2618
2619 if ((rc = readFile(mit->second->fFilePtr, mit->second->fFileName,
2620 fBufCompressed, chunkSize, __LINE__)))
2621 {
2622 ostringstream oss;
2623 oss << "Failed to read chunk from new " << mit->second->fFileName << "@" << __LINE__;
2624 logMessage(oss.str(), logging::LOG_TYPE_ERROR);
2625 return rc;
2626 }
2627
2628 // uncompress the read in buffer
2629 chunkData = new ChunkData(numOfChunks - 1);
2630 unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
2631
2632 if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
2633 (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
2634 {
2635 mit->second->fChunkList.push_back(chunkData);
2636 fActiveChunks.push_back(make_pair(mit->second->fFileID, chunkData));
2637 //replace this chunk with empty chunk
2638 uint64_t blocks = 512;
2639
2640 if ((numOfChunks - 1) == 0)
2641 {
2642 char* hdr = mit->second->fFileHeader.fControlData;
2643
2644 if (fCompressor.getBlockCount(hdr) < 512)
2645 blocks = 256;
2646 }
2647
2648 dataLen = 8192 * blocks;
2649
2650 // load the uncompressed buffer with empty values.
2651 char* buf = chunkData->fBufUnCompressed;
2652 chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
2653 initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
2654 chunkData->fLenUnCompressed = dataLen;
2655 chunkData->fWriteToFile = true;
2656 }
2657 }
2658
2659 return rc;
2660 }
2661
2662 }
2663
2664 // vim:ts=4 sw=4:
2665