1 /* 2 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 #ifndef NDB_IMPORT_CSV_HPP 26 #define NDB_IMPORT_CSV_HPP 27 28 #include <ndb_global.h> 29 #include <stdint.h> 30 #include <ndb_limits.h> 31 #include <NdbOut.hpp> 32 #include "NdbImport.hpp" 33 #include "NdbImportUtil.hpp" 34 // STL 35 #include <algorithm> 36 37 /* 38 * CSV helper class. There is one Csv instance attached to the Impl 39 * instance. The Csv instance is not aware of the Impl instance. 40 * 41 * Input: caller passes buffers of CSV data and gets back parsed and 42 * evaluated binary rows. See struct Input below. 43 * 44 * Output: caller passes binary row data and gets back buffers of 45 * formatted CSV data. See struct Output below. 46 */ 47 48 class NdbImportCsv { 49 public: 50 typedef NdbImport::Opt Opt; 51 typedef NdbImport::OptCsv OptCsv; 52 typedef NdbImportUtil::Name Name; 53 typedef NdbImportUtil::Lockable Lockable; 54 typedef NdbImportUtil::ListEnt ListEnt; 55 typedef NdbImportUtil::List List; 56 typedef NdbImportUtil::Attr Attr; 57 typedef NdbImportUtil::Attrs Attrs; 58 typedef NdbImportUtil::Table Table; 59 typedef NdbImportUtil::Row Row; 60 typedef NdbImportUtil::Blob Blob; 61 typedef NdbImportUtil::RowList RowList; 62 typedef NdbImportUtil::RowCtl RowCtl; 63 typedef NdbImportUtil::Range Range; 64 typedef NdbImportUtil::RangeList RangeList; 65 typedef NdbImportUtil::RowMap RowMap; 66 typedef NdbImportUtil::Buf Buf; 67 typedef NdbImportUtil::Stats Stats; 68 69 NdbImportCsv(NdbImportUtil& util); 70 ~NdbImportCsv(); 71 NdbImportUtil& m_util; 72 Error& m_error; // global 73 74 // spec 75 76 struct Spec { 77 Spec(); 78 ~Spec(); 79 // allocated into uchar* with escapes translated 80 const uchar* m_fields_terminated_by; 81 const uchar* m_fields_enclosed_by; 82 const uchar* m_fields_optionally_enclosed_by; 83 const uchar* m_fields_escaped_by; 84 const uchar* m_lines_terminated_by; 85 uint m_fields_terminated_by_len; 86 uint m_fields_enclosed_by_len; 87 uint m_fields_optionally_enclosed_by_len; 88 uint m_fields_escaped_by_len; 89 uint m_lines_terminated_by_len; 90 }; 91 92 // return allocated translated string and its length 93 int translate_escapes(const char* src, const uchar*& dst, uint& dstlen); 94 int set_spec(Spec& spec, const OptCsv& optcsv, OptCsv::Mode mode); 95 int set_spec(const OptCsv& optcsv, OptCsv::Mode mode); 96 97 // items 98 99 struct Chunk { 100 uint m_pos; // start position 101 uint m_len; // number of bytes returned starting at m_pos 102 uint m_end; // end position (possibly m_end > m_pos + m_len) 103 }; 104 105 struct Data : ListEnt { DataNdbImportCsv::Data106 Data() { 107 m_pos = 0; 108 m_len = 0; 109 m_end = 0; 110 m_escape = false; 111 } nextNdbImportCsv::Data112 Data* next() { 113 return static_cast<Data*>(m_next); 114 } 115 uint m_pos; 116 uint m_len; 117 uint m_end; 118 bool m_escape; 119 }; 120 121 struct DataList : private List { frontNdbImportCsv::DataList122 Data* front() { 123 return static_cast<Data*>(List::m_front); 124 } backNdbImportCsv::DataList125 Data* back() { 126 return static_cast<Data*>(List::m_back); 127 } push_backNdbImportCsv::DataList128 void push_back(Data* data) { 129 List::push_back(data); 130 } pop_frontNdbImportCsv::DataList131 Data* pop_front() { 132 return static_cast<Data*>(List::pop_front()); 133 } push_back_fromNdbImportCsv::DataList134 void push_back_from(DataList& src) { 135 List::push_back_from(src); 136 } cntNdbImportCsv::DataList137 uint cnt() const { 138 return m_cnt; 139 } 140 }; 141 142 struct Field : ListEnt { FieldNdbImportCsv::Field143 Field() { 144 m_fieldno = 0; 145 m_pos = 0; 146 m_end = 0; 147 m_pack_pos = 0; 148 m_pack_end = 0; 149 m_null = false; 150 } nextNdbImportCsv::Field151 Field* next() { 152 return static_cast<Field*>(m_next); 153 } is_emptyNdbImportCsv::Field154 bool is_empty() const { 155 return (m_pos == m_end); 156 } 157 uint m_fieldno; 158 uint m_pos; 159 uint m_end; 160 uint m_pack_pos; 161 uint m_pack_end; 162 bool m_null; 163 DataList m_data_list; 164 }; 165 166 struct FieldList : private List { frontNdbImportCsv::FieldList167 Field* front() { 168 return static_cast<Field*>(List::m_front); 169 } push_backNdbImportCsv::FieldList170 void push_back(Field* field) { 171 List::push_back(field); 172 } pop_frontNdbImportCsv::FieldList173 Field* pop_front() { 174 return static_cast<Field*>(List::pop_front()); 175 } push_back_fromNdbImportCsv::FieldList176 void push_back_from(FieldList& src) { 177 List::push_back_from(src); 178 } cntNdbImportCsv::FieldList179 uint cnt() const { 180 return m_cnt; 181 } final_field_is_emptyNdbImportCsv::FieldList182 bool final_field_is_empty() const { 183 return (static_cast<Field*>(m_back))->is_empty(); 184 } pop_backNdbImportCsv::FieldList185 Field * pop_back() { 186 return static_cast<Field*>(List::pop_back()); 187 } 188 }; 189 190 struct Line : ListEnt { LineNdbImportCsv::Line191 Line() { 192 m_lineno = 0; 193 m_pos = 0; 194 m_end = 0; 195 m_reject = false; 196 } nextNdbImportCsv::Line197 Line* next() { 198 return static_cast<Line*>(m_next); 199 } 200 uint m_lineno; 201 uint m_pos; 202 uint m_end; 203 bool m_reject; 204 FieldList m_field_list; 205 }; 206 207 struct LineList : private List { frontNdbImportCsv::LineList208 Line* front() { 209 return static_cast<Line*>(List::m_front); 210 } backNdbImportCsv::LineList211 Line* back() { 212 return static_cast<Line*>(List::m_back); 213 } push_backNdbImportCsv::LineList214 void push_back(Line* line) { 215 List::push_back(line); 216 } pop_frontNdbImportCsv::LineList217 Line* pop_front() { 218 return static_cast<Line*>(List::pop_front()); 219 } push_back_fromNdbImportCsv::LineList220 void push_back_from(LineList& src) { 221 List::push_back_from(src); 222 } cntNdbImportCsv::LineList223 uint cnt() const { 224 return m_cnt; 225 } 226 }; 227 228 struct Alloc { 229 Alloc(); 230 Data* alloc_data(); 231 Field* alloc_field(); 232 Line* alloc_line(); 233 void free_data_list(DataList& data_list); 234 void free_field_list(FieldList& field_list); 235 void free_field(Field *); 236 void free_line_list(LineList& line_list); 237 bool balanced(); 238 DataList m_data_free; 239 FieldList m_field_free; 240 LineList m_line_free; 241 uint m_alloc_data_cnt; 242 uint m_alloc_field_cnt; 243 uint m_alloc_line_cnt; 244 uint m_free_data_cnt; 245 uint m_free_field_cnt; 246 uint m_free_line_cnt; 247 }; 248 249 void free_data_list(Data*& data); 250 void free_field_list(Field*& field); 251 void free_line_list(Line*& line); 252 253 // input 254 255 /* 256 * CSV input. 257 * 258 * Each CSV input worker has its own Input instance and buffer. 259 * The input buffer is "split" i.e. has upper and lower halves. 260 * 261 * The input file is always owned by some CSV input worker. The 262 * worker reads a block of data into its buffer lower half. File 263 * ownership is passed immediately to the next worker so it can 264 * read next file block. And so on. 265 * 266 * Meanwhile current worker does parse to find lines and fields. 267 * The last line is usually partial, causing parse error, but if 268 * the last token was end-of-data we can assume that no real error 269 * occurred. The partial line ("tail") is copied to the upper 270 * half of next input worker buffer just above the lower half. 271 * The next worker can then do its own parse. 272 * 273 * Meanwhile current worker proceeds with evaluation of the lines 274 * and fields found. The resulting rows are stored locally until 275 * a separate send step pipes them to relay rows (rows_out). 276 * 277 * Parsing uses bison. The CSV delimiters are not fixed so the 278 * lex part is hand-coded with lookup tables. We require that 279 * each non-empty delimiter starts with a different special char. 280 * Also a strict format with field separators and line terminators 281 * is required. 282 */ 283 284 struct Parse; 285 struct Eval; 286 287 struct Input : Alloc { 288 Input(NdbImportCsv& csv, 289 const char* name, 290 const Spec& spec, 291 const Table& table, 292 Buf& buf, 293 RowList& rows_out, 294 RowList& rows_reject, 295 RowMap& rowmap_in, 296 Stats& stats); 297 ~Input(); 298 void do_init(); 299 void do_resume(Range range_in); 300 void do_parse(); 301 void do_eval(); 302 void do_send(uint& curr, uint& left); 303 void do_movetail(Input& input2); 304 void reject_line(const Line* line, 305 const Field* field, 306 const Error& error); 307 void print(NdbOut& out); 308 NdbImportCsv& m_csv; 309 NdbImportUtil& m_util; 310 Name m_name; 311 const Spec& m_spec; 312 const Table& m_table; 313 Buf& m_buf; 314 RowList& m_rows_out; 315 RowList& m_rows_reject; 316 RowMap& m_rowmap_in; 317 Error m_error; // local csv error has_errorNdbImportCsv::Input318 bool has_error() { 319 return m_util.has_error(m_error); 320 } 321 LineList m_line_list; 322 RowList m_rows; // lines eval'd to rows 323 Parse* m_parse; 324 Eval* m_eval; 325 uint64 m_startpos; 326 uint64 m_startlineno; 327 uint64 m_ignore_lines; 328 }; 329 330 // parse 331 332 static const uint g_bytecnt = 256; 333 334 struct Parse { 335 enum State { 336 State_plain = 0, 337 State_quote = 1, 338 State_escape = 2 339 }; 340 static const int g_statecnt = State_escape + 1; 341 Parse(Input& input); 342 void do_init(); 343 void push_state(State state); 344 void pop_state(); 345 void do_parse(); 346 int do_lex(union YYSTYPE* lvalp); 347 void do_error(const char* msg); 348 void pack_field(Field* field); 349 Input& m_input; 350 NdbImportCsv& m_csv; 351 NdbImportUtil& m_util; 352 Error& m_error; // team level 353 int m_trans[g_statecnt][g_bytecnt]; 354 static const uint g_stackmax = 10; 355 uint m_stacktop; 356 State m_state[g_stackmax]; 357 uint m_escapes[g_bytecnt]; 358 int m_last_token; 359 // parse temporaries 360 LineList m_line_list; 361 FieldList m_field_list; 362 DataList m_data_list; 363 }; 364 365 static const char* g_str_state(Parse::State state); 366 367 // eval 368 369 struct Eval { 370 Eval(Input& input); 371 ~Eval(); 372 void do_init(); 373 void do_eval(); 374 void eval_line(Row* row, Line* line); 375 void eval_field(Row* row, Line* line, Field* field); 376 void eval_null(Row* row, Line* line, Field* field); 377 Input& m_input; 378 NdbImportCsv& m_csv; 379 NdbImportUtil& m_util; 380 Error& m_error; // team level 381 }; 382 383 // output 384 385 /* 386 * CSV output. 387 * 388 * Currently used only by the diagnostics worker to write results 389 * etc into CSV files. The worker adds one row at a time and gets 390 * back formatted CSV data in the buffer, which it then writes 391 * immediately to the associated file. 392 * 393 * A high-performance multi-threaded CSV output team might appear 394 * in the future (ndb_export). 395 */ 396 397 struct Output { 398 Output(NdbImportCsv& csv, 399 const Spec& spec, 400 const Table& table, 401 Buf& buf); 402 void do_init(); 403 void add_header(); 404 void add_line(const Row* row); 405 void add_field(const Attr& attr, const Row* row); 406 void add_char(const uchar* data, uint len); 407 void add_quote(); 408 void add_fieldsep(); 409 void add_lineend(); 410 NdbImportCsv& m_csv; 411 NdbImportUtil& m_util; 412 const Spec& m_spec; 413 const Table& m_table; 414 Buf& m_buf; 415 uchar m_escapes[g_bytecnt]; 416 }; 417 }; 418 419 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Input& input); 420 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Parse& parse); 421 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Eval& eval); 422 NdbOut& operator<<(NdbOut& out, const NdbImportCsv::Output& output); 423 424 #endif 425