1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*****************************************************************************
19 * $Id: we_colbufcompressed.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20 *
21 ****************************************************************************/
22
23 /** @file
24 * Implementation of the ColumnBufferCompressed class
25 *
26 */
27
28 #include "we_colbufcompressed.h"
29
30 #include <cassert>
31 #include <cstdio>
32 #include <cstring>
33 #include <iostream>
34 #include <sstream>
35
36 #include <boost/scoped_array.hpp>
37
38 #include "we_define.h"
39 #include "we_config.h"
40 #include "we_convertor.h"
41 #include "we_columninfo.h"
42 #include "we_fileop.h"
43 #include "we_log.h"
44 #include "we_stats.h"
45 #include "IDBDataFile.h"
46 using namespace idbdatafile;
47
48 #include "idbcompress.h"
49 using namespace compress;
50
51 namespace WriteEngine
52 {
53
54 //------------------------------------------------------------------------------
55 // Constructor
56 //------------------------------------------------------------------------------
ColumnBufferCompressed(ColumnInfo * pColInfo,Log * logger)57 ColumnBufferCompressed::ColumnBufferCompressed( ColumnInfo* pColInfo,
58 Log* logger) :
59 ColumnBuffer(pColInfo, logger),
60 fToBeCompressedBuffer(0),
61 fToBeCompressedCapacity(0),
62 fNumBytes(0),
63 fCompressor(0),
64 fPreLoadHWMChunk(true),
65 fFlushedStartHwmChunk(false)
66 {
67 fUserPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
68 fCompressor = new compress::IDBCompressInterface( fUserPaddingBytes );
69 }
70
71 //------------------------------------------------------------------------------
72 // Destructor
73 //------------------------------------------------------------------------------
~ColumnBufferCompressed()74 ColumnBufferCompressed::~ColumnBufferCompressed()
75 {
76 if (fToBeCompressedBuffer)
77 delete []fToBeCompressedBuffer;
78
79 fToBeCompressedBuffer = 0;
80 fToBeCompressedCapacity = 0;
81 fNumBytes = 0;
82 delete fCompressor;
83 }
84
85 //------------------------------------------------------------------------------
86 // Reset "this" ColumnBufferCompressed object to read a different file, by
87 // resetting the FILE*, starting HWM, and the chunk pointers.
88 //------------------------------------------------------------------------------
setDbFile(IDBDataFile * f,HWM startHwm,const char * hdrs)89 int ColumnBufferCompressed::setDbFile(IDBDataFile* f, HWM startHwm, const char* hdrs)
90 {
91 fFile = f;
92 fStartingHwm = startHwm;
93
94 IDBCompressInterface compressor;
95
96 if (compressor.getPtrList(hdrs, fChunkPtrs) != 0)
97 {
98 return ERR_COMP_PARSE_HDRS;
99 }
100
101 // If we have any orphaned chunk pointers (ex: left over after a DML
102 // rollback), that fall after the HWM, then drop those trailing ptrs.
103 unsigned int chunkIndex = 0;
104 unsigned int blockOffsetWithinChunk = 0;
105 fCompressor->locateBlock(fStartingHwm, chunkIndex, blockOffsetWithinChunk);
106
107 if ((chunkIndex + 1) < fChunkPtrs.size())
108 {
109 fChunkPtrs.resize(chunkIndex + 1);
110 }
111
112 return NO_ERROR;
113 }
114
115 //------------------------------------------------------------------------------
116 // Reinitialize to-be-compressed column buffer (to empty chunk) prior to
117 // importing the first chunk of the next extent. Returns startFileOffset
118 // which indicates file offset (in bytes) where next extent will be starting.
119 //------------------------------------------------------------------------------
resetToBeCompressedColBuf(long long & startFileOffset)120 int ColumnBufferCompressed::resetToBeCompressedColBuf(
121 long long& startFileOffset )
122 {
123 // Don't load chunk, once we go to next extent
124 fPreLoadHWMChunk = false;
125
126 // Lazy creation of to-be-compressed buffer
127 if (!fToBeCompressedBuffer)
128 {
129 fToBeCompressedBuffer =
130 new unsigned char[IDBCompressInterface::UNCOMPRESSED_INBUF_LEN];
131 }
132
133 BlockOp::setEmptyBuf( fToBeCompressedBuffer,
134 IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
135 fColInfo->column.emptyVal,
136 fColInfo->column.width );
137
138 if (fLog->isDebug( DEBUG_2 ))
139 {
140 std::ostringstream oss;
141 oss << "Initializing empty chunk for next extent: OID-" <<
142 fColInfo->curCol.dataFile.fid <<
143 "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
144 "; part-" << fColInfo->curCol.dataFile.fPartition <<
145 "; seg-" << fColInfo->curCol.dataFile.fSegment <<
146 "; hwm-" << fStartingHwm;
147 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
148 }
149
150 fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
151
152 // Set file offset past end of last chunk
153 startFileOffset = IDBCompressInterface::HDR_BUF_LEN * 2;
154
155 if (fChunkPtrs.size() > 0)
156 startFileOffset = fChunkPtrs[ fChunkPtrs.size() - 1 ].first +
157 fChunkPtrs[ fChunkPtrs.size() - 1 ].second;
158
159 // Positition ourselves to start of empty to-be-compressed buffer
160 fNumBytes = 0;
161
162 return NO_ERROR;
163 }
164
//------------------------------------------------------------------------------
// Intercept data being copied from the raw-data output buffer (fBuffer) to the
// output file, and instead buffer up the data to be compressed in 4M chunks
// before writing it out.
//
// startOffset    - byte offset into fBuffer of the first byte to take
// writeSize      - number of bytes to take from fBuffer
// fillUpWEmpties - if true, account for trailing empty-value filler bytes that
//                  round this write up to a full block boundary
//
// Returns NO_ERROR on success, else a WriteEngine error code.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::writeToFile(int startOffset, int writeSize,
                                        bool fillUpWEmpties)
{
    if (writeSize == 0) // skip unnecessary write, if 0 bytes given
        return NO_ERROR;

    // Number of filler bytes needed to pad this write out to a block boundary.
    int fillUpWEmptiesWriteSize = 0;

    if (fillUpWEmpties)
        fillUpWEmptiesWriteSize = BYTE_PER_BLOCK - writeSize % BYTE_PER_BLOCK;

    // If we are starting a new file, we need to reinit the buffer and
    // find out what our file offset should be set to.
    if (!fToBeCompressedCapacity)
    {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF);
#endif
        long long startFileOffset;
        int rc = initToBeCompressedBuffer( startFileOffset );

        if (rc != NO_ERROR)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "writeToFile: error initializing to-be-compressed buffer "
                "for OID " << fColInfo->curCol.dataFile.fid <<
                "; " << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return rc;
        }

        // Seek to where the next chunk belongs before any chunk is written.
        rc = fColInfo->colOp->setFileOffset(fFile, startFileOffset, SEEK_SET);

        if (rc != NO_ERROR)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "writeToFile: error init compressed file offset for " <<
                "OID " << fColInfo->curCol.dataFile.fid <<
                "; " << startFileOffset <<
                "; " << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return rc;
        }

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF);
#endif
    }

    // Next free position within the to-be-compressed buffer.
    unsigned char* bufOffset = fToBeCompressedBuffer + fNumBytes;

    // Expand the compression buffer size if working with an abbrev extent, and
    // the bytes we are about to add will overflow the abbreviated extent.
    // (Only the capacity bookkeeping changes; the buffer itself is always
    // allocated at the full UNCOMPRESSED_INBUF_LEN size.)
    if ((fToBeCompressedCapacity < IDBCompressInterface::UNCOMPRESSED_INBUF_LEN) &&
        ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) > fToBeCompressedCapacity) )
    {
        std::ostringstream oss;
        oss << "Expanding abbrev to-be-compressed buffer for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
            "; part-" << fColInfo->curCol.dataFile.fPartition <<
            "; seg-" << fColInfo->curCol.dataFile.fSegment;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
    }

    // Common case: the entire write fits in the current chunk; just append.
    if ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) <= fToBeCompressedCapacity)
    {
        if (fLog->isDebug( DEBUG_2 ))
        {
            std::ostringstream oss;
            oss << "Buffering data to-be-compressed for: OID-" <<
                fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                "; part-" << fColInfo->curCol.dataFile.fPartition <<
                "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                "; addBytes-" << writeSize <<
                "; extraBytes-" << fillUpWEmptiesWriteSize <<
                "; totBytes-" << (fNumBytes + writeSize);
            fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        }

        memcpy(bufOffset, (fBuffer + startOffset), writeSize);
        fNumBytes += writeSize;
        fNumBytes += fillUpWEmptiesWriteSize;
    }
    else // Not enough room to add all the data to the to-be-compressed buffer
    {
        int startOffsetX = startOffset;
        int writeSizeX = writeSize;

        // The number of bytes (in fBuffer) to be written, could be larger than
        // our to-be-compressed buffer, so we require a loop to potentially
        // iterate thru all the bytes to be compressed and written from fBuffer
        while (writeSizeX > 0)
        {
            idbassert( (fNumBytes <= fToBeCompressedCapacity) ); // DMC-temp debug

            size_t writeSizeOut = 0;

            if ((fNumBytes + writeSizeX) > fToBeCompressedCapacity)
            {
                // Fill the remainder of the current chunk, then compress and
                // flush it, and start over with a fresh empty chunk.
                writeSizeOut = fToBeCompressedCapacity - fNumBytes;

                if (fLog->isDebug( DEBUG_2 ))
                {
                    std::ostringstream oss;
                    oss << "Buffering data (full) to-be-compressed for: OID-" <<
                        fColInfo->curCol.dataFile.fid <<
                        "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                        "; part-" << fColInfo->curCol.dataFile.fPartition <<
                        "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                        "; addBytes-" << writeSizeOut <<
                        "; totBytes-" << (fNumBytes + writeSizeOut);
                    fLog->logMsg( oss.str(), MSGLVL_INFO2 );
                }

                if (writeSizeOut > 0) // chunk may already be exactly full
                {
                    memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut);
                    fNumBytes += writeSizeOut;
                }

                int rc = compressAndFlush( false );

                if (rc != NO_ERROR)
                {
                    WErrorCodes ec;
                    std::ostringstream oss;
                    oss << "writeToFile: error compressing and writing chunk "
                        "for OID " << fColInfo->curCol.dataFile.fid <<
                        "; " << ec.errorString(rc);
                    fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

                    return rc;
                }

                // Start over again loading a new to-be-compressed buffer
                BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                                      IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                                      fColInfo->column.emptyVal,
                                      fColInfo->column.width );

                fToBeCompressedCapacity =
                    IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
                bufOffset = fToBeCompressedBuffer;

                fNumBytes = 0;
            }
            else
            {
                // Final pass: the remaining bytes fit in the current chunk.
                // The block-boundary filler bytes are accounted for here only,
                // at the tail end of the data.
                writeSizeOut = writeSizeX;

                if (fLog->isDebug( DEBUG_2 ))
                {
                    std::ostringstream oss;
                    oss << "Buffering data (new) to-be-compressed for: OID-" <<
                        fColInfo->curCol.dataFile.fid <<
                        "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                        "; part-" << fColInfo->curCol.dataFile.fPartition <<
                        "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                        "; addBytes-" << writeSizeOut <<
                        "; totBytes-" << (fNumBytes + writeSizeOut);
                    fLog->logMsg( oss.str(), MSGLVL_INFO2 );
                }

                memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut);
                fNumBytes += writeSizeOut;
                fNumBytes += fillUpWEmptiesWriteSize;
            }

            startOffsetX += writeSizeOut;
            writeSizeX -= writeSizeOut;
        } // end of while loop
    }

    return NO_ERROR;
}
//------------------------------------------------------------------------------
// Compress and write out the data in the to-be-compressed buffer.
// Also may write out the compression header.
//
// bFinishingFile indicates whether we are finished working with this file,
// either because we are completing an extent or because we have reached the
// end of the input data. In either case, if bFinishingFile is true, then
// in addition to flushing the current working chunk to disk, this function
// will also write out the updated compression header to match the data.
//
// This function will also write out the compression header if we are writing
// out the first (starting HWM) chunk for this import. We do this to keep the
// compression header in sync with the data, in case PrimProc is trying to read
// the db file. It is not necessary to immediately update the header for the
// remaining chunks as they are written out, because PrimProc will not be try-
// ing to access those chunks until we update the extentmap HWM at the end of
// this import. It's only the starting HWM chunk that may cause a problem and
// requires the immediate rewriting of the header, because we are modifying
// that chunk and adding rows to it.
//
// Returns NO_ERROR on success, else a WriteEngine error code.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::compressAndFlush( bool bFinishingFile )
{
    // Output buffer is sized for worst-case compressed output plus the
    // configured user padding.
    const int OUTPUT_BUFFER_SIZE = IDBCompressInterface::maxCompressedSize(fToBeCompressedCapacity) +
                                   fUserPaddingBytes;
    unsigned char* compressedOutBuf = new unsigned char[ OUTPUT_BUFFER_SIZE ];
    // scoped_array owns compressedOutBuf; it is freed on every return path.
    boost::scoped_array<unsigned char> compressedOutBufPtr(compressedOutBuf);
    unsigned int outputLen = OUTPUT_BUFFER_SIZE;

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_COMPRESS_COL_COMPRESS);
#endif

    // Compress the full working chunk; outputLen is in/out (capacity in,
    // actual compressed size out).
    int rc = fCompressor->compressBlock(
                 reinterpret_cast<char*>(fToBeCompressedBuffer),
                 fToBeCompressedCapacity,
                 compressedOutBuf,
                 outputLen );

    if (rc != 0)
    {
        return ERR_COMP_COMPRESS;
    }

    // Round up the compressed chunk size
    rc = fCompressor->padCompressedChunks( compressedOutBuf,
                                           outputLen, OUTPUT_BUFFER_SIZE );

    if (rc != 0)
    {
        return ERR_COMP_PAD_DATA;
    }

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_COMPRESS_COL_COMPRESS);
    Stats::startParseEvent(WE_STATS_WRITE_COL);
#endif

    // Remember where this chunk lands so we can record its pointer.
    off64_t fileOffset = fFile->tell();
    // nitems is 1 only if all outputLen bytes were written.
    size_t nitems = fFile->write(compressedOutBuf, outputLen) / outputLen;

    if (nitems != 1)
        return ERR_FILE_WRITE;

    CompChunkPtr compChunk(
        (uint64_t)fileOffset, (uint64_t)outputLen);
    fChunkPtrs.push_back( compChunk );

    if (fLog->isDebug( DEBUG_2 ))
    {
        std::ostringstream oss;
        oss << "Writing compressed data for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
            "; part-" << fColInfo->curCol.dataFile.fPartition <<
            "; seg-" << fColInfo->curCol.dataFile.fSegment <<
            "; bytes-" << outputLen <<
            "; fileOffset-" << fileOffset;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );
    }

    // We write out the compression headers if we are finished with this file
    // (either because we are through with the extent or the data), or because
    // this is the first HWM chunk that we may be modifying.
    // See the description that precedes this function for more details.
    if ( bFinishingFile || !fFlushedStartHwmChunk )
    {
        // Save the current end-of-data position; saveCompressionHeaders()
        // seeks to the top of the file to write the headers.
        fileOffset = fFile->tell();
        RETURN_ON_ERROR( saveCompressionHeaders() );

        // If we just updated the chunk header for the starting HWM chunk,
        // then we flush our output, to synchronize with compressed chunks,
        if ( !fFlushedStartHwmChunk )
        {
            if (fFile->flush() != 0)
                return ERR_FILE_FLUSH;

            fFlushedStartHwmChunk = true;
        }

        // After seeking to the top of the file to write the headers,
        // we restore the file offset to continue adding more chunks,
        // if we are not through with this file.
        if ( !bFinishingFile )
        {
            RETURN_ON_ERROR( fColInfo->colOp->setFileOffset(
                                 fFile, fileOffset, SEEK_SET) );
        }
    }

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_WRITE_COL);
#endif

    return NO_ERROR;
}
478
//------------------------------------------------------------------------------
// Final flushing of data and headers prior to closing the file.
// File is also truncated if applicable (bTruncFile), based on the offset and
// size of the last compressed chunk.
// Returns NO_ERROR on success, else a WriteEngine error code.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::finishFile(bool bTruncFile)
{
    // If capacity is 0, we never got far enough to read in the HWM chunk for
    // the current column segment file, so no need to update the file contents.
    // But we do continue in case we need to truncate the file before exiting.
    // This could happen if our initial block skipping finished an extent.
    if (fToBeCompressedCapacity > 0)
    {
        // Write out any data still waiting to be compressed; "true" also
        // forces the compression headers to be rewritten to match the data.
        RETURN_ON_ERROR( compressAndFlush( true ) );
    }

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif

    // Truncate file (if applicable) based on offset and size of last chunk
    if (bTruncFile && (fChunkPtrs.size() > 0))
    {
        // End of the last chunk = start offset + compressed size.
        long long truncateFileSize = fChunkPtrs[fChunkPtrs.size() - 1].first +
                                     fChunkPtrs[fChunkPtrs.size() - 1].second;

        // @bug5769 Don't initialize extents or truncate db files on HDFS
        if (idbdatafile::IDBPolicy::useHdfs())
        {
            std::ostringstream oss1;
            oss1 << "Finished writing column file"
                 ": OID-" << fColInfo->curCol.dataFile.fid <<
                 "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                 "; part-" << fColInfo->curCol.dataFile.fPartition <<
                 "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                 "; size-" << truncateFileSize;
            fLog->logMsg( oss1.str(), MSGLVL_INFO2 );
        }
        else
        {
            std::ostringstream oss1;
            oss1 << "Truncating column file"
                 ": OID-" << fColInfo->curCol.dataFile.fid <<
                 "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                 "; part-" << fColInfo->curCol.dataFile.fPartition <<
                 "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                 "; size-" << truncateFileSize;
            fLog->logMsg( oss1.str(), MSGLVL_INFO2 );

            int rc = NO_ERROR;

            if (truncateFileSize > 0)
                rc = fColInfo->colOp->truncateFile( fFile, truncateFileSize );
            else
                rc = ERR_COMP_TRUNCATE_ZERO;//@bug3913-Catch truncate to 0 bytes

            if (rc != NO_ERROR)
            {
                WErrorCodes ec;
                std::ostringstream oss2;
                oss2 << "finishFile: error truncating file for " <<
                     "OID " << fColInfo->curCol.dataFile.fid <<
                     "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                     "; part-" << fColInfo->curCol.dataFile.fPartition <<
                     "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                     "; size-" << truncateFileSize <<
                     "; " << ec.errorString(rc);
                fLog->logMsg( oss2.str(), rc, MSGLVL_ERROR );

                return rc;
            }
        }
    }

    // Nothing more to do if we are not updating the file contents.
    if (fToBeCompressedCapacity == 0)
    {
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif
        return NO_ERROR;
    }

    // Reset the working state so this object can move on to another file.
    fToBeCompressedCapacity = 0;
    fNumBytes = 0;
    fChunkPtrs.clear();

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT);
#endif

    return NO_ERROR;
}
577
578 //------------------------------------------------------------------------------
579 // Write out the updated compression headers.
580 //------------------------------------------------------------------------------
saveCompressionHeaders()581 int ColumnBufferCompressed::saveCompressionHeaders( )
582 {
583 // Construct the header records
584 char hdrBuf[IDBCompressInterface::HDR_BUF_LEN * 2];
585 fCompressor->initHdr( hdrBuf, fColInfo->column.compressionType );
586 fCompressor->setBlockCount(hdrBuf,
587 (fColInfo->getFileSize() / BYTE_PER_BLOCK) );
588
589 std::vector<uint64_t> ptrs;
590
591 for (unsigned i = 0; i < fChunkPtrs.size(); i++)
592 {
593 ptrs.push_back( fChunkPtrs[i].first );
594 }
595
596 unsigned lastIdx = fChunkPtrs.size() - 1;
597 ptrs.push_back( fChunkPtrs[lastIdx].first + fChunkPtrs[lastIdx].second );
598 fCompressor->storePtrs( ptrs, hdrBuf );
599
600 // Write out the header records
601 //char resp;
602 //std::cout << "dbg: before writeHeaders" << std::endl;
603 //std::cin >> resp;
604 RETURN_ON_ERROR( fColInfo->colOp->writeHeaders(fFile, hdrBuf) );
605 //std::cout << "dbg: after writeHeaders" << std::endl;
606 //std::cin >> resp;
607
608 return NO_ERROR;
609 }
610
//------------------------------------------------------------------------------
// Allocates to-be-compressed buffer if it has not already been allocated.
// Initializes to-be-compressed buffer with the contents of the chunk containing
// the fStartingHwm block, as long as that chunk is in the pointer list.
// If the chunk is not in the list, then we must be adding a new chunk, in
// which case we just initialize an empty chunk.
// Returns startFileOffset which indicates file offset (in bytes) where the
// next chunk will be starting.
// Returns NO_ERROR on success, else a WriteEngine error code.
//------------------------------------------------------------------------------
int ColumnBufferCompressed::initToBeCompressedBuffer(long long& startFileOffset)
{
    bool bNewBuffer = false;

    // Lazy initialization of to-be-compressed buffer
    if (!fToBeCompressedBuffer)
    {
        fToBeCompressedBuffer =
            new unsigned char[IDBCompressInterface::UNCOMPRESSED_INBUF_LEN];
        BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                              IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                              fColInfo->column.emptyVal,
                              fColInfo->column.width );
        bNewBuffer = true;
    }

    // Find the chunk containing the starting HWM, as long as our initial
    // block skipping has not caused us to exit the HWM chunk; in which
    // case we start a new empty chunk.
    unsigned int chunkIndex = 0;
    unsigned int blockOffsetWithinChunk = 0;
    bool bSkipStartingBlks = false;

    if (fPreLoadHWMChunk)
    {
        if (fChunkPtrs.size() > 0)
        {
            fCompressor->locateBlock(fStartingHwm,
                                     chunkIndex, blockOffsetWithinChunk);

            // HWM chunk exists on disk: preload it.  Otherwise the HWM falls
            // past the last chunk, so fall through to start an empty chunk.
            if (chunkIndex < fChunkPtrs.size())
                startFileOffset = fChunkPtrs[chunkIndex].first;
            else
                fPreLoadHWMChunk = false;
        }
        // If we are at the start of the job, fPreLoadHWMChunk will be true,
        // to preload the old HWM chunk. But if we have no chunk ptrs, then
        // we are starting on an empty PM. In this case, we skip starting
        // blks if fStartingHwm has been set.
        else
        {
            fPreLoadHWMChunk = false;
            bSkipStartingBlks = true;
        }
    }

    // Preload (read and uncompress) the chunk for the starting HWM extent only
    if (fPreLoadHWMChunk)
    {
        fPreLoadHWMChunk = false; // only preload HWM chunk in the first extent

        std::ostringstream oss;
        oss << "Reading HWM chunk for: OID-" <<
            fColInfo->curCol.dataFile.fid <<
            "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
            "; part-" << fColInfo->curCol.dataFile.fPartition <<
            "; seg-" << fColInfo->curCol.dataFile.fSegment <<
            "; hwm-" << fStartingHwm <<
            "; chunk#-" << chunkIndex <<
            "; blkInChunk-" << blockOffsetWithinChunk;
        fLog->logMsg( oss.str(), MSGLVL_INFO2 );

        // Read the chunk
        RETURN_ON_ERROR( fColInfo->colOp->setFileOffset(
                             fFile, startFileOffset, SEEK_SET) );

        // scoped_array owns compressedOutBuf; freed on every return path.
        char* compressedOutBuf = new char[ fChunkPtrs[chunkIndex].second ];
        boost::scoped_array<char> compressedOutBufPtr(compressedOutBuf);
        // itemsRead is 1 only if the entire chunk was read.
        size_t itemsRead = fFile->read(compressedOutBuf, fChunkPtrs[chunkIndex].second) / fChunkPtrs[chunkIndex].second;

        if (itemsRead != 1)
        {
            std::ostringstream oss;
            oss << "Error reading HWM chunk for: " <<
                "OID-" << fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                "; part-" << fColInfo->curCol.dataFile.fPartition <<
                "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                "; hwm-" << fStartingHwm;
            fLog->logMsg( oss.str(), ERR_COMP_READ_BLOCK, MSGLVL_ERROR );

            return ERR_COMP_READ_BLOCK;
        }

        // Uncompress the chunk into our 4MB buffer
        unsigned int outLen = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;
        int rc = fCompressor->uncompressBlock(
                     compressedOutBuf,
                     fChunkPtrs[chunkIndex].second,
                     fToBeCompressedBuffer,
                     outLen);

        if (rc)
        {
            WErrorCodes ec;
            std::ostringstream oss;
            oss << "Error uncompressing HWM chunk for: " <<
                "OID-" << fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                "; part-" << fColInfo->curCol.dataFile.fPartition <<
                "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                "; hwm-" << fStartingHwm <<
                "; " << ec.errorString(rc);
            fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );

            return ERR_COMP_UNCOMPRESS;
        }

        fToBeCompressedCapacity = outLen;

        // Position ourselves to start adding data to the HWM block
        fNumBytes = blockOffsetWithinChunk * BYTE_PER_BLOCK;

        // We are going to add data to, and thus re-add, the last chunk; so we
        // drop it from our list.
        fChunkPtrs.resize( fChunkPtrs.size() - 1 );
    }
    else // We have left the HWM chunk; just position file offset,
        // without reading anything
    {
        // If it's not a new buffer, we need to initialize, since we won't be
        // reading in anything to overlay what's in the to-be-compressed buffer.
        if (!bNewBuffer)
        {
            BlockOp::setEmptyBuf( fToBeCompressedBuffer,
                                  IDBCompressInterface::UNCOMPRESSED_INBUF_LEN,
                                  fColInfo->column.emptyVal,
                                  fColInfo->column.width );
        }

        if (fLog->isDebug( DEBUG_2 ))
        {
            std::ostringstream oss;
            oss << "Initializing new empty chunk: OID-" <<
                fColInfo->curCol.dataFile.fid <<
                "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot <<
                "; part-" << fColInfo->curCol.dataFile.fPartition <<
                "; seg-" << fColInfo->curCol.dataFile.fSegment <<
                "; hwm-" << fStartingHwm;
            fLog->logMsg( oss.str(), MSGLVL_INFO2 );
        }

        fToBeCompressedCapacity = IDBCompressInterface::UNCOMPRESSED_INBUF_LEN;

        // Set file offset to start after last current chunk
        startFileOffset = IDBCompressInterface::HDR_BUF_LEN * 2;

        if (fChunkPtrs.size() > 0)
            startFileOffset = fChunkPtrs[ fChunkPtrs.size() - 1 ].first +
                              fChunkPtrs[ fChunkPtrs.size() - 1 ].second;

        // Position ourselves to start of empty to-be-compressed buffer.
        // If we are starting the first extent on a PM, we may employ blk
        // skipping at start of import; adjust fNumBytes accordingly.
        // (see ColumnInfo::createDelayedFileIfNeeded() for discussion)
        if (bSkipStartingBlks)
            fNumBytes = fStartingHwm * BYTE_PER_BLOCK;
        else
            fNumBytes = 0;
    }

    return NO_ERROR;
}
783
784 }
785