1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*********************************************************************
19 * $Id: we_bulkloadbuffer.h 4489 2013-01-30 18:47:53Z dcathey $
20 *
21 ********************************************************************/
22 #ifndef _WE_BULKLOADBUFFER_H
23 #define _WE_BULKLOADBUFFER_H
24
25 #include "we_type.h"
26 #include "limits"
27 #include "string"
28 #include "vector"
29 #include "boost/thread/mutex.hpp"
30 #include "boost/ptr_container/ptr_vector.hpp"
31 #include "we_columninfo.h"
32 #include "calpontsystemcatalog.h"
33
34 namespace WriteEngine
35 {
36 class Log;
37
38 // Used to collect stats about a BulkLoadBuffer buffer that is being parsed
39 class BLBufferStats
40 {
41 public:
42 int64_t minBufferVal;
43 int64_t maxBufferVal;
44 int64_t satCount;
BLBufferStats(ColDataType colDataType)45 BLBufferStats(ColDataType colDataType) : satCount(0)
46 {
47 if (isUnsigned(colDataType) || isCharType(colDataType))
48 {
49 minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
50 maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
51 }
52 else
53 {
54 minBufferVal = MAX_BIGINT;
55 maxBufferVal = MIN_BIGINT;
56 }
57 }
58 };
59
60 class BulkLoadBuffer
61 {
62 private:
63
64 //--------------------------------------------------------------------------
65 // Private Data Members
66 //--------------------------------------------------------------------------
67
68 char* fData; // Buffer with data read from tbl file
69 char* fDataParser; // for temporary use by parser
70
71 char* fOverflowBuf; // Overflow data held for next buffer
72 unsigned fOverflowSize; // Current size of fOverflowBuf
73
74 // Information about the locker and status for each column in this buffer.
75 // Note that TableInfo::fSyncUpdatesTI mutex is used to synchronize
76 // access to fColumnLocks and fParseComplete from both read and parse
77 // threads. Table-scope lock (fSyncUpdatesTI) is used instead of
78 // buffer-scope lock (fSyncUpdatesBLB), to keep state for all buffers
79 // static while we are scanning fColumnLocks for the buffers in a table
80 std::vector<LockInfo> fColumnLocks;
81
82 // Note that TableInfo::fSyncUpdatesTI mutex (not fStatusBLB) is used
83 // to synchronize getting/setting fStatusBLB between threads.
84 Status fStatusBLB; // Status of buffer
85
86 // TableInfo::fSyncUpdatesTI mutex should be locked when accessing
87 // this data member (see fColumnLocks discussion).
88 unsigned fParseComplete; // Num of columns that are parseComplete
89
90 unsigned fTotalRows; // Max rows this buffer can now hold;
91 // size of fTokens array
92 std::vector< std::pair<RID, std::string> > fRowStatus; //Status of bad rows
93 std::vector<std::string> fErrRows; // Rejected rows to write to .bad file
94
95 uint32_t fTotalReadRows; // Total valid rows read into buffer;
96 // this count excludes rejected rows
97 uint32_t fTotalReadRowsParser; // for temporary use by parser
98
99 uint32_t fTotalReadRowsForLog; // Total rows read into this buffer
100 // including invalid rows
101
102 RID fStartRow; // Starting row id for rows in buffer,
103 // relative to start of job.
104 // Rejected rows are excluded.
105 RID fStartRowParser; // for temporary use by parser
106
107 RID fStartRowForLogging; // Starting row id for rows in buffer,
108 // relative to start of current input
109 // file. All rows are counted.
110 RID fStartRowForLoggingParser; // for temporary use by parser
111
112 uint32_t fAutoIncGenCount; // How many auto-increment values are
113 // to be generated for current buffer
114 uint32_t fAutoIncGenCountParser; // for temporary use by parser
115
116 uint64_t fAutoIncNextValue; // Next auto-increment value assign to
117 // a row in this buffer
118 unsigned fNumberOfColumns; // Number of ColumnInfo objs in table
119
120 ColPosPair** fTokens; // Vector of start and offsets for the
121 // column values read from tbl files
122 ColPosPair** fTokensParser; // for temporary use by parser
123
124 char fColDelim; // Character to delimit columns in a row
125 unsigned fBufferSize; // Size of input read buffer (fData)
126 unsigned fReadSize; // Number of bytes in read buffer(fData)
127 boost::mutex fSyncUpdatesBLB; // Mutex to synchronize updates
128 Log* fLog; // Logger object
129 bool fNullStringMode; // Indicates if "NULL" string is to be
130 // treated as a NULL value or not
131 char fEnclosedByChar; // Optional char to enclose col values
132 char fEscapeChar; // Used to escape enclosed character
133 int fBufferId; // Id for this read buffer
134 std::string fTableName; // Table assigned to this read buffer
135 JobFieldRefList fFieldList; // Complete list of cols and flds
136 unsigned int fNumFieldsInFile; // Number of fields in input file
137 // (including fields to be ignored)
138 unsigned int fNumColsInFile; // Number of flds in input file targeted
139 // for db cols (omits default cols)
140 bool fbTruncationAsError; // Treat string truncation as error
141 ImportDataMode fImportDataMode; // Import data in text or binary mode
142 std::string fTimeZone; // Timezone used by TIMESTAMP datatype
143 unsigned int fFixedBinaryRecLen; // Fixed rec len used in binary mode
144
145 //--------------------------------------------------------------------------
146 // Private Functions
147 //--------------------------------------------------------------------------
148
149 /** @brief Copy constructor
150 */
151 BulkLoadBuffer(const BulkLoadBuffer& buffer);
152
153 /** @brief Assignment operator
154 */
155 BulkLoadBuffer& operator =(const BulkLoadBuffer& buffer);
156
157 /** @brief Convert the buffer data depending upon the data type
158 */
159 void convert(char* field, int fieldLength,
160 bool nullFlag, unsigned char* output,
161 const JobColumn& column,
162 BLBufferStats& bufStats);
163
164 /** @brief Copy the overflow data
165 */
166 void copyOverflow(const BulkLoadBuffer& buffer);
167
168 /** @brief Parse a Read buffer for a nonDictionary column
169 */
170 int parseCol(ColumnInfo& columnInfo);
171
172 /** @brief Parse a Read buffer for a nonDictionary column
173 */
174 void parseColLogMinMax(std::ostringstream& oss,
175 ColDataType colDataType,
176 int64_t minBufferVal,
177 int64_t maxBufferVal) const;
178
179 /** @brief Parse a Read buffer for a Dictionary column
180 */
181 int parseDict(ColumnInfo& columnInfo);
182
183 /** @brief Parse a Dictionary Read buffer into a ColumnBufferSection.
184 *
185 * Parses the Read buffer into a section up to the point at which
186 * the buffer crosses an extent boundary.
187 *
188 * @param columnInfo Column being parsed
189 * @oaram tokenPos Position of rows to be parsed, in fTokens.
190 * @param startRow Row id of first row in buffer to be parsed.
191 * Row id is relative to all the rows in this import.
192 * @param totalReadRows Number of buffer rows ready to be parsed
193 * @param nRowsParsed Number of buffer rows that were parsed
194 */
195 int parseDictSection(ColumnInfo& columnInfo, int tokenPos,
196 RID startRow, uint32_t totalReadRows,
197 uint32_t& nRowsParsed);
198
199 /** @brief Expand the size of the fTokens array
200 */
201 void resizeTokenArray();
202
203 /** @brief tokenize the buffer contents and fill up the token array.
204 */
205 void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
206 unsigned int allowedErrCntThisCall);
207
208 /** @brief Binary tokenization of the buffer, and fill up the token array.
209 */
210 int tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
211 unsigned int allowedErrCntThisCall,
212 bool bEndOfData);
213
214 /** @brief Determine if specified value is NULL or not.
215 */
216 bool isBinaryFieldNull(void* val, WriteEngine::ColType ct,
217 execplan::CalpontSystemCatalog::ColDataType dt);
218
219 public:
220
221 /** @brief Constructor
222 * @param noOfCol Number of columns
223 * @param bufferSize Buffer size
224 * @param logger The Log object used for logging
225 * @param bufferId Id assigned to this buffer
226 * @param tableName Name of table associated with this read buffer
227 * @param jobFieldRefList Complete list of cols/flds listed in Job XML file
228 */
229 BulkLoadBuffer(unsigned noOfCols,
230 unsigned bufferSize, Log* logger,
231 int bufferId, const std::string& tableName,
232 const JobFieldRefList& jobFieldRefList);
233
234 /** @brief Destructor
235 */
236 ~BulkLoadBuffer();
237
238 /** @brief Resets the values of the members (excluding column locks)
239 */
240 void reset();
241
242 /** @brief Resets the column locks.
243 * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
244 * function (see fColumnLocks discussion).
245 */
246 void resetColumnLocks();
247
248 /** @brief Get the buffer status
249 */
getStatusBLB()250 Status getStatusBLB() const
251 {
252 return fStatusBLB;
253 }
254
255 /** @brief Set the buffer status
256 */
setStatusBLB(const Status & status)257 void setStatusBLB(const Status& status)
258 {
259 fStatusBLB = status;
260 }
261
262 /** @brief Try to lock a column for the buffer
263 * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
264 * function (see fColumnLocks discussion).
265 */
266 bool tryAndLockColumn(const int& columnId, const int& id);
267
268 int fillFromMemory(
269 const BulkLoadBuffer& overFlowBufIn,
270 const char* input, size_t length, size_t *parse_length, RID& totalReadRows,
271 RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
272 unsigned int allowedErrCntThisCall );
273
274 /** @brief Read the table data into the buffer
275 */
276 int fillFromFile(const BulkLoadBuffer& overFlowBufIn,
277 FILE* handle, RID& totalRows, RID& correctTotalRows,
278 const boost::ptr_vector<ColumnInfo>& columnsInfo,
279 unsigned int allowedErrCntThisCall);
280
281 /** @brief Get the overflow size
282 */
getOverFlowSize()283 int getOverFlowSize() const
284 {
285 return fOverflowSize;
286 }
287
288 /** @brief Parse the buffer data
289 */
290 int parse(ColumnInfo& columnInfo);
291
292 /** @brief Set the delimiter used to delimit the columns within a row
293 */
setColDelimiter(const char & delim)294 void setColDelimiter(const char& delim)
295 {
296 fColDelim = delim;
297 }
298
299 /** @brief Set mode to treat "NULL" string as NULL value or not.
300 */
setNullStringMode(bool bMode)301 void setNullStringMode( bool bMode )
302 {
303 fNullStringMode = bMode;
304 }
305
306 /** @brief Set character optionally used to enclose input column values.
307 */
setEnclosedByChar(char enChar)308 void setEnclosedByChar( char enChar )
309 {
310 fEnclosedByChar = enChar;
311 }
312
313 /** @brief Set escape char to use in conjunction with enclosed by char.
314 */
setEscapeChar(char esChar)315 void setEscapeChar ( char esChar )
316 {
317 fEscapeChar = esChar;
318 }
319
320 /** @brief Get the column status
321 * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
322 * function (see fColumnLocks discussion).
323 */
getColumnStatus(const int & columnId)324 Status getColumnStatus(const int& columnId) const
325 {
326 return fColumnLocks[columnId].status;
327 }
328
329 /** @brief Set the column status
330 * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
331 * function (see fColumnLocks discussion).
332 * @returns TRUE if all columns in the buffer are complete.
333 */
334 bool setColumnStatus(const int& columnId, const Status& status);
335
336 /** @brief Get the error row status's
337 */
getErrorRows()338 const std::vector< std::pair<RID, std::string> >& getErrorRows() const
339 {
340 return fRowStatus;
341 }
342
343 /** @brief Get the error rows
344 */
getExactErrorRows()345 const std::vector<std::string>& getExactErrorRows() const
346 {
347 return fErrRows;
348 }
349
clearErrRows()350 void clearErrRows()
351 {
352 fRowStatus.clear();
353 fErrRows.clear();
354 }
355
356 /** @brief Get the column locker.
357 * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
358 * function (see fColumnLocks discussion).
359 */
getColumnLocker(const int & columnId)360 int getColumnLocker(const int& columnId) const
361 {
362 return fColumnLocks[columnId].locker;
363 }
364
365 /** @brief set truncation as error for this import.
366 */
setTruncationAsError(bool bTruncationAsError)367 void setTruncationAsError(bool bTruncationAsError)
368 {
369 fbTruncationAsError = bTruncationAsError;
370 }
371
372 /** @brief retrieve the tuncation as error setting for this
373 * import. When set, this causes char and varchar strings
374 * that are longer than the column definition to be treated
375 * as errors instead of warnings.
376 */
getTruncationAsError()377 bool getTruncationAsError() const
378 {
379 return fbTruncationAsError;
380 }
381
382 /** @brief Set text vs binary import mode along with corresponding fixed
383 * record length that is used if the binary mode is set to TRUE.
384 */
setImportDataMode(ImportDataMode importMode,unsigned int fixedBinaryRecLen)385 void setImportDataMode( ImportDataMode importMode,
386 unsigned int fixedBinaryRecLen )
387 {
388 fImportDataMode = importMode;
389 fFixedBinaryRecLen = fixedBinaryRecLen;
390 }
391
392 /** @brief set timezone.
393 */
setTimeZone(const std::string & timeZone)394 void setTimeZone(const std::string& timeZone)
395 {
396 fTimeZone = timeZone;
397 }
398 };
399
/** @brief Case-insensitive (ASCII) test for the four-letter word "true".
 *
 *  @param field       Characters to examine; need not be NUL terminated.
 *                     Only dereferenced when fieldLength is exactly 4.
 *  @param fieldLength Number of characters in field.
 *  @return true if fieldLength is 4 and the characters spell "true" in any
 *          mix of upper and lower case; false otherwise.
 */
inline bool isTrueWord(const char *field, int fieldLength)
{
    static const char upperWord[] = "TRUE";
    static const char lowerWord[] = "true";
    const int wordLength = 4;

    // Length is checked first so that field is never read for other sizes.
    if (fieldLength != wordLength)
    {
        return false;
    }

    for (int pos = 0; pos < wordLength; ++pos)
    {
        const char ch = field[pos];

        if (ch != upperWord[pos] && ch != lowerWord[pos])
        {
            return false;
        }
    }

    return true;
}
408
409 }
410 #endif
411