1 /*
2 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "m_ctype.h"
26 #include "my_byteorder.h"
27 #include "my_sys.h"
28 #include <inttypes.h>
29 #include <NdbSqlUtil.hpp>
30 #include <decimal_utils.hpp>
31 #include "NdbImportCsv.hpp"
32 #include "NdbImportCsvGram.hpp"
33 // STL
34 #include <cmath>
35
36 extern int NdbImportCsv_yyparse(NdbImportCsv::Parse& csvparse);
37 #ifdef VM_TRACE
38 extern int NdbImportCsv_yydebug;
39 #endif
40
NdbImportCsv(NdbImportUtil & util)41 NdbImportCsv::NdbImportCsv(NdbImportUtil& util) :
42 m_util(util),
43 m_error(m_util.c_error)
44 {
45 #ifdef VM_TRACE
46 NdbImportCsv_yydebug = 0;
47 #endif
48 }
49
~NdbImportCsv()50 NdbImportCsv::~NdbImportCsv()
51 {
52 }
53
54 // spec
55
Spec()56 NdbImportCsv::Spec::Spec()
57 {
58 m_fields_terminated_by = 0;
59 m_fields_enclosed_by = 0;
60 m_fields_optionally_enclosed_by = 0;
61 m_fields_escaped_by = 0;
62 m_lines_terminated_by = 0;
63 m_fields_terminated_by_len = Inval_uint;
64 m_fields_enclosed_by_len = Inval_uint;
65 m_fields_optionally_enclosed_by_len = Inval_uint;
66 m_fields_escaped_by_len = Inval_uint;
67 m_lines_terminated_by_len = Inval_uint;
68 }
69
~Spec()70 NdbImportCsv::Spec::~Spec()
71 {
72 delete [] m_fields_terminated_by;
73 delete [] m_fields_enclosed_by;
74 delete [] m_fields_optionally_enclosed_by;
75 delete [] m_fields_escaped_by;
76 delete [] m_lines_terminated_by;
77 }
78
79 int
translate_escapes(const char * src,const uchar * & dst,uint & dstlen)80 NdbImportCsv::translate_escapes(const char* src,
81 const uchar*& dst,
82 uint& dstlen)
83 {
84 dst = 0;
85 dstlen = Inval_uint;
86 if (src != 0)
87 {
88 uint n = strlen(src);
89 uchar* tmpdst = new uchar [n + 1]; // cannot be longer than src
90 const char* p = src;
91 uchar* q = tmpdst;
92 while (*p != 0)
93 {
94 if (*p != '\\')
95 {
96 *q++ = (uchar)*p++;
97 }
98 else
99 {
100 // XXX check what mysqlimport translates
101 char c = *++p;
102 switch (c) {
103 case '\\':
104 *q++ = '\\';
105 break;
106 case 'n':
107 *q++ = '\n';
108 break;
109 case 'r':
110 *q++ = '\r';
111 break;
112 case 't':
113 *q++ = '\t';
114 break;
115 default:
116 m_util.set_error_usage(m_error, __LINE__,
117 "unknown escape '\\%c' (0x%x) in CSV option",
118 c, (uint)(unsigned char)c);
119 return -1;
120 }
121 p++;
122 }
123 }
124 // null-terminate for use as char*
125 *q = 0;
126 dst = tmpdst;
127 dstlen = q - tmpdst;
128 }
129 return 0;
130 }
131
132 int
set_spec(Spec & spec,const OptCsv & optcsv,OptCsv::Mode mode)133 NdbImportCsv::set_spec(Spec& spec, const OptCsv& optcsv, OptCsv::Mode mode)
134 {
135 if (translate_escapes(optcsv.m_fields_terminated_by,
136 spec.m_fields_terminated_by,
137 spec.m_fields_terminated_by_len) == -1)
138 return -1;
139 if (translate_escapes(optcsv.m_fields_enclosed_by,
140 spec.m_fields_enclosed_by,
141 spec.m_fields_enclosed_by_len) == -1)
142 return -1;
143 if (translate_escapes(optcsv.m_fields_optionally_enclosed_by,
144 spec.m_fields_optionally_enclosed_by,
145 spec.m_fields_optionally_enclosed_by_len) == -1)
146 return -1;
147 if (translate_escapes(optcsv.m_fields_escaped_by,
148 spec.m_fields_escaped_by,
149 spec.m_fields_escaped_by_len) == -1)
150 return -1;
151 if (translate_escapes(optcsv.m_lines_terminated_by,
152 spec.m_lines_terminated_by,
153 spec.m_lines_terminated_by_len) == -1)
154 return -1;
155 int used[256];
156 for (uint i = 0; i < 256; i++)
157 used[i] = 0;
158 do {
159 // fields-terminated-by
160 {
161 if (spec.m_fields_terminated_by == 0 ||
162 spec.m_fields_terminated_by_len == 0)
163 {
164 const char* msg =
165 "fields-terminated-by cannot be empty";
166 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
167 break;
168 }
169 uchar u = spec.m_fields_terminated_by[0];
170 if (used[u])
171 {
172 const char* msg =
173 "fields-terminated-by re-uses previous special char";
174 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
175 break;
176 }
177 used[u] = T_FIELDSEP;
178 }
179 // fields-enclosed-by
180 {
181 if (spec.m_fields_enclosed_by != 0)
182 {
183 if (spec.m_fields_enclosed_by_len != 1)
184 {
185 const char* msg =
186 "fields-enclosed-by must be a single char";
187 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
188 break;
189 }
190 uchar u = spec.m_fields_enclosed_by[0];
191 if (used[u])
192 {
193 const char* msg =
194 "fields-enclosed-by re-uses previous special char";
195 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
196 break;
197 }
198 used[u] = T_QUOTE;
199 }
200 }
201 // fields-optionally-enclosed-by
202 {
203 if (spec.m_fields_optionally_enclosed_by != 0)
204 {
205 if (spec.m_fields_optionally_enclosed_by_len != 1)
206 {
207 const char* msg =
208 "fields-optionally-enclosed-by must be a single char";
209 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
210 break;
211 }
212 uchar u = spec.m_fields_optionally_enclosed_by[0];
213 if (used[u] && used[u] != T_QUOTE)
214 {
215 const char* msg =
216 "fields-optionally-enclosed-by re-uses previous special char";
217 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
218 break;
219 }
220 used[u] = T_QUOTE;
221 }
222 }
223 // fields-escaped-by
224 {
225 require(spec.m_fields_escaped_by != 0);
226 if (spec.m_fields_escaped_by_len != 1)
227 {
228 const char* msg =
229 "fields-escaped-by must be empty or a single char";
230 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
231 break;
232 }
233 uchar u = spec.m_fields_escaped_by[0];
234 if (used[u])
235 {
236 const char* msg =
237 "fields-escaped-by re-uses previous special char";
238 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
239 break;
240 }
241 used[u] = T_ESCAPE;
242 }
243 // lines terminated-by
244 {
245 require(spec.m_lines_terminated_by != 0);
246 if (spec.m_lines_terminated_by_len == 0)
247 {
248 const char* msg =
249 "lines-terminated-by cannot be empty";
250 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
251 break;
252 }
253 uchar u = spec.m_lines_terminated_by[0];
254 if (used[u])
255 {
256 const char* msg =
257 "lines-terminated-by re-uses previous special char";
258 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
259 break;
260 }
261 used[u] = T_LINEEND;
262 }
263 // adjust
264 if (mode == OptCsv::ModeInput)
265 {
266 /*
267 * fields-enclosed-by and fields-optionally-enclosed-by
268 * have exact same meaning
269 */
270 if (spec.m_fields_enclosed_by != 0 &&
271 spec.m_fields_optionally_enclosed_by != 0)
272 {
273 if (spec.m_fields_enclosed_by_len !=
274 spec.m_fields_optionally_enclosed_by_len ||
275 memcmp(spec.m_fields_enclosed_by,
276 spec.m_fields_optionally_enclosed_by,
277 spec.m_fields_enclosed_by_len) != 0)
278 {
279 const char* msg =
280 "conflicting fields-enclosed-by options";
281 m_util.set_error_usage(m_error, __LINE__, "%s", msg);
282 break;
283 }
284 }
285 else if (spec.m_fields_enclosed_by != 0)
286 {
287 // for completeness - will not be used
288 uchar* fields_optionally_enclosed_by =
289 new uchar [spec.m_fields_enclosed_by_len + 1];
290 memcpy(fields_optionally_enclosed_by,
291 spec.m_fields_enclosed_by,
292 spec.m_fields_enclosed_by_len + 1);
293 spec.m_fields_optionally_enclosed_by =
294 fields_optionally_enclosed_by;
295 spec.m_fields_optionally_enclosed_by_len =
296 spec.m_fields_enclosed_by_len;
297 }
298 else if (spec.m_fields_optionally_enclosed_by != 0)
299 {
300 uchar* fields_enclosed_by =
301 new uchar [spec.m_fields_optionally_enclosed_by_len + 1];
302 memcpy(fields_enclosed_by,
303 spec.m_fields_optionally_enclosed_by,
304 spec.m_fields_optionally_enclosed_by_len + 1);
305 spec.m_fields_enclosed_by =
306 fields_enclosed_by;
307 spec.m_fields_enclosed_by_len =
308 spec.m_fields_optionally_enclosed_by_len;
309 }
310 }
311 if (mode == OptCsv::ModeOutput)
312 {
313 // XXX later
314 }
315 return 0;
316 } while (0);
317 return -1;
318 }
319
320 // alloc
321
Alloc()322 NdbImportCsv::Alloc::Alloc()
323 {
324 m_alloc_data_cnt = 0;
325 m_alloc_field_cnt = 0;
326 m_alloc_line_cnt = 0;
327 m_free_data_cnt = 0;
328 m_free_field_cnt = 0;
329 m_free_line_cnt = 0;
330 }
331
332 NdbImportCsv::Data*
alloc_data()333 NdbImportCsv::Alloc::alloc_data()
334 {
335 Data* data = m_data_free.pop_front();
336 if (data == 0)
337 data = new Data;
338 else
339 new (data) Data;
340 m_alloc_data_cnt++;
341 return data;
342 }
343
344 void
free_data_list(DataList & data_list)345 NdbImportCsv::Alloc::free_data_list(DataList& data_list)
346 {
347 m_free_data_cnt += data_list.cnt();
348 m_data_free.push_back_from(data_list);
349 }
350
351 NdbImportCsv::Field*
alloc_field()352 NdbImportCsv::Alloc::alloc_field()
353 {
354 Field* field = m_field_free.pop_front();
355 if (field == 0)
356 field = new Field;
357 else
358 new (field) Field;
359 m_alloc_field_cnt++;
360 return field;
361 }
362
363 void
free_field_list(FieldList & field_list)364 NdbImportCsv::Alloc::free_field_list(FieldList& field_list)
365 {
366 Field* field = field_list.front();
367 while (field != 0)
368 {
369 free_data_list(field->m_data_list);
370 field = field->next();
371 }
372 m_free_field_cnt += field_list.cnt();
373 m_field_free.push_back_from(field_list);
374 }
375
376 void
free_field(Field * field)377 NdbImportCsv::Alloc::free_field(Field *field)
378 {
379 free_data_list(field->m_data_list);
380 m_field_free.push_back(field);
381 m_free_field_cnt++;
382 }
383
384 NdbImportCsv::Line*
alloc_line()385 NdbImportCsv::Alloc::alloc_line()
386 {
387 Line* line = m_line_free.pop_front();
388 if (line == 0)
389 line = new Line;
390 else
391 new (line) Line;
392 m_alloc_line_cnt++;
393 return line;
394 }
395
396 void
free_line_list(LineList & line_list)397 NdbImportCsv::Alloc::free_line_list(LineList& line_list)
398 {
399 Line* line = line_list.front();
400 while (line != 0)
401 {
402 free_field_list(line->m_field_list);
403 line = line->next();
404 }
405 m_free_line_cnt += line_list.cnt();
406 m_line_free.push_back_from(line_list);
407 }
408
409 bool
balanced()410 NdbImportCsv::Alloc::balanced()
411 {
412 return
413 m_alloc_data_cnt == m_free_data_cnt &&
414 m_alloc_field_cnt == m_free_field_cnt &&
415 m_alloc_line_cnt == m_free_line_cnt;
416 }
417
418 // input
419
Input(NdbImportCsv & csv,const char * name,const Spec & spec,const Table & table,Buf & buf,RowList & rows_out,RowList & rows_reject,RowMap & rowmap_in,Stats & stats)420 NdbImportCsv::Input::Input(NdbImportCsv& csv,
421 const char* name,
422 const Spec& spec,
423 const Table& table,
424 Buf& buf,
425 RowList& rows_out,
426 RowList& rows_reject,
427 RowMap& rowmap_in,
428 Stats& stats) :
429 m_csv(csv),
430 m_util(m_csv.m_util),
431 m_name(name),
432 m_spec(spec),
433 m_table(table),
434 m_buf(buf),
435 m_rows_out(rows_out),
436 m_rows_reject(rows_reject),
437 m_rowmap_in(rowmap_in)
438 {
439 m_parse = new Parse(*this);
440 m_eval = new Eval(*this);
441 m_rows.set_stats(m_util.c_stats, Name(m_name, "rows"));
442 m_startpos = 0;
443 m_startlineno = 0;
444 m_ignore_lines = 0;
445 }
446
~Input()447 NdbImportCsv::Input::~Input()
448 {
449 delete m_parse;
450 delete m_eval;
451 }
452
453 void
do_init()454 NdbImportCsv::Input::do_init()
455 {
456 const Opt& opt = m_util.c_opt;
457 m_ignore_lines = opt.m_ignore_lines;
458 m_parse->do_init();
459 m_eval->do_init();
460 }
461
462 /*
463 * Adjust counters at resume. Argument is first range in old
464 * rowmap. Input file seek is done by caller.
465 */
466 void
do_resume(Range range_in)467 NdbImportCsv::Input::do_resume(Range range_in)
468 {
469 m_startpos = range_in.m_endpos;
470 m_startlineno = range_in.m_end + m_ignore_lines;
471 }
472
473 void
do_parse()474 NdbImportCsv::Input::do_parse()
475 {
476 #ifdef VM_TRACE
477 NdbImportCsv_yydebug = (m_util.c_opt.m_log_level >= 4);
478 #endif
479 m_parse->do_parse();
480 #ifdef VM_TRACE
481 NdbImportCsv_yydebug = 0;
482 #endif
483 }
484
485 void
do_eval()486 NdbImportCsv::Input::do_eval()
487 {
488 m_eval->do_eval();
489 }
490
491 void
do_send(uint & curr,uint & left)492 NdbImportCsv::Input::do_send(uint& curr, uint& left)
493 {
494 const Opt& opt = m_util.c_opt;
495 RowList& rows_out = m_rows_out; // shared
496 rows_out.lock();
497 curr = m_rows.cnt();
498 RowCtl ctl(opt.m_rowswait);
499 m_rows.pop_front_to(rows_out, ctl);
500 left = m_rows.cnt();
501 if (rows_out.m_foe)
502 {
503 log_debug(1, "consumer has stopped");
504 m_util.set_error_gen(m_error, __LINE__, "consumer has stopped");
505 }
506 rows_out.unlock();
507 }
508
509 void
do_movetail(Input & input2)510 NdbImportCsv::Input::do_movetail(Input& input2)
511 {
512 Buf& buf1 = m_buf;
513 Buf& buf2 = input2.m_buf;
514 require(buf1.movetail(buf2) == 0);
515 buf1.m_pos = buf1.m_len; // keep pos within new len
516 input2.m_startpos = m_startpos + buf1.m_len;
517 input2.m_startlineno = m_startlineno + m_line_list.cnt();
518 log_debug(1, "movetail " << " src: " << buf1 << " dst: " << buf2 <<
519 " startpos: " << m_startpos << "->" << input2.m_startpos <<
520 " startline: " << m_startlineno << "->" << input2.m_startlineno);
521 }
522
523 void
reject_line(const Line * line,const Field * field,const Error & error)524 NdbImportCsv::Input::reject_line(const Line* line,
525 const Field* field,
526 const Error& error)
527 {
528 const Opt& opt = m_util.c_opt;
529 RowList& rows_reject = m_rows_reject;
530 rows_reject.lock();
531 // write reject row first
532 const Table& table = m_util.c_reject_table;
533 Row* rejectrow = m_util.alloc_row(table);
534 rejectrow->m_rowid = m_startlineno + line->m_lineno - m_ignore_lines;
535 rejectrow->m_linenr = 1 + m_startlineno + line->m_lineno;
536 rejectrow->m_startpos = m_startpos + line->m_pos;
537 rejectrow->m_endpos = m_startpos + line->m_end;
538 const Buf& buf = m_buf;
539 const uchar* bufdata = &buf.m_data[buf.m_start];
540 const char* bufdatac = (const char*)bufdata;
541 const char* reject = &bufdatac[line->m_pos];
542 uint32 rejectlen = line->m_end - line->m_pos;
543 m_util.set_reject_row(rejectrow, Inval_uint32, error, reject, rejectlen);
544 require(rows_reject.push_back(rejectrow));
545 // error if rejects exceeded
546 if (rows_reject.totcnt() > opt.m_rejects)
547 {
548 m_util.set_error_data(m_error, __LINE__, 0,
549 "reject limit %u exceeded", opt.m_rejects);
550 }
551 rows_reject.unlock();
552 }
553
554 void
print(NdbOut & out)555 NdbImportCsv::Input::print(NdbOut& out)
556 {
557 typedef NdbImportCsv::Line Line;
558 typedef NdbImportCsv::Field Field;
559 const NdbImportCsv::Buf& buf = m_buf;
560 const uchar* bufdata = &buf.m_data[buf.m_start];
561 const char* bufdatac = (const char*)bufdata;
562 LineList& line_list = m_line_list;
563 out << "input:" << endl;
564 out << "len=" << m_buf.m_len << endl;
565 uint n = strlen(bufdatac);
566 if (n != 0 && bufdatac[n-1] == '\n')
567 out << bufdatac;
568 else
569 out << bufdatac << "\\c" << endl;
570 out << "linecnt=" << line_list.cnt();
571 Line* line = line_list.front();
572 while (line != 0)
573 {
574 out << endl;
575 out << "lineno=" << line->m_lineno;
576 out << " pos=" << line->m_pos;
577 out << " length=" << line->m_end - line->m_pos;
578 out << " fieldcnt=" << line->m_field_list.cnt();
579 Field* field = line->m_field_list.front();
580 while (field != 0)
581 {
582 out << endl;
583 uint pos = field->m_pos;
584 uint end = field->m_end;
585 uint pack_pos = field->m_pack_pos;
586 uint pack_end = field->m_pack_end;
587 char b[4096];
588 snprintf(b, sizeof(b), "%.*s", pack_end - pack_pos, &bufdatac[pack_pos]);
589 out << "fieldno=" << field->m_fieldno;
590 out << " pos=" << pos;
591 out << " length=" << end - pos;
592 out << " pack_pos=" << pack_pos;
593 out << " pack_length=" << pack_end - pack_pos;
594 out << " null=" << field->m_null;
595 out << " data=" << b;
596 field = field->next();
597 }
598 line = line->next();
599 }
600 out << endl;
601 require(false);
602 }
603
604 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Input & input)605 operator<<(NdbOut& out, const NdbImportCsv::Input& input)
606 {
607 out << input.m_name;
608 out << " len=" << input.m_buf.m_len;
609 out << " linecnt=" << input.m_line_list.cnt() << " ";
610 return out;
611 }
612
613 // parse
614
Parse(Input & input)615 NdbImportCsv::Parse::Parse(Input& input) :
616 m_input(input),
617 m_csv(m_input.m_csv),
618 m_util(m_input.m_util),
619 m_error(m_input.m_error)
620 {
621 m_stacktop = 0;
622 m_state[m_stacktop] = State_plain;
623 m_last_token = 0;
624 }
625
626 void
do_init()627 NdbImportCsv::Parse::do_init()
628 {
629 log_debug(1, "do_init");
630 const Spec& spec = m_input.m_spec;
631 for (int s = 0; s < g_statecnt; s++)
632 {
633 /*
634 * NUL byte 0x00 can be represented as NUL, \NUL, or \0
635 * where the first two contain a literal NUL byte 0x00.
636 * The T_NUL token is used to avoid branching in the normal
637 * case where the third printable format is used.
638 */
639 m_trans[s][0] = T_NUL;
640 }
641 for (uint u = 1; u < g_bytecnt; u++)
642 {
643 m_trans[State_plain][u] = T_DATA;
644 m_trans[State_quote][u] = T_DATA;
645 m_trans[State_escape][u] = T_BYTE;
646 }
647 {
648 const uchar* p = spec.m_fields_terminated_by;
649 const uint len = spec.m_fields_terminated_by_len;
650 require(p != 0 && p[0] != 0 && len == strlen((const char*)p));
651 uint u = p[0];
652 // avoid parse-time branch in the common case
653 m_trans[State_plain][u] = len == 1 ? T_FIELDSEP : T_FIELDSEP2;
654 m_trans[State_quote][u] = T_DATA;
655 m_trans[State_escape][u] = T_BYTE;
656 }
657 {
658 const uchar* p = spec.m_fields_optionally_enclosed_by;
659 if (p != 0 && p[0] != 0)
660 {
661 require(p[1] == 0);
662 uint u = p[0];
663 m_trans[State_plain][u] = T_QUOTE;
664 m_trans[State_quote][u] = T_QUOTEQUOTE;
665 m_trans[State_escape][u] = T_BYTE;
666 }
667 }
668 {
669 const uchar* p = spec.m_fields_escaped_by;
670 require(p != 0);
671 if (p[0] != 0)
672 {
673 require(p[1] == 0);
674 uint u = p[0];
675 m_trans[State_plain][u] = T_ESCAPE;
676 m_trans[State_quote][u] = T_ESCAPE;
677 m_trans[State_escape][u] = T_BYTE;
678 }
679 }
680 {
681 const uchar* p = spec.m_lines_terminated_by;
682 const uint len = spec.m_lines_terminated_by_len;
683 require(p != 0 && p[0] != 0 && len == strlen((const char*)p));
684 uint u = p[0];
685 // avoid parse-time branch in the common case
686 m_trans[State_plain][u] = len == 1 ? T_LINEEND : T_LINEEND2;
687 m_trans[State_quote][u] = T_DATA;
688 m_trans[State_escape][u] = T_BYTE;
689 }
690 // escape (\N is special)
691 {
692 const uchar* p = spec.m_fields_escaped_by;
693 for (uint u = 0; u < g_bytecnt; u++)
694 m_escapes[u] = u;
695 require(p != 0);
696 if (p[0] != 0)
697 {
698 m_escapes[(int)'0'] = 000; // NUL
699 m_escapes[(int)'b'] = 010; // BS
700 m_escapes[(int)'n'] = 012; // NL
701 m_escapes[(int)'r'] = 015; // CR
702 m_escapes[(int)'t'] = 011; // TAB
703 m_escapes[(int)'Z'] = 032; // ^Z
704 }
705 }
706 }
707
708 void
push_state(State state)709 NdbImportCsv::Parse::push_state(State state)
710 {
711 require(m_stacktop + 1 < g_stackmax);
712 m_state[++m_stacktop] = state;
713 log_debug_3("push " << g_str_state(m_state[m_stacktop-1])
714 << "->" << g_str_state(m_state[m_stacktop]));
715 }
716
717 void
pop_state()718 NdbImportCsv::Parse::pop_state()
719 {
720 require(m_stacktop > 0);
721 m_stacktop--;
722 log_debug_3("pop " << g_str_state(m_state[m_stacktop])
723 << "<-" << g_str_state(m_state[m_stacktop+1]));
724 }
725
726 void
do_parse()727 NdbImportCsv::Parse::do_parse()
728 {
729 log_debug(2, "do_parse");
730 m_input.free_line_list(m_input.m_line_list);
731 m_input.free_line_list(m_line_list);
732 m_input.free_field_list(m_field_list);
733 m_input.free_data_list(m_data_list);
734 m_stacktop = 0;
735 m_state[m_stacktop] = State_plain;
736 Buf& buf = m_input.m_buf;
737 buf.m_pos = 0;
738 int ret = 0;
739 if (buf.m_len != 0)
740 ret = NdbImportCsv_yyparse(*this);
741 log_debug(1, "parse ret=" << ret);
742 if (ret == 0)
743 {
744 require(m_last_token == 0);
745 buf.m_tail = buf.m_len;
746 }
747 else if (!m_util.has_error())
748 {
749 // last parsed line
750 Line* line = m_line_list.back();
751 if (line != 0)
752 {
753 buf.m_tail = line->m_end;
754 m_input.m_line_list.push_back_from(m_line_list);
755 m_input.free_field_list(m_field_list);
756 m_input.free_data_list(m_data_list);
757 }
758 else
759 {
760 uint64 abspos = m_input.m_startpos;
761 uint64 abslineno = 1 + m_input.m_startlineno;
762 m_util.set_error_data(m_error, __LINE__, 0,
763 "parse error at line=%" PRIu64 ": pos=%" PRIu64 ":"
764 " CSV page contains no complete record"
765 " (buffer too small"
766 " or missing last line terminator)",
767 abslineno, abspos);
768 return;
769 }
770 }
771 /*
772 * Pack data parts into fields. Modifies buf data and cannot
773 * be done before accepted lines and fields are known. Otherwise
774 * movetail() passes garbage to next worker.
775 */
776 {
777 Line* line = m_input.m_line_list.front();
778 while (line != 0)
779 {
780 Field* field = line->m_field_list.front();
781 while (field != 0)
782 {
783 if (field->m_data_list.cnt() != 0)
784 pack_field(field);
785 field = field->next();
786 }
787 line = line->next();
788 }
789 }
790 }
791
792 int
do_lex(YYSTYPE * lvalp)793 NdbImportCsv::Parse::do_lex(YYSTYPE* lvalp)
794 {
795 log_debug_3("do_lex");
796 const Spec& spec = m_input.m_spec;
797 Buf& buf = m_input.m_buf;
798 const uchar* bufdata = &buf.m_data[buf.m_start];
799 State state = m_state[m_stacktop];
800 const int* trans = m_trans[state];
801 const uint pos = buf.m_pos;
802 uint len = 0;
803 uint end = pos;
804 uint u = bufdata[pos];
805 int token = trans[u];
806 switch (token) {
807 case T_FIELDSEP:
808 len = 1;
809 end += len;
810 break;
811 case T_FIELDSEP2:
812 len = spec.m_fields_terminated_by_len;
813 if (len <= buf.m_len - buf.m_pos &&
814 memcmp(&bufdata[pos], spec.m_fields_terminated_by, len) == 0)
815 {
816 end += len;
817 token = T_FIELDSEP;
818 break;
819 }
820 len = 1;
821 end += len;
822 token = T_DATA;
823 break;
824 case T_QUOTE:
825 push_state(State_quote);
826 require(spec.m_fields_enclosed_by_len == 1);
827 len = 1;
828 end += len;
829 break;
830 case T_QUOTEQUOTE:
831 require(spec.m_fields_enclosed_by_len == 1);
832 if (bufdata[pos + 1] == u)
833 {
834 token = T_DATA;
835 len = 1;
836 end += 2;
837 break;
838 }
839 token = T_QUOTE;
840 len = 1;
841 end += len;
842 pop_state();
843 break;
844 case T_ESCAPE:
845 push_state(State_escape);
846 require(spec.m_fields_escaped_by_len == 1);
847 len = 1;
848 end += len;
849 break;
850 case T_LINEEND:
851 len = 1;
852 end += len;
853 break;
854 case T_LINEEND2:
855 len = spec.m_lines_terminated_by_len;
856 if (len <= buf.m_len - buf.m_pos &&
857 memcmp(&bufdata[pos], spec.m_lines_terminated_by, len) == 0)
858 {
859 end += len;
860 token = T_LINEEND;
861 break;
862 }
863 len = 1;
864 end += len;
865 token = T_DATA;
866 break;
867 case T_DATA:
868 do
869 {
870 len++;
871 u = bufdata[pos + len];
872 } while (trans[u] == T_DATA);
873 end += len;
874 break;
875 case T_BYTE:
876 len = 1;
877 end += len;
878 pop_state();
879 break;
880 case T_NUL:
881 if (buf.m_pos == buf.m_len)
882 {
883 token = 0;
884 break;
885 }
886 if (m_state[m_stacktop] != State_escape)
887 token = T_DATA;
888 else
889 {
890 token = T_BYTE;
891 pop_state();
892 }
893 len = 1;
894 end += len;
895 break;
896 }
897 Chunk chunk;
898 chunk.m_pos = pos;
899 chunk.m_len = len;
900 chunk.m_end = end;
901 log_debug_3("do_lex: token=" << token <<
902 " pos=" << chunk.m_pos << " len=" << len << " end=" << end);
903 buf.m_pos = end;
904 lvalp->m_chunk = chunk;
905 m_last_token = token;
906 return token;
907 }
908
909 void
do_error(const char * msg)910 NdbImportCsv::Parse::do_error(const char* msg)
911 {
912 if (m_last_token != 0)
913 {
914 const Buf& buf = m_input.m_buf;
915 log_debug(2, "parse error at buf:" << buf);
916 uint64 abspos = m_input.m_startpos + buf.m_pos;
917 uint64 abslineno = m_input.m_startlineno + m_line_list.cnt();
918 m_util.set_error_data(m_error, __LINE__, 0,
919 "parse error at line=%" PRIu64 ": pos=%" PRIu64 ": %s",
920 abslineno, abspos, msg);
921 }
922 }
923
924 void
pack_field(Field * field)925 NdbImportCsv::Parse::pack_field(Field* field)
926 {
927 Buf& buf = m_input.m_buf;
928 uchar* bufdata = &buf.m_data[buf.m_start];
929 DataList& data_list = field->m_data_list;
930 Data* data = data_list.front();
931 require(data != 0);
932 // if field is exactly "\N" then it becomes NULL
933 if (data->next() == 0 &&
934 data->m_escape &&
935 bufdata[data->m_pos] == 'N')
936 {
937 field->m_pack_pos = Inval_uint;
938 field->m_pack_end = Inval_uint;
939 field->m_null = true;
940 return;
941 }
942 // handle multiple pieces and normal escapes
943 uint pack_pos = data->m_pos;
944 uint pack_end = pack_pos;
945 while (data != 0)
946 {
947 uint len = data->m_len;
948 memmove(&bufdata[pack_end], &bufdata[data->m_pos], len);
949 if (data->m_escape)
950 {
951 require(len == 1);
952 bufdata[pack_end] = m_escapes[bufdata[pack_end]];
953 }
954 pack_end += len;
955 data = data->next();
956 }
957 field->m_pack_pos = pack_pos;
958 field->m_pack_end = pack_end;
959 field->m_null = false;
960 }
961
962 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Parse & parse)963 operator<<(NdbOut& out, const NdbImportCsv::Parse& parse)
964 {
965 const NdbImportCsv::Buf& buf = parse.m_input.m_buf;
966 out << "parse " << parse.m_input.m_name;
967 NdbImportCsv::Parse::State state = parse.m_state[parse.m_stacktop];
968 out << " [" << NdbImportCsv::g_str_state(state) << "]";
969 if (buf.m_len != 0)
970 {
971 const uchar* bufdata = &buf.m_data[buf.m_start];
972 char chr[20];
973 int c = bufdata[buf.m_pos];
974 if (isascii(c) && isprint(c))
975 sprintf(chr, "%c", c);
976 else if (c == '\n')
977 sprintf(chr, "%s", "\\n");
978 else
979 sprintf(chr, "0x%02x", c);
980 out << " len=" << buf.m_len << " pos=" << buf.m_pos;
981 out << " chr=" << chr << " ";
982 }
983 return out;
984 }
985
986 const char*
g_str_state(Parse::State state)987 NdbImportCsv::g_str_state(Parse::State state)
988 {
989 const char* str = 0;
990 switch (state) {
991 case Parse::State_plain:
992 str = "plain";
993 break;
994 case Parse::State_quote:
995 str = "quote";
996 break;
997 case Parse::State_escape:
998 str = "escape";
999 break;
1000 }
1001 require(str != 0);
1002 return str;
1003 }
1004
1005 // eval
1006
Eval(Input & input)1007 NdbImportCsv::Eval::Eval(Input& input) :
1008 m_input(input),
1009 m_csv(m_input.m_csv),
1010 m_util(m_input.m_util),
1011 m_error(m_input.m_error)
1012 {
1013 }
1014
~Eval()1015 NdbImportCsv::Eval::~Eval()
1016 {
1017 }
1018
1019 void
do_init()1020 NdbImportCsv::Eval::do_init()
1021 {
1022 }
1023
1024 void
do_eval()1025 NdbImportCsv::Eval::do_eval()
1026 {
1027 const Opt& opt = m_util.c_opt;
1028 const Table& table = m_input.m_table;
1029 LineList& line_list = m_input.m_line_list;
1030 Line* line = line_list.front();
1031 RowList rows_chunk;
1032 while (line != 0)
1033 {
1034 const uint64 ignore_lines = m_input.m_ignore_lines;
1035 const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1036 if (lineno < ignore_lines)
1037 {
1038 line = line->next();
1039 continue;
1040 }
1041 if (opt.m_resume)
1042 {
1043 RowMap& rowmap_in = m_input.m_rowmap_in;
1044 const uint64 rowid = lineno - ignore_lines;
1045 if (!rowmap_in.empty())
1046 {
1047 bool found = rowmap_in.remove(rowid);
1048 if (found)
1049 {
1050 line = line->next();
1051 log_debug(1, "skip old rowid: " << rowid);
1052 continue;
1053 }
1054 }
1055 }
1056 if (rows_chunk.cnt() == 0)
1057 {
1058 require(line->m_lineno < line_list.cnt());
1059 uint cnt = line_list.cnt() - line->m_lineno;
1060 if (cnt > opt.m_alloc_chunk)
1061 cnt = opt.m_alloc_chunk;
1062 m_util.alloc_rows(table, cnt, rows_chunk);
1063 }
1064 Row* row = rows_chunk.pop_front();
1065 eval_line(row, line);
1066 if (line->m_reject)
1067 {
1068 m_util.free_row(row);
1069 }
1070 // stop loading if error
1071 if (m_input.has_error())
1072 {
1073 break;
1074 }
1075 line = line->next();
1076 }
1077 m_input.free_line_list(m_input.m_line_list);
1078 }
1079
1080 void
eval_line(Row * row,Line * line)1081 NdbImportCsv::Eval::eval_line(Row* row, Line* line)
1082 {
1083 const Table& table = m_input.m_table;
1084 const Attrs& attrs = table.m_attrs;
1085 const uint attrcnt = attrs.size();
1086 const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1087 const uint64 linenr = 1 + lineno;
1088 row->m_rowid = lineno - m_input.m_ignore_lines;
1089 row->m_linenr = linenr;
1090 row->m_startpos = m_input.m_startpos + line->m_pos;
1091 row->m_endpos = m_input.m_startpos + line->m_end;
1092 uint fieldcnt = line->m_field_list.cnt();
1093 const uint has_hidden_pk = (uint)table.m_has_hidden_pk;
1094 const uint expect_attrcnt = attrcnt - has_hidden_pk;
1095 Error error; // local error
1096 do
1097 {
1098 if (fieldcnt < expect_attrcnt)
1099 {
1100 m_util.set_error_data(
1101 error, __LINE__, 0,
1102 "line %" PRIu64 ": too few fields (%u < %u)",
1103 linenr, fieldcnt, attrcnt);
1104 break;
1105 }
1106 if(fieldcnt == expect_attrcnt + 1 &&
1107 line->m_field_list.final_field_is_empty())
1108 {
1109 /* Handle field terminator at end of line */
1110 Field * empty_field = line->m_field_list.pop_back();
1111 fieldcnt--;
1112 m_input.free_field(empty_field);
1113 break;
1114 }
1115 if (fieldcnt > expect_attrcnt)
1116 {
1117 m_util.set_error_data(
1118 error, __LINE__, 0,
1119 "line %" PRIu64 ": too many fields (%u > %u)",
1120 linenr, fieldcnt, attrcnt);
1121 break;
1122 }
1123 } while (0);
1124 if (m_util.has_error(error))
1125 {
1126 m_input.reject_line(line, (Field*)0, error);
1127 line->m_reject = true;
1128 }
1129 Field* field = line->m_field_list.front();
1130 for (uint n = 0; n < fieldcnt; n++)
1131 {
1132 if (line->m_reject) // wrong field count or eval error
1133 break;
1134 require(field != 0);
1135 require(field->m_fieldno == n);
1136 if (!field->m_null)
1137 eval_field(row, line, field);
1138 else
1139 eval_null(row, line, field);
1140 field = field->next();
1141 }
1142 if (!line->m_reject)
1143 {
1144 require(field == 0);
1145 }
1146 if (has_hidden_pk)
1147 {
1148 /*
1149 * CSV has no access to Ndb (in fact there may not be any Ndb
1150 * object e.g. in CSV input -> CSV output). Any autoincrement
1151 * value for hidden pk is set later in RelayOpWorker. Fill in
1152 * some dummy value to not leave uninitialized data.
1153 */
1154 const Attr& attr = attrs[attrcnt - 1];
1155 require(attr.m_type == NdbDictionary::Column::Bigunsigned);
1156 uint64 val = Inval_uint64;
1157 attr.set_value(row, &val, 8);
1158 }
1159 if (!line->m_reject)
1160 m_input.m_rows.push_back(row);
1161 }
1162
1163 /*
1164 * Parse some fields by doing a CS101 "turn string into number".
1165 * Digits must be ascii digits.
1166 * Bengalese numbers are not supported.
1167 */
1168
/*
 * Result of parsing one CSV value: an error code, a fixed text for
 * that code and a line number.
 */
struct Ndb_import_csv_error {
  // codes are consecutive from 0 and index the error table
  enum Error_code {
    No_error = 0,
    Format_error,
    Value_error,      // but DBTUP should be final arbiter
    Internal_error
  };
  // number of codes, Internal_error being the last
  static const int error_code_count = Internal_error + 1;
  int error_code;
  const char* error_text;
  int error_line;
};
1181
1182 static const Ndb_import_csv_error
1183 ndb_import_csv_error[Ndb_import_csv_error::error_code_count] = {
1184 { Ndb_import_csv_error::No_error, "no error", 0 },
1185 { Ndb_import_csv_error::Format_error, "format error", 0 },
1186 { Ndb_import_csv_error::Value_error, "value error", 0 },
1187 { Ndb_import_csv_error::Internal_error, "internal error", 0 }
1188 };
1189
1190 static void
ndb_import_csv_decimal_error(int err,Ndb_import_csv_error & csv_error)1191 ndb_import_csv_decimal_error(int err,
1192 Ndb_import_csv_error& csv_error)
1193 {
1194 switch (err) {
1195 case E_DEC_OK:
1196 csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1197 break;
1198 case E_DEC_TRUNCATED:
1199 case E_DEC_OVERFLOW:
1200 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Value_error];
1201 break;
1202 case E_DEC_BAD_NUM:
1203 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1204 break;
1205 case E_DEC_OOM:
1206 case E_DEC_BAD_PREC:
1207 case E_DEC_BAD_SCALE:
1208 default:
1209 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Internal_error];
1210 break;
1211 }
1212 }
1213
1214 static bool
ndb_import_csv_parse_decimal(const NdbImportCsv::Attr & attr,bool is_unsigned,const char * datac,uint length,uchar * val,uint val_len,Ndb_import_csv_error & csv_error)1215 ndb_import_csv_parse_decimal(const NdbImportCsv::Attr& attr,
1216 bool is_unsigned,
1217 const char* datac, uint length,
1218 uchar* val, uint val_len,
1219 Ndb_import_csv_error& csv_error)
1220 {
1221 #if 0
1222 // [-+]ddd.ff
1223 "^"
1224 "([-+])*" // 1:sign
1225 "([[:digit:]]*)?" // 2:ddd
1226 "(.)?" // 3:.
1227 "([[:digit:]]*)?" // 4:ff
1228 "$"
1229 #endif
1230 // sign
1231 const char* p = datac;
1232 const char* q = p;
1233 if (!is_unsigned)
1234 while (*p == '+' || *p == '-')
1235 p++;
1236 else
1237 while (*p == '+')
1238 p++;
1239 q = p;
1240 // decimal_str2bin does not check string end so parse here
1241 uint digits = 0;
1242 while (isdigit(*p))
1243 p++;
1244 digits += p - q;
1245 q = p;
1246 if (*p == '.')
1247 {
1248 q = ++p;
1249 while (isdigit(*p))
1250 p++;
1251 digits += p - q;
1252 }
1253 if (*p != 0)
1254 {
1255 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1256 csv_error.error_line = __LINE__;
1257 return false;
1258 }
1259 if (digits == 0)
1260 {
1261 // single "." is not valid decimal
1262 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1263 csv_error.error_line = __LINE__;
1264 return false;
1265 }
1266 int err;
1267 err = decimal_str2bin(datac, length,
1268 attr.m_precision, attr.m_scale,
1269 val, val_len);
1270 if (err != 0)
1271 {
1272 ndb_import_csv_decimal_error(err, csv_error);
1273 csv_error.error_line = __LINE__;
1274 return false;
1275 }
1276 return true;
1277 }
1278
1279 static bool
ndb_import_csv_parse_year(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Year & s,Ndb_import_csv_error & csv_error)1280 ndb_import_csv_parse_year(const NdbImportCsv::Attr& attr,
1281 const char* datac,
1282 NdbSqlUtil::Year& s,
1283 Ndb_import_csv_error& csv_error)
1284 {
1285 #if 0
1286 // yyyy
1287 "^"
1288 "([[:digit:]]{4}|[[:digit:]]{2})" // 1:yyyy
1289 "$"
1290 #endif
1291 csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1292 s.year = 0;
1293 const char* p = datac;
1294 const char* q = p;
1295 while (isdigit(*p) && p - q < 4)
1296 s.year = 10 * s.year + (*p++ - '0');
1297 if (p - q == 4)
1298 ;
1299 else if (p - q == 2)
1300 {
1301 if (s.year >= 70)
1302 s.year += 1900;
1303 else
1304 s.year += 2000;
1305 }
1306 else
1307 {
1308 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1309 csv_error.error_line = __LINE__;
1310 return false;
1311 }
1312 return true;
1313 }
1314
1315 static bool
ndb_import_csv_parse_date(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Date & s,Ndb_import_csv_error & csv_error)1316 ndb_import_csv_parse_date(const NdbImportCsv::Attr& attr,
1317 const char* datac,
1318 NdbSqlUtil::Date& s,
1319 Ndb_import_csv_error& csv_error)
1320 {
1321 #if 0
1322 // yyyy-mm-dd
1323 "^"
1324 "([[:digit:]]{4}|[[:digit:]]{2})" // 1:yyyy
1325 "(" // 2:
1326 "[[:punct:]]+"
1327 "([[:digit:]]{1,2})" // 3:mm
1328 "[[:punct:]]+"
1329 "([[:digit:]]{1,2})" // 4:dd
1330 "|"
1331 "([[:digit:]]{2})" // 5:mm
1332 "([[:digit:]]{2})" // 6:dd
1333 ")"
1334 "$"
1335 #endif
1336 csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1337 s.year = s.month = s.day = 0;
1338 const char* p = datac;
1339 const char* q = p;
1340 while (isdigit(*p) && p - q < 4)
1341 s.year = 10 * s.year + (*p++ - '0');
1342 if (p - q == 4)
1343 ;
1344 else if (p - q == 2)
1345 {
1346 if (s.year >= 70)
1347 s.year += 1900;
1348 else
1349 s.year += 2000;
1350 }
1351 else
1352 {
1353 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1354 csv_error.error_line = __LINE__;
1355 return false;
1356 }
1357 q = p;
1358 // separator vs non-separator variant
1359 if (ispunct(*p))
1360 {
1361 // anything goes
1362 while (ispunct(*p))
1363 p++;
1364 q = p;
1365 // month
1366 while (isdigit(*p) && p - q < 2)
1367 s.month = 10 * s.month + (*p++ - '0');
1368 if (p - q > 0)
1369 ;
1370 else
1371 {
1372 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1373 csv_error.error_line = __LINE__;
1374 return false;
1375 }
1376 q = p;
1377 if (ispunct(*p))
1378 ;
1379 else
1380 {
1381 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1382 csv_error.error_line = __LINE__;
1383 return false;
1384 }
1385 // anything goes
1386 while (ispunct(*p))
1387 p++;
1388 q = p;
1389 // day
1390 while (isdigit(*p) && p - q < 2)
1391 s.day = 10 * s.day + (*p++ - '0');
1392 if (p - q > 0)
1393 ;
1394 else
1395 {
1396 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1397 csv_error.error_line = __LINE__;
1398 return false;
1399 }
1400 q = p;
1401 }
1402 else
1403 {
1404 // month
1405 while (isdigit(*p) && p - q < 2)
1406 s.month = 10 * s.month + (*p++ - '0');
1407 if (p - q == 2)
1408 ;
1409 else
1410 {
1411 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1412 csv_error.error_line = __LINE__;
1413 return false;
1414 }
1415 q = p;
1416 // day
1417 while (isdigit(*p) && p - q < 2)
1418 s.day = 10 * s.day + (*p++ - '0');
1419 if (p - q == 2)
1420 ;
1421 else
1422 {
1423 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1424 csv_error.error_line = __LINE__;
1425 return false;
1426 }
1427 q = p;
1428 }
1429 return true;
1430 }
1431
1432 static bool
ndb_import_csv_parse_time2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Time2 & s,Ndb_import_csv_error & csv_error)1433 ndb_import_csv_parse_time2(const NdbImportCsv::Attr& attr,
1434 const char* datac,
1435 NdbSqlUtil::Time2& s,
1436 Ndb_import_csv_error& csv_error)
1437 {
1438 #if 0
1439 // dd hh:mm:ss.ffffff
1440 "^"
1441 "(([[:digit:]]+)[[:space:]]+)?" // 1:dd 2: ***NOTYET***
1442 "(" // 3:
1443 "([[:digit:]]{1,2})" // 4:hh
1444 "[:]"
1445 "([[:digit:]]{1,2})" // 5:mm
1446 "[:]"
1447 "([[:digit:]]{1,2})" // 6:ss
1448 "|"
1449 "([[:digit:]]{2})" // 7:hh
1450 "([[:digit:]]{2})" // 8:mm
1451 "([[:digit:]]{2})" // 9:ss
1452 ")"
1453 "(\\.([[:digit:]]*))?" // 10: 11:ffffff
1454 "$"
1455 #endif
1456 csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1457 s.sign = 1;
1458 s.interval = 0;
1459 s.hour = s.minute = s.second = 0;
1460 s.fraction = 0;
1461 const char* p = datac;
1462 const char* q = p;
1463 // hour
1464 while (isdigit(*p) && p - q < 2)
1465 s.hour = 10 * s.hour + (*p++ - '0');
1466 if (p - q == 1 || p - q == 2)
1467 ;
1468 else
1469 {
1470 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1471 csv_error.error_line = __LINE__;
1472 return false;
1473 }
1474 q = p;
1475 // separator vs non-separator variant
1476 if (*p == ':')
1477 {
1478 q = ++p;
1479 // minute
1480 while (isdigit(*p))
1481 s.minute = 10 * s.minute + (*p++ - '0');
1482 if (p - q == 1 || p - q == 2)
1483 ;
1484 else
1485 {
1486 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1487 csv_error.error_line = __LINE__;
1488 return false;
1489 }
1490 q = p;
1491 if (*p == ':')
1492 q = ++p;
1493 else
1494 {
1495 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1496 csv_error.error_line = __LINE__;
1497 return false;
1498 }
1499 while (isdigit(*p))
1500 s.second = 10 * s.second + (*p++ - '0');
1501 if (p - q == 1 || p - q == 2)
1502 ;
1503 else
1504 {
1505 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1506 csv_error.error_line = __LINE__;
1507 return false;
1508 }
1509 q = p;
1510 }
1511 else
1512 {
1513 while (isdigit(*p) && p - q < 2)
1514 s.minute = 10 * s.minute + (*p++ - '0');
1515 if (p - q == 2)
1516 ;
1517 else
1518 {
1519 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1520 csv_error.error_line = __LINE__;
1521 return false;
1522 }
1523 q = p;
1524 while (isdigit(*p) && p - q < 2)
1525 s.second = 10 * s.second + (*p++ - '0');
1526 if (p - q == 2)
1527 ;
1528 else
1529 {
1530 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1531 csv_error.error_line = __LINE__;
1532 return false;
1533 }
1534 q = p;
1535 }
1536 // fraction point (optional)
1537 if (*p != 0)
1538 {
1539 if (*p == '.')
1540 p++;
1541 if (p - q == 1)
1542 ;
1543 else
1544 {
1545 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1546 csv_error.error_line = __LINE__;
1547 return false;
1548 }
1549 q = p;
1550 // fraction value (optional)
1551 while (isdigit(*p))
1552 s.fraction = 10 * s.fraction + (*p++ - '0');
1553 if (p - q <= 6)
1554 {
1555 uint n = p - q;
1556 while (n++ < attr.m_precision)
1557 s.fraction *= 10;
1558 }
1559 else
1560 {
1561 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1562 csv_error.error_line = __LINE__;
1563 return false;
1564 }
1565 }
1566 return true;
1567 }
1568
1569 static bool
ndb_import_csv_parse_datetime2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Datetime2 & s,Ndb_import_csv_error & csv_error)1570 ndb_import_csv_parse_datetime2(const NdbImportCsv::Attr& attr,
1571 const char* datac,
1572 NdbSqlUtil::Datetime2& s,
1573 Ndb_import_csv_error& csv_error)
1574 {
1575 #if 0
1576 yyyy-mm-dd/hh:mm:ss.ffffff
1577 "^"
1578 "([[:digit:]]{4}|[[:digit:]]{2})" // 1:yyyy
1579 "[[:punct:]]+"
1580 "([[:digit:]]{1,2})" // 2:mm
1581 "[[:punct:]]+"
1582 "([[:digit:]]{1,2})" // 3:dd
1583 "(T|[[:space:]]+|[[:punct:]]+)" // 4:
1584 "([[:digit:]]{1,2})" // 5:hh
1585 "[[:punct:]]+"
1586 "([[:digit:]]{1,2})" // 6:mm
1587 "[[:punct:]]+"
1588 "([[:digit:]]{1,2})" // 7:ss
1589 "(\\.([[:digit:]]*))?" // 8: 9:ffffff
1590 "$"
1591 #endif
1592 csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1593 s.sign = 1;
1594 s.year = s.month = s.day = 0;
1595 s.hour = s.minute = s.second = 0;
1596 s.fraction = 0;
1597 const char* p = datac;
1598 const char* q = p;
1599 // year
1600 while (isdigit(*p))
1601 s.year = 10 * s.year + (*p++ - '0');
1602 if (p - q == 4)
1603 ;
1604 else if (p - q == 2)
1605 {
1606 if (s.year >= 70)
1607 s.year += 1900;
1608 else
1609 s.year += 2000;
1610 }
1611 else
1612 {
1613 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1614 csv_error.error_line = __LINE__;
1615 return false;
1616 }
1617 q = p;
1618 // separator
1619 while (ispunct(*p))
1620 p++;
1621 if (p - q != 0)
1622 ;
1623 else
1624 {
1625 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1626 csv_error.error_line = __LINE__;
1627 return false;
1628 }
1629 q = p;
1630 // month
1631 while (isdigit(*p))
1632 s.month = 10 * s.month + (*p++ - '0');
1633 if (p - q == 1 || p - q == 2)
1634 ;
1635 else
1636 {
1637 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1638 csv_error.error_line = __LINE__;
1639 return false;
1640 }
1641 // separator
1642 while (ispunct(*p))
1643 p++;
1644 if (p - q != 0)
1645 ;
1646 else
1647 {
1648 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1649 csv_error.error_line = __LINE__;
1650 return false;
1651 }
1652 q = p;
1653 // day
1654 while (isdigit(*p))
1655 s.day = 10 * s.day + (*p++ - '0');
1656 if (p - q == 1 || p - q == 2)
1657 ;
1658 else
1659 {
1660 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1661 csv_error.error_line = __LINE__;
1662 return false;
1663 }
1664 q = p;
1665 // separator
1666 if (*p == 'T')
1667 p++;
1668 else if (isspace(*p))
1669 {
1670 while (isspace(*p))
1671 p++;
1672 }
1673 else if (ispunct(*p))
1674 {
1675 while (ispunct(*p))
1676 p++;
1677 }
1678 if (p - q != 0)
1679 ;
1680 else
1681 {
1682 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1683 csv_error.error_line = __LINE__;
1684 return false;
1685 }
1686 q = p;
1687 // hour
1688 while (isdigit(*p))
1689 s.hour = 10 * s.hour + (*p++ - '0');
1690 if (p - q == 1 || p - q == 2)
1691 ;
1692 else
1693 {
1694 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1695 csv_error.error_line = __LINE__;
1696 return false;
1697 }
1698 q = p;
1699 // separator
1700 while (ispunct(*p))
1701 p++;
1702 if (p - q != 0)
1703 ;
1704 else
1705 {
1706 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1707 csv_error.error_line = __LINE__;
1708 return false;
1709 }
1710 q = p;
1711 // minute
1712 while (isdigit(*p))
1713 s.minute = 10 * s.minute + (*p++ - '0');
1714 if (p - q == 1 || p - q == 2)
1715 ;
1716 else
1717 {
1718 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1719 csv_error.error_line = __LINE__;
1720 return false;
1721 }
1722 q = p;
1723 // separator
1724 while (ispunct(*p))
1725 p++;
1726 if (p - q != 0)
1727 ;
1728 else
1729 {
1730 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1731 csv_error.error_line = __LINE__;
1732 return false;
1733 }
1734 q = p;
1735 // second
1736 while (isdigit(*p))
1737 s.second = 10 * s.second + (*p++ - '0');
1738 if (p - q == 1 || p - q == 2)
1739 ;
1740 else
1741 {
1742 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1743 csv_error.error_line = __LINE__;
1744 return false;
1745 }
1746 q = p;
1747 // fraction point (optional)
1748 if (*p != 0)
1749 {
1750 if (*p == '.')
1751 p++;
1752 if (p - q == 1)
1753 ;
1754 else
1755 {
1756 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1757 csv_error.error_line = __LINE__;
1758 return false;
1759 }
1760 q = p;
1761 // fraction value (optional)
1762 while (isdigit(*p))
1763 s.fraction = 10 * s.fraction + (*p++ - '0');
1764 if (p - q <= 6)
1765 {
1766 uint n = p - q;
1767 while (n++ < attr.m_precision)
1768 s.fraction *= 10;
1769 }
1770 else
1771 {
1772 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1773 csv_error.error_line = __LINE__;
1774 return false;
1775 }
1776 if (*p == 0)
1777 ;
1778 else
1779 {
1780 csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1781 csv_error.error_line = __LINE__;
1782 return false;
1783 }
1784 }
1785 //
1786 return true;
1787 }
1788
1789 static bool
ndb_import_csv_parse_timestamp2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Timestamp2 & s,Ndb_import_csv_error & csv_error)1790 ndb_import_csv_parse_timestamp2(const NdbImportCsv::Attr& attr,
1791 const char* datac,
1792 NdbSqlUtil::Timestamp2& s,
1793 Ndb_import_csv_error& csv_error)
1794 {
1795 // parsed as Datetime2
1796 NdbSqlUtil::Datetime2 s2;
1797 if (!ndb_import_csv_parse_datetime2(attr, datac, s2, csv_error))
1798 return false;
1799 // convert to seconds in localtime
1800 struct tm tm;
1801 tm.tm_year = s2.year - 1900;
1802 tm.tm_mon = s2.month - 1;
1803 tm.tm_mday = s2.day;
1804 tm.tm_hour = s2.hour;
1805 tm.tm_min = s2.minute;
1806 tm.tm_sec = s2.second;
1807 tm.tm_isdst = -1; // mktime() will determine
1808 s.second = mktime(&tm);
1809 s.fraction = s2.fraction;
1810 return true;
1811 }
1812
1813 void
eval_field(Row * row,Line * line,Field * field)1814 NdbImportCsv::Eval::eval_field(Row* row, Line* line, Field* field)
1815 {
1816 const Opt& opt = m_util.c_opt;
1817 const CHARSET_INFO* cs = opt.m_charset;
1818 const Table& table = m_input.m_table;
1819 const Attrs& attrs = table.m_attrs;
1820 Buf& buf = m_input.m_buf;
1821 uchar* bufdata = &buf.m_data[buf.m_start];
1822 char* bufdatac = (char*)bufdata;
1823 // internal counts file lines and fields from 0
1824 const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1825 const uint fieldno = field->m_fieldno;
1826 // user wants the counts from 1
1827 const uint64 linenr = 1 + lineno;
1828 const uint fieldnr = 1 + fieldno;
1829 const Attr& attr = attrs[fieldno];
1830 uint pos = field->m_pack_pos;
1831 uint end = field->m_pack_end;
1832 uint length = end - pos;
1833 uchar* data = &bufdata[pos];
1834 char* datac = &bufdatac[pos];
1835 /*
1836 * A field is followed by non-empty separator or terminator.
1837 * We null-terminate the field and restore it at end.
1838 */
1839 uchar saveterm = data[length];
1840 data[length] = 0;
1841 Error error; // local error
1842 /*
1843 * Lots of repeated code here but it is not worth changing
1844 * before it moves to some datatypes library.
1845 */
1846 switch (attr.m_type) {
1847 case NdbDictionary::Column::Tinyint:
1848 {
1849 int err = 0;
1850 const char* endptr = nullptr;
1851 int val = cs->cset->strntol(
1852 cs, datac, length, 10, &endptr, &err);
1853 if (err != 0)
1854 {
1855 m_util.set_error_data(
1856 error, __LINE__, err,
1857 "line %" PRIu64 " field %u: eval %s failed",
1858 linenr, fieldnr, attr.m_sqltype);
1859 break;
1860 }
1861 if (uint(endptr - datac) != length)
1862 {
1863 m_util.set_error_data(
1864 error, __LINE__, 0,
1865 "line %" PRIu64 " field %u: eval %s failed: bad format",
1866 linenr, fieldnr, attr.m_sqltype);
1867 break;
1868 }
1869 const int minval = -128;
1870 const int maxval = +127;
1871 if (val < minval || val > maxval)
1872 {
1873 m_util.set_error_data(
1874 error, __LINE__, 0,
1875 "line %" PRIu64 " field %u: eval %s failed: "
1876 "value %d out of range",
1877 linenr, fieldnr, attr.m_sqltype, val);
1878 break;
1879 }
1880 int8 byteval = val;
1881 attr.set_value(row, &byteval, 1);
1882 }
1883 break;
1884 case NdbDictionary::Column::Smallint:
1885 {
1886 int err = 0;
1887 const char* endptr = nullptr;
1888 int val = cs->cset->strntol(
1889 cs, datac, length, 10, &endptr, &err);
1890 if (err != 0)
1891 {
1892 m_util.set_error_data(
1893 error, __LINE__, err,
1894 "line %" PRIu64 " field %u: eval %s failed",
1895 linenr, fieldnr, attr.m_sqltype);
1896 break;
1897 }
1898 if (uint(endptr - datac) != length)
1899 {
1900 m_util.set_error_data(
1901 error, __LINE__, 0,
1902 "line %" PRIu64 " field %u: eval %s failed: bad format",
1903 linenr, fieldnr, attr.m_sqltype);
1904 break;
1905 }
1906 const int minval = -32768;
1907 const int maxval = +32767;
1908 if (val < minval || val > maxval)
1909 {
1910 m_util.set_error_data(
1911 error, __LINE__, 0,
1912 "line %" PRIu64 " field %u: eval %s failed: "
1913 "value %d out of range",
1914 linenr, fieldnr, attr.m_sqltype, val);
1915 break;
1916 }
1917 int16 shortval = val;
1918 attr.set_value(row, &shortval, 2);
1919 }
1920 break;
1921 case NdbDictionary::Column::Mediumint:
1922 {
1923 int err = 0;
1924 const char* endptr = nullptr;
1925 int val = cs->cset->strntol(
1926 cs, datac, length, 10, &endptr, &err);
1927 if (err != 0)
1928 {
1929 m_util.set_error_data(
1930 error, __LINE__, err,
1931 "line %" PRIu64 " field %u: eval %s failed",
1932 linenr, fieldnr, attr.m_sqltype);
1933 break;
1934 }
1935 if (uint(endptr - datac) != length)
1936 {
1937 m_util.set_error_data(
1938 error, __LINE__, 0,
1939 "line %" PRIu64 " field %u: eval %s failed: bad format",
1940 linenr, fieldnr, attr.m_sqltype);
1941 break;
1942 }
1943 const int minval = -8388608;
1944 const int maxval = +8388607;
1945 if (val < minval || val > maxval)
1946 {
1947 m_util.set_error_data(
1948 error, __LINE__, 0,
1949 "line %" PRIu64 " field %u: eval %s failed: "
1950 "value %d out of range",
1951 linenr, fieldnr, attr.m_sqltype, val);
1952 break;
1953 }
1954 uchar val3[3];
1955 int3store(val3, (uint)val);
1956 attr.set_value(row, val3, 3);
1957 }
1958 break;
1959 case NdbDictionary::Column::Int:
1960 {
1961 int err = 0;
1962 const char* endptr = nullptr;
1963 int32 val = cs->cset->strntol(
1964 cs, datac, length, 10, &endptr, &err);
1965 if (err != 0)
1966 {
1967 m_util.set_error_data(
1968 error, __LINE__, err,
1969 "line %" PRIu64 " field %u: eval %s failed",
1970 linenr, fieldnr, attr.m_sqltype);
1971 break;
1972 }
1973 if (uint(endptr - datac) != length)
1974 {
1975 m_util.set_error_data(
1976 error, __LINE__, 0,
1977 "line %" PRIu64 " field %u: eval %s failed: bad format",
1978 linenr, fieldnr, attr.m_sqltype);
1979 break;
1980 }
1981 attr.set_value(row, &val, 4);
1982 }
1983 break;
1984 case NdbDictionary::Column::Bigint:
1985 {
1986 int err = 0;
1987 const char* endptr = nullptr;
1988 int64 val = cs->cset->strntoll(
1989 cs, datac, length, 10, &endptr, &err);
1990 if (err != 0)
1991 {
1992 m_util.set_error_data(
1993 error, __LINE__, err,
1994 "line %" PRIu64 " field %u: eval %s failed",
1995 linenr, fieldnr, attr.m_sqltype);
1996 break;
1997 }
1998 if (uint(endptr - datac) != length)
1999 {
2000 m_util.set_error_data(
2001 error, __LINE__, 0,
2002 "line %" PRIu64 " field %u: eval %s failed: bad format",
2003 linenr, fieldnr, attr.m_sqltype);
2004 break;
2005 }
2006 attr.set_value(row, &val, 8);
2007 }
2008 break;
2009 case NdbDictionary::Column::Tinyunsigned:
2010 {
2011 int err = 0;
2012 const char* endptr = nullptr;
2013 uint val = cs->cset->strntoul(
2014 cs, datac, length, 10, &endptr, &err);
2015 if (err != 0)
2016 {
2017 m_util.set_error_data(
2018 error, __LINE__, err,
2019 "line %" PRIu64 " field %u: eval %s failed",
2020 linenr, fieldnr, attr.m_sqltype);
2021 break;
2022 }
2023 if (uint(endptr - datac) != length)
2024 {
2025 m_util.set_error_data(
2026 error, __LINE__, 0,
2027 "line %" PRIu64 " field %u: eval %s failed: bad format",
2028 linenr, fieldnr, attr.m_sqltype);
2029 break;
2030 }
2031 const uint maxval = 255;
2032 if (val > maxval)
2033 {
2034 m_util.set_error_data(
2035 error, __LINE__, 0,
2036 "line %" PRIu64 " field %u: eval %s failed: "
2037 "value %u out of range",
2038 linenr, fieldnr, attr.m_sqltype, val);
2039 break;
2040 }
2041 uint8 byteval = val;
2042 attr.set_value(row, &byteval, 1);
2043 }
2044 break;
2045 case NdbDictionary::Column::Smallunsigned:
2046 {
2047 int err = 0;
2048 const char* endptr = nullptr;
2049 uint val = cs->cset->strntoul(
2050 cs, datac, length, 10, &endptr, &err);
2051 if (err != 0)
2052 {
2053 m_util.set_error_data(
2054 error, __LINE__, err,
2055 "line %" PRIu64 " field %u: eval %s failed",
2056 linenr, fieldnr, attr.m_sqltype);
2057 break;
2058 }
2059 if (uint(endptr - datac) != length)
2060 {
2061 m_util.set_error_data(
2062 error, __LINE__, 0,
2063 "line %" PRIu64 " field %u: eval %s failed: bad format",
2064 linenr, fieldnr, attr.m_sqltype);
2065 break;
2066 }
2067 const uint maxval = 65535;
2068 if (val > maxval)
2069 {
2070 m_util.set_error_data(
2071 error, __LINE__, 0,
2072 "line %" PRIu64 " field %u: eval %s failed: "
2073 "value %u out of range",
2074 linenr, fieldnr, attr.m_sqltype, val);
2075 break;
2076 }
2077 uint16 shortval = val;
2078 attr.set_value(row, &shortval, 2);
2079 }
2080 break;
2081 case NdbDictionary::Column::Mediumunsigned:
2082 {
2083 int err = 0;
2084 const char* endptr = nullptr;
2085 uint val = cs->cset->strntoul(
2086 cs, datac, length, 10, &endptr, &err);
2087 if (err != 0)
2088 {
2089 m_util.set_error_data(
2090 error, __LINE__, err,
2091 "line %" PRIu64 " field %u: eval %s failed",
2092 linenr, fieldnr, attr.m_sqltype);
2093 break;
2094 }
2095 if (uint(endptr - datac) != length)
2096 {
2097 m_util.set_error_data(
2098 error, __LINE__, 0,
2099 "line %" PRIu64 " field %u: eval %s failed: bad format",
2100 linenr, fieldnr, attr.m_sqltype);
2101 break;
2102 }
2103 const uint maxval = 16777215;
2104 if (val > maxval)
2105 {
2106 m_util.set_error_data(
2107 error, __LINE__, 0,
2108 "line %" PRIu64 " field %u: eval %s failed: "
2109 "value %u out of range",
2110 linenr, fieldnr, attr.m_sqltype, val);
2111 break;
2112 }
2113 uchar val3[3];
2114 int3store(val3, val);
2115 attr.set_value(row, val3, 3);
2116 }
2117 break;
2118 case NdbDictionary::Column::Unsigned:
2119 {
2120 int err = 0;
2121 const char* endptr = nullptr;
2122 uint32 val = cs->cset->strntoul(
2123 cs, datac, length, 10, &endptr, &err);
2124 if (err != 0)
2125 {
2126 m_util.set_error_data(
2127 error, __LINE__, err,
2128 "line %" PRIu64 " field %u: eval %s failed",
2129 linenr, fieldnr, attr.m_sqltype);
2130 break;
2131 }
2132 if (uint(endptr - datac) != length)
2133 {
2134 m_util.set_error_data(
2135 error, __LINE__, 0,
2136 "line %" PRIu64 " field %u: eval %s failed: bad format",
2137 linenr, fieldnr, attr.m_sqltype);
2138 break;
2139 }
2140 attr.set_value(row, &val, 4);
2141 }
2142 break;
2143 case NdbDictionary::Column::Bigunsigned:
2144 {
2145 int err = 0;
2146 const char* endptr = nullptr;
2147 uint64 val = cs->cset->strntoull(
2148 cs, datac, length, 10, &endptr, &err);
2149 if (err != 0)
2150 {
2151 m_util.set_error_data(
2152 error, __LINE__, err,
2153 "line %" PRIu64 " field %u: eval %s failed",
2154 linenr, fieldnr, attr.m_sqltype);
2155 break;
2156 }
2157 if (uint(endptr - datac) != length)
2158 {
2159 m_util.set_error_data(
2160 error, __LINE__, 0,
2161 "line %" PRIu64 " field %u: eval %s failed: bad format",
2162 linenr, fieldnr, attr.m_sqltype);
2163 break;
2164 }
2165 attr.set_value(row, &val, 8);
2166 }
2167 break;
2168 case NdbDictionary::Column::Decimal:
2169 {
2170 uchar val[200];
2171 Ndb_import_csv_error csv_error;
2172 if (!ndb_import_csv_parse_decimal(attr,
2173 false,
2174 datac, length,
2175 val, sizeof(val),
2176 csv_error))
2177 {
2178 m_util.set_error_data(
2179 error, __LINE__, 0,
2180 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2181 linenr, fieldnr, attr.m_sqltype,
2182 csv_error.error_text, csv_error.error_line);
2183 break;
2184 }
2185 attr.set_value(row, val, attr.m_size);
2186 }
2187 break;
2188 case NdbDictionary::Column::Decimalunsigned:
2189 {
2190 uchar val[200];
2191 Ndb_import_csv_error csv_error;
2192 if (!ndb_import_csv_parse_decimal(attr,
2193 true,
2194 datac, length,
2195 val, sizeof(val),
2196 csv_error))
2197 {
2198 m_util.set_error_data(
2199 error, __LINE__, 0,
2200 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2201 linenr, fieldnr, attr.m_sqltype,
2202 csv_error.error_text, csv_error.error_line);
2203 break;
2204 }
2205 attr.set_value(row, val, attr.m_size);
2206 }
2207 break;
2208 /*
2209 * Float and Double. We use same methods as LOAD DATA but for
2210 * some reason there are occasional infinitesimal diffs on "el6".
2211 * Fix by using ::strtod if charset allows (it does).
2212 */
2213 case NdbDictionary::Column::Float:
2214 {
2215 uint data_length;
2216 double val = 0.0;
2217 bool use_os_strtod =
2218 #ifndef _WIN32
2219 (opt.m_charset == &my_charset_bin);
2220 #else
2221 false;
2222 #endif
2223 if (use_os_strtod)
2224 {
2225 errno = 0;
2226 char* endptr = nullptr;
2227 val = ::strtod(datac, &endptr);
2228 data_length = endptr - datac;
2229 if (errno != 0)
2230 {
2231 m_util.set_error_data(
2232 error, __LINE__, errno,
2233 "line %" PRIu64 " field %u: eval %s failed",
2234 linenr, fieldnr, attr.m_sqltype);
2235 break;
2236 }
2237 }
2238 else
2239 {
2240 int err = 0;
2241 const char* endptr = nullptr;
2242 val = cs->cset->strntod(
2243 cs, datac, length, &endptr, &err);
2244 data_length = endptr - datac;
2245 if (err != 0)
2246 {
2247 m_util.set_error_data(
2248 error, __LINE__, err,
2249 "line %" PRIu64 " field %u: eval %s failed",
2250 linenr, fieldnr, attr.m_sqltype);
2251 break;
2252 }
2253 }
2254 if (data_length != length)
2255 {
2256 m_util.set_error_data(
2257 error, __LINE__, 0,
2258 "line %" PRIu64 " field %u: eval %s failed: bad format",
2259 linenr, fieldnr, attr.m_sqltype);
2260 break;
2261 }
2262 if (std::isnan(val))
2263 {
2264 m_util.set_error_data(
2265 error, __LINE__, 0,
2266 "line %" PRIu64 " field %u: eval %s failed: invalid value",
2267 linenr, fieldnr, attr.m_sqltype);
2268 break;
2269 }
2270 const double max_val = FLT_MAX;
2271 if (val < -max_val || val > max_val)
2272 {
2273 m_util.set_error_data(
2274 error, __LINE__, 0,
2275 "line %" PRIu64 " field %u: eval %s failed: value out of range",
2276 linenr, fieldnr, attr.m_sqltype);
2277 break;
2278 }
2279 float valf = (float)val;
2280 attr.set_value(row, &valf, 4);
2281 }
2282 break;
2283 case NdbDictionary::Column::Double:
2284 {
2285 int err = 0;
2286 uint data_length;
2287 double val = 0.0;
2288 bool use_os_strtod =
2289 #ifndef _WIN32
2290 (opt.m_charset == &my_charset_bin);
2291 #else
2292 false;
2293 #endif
2294 if (use_os_strtod)
2295 {
2296 errno = 0;
2297 char *endptr = nullptr;
2298 val = ::strtod(datac, &endptr);
2299 data_length = endptr - datac;
2300 if (errno != 0)
2301 {
2302 m_util.set_error_data(
2303 error, __LINE__, errno,
2304 "line %" PRIu64 " field %u: eval %s failed",
2305 linenr, fieldnr, attr.m_sqltype);
2306 break;
2307 }
2308 }
2309 else
2310 {
2311 const char* endptr = nullptr;
2312 val = cs->cset->strntod(
2313 cs, datac, length, &endptr, &err);
2314 data_length = endptr - datac;
2315 if (err != 0)
2316 {
2317 m_util.set_error_data(
2318 error, __LINE__, err,
2319 "line %" PRIu64 " field %u: eval %s failed",
2320 linenr, fieldnr, attr.m_sqltype);
2321 break;
2322 }
2323 }
2324 if (data_length != length)
2325 {
2326 m_util.set_error_data(
2327 error, __LINE__, 0,
2328 "line %" PRIu64 " field %u: eval %s failed: bad format",
2329 linenr, fieldnr, attr.m_sqltype);
2330 break;
2331 }
2332 if (std::isnan(val))
2333 {
2334 m_util.set_error_data(
2335 error, __LINE__, 0,
2336 "line %" PRIu64 " field %u: eval %s failed: invalid value",
2337 linenr, fieldnr, attr.m_sqltype);
2338 break;
2339 }
2340 const double max_val = DBL_MAX;
2341 if (val < -max_val || val > max_val)
2342 {
2343 m_util.set_error_data(
2344 error, __LINE__, 0,
2345 "line %" PRIu64 " field %u: eval %s failed: value out of range",
2346 linenr, fieldnr, attr.m_sqltype);
2347 break;
2348 }
2349 attr.set_value(row, &val, 8);
2350 }
2351 break;
2352 case NdbDictionary::Column::Char:
2353 {
2354 const char* val = datac;
2355 if (length > attr.m_length)
2356 {
2357 m_util.set_error_data(
2358 error, __LINE__, 0,
2359 "line %" PRIu64 " field %u: eval %s failed: "
2360 "byte length too long (%u > %u)",
2361 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2362 break;
2363 }
2364 attr.set_value(row, val, length);
2365 }
2366 break;
2367 case NdbDictionary::Column::Varchar:
2368 {
2369 const char* val = datac;
2370 if (length > attr.m_length)
2371 {
2372 m_util.set_error_data(
2373 error, __LINE__, 0,
2374 "line %" PRIu64 " field %u: eval %s failed: "
2375 "byte length too long (%u > %u)",
2376 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2377 break;
2378 }
2379 attr.set_value(row, val, length);
2380 }
2381 break;
2382 case NdbDictionary::Column::Longvarchar:
2383 {
2384 const char* val = datac;
2385 if (length > attr.m_length)
2386 {
2387 m_util.set_error_data(
2388 error, __LINE__, 0,
2389 "line %" PRIu64 " field %u: eval %s failed: "
2390 "byte length too long (%u > %u)",
2391 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2392 break;
2393 }
2394 attr.set_value(row, val, length);
2395 }
2396 break;
2397 case NdbDictionary::Column::Binary:
2398 {
2399 const char* val = datac;
2400 if (length > attr.m_length)
2401 {
2402 m_util.set_error_data(
2403 error, __LINE__, 0,
2404 "line %" PRIu64 " field %u: eval %s failed: "
2405 "length too long (%u > %u)",
2406 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2407 break;
2408 }
2409 attr.set_value(row, val, length);
2410 }
2411 break;
2412 case NdbDictionary::Column::Varbinary:
2413 {
2414 const char* val = datac;
2415 if (length > attr.m_length)
2416 {
2417 m_util.set_error_data(
2418 error, __LINE__, 0,
2419 "line %" PRIu64 " field %u: eval %s failed: "
2420 "length too long (%u > %u)",
2421 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2422 break;
2423 }
2424 attr.set_value(row, val, length);
2425 }
2426 break;
2427 case NdbDictionary::Column::Longvarbinary:
2428 {
2429 const char* val = datac;
2430 if (length > attr.m_length)
2431 {
2432 m_util.set_error_data(
2433 error, __LINE__, 0,
2434 "line %" PRIu64 " field %u: eval %s failed: "
2435 "length too long (%u > %u)",
2436 linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2437 break;
2438 }
2439 attr.set_value(row, val, length);
2440 }
2441 break;
2442 case NdbDictionary::Column::Bit:
2443 {
2444 require(attr.m_length <= 64);
2445 uint bytelength = (attr.m_length + 7) / 8;
2446 require(bytelength <= 8);
2447 uchar val[8];
2448 memset(val, 0, sizeof(val));
2449 uint i = 0;
2450 uint j = Inval_uint; // highest non-zero byte
2451 while (i < length)
2452 {
2453 uchar b = data[length - 1 - i];
2454 if (b != 0)
2455 j = i;
2456 if (i < bytelength)
2457 val[i] = b;
2458 i++;
2459 }
2460 if (j != Inval_uint)
2461 {
2462 uint k = 8; // highest bit at j
2463 while (k != 0)
2464 {
2465 k--;
2466 if ((data[length - 1 - j] & (1 << k)) != 0)
2467 break;
2468 }
2469 uint hibit = 8 * (length - 1 - j) + k;
2470 if (hibit >= attr.m_length)
2471 {
2472 m_util.set_error_data(
2473 error, __LINE__, 0,
2474 "line %" PRIu64 " field %u: eval %s failed: "
2475 "highest set bit %u out of range",
2476 linenr, fieldnr, attr.m_sqltype, hibit);
2477 break;
2478 }
2479 }
2480 #if defined(WORDS_BIGENDIAN)
2481 std::swap(val[0], val[3]);
2482 std::swap(val[1], val[2]);
2483 std::swap(val[4], val[7]);
2484 std::swap(val[5], val[6]);
2485 #endif
2486 attr.set_value(row, val, attr.m_size);
2487 }
2488 break;
2489 case NdbDictionary::Column::Year:
2490 {
2491 NdbSqlUtil::Year s;
2492 Ndb_import_csv_error csv_error;
2493 if (!ndb_import_csv_parse_year(attr,
2494 datac,
2495 s,
2496 csv_error))
2497 {
2498 m_util.set_error_data(
2499 error, __LINE__, 0,
2500 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2501 linenr, fieldnr, attr.m_sqltype,
2502 csv_error.error_text, csv_error.error_line);
2503 break;
2504 }
2505 uchar val[1];
2506 NdbSqlUtil::pack_year(s, val);
2507 attr.set_value(row, val, 1);
2508 }
2509 break;
2510 case NdbDictionary::Column::Date:
2511 {
2512 NdbSqlUtil::Date s;
2513 Ndb_import_csv_error csv_error;
2514 if (!ndb_import_csv_parse_date(attr,
2515 datac,
2516 s,
2517 csv_error))
2518 {
2519 m_util.set_error_data(
2520 error, __LINE__, 0,
2521 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2522 linenr, fieldnr, attr.m_sqltype,
2523 csv_error.error_text, csv_error.error_line);
2524 break;
2525 }
2526 uchar val[3];
2527 NdbSqlUtil::pack_date(s, val);
2528 attr.set_value(row, val, 3);
2529 }
2530 break;
2531 case NdbDictionary::Column::Time2:
2532 {
2533 NdbSqlUtil::Time2 s;
2534 Ndb_import_csv_error csv_error;
2535 if (!ndb_import_csv_parse_time2(attr,
2536 datac,
2537 s,
2538 csv_error))
2539 {
2540 m_util.set_error_data(
2541 error, __LINE__, 0,
2542 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2543 linenr, fieldnr, attr.m_sqltype,
2544 csv_error.error_text, csv_error.error_line);
2545 break;
2546 }
2547 uint prec = attr.m_precision;
2548 require(prec <= 6);
2549 uint flen = (1 + prec) / 2;
2550 uint len = 3 + flen;
2551 require(len <= 6);
2552 uchar val[6];
2553 NdbSqlUtil::pack_time2(s, val, prec);
2554 attr.set_value(row, val, len);
2555 }
2556 break;
2557 case NdbDictionary::Column::Datetime2:
2558 {
2559 NdbSqlUtil::Datetime2 s;
2560 Ndb_import_csv_error csv_error;
2561 if (!ndb_import_csv_parse_datetime2(attr,
2562 datac,
2563 s,
2564 csv_error))
2565 {
2566 m_util.set_error_data(
2567 error, __LINE__, 0,
2568 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2569 linenr, fieldnr, attr.m_sqltype,
2570 csv_error.error_text, csv_error.error_line);
2571 break;
2572 }
2573 uint prec = attr.m_precision;
2574 require(prec <= 6);
2575 uint flen = (1 + prec) / 2;
2576 uint len = 5 + flen;
2577 require(len <= 8);
2578 uchar val[8];
2579 NdbSqlUtil::pack_datetime2(s, val, prec);
2580 attr.set_value(row, val, len);
2581 }
2582 break;
2583 case NdbDictionary::Column::Timestamp2:
2584 {
2585 NdbSqlUtil::Timestamp2 s;
2586 Ndb_import_csv_error csv_error;
2587 if (!ndb_import_csv_parse_timestamp2(attr,
2588 datac,
2589 s,
2590 csv_error))
2591 {
2592 m_util.set_error_data(
2593 error, __LINE__, 0,
2594 "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2595 linenr, fieldnr, attr.m_sqltype,
2596 csv_error.error_text, csv_error.error_line);
2597 break;
2598 }
2599 uint prec = attr.m_precision;
2600 require(prec <= 6);
2601 uint flen = (1 + prec) / 2;
2602 uint len = 4 + flen;
2603 require(len <= 7);
2604 uchar val[7];
2605 NdbSqlUtil::pack_timestamp2(s, val, prec);
2606 attr.set_value(row, val, len);
2607 }
2608 break;
2609 case NdbDictionary::Column::Blob:
2610 case NdbDictionary::Column::Text:
2611 {
2612 const char* val = datac;
2613 attr.set_blob(row, val, length);
2614 }
2615 break;
2616 default:
2617 require(false);
2618 break;
2619 }
2620 data[length] = saveterm;
2621 if (m_util.has_error(error))
2622 {
2623 m_input.reject_line(line, field, error);
2624 line->m_reject = true;
2625 }
2626 }
2627
2628 void
eval_null(Row * row,Line * line,Field * field)2629 NdbImportCsv::Eval::eval_null(Row* row, Line* line, Field* field)
2630 {
2631 const Table& table = m_input.m_table;
2632 const Attrs& attrs = table.m_attrs;
2633 // internal counts file lines and fields from 0
2634 const uint64 lineno = m_input.m_startlineno + line->m_lineno;
2635 const uint fieldno = field->m_fieldno;
2636 // user wants the counts from 1
2637 const uint64 linenr = 1 + lineno;
2638 const uint fieldnr = 1 + fieldno;
2639 const Attr& attr = attrs[fieldno];
2640 Error error; // local error
2641 do
2642 {
2643 if (!attr.m_nullable)
2644 {
2645 m_util.set_error_data(
2646 error, __LINE__, 0,
2647 "line %" PRIu64 " field %u: setting non-nullable attr to NULL",
2648 linenr, fieldnr);
2649 break;
2650 }
2651 } while (0);
2652 if (m_util.has_error(error))
2653 {
2654 m_input.reject_line(line, field, error);
2655 line->m_reject = true;
2656 }
2657 attr.set_null(row, true);
2658 }
2659
2660 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Eval & eval)2661 operator<<(NdbOut& out, const NdbImportCsv::Eval& eval)
2662 {
2663 out << "eval ";
2664 return out;
2665 }
2666
2667 // output
2668
Output(NdbImportCsv & csv,const Spec & spec,const Table & table,Buf & buf)2669 NdbImportCsv::Output::Output(NdbImportCsv& csv,
2670 const Spec& spec,
2671 const Table& table,
2672 Buf& buf) :
2673 m_csv(csv),
2674 m_util(m_csv.m_util),
2675 m_spec(spec),
2676 m_table(table),
2677 m_buf(buf)
2678 {
2679 for (uint u = 0; u < g_bytecnt; u++)
2680 m_escapes[u] = 0;
2681 }
2682
2683 void
do_init()2684 NdbImportCsv::Output::do_init()
2685 {
2686 log_debug(1, "do_init");
2687 const Spec& spec = m_spec;
2688 for (uint u = 0; u < g_bytecnt; u++)
2689 m_escapes[u] = 0;
2690 if (spec.m_fields_escaped_by != 0) // should be
2691 {
2692 m_escapes[0] = '0';
2693 m_escapes[010] = 'b';
2694 m_escapes[012] = 'n';
2695 m_escapes[015] = 'r';
2696 m_escapes[011] = 't';
2697 m_escapes[032] = 'Z';
2698 if (spec.m_fields_enclosed_by != 0)
2699 {
2700 uchar quote = spec.m_fields_enclosed_by[0];
2701 m_escapes[quote] = quote;
2702 }
2703 uchar esc = spec.m_fields_escaped_by[0];
2704 m_escapes[esc] = esc;
2705 }
2706 }
2707
2708 void
add_header()2709 NdbImportCsv::Output::add_header()
2710 {
2711 const Table& table = m_table;
2712 const Attrs& attrs = table.m_attrs;
2713 const uint attrcnt = attrs.size();
2714 for (uint i = 0; i < attrcnt; i++)
2715 {
2716 const Attr& attr = attrs[i];
2717 if (i > 0)
2718 {
2719 add_fieldsep();
2720 }
2721 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2722 char* bufptrc = (char*)bufptr;
2723 strcpy(bufptrc, attr.m_attrname.c_str());
2724 m_buf.m_len += strlen(bufptrc);
2725 }
2726 add_lineend();
2727 }
2728
2729 void
add_line(const Row * row)2730 NdbImportCsv::Output::add_line(const Row* row)
2731 {
2732 const Spec& spec = m_spec;
2733 const Table& table = m_table;
2734 const Attrs& attrs = table.m_attrs;
2735 const uint attrcnt = attrs.size();
2736 for (uint i = 0; i < attrcnt; i++)
2737 {
2738 const Attr& attr = attrs[i];
2739 if (i > 0)
2740 {
2741 add_fieldsep();
2742 }
2743 if (attr.m_quotable)
2744 {
2745 add_quote();
2746 }
2747 add_field(attr, row);
2748 if (attr.m_quotable && spec.m_fields_enclosed_by != 0)
2749 {
2750 add_quote();
2751 }
2752 }
2753 add_lineend();
2754 }
2755
2756 void
add_field(const Attr & attr,const Row * row)2757 NdbImportCsv::Output::add_field(const Attr& attr, const Row* row)
2758 {
2759 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2760 char* bufptrc = (char*)bufptr;
2761 const uchar* rowptr = &row->m_data[attr.m_offset];
2762 switch (attr.m_type) {
2763 case NdbDictionary::Column::Int:
2764 {
2765 int32 val;
2766 require(attr.m_size == sizeof(val));
2767 memcpy(&val, rowptr, sizeof(val));
2768 sprintf(bufptrc, "%d", val);
2769 break;
2770 }
2771 break;
2772 case NdbDictionary::Column::Unsigned:
2773 {
2774 uint32 val;
2775 require(attr.m_size == sizeof(val));
2776 memcpy(&val, rowptr, sizeof(val));
2777 sprintf(bufptrc, "%u", val);
2778 break;
2779 }
2780 break;
2781 case NdbDictionary::Column::Bigint:
2782 {
2783 int64 val;
2784 require(attr.m_size == sizeof(val));
2785 memcpy(&val, rowptr, sizeof(val));
2786 sprintf(bufptrc, "%" PRId64, val);
2787 break;
2788 }
2789 break;
2790 case NdbDictionary::Column::Bigunsigned:
2791 {
2792 uint64 val;
2793 require(attr.m_size == sizeof(val));
2794 memcpy(&val, rowptr, sizeof(val));
2795 sprintf(bufptrc, "%" PRIu64, val);
2796 break;
2797 }
2798 break;
2799 case NdbDictionary::Column::Double:
2800 {
2801 double val;
2802 require(attr.m_size == sizeof(val));
2803 memcpy(&val, rowptr, sizeof(val));
2804 sprintf(bufptrc, "%.02f", val);
2805 break;
2806 }
2807 break;
2808 case NdbDictionary::Column::Varchar:
2809 {
2810 uint len = rowptr[0];
2811 add_char(&rowptr[1], len);
2812 break;
2813 }
2814 break;
2815 case NdbDictionary::Column::Longvarchar:
2816 {
2817 uint len = rowptr[0] + (rowptr[1] << 8);
2818 add_char(&rowptr[2], len);
2819 break;
2820 }
2821 break;
2822 case NdbDictionary::Column::Text:
2823 {
2824 require(attr.m_isblob);
2825 const Blob* blob = row->m_blobs[attr.m_blobno];
2826 add_char(blob->m_data, blob->m_blobsize);
2827 break;
2828 }
2829 default:
2830 require(false);
2831 break;
2832 }
2833 m_buf.m_len += strlen(bufptrc);
2834 }
2835
2836 void
add_char(const uchar * rowdata,uint len)2837 NdbImportCsv::Output::add_char(const uchar* rowdata, uint len)
2838 {
2839 log_debug_3("add_char " << len << " " << (char*)rowdata);
2840 const Spec& spec = m_spec;
2841 require(spec.m_fields_escaped_by != 0);
2842 uchar esc = spec.m_fields_escaped_by[0];
2843 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2844 uchar* p = bufptr;
2845 for (uint i = 0; i < len; i++)
2846 {
2847 uchar c = rowdata[i];
2848 if (m_escapes[c])
2849 {
2850 *p++ = esc;
2851 *p++ = m_escapes[c];
2852 }
2853 else
2854 *p++ = c;
2855 }
2856 *p = 0;
2857 }
2858
2859 void
add_quote()2860 NdbImportCsv::Output::add_quote()
2861 {
2862 const Spec& spec = m_spec;
2863 if (spec.m_fields_enclosed_by != 0)
2864 {
2865 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2866 char* bufptrc = (char*)bufptr;
2867 strcpy(bufptrc, (const char*)spec.m_fields_enclosed_by);
2868 m_buf.m_len += strlen(bufptrc);
2869 }
2870 }
2871
2872 void
add_fieldsep()2873 NdbImportCsv::Output::add_fieldsep()
2874 {
2875 const Spec& spec = m_spec;
2876 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2877 char* bufptrc = (char*)bufptr;
2878 strcpy(bufptrc, (const char*)spec.m_fields_terminated_by);
2879 m_buf.m_len += strlen(bufptrc);
2880 }
2881
2882 void
add_lineend()2883 NdbImportCsv::Output::add_lineend()
2884 {
2885 const Spec& spec = m_spec;
2886 uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2887 char* bufptrc = (char*)bufptr;
2888 strcpy(bufptrc, (const char*)spec.m_lines_terminated_by);
2889 m_buf.m_len += strlen(bufptrc);
2890 }
2891
2892 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Output & output)2893 operator<<(NdbOut& out, const NdbImportCsv::Output& output)
2894 {
2895 out << "output";
2896 out << " len=" << output.m_buf.m_len << " ";
2897 return out;
2898 }
2899
2900 // unittest
2901
2902 #ifdef TEST_NDBIMPORTCSV
2903
2904 #include <NdbTap.hpp>
2905
2906 typedef NdbImport::OptCsv OptCsv;
2907 typedef NdbImportUtil::Name UtilName;
2908 typedef NdbImportUtil::Buf UtilBuf;
2909 typedef NdbImportUtil::File UtilFile;
2910 typedef NdbImportUtil::Attr UtilAttr;
2911 typedef NdbImportUtil::Attrs UtilAttrs;
2912 typedef NdbImportUtil::Table UtilTable;
2913 typedef NdbImportUtil::RowList UtilRowList;
2914 typedef NdbImportUtil::RowMap UtilRowMap;
2915 typedef NdbImportUtil::Stats UtilStats;
2916 typedef NdbImportCsv::Spec CsvSpec;
2917 typedef NdbImportCsv::Input CsvInput;
2918 typedef NdbImportCsv::Line CsvLine;
2919 typedef NdbImportCsv::Field CsvField;
2920
2921 static void
makeoptcsv(OptCsv & optcsv)2922 makeoptcsv(OptCsv& optcsv)
2923 {
2924 optcsv.m_fields_terminated_by = ",";
2925 optcsv.m_fields_enclosed_by = "\"";
2926 optcsv.m_fields_optionally_enclosed_by = "\"";
2927 optcsv.m_fields_escaped_by = "\\\\";
2928 optcsv.m_lines_terminated_by = "\\n";
2929 }
2930
2931 // table (a int unsigned primary key, b varchar(10) not null)
2932
2933 static void
maketable(UtilTable & table)2934 maketable(UtilTable& table)
2935 {
2936 table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned);
2937 table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10);
2938 }
2939
// Expected parse result of one test case: the field values of all
// lines in order.  A null pointer stands for a NULL field.
struct MyRes {
  uint fieldcnt;
  const char* field[20];  // fields, 0 for NULL
  // cnt is the number of variadic arguments that follow.  Each of
  // them must have type const char* (pass a typed null pointer, not a
  // bare 0, for a NULL field).  At most as many arguments as field[]
  // has slots are stored; fieldcnt is limited accordingly so that it
  // never exceeds the array.
  MyRes(uint cnt, ...) {
    const uint maxcnt = sizeof(field) / sizeof(field[0]);
    fieldcnt = cnt < maxcnt ? cnt : maxcnt;
    va_list ap;
    va_start(ap, cnt);
    for (uint n = 0; n < maxcnt; n++) {
      // unused slots are cleared so that they never hold garbage
      field[n] = n < fieldcnt ? va_arg(ap, const char*) : 0;
    }
    // every va_start must be paired with va_end
    va_end(ap);
  }
};
2953
2954 struct MyCsv {
2955 uint error; // 0-ok 1-error
2956 uint linecnt; // valid lines
2957 uint partial; // bytes in last partial line
2958 const char* buf;
2959 MyRes res;
2960 };
2961
2962 static MyCsv mycsvlist[] = {
2963 { 0, 0, 0, "",
2964 MyRes(0) },
2965 { 0, 1, 0, "123,abc\n",
2966 MyRes(2, "123", "abc") },
2967 { 0, 2, 0, "123,abc\n456,def\n",
2968 MyRes(4, "123", "abc", "456", "def") },
2969 { 0, 1, 7, "123,abc\n456,def",
2970 MyRes(2, "123", "abc") },
2971 { 0, 2, 0, "123,\"abc\"\n456,def\n",
2972 MyRes(4, "123", "abc", "456", "def") },
2973 { 0, 2, 0, "123,\"a\"\"c\"\n456,def\n",
2974 MyRes(4, "123", "a\"c", "456", "def") },
2975 { 0, 1, 0, "123,\"a,c\"\n",
2976 MyRes(2, "123", "a,c") },
2977 { 0, 1, 0, "123,\\N\n",
2978 MyRes(2, "123", 0) },
2979 { 0, 1, 0, "123,\"\\N\"\n",
2980 MyRes(2, "123", 0) },
2981 { 0, 1, 0, "123,\\N\\N\n",
2982 MyRes(2, "123", "NN") },
2983 { 0, 1, 0, "123,\\0\\b\\n\\r\\t\\Z\\N\n",
2984 MyRes(2, "123", "\000\010\012\015\011\032N") },
2985 };
2986
2987 static const uint mycsvcnt = sizeof(mycsvlist)/sizeof(mycsvlist[0]);
2988
2989 static int
testinput1()2990 testinput1()
2991 {
2992 NdbImportUtil util;
2993 NdbOut& out = *util.c_log.out;
2994 util.c_opt.m_log_level = 4;
2995 out << "testinput1" << endl;
2996 NdbImportCsv csv(util);
2997 OptCsv optcsv;
2998 makeoptcsv(optcsv);
2999 CsvSpec csvspec;
3000 require(csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == 0);
3001 UtilTable table;
3002 maketable(table);
3003 UtilStats stats(util);
3004 for (uint i = 0; i < mycsvcnt; i++)
3005 {
3006 out << "case " << i << endl;
3007 const MyCsv& mycsv = mycsvlist[i];
3008 UtilBuf buf;
3009 buf.alloc(1024, 1);
3010 buf.copy((const uchar*)mycsv.buf, strlen(mycsv.buf));
3011 const uchar* bufdata = &buf.m_data[buf.m_start];
3012 const char* bufdatac = (const char*)bufdata;
3013 uint n = strlen(bufdatac);
3014 if (n != 0 && bufdatac[n-1] == '\n')
3015 out << bufdatac;
3016 else
3017 out << bufdatac << "\\c" << endl;
3018 UtilRowList rows_out;
3019 UtilRowList rows_reject;
3020 UtilRowMap rowmap_in(util);
3021 CsvInput input(csv,
3022 "csvinput",
3023 csvspec,
3024 table,
3025 buf,
3026 rows_out,
3027 rows_reject,
3028 rowmap_in,
3029 stats);
3030 input.do_init();
3031 input.do_parse();
3032 if (!input.has_error())
3033 {
3034 require(mycsv.error == 0);
3035 }
3036 else
3037 {
3038 out << util.c_error << endl;
3039 require(mycsv.error == 1);
3040 }
3041 require(input.m_line_list.cnt() == mycsv.linecnt);
3042 const MyRes& myres = mycsv.res;
3043 uint fieldcnt = 0;
3044 CsvLine* line = input.m_line_list.front();
3045 while (line != 0)
3046 {
3047 CsvField* field = line->m_field_list.front();
3048 while (field != 0)
3049 {
3050 require(fieldcnt < myres.fieldcnt);
3051 const char* myfield = myres.field[fieldcnt];
3052 if (field->m_null)
3053 {
3054 require(myfield == 0);
3055 }
3056 else
3057 {
3058 require(myfield != 0);
3059 uint pos = field->m_pack_pos;
3060 uint end = field->m_pack_end;
3061 uint len = end - pos;
3062 require(memcmp(&bufdata[pos], myfield, len) == 0);
3063 }
3064 fieldcnt++;
3065 field = field->next();
3066 }
3067 line = line->next();
3068 }
3069 require(fieldcnt == myres.fieldcnt);
3070 require(buf.m_tail <= buf.m_len);
3071 require(buf.m_len - buf.m_tail == mycsv.partial);
3072 input.free_line_list(input.m_line_list);
3073 require(input.balanced());
3074 }
3075 return 0;
3076 }
3077
3078 static int
testinput2()3079 testinput2()
3080 {
3081 NdbImportUtil util;
3082 NdbOut& out = *util.c_log.out;
3083 util.c_opt.m_log_level = 2;
3084 util.c_opt.m_abort_on_error = 1;
3085 out << "testinput2" << endl;
3086 const char* path = "test.csv";
3087 struct stat st;
3088 if (stat(path, &st) == -1)
3089 {
3090 out << path << ": skip on errno " << errno << endl;
3091 return 0;
3092 }
3093 NdbImportCsv csv(util);
3094 OptCsv optcsv;
3095 makeoptcsv(optcsv);
3096 CsvSpec csvspec;
3097 require(csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == 0);
3098 UtilTable table;
3099 maketable(table);
3100 UtilBuf* buf[2];
3101 buf[0] = new UtilBuf(true);
3102 buf[1] = new UtilBuf(true);
3103 buf[0]->alloc(4096, 4);
3104 buf[1]->alloc(4096, 4);
3105 UtilRowList rows_out;
3106 UtilRowList rows_reject;
3107 UtilRowMap rowmap_in(util);
3108 UtilStats stats(util);
3109 CsvInput* input[2];
3110 input[0] = new CsvInput(csv, "csvinput-0", csvspec, table, *buf[0],
3111 rows_out, rows_reject, rowmap_in, stats);
3112 input[1] = new CsvInput(csv, "csvinput-1", csvspec, table, *buf[1],
3113 rows_out, rows_reject, rowmap_in, stats);
3114 input[0]->do_init();
3115 input[1]->do_init();
3116 UtilFile file(util, util.c_error);
3117 out << "read " << path << endl;
3118 file.set_path(path);
3119 require(file.do_open(UtilFile::Read_flags) == 0);
3120 uint totlen = 0;
3121 uint totread = 0;
3122 uint totlines = 0;
3123 uint i = 0;
3124 while (1)
3125 {
3126 uint j = 1 - i;
3127 CsvInput& input1 = *input[i];
3128 UtilBuf& b1 = *buf[i];
3129 UtilBuf& b2 = *buf[j];
3130 b1.reset();
3131 int ret = file.do_read(b1);
3132 require(ret == 0);
3133 totlen += b1.m_len;
3134 if (totread != 0)
3135 {
3136 out << "movetail" << " src=" << b2 << " dst=" << b1 << endl;
3137 require(b2.movetail(b1) == 0);
3138 }
3139 input1.do_parse();
3140 totread++;
3141 totlines += input1.m_line_list.cnt();
3142 input1.free_line_list(input1.m_line_list);
3143 if (b1.m_eof)
3144 break;
3145 i = j;
3146 }
3147 require(totlen == st.st_size);
3148 out << "len=" << totlen << " reads=" << totread
3149 << " lines=" << totlines << endl;
3150 require(file.do_close() == 0);
3151 return 0;
3152 }
3153
3154 static int
testmain()3155 testmain()
3156 {
3157 ndb_init();
3158 #ifdef VM_TRACE
3159 signal(SIGABRT, SIG_DFL);
3160 signal(SIGSEGV, SIG_DFL);
3161 #endif
3162 if (testinput1() != 0)
3163 return -1;
3164 if (testinput2() != 0)
3165 return -1;
3166 return 0;
3167 }
3168
// TAP entry point: the result is true when testmain() returns 0.
TAPTEST(NdbImportCsv)
{
  const bool ok = (testmain() == 0);
  return ok;
}
3174
3175 #endif
3176