1 /*
2    Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "m_ctype.h"
26 #include "my_byteorder.h"
27 #include "my_sys.h"
28 #include <inttypes.h>
29 #include <NdbSqlUtil.hpp>
30 #include <decimal_utils.hpp>
31 #include "NdbImportCsv.hpp"
32 #include "NdbImportCsvGram.hpp"
33 // STL
34 #include <cmath>
35 
36 extern int NdbImportCsv_yyparse(NdbImportCsv::Parse& csvparse);
37 #ifdef VM_TRACE
38 extern int NdbImportCsv_yydebug;
39 #endif
40 
NdbImportCsv(NdbImportUtil & util)41 NdbImportCsv::NdbImportCsv(NdbImportUtil& util) :
42   m_util(util),
43   m_error(m_util.c_error)
44 {
45 #ifdef VM_TRACE
46   NdbImportCsv_yydebug = 0;
47 #endif
48 }
49 
~NdbImportCsv()50 NdbImportCsv::~NdbImportCsv()
51 {
52 }
53 
54 // spec
55 
Spec()56 NdbImportCsv::Spec::Spec()
57 {
58   m_fields_terminated_by = 0;
59   m_fields_enclosed_by = 0;
60   m_fields_optionally_enclosed_by = 0;
61   m_fields_escaped_by = 0;
62   m_lines_terminated_by = 0;
63   m_fields_terminated_by_len = Inval_uint;
64   m_fields_enclosed_by_len = Inval_uint;
65   m_fields_optionally_enclosed_by_len = Inval_uint;
66   m_fields_escaped_by_len = Inval_uint;
67   m_lines_terminated_by_len = Inval_uint;
68 }
69 
~Spec()70 NdbImportCsv::Spec::~Spec()
71 {
72   delete [] m_fields_terminated_by;
73   delete [] m_fields_enclosed_by;
74   delete [] m_fields_optionally_enclosed_by;
75   delete [] m_fields_escaped_by;
76   delete [] m_lines_terminated_by;
77 }
78 
79 int
translate_escapes(const char * src,const uchar * & dst,uint & dstlen)80 NdbImportCsv::translate_escapes(const char* src,
81                                 const uchar*& dst,
82                                 uint& dstlen)
83 {
84   dst = 0;
85   dstlen = Inval_uint;
86   if (src != 0)
87   {
88     uint n = strlen(src);
89     uchar* tmpdst = new uchar [n + 1];  // cannot be longer than src
90     const char* p = src;
91     uchar* q = tmpdst;
92     while (*p != 0)
93     {
94       if (*p != '\\')
95       {
96         *q++ = (uchar)*p++;
97       }
98       else
99       {
100         // XXX check what mysqlimport translates
101         char c = *++p;
102         switch (c) {
103         case '\\':
104           *q++ = '\\';
105           break;
106         case 'n':
107           *q++ = '\n';
108           break;
109         case 'r':
110           *q++ = '\r';
111           break;
112         case 't':
113           *q++ = '\t';
114           break;
115         default:
116           m_util.set_error_usage(m_error, __LINE__,
117                                  "unknown escape '\\%c' (0x%x) in CSV option",
118                                  c, (uint)(unsigned char)c);
119           return -1;
120         }
121         p++;
122       }
123     }
124     // null-terminate for use as char*
125     *q = 0;
126     dst = tmpdst;
127     dstlen = q - tmpdst;
128   }
129   return 0;
130 }
131 
132 int
set_spec(Spec & spec,const OptCsv & optcsv,OptCsv::Mode mode)133 NdbImportCsv::set_spec(Spec& spec, const OptCsv& optcsv, OptCsv::Mode mode)
134 {
135   if (translate_escapes(optcsv.m_fields_terminated_by,
136                         spec.m_fields_terminated_by,
137                         spec.m_fields_terminated_by_len) == -1)
138     return -1;
139   if (translate_escapes(optcsv.m_fields_enclosed_by,
140                         spec.m_fields_enclosed_by,
141                         spec.m_fields_enclosed_by_len) == -1)
142     return -1;
143   if (translate_escapes(optcsv.m_fields_optionally_enclosed_by,
144                         spec.m_fields_optionally_enclosed_by,
145                         spec.m_fields_optionally_enclosed_by_len) == -1)
146     return -1;
147   if (translate_escapes(optcsv.m_fields_escaped_by,
148                         spec.m_fields_escaped_by,
149                         spec.m_fields_escaped_by_len) == -1)
150     return -1;
151   if (translate_escapes(optcsv.m_lines_terminated_by,
152                         spec.m_lines_terminated_by,
153                         spec.m_lines_terminated_by_len) == -1)
154     return -1;
155   int used[256];
156   for (uint i = 0; i < 256; i++)
157     used[i] = 0;
158   do {
159     // fields-terminated-by
160     {
161       if (spec.m_fields_terminated_by == 0 ||
162           spec.m_fields_terminated_by_len == 0)
163       {
164         const char* msg =
165           "fields-terminated-by cannot be empty";
166         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
167         break;
168       }
169       uchar u = spec.m_fields_terminated_by[0];
170       if (used[u])
171       {
172         const char* msg =
173           "fields-terminated-by re-uses previous special char";
174         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
175         break;
176       }
177       used[u] = T_FIELDSEP;
178     }
179     // fields-enclosed-by
180     {
181       if (spec.m_fields_enclosed_by != 0)
182       {
183         if (spec.m_fields_enclosed_by_len != 1)
184         {
185           const char* msg =
186             "fields-enclosed-by must be a single char";
187           m_util.set_error_usage(m_error, __LINE__, "%s", msg);
188           break;
189         }
190         uchar u = spec.m_fields_enclosed_by[0];
191         if (used[u])
192         {
193           const char* msg =
194             "fields-enclosed-by re-uses previous special char";
195           m_util.set_error_usage(m_error, __LINE__, "%s", msg);
196           break;
197         }
198         used[u] = T_QUOTE;
199       }
200     }
201     // fields-optionally-enclosed-by
202     {
203       if (spec.m_fields_optionally_enclosed_by != 0)
204       {
205         if (spec.m_fields_optionally_enclosed_by_len != 1)
206         {
207           const char* msg =
208             "fields-optionally-enclosed-by must be a single char";
209           m_util.set_error_usage(m_error, __LINE__, "%s", msg);
210           break;
211         }
212         uchar u = spec.m_fields_optionally_enclosed_by[0];
213         if (used[u] && used[u] != T_QUOTE)
214         {
215           const char* msg =
216             "fields-optionally-enclosed-by re-uses previous special char";
217           m_util.set_error_usage(m_error, __LINE__, "%s", msg);
218           break;
219         }
220         used[u] = T_QUOTE;
221       }
222     }
223     // fields-escaped-by
224     {
225       require(spec.m_fields_escaped_by != 0);
226       if (spec.m_fields_escaped_by_len != 1)
227       {
228         const char* msg =
229           "fields-escaped-by must be empty or a single char";
230         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
231         break;
232       }
233       uchar u = spec.m_fields_escaped_by[0];
234       if (used[u])
235       {
236         const char* msg =
237           "fields-escaped-by re-uses previous special char";
238         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
239         break;
240       }
241       used[u] = T_ESCAPE;
242     }
243     // lines terminated-by
244     {
245       require(spec.m_lines_terminated_by != 0);
246       if (spec.m_lines_terminated_by_len == 0)
247       {
248         const char* msg =
249           "lines-terminated-by cannot be empty";
250         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
251         break;
252       }
253       uchar u = spec.m_lines_terminated_by[0];
254       if (used[u])
255       {
256         const char* msg =
257           "lines-terminated-by re-uses previous special char";
258         m_util.set_error_usage(m_error, __LINE__, "%s", msg);
259         break;
260       }
261       used[u] = T_LINEEND;
262     }
263     // adjust
264     if (mode == OptCsv::ModeInput)
265     {
266       /*
267        * fields-enclosed-by and fields-optionally-enclosed-by
268        * have exact same meaning
269        */
270       if (spec.m_fields_enclosed_by != 0 &&
271           spec.m_fields_optionally_enclosed_by != 0)
272       {
273         if (spec.m_fields_enclosed_by_len !=
274             spec.m_fields_optionally_enclosed_by_len ||
275             memcmp(spec.m_fields_enclosed_by,
276                    spec.m_fields_optionally_enclosed_by,
277                    spec.m_fields_enclosed_by_len) != 0)
278         {
279           const char* msg =
280             "conflicting fields-enclosed-by options";
281           m_util.set_error_usage(m_error, __LINE__, "%s", msg);
282           break;
283         }
284       }
285       else if (spec.m_fields_enclosed_by != 0)
286       {
287         // for completeness - will not be used
288         uchar* fields_optionally_enclosed_by =
289           new uchar [spec.m_fields_enclosed_by_len + 1];
290         memcpy(fields_optionally_enclosed_by,
291                spec.m_fields_enclosed_by,
292                spec.m_fields_enclosed_by_len + 1);
293         spec.m_fields_optionally_enclosed_by =
294           fields_optionally_enclosed_by;
295         spec.m_fields_optionally_enclosed_by_len =
296           spec.m_fields_enclosed_by_len;
297       }
298       else if (spec.m_fields_optionally_enclosed_by != 0)
299       {
300         uchar* fields_enclosed_by =
301           new uchar [spec.m_fields_optionally_enclosed_by_len + 1];
302         memcpy(fields_enclosed_by,
303                spec.m_fields_optionally_enclosed_by,
304                spec.m_fields_optionally_enclosed_by_len + 1);
305         spec.m_fields_enclosed_by =
306           fields_enclosed_by;
307         spec.m_fields_enclosed_by_len =
308           spec.m_fields_optionally_enclosed_by_len;
309       }
310     }
311     if (mode == OptCsv::ModeOutput)
312     {
313       // XXX later
314     }
315     return 0;
316   } while (0);
317   return -1;
318 }
319 
320 // alloc
321 
Alloc()322 NdbImportCsv::Alloc::Alloc()
323 {
324   m_alloc_data_cnt = 0;
325   m_alloc_field_cnt = 0;
326   m_alloc_line_cnt = 0;
327   m_free_data_cnt = 0;
328   m_free_field_cnt = 0;
329   m_free_line_cnt = 0;
330 }
331 
332 NdbImportCsv::Data*
alloc_data()333 NdbImportCsv::Alloc::alloc_data()
334 {
335   Data* data =  m_data_free.pop_front();
336   if (data == 0)
337     data = new Data;
338   else
339     new (data) Data;
340   m_alloc_data_cnt++;
341   return data;
342 }
343 
344 void
free_data_list(DataList & data_list)345 NdbImportCsv::Alloc::free_data_list(DataList& data_list)
346 {
347   m_free_data_cnt += data_list.cnt();
348   m_data_free.push_back_from(data_list);
349 }
350 
351 NdbImportCsv::Field*
alloc_field()352 NdbImportCsv::Alloc::alloc_field()
353 {
354   Field* field = m_field_free.pop_front();
355   if (field == 0)
356     field = new Field;
357   else
358     new (field) Field;
359   m_alloc_field_cnt++;
360   return field;
361 }
362 
363 void
free_field_list(FieldList & field_list)364 NdbImportCsv::Alloc::free_field_list(FieldList& field_list)
365 {
366   Field* field = field_list.front();
367   while (field != 0)
368   {
369     free_data_list(field->m_data_list);
370     field = field->next();
371   }
372   m_free_field_cnt += field_list.cnt();
373   m_field_free.push_back_from(field_list);
374 }
375 
376 void
free_field(Field * field)377 NdbImportCsv::Alloc::free_field(Field *field)
378 {
379   free_data_list(field->m_data_list);
380   m_field_free.push_back(field);
381   m_free_field_cnt++;
382 }
383 
384 NdbImportCsv::Line*
alloc_line()385 NdbImportCsv::Alloc::alloc_line()
386 {
387   Line* line = m_line_free.pop_front();
388   if (line == 0)
389     line = new Line;
390   else
391     new (line) Line;
392   m_alloc_line_cnt++;
393   return line;
394 }
395 
396 void
free_line_list(LineList & line_list)397 NdbImportCsv::Alloc::free_line_list(LineList& line_list)
398 {
399   Line* line = line_list.front();
400   while (line != 0)
401   {
402     free_field_list(line->m_field_list);
403     line = line->next();
404   }
405   m_free_line_cnt += line_list.cnt();
406   m_line_free.push_back_from(line_list);
407 }
408 
409 bool
balanced()410 NdbImportCsv::Alloc::balanced()
411 {
412   return
413     m_alloc_data_cnt == m_free_data_cnt &&
414     m_alloc_field_cnt == m_free_field_cnt &&
415     m_alloc_line_cnt == m_free_line_cnt;
416 }
417 
418 // input
419 
Input(NdbImportCsv & csv,const char * name,const Spec & spec,const Table & table,Buf & buf,RowList & rows_out,RowList & rows_reject,RowMap & rowmap_in,Stats & stats)420 NdbImportCsv::Input::Input(NdbImportCsv& csv,
421                            const char* name,
422                            const Spec& spec,
423                            const Table& table,
424                            Buf& buf,
425                            RowList& rows_out,
426                            RowList& rows_reject,
427                            RowMap& rowmap_in,
428                            Stats& stats) :
429   m_csv(csv),
430   m_util(m_csv.m_util),
431   m_name(name),
432   m_spec(spec),
433   m_table(table),
434   m_buf(buf),
435   m_rows_out(rows_out),
436   m_rows_reject(rows_reject),
437   m_rowmap_in(rowmap_in)
438 {
439   m_parse = new Parse(*this);
440   m_eval = new Eval(*this);
441   m_rows.set_stats(m_util.c_stats, Name(m_name, "rows"));
442   m_startpos = 0;
443   m_startlineno = 0;
444   m_ignore_lines = 0;
445 }
446 
~Input()447 NdbImportCsv::Input::~Input()
448 {
449   delete m_parse;
450   delete m_eval;
451 }
452 
453 void
do_init()454 NdbImportCsv::Input::do_init()
455 {
456   const Opt& opt = m_util.c_opt;
457   m_ignore_lines = opt.m_ignore_lines;
458   m_parse->do_init();
459   m_eval->do_init();
460 }
461 
462 /*
463  * Adjust counters at resume.  Argument is first range in old
464  * rowmap.  Input file seek is done by caller.
465  */
466 void
do_resume(Range range_in)467 NdbImportCsv::Input::do_resume(Range range_in)
468 {
469   m_startpos = range_in.m_endpos;
470   m_startlineno = range_in.m_end + m_ignore_lines;
471 }
472 
473 void
do_parse()474 NdbImportCsv::Input::do_parse()
475 {
476 #ifdef VM_TRACE
477   NdbImportCsv_yydebug = (m_util.c_opt.m_log_level >= 4);
478 #endif
479   m_parse->do_parse();
480 #ifdef VM_TRACE
481   NdbImportCsv_yydebug = 0;
482 #endif
483 }
484 
485 void
do_eval()486 NdbImportCsv::Input::do_eval()
487 {
488   m_eval->do_eval();
489 }
490 
491 void
do_send(uint & curr,uint & left)492 NdbImportCsv::Input::do_send(uint& curr, uint& left)
493 {
494   const Opt& opt = m_util.c_opt;
495   RowList& rows_out = m_rows_out;       // shared
496   rows_out.lock();
497   curr = m_rows.cnt();
498   RowCtl ctl(opt.m_rowswait);
499   m_rows.pop_front_to(rows_out, ctl);
500   left = m_rows.cnt();
501   if (rows_out.m_foe)
502   {
503     log_debug(1, "consumer has stopped");
504     m_util.set_error_gen(m_error, __LINE__, "consumer has stopped");
505   }
506   rows_out.unlock();
507 }
508 
509 void
do_movetail(Input & input2)510 NdbImportCsv::Input::do_movetail(Input& input2)
511 {
512   Buf& buf1 = m_buf;
513   Buf& buf2 = input2.m_buf;
514   require(buf1.movetail(buf2) == 0);
515   buf1.m_pos = buf1.m_len;      // keep pos within new len
516   input2.m_startpos = m_startpos + buf1.m_len;
517   input2.m_startlineno = m_startlineno + m_line_list.cnt();
518   log_debug(1, "movetail " << " src: " << buf1 << " dst: " << buf2 <<
519                " startpos: " << m_startpos << "->" << input2.m_startpos <<
520                " startline: " << m_startlineno << "->" << input2.m_startlineno);
521 }
522 
523 void
reject_line(const Line * line,const Field * field,const Error & error)524 NdbImportCsv::Input::reject_line(const Line* line,
525                                  const Field* field,
526                                  const Error& error)
527 {
528   const Opt& opt = m_util.c_opt;
529   RowList& rows_reject = m_rows_reject;
530   rows_reject.lock();
531   // write reject row first
532   const Table& table = m_util.c_reject_table;
533   Row* rejectrow = m_util.alloc_row(table);
534   rejectrow->m_rowid = m_startlineno + line->m_lineno - m_ignore_lines;
535   rejectrow->m_linenr = 1 + m_startlineno + line->m_lineno;
536   rejectrow->m_startpos = m_startpos + line->m_pos;
537   rejectrow->m_endpos = m_startpos + line->m_end;
538   const Buf& buf = m_buf;
539   const uchar* bufdata = &buf.m_data[buf.m_start];
540   const char* bufdatac = (const char*)bufdata;
541   const char* reject = &bufdatac[line->m_pos];
542   uint32 rejectlen = line->m_end - line->m_pos;
543   m_util.set_reject_row(rejectrow, Inval_uint32, error, reject, rejectlen);
544   require(rows_reject.push_back(rejectrow));
545   // error if rejects exceeded
546   if (rows_reject.totcnt() > opt.m_rejects)
547   {
548     m_util.set_error_data(m_error, __LINE__, 0,
549                           "reject limit %u exceeded", opt.m_rejects);
550   }
551   rows_reject.unlock();
552 }
553 
554 void
print(NdbOut & out)555 NdbImportCsv::Input::print(NdbOut& out)
556 {
557   typedef NdbImportCsv::Line Line;
558   typedef NdbImportCsv::Field Field;
559   const NdbImportCsv::Buf& buf = m_buf;
560   const uchar* bufdata = &buf.m_data[buf.m_start];
561   const char* bufdatac = (const char*)bufdata;
562   LineList& line_list = m_line_list;
563   out << "input:" << endl;
564   out << "len=" << m_buf.m_len << endl;
565   uint n = strlen(bufdatac);
566   if (n != 0 && bufdatac[n-1] == '\n')
567     out << bufdatac;
568   else
569     out << bufdatac << "\\c" << endl;
570   out << "linecnt=" << line_list.cnt();
571   Line* line = line_list.front();
572   while (line != 0)
573   {
574     out << endl;
575     out << "lineno=" << line->m_lineno;
576     out << " pos=" << line->m_pos;
577     out << " length=" << line->m_end - line->m_pos;
578     out << " fieldcnt=" << line->m_field_list.cnt();
579     Field* field = line->m_field_list.front();
580     while (field != 0)
581     {
582       out << endl;
583       uint pos = field->m_pos;
584       uint end = field->m_end;
585       uint pack_pos = field->m_pack_pos;
586       uint pack_end = field->m_pack_end;
587       char b[4096];
588       snprintf(b, sizeof(b), "%.*s", pack_end - pack_pos, &bufdatac[pack_pos]);
589       out << "fieldno=" << field->m_fieldno;
590       out << " pos=" << pos;
591       out << " length=" << end - pos;
592       out << " pack_pos=" << pack_pos;
593       out << " pack_length=" << pack_end - pack_pos;
594       out << " null=" << field->m_null;
595       out << " data=" << b;
596       field = field->next();
597     }
598     line = line->next();
599   }
600   out << endl;
601   require(false);
602 }
603 
604 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Input & input)605 operator<<(NdbOut& out, const NdbImportCsv::Input& input)
606 {
607   out << input.m_name;
608   out << " len=" << input.m_buf.m_len;
609   out << " linecnt=" << input.m_line_list.cnt() << " ";
610   return out;
611 }
612 
613 // parse
614 
Parse(Input & input)615 NdbImportCsv::Parse::Parse(Input& input) :
616   m_input(input),
617   m_csv(m_input.m_csv),
618   m_util(m_input.m_util),
619   m_error(m_input.m_error)
620 {
621   m_stacktop = 0;
622   m_state[m_stacktop] = State_plain;
623   m_last_token = 0;
624 }
625 
626 void
do_init()627 NdbImportCsv::Parse::do_init()
628 {
629   log_debug(1, "do_init");
630   const Spec& spec = m_input.m_spec;
631   for (int s = 0; s < g_statecnt; s++)
632   {
633     /*
634      * NUL byte 0x00 can be represented as NUL, \NUL, or \0
635      * where the first two contain a literal NUL byte 0x00.
636      * The T_NUL token is used to avoid branching in the normal
637      * case where the third printable format is used.
638      */
639     m_trans[s][0] = T_NUL;
640   }
641   for (uint u = 1; u < g_bytecnt; u++)
642   {
643     m_trans[State_plain][u] = T_DATA;
644     m_trans[State_quote][u] = T_DATA;
645     m_trans[State_escape][u] = T_BYTE;
646   }
647   {
648     const uchar* p = spec.m_fields_terminated_by;
649     const uint len = spec.m_fields_terminated_by_len;
650     require(p != 0 && p[0] != 0 && len == strlen((const char*)p));
651     uint u = p[0];
652     // avoid parse-time branch in the common case
653     m_trans[State_plain][u] = len == 1 ? T_FIELDSEP : T_FIELDSEP2;
654     m_trans[State_quote][u] = T_DATA;
655     m_trans[State_escape][u] = T_BYTE;
656   }
657   {
658     const uchar* p = spec.m_fields_optionally_enclosed_by;
659     if (p != 0 && p[0] != 0)
660     {
661       require(p[1] == 0);
662       uint u = p[0];
663       m_trans[State_plain][u] = T_QUOTE;
664       m_trans[State_quote][u] = T_QUOTEQUOTE;
665       m_trans[State_escape][u] = T_BYTE;
666     }
667   }
668   {
669     const uchar* p = spec.m_fields_escaped_by;
670     require(p != 0);
671     if (p[0] != 0)
672     {
673       require(p[1] == 0);
674       uint u = p[0];
675       m_trans[State_plain][u] = T_ESCAPE;
676       m_trans[State_quote][u] = T_ESCAPE;
677       m_trans[State_escape][u] = T_BYTE;
678     }
679   }
680   {
681     const uchar* p = spec.m_lines_terminated_by;
682     const uint len = spec.m_lines_terminated_by_len;
683     require(p != 0 && p[0] != 0 && len == strlen((const char*)p));
684     uint u = p[0];
685     // avoid parse-time branch in the common case
686     m_trans[State_plain][u] = len == 1 ? T_LINEEND : T_LINEEND2;
687     m_trans[State_quote][u] = T_DATA;
688     m_trans[State_escape][u] = T_BYTE;
689   }
690   // escape (\N is special)
691   {
692     const uchar* p = spec.m_fields_escaped_by;
693     for (uint u = 0; u < g_bytecnt; u++)
694       m_escapes[u] = u;
695     require(p != 0);
696     if (p[0] != 0)
697     {
698       m_escapes[(int)'0'] = 000;  // NUL
699       m_escapes[(int)'b'] = 010;  // BS
700       m_escapes[(int)'n'] = 012;  // NL
701       m_escapes[(int)'r'] = 015;  // CR
702       m_escapes[(int)'t'] = 011;  // TAB
703       m_escapes[(int)'Z'] = 032;  // ^Z
704     }
705   }
706 }
707 
708 void
push_state(State state)709 NdbImportCsv::Parse::push_state(State state)
710 {
711   require(m_stacktop + 1 < g_stackmax);
712   m_state[++m_stacktop] = state;
713   log_debug_3("push " << g_str_state(m_state[m_stacktop-1])
714               << "->" << g_str_state(m_state[m_stacktop]));
715 }
716 
717 void
pop_state()718 NdbImportCsv::Parse::pop_state()
719 {
720   require(m_stacktop > 0);
721   m_stacktop--;
722   log_debug_3("pop " << g_str_state(m_state[m_stacktop])
723               << "<-" << g_str_state(m_state[m_stacktop+1]));
724 }
725 
726 void
do_parse()727 NdbImportCsv::Parse::do_parse()
728 {
729   log_debug(2, "do_parse");
730   m_input.free_line_list(m_input.m_line_list);
731   m_input.free_line_list(m_line_list);
732   m_input.free_field_list(m_field_list);
733   m_input.free_data_list(m_data_list);
734   m_stacktop = 0;
735   m_state[m_stacktop] = State_plain;
736   Buf& buf = m_input.m_buf;
737   buf.m_pos = 0;
738   int ret = 0;
739   if (buf.m_len != 0)
740     ret = NdbImportCsv_yyparse(*this);
741   log_debug(1, "parse ret=" << ret);
742   if (ret == 0)
743   {
744     require(m_last_token == 0);
745     buf.m_tail = buf.m_len;
746   }
747   else if (!m_util.has_error())
748   {
749     // last parsed line
750     Line* line = m_line_list.back();
751     if (line != 0)
752     {
753       buf.m_tail = line->m_end;
754       m_input.m_line_list.push_back_from(m_line_list);
755       m_input.free_field_list(m_field_list);
756       m_input.free_data_list(m_data_list);
757     }
758     else
759     {
760       uint64 abspos = m_input.m_startpos;
761       uint64 abslineno = 1 + m_input.m_startlineno;
762       m_util.set_error_data(m_error, __LINE__, 0,
763                             "parse error at line=%" PRIu64 ": pos=%" PRIu64 ":"
764                             " CSV page contains no complete record"
765                             " (buffer too small"
766                             " or missing last line terminator)",
767                             abslineno, abspos);
768       return;
769     }
770   }
771   /*
772    * Pack data parts into fields.  Modifies buf data and cannot
773    * be done before accepted lines and fields are known.  Otherwise
774    * movetail() passes garbage to next worker.
775    */
776   {
777     Line* line = m_input.m_line_list.front();
778     while (line != 0)
779     {
780       Field* field = line->m_field_list.front();
781       while (field != 0)
782       {
783         if (field->m_data_list.cnt() != 0)
784           pack_field(field);
785         field = field->next();
786       }
787       line = line->next();
788     }
789   }
790 }
791 
792 int
do_lex(YYSTYPE * lvalp)793 NdbImportCsv::Parse::do_lex(YYSTYPE* lvalp)
794 {
795   log_debug_3("do_lex");
796   const Spec& spec = m_input.m_spec;
797   Buf& buf = m_input.m_buf;
798   const uchar* bufdata = &buf.m_data[buf.m_start];
799   State state = m_state[m_stacktop];
800   const int* trans = m_trans[state];
801   const uint pos = buf.m_pos;
802   uint len = 0;
803   uint end = pos;
804   uint u = bufdata[pos];
805   int token = trans[u];
806   switch (token) {
807   case T_FIELDSEP:
808     len = 1;
809     end += len;
810     break;
811   case T_FIELDSEP2:
812     len = spec.m_fields_terminated_by_len;
813     if (len <= buf.m_len - buf.m_pos &&
814         memcmp(&bufdata[pos], spec.m_fields_terminated_by, len) == 0)
815     {
816       end += len;
817       token = T_FIELDSEP;
818       break;
819     }
820     len = 1;
821     end += len;
822     token = T_DATA;
823     break;
824   case T_QUOTE:
825     push_state(State_quote);
826     require(spec.m_fields_enclosed_by_len == 1);
827     len = 1;
828     end += len;
829     break;
830   case T_QUOTEQUOTE:
831     require(spec.m_fields_enclosed_by_len == 1);
832     if (bufdata[pos + 1] == u)
833     {
834       token = T_DATA;
835       len = 1;
836       end += 2;
837       break;
838     }
839     token = T_QUOTE;
840     len = 1;
841     end += len;
842     pop_state();
843     break;
844   case T_ESCAPE:
845     push_state(State_escape);
846     require(spec.m_fields_escaped_by_len == 1);
847     len = 1;
848     end += len;
849     break;
850   case T_LINEEND:
851     len = 1;
852     end += len;
853     break;
854   case T_LINEEND2:
855     len = spec.m_lines_terminated_by_len;
856     if (len <= buf.m_len - buf.m_pos &&
857         memcmp(&bufdata[pos], spec.m_lines_terminated_by, len) == 0)
858     {
859       end += len;
860       token = T_LINEEND;
861       break;
862     }
863     len = 1;
864     end += len;
865     token = T_DATA;
866     break;
867   case T_DATA:
868     do
869     {
870       len++;
871       u = bufdata[pos + len];
872     } while (trans[u] == T_DATA);
873     end += len;
874     break;
875   case T_BYTE:
876     len = 1;
877     end += len;
878     pop_state();
879     break;
880   case T_NUL:
881     if (buf.m_pos == buf.m_len)
882     {
883       token = 0;
884       break;
885     }
886     if (m_state[m_stacktop] != State_escape)
887       token = T_DATA;
888     else
889     {
890       token = T_BYTE;
891       pop_state();
892     }
893     len = 1;
894     end += len;
895     break;
896   }
897   Chunk chunk;
898   chunk.m_pos = pos;
899   chunk.m_len = len;
900   chunk.m_end = end;
901   log_debug_3("do_lex: token=" << token <<
902               " pos=" << chunk.m_pos << " len=" << len << " end=" << end);
903   buf.m_pos = end;
904   lvalp->m_chunk = chunk;
905   m_last_token = token;
906   return token;
907 }
908 
909 void
do_error(const char * msg)910 NdbImportCsv::Parse::do_error(const char* msg)
911 {
912   if (m_last_token != 0)
913   {
914     const Buf& buf = m_input.m_buf;
915     log_debug(2, "parse error at buf:" << buf);
916     uint64 abspos = m_input.m_startpos + buf.m_pos;
917     uint64 abslineno = m_input.m_startlineno + m_line_list.cnt();
918     m_util.set_error_data(m_error, __LINE__, 0,
919                           "parse error at line=%" PRIu64 ": pos=%" PRIu64 ": %s",
920                           abslineno, abspos, msg);
921   }
922 }
923 
924 void
pack_field(Field * field)925 NdbImportCsv::Parse::pack_field(Field* field)
926 {
927   Buf& buf = m_input.m_buf;
928   uchar* bufdata = &buf.m_data[buf.m_start];
929   DataList& data_list = field->m_data_list;
930   Data* data = data_list.front();
931   require(data != 0);
932   // if field is exactly "\N" then it becomes NULL
933   if (data->next() == 0 &&
934       data->m_escape &&
935       bufdata[data->m_pos] == 'N')
936   {
937     field->m_pack_pos = Inval_uint;
938     field->m_pack_end = Inval_uint;
939     field->m_null = true;
940     return;
941   }
942   // handle multiple pieces and normal escapes
943   uint pack_pos = data->m_pos;
944   uint pack_end = pack_pos;
945   while (data != 0)
946   {
947     uint len = data->m_len;
948     memmove(&bufdata[pack_end], &bufdata[data->m_pos], len);
949     if (data->m_escape)
950     {
951       require(len == 1);
952       bufdata[pack_end] = m_escapes[bufdata[pack_end]];
953     }
954     pack_end += len;
955     data = data->next();
956   }
957   field->m_pack_pos = pack_pos;
958   field->m_pack_end = pack_end;
959   field->m_null = false;
960 }
961 
962 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Parse & parse)963 operator<<(NdbOut& out, const NdbImportCsv::Parse& parse)
964 {
965   const NdbImportCsv::Buf& buf = parse.m_input.m_buf;
966   out << "parse " << parse.m_input.m_name;
967   NdbImportCsv::Parse::State state = parse.m_state[parse.m_stacktop];
968   out << " [" << NdbImportCsv::g_str_state(state) << "]";
969   if (buf.m_len != 0)
970   {
971     const uchar* bufdata = &buf.m_data[buf.m_start];
972     char chr[20];
973     int c = bufdata[buf.m_pos];
974     if (isascii(c) && isprint(c))
975       sprintf(chr, "%c", c);
976     else if (c == '\n')
977       sprintf(chr, "%s", "\\n");
978     else
979       sprintf(chr, "0x%02x", c);
980     out << " len=" << buf.m_len << " pos=" << buf.m_pos;
981     out << " chr=" << chr << " ";
982   }
983   return out;
984 }
985 
986 const char*
g_str_state(Parse::State state)987 NdbImportCsv::g_str_state(Parse::State state)
988 {
989   const char* str = 0;
990   switch (state) {
991   case Parse::State_plain:
992     str = "plain";
993     break;
994   case Parse::State_quote:
995     str = "quote";
996     break;
997   case Parse::State_escape:
998     str = "escape";
999     break;
1000   }
1001   require(str != 0);
1002   return str;
1003 }
1004 
1005 // eval
1006 
Eval(Input & input)1007 NdbImportCsv::Eval::Eval(Input& input) :
1008   m_input(input),
1009   m_csv(m_input.m_csv),
1010   m_util(m_input.m_util),
1011   m_error(m_input.m_error)
1012 {
1013 }
1014 
~Eval()1015 NdbImportCsv::Eval::~Eval()
1016 {
1017 }
1018 
1019 void
do_init()1020 NdbImportCsv::Eval::do_init()
1021 {
1022 }
1023 
1024 void
do_eval()1025 NdbImportCsv::Eval::do_eval()
1026 {
1027   const Opt& opt = m_util.c_opt;
1028   const Table& table = m_input.m_table;
1029   LineList& line_list = m_input.m_line_list;
1030   Line* line = line_list.front();
1031   RowList rows_chunk;
1032   while (line != 0)
1033   {
1034     const uint64 ignore_lines = m_input.m_ignore_lines;
1035     const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1036     if (lineno < ignore_lines)
1037     {
1038       line = line->next();
1039       continue;
1040     }
1041     if (opt.m_resume)
1042     {
1043       RowMap& rowmap_in = m_input.m_rowmap_in;
1044       const uint64 rowid = lineno - ignore_lines;
1045       if (!rowmap_in.empty())
1046       {
1047         bool found = rowmap_in.remove(rowid);
1048         if (found)
1049         {
1050           line = line->next();
1051           log_debug(1, "skip old rowid: " << rowid);
1052           continue;
1053         }
1054       }
1055     }
1056     if (rows_chunk.cnt() == 0)
1057     {
1058       require(line->m_lineno < line_list.cnt());
1059       uint cnt = line_list.cnt() - line->m_lineno;
1060       if (cnt > opt.m_alloc_chunk)
1061         cnt = opt.m_alloc_chunk;
1062       m_util.alloc_rows(table, cnt, rows_chunk);
1063     }
1064     Row* row = rows_chunk.pop_front();
1065     eval_line(row, line);
1066     if (line->m_reject)
1067     {
1068       m_util.free_row(row);
1069     }
1070     // stop loading if error
1071     if (m_input.has_error())
1072     {
1073       break;
1074     }
1075     line = line->next();
1076   }
1077   m_input.free_line_list(m_input.m_line_list);
1078 }
1079 
1080 void
eval_line(Row * row,Line * line)1081 NdbImportCsv::Eval::eval_line(Row* row, Line* line)
1082 {
1083   const Table& table = m_input.m_table;
1084   const Attrs& attrs = table.m_attrs;
1085   const uint attrcnt = attrs.size();
1086   const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1087   const uint64 linenr = 1 + lineno;
1088   row->m_rowid = lineno - m_input.m_ignore_lines;
1089   row->m_linenr = linenr;
1090   row->m_startpos = m_input.m_startpos + line->m_pos;
1091   row->m_endpos = m_input.m_startpos + line->m_end;
1092   uint fieldcnt = line->m_field_list.cnt();
1093   const uint has_hidden_pk = (uint)table.m_has_hidden_pk;
1094   const uint expect_attrcnt = attrcnt - has_hidden_pk;
1095   Error error;  // local error
1096   do
1097   {
1098     if (fieldcnt < expect_attrcnt)
1099     {
1100       m_util.set_error_data(
1101         error, __LINE__, 0,
1102         "line %" PRIu64 ": too few fields (%u < %u)",
1103         linenr, fieldcnt, attrcnt);
1104       break;
1105     }
1106     if(fieldcnt == expect_attrcnt + 1 &&
1107        line->m_field_list.final_field_is_empty())
1108     {
1109       /* Handle field terminator at end of line */
1110       Field * empty_field = line->m_field_list.pop_back();
1111       fieldcnt--;
1112       m_input.free_field(empty_field);
1113       break;
1114     }
1115     if (fieldcnt > expect_attrcnt)
1116     {
1117       m_util.set_error_data(
1118         error, __LINE__, 0,
1119         "line %" PRIu64 ": too many fields (%u > %u)",
1120         linenr, fieldcnt, attrcnt);
1121       break;
1122     }
1123   } while (0);
1124   if (m_util.has_error(error))
1125   {
1126     m_input.reject_line(line, (Field*)0, error);
1127     line->m_reject = true;
1128   }
1129   Field* field = line->m_field_list.front();
1130   for (uint n = 0; n < fieldcnt; n++)
1131   {
1132     if (line->m_reject) // wrong field count or eval error
1133       break;
1134     require(field != 0);
1135     require(field->m_fieldno == n);
1136     if (!field->m_null)
1137       eval_field(row, line, field);
1138     else
1139       eval_null(row, line, field);
1140     field = field->next();
1141   }
1142   if (!line->m_reject)
1143   {
1144     require(field == 0);
1145   }
1146   if (has_hidden_pk)
1147   {
1148     /*
1149      * CSV has no access to Ndb (in fact there may not be any Ndb
1150      * object e.g. in CSV input -> CSV output).  Any autoincrement
1151      * value for hidden pk is set later in RelayOpWorker.  Fill in
1152      * some dummy value to not leave uninitialized data.
1153      */
1154     const Attr& attr = attrs[attrcnt - 1];
1155     require(attr.m_type == NdbDictionary::Column::Bigunsigned);
1156     uint64 val = Inval_uint64;
1157     attr.set_value(row, &val, 8);
1158   }
1159   if (!line->m_reject)
1160     m_input.m_rows.push_back(row);
1161 }
1162 
/*
 * Parse some fields by doing a CS101 "turn string into number".
 * Digits must be ASCII digits.
 * Other numeral systems (e.g. Bengali digits) are not supported.
 */
1168 
/*
 * Result of parsing one CSV field value: an error category, a short
 * text for it and the source line where the problem was detected.
 */
struct Ndb_import_csv_error {
  // error categories, also used as index into ndb_import_csv_error[]
  enum Error_code {
    No_error = 0,
    Format_error = 1,
    Value_error = 2,    // but DBTUP should be final arbiter
    Internal_error = 3
  };
  // number of Error_code values
  static const int error_code_count = Internal_error + 1;
  // one of Error_code
  int error_code;
  // short description of the category
  const char* error_text;
  // __LINE__ of the check that failed, set by the parse functions
  int error_line;
};
1181 
/*
 * One template entry per Error_code, in Error_code order so that the
 * code can be used as index.  The error_line is 0 here; the code that
 * reports an error copies an entry and then sets error_line itself.
 */
static const Ndb_import_csv_error
ndb_import_csv_error[Ndb_import_csv_error::error_code_count] = {
  { Ndb_import_csv_error::No_error, "no error", 0 },
  { Ndb_import_csv_error::Format_error, "format error", 0 },
  { Ndb_import_csv_error::Value_error, "value error", 0 },
  { Ndb_import_csv_error::Internal_error, "internal error", 0 }
};
1189 
1190 static void
ndb_import_csv_decimal_error(int err,Ndb_import_csv_error & csv_error)1191 ndb_import_csv_decimal_error(int err,
1192                              Ndb_import_csv_error& csv_error)
1193 {
1194   switch (err) {
1195   case E_DEC_OK:
1196     csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1197     break;
1198   case E_DEC_TRUNCATED:
1199   case E_DEC_OVERFLOW:
1200     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Value_error];
1201     break;
1202   case E_DEC_BAD_NUM:
1203     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1204     break;
1205   case E_DEC_OOM:
1206   case E_DEC_BAD_PREC:
1207   case E_DEC_BAD_SCALE:
1208   default:
1209     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Internal_error];
1210     break;
1211   }
1212 }
1213 
1214 static bool
ndb_import_csv_parse_decimal(const NdbImportCsv::Attr & attr,bool is_unsigned,const char * datac,uint length,uchar * val,uint val_len,Ndb_import_csv_error & csv_error)1215 ndb_import_csv_parse_decimal(const NdbImportCsv::Attr& attr,
1216                              bool is_unsigned,
1217                              const char* datac, uint length,
1218                              uchar* val, uint val_len,
1219                              Ndb_import_csv_error& csv_error)
1220 {
1221 #if 0
1222   // [-+]ddd.ff
1223   "^"
1224   "([-+])*"                                   // 1:sign
1225   "([[:digit:]]*)?"                           // 2:ddd
1226   "(.)?"                                      // 3:.
1227   "([[:digit:]]*)?"                           // 4:ff
1228   "$"
1229 #endif
1230   // sign
1231   const char* p = datac;
1232   const char* q = p;
1233   if (!is_unsigned)
1234     while (*p == '+' || *p == '-')
1235       p++;
1236   else
1237     while (*p == '+')
1238       p++;
1239   q = p;
1240   // decimal_str2bin does not check string end so parse here
1241   uint digits = 0;
1242   while (isdigit(*p))
1243     p++;
1244   digits += p - q;
1245   q = p;
1246   if (*p == '.')
1247   {
1248     q = ++p;
1249     while (isdigit(*p))
1250       p++;
1251     digits += p - q;
1252   }
1253   if (*p != 0)
1254   {
1255     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1256     csv_error.error_line = __LINE__;
1257     return false;
1258   }
1259   if (digits == 0)
1260   {
1261     // single "." is not valid decimal
1262     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1263     csv_error.error_line = __LINE__;
1264     return false;
1265   }
1266   int err;
1267   err = decimal_str2bin(datac, length,
1268                         attr.m_precision, attr.m_scale,
1269                         val, val_len);
1270   if (err != 0)
1271   {
1272     ndb_import_csv_decimal_error(err, csv_error);
1273     csv_error.error_line = __LINE__;
1274     return false;
1275   }
1276   return true;
1277 }
1278 
1279 static bool
ndb_import_csv_parse_year(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Year & s,Ndb_import_csv_error & csv_error)1280 ndb_import_csv_parse_year(const NdbImportCsv::Attr& attr,
1281                           const char* datac,
1282                           NdbSqlUtil::Year& s,
1283                           Ndb_import_csv_error& csv_error)
1284 {
1285 #if 0
1286   // yyyy
1287   "^"
1288   "([[:digit:]]{4}|[[:digit:]]{2})"           // 1:yyyy
1289   "$"
1290 #endif
1291   csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1292   s.year = 0;
1293   const char* p = datac;
1294   const char* q = p;
1295   while (isdigit(*p) && p - q < 4)
1296     s.year = 10 * s.year + (*p++ - '0');
1297   if (p - q == 4)
1298     ;
1299   else if (p - q == 2)
1300   {
1301     if (s.year >= 70)
1302       s.year += 1900;
1303     else
1304       s.year += 2000;
1305   }
1306   else
1307   {
1308     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1309     csv_error.error_line = __LINE__;
1310     return false;
1311   }
1312   return true;
1313 }
1314 
1315 static bool
ndb_import_csv_parse_date(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Date & s,Ndb_import_csv_error & csv_error)1316 ndb_import_csv_parse_date(const NdbImportCsv::Attr& attr,
1317                           const char* datac,
1318                           NdbSqlUtil::Date& s,
1319                           Ndb_import_csv_error& csv_error)
1320 {
1321 #if 0
1322   // yyyy-mm-dd
1323   "^"
1324   "([[:digit:]]{4}|[[:digit:]]{2})"           // 1:yyyy
1325   "("                                         // 2:
1326   "[[:punct:]]+"
1327   "([[:digit:]]{1,2})"                        // 3:mm
1328   "[[:punct:]]+"
1329   "([[:digit:]]{1,2})"                        // 4:dd
1330   "|"
1331   "([[:digit:]]{2})"                          // 5:mm
1332   "([[:digit:]]{2})"                          // 6:dd
1333   ")"
1334   "$"
1335 #endif
1336   csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1337   s.year = s.month = s.day = 0;
1338   const char* p = datac;
1339   const char* q = p;
1340   while (isdigit(*p) && p - q < 4)
1341     s.year = 10 * s.year + (*p++ - '0');
1342   if (p - q == 4)
1343     ;
1344   else if (p - q == 2)
1345   {
1346     if (s.year >= 70)
1347       s.year += 1900;
1348     else
1349       s.year += 2000;
1350   }
1351   else
1352   {
1353     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1354     csv_error.error_line = __LINE__;
1355     return false;
1356   }
1357   q = p;
1358   // separator vs non-separator variant
1359   if (ispunct(*p))
1360   {
1361     // anything goes
1362     while (ispunct(*p))
1363       p++;
1364     q = p;
1365     // month
1366     while (isdigit(*p) && p - q < 2)
1367       s.month = 10 * s.month + (*p++ - '0');
1368     if (p - q > 0)
1369       ;
1370     else
1371     {
1372       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1373       csv_error.error_line = __LINE__;
1374       return false;
1375     }
1376     q = p;
1377     if (ispunct(*p))
1378       ;
1379     else
1380     {
1381       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1382       csv_error.error_line = __LINE__;
1383       return false;
1384     }
1385     // anything goes
1386     while (ispunct(*p))
1387       p++;
1388     q = p;
1389     // day
1390     while (isdigit(*p) && p - q < 2)
1391       s.day = 10 * s.day + (*p++ - '0');
1392     if (p - q > 0)
1393       ;
1394     else
1395     {
1396       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1397       csv_error.error_line = __LINE__;
1398       return false;
1399     }
1400     q = p;
1401   }
1402   else
1403   {
1404     // month
1405     while (isdigit(*p) && p - q < 2)
1406       s.month = 10 * s.month + (*p++ - '0');
1407     if (p - q == 2)
1408       ;
1409     else
1410     {
1411       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1412       csv_error.error_line = __LINE__;
1413       return false;
1414     }
1415     q = p;
1416     // day
1417     while (isdigit(*p) && p - q < 2)
1418       s.day = 10 * s.day + (*p++ - '0');
1419     if (p - q == 2)
1420       ;
1421     else
1422     {
1423       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1424       csv_error.error_line = __LINE__;
1425       return false;
1426     }
1427     q = p;
1428   }
1429   return true;
1430 }
1431 
1432 static bool
ndb_import_csv_parse_time2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Time2 & s,Ndb_import_csv_error & csv_error)1433 ndb_import_csv_parse_time2(const NdbImportCsv::Attr& attr,
1434                            const char* datac,
1435                            NdbSqlUtil::Time2& s,
1436                            Ndb_import_csv_error& csv_error)
1437 {
1438 #if 0
1439   // dd hh:mm:ss.ffffff
1440   "^"
1441   "(([[:digit:]]+)[[:space:]]+)?"             // 1:dd 2: ***NOTYET***
1442   "("                                         // 3:
1443   "([[:digit:]]{1,2})"                        // 4:hh
1444   "[:]"
1445   "([[:digit:]]{1,2})"                        // 5:mm
1446   "[:]"
1447   "([[:digit:]]{1,2})"                        // 6:ss
1448   "|"
1449   "([[:digit:]]{2})"                          // 7:hh
1450   "([[:digit:]]{2})"                          // 8:mm
1451   "([[:digit:]]{2})"                          // 9:ss
1452   ")"
1453   "(\\.([[:digit:]]*))?"                      // 10: 11:ffffff
1454   "$"
1455 #endif
1456   csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1457   s.sign = 1;
1458   s.interval = 0;
1459   s.hour = s.minute = s.second = 0;
1460   s.fraction = 0;
1461   const char* p = datac;
1462   const char* q = p;
1463   // hour
1464   while (isdigit(*p) && p - q < 2)
1465     s.hour = 10 * s.hour + (*p++ - '0');
1466   if (p - q == 1 || p - q == 2)
1467     ;
1468   else
1469   {
1470     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1471     csv_error.error_line = __LINE__;
1472     return false;
1473   }
1474   q = p;
1475   // separator vs non-separator variant
1476   if (*p == ':')
1477   {
1478     q = ++p;
1479     // minute
1480     while (isdigit(*p))
1481       s.minute = 10 * s.minute + (*p++ - '0');
1482     if (p - q == 1 || p - q == 2)
1483       ;
1484     else
1485     {
1486       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1487       csv_error.error_line = __LINE__;
1488       return false;
1489     }
1490     q = p;
1491     if (*p == ':')
1492       q = ++p;
1493     else
1494     {
1495       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1496       csv_error.error_line = __LINE__;
1497       return false;
1498     }
1499     while (isdigit(*p))
1500       s.second = 10 * s.second + (*p++ - '0');
1501     if (p - q == 1 || p - q == 2)
1502       ;
1503     else
1504     {
1505       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1506       csv_error.error_line = __LINE__;
1507       return false;
1508     }
1509     q = p;
1510   }
1511   else
1512   {
1513     while (isdigit(*p) && p - q < 2)
1514       s.minute = 10 * s.minute + (*p++ - '0');
1515     if (p - q == 2)
1516       ;
1517     else
1518     {
1519       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1520       csv_error.error_line = __LINE__;
1521       return false;
1522     }
1523     q = p;
1524     while (isdigit(*p) && p - q < 2)
1525       s.second = 10 * s.second + (*p++ - '0');
1526     if (p - q == 2)
1527       ;
1528     else
1529     {
1530       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1531       csv_error.error_line = __LINE__;
1532       return false;
1533     }
1534     q = p;
1535   }
1536   // fraction point (optional)
1537   if (*p != 0)
1538   {
1539     if (*p == '.')
1540       p++;
1541     if (p - q == 1)
1542       ;
1543     else
1544     {
1545       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1546       csv_error.error_line = __LINE__;
1547       return false;
1548     }
1549     q = p;
1550     // fraction value (optional)
1551     while (isdigit(*p))
1552       s.fraction = 10 * s.fraction + (*p++ - '0');
1553     if (p - q <= 6)
1554     {
1555       uint n = p - q;
1556       while (n++ < attr.m_precision)
1557         s.fraction *= 10;
1558     }
1559     else
1560     {
1561       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1562       csv_error.error_line = __LINE__;
1563       return false;
1564     }
1565   }
1566   return true;
1567 }
1568 
1569 static bool
ndb_import_csv_parse_datetime2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Datetime2 & s,Ndb_import_csv_error & csv_error)1570 ndb_import_csv_parse_datetime2(const NdbImportCsv::Attr& attr,
1571                                const char* datac,
1572                                NdbSqlUtil::Datetime2& s,
1573                                Ndb_import_csv_error& csv_error)
1574 {
1575 #if 0
1576   yyyy-mm-dd/hh:mm:ss.ffffff
1577   "^"
1578   "([[:digit:]]{4}|[[:digit:]]{2})"           // 1:yyyy
1579   "[[:punct:]]+"
1580   "([[:digit:]]{1,2})"                        // 2:mm
1581   "[[:punct:]]+"
1582   "([[:digit:]]{1,2})"                        // 3:dd
1583   "(T|[[:space:]]+|[[:punct:]]+)"             // 4:
1584   "([[:digit:]]{1,2})"                        // 5:hh
1585   "[[:punct:]]+"
1586   "([[:digit:]]{1,2})"                        // 6:mm
1587   "[[:punct:]]+"
1588   "([[:digit:]]{1,2})"                        // 7:ss
1589   "(\\.([[:digit:]]*))?"                      // 8: 9:ffffff
1590   "$"
1591 #endif
1592   csv_error = ndb_import_csv_error[Ndb_import_csv_error::No_error];
1593   s.sign = 1;
1594   s.year = s.month = s.day = 0;
1595   s.hour = s.minute = s.second = 0;
1596   s.fraction = 0;
1597   const char* p = datac;
1598   const char* q = p;
1599   // year
1600   while (isdigit(*p))
1601     s.year = 10 * s.year + (*p++ - '0');
1602   if (p - q == 4)
1603     ;
1604   else if (p - q == 2)
1605   {
1606     if (s.year >= 70)
1607       s.year += 1900;
1608     else
1609       s.year += 2000;
1610   }
1611   else
1612   {
1613     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1614     csv_error.error_line = __LINE__;
1615     return false;
1616   }
1617   q = p;
1618   // separator
1619   while (ispunct(*p))
1620     p++;
1621   if (p - q != 0)
1622     ;
1623   else
1624   {
1625     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1626     csv_error.error_line = __LINE__;
1627     return false;
1628   }
1629   q = p;
1630   // month
1631   while (isdigit(*p))
1632     s.month = 10 * s.month + (*p++ - '0');
1633   if (p - q == 1 || p - q == 2)
1634     ;
1635   else
1636   {
1637     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1638     csv_error.error_line = __LINE__;
1639     return false;
1640   }
1641   // separator
1642   while (ispunct(*p))
1643     p++;
1644   if (p - q != 0)
1645     ;
1646   else
1647   {
1648     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1649     csv_error.error_line = __LINE__;
1650     return false;
1651   }
1652   q = p;
1653   // day
1654   while (isdigit(*p))
1655     s.day = 10 * s.day + (*p++ - '0');
1656   if (p - q == 1 || p - q == 2)
1657     ;
1658   else
1659   {
1660     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1661     csv_error.error_line = __LINE__;
1662     return false;
1663   }
1664   q = p;
1665   // separator
1666   if (*p == 'T')
1667     p++;
1668   else if (isspace(*p))
1669   {
1670     while (isspace(*p))
1671       p++;
1672   }
1673   else if (ispunct(*p))
1674   {
1675     while (ispunct(*p))
1676       p++;
1677   }
1678   if (p - q != 0)
1679     ;
1680   else
1681   {
1682     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1683     csv_error.error_line = __LINE__;
1684     return false;
1685   }
1686   q = p;
1687   // hour
1688   while (isdigit(*p))
1689     s.hour = 10 * s.hour + (*p++ - '0');
1690   if (p - q == 1 || p - q == 2)
1691     ;
1692   else
1693   {
1694     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1695     csv_error.error_line = __LINE__;
1696     return false;
1697   }
1698   q = p;
1699   // separator
1700   while (ispunct(*p))
1701     p++;
1702   if (p - q != 0)
1703     ;
1704   else
1705   {
1706     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1707     csv_error.error_line = __LINE__;
1708     return false;
1709   }
1710   q = p;
1711   // minute
1712   while (isdigit(*p))
1713     s.minute = 10 * s.minute + (*p++ - '0');
1714   if (p - q == 1 || p - q == 2)
1715     ;
1716   else
1717   {
1718     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1719     csv_error.error_line = __LINE__;
1720     return false;
1721   }
1722   q = p;
1723   // separator
1724   while (ispunct(*p))
1725     p++;
1726   if (p - q != 0)
1727     ;
1728   else
1729   {
1730     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1731     csv_error.error_line = __LINE__;
1732     return false;
1733   }
1734   q = p;
1735   // second
1736   while (isdigit(*p))
1737     s.second = 10 * s.second + (*p++ - '0');
1738   if (p - q == 1 || p - q == 2)
1739     ;
1740   else
1741   {
1742     csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1743     csv_error.error_line = __LINE__;
1744     return false;
1745   }
1746   q = p;
1747   // fraction point (optional)
1748   if (*p != 0)
1749   {
1750     if (*p == '.')
1751       p++;
1752     if (p - q == 1)
1753       ;
1754     else
1755     {
1756       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1757       csv_error.error_line = __LINE__;
1758       return false;
1759     }
1760     q = p;
1761     // fraction value (optional)
1762     while (isdigit(*p))
1763       s.fraction = 10 * s.fraction + (*p++ - '0');
1764     if (p - q <= 6)
1765     {
1766       uint n = p - q;
1767       while (n++ < attr.m_precision)
1768         s.fraction *= 10;
1769     }
1770     else
1771     {
1772       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1773       csv_error.error_line = __LINE__;
1774       return false;
1775     }
1776     if (*p == 0)
1777       ;
1778     else
1779     {
1780       csv_error = ndb_import_csv_error[Ndb_import_csv_error::Format_error];
1781       csv_error.error_line = __LINE__;
1782       return false;
1783     }
1784   }
1785   //
1786   return true;
1787 }
1788 
1789 static bool
ndb_import_csv_parse_timestamp2(const NdbImportCsv::Attr & attr,const char * datac,NdbSqlUtil::Timestamp2 & s,Ndb_import_csv_error & csv_error)1790 ndb_import_csv_parse_timestamp2(const NdbImportCsv::Attr& attr,
1791                                 const char* datac,
1792                                 NdbSqlUtil::Timestamp2& s,
1793                                 Ndb_import_csv_error& csv_error)
1794 {
1795   // parsed as Datetime2
1796   NdbSqlUtil::Datetime2 s2;
1797   if (!ndb_import_csv_parse_datetime2(attr, datac, s2, csv_error))
1798     return false;
1799   // convert to seconds in localtime
1800   struct tm tm;
1801   tm.tm_year = s2.year - 1900;
1802   tm.tm_mon = s2.month - 1;
1803   tm.tm_mday = s2.day;
1804   tm.tm_hour = s2.hour;
1805   tm.tm_min = s2.minute;
1806   tm.tm_sec = s2.second;
1807   tm.tm_isdst = -1;       // mktime() will determine
1808   s.second = mktime(&tm);
1809   s.fraction = s2.fraction;
1810   return true;
1811 }
1812 
1813 void
eval_field(Row * row,Line * line,Field * field)1814 NdbImportCsv::Eval::eval_field(Row* row, Line* line, Field* field)
1815 {
1816   const Opt& opt = m_util.c_opt;
1817   const CHARSET_INFO* cs = opt.m_charset;
1818   const Table& table = m_input.m_table;
1819   const Attrs& attrs = table.m_attrs;
1820   Buf& buf = m_input.m_buf;
1821   uchar* bufdata = &buf.m_data[buf.m_start];
1822   char* bufdatac = (char*)bufdata;
1823   // internal counts file lines and fields from 0
1824   const uint64 lineno = m_input.m_startlineno + line->m_lineno;
1825   const uint fieldno = field->m_fieldno;
1826   // user wants the counts from 1
1827   const uint64 linenr = 1 + lineno;
1828   const uint fieldnr = 1 + fieldno;
1829   const Attr& attr = attrs[fieldno];
1830   uint pos = field->m_pack_pos;
1831   uint end = field->m_pack_end;
1832   uint length = end - pos;
1833   uchar* data = &bufdata[pos];
1834   char* datac = &bufdatac[pos];
1835   /*
1836    * A field is followed by non-empty separator or terminator.
1837    * We null-terminate the field and restore it at end.
1838    */
1839   uchar saveterm = data[length];
1840   data[length] = 0;
1841   Error error;  // local error
1842   /*
1843    * Lots of repeated code here but it is not worth changing
1844    * before it moves to some datatypes library.
1845    */
1846   switch (attr.m_type) {
1847   case NdbDictionary::Column::Tinyint:
1848     {
1849       int err = 0;
1850       const char* endptr = nullptr;
1851       int val = cs->cset->strntol(
1852                 cs, datac, length, 10, &endptr, &err);
1853       if (err != 0)
1854       {
1855         m_util.set_error_data(
1856           error, __LINE__, err,
1857           "line %" PRIu64 " field %u: eval %s failed",
1858           linenr, fieldnr, attr.m_sqltype);
1859         break;
1860       }
1861       if (uint(endptr - datac) != length)
1862       {
1863         m_util.set_error_data(
1864           error, __LINE__, 0,
1865           "line %" PRIu64 " field %u: eval %s failed: bad format",
1866            linenr, fieldnr, attr.m_sqltype);
1867         break;
1868       }
1869       const int minval = -128;
1870       const int maxval = +127;
1871       if (val < minval || val > maxval)
1872       {
1873         m_util.set_error_data(
1874           error, __LINE__, 0,
1875           "line %" PRIu64 " field %u: eval %s failed: "
1876           "value %d out of range",
1877            linenr, fieldnr, attr.m_sqltype, val);
1878         break;
1879       }
1880       int8 byteval = val;
1881       attr.set_value(row, &byteval, 1);
1882     }
1883     break;
1884   case NdbDictionary::Column::Smallint:
1885     {
1886       int err = 0;
1887       const char* endptr = nullptr;
1888       int val = cs->cset->strntol(
1889                 cs, datac, length, 10, &endptr, &err);
1890       if (err != 0)
1891       {
1892         m_util.set_error_data(
1893           error, __LINE__, err,
1894           "line %" PRIu64 " field %u: eval %s failed",
1895           linenr, fieldnr, attr.m_sqltype);
1896         break;
1897       }
1898       if (uint(endptr - datac) != length)
1899       {
1900         m_util.set_error_data(
1901           error, __LINE__, 0,
1902           "line %" PRIu64 " field %u: eval %s failed: bad format",
1903            linenr, fieldnr, attr.m_sqltype);
1904         break;
1905       }
1906       const int minval = -32768;
1907       const int maxval = +32767;
1908       if (val < minval || val > maxval)
1909       {
1910         m_util.set_error_data(
1911           error, __LINE__, 0,
1912           "line %" PRIu64 " field %u: eval %s failed: "
1913           "value %d out of range",
1914            linenr, fieldnr, attr.m_sqltype, val);
1915         break;
1916       }
1917       int16 shortval = val;
1918       attr.set_value(row, &shortval, 2);
1919     }
1920     break;
1921   case NdbDictionary::Column::Mediumint:
1922     {
1923       int err = 0;
1924       const char* endptr = nullptr;
1925       int val = cs->cset->strntol(
1926                 cs, datac, length, 10, &endptr, &err);
1927       if (err != 0)
1928       {
1929         m_util.set_error_data(
1930           error, __LINE__, err,
1931           "line %" PRIu64 " field %u: eval %s failed",
1932           linenr, fieldnr, attr.m_sqltype);
1933         break;
1934       }
1935       if (uint(endptr - datac) != length)
1936       {
1937         m_util.set_error_data(
1938           error, __LINE__, 0,
1939           "line %" PRIu64 " field %u: eval %s failed: bad format",
1940            linenr, fieldnr, attr.m_sqltype);
1941         break;
1942       }
1943       const int minval = -8388608;
1944       const int maxval = +8388607;
1945       if (val < minval || val > maxval)
1946       {
1947         m_util.set_error_data(
1948           error, __LINE__, 0,
1949           "line %" PRIu64 " field %u: eval %s failed: "
1950           "value %d out of range",
1951            linenr, fieldnr, attr.m_sqltype, val);
1952         break;
1953       }
1954       uchar val3[3];
1955       int3store(val3, (uint)val);
1956       attr.set_value(row, val3, 3);
1957     }
1958     break;
1959   case NdbDictionary::Column::Int:
1960     {
1961       int err = 0;
1962       const char* endptr = nullptr;
1963       int32 val = cs->cset->strntol(
1964                   cs, datac, length, 10, &endptr, &err);
1965       if (err != 0)
1966       {
1967         m_util.set_error_data(
1968           error, __LINE__, err,
1969           "line %" PRIu64 " field %u: eval %s failed",
1970           linenr, fieldnr, attr.m_sqltype);
1971         break;
1972       }
1973       if (uint(endptr - datac) != length)
1974       {
1975         m_util.set_error_data(
1976           error, __LINE__, 0,
1977           "line %" PRIu64 " field %u: eval %s failed: bad format",
1978            linenr, fieldnr, attr.m_sqltype);
1979         break;
1980       }
1981       attr.set_value(row, &val, 4);
1982     }
1983     break;
1984   case NdbDictionary::Column::Bigint:
1985     {
1986       int err = 0;
1987       const char* endptr = nullptr;
1988       int64 val = cs->cset->strntoll(
1989                   cs, datac, length, 10, &endptr, &err);
1990       if (err != 0)
1991       {
1992         m_util.set_error_data(
1993           error, __LINE__, err,
1994           "line %" PRIu64 " field %u: eval %s failed",
1995           linenr, fieldnr, attr.m_sqltype);
1996         break;
1997       }
1998       if (uint(endptr - datac) != length)
1999       {
2000         m_util.set_error_data(
2001           error, __LINE__, 0,
2002          "line %" PRIu64 " field %u: eval %s failed: bad format",
2003          linenr, fieldnr, attr.m_sqltype);
2004         break;
2005       }
2006       attr.set_value(row, &val, 8);
2007     }
2008     break;
2009   case NdbDictionary::Column::Tinyunsigned:
2010     {
2011       int err = 0;
2012       const char* endptr = nullptr;
2013       uint val = cs->cset->strntoul(
2014                  cs, datac, length, 10, &endptr, &err);
2015       if (err != 0)
2016       {
2017         m_util.set_error_data(
2018           error, __LINE__, err,
2019          "line %" PRIu64 " field %u: eval %s failed",
2020          linenr, fieldnr, attr.m_sqltype);
2021         break;
2022       }
2023       if (uint(endptr - datac) != length)
2024       {
2025         m_util.set_error_data(
2026           error, __LINE__, 0,
2027           "line %" PRIu64 " field %u: eval %s failed: bad format",
2028           linenr, fieldnr, attr.m_sqltype);
2029         break;
2030       }
2031       const uint maxval = 255;
2032       if (val > maxval)
2033       {
2034         m_util.set_error_data(
2035           error, __LINE__, 0,
2036           "line %" PRIu64 " field %u: eval %s failed: "
2037           "value %u out of range",
2038            linenr, fieldnr, attr.m_sqltype, val);
2039         break;
2040       }
2041       uint8 byteval = val;
2042       attr.set_value(row, &byteval, 1);
2043     }
2044     break;
2045   case NdbDictionary::Column::Smallunsigned:
2046     {
2047       int err = 0;
2048       const char* endptr = nullptr;
2049       uint val = cs->cset->strntoul(
2050                  cs, datac, length, 10, &endptr, &err);
2051       if (err != 0)
2052       {
2053         m_util.set_error_data(
2054           error, __LINE__, err,
2055          "line %" PRIu64 " field %u: eval %s failed",
2056          linenr, fieldnr, attr.m_sqltype);
2057         break;
2058       }
2059       if (uint(endptr - datac) != length)
2060       {
2061         m_util.set_error_data(
2062           error, __LINE__, 0,
2063           "line %" PRIu64 " field %u: eval %s failed: bad format",
2064           linenr, fieldnr, attr.m_sqltype);
2065         break;
2066       }
2067       const uint maxval = 65535;
2068       if (val > maxval)
2069       {
2070         m_util.set_error_data(
2071           error, __LINE__, 0,
2072           "line %" PRIu64 " field %u: eval %s failed: "
2073           "value %u out of range",
2074            linenr, fieldnr, attr.m_sqltype, val);
2075         break;
2076       }
2077       uint16 shortval = val;
2078       attr.set_value(row, &shortval, 2);
2079     }
2080     break;
2081   case NdbDictionary::Column::Mediumunsigned:
2082     {
2083       int err = 0;
2084       const char* endptr = nullptr;
2085       uint val = cs->cset->strntoul(
2086                  cs, datac, length, 10, &endptr, &err);
2087       if (err != 0)
2088       {
2089         m_util.set_error_data(
2090           error, __LINE__, err,
2091          "line %" PRIu64 " field %u: eval %s failed",
2092          linenr, fieldnr, attr.m_sqltype);
2093         break;
2094       }
2095       if (uint(endptr - datac) != length)
2096       {
2097         m_util.set_error_data(
2098           error, __LINE__, 0,
2099           "line %" PRIu64 " field %u: eval %s failed: bad format",
2100           linenr, fieldnr, attr.m_sqltype);
2101         break;
2102       }
2103       const uint maxval = 16777215;
2104       if (val > maxval)
2105       {
2106         m_util.set_error_data(
2107           error, __LINE__, 0,
2108           "line %" PRIu64 " field %u: eval %s failed: "
2109           "value %u out of range",
2110            linenr, fieldnr, attr.m_sqltype, val);
2111         break;
2112       }
2113       uchar val3[3];
2114       int3store(val3, val);
2115       attr.set_value(row, val3, 3);
2116     }
2117     break;
2118   case NdbDictionary::Column::Unsigned:
2119     {
2120       int err = 0;
2121       const char* endptr = nullptr;
2122       uint32 val = cs->cset->strntoul(
2123                    cs, datac, length, 10, &endptr, &err);
2124       if (err != 0)
2125       {
2126         m_util.set_error_data(
2127           error, __LINE__, err,
2128          "line %" PRIu64 " field %u: eval %s failed",
2129          linenr, fieldnr, attr.m_sqltype);
2130         break;
2131       }
2132       if (uint(endptr - datac) != length)
2133       {
2134         m_util.set_error_data(
2135           error, __LINE__, 0,
2136           "line %" PRIu64 " field %u: eval %s failed: bad format",
2137           linenr, fieldnr, attr.m_sqltype);
2138         break;
2139       }
2140       attr.set_value(row, &val, 4);
2141     }
2142     break;
2143   case NdbDictionary::Column::Bigunsigned:
2144     {
2145       int err = 0;
2146       const char* endptr = nullptr;
2147       uint64 val = cs->cset->strntoull(
2148                    cs, datac, length, 10, &endptr, &err);
2149       if (err != 0)
2150       {
2151         m_util.set_error_data(
2152           error, __LINE__, err,
2153           "line %" PRIu64 " field %u: eval %s failed",
2154           linenr, fieldnr, attr.m_sqltype);
2155         break;
2156       }
2157       if (uint(endptr - datac) != length)
2158       {
2159         m_util.set_error_data(
2160           error, __LINE__, 0,
2161           "line %" PRIu64 " field %u: eval %s failed: bad format",
2162           linenr, fieldnr, attr.m_sqltype);
2163         break;
2164       }
2165       attr.set_value(row, &val, 8);
2166     }
2167     break;
2168   case NdbDictionary::Column::Decimal:
2169     {
2170       uchar val[200];
2171       Ndb_import_csv_error csv_error;
2172       if (!ndb_import_csv_parse_decimal(attr,
2173                                         false,
2174                                         datac, length,
2175                                         val, sizeof(val),
2176                                         csv_error))
2177       {
2178         m_util.set_error_data(
2179           error, __LINE__, 0,
2180           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2181           linenr, fieldnr, attr.m_sqltype,
2182           csv_error.error_text, csv_error.error_line);
2183         break;
2184       }
2185       attr.set_value(row, val, attr.m_size);
2186     }
2187     break;
2188   case NdbDictionary::Column::Decimalunsigned:
2189     {
2190       uchar val[200];
2191       Ndb_import_csv_error csv_error;
2192       if (!ndb_import_csv_parse_decimal(attr,
2193                                         true,
2194                                         datac, length,
2195                                         val, sizeof(val),
2196                                         csv_error))
2197       {
2198         m_util.set_error_data(
2199           error, __LINE__, 0,
2200           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2201           linenr, fieldnr, attr.m_sqltype,
2202           csv_error.error_text, csv_error.error_line);
2203         break;
2204       }
2205       attr.set_value(row, val, attr.m_size);
2206     }
2207     break;
2208   /*
2209    * Float and Double.  We use same methods as LOAD DATA but for
2210    * some reason there are occasional infinitesimal diffs on "el6".
2211    * Fix by using ::strtod if charset allows (it does).
2212    */
2213   case NdbDictionary::Column::Float:
2214     {
2215       uint data_length;
2216       double val = 0.0;
2217       bool use_os_strtod =
2218 #ifndef _WIN32
2219         (opt.m_charset == &my_charset_bin);
2220 #else
2221         false;
2222 #endif
2223       if (use_os_strtod)
2224       {
2225         errno = 0;
2226         char* endptr = nullptr;
2227         val = ::strtod(datac, &endptr);
2228         data_length = endptr - datac;
2229         if (errno != 0)
2230         {
2231           m_util.set_error_data(
2232             error, __LINE__, errno,
2233             "line %" PRIu64 " field %u: eval %s failed",
2234             linenr, fieldnr, attr.m_sqltype);
2235           break;
2236         }
2237       }
2238       else
2239       {
2240         int err = 0;
2241         const char* endptr = nullptr;
2242         val = cs->cset->strntod(
2243               cs, datac, length, &endptr, &err);
2244         data_length = endptr - datac;
2245         if (err != 0)
2246         {
2247           m_util.set_error_data(
2248             error, __LINE__, err,
2249             "line %" PRIu64 " field %u: eval %s failed",
2250             linenr, fieldnr, attr.m_sqltype);
2251           break;
2252         }
2253       }
2254       if (data_length != length)
2255       {
2256         m_util.set_error_data(
2257           error, __LINE__, 0,
2258           "line %" PRIu64 " field %u: eval %s failed: bad format",
2259           linenr, fieldnr, attr.m_sqltype);
2260         break;
2261       }
2262       if (std::isnan(val))
2263       {
2264         m_util.set_error_data(
2265           error, __LINE__, 0,
2266           "line %" PRIu64 " field %u: eval %s failed: invalid value",
2267           linenr, fieldnr, attr.m_sqltype);
2268         break;
2269       }
2270       const double max_val = FLT_MAX;
2271       if (val < -max_val || val > max_val)
2272       {
2273         m_util.set_error_data(
2274           error, __LINE__, 0,
2275           "line %" PRIu64 " field %u: eval %s failed: value out of range",
2276           linenr, fieldnr, attr.m_sqltype);
2277         break;
2278       }
2279       float valf = (float)val;
2280       attr.set_value(row, &valf, 4);
2281     }
2282     break;
2283   case NdbDictionary::Column::Double:
2284     {
2285       int err = 0;
2286       uint data_length;
2287       double val = 0.0;
2288       bool use_os_strtod =
2289 #ifndef _WIN32
2290         (opt.m_charset == &my_charset_bin);
2291 #else
2292         false;
2293 #endif
2294       if (use_os_strtod)
2295       {
2296         errno = 0;
2297         char *endptr = nullptr;
2298         val = ::strtod(datac, &endptr);
2299         data_length = endptr - datac;
2300         if (errno != 0)
2301         {
2302           m_util.set_error_data(
2303             error, __LINE__, errno,
2304             "line %" PRIu64 " field %u: eval %s failed",
2305             linenr, fieldnr, attr.m_sqltype);
2306           break;
2307         }
2308       }
2309       else
2310       {
2311         const char* endptr = nullptr;
2312         val = cs->cset->strntod(
2313               cs, datac, length, &endptr, &err);
2314         data_length = endptr - datac;
2315         if (err != 0)
2316         {
2317           m_util.set_error_data(
2318             error, __LINE__, err,
2319             "line %" PRIu64 " field %u: eval %s failed",
2320             linenr, fieldnr, attr.m_sqltype);
2321           break;
2322         }
2323       }
2324       if (data_length != length)
2325       {
2326         m_util.set_error_data(
2327           error, __LINE__, 0,
2328           "line %" PRIu64 " field %u: eval %s failed: bad format",
2329           linenr, fieldnr, attr.m_sqltype);
2330         break;
2331       }
2332       if (std::isnan(val))
2333       {
2334         m_util.set_error_data(
2335           error, __LINE__, 0,
2336           "line %" PRIu64 " field %u: eval %s failed: invalid value",
2337           linenr, fieldnr, attr.m_sqltype);
2338         break;
2339       }
2340       const double max_val = DBL_MAX;
2341       if (val < -max_val || val > max_val)
2342       {
2343         m_util.set_error_data(
2344           error, __LINE__, 0,
2345           "line %" PRIu64 " field %u: eval %s failed: value out of range",
2346           linenr, fieldnr, attr.m_sqltype);
2347         break;
2348       }
2349       attr.set_value(row, &val, 8);
2350     }
2351     break;
2352   case NdbDictionary::Column::Char:
2353     {
2354       const char* val = datac;
2355       if (length > attr.m_length)
2356       {
2357         m_util.set_error_data(
2358           error, __LINE__, 0,
2359           "line %" PRIu64 " field %u: eval %s failed: "
2360           "byte length too long (%u > %u)",
2361           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2362         break;
2363       }
2364       attr.set_value(row, val, length);
2365     }
2366     break;
2367   case NdbDictionary::Column::Varchar:
2368     {
2369       const char* val = datac;
2370       if (length > attr.m_length)
2371       {
2372         m_util.set_error_data(
2373           error, __LINE__, 0,
2374           "line %" PRIu64 " field %u: eval %s failed: "
2375           "byte length too long (%u > %u)",
2376           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2377         break;
2378       }
2379       attr.set_value(row, val, length);
2380     }
2381     break;
2382   case NdbDictionary::Column::Longvarchar:
2383     {
2384       const char* val = datac;
2385       if (length > attr.m_length)
2386       {
2387         m_util.set_error_data(
2388           error, __LINE__, 0,
2389           "line %" PRIu64 " field %u: eval %s failed: "
2390           "byte length too long (%u > %u)",
2391           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2392         break;
2393       }
2394       attr.set_value(row, val, length);
2395     }
2396     break;
2397   case NdbDictionary::Column::Binary:
2398     {
2399       const char* val = datac;
2400       if (length > attr.m_length)
2401       {
2402         m_util.set_error_data(
2403           error, __LINE__, 0,
2404           "line %" PRIu64 " field %u: eval %s failed: "
2405           "length too long (%u > %u)",
2406           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2407         break;
2408       }
2409       attr.set_value(row, val, length);
2410     }
2411     break;
2412   case NdbDictionary::Column::Varbinary:
2413     {
2414       const char* val = datac;
2415       if (length > attr.m_length)
2416       {
2417         m_util.set_error_data(
2418           error, __LINE__, 0,
2419           "line %" PRIu64 " field %u: eval %s failed: "
2420           "length too long (%u > %u)",
2421           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2422         break;
2423       }
2424       attr.set_value(row, val, length);
2425     }
2426     break;
2427   case NdbDictionary::Column::Longvarbinary:
2428     {
2429       const char* val = datac;
2430       if (length > attr.m_length)
2431       {
2432         m_util.set_error_data(
2433           error, __LINE__, 0,
2434           "line %" PRIu64 " field %u: eval %s failed: "
2435           "length too long (%u > %u)",
2436           linenr, fieldnr, attr.m_sqltype, length, attr.m_length);
2437         break;
2438       }
2439       attr.set_value(row, val, length);
2440     }
2441     break;
2442   case NdbDictionary::Column::Bit:
2443     {
2444       require(attr.m_length <= 64);
2445       uint bytelength = (attr.m_length + 7) / 8;
2446       require(bytelength <= 8);
2447       uchar val[8];
2448       memset(val, 0, sizeof(val));
2449       uint i = 0;
2450       uint j = Inval_uint;      // highest non-zero byte
2451       while (i < length)
2452       {
2453         uchar b = data[length - 1 - i];
2454         if (b != 0)
2455           j = i;
2456         if (i < bytelength)
2457           val[i] = b;
2458         i++;
2459       }
2460       if (j != Inval_uint)
2461       {
2462         uint k = 8;             // highest bit at j
2463         while (k != 0)
2464         {
2465           k--;
2466           if ((data[length - 1 - j] & (1 << k)) != 0)
2467             break;
2468         }
2469         uint hibit = 8 * (length - 1 - j) + k;
2470         if (hibit >= attr.m_length)
2471         {
2472           m_util.set_error_data(
2473             error, __LINE__, 0,
2474             "line %" PRIu64 " field %u: eval %s failed: "
2475             "highest set bit %u out of range",
2476              linenr, fieldnr, attr.m_sqltype, hibit);
2477           break;
2478         }
2479       }
2480 #if defined(WORDS_BIGENDIAN)
2481       std::swap(val[0], val[3]);
2482       std::swap(val[1], val[2]);
2483       std::swap(val[4], val[7]);
2484       std::swap(val[5], val[6]);
2485 #endif
2486       attr.set_value(row, val, attr.m_size);
2487     }
2488     break;
2489   case NdbDictionary::Column::Year:
2490     {
2491       NdbSqlUtil::Year s;
2492       Ndb_import_csv_error csv_error;
2493       if (!ndb_import_csv_parse_year(attr,
2494                                      datac,
2495                                      s,
2496                                      csv_error))
2497       {
2498         m_util.set_error_data(
2499           error, __LINE__, 0,
2500           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2501           linenr, fieldnr, attr.m_sqltype,
2502           csv_error.error_text, csv_error.error_line);
2503         break;
2504       }
2505       uchar val[1];
2506       NdbSqlUtil::pack_year(s, val);
2507       attr.set_value(row, val, 1);
2508     }
2509     break;
2510   case NdbDictionary::Column::Date:
2511     {
2512       NdbSqlUtil::Date s;
2513       Ndb_import_csv_error csv_error;
2514       if (!ndb_import_csv_parse_date(attr,
2515                                      datac,
2516                                      s,
2517                                      csv_error))
2518       {
2519         m_util.set_error_data(
2520           error, __LINE__, 0,
2521           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2522           linenr, fieldnr, attr.m_sqltype,
2523           csv_error.error_text, csv_error.error_line);
2524         break;
2525       }
2526       uchar val[3];
2527       NdbSqlUtil::pack_date(s, val);
2528       attr.set_value(row, val, 3);
2529     }
2530     break;
2531   case NdbDictionary::Column::Time2:
2532     {
2533       NdbSqlUtil::Time2 s;
2534       Ndb_import_csv_error csv_error;
2535       if (!ndb_import_csv_parse_time2(attr,
2536                                       datac,
2537                                       s,
2538                                       csv_error))
2539       {
2540         m_util.set_error_data(
2541           error, __LINE__, 0,
2542           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2543           linenr, fieldnr, attr.m_sqltype,
2544           csv_error.error_text, csv_error.error_line);
2545         break;
2546       }
2547       uint prec = attr.m_precision;
2548       require(prec <= 6);
2549       uint flen = (1 + prec) / 2;
2550       uint len = 3 + flen;
2551       require(len <= 6);
2552       uchar val[6];
2553       NdbSqlUtil::pack_time2(s, val, prec);
2554       attr.set_value(row, val, len);
2555     }
2556     break;
2557   case NdbDictionary::Column::Datetime2:
2558     {
2559       NdbSqlUtil::Datetime2 s;
2560       Ndb_import_csv_error csv_error;
2561       if (!ndb_import_csv_parse_datetime2(attr,
2562                                           datac,
2563                                           s,
2564                                           csv_error))
2565       {
2566         m_util.set_error_data(
2567           error, __LINE__, 0,
2568           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2569           linenr, fieldnr, attr.m_sqltype,
2570           csv_error.error_text, csv_error.error_line);
2571         break;
2572       }
2573       uint prec = attr.m_precision;
2574       require(prec <= 6);
2575       uint flen = (1 + prec) / 2;
2576       uint len = 5 + flen;
2577       require(len <= 8);
2578       uchar val[8];
2579       NdbSqlUtil::pack_datetime2(s, val, prec);
2580       attr.set_value(row, val, len);
2581     }
2582     break;
2583   case NdbDictionary::Column::Timestamp2:
2584     {
2585       NdbSqlUtil::Timestamp2 s;
2586       Ndb_import_csv_error csv_error;
2587       if (!ndb_import_csv_parse_timestamp2(attr,
2588                                           datac,
2589                                           s,
2590                                           csv_error))
2591       {
2592         m_util.set_error_data(
2593           error, __LINE__, 0,
2594           "line %" PRIu64 " field %u: eval %s failed: %s at %d",
2595           linenr, fieldnr, attr.m_sqltype,
2596           csv_error.error_text, csv_error.error_line);
2597         break;
2598       }
2599       uint prec = attr.m_precision;
2600       require(prec <= 6);
2601       uint flen = (1 + prec) / 2;
2602       uint len = 4 + flen;
2603       require(len <= 7);
2604       uchar val[7];
2605       NdbSqlUtil::pack_timestamp2(s, val, prec);
2606       attr.set_value(row, val, len);
2607     }
2608     break;
2609   case NdbDictionary::Column::Blob:
2610   case NdbDictionary::Column::Text:
2611     {
2612       const char* val = datac;
2613       attr.set_blob(row, val, length);
2614     }
2615     break;
2616   default:
2617     require(false);
2618     break;
2619   }
2620   data[length] = saveterm;
2621   if (m_util.has_error(error))
2622   {
2623     m_input.reject_line(line, field, error);
2624     line->m_reject = true;
2625   }
2626 }
2627 
2628 void
eval_null(Row * row,Line * line,Field * field)2629 NdbImportCsv::Eval::eval_null(Row* row, Line* line, Field* field)
2630 {
2631   const Table& table = m_input.m_table;
2632   const Attrs& attrs = table.m_attrs;
2633   // internal counts file lines and fields from 0
2634   const uint64 lineno = m_input.m_startlineno + line->m_lineno;
2635   const uint fieldno = field->m_fieldno;
2636   // user wants the counts from 1
2637   const uint64 linenr = 1 + lineno;
2638   const uint fieldnr = 1 + fieldno;
2639   const Attr& attr = attrs[fieldno];
2640   Error error;  // local error
2641   do
2642   {
2643     if (!attr.m_nullable)
2644     {
2645       m_util.set_error_data(
2646         error, __LINE__, 0,
2647         "line %" PRIu64 " field %u: setting non-nullable attr to NULL",
2648         linenr, fieldnr);
2649       break;
2650     }
2651   } while (0);
2652   if (m_util.has_error(error))
2653   {
2654     m_input.reject_line(line, field, error);
2655     line->m_reject = true;
2656   }
2657   attr.set_null(row, true);
2658 }
2659 
2660 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Eval & eval)2661 operator<<(NdbOut& out, const NdbImportCsv::Eval& eval)
2662 {
2663   out << "eval ";
2664   return out;
2665 }
2666 
2667 // output
2668 
Output(NdbImportCsv & csv,const Spec & spec,const Table & table,Buf & buf)2669 NdbImportCsv::Output::Output(NdbImportCsv& csv,
2670                              const Spec& spec,
2671                              const Table& table,
2672                              Buf& buf) :
2673   m_csv(csv),
2674   m_util(m_csv.m_util),
2675   m_spec(spec),
2676   m_table(table),
2677   m_buf(buf)
2678 {
2679   for (uint u = 0; u < g_bytecnt; u++)
2680     m_escapes[u] = 0;
2681 }
2682 
2683 void
do_init()2684 NdbImportCsv::Output::do_init()
2685 {
2686   log_debug(1, "do_init");
2687   const Spec& spec = m_spec;
2688   for (uint u = 0; u < g_bytecnt; u++)
2689     m_escapes[u] = 0;
2690   if (spec.m_fields_escaped_by != 0)    // should be
2691   {
2692     m_escapes[0] = '0';
2693     m_escapes[010] = 'b';
2694     m_escapes[012] = 'n';
2695     m_escapes[015] = 'r';
2696     m_escapes[011] = 't';
2697     m_escapes[032] = 'Z';
2698     if (spec.m_fields_enclosed_by != 0)
2699     {
2700       uchar quote = spec.m_fields_enclosed_by[0];
2701       m_escapes[quote] = quote;
2702     }
2703     uchar esc = spec.m_fields_escaped_by[0];
2704     m_escapes[esc] = esc;
2705   }
2706 }
2707 
2708 void
add_header()2709 NdbImportCsv::Output::add_header()
2710 {
2711   const Table& table = m_table;
2712   const Attrs& attrs = table.m_attrs;
2713   const uint attrcnt = attrs.size();
2714   for (uint i = 0; i < attrcnt; i++)
2715   {
2716     const Attr& attr = attrs[i];
2717     if (i > 0)
2718     {
2719       add_fieldsep();
2720     }
2721     uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2722     char* bufptrc = (char*)bufptr;
2723     strcpy(bufptrc, attr.m_attrname.c_str());
2724     m_buf.m_len += strlen(bufptrc);
2725   }
2726   add_lineend();
2727 }
2728 
2729 void
add_line(const Row * row)2730 NdbImportCsv::Output::add_line(const Row* row)
2731 {
2732   const Spec& spec = m_spec;
2733   const Table& table = m_table;
2734   const Attrs& attrs = table.m_attrs;
2735   const uint attrcnt = attrs.size();
2736   for (uint i = 0; i < attrcnt; i++)
2737   {
2738     const Attr& attr = attrs[i];
2739     if (i > 0)
2740     {
2741       add_fieldsep();
2742     }
2743     if (attr.m_quotable)
2744     {
2745       add_quote();
2746     }
2747     add_field(attr, row);
2748     if (attr.m_quotable && spec.m_fields_enclosed_by != 0)
2749     {
2750       add_quote();
2751     }
2752   }
2753   add_lineend();
2754 }
2755 
2756 void
add_field(const Attr & attr,const Row * row)2757 NdbImportCsv::Output::add_field(const Attr& attr, const Row* row)
2758 {
2759   uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2760   char* bufptrc = (char*)bufptr;
2761   const uchar* rowptr = &row->m_data[attr.m_offset];
2762   switch (attr.m_type) {
2763   case NdbDictionary::Column::Int:
2764     {
2765       int32 val;
2766       require(attr.m_size == sizeof(val));
2767       memcpy(&val, rowptr, sizeof(val));
2768       sprintf(bufptrc, "%d", val);
2769       break;
2770     }
2771     break;
2772   case NdbDictionary::Column::Unsigned:
2773     {
2774       uint32 val;
2775       require(attr.m_size == sizeof(val));
2776       memcpy(&val, rowptr, sizeof(val));
2777       sprintf(bufptrc, "%u", val);
2778       break;
2779     }
2780     break;
2781   case NdbDictionary::Column::Bigint:
2782     {
2783       int64 val;
2784       require(attr.m_size == sizeof(val));
2785       memcpy(&val, rowptr, sizeof(val));
2786       sprintf(bufptrc, "%" PRId64, val);
2787       break;
2788     }
2789     break;
2790   case NdbDictionary::Column::Bigunsigned:
2791     {
2792       uint64 val;
2793       require(attr.m_size == sizeof(val));
2794       memcpy(&val, rowptr, sizeof(val));
2795       sprintf(bufptrc, "%" PRIu64, val);
2796       break;
2797     }
2798     break;
2799   case NdbDictionary::Column::Double:
2800     {
2801       double val;
2802       require(attr.m_size == sizeof(val));
2803       memcpy(&val, rowptr, sizeof(val));
2804       sprintf(bufptrc, "%.02f", val);
2805       break;
2806     }
2807     break;
2808   case NdbDictionary::Column::Varchar:
2809     {
2810       uint len = rowptr[0];
2811       add_char(&rowptr[1], len);
2812       break;
2813     }
2814     break;
2815   case NdbDictionary::Column::Longvarchar:
2816     {
2817       uint len = rowptr[0] + (rowptr[1] << 8);
2818       add_char(&rowptr[2], len);
2819       break;
2820     }
2821     break;
2822   case NdbDictionary::Column::Text:
2823     {
2824       require(attr.m_isblob);
2825       const Blob* blob = row->m_blobs[attr.m_blobno];
2826       add_char(blob->m_data, blob->m_blobsize);
2827       break;
2828     }
2829   default:
2830     require(false);
2831     break;
2832   }
2833   m_buf.m_len += strlen(bufptrc);
2834 }
2835 
2836 void
add_char(const uchar * rowdata,uint len)2837 NdbImportCsv::Output::add_char(const uchar* rowdata, uint len)
2838 {
2839   log_debug_3("add_char " << len << " " << (char*)rowdata);
2840   const Spec& spec = m_spec;
2841   require(spec.m_fields_escaped_by != 0);
2842   uchar esc = spec.m_fields_escaped_by[0];
2843   uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2844   uchar* p = bufptr;
2845   for (uint i = 0; i < len; i++)
2846   {
2847     uchar c = rowdata[i];
2848     if (m_escapes[c])
2849     {
2850       *p++ = esc;
2851       *p++ = m_escapes[c];
2852     }
2853     else
2854       *p++ = c;
2855   }
2856   *p = 0;
2857 }
2858 
2859 void
add_quote()2860 NdbImportCsv::Output::add_quote()
2861 {
2862   const Spec& spec = m_spec;
2863   if (spec.m_fields_enclosed_by != 0)
2864   {
2865     uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2866     char* bufptrc = (char*)bufptr;
2867     strcpy(bufptrc, (const char*)spec.m_fields_enclosed_by);
2868     m_buf.m_len += strlen(bufptrc);
2869   }
2870 }
2871 
2872 void
add_fieldsep()2873 NdbImportCsv::Output::add_fieldsep()
2874 {
2875   const Spec& spec = m_spec;
2876   uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2877   char* bufptrc = (char*)bufptr;
2878   strcpy(bufptrc, (const char*)spec.m_fields_terminated_by);
2879   m_buf.m_len += strlen(bufptrc);
2880 }
2881 
2882 void
add_lineend()2883 NdbImportCsv::Output::add_lineend()
2884 {
2885   const Spec& spec = m_spec;
2886   uchar* bufptr = &m_buf.m_data[m_buf.m_start + m_buf.m_len];
2887   char* bufptrc = (char*)bufptr;
2888   strcpy(bufptrc, (const char*)spec.m_lines_terminated_by);
2889   m_buf.m_len += strlen(bufptrc);
2890 }
2891 
2892 NdbOut&
operator <<(NdbOut & out,const NdbImportCsv::Output & output)2893 operator<<(NdbOut& out, const NdbImportCsv::Output& output)
2894 {
2895   out << "output";
2896   out << " len=" << output.m_buf.m_len << " ";
2897   return out;
2898 }
2899 
2900 // unittest
2901 
2902 #ifdef TEST_NDBIMPORTCSV
2903 
2904 #include <NdbTap.hpp>
2905 
2906 typedef NdbImport::OptCsv OptCsv;
2907 typedef NdbImportUtil::Name UtilName;
2908 typedef NdbImportUtil::Buf UtilBuf;
2909 typedef NdbImportUtil::File UtilFile;
2910 typedef NdbImportUtil::Attr UtilAttr;
2911 typedef NdbImportUtil::Attrs UtilAttrs;
2912 typedef NdbImportUtil::Table UtilTable;
2913 typedef NdbImportUtil::RowList UtilRowList;
2914 typedef NdbImportUtil::RowMap UtilRowMap;
2915 typedef NdbImportUtil::Stats UtilStats;
2916 typedef NdbImportCsv::Spec CsvSpec;
2917 typedef NdbImportCsv::Input CsvInput;
2918 typedef NdbImportCsv::Line CsvLine;
2919 typedef NdbImportCsv::Field CsvField;
2920 
2921 static void
makeoptcsv(OptCsv & optcsv)2922 makeoptcsv(OptCsv& optcsv)
2923 {
2924   optcsv.m_fields_terminated_by = ",";
2925   optcsv.m_fields_enclosed_by = "\"";
2926   optcsv.m_fields_optionally_enclosed_by = "\"";
2927   optcsv.m_fields_escaped_by = "\\\\";
2928   optcsv.m_lines_terminated_by = "\\n";
2929 }
2930 
2931 // table (a int unsigned primary key, b varchar(10) not null)
2932 
2933 static void
maketable(UtilTable & table)2934 maketable(UtilTable& table)
2935 {
2936   table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned);
2937   table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10);
2938 }
2939 
/*
 * Expected parse result of one test case: the field values in order
 * over all lines.  A null pointer stands for a NULL field.
 */
struct MyRes {
  uint fieldcnt;
  const char* field[20];        // fields, 0 for NULL
  /*
   * cnt is the number of variadic arguments that follow.  Each of
   * them must be passed as a pointer (a string, or a null pointer for
   * NULL), never as a plain integer 0, because it is read back with
   * va_arg as const char*.  cnt must not exceed the size of field[].
   */
  MyRes(uint cnt, ...) {
    // give the unused slots a defined value
    const uint maxcnt = sizeof(field) / sizeof(field[0]);
    for (uint n = 0; n < maxcnt; n++) {
      field[n] = 0;
    }
    fieldcnt = cnt;
    va_list ap;
    va_start(ap, cnt);
    for (uint n = 0; n < cnt; n++) {
      field[n] = va_arg(ap, const char*);
    }
    // each va_start must be matched by va_end in the same function
    va_end(ap);
  }
};
2953 
2954 struct MyCsv {
2955   uint error;   // 0-ok 1-error
2956   uint linecnt; // valid lines
2957   uint partial; // bytes in last partial line
2958   const char* buf;
2959   MyRes res;
2960 };
2961 
2962 static MyCsv mycsvlist[] = {
2963   { 0, 0, 0, "",
2964     MyRes(0) },
2965   { 0, 1, 0, "123,abc\n",
2966     MyRes(2, "123", "abc") },
2967   { 0, 2, 0, "123,abc\n456,def\n",
2968     MyRes(4, "123", "abc", "456", "def") },
2969   { 0, 1, 7, "123,abc\n456,def",
2970     MyRes(2, "123", "abc") },
2971   { 0, 2, 0, "123,\"abc\"\n456,def\n",
2972     MyRes(4, "123", "abc", "456", "def") },
2973   { 0, 2, 0, "123,\"a\"\"c\"\n456,def\n",
2974     MyRes(4, "123", "a\"c", "456", "def") },
2975   { 0, 1, 0, "123,\"a,c\"\n",
2976     MyRes(2, "123", "a,c") },
2977   { 0, 1, 0, "123,\\N\n",
2978     MyRes(2, "123", 0) },
2979   { 0, 1, 0, "123,\"\\N\"\n",
2980     MyRes(2, "123", 0) },
2981   { 0, 1, 0, "123,\\N\\N\n",
2982     MyRes(2, "123", "NN") },
2983   { 0, 1, 0, "123,\\0\\b\\n\\r\\t\\Z\\N\n",
2984     MyRes(2, "123", "\000\010\012\015\011\032N") },
2985 };
2986 
2987 static const uint mycsvcnt = sizeof(mycsvlist)/sizeof(mycsvlist[0]);
2988 
2989 static int
testinput1()2990 testinput1()
2991 {
2992   NdbImportUtil util;
2993   NdbOut& out = *util.c_log.out;
2994   util.c_opt.m_log_level = 4;
2995   out << "testinput1" << endl;
2996   NdbImportCsv csv(util);
2997   OptCsv optcsv;
2998   makeoptcsv(optcsv);
2999   CsvSpec csvspec;
3000   require(csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == 0);
3001   UtilTable table;
3002   maketable(table);
3003   UtilStats stats(util);
3004   for (uint i = 0; i < mycsvcnt; i++)
3005   {
3006     out << "case " << i << endl;
3007     const MyCsv& mycsv = mycsvlist[i];
3008     UtilBuf buf;
3009     buf.alloc(1024, 1);
3010     buf.copy((const uchar*)mycsv.buf, strlen(mycsv.buf));
3011     const uchar* bufdata = &buf.m_data[buf.m_start];
3012     const char* bufdatac = (const char*)bufdata;
3013     uint n = strlen(bufdatac);
3014     if (n != 0 && bufdatac[n-1] == '\n')
3015       out << bufdatac;
3016     else
3017       out << bufdatac << "\\c" << endl;
3018     UtilRowList rows_out;
3019     UtilRowList rows_reject;
3020     UtilRowMap rowmap_in(util);
3021     CsvInput input(csv,
3022                    "csvinput",
3023                    csvspec,
3024                    table,
3025                    buf,
3026                    rows_out,
3027                    rows_reject,
3028                    rowmap_in,
3029                    stats);
3030     input.do_init();
3031     input.do_parse();
3032     if (!input.has_error())
3033     {
3034       require(mycsv.error == 0);
3035     }
3036     else
3037     {
3038       out << util.c_error << endl;
3039       require(mycsv.error == 1);
3040     }
3041     require(input.m_line_list.cnt() == mycsv.linecnt);
3042     const MyRes& myres = mycsv.res;
3043     uint fieldcnt = 0;
3044     CsvLine* line = input.m_line_list.front();
3045     while (line != 0)
3046     {
3047       CsvField* field = line->m_field_list.front();
3048       while (field != 0)
3049       {
3050         require(fieldcnt < myres.fieldcnt);
3051         const char* myfield = myres.field[fieldcnt];
3052         if (field->m_null)
3053         {
3054           require(myfield == 0);
3055         }
3056         else
3057         {
3058           require(myfield != 0);
3059           uint pos = field->m_pack_pos;
3060           uint end = field->m_pack_end;
3061           uint len = end - pos;
3062           require(memcmp(&bufdata[pos], myfield, len) == 0);
3063         }
3064         fieldcnt++;
3065         field = field->next();
3066       }
3067       line = line->next();
3068     }
3069     require(fieldcnt == myres.fieldcnt);
3070     require(buf.m_tail <= buf.m_len);
3071     require(buf.m_len - buf.m_tail == mycsv.partial);
3072     input.free_line_list(input.m_line_list);
3073     require(input.balanced());
3074   }
3075   return 0;
3076 }
3077 
3078 static int
testinput2()3079 testinput2()
3080 {
3081   NdbImportUtil util;
3082   NdbOut& out = *util.c_log.out;
3083   util.c_opt.m_log_level = 2;
3084   util.c_opt.m_abort_on_error = 1;
3085   out << "testinput2" << endl;
3086   const char* path = "test.csv";
3087   struct stat st;
3088   if (stat(path, &st) == -1)
3089   {
3090     out << path << ": skip on errno " << errno << endl;
3091     return 0;
3092   }
3093   NdbImportCsv csv(util);
3094   OptCsv optcsv;
3095   makeoptcsv(optcsv);
3096   CsvSpec csvspec;
3097   require(csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == 0);
3098   UtilTable table;
3099   maketable(table);
3100   UtilBuf* buf[2];
3101   buf[0] = new UtilBuf(true);
3102   buf[1] = new UtilBuf(true);
3103   buf[0]->alloc(4096, 4);
3104   buf[1]->alloc(4096, 4);
3105   UtilRowList rows_out;
3106   UtilRowList rows_reject;
3107   UtilRowMap rowmap_in(util);
3108   UtilStats stats(util);
3109   CsvInput* input[2];
3110   input[0] = new CsvInput(csv, "csvinput-0", csvspec, table, *buf[0],
3111                           rows_out, rows_reject, rowmap_in, stats);
3112   input[1] = new CsvInput(csv, "csvinput-1", csvspec, table, *buf[1],
3113                           rows_out, rows_reject, rowmap_in, stats);
3114   input[0]->do_init();
3115   input[1]->do_init();
3116   UtilFile file(util, util.c_error);
3117   out << "read " << path << endl;
3118   file.set_path(path);
3119   require(file.do_open(UtilFile::Read_flags) == 0);
3120   uint totlen = 0;
3121   uint totread = 0;
3122   uint totlines = 0;
3123   uint i = 0;
3124   while (1)
3125   {
3126     uint j = 1 - i;
3127     CsvInput& input1 = *input[i];
3128     UtilBuf& b1 = *buf[i];
3129     UtilBuf& b2 = *buf[j];
3130     b1.reset();
3131     int ret = file.do_read(b1);
3132     require(ret == 0);
3133     totlen += b1.m_len;
3134     if (totread != 0)
3135     {
3136       out << "movetail" << " src=" << b2 << " dst=" << b1 << endl;
3137       require(b2.movetail(b1) == 0);
3138     }
3139     input1.do_parse();
3140     totread++;
3141     totlines += input1.m_line_list.cnt();
3142     input1.free_line_list(input1.m_line_list);
3143     if (b1.m_eof)
3144       break;
3145     i = j;
3146   }
3147   require(totlen == st.st_size);
3148   out << "len=" << totlen << " reads=" << totread
3149       << " lines=" << totlines << endl;
3150   require(file.do_close() == 0);
3151   return 0;
3152 }
3153 
3154 static int
testmain()3155 testmain()
3156 {
3157   ndb_init();
3158 #ifdef VM_TRACE
3159   signal(SIGABRT, SIG_DFL);
3160   signal(SIGSEGV, SIG_DFL);
3161 #endif
3162   if (testinput1() != 0)
3163     return -1;
3164   if (testinput2() != 0)
3165     return -1;
3166   return 0;
3167 }
3168 
// TAP entry point: yields true (non-zero) when testmain() returns 0.
TAPTEST(NdbImportCsv)
{
  return (testmain() == 0);
}
3174 
3175 #endif
3176