1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*********************************************************************
19  *   $Id: we_bulkloadbuffer.h 4489 2013-01-30 18:47:53Z dcathey $
20  *
21  ********************************************************************/
22 #ifndef _WE_BULKLOADBUFFER_H
23 #define _WE_BULKLOADBUFFER_H
24 
25 #include "we_type.h"
26 #include "limits"
27 #include "string"
28 #include "vector"
29 #include "boost/thread/mutex.hpp"
30 #include "boost/ptr_container/ptr_vector.hpp"
31 #include "we_columninfo.h"
32 #include "calpontsystemcatalog.h"
33 
34 namespace WriteEngine
35 {
36 class Log;
37 
38 // Used to collect stats about a BulkLoadBuffer buffer that is being parsed
39 class BLBufferStats
40 {
41 public:
42     int64_t minBufferVal;
43     int64_t maxBufferVal;
44     int64_t satCount;
BLBufferStats(ColDataType colDataType)45     BLBufferStats(ColDataType colDataType) : satCount(0)
46     {
47         if (isUnsigned(colDataType) || isCharType(colDataType))
48         {
49             minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
50             maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
51         }
52         else
53         {
54             minBufferVal = MAX_BIGINT;
55             maxBufferVal = MIN_BIGINT;
56         }
57     }
58 };
59 
// Read buffer used by the bulk-load import.  Each buffer holds a chunk of
// input data (fData), overflow data held for the next buffer (fOverflowBuf),
// the token array of start/offset pairs for the field values (fTokens),
// per-column lock/status information shared by the read and parse threads,
// and the rows rejected while reading.  Members with a "Parser" suffix are
// separate copies documented as being for temporary use by the parser.
class BulkLoadBuffer
{
private:

    //--------------------------------------------------------------------------
    // Private Data Members
    //--------------------------------------------------------------------------

    char* fData;                        // Buffer with data read from tbl file
    char* fDataParser;                  // for temporary use by parser

    char* fOverflowBuf;                 // Overflow data held for next buffer
    unsigned fOverflowSize;             // Current size of fOverflowBuf

    // Information about the locker and status for each column in this buffer.
    // Note that TableInfo::fSyncUpdatesTI mutex is used to synchronize
    // access to fColumnLocks and fParseComplete from both read and parse
    // threads.  Table-scope lock (fSyncUpdatesTI) is used instead of
    // buffer-scope lock (fSyncUpdatesBLB), to keep state for all buffers
    // static while we are scanning fColumnLocks for the buffers in a table.
    std::vector<LockInfo> fColumnLocks;

    // Note that the TableInfo::fSyncUpdatesTI mutex (not this buffer's
    // fSyncUpdatesBLB mutex) is used to synchronize getting/setting
    // fStatusBLB between threads.
    Status fStatusBLB;                  // Status of buffer

    // TableInfo::fSyncUpdatesTI mutex should be locked when accessing
    // this data member (see fColumnLocks discussion).
    unsigned fParseComplete;            // Num of columns that are parseComplete

    unsigned fTotalRows;                // Max rows this buffer can now hold;
                                        //   size of fTokens array
    std::vector< std::pair<RID, std::string> > fRowStatus; //Status of bad rows
    std::vector<std::string> fErrRows; // Rejected rows to write to .bad file

    uint32_t fTotalReadRows;                // Total valid rows read into buffer;
                                            //   this count excludes rejected rows
    uint32_t fTotalReadRowsParser;          // for temporary use by parser

    uint32_t fTotalReadRowsForLog;          // Total rows read into this buffer
                                            //   including invalid rows

    RID fStartRow;                      // Starting row id for rows in buffer,
                                        //   relative to start of job.
                                        //   Rejected rows are excluded.
    RID fStartRowParser;                // for temporary use by parser

    RID fStartRowForLogging;            // Starting row id for rows in buffer,
                                        //   relative to start of current input
                                        //   file.  All rows are counted.
    RID fStartRowForLoggingParser;      // for temporary use by parser

    uint32_t fAutoIncGenCount;              // How many auto-increment values are
                                            //   to be generated for current buffer
    uint32_t fAutoIncGenCountParser;        // for temporary use by parser

    uint64_t fAutoIncNextValue;         // Next auto-increment value to assign
                                        //   to a row in this buffer
    unsigned fNumberOfColumns;          // Number of ColumnInfo objs in table

    ColPosPair** fTokens;               // Array of start/offset pairs for the
                                        //   column values read from tbl files
                                        //   (sized by fTotalRows; presumably
                                        //   indexed [row][field] -- confirm)
    ColPosPair** fTokensParser;         // for temporary use by parser

    char fColDelim;                     // Character to delimit columns in a row
    unsigned fBufferSize;               // Size of input read buffer (fData)
    unsigned fReadSize;                 // Number of bytes in read buffer(fData)
    boost::mutex fSyncUpdatesBLB;       // Mutex to synchronize updates
    Log* fLog;                          // Logger object (not owned here?
                                        //   NOTE(review): confirm ownership)
    bool fNullStringMode;               // Indicates if "NULL" string is to be
                                        //   treated as a NULL value or not
    char fEnclosedByChar;               // Optional char to enclose col values
    char fEscapeChar;                   // Used to escape enclosed character
    int fBufferId;                      // Id for this read buffer
    std::string fTableName;             // Table assigned to this read buffer
    JobFieldRefList fFieldList;         // Complete list of cols and flds
    unsigned int fNumFieldsInFile;      // Number of fields in input file
                                        //   (including fields to be ignored)
    unsigned int fNumColsInFile;        // Number of flds in input file targeted
                                        //   for db cols (omits default cols)
    bool fbTruncationAsError;           // Treat string truncation as error
    ImportDataMode fImportDataMode;     // Import data in text or binary mode
    std::string fTimeZone;              // Timezone used by TIMESTAMP datatype
    unsigned int fFixedBinaryRecLen;    // Fixed rec len used in binary mode

    //--------------------------------------------------------------------------
    // Private Functions
    //--------------------------------------------------------------------------

    /** @brief Copy constructor.
     *  Declared private so code outside the class cannot copy a buffer.
     */
    BulkLoadBuffer(const BulkLoadBuffer& buffer);

    /** @brief Assignment operator.
     *  Declared private so code outside the class cannot assign a buffer.
     */
    BulkLoadBuffer& operator =(const BulkLoadBuffer& buffer);

    /** @brief Convert one field of buffer data depending upon the column's
     *         data type.
     *  @param field       Pointer to the field's bytes in the buffer
     *  @param fieldLength Number of bytes in the field
     *  @param nullFlag    Whether the field is to be treated as NULL
     *  @param output      Destination for the converted value
     *  @param column      Column definition the value is converted for
     *  @param bufStats    Per-buffer min/max statistics (presumably updated
     *                     by the conversion -- confirm in the .cpp)
     */
    void convert(char* field, int fieldLength,
                 bool nullFlag, unsigned char* output,
                 const JobColumn& column,
                 BLBufferStats& bufStats);

    /** @brief Copy the overflow data from @p buffer.
     */
    void copyOverflow(const BulkLoadBuffer& buffer);

    /** @brief Parse a Read buffer for a nonDictionary column.
     *  @return Status code of the parse.
     */
    int parseCol(ColumnInfo& columnInfo);

    /** @brief Write the min/max values collected while parsing a
     *         nonDictionary column into @p oss, for logging.
     *  NOTE(review): description inferred from the name and signature;
     *  the formatting presumably depends on @p colDataType -- confirm.
     *  @param oss          Stream that receives the formatted values
     *  @param colDataType  Data type of the column that was parsed
     *  @param minBufferVal Minimum value found in the buffer
     *  @param maxBufferVal Maximum value found in the buffer
     */
    void parseColLogMinMax(std::ostringstream& oss,
                           ColDataType         colDataType,
                           int64_t             minBufferVal,
                           int64_t             maxBufferVal) const;

    /** @brief Parse a Read buffer for a Dictionary column.
     *  @return Status code of the parse.
     */
    int parseDict(ColumnInfo& columnInfo);

    /** @brief Parse a Dictionary Read buffer into a ColumnBufferSection.
     *
     * Parses the Read buffer into a section up to the point at which
     * the buffer crosses an extent boundary.
     *
     * @param columnInfo    Column being parsed
     * @param tokenPos      Position of rows to be parsed, in fTokens.
     * @param startRow      Row id of first row in buffer to be parsed.
     *        Row id is relative to all the rows in this import.
     * @param totalReadRows Number of buffer rows ready to be parsed
     * @param nRowsParsed   Number of buffer rows that were parsed (output)
     * @return Status code of the parse.
     */
    int parseDictSection(ColumnInfo& columnInfo, int tokenPos,
                         RID startRow, uint32_t totalReadRows,
                         uint32_t& nRowsParsed);

    /** @brief Expand the size of the fTokens array (whose size is tracked
     *         by fTotalRows).
     */
    void resizeTokenArray();

    /** @brief Tokenize the buffer contents and fill up the token array.
     *  tokenizeBinary() is the binary-mode counterpart.
     *  @param columnsInfo           Columns of the table being imported
     *  @param allowedErrCntThisCall Presumably the number of rejected rows
     *                               tolerated during this call (confirm)
     */
    void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                  unsigned int allowedErrCntThisCall);

    /** @brief Binary tokenization of the buffer, and fill up the token array.
     *  @param columnsInfo           Columns of the table being imported
     *  @param allowedErrCntThisCall Presumably the number of rejected rows
     *                               tolerated during this call (confirm)
     *  @param bEndOfData            Whether the end of the input was reached
     *  @return Status code of the tokenization.
     */
    int tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                       unsigned int allowedErrCntThisCall,
                       bool bEndOfData);

    /** @brief Determine if the specified binary field value is NULL or not.
     *  @param val Pointer to the binary value
     *  @param ct  Write-engine column type of the value
     *  @param dt  System-catalog data type of the value
     */
    bool isBinaryFieldNull(void* val, WriteEngine::ColType ct,
                           execplan::CalpontSystemCatalog::ColDataType dt);

public:

    /** @brief Constructor
     * @param noOfCols Number of columns
     * @param bufferSize Buffer size
     * @param logger The Log object used for logging
     * @param bufferId Id assigned to this buffer
     * @param tableName Name of table associated with this read buffer
     * @param jobFieldRefList Complete list of cols/flds listed in Job XML file
     */
    BulkLoadBuffer(unsigned noOfCols,
                   unsigned bufferSize, Log* logger,
                   int bufferId, const std::string& tableName,
                   const JobFieldRefList& jobFieldRefList);

    /** @brief Destructor
    */
    ~BulkLoadBuffer();

    /** @brief Resets the values of the members (excluding column locks)
     */
    void reset();

    /** @brief Resets the column locks.
     * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     * function (see fColumnLocks discussion).
     */
    void resetColumnLocks();

    /** @brief Get the buffer status.
     *  TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     *  function (see fStatusBLB discussion).
     */
    Status getStatusBLB() const
    {
        return fStatusBLB;
    }

    /** @brief Set the buffer status.
     *  TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     *  function (see fStatusBLB discussion).
     */
    void setStatusBLB(const Status& status)
    {
        fStatusBLB = status;
    }

    /** @brief Try to lock a column for the buffer.
     * TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     * function (see fColumnLocks discussion).
     * @param columnId Index of the column in this buffer
     * @param id       Id of the would-be locker
     * @return Presumably true if the lock was obtained (confirm in .cpp)
     */
    bool tryAndLockColumn(const int& columnId, const int& id);

    /** @brief Fill the buffer from an in-memory input instead of a file.
     *  NOTE(review): parameter semantics below are inferred from names and
     *  from fillFromFile(); confirm against the implementation.
     *  @param overFlowBufIn  Buffer whose overflow data is carried forward
     *  @param input          Start of the in-memory input data
     *  @param length         Number of bytes available at @p input
     *  @param parse_length   Output: presumably bytes of @p input consumed
     *  @param totalReadRows  Running count of rows read
     *  @param correctTotalRows Running count of valid rows read
     *  @param columnsInfo    Columns of the table being imported
     *  @param allowedErrCntThisCall Rejected rows tolerated during this call
     *  @return Status code.
     */
    int fillFromMemory(
        const BulkLoadBuffer& overFlowBufIn,
        const char* input, size_t length, size_t *parse_length, RID& totalReadRows,
        RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
        unsigned int allowedErrCntThisCall );

    /** @brief Read the table data from @p handle into the buffer.
     *  @param overFlowBufIn  Buffer whose overflow data is carried forward
     *  @param handle         Input file to read from
     *  @param totalRows      Running count of rows read (presumably)
     *  @param correctTotalRows Running count of valid rows read (presumably)
     *  @param columnsInfo    Columns of the table being imported
     *  @param allowedErrCntThisCall Rejected rows tolerated during this call
     *  @return Status code.
     */
    int fillFromFile(const BulkLoadBuffer& overFlowBufIn,
                     FILE* handle, RID& totalRows, RID& correctTotalRows,
                     const boost::ptr_vector<ColumnInfo>& columnsInfo,
                     unsigned int allowedErrCntThisCall);

    /** @brief Get the overflow size (current size of fOverflowBuf).
     *  NOTE(review): returns int although fOverflowSize is unsigned.
     */
    int getOverFlowSize()  const
    {
        return fOverflowSize;
    }

    /** @brief Parse the buffer data for the given column.
     *  @return Status code of the parse.
     */
    int parse(ColumnInfo& columnInfo);

    /** @brief Set the delimiter used to delimit the columns within a row
     */
    void setColDelimiter(const char& delim)
    {
        fColDelim = delim;
    }

    /** @brief Set mode to treat "NULL" string as NULL value or not.
     */
    void setNullStringMode( bool bMode )
    {
        fNullStringMode = bMode;
    }

    /** @brief Set character optionally used to enclose input column values.
     */
    void setEnclosedByChar( char enChar )
    {
        fEnclosedByChar = enChar;
    }

    /** @brief Set escape char to use in conjunction with enclosed by char.
     */
    void setEscapeChar    ( char esChar )
    {
        fEscapeChar  = esChar;
    }

    /** @brief Get the column status
     *  TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     *  function (see fColumnLocks discussion).
     *  @param columnId Index into fColumnLocks; not bounds-checked here.
     */
    Status getColumnStatus(const int& columnId) const
    {
        return fColumnLocks[columnId].status;
    }

    /** @brief Set the column status
     *  TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     *  function (see fColumnLocks discussion).
     *  @returns TRUE if all columns in the buffer are complete.
     */
    bool setColumnStatus(const int& columnId, const Status& status);

    /** @brief Get the status of the bad rows: (row id, text) pairs.
     */
    const std::vector< std::pair<RID, std::string> >& getErrorRows() const
    {
        return fRowStatus;
    }

    /** @brief Get the rejected rows to be written to the .bad file.
     */
    const std::vector<std::string>& getExactErrorRows() const
    {
        return fErrRows;
    }

    /** @brief Discard both the bad-row status list and the rejected rows.
     */
    void clearErrRows()
    {
        fRowStatus.clear();
        fErrRows.clear();
    }

    /** @brief Get the column locker.
     *  TableInfo::fSyncUpdatesTI mutex should be locked when calling this
     *  function (see fColumnLocks discussion).
     *  @param columnId Index into fColumnLocks; not bounds-checked here.
     */
    int getColumnLocker(const int& columnId) const
    {
        return fColumnLocks[columnId].locker;
    }

    /** @brief Set truncation as error for this import.
     */
    void setTruncationAsError(bool bTruncationAsError)
    {
        fbTruncationAsError = bTruncationAsError;
    }

    /** @brief Retrieve the truncation-as-error setting for this
     *  import. When set, this causes char and varchar strings
     *  that are longer than the column definition to be treated
     *  as errors instead of warnings.
     */
    bool getTruncationAsError() const
    {
        return fbTruncationAsError;
    }

    /** @brief Set text vs binary import mode along with corresponding fixed
     *         record length that is used if the binary mode is set to TRUE.
     */
    void setImportDataMode( ImportDataMode importMode,
                            unsigned int fixedBinaryRecLen )
    {
        fImportDataMode    = importMode;
        fFixedBinaryRecLen = fixedBinaryRecLen;
    }

    /** @brief Set the timezone used by the TIMESTAMP datatype.
     */
    void setTimeZone(const std::string& timeZone)
    {
        fTimeZone = timeZone;
    }
};
399 
isTrueWord(const char * field,int fieldLength)400 inline bool isTrueWord(const char *field, int fieldLength)
401 {
402     //return false;
403     return fieldLength == 4 && ( field[0] == 'T' || field[0] == 't' )
404         && ( field[1] == 'R' || field[1] == 'r' )
405         && ( field[2] == 'U' || field[2] == 'u' )
406         && ( field[3] == 'E' || field[3] == 'e' );
407 }
408 
409 }
410 #endif
411