1 /*
2    Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef NDB_IMPORT_HPP
26 #define NDB_IMPORT_HPP
27 
28 // STL
29 #include <map>
30 
31 struct CHARSET_INFO;
32 class NdbOut;
33 class Ndb_cluster_connection;
34 
35 class NdbImport {
36 public:
37   NdbImport();
38   ~NdbImport();
39 
40   // csv spec
41 
42   struct OptCsv {
43     OptCsv();
44     enum Mode {
45       ModeInput = 1,
46       ModeOutput = 2
47     };
48     const char* m_fields_terminated_by;
49     const char* m_fields_enclosed_by;
50     const char* m_fields_optionally_enclosed_by;
51     const char* m_fields_escaped_by;
52     const char* m_lines_terminated_by;
53   };
54 
55   // opt
56 
57   struct Opt {
58     Opt();
59     uint m_connections;
60     const char* m_database;
61     const char* m_state_dir;
62     bool m_keep_state;
63     bool m_stats;
64     const char* m_table;
65     const char* m_input_type;
66     const char* m_input_file;
67     uint m_input_workers;
68     const char* m_output_type;
69     uint m_output_workers;
70     uint m_db_workers;
71     uint m_ignore_lines;
72     uint m_max_rows;
73     const char* m_result_file;
74     const char* m_reject_file;
75     const char* m_rowmap_file;
76     const char* m_stopt_file;
77     const char* m_stats_file;
78     bool m_continue;
79     bool m_resume;
80     uint m_monitor;
81     uint m_ai_prefetch_sz;
82     uint m_ai_increment;
83     uint m_ai_offset;
84     bool m_no_asynch;
85     bool m_no_hint;
86     uint m_pagesize;
87     uint m_pagecnt;
88     uint m_pagebuffer;
89     uint m_rowbatch;
90     uint m_rowbytes;
91     uint m_opbatch;
92     uint m_opbytes;
93     uint m_polltimeout;
94     uint m_temperrors;
95     uint m_tempdelay;
96     uint m_rowswait;
97     uint m_idlespin;
98     uint m_idlesleep;
99     uint m_checkloop;
100     uint m_alloc_chunk;
101     uint m_rejects;
102     // character set of input file (currently fixed as binary)
103     const char* m_charset_name;
104     const CHARSET_INFO* m_charset;
105     // csv options
106     OptCsv m_optcsv;
107     const char* m_csvopt;
108     // debug options
109     uint m_log_level;
110     bool m_abort_on_error;
111     const char* m_errins_type;
112     uint m_errins_delay;
113   };
114   // set options for next job
115   int set_opt(Opt& opt);
116 
117   // connect
118 
119   int do_connect();
120   void do_disconnect();
121 
122   // table
123 
124   // tables are shared and can also be added outside job context
125   int add_table(const char* database, const char* table, uint& tabid);
126   int remove_table(uint table_id);
127 
128   // job
129 
130   struct Job;
131   struct Team;
132   struct Error;
133 
134   struct JobStatus {
135     enum Status {
136       Status_null = 0,
137       Status_created,
138       Status_starting,
139       Status_running,
140       Status_success,
141       Status_error,
142       Status_fatal
143     };
144   };
145 
146   // a selection of stats (full details are in stats file t1.stt)
147   struct JobStats {
148     JobStats();
149     // from all resumed runs
150     uint64 m_rows;
151     uint64 m_reject;
152     uint64 m_runtime;
153     uint64 m_rowssec;
154     // from latest run
155     uint64 m_new_rows;
156     uint64 m_new_reject;
157     uint m_temperrors;  // sum of values from m_errormap
158     std::map<uint, uint> m_errormap;
159   };
160 
161   struct Job {
162     Job(NdbImport& imp);
163     ~Job();
164     int do_create();
165     int do_start();
166     int do_stop();      // ask to stop before ready
167     int do_wait();
168     void do_destroy();
169     int add_table(const char* database, const char* table, uint& tabid);
170     void set_table(uint tabid);
171     int remove_table(uint table_id);
172     bool has_error() const;
173     const Error& get_error() const;
174     NdbImport& m_imp;
175     uint m_jobno;
176     uint m_runno;       // run number i.e. resume count
177     JobStatus::Status m_status;
178     const char* m_str_status;
179     JobStats m_stats;
180     uint m_teamcnt;
181     Team** m_teams;
182     // update status of job and all teams
183     void get_status();
184   };
185 
186   struct TeamStatus {
187     enum Status {
188       Status_null = 0
189     };
190   };
191 
192   struct Team {
193     Team(const Job& job, uint teamno);
194     const char* get_name();
195     bool has_error() const;
196     const Error& get_error() const;
197     const Job& m_job;
198     const uint m_teamno;
199     // snapshot or final status
200     enum Status {
201       Status_null = 0
202     };
203     TeamStatus::Status m_status;
204     const char* m_str_status;
205   };
206 
207   static const char* g_str_status(JobStatus::Status status);
208   static const char* g_str_status(TeamStatus::Status status);
209 
210   // error
211 
212   struct Error {
213     enum Type {
214       Type_noerror = 0,
215       Type_gen = 1,
216       Type_usage = 2,
217       Type_alloc = 3,
218       Type_mgm = 4,
219       Type_con = 5,
220       Type_ndb = 6,
221       Type_os = 7,
222       Type_data = 8
223     };
224     Error();
225     const char* gettypetext() const;
226     Type type;
227     int code;
228     int line;
229     char text[1024];
230   };
231 
232   bool has_error() const;
233   const Error& get_error() const;
234   friend class NdbOut& operator<<(NdbOut&, const Error&);
235 
236   // stop all jobs (crude way to handle signals)
237   static void set_stop_all();
238 
239 private:
240   friend class NdbImportImpl;
241   NdbImport(class NdbImportImpl& impl);
242   class NdbImportImpl& m_impl;
243 };
244 
245 #endif
246