1 /*
2    Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef NDB_IMPORT_CSV_HPP
26 #define NDB_IMPORT_CSV_HPP
27 
28 #include <ndb_global.h>
29 #include <stdint.h>
30 #include <ndb_limits.h>
31 #include <NdbOut.hpp>
32 #include "NdbImport.hpp"
33 #include "NdbImportUtil.hpp"
34 // STL
35 #include <algorithm>
36 
37 /*
38  * CSV helper class.  There is one Csv instance attached to the Impl
39  * instance.  The Csv instance is not aware of the Impl instance.
40  *
41  * Input: caller passes buffers of CSV data and gets back parsed and
42  * evaluated binary rows.  See struct Input below.
43  *
44  * Output: caller passes binary row data and gets back buffers of
45  * formatted CSV data.  See struct Output below.
46  */
47 
48 class NdbImportCsv {
49 public:
50   typedef NdbImport::Opt Opt;
51   typedef NdbImport::OptCsv OptCsv;
52   typedef NdbImportUtil::Name Name;
53   typedef NdbImportUtil::Lockable Lockable;
54   typedef NdbImportUtil::ListEnt ListEnt;
55   typedef NdbImportUtil::List List;
56   typedef NdbImportUtil::Attr Attr;
57   typedef NdbImportUtil::Attrs Attrs;
58   typedef NdbImportUtil::Table Table;
59   typedef NdbImportUtil::Row Row;
60   typedef NdbImportUtil::Blob Blob;
61   typedef NdbImportUtil::RowList RowList;
62   typedef NdbImportUtil::RowCtl RowCtl;
63   typedef NdbImportUtil::Range Range;
64   typedef NdbImportUtil::RangeList RangeList;
65   typedef NdbImportUtil::RowMap RowMap;
66   typedef NdbImportUtil::Buf Buf;
67   typedef NdbImportUtil::Stats Stats;
68 
69   NdbImportCsv(NdbImportUtil& util);
70   ~NdbImportCsv();
71   NdbImportUtil& m_util;
72   Error& m_error;       // global
73 
74   // spec
75 
76   struct Spec {
77     Spec();
78     ~Spec();
79     // allocated into uchar* with escapes translated
80     const uchar* m_fields_terminated_by;
81     const uchar* m_fields_enclosed_by;
82     const uchar* m_fields_optionally_enclosed_by;
83     const uchar* m_fields_escaped_by;
84     const uchar* m_lines_terminated_by;
85     uint m_fields_terminated_by_len;
86     uint m_fields_enclosed_by_len;
87     uint m_fields_optionally_enclosed_by_len;
88     uint m_fields_escaped_by_len;
89     uint m_lines_terminated_by_len;
90   };
91 
92   // return allocated translated string and its length
93   int translate_escapes(const char* src, const uchar*& dst, uint& dstlen);
94   int set_spec(Spec& spec, const OptCsv& optcsv, OptCsv::Mode mode);
95   int set_spec(const OptCsv& optcsv, OptCsv::Mode mode);
96 
97   // items
98 
99   struct Chunk {
100     uint m_pos; // start position
101     uint m_len; // number of bytes returned starting at m_pos
102     uint m_end; // end position (possibly m_end > m_pos + m_len)
103   };
104 
105   struct Data : ListEnt {
DataNdbImportCsv::Data106     Data() {
107       m_pos = 0;
108       m_len = 0;
109       m_end = 0;
110       m_escape = false;
111     }
nextNdbImportCsv::Data112     Data* next() {
113       return static_cast<Data*>(m_next);
114     }
115     uint m_pos;
116     uint m_len;
117     uint m_end;
118     bool m_escape;
119   };
120 
121   struct DataList : private List {
frontNdbImportCsv::DataList122     Data* front() {
123       return static_cast<Data*>(List::m_front);
124     }
backNdbImportCsv::DataList125     Data* back() {
126       return static_cast<Data*>(List::m_back);
127     }
push_backNdbImportCsv::DataList128     void push_back(Data* data) {
129       List::push_back(data);
130     }
pop_frontNdbImportCsv::DataList131     Data* pop_front() {
132       return static_cast<Data*>(List::pop_front());
133     }
push_back_fromNdbImportCsv::DataList134     void push_back_from(DataList& src) {
135       List::push_back_from(src);
136     }
cntNdbImportCsv::DataList137     uint cnt() const {
138       return m_cnt;
139     }
140   };
141 
142   struct Field : ListEnt {
FieldNdbImportCsv::Field143     Field() {
144       m_fieldno = 0;
145       m_pos = 0;
146       m_end = 0;
147       m_pack_pos = 0;
148       m_pack_end = 0;
149       m_null = false;
150     }
nextNdbImportCsv::Field151     Field* next() {
152       return static_cast<Field*>(m_next);
153     }
is_emptyNdbImportCsv::Field154     bool is_empty() const {
155       return (m_pos == m_end);
156     }
157     uint m_fieldno;
158     uint m_pos;
159     uint m_end;
160     uint m_pack_pos;
161     uint m_pack_end;
162     bool m_null;
163     DataList m_data_list;
164   };
165 
166   struct FieldList : private List {
frontNdbImportCsv::FieldList167     Field* front() {
168       return static_cast<Field*>(List::m_front);
169     }
push_backNdbImportCsv::FieldList170     void push_back(Field* field) {
171       List::push_back(field);
172     }
pop_frontNdbImportCsv::FieldList173     Field* pop_front() {
174       return static_cast<Field*>(List::pop_front());
175     }
push_back_fromNdbImportCsv::FieldList176     void push_back_from(FieldList& src) {
177       List::push_back_from(src);
178     }
cntNdbImportCsv::FieldList179     uint cnt() const {
180       return m_cnt;
181     }
final_field_is_emptyNdbImportCsv::FieldList182     bool final_field_is_empty() const {
183       return (static_cast<Field*>(m_back))->is_empty();
184     }
pop_backNdbImportCsv::FieldList185     Field * pop_back() {
186       return static_cast<Field*>(List::pop_back());
187     }
188   };
189 
190   struct Line : ListEnt {
LineNdbImportCsv::Line191     Line() {
192       m_lineno = 0;
193       m_pos = 0;
194       m_end = 0;
195       m_reject = false;
196     }
nextNdbImportCsv::Line197     Line* next() {
198       return static_cast<Line*>(m_next);
199     }
200     uint m_lineno;
201     uint m_pos;
202     uint m_end;
203     bool m_reject;
204     FieldList m_field_list;
205   };
206 
207   struct LineList : private List {
frontNdbImportCsv::LineList208     Line* front() {
209       return static_cast<Line*>(List::m_front);
210     }
backNdbImportCsv::LineList211     Line* back() {
212       return static_cast<Line*>(List::m_back);
213     }
push_backNdbImportCsv::LineList214     void push_back(Line* line) {
215       List::push_back(line);
216     }
pop_frontNdbImportCsv::LineList217     Line* pop_front() {
218       return static_cast<Line*>(List::pop_front());
219     }
push_back_fromNdbImportCsv::LineList220     void push_back_from(LineList& src) {
221       List::push_back_from(src);
222     }
cntNdbImportCsv::LineList223     uint cnt() const {
224       return m_cnt;
225     }
226   };
227 
228   struct Alloc {
229     Alloc();
230     Data* alloc_data();
231     Field* alloc_field();
232     Line* alloc_line();
233     void free_data_list(DataList& data_list);
234     void free_field_list(FieldList& field_list);
235     void free_field(Field *);
236     void free_line_list(LineList& line_list);
237     bool balanced();
238     DataList m_data_free;
239     FieldList m_field_free;
240     LineList m_line_free;
241     uint m_alloc_data_cnt;
242     uint m_alloc_field_cnt;
243     uint m_alloc_line_cnt;
244     uint m_free_data_cnt;
245     uint m_free_field_cnt;
246     uint m_free_line_cnt;
247   };
248 
249   void free_data_list(Data*& data);
250   void free_field_list(Field*& field);
251   void free_line_list(Line*& line);
252 
253   // input
254 
255   /*
256    * CSV input.
257    *
258    * Each CSV input worker has its own Input instance and buffer.
259    * The input buffer is "split" i.e. has upper and lower halves.
260    *
   * The input file is always owned by some CSV input worker.  The
   * worker reads a block of data into the lower half of its buffer.
   * File ownership is then passed immediately to the next worker so
   * that it can read the next file block, and so on.
265    *
266    * Meanwhile current worker does parse to find lines and fields.
267    * The last line is usually partial, causing parse error, but if
268    * the last token was end-of-data we can assume that no real error
269    * occurred.  The partial line ("tail") is copied to the upper
270    * half of next input worker buffer just above the lower half.
271    * The next worker can then do its own parse.
272    *
273    * Meanwhile current worker proceeds with evaluation of the lines
274    * and fields found.  The resulting rows are stored locally until
275    * a separate send step pipes them to relay rows (rows_out).
276    *
277    * Parsing uses bison.  The CSV delimiters are not fixed so the
278    * lex part is hand-coded with lookup tables.  We require that
279    * each non-empty delimiter starts with a different special char.
280    * Also a strict format with field separators and line terminators
281    * is required.
282    */
283 
284   struct Parse;
285   struct Eval;
286 
287   struct Input : Alloc {
288     Input(NdbImportCsv& csv,
289           const char* name,
290           const Spec& spec,
291           const Table& table,
292           Buf& buf,
293           RowList& rows_out,
294           RowList& rows_reject,
295           RowMap& rowmap_in,
296           Stats& stats);
297     ~Input();
298     void do_init();
299     void do_resume(Range range_in);
300     void do_parse();
301     void do_eval();
302     void do_send(uint& curr, uint& left);
303     void do_movetail(Input& input2);
304     void reject_line(const Line* line,
305                      const Field* field,
306                      const Error& error);
307     void print(NdbOut& out);
308     NdbImportCsv& m_csv;
309     NdbImportUtil& m_util;
310     Name m_name;
311     const Spec& m_spec;
312     const Table& m_table;
313     Buf& m_buf;
314     RowList& m_rows_out;
315     RowList& m_rows_reject;
316     RowMap& m_rowmap_in;
317     Error m_error;      // local csv error
has_errorNdbImportCsv::Input318     bool has_error() {
319       return m_util.has_error(m_error);
320     }
321     LineList m_line_list;
322     RowList m_rows;     // lines eval'd to rows
323     Parse* m_parse;
324     Eval* m_eval;
325     uint64 m_startpos;
326     uint64 m_startlineno;
327     uint64 m_ignore_lines;
328   };
329 
330   // parse
331 
332   static const uint g_bytecnt = 256;
333 
334   struct Parse {
335     enum State {
336       State_plain = 0,
337       State_quote = 1,
338       State_escape = 2
339     };
340     static const int g_statecnt = State_escape + 1;
341     Parse(Input& input);
342     void do_init();
343     void push_state(State state);
344     void pop_state();
345     void do_parse();
346     int do_lex(union YYSTYPE* lvalp);
347     void do_error(const char* msg);
348     void pack_field(Field* field);
349     Input& m_input;
350     NdbImportCsv& m_csv;
351     NdbImportUtil& m_util;
352     Error& m_error;     // team level
353     int m_trans[g_statecnt][g_bytecnt];
354     static const uint g_stackmax = 10;
355     uint m_stacktop;
356     State m_state[g_stackmax];
357     uint m_escapes[g_bytecnt];
358     int m_last_token;
359     // parse temporaries
360     LineList m_line_list;
361     FieldList m_field_list;
362     DataList m_data_list;
363   };
364 
365   static const char* g_str_state(Parse::State state);
366 
367   // eval
368 
369   struct Eval {
370     Eval(Input& input);
371     ~Eval();
372     void do_init();
373     void do_eval();
374     void eval_line(Row* row, Line* line);
375     void eval_field(Row* row, Line* line, Field* field);
376     void eval_null(Row* row, Line* line, Field* field);
377     Input& m_input;
378     NdbImportCsv& m_csv;
379     NdbImportUtil& m_util;
380     Error& m_error;     // team level
381   };
382 
383   // output
384 
385   /*
386    * CSV output.
387    *
388    * Currently used only by the diagnostics worker to write results
389    * etc into CSV files.  The worker adds one row at a time and gets
390    * back formatted CSV data in the buffer, which it then writes
391    * immediately to the associated file.
392    *
393    * A high-performance multi-threaded CSV output team might appear
394    * in the future (ndb_export).
395    */
396 
397   struct Output {
398     Output(NdbImportCsv& csv,
399            const Spec& spec,
400            const Table& table,
401            Buf& buf);
402     void do_init();
403     void add_header();
404     void add_line(const Row* row);
405     void add_field(const Attr& attr, const Row* row);
406     void add_char(const uchar* data, uint len);
407     void add_quote();
408     void add_fieldsep();
409     void add_lineend();
410     NdbImportCsv& m_csv;
411     NdbImportUtil& m_util;
412     const Spec& m_spec;
413     const Table& m_table;
414     Buf& m_buf;
415     uchar m_escapes[g_bytecnt];
416   };
417 };
418 
419 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Input& input);
420 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Parse& parse);
421 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Eval& eval);
422 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Output& output);
423 
424 #endif
425