1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*****************************************************************************
19  * $Id: we_colbufcompressed.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20  *
21  ****************************************************************************/
22 
23 /** @file
24  * Implementation of the ColumnBufferCompressed class
25  *
26  */
27 
28 #include "we_colbufcompressed.h"
29 
30 #include <cassert>
31 #include <cstdio>
32 #include <cstring>
33 #include <iostream>
34 #include <sstream>
35 
36 #include <boost/scoped_array.hpp>
37 
38 #include "we_define.h"
39 #include "we_config.h"
40 #include "we_convertor.h"
41 #include "we_columninfo.h"
42 #include "we_fileop.h"
43 #include "we_log.h"
44 #include "we_stats.h"
45 #include "IDBDataFile.h"
46 using namespace idbdatafile;
47 
48 #include "idbcompress.h"
49 using namespace compress;
50 
51 namespace WriteEngine
52 {
53 
54 //------------------------------------------------------------------------------
55 // Constructor
56 //------------------------------------------------------------------------------
ColumnBufferCompressed(ColumnInfo * pColInfo,Log * logger)57 ColumnBufferCompressed::ColumnBufferCompressed( ColumnInfo* pColInfo,
58         Log* logger) :
59     ColumnBuffer(pColInfo, logger),
60     fToBeCompressedBuffer(0),
61     fToBeCompressedCapacity(0),
62     fNumBytes(0),
63     fCompressor(0),
64     fPreLoadHWMChunk(true),
65     fFlushedStartHwmChunk(false)
66 {
67     fUserPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
68     fCompressor = new compress::IDBCompressInterface( fUserPaddingBytes );
69 }
70 
71 //------------------------------------------------------------------------------
72 // Destructor
73 //------------------------------------------------------------------------------
~ColumnBufferCompressed()74 ColumnBufferCompressed::~ColumnBufferCompressed()
75 {
76     if (fToBeCompressedBuffer)
77         delete []fToBeCompressedBuffer;
78 
79     fToBeCompressedBuffer   = 0;
80     fToBeCompressedCapacity = 0;
81     fNumBytes               = 0;
82     delete fCompressor;
83 }
84 
85 //------------------------------------------------------------------------------
86 // Reset "this" ColumnBufferCompressed object to read a different file, by
87 // resetting the FILE*, starting HWM, and the chunk pointers.
88 //------------------------------------------------------------------------------
setDbFile(IDBDataFile * f,HWM startHwm,const char * hdrs)89 int ColumnBufferCompressed::setDbFile(IDBDataFile* f, HWM startHwm, const char* hdrs)
90 {
91     fFile        = f;
92     fStartingHwm = startHwm;
93 
94     IDBCompressInterface compressor;
95 
96     if (compressor.getPtrList(hdrs, fChunkPtrs) != 0)
97     {
98         return ERR_COMP_PARSE_HDRS;
99     }
100 
101     // If we have any orphaned chunk pointers (ex: left over after a DML
102     // rollback), that fall after the HWM, then drop those trailing ptrs.
103     unsigned int chunkIndex             = 0;
104     unsigned int blockOffsetWithinChunk = 0;
105     fCompressor->locateBlock(fStartingHwm, chunkIndex, blockOffsetWithinChunk);
106 
107     if ((chunkIndex + 1) < fChunkPtrs.size())
108     {
109         fChunkPtrs.resize(chunkIndex + 1);
110     }
111 
112     return NO_ERROR;
113 }
114 
115 //------------------------------------------------------------------------------
116 // Reinitialize to-be-compressed column buffer (to empty chunk) prior to
117 // importing the first chunk of the next extent.  Returns startFileOffset
118 // which indicates file offset (in bytes) where next extent will be starting.
119 //------------------------------------------------------------------------------
resetToBeCompressedColBuf(long long & startFileOffset)120 int ColumnBufferCompressed::resetToBeCompressedColBuf(
121     long long& startFileOffset )
122 {
123     // Don't load chunk, once we go to next extent
124     fPreLoadHWMChunk = false;
125 
126     // Lazy creation of to-be-compressed buffer
127     if (!fToBeCompressedBuffer)
128     {
129         fToBeCompressedBuffer =
130             new unsigned char[IDBCompressInterface::UNCOMPRESSED_INBUF_LEN];
131     }
132 
133     BlockOp::setEmptyBuf( fToBeCompressedBuffer,
134                           IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
135                           fColInfo->column.emptyVal,
136                           fColInfo->column.width );
137 
138     if (fLog->isDebug( DEBUG_2 ))
139     {
140         std::ostringstream oss;
141         oss << "Initializing empty chunk for next extent: OID-" <<
142             fColInfo->curCol.dataFile.fid <<
143             "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
144             "; part-"   << fColInfo->curCol.dataFile.fPartition <<
145             "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
146             "; hwm-"    << fStartingHwm;
147         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
148     }
149 
150     fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
151 
152     // Set file offset past end of last chunk
153     startFileOffset = IDBCompressInterface::HDR_BUF_LEN * 2;
154 
155     if (fChunkPtrs.size() > 0)
156         startFileOffset = fChunkPtrs[ fChunkPtrs.size() - 1 ].first +
157                           fChunkPtrs[ fChunkPtrs.size() - 1 ].second;
158 
159     // Positition ourselves to start of empty to-be-compressed buffer
160     fNumBytes       = 0;
161 
162     return NO_ERROR;
163 }
164 
165 //------------------------------------------------------------------------------
166 // Intercept data being copied from the raw-data output buffer to the output
167 // file, and instead buffer up the data to be compressed in 4M chunks before
168 // writing it out.
169 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Intercept data being copied from the raw-data output buffer to the output
// file, and instead buffer up the data to be compressed in 4M chunks before
// writing it out.
//
// startOffset     - byte offset into fBuffer where the data to copy begins
// writeSize       - number of bytes from fBuffer to copy
// fillUpWEmpties  - if true, advance fNumBytes to the next block boundary;
//                   the extra bytes are the emptyVal fill already present in
//                   the to-be-compressed buffer (nothing extra is copied)
// Returns NO_ERROR on success, else an ERR_* code (already logged).
//------------------------------------------------------------------------------
int ColumnBufferCompressed::writeToFile(int startOffset, int writeSize,
                                        bool fillUpWEmpties)
{
    if (writeSize == 0) // skip unnecessary write, if 0 bytes given
        return NO_ERROR;

    // Number of pad bytes needed to round writeSize up to a block boundary.
    int fillUpWEmptiesWriteSize = 0;
    if (fillUpWEmpties)
        fillUpWEmptiesWriteSize = BYTE_PER_BLOCK - writeSize % BYTE_PER_BLOCK;

    // If we are starting a new file, we need to reinit the buffer and
    // find out what our file offset should be set to.
    if (!fToBeCompressedCapacity)
    {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF);
#endif
        long long startFileOffset;
        int rc = initToBeCompressedBuffer( startFileOffset );

        if (rc != NO_ERROR)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "writeToFile: error initializing to-be-compressed buffer "
                "for OID " << fColInfo->curCol.dataFile.fid <<
                "; " << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return rc;
        }

        rc = fColInfo->colOp->setFileOffset(fFile, startFileOffset, SEEK_SET);

        if (rc != NO_ERROR)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "writeToFile: error init compressed file offset for " <<
                "OID " << fColInfo->curCol.dataFile.fid <<
                "; " << startFileOffset <<
                "; " << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return rc;
        }

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF);
#endif
    }

    // Current insertion point in the to-be-compressed buffer.  Safe to
    // compute before the capacity expansion below: that expansion changes
    // only fToBeCompressedCapacity, not the buffer address or fNumBytes.
    unsigned char* bufOffset = fToBeCompressedBuffer + fNumBytes;

    // Expand the compression buffer size if working with an abbrev extent, and
    // the bytes we are about to add will overflow the abbreviated extent.
    if ((fToBeCompressedCapacity < IDBCompressInterface::UNCOMPRESSED_INBUF_LEN) &&
            ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) > fToBeCompressedCapacity) )
    {
        std::ostringstream oss;
        oss << "Expanding abbrev to-be-compressed buffer for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-"   << fColInfo->curCol.dataFile.fDbRoot    <<
            "; part-"     << fColInfo->curCol.dataFile.fPartition <<
            "; seg-"      << fColInfo->curCol.dataFile.fSegment;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
    }

    // Fast path: everything (data + pad) fits in the current chunk buffer.
    if ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) <= fToBeCompressedCapacity)
    {
        if (fLog->isDebug( DEBUG_2 ))
        {
            std::ostringstream oss;
            oss << "Buffering data to-be-compressed for: OID-" <<
                fColInfo->curCol.dataFile.fid <<
                "; DBRoot-"   << fColInfo->curCol.dataFile.fDbRoot    <<
                "; part-"     << fColInfo->curCol.dataFile.fPartition <<
                "; seg-"      << fColInfo->curCol.dataFile.fSegment   <<
                "; addBytes-" << writeSize <<
                "; extraBytes-" << fillUpWEmptiesWriteSize <<
                "; totBytes-" << (fNumBytes + writeSize);
            fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        }

        memcpy(bufOffset, (fBuffer + startOffset), writeSize);
        fNumBytes += writeSize;
        fNumBytes += fillUpWEmptiesWriteSize;
    }
    else // Not enough room to add all the data to the to-be-compressed buffer
    {
        int startOffsetX = startOffset;
        int writeSizeX   = writeSize;

        // The number of bytes (in fBuffer) to be written, could be larger than
        // our to-be-compressed buffer, so we require a loop to potentially
        // iterate thru all the bytes to be compresssed and written from fBuffer
        while (writeSizeX > 0)
        {
            idbassert( (fNumBytes <= fToBeCompressedCapacity) ); // DMC-temp debug

            size_t writeSizeOut = 0;

            // Chunk is (or will be) full: top it off, compress + flush it,
            // then start a fresh empty chunk and loop again.
            if ((fNumBytes + writeSizeX) > fToBeCompressedCapacity)
            {
                writeSizeOut = fToBeCompressedCapacity - fNumBytes;

                if (fLog->isDebug( DEBUG_2 ))
                {
                    std::ostringstream oss;
                    oss << "Buffering data (full) to-be-compressed for: OID-" <<
                        fColInfo->curCol.dataFile.fid <<
                        "; DBRoot-"   << fColInfo->curCol.dataFile.fDbRoot    <<
                        "; part-"     << fColInfo->curCol.dataFile.fPartition <<
                        "; seg-"      << fColInfo->curCol.dataFile.fSegment   <<
                        "; addBytes-" << writeSizeOut                         <<
                        "; totBytes-" << (fNumBytes + writeSizeOut);
                    fLog->logMsg( oss.str(), MSGLVL_INFO2 );
                }

                if (writeSizeOut > 0)
                {
                    memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut);
                    fNumBytes += writeSizeOut;
                }

                //char resp;
                //std::cout << "dbg: before writeToFile->compressAndFlush" <<
                //    std::endl;
                //std::cin  >> resp;
                int rc = compressAndFlush( false );

                //std::cout << "dbg: after writeToFile->compressAndFlush" <<
                //    std::endl;
                //std::cin  >> resp;
                if (rc != NO_ERROR)
                {
                    WErrorCodes ec;
                    std::ostringstream oss;
                    oss << "writeToFile: error compressing and writing chunk "
                        "for OID " << fColInfo->curCol.dataFile.fid <<
                        "; "   << ec.errorString(rc);
                    fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

                    return rc;
                }

                // Start over again loading a new to-be-compressed buffer
                BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                                      IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                                      fColInfo->column.emptyVal,
                                      fColInfo->column.width );

                fToBeCompressedCapacity =
                    IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
                bufOffset = fToBeCompressedBuffer;

                fNumBytes = 0;
            }
            else
            {
                // Remaining bytes fit in the current chunk; this is the
                // terminal iteration (writeSizeX drops to 0 below).
                writeSizeOut = writeSizeX;

                if (fLog->isDebug( DEBUG_2 ))
                {
                    std::ostringstream oss;
                    oss << "Buffering data (new) to-be-compressed for: OID-"  <<
                        fColInfo->curCol.dataFile.fid <<
                        "; DBRoot-"   << fColInfo->curCol.dataFile.fDbRoot    <<
                        "; part-"     << fColInfo->curCol.dataFile.fPartition <<
                        "; seg-"      << fColInfo->curCol.dataFile.fSegment   <<
                        "; addBytes-" << writeSizeOut                         <<
                        "; totBytes-" << (fNumBytes + writeSizeOut);
                    fLog->logMsg( oss.str(), MSGLVL_INFO2 );
                }

                memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut);
                fNumBytes += writeSizeOut;
                // NOTE(review): the capacity check above does not include
                // fillUpWEmptiesWriteSize, so this add could in principle push
                // fNumBytes past fToBeCompressedCapacity; confirm callers
                // guarantee block-aligned sizing that prevents this.
                fNumBytes += fillUpWEmptiesWriteSize;
            }

            startOffsetX += writeSizeOut;
            writeSizeX   -= writeSizeOut;
        } // end of while loop
    }

    return NO_ERROR;
}
358 //------------------------------------------------------------------------------
359 // Compress and write out the data in the to-be-compressed buffer.
360 // Also may write out the compression header.
361 //
362 // bFinishingFile indicates whether we are finished working with this file,
363 // either because we are completing an extent or because we have reached the
364 // end of the input data.  In either case, if bFinishingFile is true, then
365 // in addition to flushing the current working chunk to disk, this function
366 // will also write out the updated compression header to match the data.
367 //
368 // This function will also write out the compression header if we are writing
369 // out the first (starting HWM) chunk for this import.  We do this to keep the
370 // compression header in sync with the data, in case PrimProc is trying to read
371 // the db file.  It is not necessary to immediately update the header for the
372 // remaining chunks as they are written out, because PrimProc will not be try-
373 // ing to access those chunk until we update the extentmap HWM at the end of
374 // this import.  It's only the starting HWM chunk that may cause a problem and
375 // requires the immediate rewriting of the header, because we are modifying
376 // that chunk and adding rows to it.
377 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Compress and write out the data in the to-be-compressed buffer.
// Also may write out the compression header.
//
// bFinishingFile indicates whether we are finished working with this file,
// either because we are completing an extent or because we have reached the
// end of the input data.  In either case, if bFinishingFile is true, then
// in addition to flushing the current working chunk to disk, this function
// will also write out the updated compression header to match the data.
//
// This function will also write out the compression header if we are writing
// out the first (starting HWM) chunk for this import.  We do this to keep the
// compression header in sync with the data, in case PrimProc is trying to read
// the db file.  It is not necessary to immediately update the header for the
// remaining chunks as they are written out, because PrimProc will not be try-
// ing to access those chunk until we update the extentmap HWM at the end of
// this import.  It's only the starting HWM chunk that may cause a problem and
// requires the immediate rewriting of the header, because we are modifying
// that chunk and adding rows to it.
//
// Returns NO_ERROR, or ERR_COMP_COMPRESS / ERR_COMP_PAD_DATA /
// ERR_FILE_WRITE / ERR_FILE_FLUSH on failure.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::compressAndFlush( bool bFinishingFile )
{
    // Worst-case compressed size plus the user padding appended below.
    const int OUTPUT_BUFFER_SIZE = IDBCompressInterface::maxCompressedSize(fToBeCompressedCapacity) +
                                   fUserPaddingBytes;
    unsigned char* compressedOutBuf = new unsigned char[ OUTPUT_BUFFER_SIZE ];
    // scoped_array frees compressedOutBuf on every return path below.
    boost::scoped_array<unsigned char> compressedOutBufPtr(compressedOutBuf);
    unsigned int   outputLen = OUTPUT_BUFFER_SIZE;

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_COMPRESS_COL_COMPRESS);
#endif

    // Compress the working chunk; outputLen is in/out (capacity in,
    // actual compressed length out).
    int rc = fCompressor->compressBlock(
                 reinterpret_cast<char*>(fToBeCompressedBuffer),
                 fToBeCompressedCapacity,
                 compressedOutBuf,
                 outputLen );

    if (rc != 0)
    {
        return ERR_COMP_COMPRESS;
    }

    // Round up the compressed chunk size
    rc = fCompressor->padCompressedChunks( compressedOutBuf,
                                           outputLen, OUTPUT_BUFFER_SIZE );

    if (rc != 0)
    {
        return ERR_COMP_PAD_DATA;
    }

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_COMPRESS_COL_COMPRESS);
    Stats::startParseEvent(WE_STATS_WRITE_COL);
#endif

    // Record where this chunk starts, then write it.  write() returns a
    // byte count, so dividing by outputLen yields 1 only on a full write.
    off64_t   fileOffset = fFile->tell();
    size_t nitems =  fFile->write(compressedOutBuf, outputLen) / outputLen;

    if (nitems != 1)
        return ERR_FILE_WRITE;

    CompChunkPtr compChunk(
        (uint64_t)fileOffset, (uint64_t)outputLen);
    fChunkPtrs.push_back( compChunk );

    if (fLog->isDebug( DEBUG_2 ))
    {
        std::ostringstream oss;
        oss << "Writing compressed data for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-"    << fColInfo->curCol.dataFile.fDbRoot    <<
            "; part-"      << fColInfo->curCol.dataFile.fPartition <<
            "; seg-"       << fColInfo->curCol.dataFile.fSegment   <<
            "; bytes-"     << outputLen <<
            "; fileOffset-" << fileOffset;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );
    }

    // We write out the compression headers if we are finished with this file
    // (either because we are through with the extent or the data), or because
    // this is the first HWM chunk that we may be modifying.
    // See the description that precedes this function for more details.
    if ( bFinishingFile || !fFlushedStartHwmChunk )
    {
        // Remember the end-of-chunk position so we can restore it after
        // seeking to the top of the file to rewrite the headers.
        fileOffset = fFile->tell();
        RETURN_ON_ERROR( saveCompressionHeaders() );

        // If we just updated the chunk header for the starting HWM chunk,
        // then we flush our output, to synchronize with compressed chunks,
        if ( !fFlushedStartHwmChunk )
        {
            //char resp;
            //std::cout << "dbg: before fflush of hdrs" << std::endl;
            //std::cin  >> resp;
            if (fFile->flush() != 0)
                return ERR_FILE_FLUSH;

            //std::cout << "dbg: after fflush of hdrs" << std::endl;
            //std::cin  >> resp;
            fFlushedStartHwmChunk = true;
        }

        // After seeking to the top of the file to write the headers,
        // we restore the file offset to continue adding more chunks,
        // if we are not through with this file.
        if ( !bFinishingFile )
        {
            RETURN_ON_ERROR( fColInfo->colOp->setFileOffset(
                                 fFile, fileOffset, SEEK_SET) );
        }
    }

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_WRITE_COL);
#endif

    return NO_ERROR;
}
478 
479 //------------------------------------------------------------------------------
480 // Final flushing of data and headers prior to closing the file.
481 // File is also truncated if applicable.
482 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Final flushing of data and headers prior to closing the file.
// File is also truncated if applicable.
//
// bTruncFile - if true, truncate the file to the end of the last chunk
//              (skipped on HDFS; see @bug5769).
// Returns NO_ERROR, or the error from compressAndFlush/truncateFile, or
// ERR_COMP_TRUNCATE_ZERO if truncation would shrink the file to 0 bytes.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::finishFile(bool bTruncFile)
{
    // If capacity is 0, we never got far enough to read in the HWM chunk for
    // the current column segment file, so no need to update the file contents.
    // But we do continue in case we need to truncate the file before exiting.
    // This could happen if our initial block skipping finished an extent.
    if (fToBeCompressedCapacity > 0)
    {
        //char resp;
        //std::cout << "dbg: before finishFile->compressAndFlush" << std::endl;
        //std::cin  >> resp;
        // Write out any data still waiting to be compressed
        RETURN_ON_ERROR( compressAndFlush( true ) );
        //std::cout << "dbg: after finishFile->compressAndFlush" << std::endl;
        //std::cin  >> resp;
    }

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif

    // Truncate file (if applicable) based on offset and size of last chunk
    if (bTruncFile && (fChunkPtrs.size() > 0))
    {
        // End of the last chunk = its start offset + its length.
        long long truncateFileSize = fChunkPtrs[fChunkPtrs.size() - 1].first +
                                     fChunkPtrs[fChunkPtrs.size() - 1].second;

        // @bug5769 Don't initialize extents or truncate db files on HDFS
        if (idbdatafile::IDBPolicy::useHdfs())
        {
            std::ostringstream oss1;
            oss1 << "Finished writing column file"
                 ": OID-"    << fColInfo->curCol.dataFile.fid        <<
                 "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                 "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                 "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                 "; size-"   << truncateFileSize;
            fLog->logMsg( oss1.str(), MSGLVL_INFO2 );
        }
        else
        {
            std::ostringstream oss1;
            oss1 << "Truncating column file"
                 ": OID-"    << fColInfo->curCol.dataFile.fid        <<
                 "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                 "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                 "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                 "; size-"   << truncateFileSize;
            fLog->logMsg( oss1.str(), MSGLVL_INFO2 );

            int rc = NO_ERROR;

            if (truncateFileSize > 0)
                rc = fColInfo->colOp->truncateFile( fFile, truncateFileSize );
            else
                rc = ERR_COMP_TRUNCATE_ZERO;//@bug3913-Catch truncate to 0 bytes

            if (rc != NO_ERROR)
            {
                WErrorCodes ec;
                std::ostringstream oss2;
                oss2 << "finishFile: error truncating file for "        <<
                     "OID "      << fColInfo->curCol.dataFile.fid        <<
                     "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                     "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                     "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                     "; size-"   << truncateFileSize                     <<
                     "; "        << ec.errorString(rc);
                fLog->logMsg( oss2.str(), rc, MSGLVL_ERROR );

                return rc;
            }
        }
    }

    // Nothing more to do if we are not updating the file contents.
    if (fToBeCompressedCapacity == 0)
    {
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif
        return NO_ERROR;
    }

    // Reset chunk state so a subsequent file starts from scratch.
    fToBeCompressedCapacity = 0;
    fNumBytes               = 0;
    fChunkPtrs.clear();

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif

    return NO_ERROR;
}
577 
578 //------------------------------------------------------------------------------
579 // Write out the updated compression headers.
580 //------------------------------------------------------------------------------
saveCompressionHeaders()581 int ColumnBufferCompressed::saveCompressionHeaders( )
582 {
583     // Construct the header records
584     char hdrBuf[IDBCompressInterface::HDR_BUF_LEN * 2];
585     fCompressor->initHdr( hdrBuf, fColInfo->column.compressionType );
586     fCompressor->setBlockCount(hdrBuf,
587                                (fColInfo->getFileSize() / BYTE_PER_BLOCK) );
588 
589     std::vector<uint64_t> ptrs;
590 
591     for (unsigned i = 0; i < fChunkPtrs.size(); i++)
592     {
593         ptrs.push_back( fChunkPtrs[i].first );
594     }
595 
596     unsigned lastIdx = fChunkPtrs.size() - 1;
597     ptrs.push_back( fChunkPtrs[lastIdx].first + fChunkPtrs[lastIdx].second );
598     fCompressor->storePtrs( ptrs, hdrBuf );
599 
600     // Write out the header records
601     //char resp;
602     //std::cout << "dbg: before writeHeaders" << std::endl;
603     //std::cin  >> resp;
604     RETURN_ON_ERROR( fColInfo->colOp->writeHeaders(fFile, hdrBuf) );
605     //std::cout << "dbg: after writeHeaders" << std::endl;
606     //std::cin  >> resp;
607 
608     return NO_ERROR;
609 }
610 
611 //------------------------------------------------------------------------------
612 // Allocates to-be-compressed buffer if it has not already been allocated.
613 // Initializes to-be-compressed buffer with the contents of the chunk containing
614 // the fStartingHwm block, as long as that chunk is in the pointer list.
615 // If the chunk is not in the list, then we must be adding a new chunk, in
616 // which case we just initialize an empty chunk.
617 // Returns startFileOffset which indicates file offset (in bytes) where the
618 // next chunk will be starting.
619 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Allocates to-be-compressed buffer if it has not already been allocated.
// Initializes to-be-compressed buffer with the contents of the chunk containing
// the fStartingHwm block, as long as that chunk is in the pointer list.
// If the chunk is not in the list, then we must be adding a new chunk, in
// which case we just initialize an empty chunk.
// Returns startFileOffset which indicates file offset (in bytes) where the
// next chunk will be starting.
// Returns NO_ERROR, or ERR_COMP_READ_BLOCK / ERR_COMP_UNCOMPRESS / a file
// positioning error on failure.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::initToBeCompressedBuffer(long long& startFileOffset)
{
    bool bNewBuffer = false;

    // Lazy initialization of to-be-compressed buffer
    if (!fToBeCompressedBuffer)
    {
        fToBeCompressedBuffer =
            new unsigned char[IDBCompressInterface::UNCOMPRESSED_INBUF_LEN];
        BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                              IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                              fColInfo->column.emptyVal,
                              fColInfo->column.width );
        bNewBuffer = true;
    }

    // Find the chunk containing the starting HWM, as long as our initial
    // block skipping has not caused us to exit the HWM chunk; in which
    // case we start a new empty chunk.
    unsigned int chunkIndex             = 0;
    unsigned int blockOffsetWithinChunk = 0;
    bool         bSkipStartingBlks      = false;

    if (fPreLoadHWMChunk)
    {
        if (fChunkPtrs.size() > 0)
        {
            fCompressor->locateBlock(fStartingHwm,
                                     chunkIndex, blockOffsetWithinChunk);

            // If the HWM chunk exists, we will preload it from this offset;
            // otherwise we fall through to the new-empty-chunk path.
            if (chunkIndex < fChunkPtrs.size())
                startFileOffset  = fChunkPtrs[chunkIndex].first;
            else
                fPreLoadHWMChunk = false;
        }
        // If we are at the start of the job, fPreLoadHWMChunk will be true,
        // to preload the old HWM chunk.  But if we have no chunk ptrs, then
        // we are starting on an empty PM.  In this case, we skip starting
        // blks if fStartingHwm has been set.
        else
        {
            fPreLoadHWMChunk  = false;
            bSkipStartingBlks = true;
        }
    }

    // Preload (read and uncompress) the chunk for the starting HWM extent only
    if (fPreLoadHWMChunk)
    {
        fPreLoadHWMChunk = false; // only preload HWM chunk in the first extent

        std::ostringstream oss;
        oss << "Reading HWM chunk for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
            "; part-"   << fColInfo->curCol.dataFile.fPartition <<
            "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
            "; hwm-"    << fStartingHwm <<
            "; chunk#-" << chunkIndex   <<
            "; blkInChunk-" << blockOffsetWithinChunk;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );

        // Read the chunk
        RETURN_ON_ERROR( fColInfo->colOp->setFileOffset(
                             fFile, startFileOffset, SEEK_SET) );

        // scoped_array frees the raw buffer on every return path below.
        // read() returns a byte count; dividing by the chunk length yields
        // 1 only when the whole chunk was read.
        char* compressedOutBuf = new char[ fChunkPtrs[chunkIndex].second ];
        boost::scoped_array<char> compressedOutBufPtr(compressedOutBuf);
        size_t itemsRead = fFile->read(compressedOutBuf, fChunkPtrs[chunkIndex].second) / fChunkPtrs[chunkIndex].second;

        if (itemsRead != 1)
        {
            std::ostringstream oss;
            oss << "Error reading HWM chunk for: " <<
                "OID-" << fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                "; hwm-"    << fStartingHwm;
            fLog->logMsg( oss.str(), ERR_COMP_READ_BLOCK, MSGLVL_ERROR );

            return ERR_COMP_READ_BLOCK;
        }

        // Uncompress the chunk into our 4MB buffer
        unsigned int outLen = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
        int rc = fCompressor->uncompressBlock(
                     compressedOutBuf,
                     fChunkPtrs[chunkIndex].second,
                     fToBeCompressedBuffer,
                     outLen);

        if (rc)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "Error uncompressing HWM chunk for: " <<
                "OID-" << fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                "; hwm-"    << fStartingHwm <<
                "; "        << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return ERR_COMP_UNCOMPRESS;
        }

        fToBeCompressedCapacity = outLen;

        // Positition ourselves to start adding data to the HWM block
        fNumBytes = blockOffsetWithinChunk * BYTE_PER_BLOCK;

        // We are going to add data to, and thus re-add, the last chunk; so we
        // drop it from our list.
        // NOTE(review): this drops the final entry, presumably because
        // setDbFile() already truncated fChunkPtrs so the HWM chunk is the
        // last one (chunkIndex == size()-1); confirm that invariant holds.
        fChunkPtrs.resize( fChunkPtrs.size() - 1 );
    }
    else // We have left the HWM chunk; just position file offset,
        // without reading anything
    {
        // If it's not a new buffer, we need to initialize, since we won't be
        // reading in anything to overlay what's in the to-be-compressed buffer.
        if (!bNewBuffer)
        {
            BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                                  IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                                  fColInfo->column.emptyVal,
                                  fColInfo->column.width );
        }

        if (fLog->isDebug( DEBUG_2 ))
        {
            std::ostringstream oss;
            oss << "Initializing new empty chunk: OID-" <<
                fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot    <<
                "; part-"   << fColInfo->curCol.dataFile.fPartition <<
                "; seg-"    << fColInfo->curCol.dataFile.fSegment   <<
                "; hwm-"    << fStartingHwm;
            fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        }

        fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;

        // Set file offset to start after last current chunk
        startFileOffset     = IDBCompressInterface::HDR_BUF_LEN * 2;

        if (fChunkPtrs.size() > 0)
            startFileOffset = fChunkPtrs[ fChunkPtrs.size() - 1 ].first +
                              fChunkPtrs[ fChunkPtrs.size() - 1 ].second;

        // Position ourselves to start of empty to-be-compressed buffer.
        // If we are starting the first extent on a PM, we may employ blk
        // skipping at start of import; adjust fNumBytes accordingly.
        // (see ColumnInfo::createDelayedFileIfNeeded() for discussion)
        if (bSkipStartingBlks)
            fNumBytes = fStartingHwm * BYTE_PER_BLOCK;
        else
            fNumBytes = 0;
    }

    return NO_ERROR;
}
783 
784 }
785