1 /*
2     This program is free software: you can redistribute it and/or modify
3     it under the terms of the GNU General Public License as published by
4     the Free Software Foundation, either version 3 of the License, or
5     (at your option) any later version.
6 
7     This program is distributed in the hope that it will be useful,
8     but WITHOUT ANY WARRANTY; without even the implied warranty of
9     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10     GNU General Public License for more details.
11 
12     You should have received a copy of the GNU General Public License
13     along with this program.  If not, see <http://www.gnu.org/licenses/>.
14 
15         Authors:    Domas Mituzas, Facebook ( domas at fb dot com )
16                     Mark Leith, Oracle Corporation (mark dot leith at oracle dot com)
17                     Andrew Hutchings, SkySQL (andrew at skysql dot com)
18                     Max Bubenick, Percona RDBA (max dot bubenick at percona dot com)
19                     David Ducos, Percona (david dot ducos at percona dot com)
20 */
21 
22 #define _LARGEFILE64_SOURCE
23 #define _FILE_OFFSET_BITS 64
24 
25 #include <mysql.h>
26 
27 #if defined MARIADB_CLIENT_VERSION_STR && !defined MYSQL_SERVER_VERSION
28 #define MYSQL_SERVER_VERSION MARIADB_CLIENT_VERSION_STR
29 #endif
30 
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <glib.h>
35 #include <stdlib.h>
36 #include <stdarg.h>
37 #include <errno.h>
38 #include <time.h>
39 #ifdef ZWRAP_USE_ZSTD
40 #include "zstd/zstd_zlibwrapper.h"
41 #else
42 #include <zlib.h>
43 #endif
44 #include <pcre.h>
45 #include <signal.h>
46 #include <glib/gstdio.h>
47 #include <glib/gerror.h>
48 #include <gio/gio.h>
49 #include "config.h"
50 #include "mydumper.h"
51 #include "server_detect.h"
52 #include "connection.h"
53 #include "common.h"
54 #include "g_unix_signal.h"
55 #include <math.h>
56 #include "getPassword.h"
57 #include "logging.h"
58 #include "set_verbose.h"
59 # include "locale.h"
60 
/* Pattern given with --regex; check_regex() matches it against "database.table". */
char *regexstring = NULL;

/* NOTE(review): not referenced in the visible part of this file; presumably
 * the default name used for the output directory — confirm. */
const char DIRECTORY[] = "export";

/* Some earlier versions of MySQL do not yet define MYSQL_TYPE_JSON */
#ifndef MYSQL_TYPE_JSON
#define MYSQL_TYPE_JSON 245
#endif

/* File-local mutexes; their use is outside the visible part of this file
 * (ref_table_mutex presumably guards ref_table — confirm). */
static GMutex *init_mutex = NULL;
static GMutex *ref_table_mutex = NULL;
/* Program options.  Many of these are the targets of the GOption entries in
 * `entries` below; the initializers are the defaults. */
gchar *output_directory = NULL;
gchar *output_directory_param = NULL;   /* --outputdir */
gchar *dump_directory = NULL;           /* base directory for the build_*filename() helpers */
guint statement_size = 1000000;         /* --statement-size, in bytes */
guint rows_per_file = 0;                /* --rows */
guint chunk_filesize = 0;               /* --chunk-filesize, in MB */
int longquery = 60;                     /* --long-query-guard, in seconds */
int longquery_retries = 0;              /* --long-query-retries */
int longquery_retry_interval = 60;      /* --long-query-retry-interval, in seconds */
int build_empty_files = 0;
int skip_tz = 0;                        /* set by --skip-tz-utc, cleared by --tz-utc */
int need_dummy_read = 0;
int need_dummy_toku_read = 0;
int compress_output = 0;
int killqueries = 0;
int lock_all_tables = 0;
int sync_wait = -1;                     /* --sync-wait; -1 is the default */
guint snapshot_count= 2;
guint snapshot_interval = 60;           /* --snapshot-interval, in minutes */
gboolean daemon_mode = FALSE;
gboolean have_snapshot_cloning = FALSE;
gboolean ignore_generated_fields = FALSE;

gchar *ignore_engines = NULL;
char **ignore = NULL;
gchar *tables_list = NULL;
gchar *tidb_snapshot = NULL;
/* Sorted "database.table" strings loaded by read_tables_skiplist() and
 * searched by check_skiplist(). */
GSequence *tables_skiplist = NULL;
gchar *tables_skiplist_file = NULL;
char **tables = NULL;
GList *no_updated_tables = NULL;

gboolean no_schemas = FALSE;
gboolean dump_checksums = FALSE;
gboolean no_data = FALSE;
gboolean no_locks = FALSE;
gboolean dump_triggers = FALSE;
gboolean dump_events = FALSE;
gboolean dump_routines = FALSE;
gboolean no_dump_views = FALSE;
gboolean less_locking = FALSE;
gboolean use_savepoints = FALSE;
gboolean success_on_1146 = FALSE;
gboolean no_backup_locks = FALSE;
gboolean insert_ignore = FALSE;
gboolean load_data = FALSE;
gboolean order_by_primary_key = FALSE;

/* Shared work lists, each paired with a mutex (usage is outside the visible
 * part of this file). */
GList *innodb_tables = NULL;
GMutex *innodb_tables_mutex = NULL;
GList *non_innodb_table = NULL;
GMutex *non_innodb_table_mutex = NULL;
GList *table_schemas = NULL;
GMutex *table_schemas_mutex = NULL;
GList *view_schemas = NULL;
GMutex *view_schemas_mutex = NULL;
GList *schema_post = NULL;
GMutex *schema_post_mutex = NULL;
/* Pending database-dump jobs; thd_JOB_DUMP_DATABASE decrements it atomically
 * and signals conf->ready_database_dump when it reaches zero. */
gint database_counter = 0;
gint non_innodb_table_counter = 0;
gint non_innodb_done = 0;
guint less_locking_threads = 0;
guint updated_since = 0;                /* --updated-since, in days */
guint trx_consistency_only = 0;
guint complete_insert = 0;
gchar *set_names_str=NULL;
gchar *where_option=NULL;               /* --where */
guint64 max_rows=1000000;               /* --max-rows */
GHashTable *database_hash=NULL;
GHashTable *ref_table=NULL;
/* Counter for the generated "mydumper_N" names in determine_filename(). */
guint table_number;

/* Row/statement delimiters.  The *_ld variants receive the values of
 * --fields-terminated-by, --fields-enclosed-by, --lines-starting-by,
 * --lines-terminated-by and --statement-terminated-by; fields_escaped_by is
 * set directly by --fields-escaped-by.  NOTE(review): the non-_ld variables
 * are presumably the effective values derived from those — confirm. */
gchar *fields_terminated_by=NULL;
gchar *fields_enclosed_by=NULL;
gchar *fields_escaped_by=NULL;
gchar *lines_starting_by=NULL;
gchar *lines_terminated_by=NULL;
gchar *statement_terminated_by=NULL;

gchar *fields_enclosed_by_ld=NULL;
gchar *fields_terminated_by_ld=NULL;
gchar *lines_starting_by_ld=NULL;
gchar *lines_terminated_by_ld=NULL;
gchar *statement_terminated_by_ld=NULL;

// For daemon mode
guint dump_number = 0;
guint binlog_connect_id = 0;
/* Set by sig_triggered(); run_snapshot() stops rescheduling once it is TRUE. */
gboolean shutdown_triggered = FALSE;
/* run_snapshot() pushes a token here to request a scheduled dump. */
GAsyncQueue *start_scheduled_dump;

/* Main loop; sig_triggered() quits it. */
GMainLoop *m1;
static GCond *ll_cond = NULL;
static GMutex *ll_mutex = NULL;

/* Count of non-fatal errors; incremented by error paths such as
 * clear_dump_directory() and read_tables_skiplist(). */
int errors;
169 
170 static GOptionEntry entries[] = {
171     {"database", 'B', 0, G_OPTION_ARG_STRING, &db, "Database to dump", NULL},
172     {"tables-list", 'T', 0, G_OPTION_ARG_STRING, &tables_list,
173      "Comma delimited table list to dump (does not exclude regex option)",
174      NULL},
175     {"omit-from-file", 'O', 0, G_OPTION_ARG_STRING, &tables_skiplist_file,
176      "File containing a list of database.table entries to skip, one per line "
177      "(skips before applying regex option)",
178      NULL},
179     {"outputdir", 'o', 0, G_OPTION_ARG_FILENAME, &output_directory_param,
180      "Directory to output files to", NULL},
181     {"statement-size", 's', 0, G_OPTION_ARG_INT, &statement_size,
182      "Attempted size of INSERT statement in bytes, default 1000000", NULL},
183     {"rows", 'r', 0, G_OPTION_ARG_INT, &rows_per_file,
184      "Try to split tables into chunks of this many rows. This option turns off "
185      "--chunk-filesize",
186      NULL},
187     {"chunk-filesize", 'F', 0, G_OPTION_ARG_INT, &chunk_filesize,
188      "Split tables into chunks of this output file size. This value is in MB",
189      NULL},
190     {"max-rows", 0, 0, G_OPTION_ARG_INT64, &max_rows,
191      "Limit the amounto of rows per block after the table is estimated, default 1000000", NULL},
192     {"compress", 'c', 0, G_OPTION_ARG_NONE, &compress_output,
193      "Compress output files", NULL},
194     {"build-empty-files", 'e', 0, G_OPTION_ARG_NONE, &build_empty_files,
195      "Build dump files even if no data available from table", NULL},
196     {"regex", 'x', 0, G_OPTION_ARG_STRING, &regexstring,
197      "Regular expression for 'db.table' matching", NULL},
198     {"ignore-engines", 'i', 0, G_OPTION_ARG_STRING, &ignore_engines,
199      "Comma delimited list of storage engines to ignore", NULL},
200     {"insert-ignore", 'N', 0, G_OPTION_ARG_NONE, &insert_ignore,
201      "Dump rows with INSERT IGNORE", NULL},
202     {"no-schemas", 'm', 0, G_OPTION_ARG_NONE, &no_schemas,
203      "Do not dump table schemas with the data", NULL},
204     {"table-checksums", 'M', 0, G_OPTION_ARG_NONE, &dump_checksums,
205      "Dump table checksums with the data", NULL},
206     {"no-data", 'd', 0, G_OPTION_ARG_NONE, &no_data, "Do not dump table data",
207      NULL},
208     {"order-by-primary", 0, 0, G_OPTION_ARG_NONE, &order_by_primary_key,
209      "Sort the data by Primary Key or Unique key if no primary key exists",
210      NULL},
211     {"triggers", 'G', 0, G_OPTION_ARG_NONE, &dump_triggers, "Dump triggers",
212      NULL},
213     {"events", 'E', 0, G_OPTION_ARG_NONE, &dump_events, "Dump events", NULL},
214     {"routines", 'R', 0, G_OPTION_ARG_NONE, &dump_routines,
215      "Dump stored procedures and functions", NULL},
216     {"no-views", 'W', 0, G_OPTION_ARG_NONE, &no_dump_views, "Do not dump VIEWs",
217      NULL},
218     {"no-locks", 'k', 0, G_OPTION_ARG_NONE, &no_locks,
219      "Do not execute the temporary shared read lock.  WARNING: This will cause "
220      "inconsistent backups",
221      NULL},
222     {"no-backup-locks", 0, 0, G_OPTION_ARG_NONE, &no_backup_locks,
223      "Do not use Percona backup locks", NULL},
224     {"less-locking", 0, 0, G_OPTION_ARG_NONE, &less_locking,
225      "Minimize locking time on InnoDB tables.", NULL},
226     {"long-query-retries", 0, 0, G_OPTION_ARG_INT, &longquery_retries,
227      "Retry checking for long queries, default 0 (do not retry)", NULL},
228     {"long-query-retry-interval", 0, 0, G_OPTION_ARG_INT, &longquery_retry_interval,
229      "Time to wait before retrying the long query check in seconds, default 60", NULL},
230     {"long-query-guard", 'l', 0, G_OPTION_ARG_INT, &longquery,
231      "Set long query timer in seconds, default 60", NULL},
232     {"kill-long-queries", 'K', 0, G_OPTION_ARG_NONE, &killqueries,
233      "Kill long running queries (instead of aborting)", NULL},
234     {"daemon", 'D', 0, G_OPTION_ARG_NONE, &daemon_mode, "Enable daemon mode",
235      NULL},
236     {"snapshot-count", 'X', 0, G_OPTION_ARG_INT, &snapshot_count, "number of snapshots, default 2", NULL},
237     {"snapshot-interval", 'I', 0, G_OPTION_ARG_INT, &snapshot_interval,
238      "Interval between each dump snapshot (in minutes), requires --daemon, "
239      "default 60",
240      NULL},
241     {"logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile,
242      "Log file name to use, by default stdout is used", NULL},
243     {"tz-utc", 0, G_OPTION_FLAG_REVERSE, G_OPTION_ARG_NONE, &skip_tz,
244      "SET TIME_ZONE='+00:00' at top of dump to allow dumping of TIMESTAMP data "
245      "when a server has data in different time zones or data is being moved "
246      "between servers with different time zones, defaults to on use "
247      "--skip-tz-utc to disable.",
248      NULL},
249     {"skip-tz-utc", 0, 0, G_OPTION_ARG_NONE, &skip_tz, "", NULL},
250     {"use-savepoints", 0, 0, G_OPTION_ARG_NONE, &use_savepoints,
251      "Use savepoints to reduce metadata locking issues, needs SUPER privilege",
252      NULL},
253     {"success-on-1146", 0, 0, G_OPTION_ARG_NONE, &success_on_1146,
254      "Not increment error count and Warning instead of Critical in case of "
255      "table doesn't exist",
256      NULL},
257     {"lock-all-tables", 0, 0, G_OPTION_ARG_NONE, &lock_all_tables,
258      "Use LOCK TABLE for all, instead of FTWRL", NULL},
259     {"updated-since", 'U', 0, G_OPTION_ARG_INT, &updated_since,
260      "Use Update_time to dump only tables updated in the last U days", NULL},
261     {"trx-consistency-only", 0, 0, G_OPTION_ARG_NONE, &trx_consistency_only,
262      "Transactional consistency only", NULL},
263     {"complete-insert", 0, 0, G_OPTION_ARG_NONE, &complete_insert,
264      "Use complete INSERT statements that include column names", NULL},
265     { "set-names",0, 0, G_OPTION_ARG_STRING, &set_names_str,
266       "Sets the names, use it at your own risk, default binary", NULL },
267     {"tidb-snapshot", 'z', 0, G_OPTION_ARG_STRING, &tidb_snapshot,
268      "Snapshot to use for TiDB", NULL},
269     {"load-data", 0, 0, G_OPTION_ARG_NONE, &load_data,
270      "", NULL },
271     {"fields-terminated-by", 0, 0, G_OPTION_ARG_STRING, &fields_terminated_by_ld,"", NULL },
272     {"fields-enclosed-by", 0, 0, G_OPTION_ARG_STRING, &fields_enclosed_by_ld,"", NULL },
273     {"fields-escaped-by", 0, 0, G_OPTION_ARG_STRING, &fields_escaped_by,
274       "Single character that is going to be used to escape characters in the"
275       "LOAD DATA stament, default: '\\' ", NULL },
276     {"lines-starting-by", 0, 0, G_OPTION_ARG_STRING, &lines_starting_by_ld,
277       "Adds the string at the begining of each row. When --load-data is used"
278       "it is added to the LOAD DATA statement. Its affects INSERT INTO statements"
279       "also when it is used.", NULL },
280     {"lines-terminated-by", 0, 0, G_OPTION_ARG_STRING, &lines_terminated_by_ld,
281       "Adds the string at the end of each row. When --load-data is used it is"
282        "added to the LOAD DATA statement. Its affects INSERT INTO statements"
283        "also when it is used.", NULL },
284     {"statement-terminated-by", 0, 0, G_OPTION_ARG_STRING, &statement_terminated_by_ld,
285       "This might never be used, unless you know what are you doing", NULL },
286     {"sync-wait", 0, 0, G_OPTION_ARG_INT, &sync_wait,
287      "WSREP_SYNC_WAIT value to set at SESSION level", NULL},
288     { "where", 0, 0, G_OPTION_ARG_STRING, &where_option,
289       "Dump only selected records.", NULL },
290     { "no-check-generated-fields", 0, 0, G_OPTION_ARG_NONE, &ignore_generated_fields,
291       "Queries related to generated fields are not going to be executed."
292       "It will lead to restoration issues if you have generated columns", NULL },
293     {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
294 
/* NOTE(review): not referenced in the visible part of this file. */
struct tm tval;

/* Forward declarations.  Some are defined further down this file (e.g.
 * check_regex, check_skiplist, tables_skiplist_cmp, read_tables_skiplist);
 * the rest are defined beyond the visible portion or elsewhere in the
 * project. */
void dump_schema_data(MYSQL *conn, char *database, char *table, char *filename);
void dump_triggers_data(MYSQL *conn, char *database, char *table,
                        char *filename);
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
                    char *filename2);
void dump_schema(MYSQL *conn, struct db_table *dbt,
                 struct configuration *conf);
void dump_checksum(struct db_table * dbt,
                 struct configuration *conf);
void dump_view(struct db_table *dbt, struct configuration *conf);
void dump_table(MYSQL *conn, struct db_table *dbt,
                struct configuration *conf, gboolean is_innodb);
void dump_tables(MYSQL *, GList *, struct configuration *);
void dump_schema_post(struct database *database, struct configuration *conf);
void restore_charset(GString *statement);
void set_charset(GString *statement, char *character_set,
                 char *collation_connection);
void dump_schema_post_data(MYSQL *conn, struct database *database, char *filename);
guint64 dump_table_data(MYSQL *conn, FILE *file, struct table_job *tj);
void dump_database(struct database *database, struct configuration *);
void dump_database_thread(MYSQL *, struct configuration*, struct database *);
void dump_create_database(char *, struct configuration *);
void dump_create_database_data(MYSQL *, char *, char *);
void get_tables(MYSQL *conn, struct configuration *);
gchar *get_primary_key_string(MYSQL *conn, char *database, char *table);
void get_not_updated(MYSQL *conn, FILE *);
GList *get_chunks_for_table(MYSQL *, char *, char *,
                            struct configuration *conf);
guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
                       char *from, char *to);
void dump_table_data_file(MYSQL *conn, struct table_job * tj);
void dump_table_checksum(MYSQL *conn, char *database, char *table,  char *filename);
//void create_backup_dir(char *directory);
gboolean write_data(FILE *, GString *);
/* Regex / skip-list filters over "database.table" (defined below). */
gboolean check_regex(char *database, char *table);
gboolean check_skiplist(char *database, char *table);
int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data);
void read_tables_skiplist(const gchar *filename);
void start_dump(MYSQL *conn);
MYSQL *create_main_connection();
void *exec_thread(void *data);
void write_log_file(const gchar *log_domain, GLogLevelFlags log_level,
                    const gchar *message, gpointer user_data);
struct database * new_database(MYSQL *conn, char *database_name, gboolean already_dumped);
gchar *get_ref_table(gchar *k);
gboolean get_database(MYSQL *conn, char *database_name, struct database ** database);
343 
check_regex_general(char * regex,char * word)344 gboolean check_regex_general(char *regex, char *word) {
345   /* This is not going to be used in threads */
346   static pcre *re = NULL;
347   int rc;
348   int ovector[9] = {0};
349   const char *error;
350   int erroroffset;
351 
352   /* Let's compile the RE before we do anything */
353   if (!re) {
354     re = pcre_compile(regex, PCRE_CASELESS | PCRE_MULTILINE, &error,
355                       &erroroffset, NULL);
356     if (!re) {
357       g_critical("Regular expression fail: %s", error);
358       exit(EXIT_FAILURE);
359     }
360   }
361 
362   rc = pcre_exec(re, NULL, word, strlen(word), 0, 0, ovector, 9);
363   return (rc > 0) ? TRUE : FALSE;
364 }
365 
determine_filename(char * table)366 char * determine_filename (char * table){
367   // https://stackoverflow.com/questions/11794144/regular-expression-for-valid-filename
368   // We might need to define a better filename alternatives
369   char * regex=strdup("^[\\w\\-_ ]+$");
370   if (check_regex_general(regex,table) && !g_strstr_len(table,-1,".") && !g_str_has_prefix(table,"mydumper_") )
371     return table;
372   else{
373     char *r = g_strdup_printf("mydumper_%d",table_number);
374     table_number++;
375     return r;
376   }
377 
378 }
379 
sig_triggered(gpointer user_data)380 gboolean sig_triggered(gpointer user_data) {
381   (void)user_data;
382 
383   g_message("Shutting down gracefully");
384   shutdown_triggered = TRUE;
385   g_main_loop_quit(m1);
386   return FALSE;
387 }
388 
389 
build_meta_filename(char * database,char * table,const char * suffix)390 gchar * build_meta_filename(char *database, char *table, const char *suffix){
391   GString *filename = g_string_sized_new(20);
392   g_string_append_printf(filename, "%s.%s-%s", database, table, suffix);
393   gchar *r = g_build_filename(dump_directory, filename->str, NULL);
394   g_string_free(filename,TRUE);
395   return r;
396 }
397 
build_schema_filename(char * database,const char * suffix)398 gchar * build_schema_filename(char *database, const char *suffix){
399   GString *filename = g_string_sized_new(20);
400   g_string_append_printf(filename, "%s-%s.sql%s", database, suffix, compress_extension);
401   gchar *r = g_build_filename(dump_directory, filename->str, NULL);
402   g_string_free(filename,TRUE);
403   return r;
404 }
405 
build_schema_table_filename(char * database,char * table,const char * suffix)406 gchar * build_schema_table_filename(char *database, char *table, const char *suffix){
407   GString *filename = g_string_sized_new(20);
408   g_string_append_printf(filename, "%s.%s-%s.sql%s", database, table, suffix, compress_extension);
409   gchar *r = g_build_filename(dump_directory, filename->str, NULL);
410   g_string_free(filename,TRUE);
411   return r;
412 }
413 
414 // Global Var used:
415 // - dump_directory
416 // - compress_extension
build_filename(char * database,char * table,guint part,const gchar * extension)417 gchar * build_filename(char *database, char *table, guint part, const gchar *extension){
418   GString *filename = g_string_sized_new(20);
419   g_string_append_printf(filename, "%s.%s.%05d.%s%s", database, table, part, extension, compress_extension);
420   gchar *r = g_build_filename(dump_directory, filename->str, NULL);
421   g_string_free(filename,TRUE);
422   return r;
423 }
424 
425 
build_data_filename(char * database,char * table,guint part)426 gchar * build_data_filename(char *database, char *table, guint part ){
427   return build_filename(database,table,part,"sql");
428 }
429 
/*
 * Unlink every entry in `directory` (non-recursive).
 *
 * On failure to open the directory, or to remove an entry, it logs a
 * critical message, increments the global `errors` and stops.  The directory
 * handle, the path buffer and the GError are released on every path.
 */
void clear_dump_directory(gchar *directory) {
  GError *error = NULL;
  GDir *dir = g_dir_open(directory, 0, &error);

  if (dir == NULL) {
    g_critical("cannot open directory %s, %s\n", directory,
               error ? error->message : "unknown error");
    g_clear_error(&error);
    errors++;
    return;
  }

  const gchar *filename = NULL;

  while ((filename = g_dir_read_name(dir))) {
    gchar *path = g_build_filename(directory, filename, NULL);
    if (g_unlink(path) == -1) {
      g_critical("error removing file %s (%d)\n", path, errno);
      errors++;
      g_free(path);
      break; /* stop as before, but still close the directory below */
    }
    g_free(path);
  }

  g_dir_close(dir);
}
455 
run_snapshot(gpointer * data)456 gboolean run_snapshot(gpointer *data) {
457   (void)data;
458 
459   g_async_queue_push(start_scheduled_dump, GINT_TO_POINTER(1));
460 
461   return (shutdown_triggered) ? FALSE : TRUE;
462 }
463 
464 
465 /* Check database.table string against regular expression */
466 
check_regex(char * database,char * table)467 gboolean check_regex(char *database, char *table) {
468   /* This is not going to be used in threads */
469   static pcre *re = NULL;
470   int rc;
471   int ovector[9] = {0};
472   const char *error;
473   int erroroffset;
474 
475   char *p;
476 
477   /* Let's compile the RE before we do anything */
478   if (!re) {
479     re = pcre_compile(regexstring, PCRE_CASELESS | PCRE_MULTILINE, &error,
480                       &erroroffset, NULL);
481     if (!re) {
482       g_critical("Regular expression fail: %s", error);
483       exit(EXIT_FAILURE);
484     }
485   }
486 
487   p = g_strdup_printf("%s.%s", database, table);
488   rc = pcre_exec(re, NULL, p, strlen(p), 0, 0, ovector, 9);
489   g_free(p);
490 
491   return (rc > 0) ? TRUE : FALSE;
492 }
493 
494 /* Check database.table string against skip list; returns TRUE if found */
495 
check_skiplist(char * database,char * table)496 gboolean check_skiplist(char *database, char *table) {
497   if (g_sequence_lookup(tables_skiplist,
498                         g_strdup_printf("%s.%s", database, table),
499                         tables_skiplist_cmp, NULL)) {
500     return TRUE;
501   } else {
502     return FALSE;
503   };
504 }
505 
506 /* Comparison function for skiplist sort and lookup */
507 
tables_skiplist_cmp(gconstpointer a,gconstpointer b,gpointer user_data)508 int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data) {
509   /* Not using user_data, but needed for function prototype, shutting up
510    * compiler warnings about unused variable */
511   (void)user_data;
512   /* Any sorting function would work, as long as its usage is consistent
513    * between sort and lookup.  strcmp should be one of the fastest. */
514   return strcmp(a, b);
515 }
516 
517 /* Read the list of tables to skip from the given filename, and prepares them
518  * for future lookups. */
519 
read_tables_skiplist(const gchar * filename)520 void read_tables_skiplist(const gchar *filename) {
521   GIOChannel *tables_skiplist_channel = NULL;
522   gchar *buf = NULL;
523   GError *error = NULL;
524   /* Create skiplist if it does not exist */
525   if (!tables_skiplist) {
526     tables_skiplist = g_sequence_new(NULL);
527   };
528   tables_skiplist_channel = g_io_channel_new_file(filename, "r", &error);
529 
530   /* Error opening/reading the file? bail out. */
531   if (!tables_skiplist_channel) {
532     g_critical("cannot read/open file %s, %s\n", filename, error->message);
533     errors++;
534     return;
535   };
536 
537   /* Read lines, push them to the list */
538   do {
539     g_io_channel_read_line(tables_skiplist_channel, &buf, NULL, NULL, NULL);
540     if (buf) {
541       g_strchomp(buf);
542       g_sequence_append(tables_skiplist, buf);
543     };
544   } while (buf);
545   g_io_channel_shutdown(tables_skiplist_channel, FALSE, NULL);
546   /* Sort the list, so that lookups work */
547   g_sequence_sort(tables_skiplist, tables_skiplist_cmp, NULL);
548   g_message("Omit list file contains %d tables to skip\n",
549             g_sequence_get_length(tables_skiplist));
550   return;
551 }
552 
553 /* Write some stuff we know about snapshot, before it changes */
write_snapshot_info(MYSQL * conn,FILE * file)554 void write_snapshot_info(MYSQL *conn, FILE *file) {
555   MYSQL_RES *master = NULL, *slave = NULL, *mdb = NULL;
556   MYSQL_FIELD *fields;
557   MYSQL_ROW row;
558 
559   char *masterlog = NULL;
560   char *masterpos = NULL;
561   char *mastergtid = NULL;
562 
563   char *connname = NULL;
564   char *slavehost = NULL;
565   char *slavelog = NULL;
566   char *slavepos = NULL;
567   char *slavegtid = NULL;
568   guint isms;
569   guint i;
570 
571   mysql_query(conn, "SHOW MASTER STATUS");
572   master = mysql_store_result(conn);
573   if (master && (row = mysql_fetch_row(master))) {
574     masterlog = row[0];
575     masterpos = row[1];
576     /* Oracle/Percona GTID */
577     if (mysql_num_fields(master) == 5) {
578       mastergtid = row[4];
579     } else {
580       /* Let's try with MariaDB 10.x */
581       /* Use gtid_binlog_pos due to issue with gtid_current_pos with galera
582        * cluster, gtid_binlog_pos works as well with normal mariadb server
583        * https://jira.mariadb.org/browse/MDEV-10279 */
584       mysql_query(conn, "SELECT @@gtid_binlog_pos");
585       mdb = mysql_store_result(conn);
586       if (mdb && (row = mysql_fetch_row(mdb))) {
587         mastergtid = row[0];
588       }
589     }
590   }
591 
592   if (masterlog) {
593     fprintf(file, "SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n",
594             masterlog, masterpos, mastergtid);
595     g_message("Written master status");
596   }
597 
598   isms = 0;
599   mysql_query(conn, "SELECT @@default_master_connection");
600   MYSQL_RES *rest = mysql_store_result(conn);
601   if (rest != NULL && mysql_num_rows(rest)) {
602     mysql_free_result(rest);
603     g_message("Multisource slave detected.");
604     isms = 1;
605   }
606 
607   if (isms)
608     mysql_query(conn, "SHOW ALL SLAVES STATUS");
609   else
610     mysql_query(conn, "SHOW SLAVE STATUS");
611 
612   guint slave_count=0;
613   slave = mysql_store_result(conn);
614   while (slave && (row = mysql_fetch_row(slave))) {
615     fields = mysql_fetch_fields(slave);
616     for (i = 0; i < mysql_num_fields(slave); i++) {
617       if (isms && !strcasecmp("connection_name", fields[i].name))
618         connname = row[i];
619       if (!strcasecmp("exec_master_log_pos", fields[i].name)) {
620         slavepos = row[i];
621       } else if (!strcasecmp("relay_master_log_file", fields[i].name)) {
622         slavelog = row[i];
623       } else if (!strcasecmp("master_host", fields[i].name)) {
624         slavehost = row[i];
625       } else if (!strcasecmp("Executed_Gtid_Set", fields[i].name) ||
626                  !strcasecmp("Gtid_Slave_Pos", fields[i].name)) {
627         slavegtid = row[i];
628       }
629     }
630     if (slavehost) {
631       slave_count++;
632       fprintf(file, "SHOW SLAVE STATUS:");
633       if (isms)
634         fprintf(file, "\n\tConnection name: %s", connname);
635       fprintf(file, "\n\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n",
636               slavehost, slavelog, slavepos, slavegtid);
637       g_message("Written slave status");
638     }
639   }
640   if (slave_count > 1)
641     g_warning("Multisource replication found. Do not trust in the exec_master_log_pos as it might cause data inconsistencies. Search 'Replication and Transaction Inconsistencies' on MySQL Documentation");
642 
643   fflush(file);
644   if (master)
645     mysql_free_result(master);
646   if (slave)
647     mysql_free_result(slave);
648   if (mdb)
649     mysql_free_result(mdb);
650 }
651 
652 // Free structures
653 
free_table_job(struct table_job * tj)654 void free_table_job(struct table_job *tj){
655   if (tj->table)
656     g_free(tj->table);
657   if (tj->where)
658     g_free(tj->where);
659   if (tj->order_by)
660     g_free(tj->order_by);
661   if (tj->filename)
662     g_free(tj->filename);
663 //  g_free(tj);
664 }
665 
free_schema_job(struct schema_job * sj)666 void free_schema_job(struct schema_job *sj){
667   if (sj->table)
668     g_free(sj->table);
669   if (sj->filename)
670     g_free(sj->filename);
671 //  g_free(sj);
672 }
673 
free_table_checksum_job(struct table_checksum_job * tcj)674 void free_table_checksum_job(struct table_checksum_job*tcj){
675       if (tcj->table)
676         g_free(tcj->table);
677       if (tcj->filename)
678         g_free(tcj->filename);
679  //     g_free(tcj);
680 }
free_view_job(struct view_job * vj)681 void free_view_job(struct view_job *vj){
682   if (vj->table)
683     g_free(vj->table);
684   if (vj->filename)
685     g_free(vj->filename);
686   if (vj->filename2)
687     g_free(vj->filename2);
688 //  g_free(vj);
689 }
690 
free_schema_post_job(struct schema_post_job * sp)691 void free_schema_post_job(struct schema_post_job *sp){
692   if (sp->filename)
693     g_free(sp->filename);
694 //  g_free(sp);
695 }
696 
free_create_database_job(struct create_database_job * cdj)697 void free_create_database_job(struct create_database_job * cdj){
698   if (cdj->filename)
699     g_free(cdj->filename);
700 //  g_free(cdj);
701 }
702 
message_dumping_data(struct thread_data * td,struct table_job * tj)703 void message_dumping_data(struct thread_data *td, struct table_job *tj){
704   g_message("Thread %d dumping data for `%s`.`%s`%s%s%s%s%s%s | Remaining jobs: %d",
705                     td->thread_id, tj->database, tj->table,
706 		    (tj->where || where_option ) ? " WHERE " : "", tj->where ? tj->where : "",
707 		    (tj->where && where_option ) ? " AND " : "", where_option ? where_option : "",
708                     tj->order_by ? " ORDER BY " : "", tj->order_by ? tj->order_by : "", g_async_queue_length(td->queue));
709 }
710 
process_stream(void * data)711 void *process_stream(void *data){
712   (void)data;
713   char * filename=NULL;
714   FILE * f=NULL;
715   char buf[1024];
716   int buflen;
717   ssize_t len=0;
718   for(;;){
719     filename=(char *)g_async_queue_pop(stream_queue);
720     if (strlen(filename) == 0){
721       break;
722     }
723     char *used_filemame=g_path_get_basename(filename);
724     len=m_write(stdout, "\n-- ", 4);
725     len=m_write(stdout, used_filemame, strlen(used_filemame));
726     len=m_write(stdout, "\n", 1);
727     free(used_filemame);
728     f=m_open(filename,"r");
729     while((buflen = read(fileno(f), buf, 1024)) > 0)
730     {
731       len=m_write(stdout, buf, buflen);
732       if (len != buflen){
733         g_critical("Stream failed during transmition of file: %s",filename);
734         exit(EXIT_FAILURE);
735       }
736     }
737     m_close(f);
738     if (no_delete == FALSE){
739       remove(filename);
740     }
741   }
742   return NULL;
743 }
744 
745 
746 
thd_JOB_DUMP_DATABASE(struct configuration * conf,struct thread_data * td,struct job * job)747 void thd_JOB_DUMP_DATABASE(struct configuration *conf, struct thread_data *td, struct job *job){
748   struct dump_database_job * ddj = (struct dump_database_job *)job->job_data;
749   g_message("Thread %d dumping db information for `%s`", td->thread_id,
750             ddj->database->name);
751   dump_database_thread(td->thrconn, conf, ddj->database);
752   g_free(ddj);
753   g_free(job);
754   if (g_atomic_int_dec_and_test(&database_counter)) {
755    g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
756   }
757 }
758 
thd_JOB_CREATE_DATABASE(struct thread_data * td,struct job * job)759 void thd_JOB_CREATE_DATABASE(struct thread_data *td, struct job *job){
760   struct create_database_job * cdj = (struct create_database_job *)job->job_data;
761   g_message("Thread %d dumping schema create for `%s`", td->thread_id,
762             cdj->database);
763   dump_create_database_data(td->thrconn, cdj->database, cdj->filename);
764   free_create_database_job(cdj);
765   g_free(job);
766 }
767 
thd_JOB_SCHEMA_POST(struct thread_data * td,struct job * job)768 void thd_JOB_SCHEMA_POST(struct thread_data *td, struct job *job){
769   struct schema_post_job * sp = (struct schema_post_job *)job->job_data;
770   g_message("Thread %d dumping SP and VIEWs for `%s`", td->thread_id,
771             sp->database->name);
772   dump_schema_post_data(td->thrconn, sp->database, sp->filename);
773   free_schema_post_job(sp);
774   g_free(job);
775 }
776 
thd_JOB_VIEW(struct thread_data * td,struct job * job)777 void thd_JOB_VIEW(struct thread_data *td, struct job *job){
778   struct view_job * vj = (struct view_job *)job->job_data;
779   g_message("Thread %d dumping view for `%s`.`%s`", td->thread_id,
780             vj->database, vj->table);
781   dump_view_data(td->thrconn, vj->database, vj->table, vj->filename,
782                  vj->filename2);
783   free_view_job(vj);
784   g_free(job);
785 }
786 
thd_JOB_SCHEMA(struct thread_data * td,struct job * job)787 void thd_JOB_SCHEMA(struct thread_data *td, struct job *job){
788   struct schema_job *sj = (struct schema_job *)job->job_data;
789   g_message("Thread %d dumping schema for `%s`.`%s`", td->thread_id,
790             sj->database, sj->table);
791   dump_schema_data(td->thrconn, sj->database, sj->table, sj->filename);
792   free_schema_job(sj);
793   g_free(job);
794 }
795 
thd_JOB_TRIGGERS(struct thread_data * td,struct job * job)796 void thd_JOB_TRIGGERS(struct thread_data *td, struct job *job){
797   struct schema_job * sj = (struct schema_job *)job->job_data;
798   g_message("Thread %d dumping triggers for `%s`.`%s`", td->thread_id,
799             sj->database, sj->table);
800   dump_triggers_data(td->thrconn, sj->database, sj->table, sj->filename);
801   free_schema_job(sj);
802   g_free(job);
803 }
804 
initialize_thread(struct thread_data * td)805 void initialize_thread(struct thread_data *td){
806   configure_connection(td->thrconn, "mydumper");
807 
808   if (!mysql_real_connect(td->thrconn, hostname, username, password, NULL, port,
809                           socket_path, 0)) {
810     g_critical("Failed to connect to database: %s", mysql_error(td->thrconn));
811     exit(EXIT_FAILURE);
812   } else {
813     g_message("Thread %d connected using MySQL connection ID %lu",
814               td->thread_id, mysql_thread_id(td->thrconn));
815   }
816 
817 }
818 
initialize_consistent_snapshot(struct thread_data * td)819 void initialize_consistent_snapshot(struct thread_data *td){
820 
821   if ( sync_wait != -1 && mysql_query(td->thrconn, g_strdup_printf("SET SESSION WSREP_SYNC_WAIT = %d",sync_wait))){
822     g_critical("Failed to set wsrep_sync_wait for the thread: %s",
823                mysql_error(td->thrconn));
824     exit(EXIT_FAILURE);
825   }
826   if (mysql_query(td->thrconn,
827                   "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) {
828     g_critical("Failed to set isolation level: %s", mysql_error(td->thrconn));
829     exit(EXIT_FAILURE);
830   }
831   if (mysql_query(td->thrconn,
832                   "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */")) {
833     g_critical("Failed to start consistent snapshot: %s", mysql_error(td->thrconn));
834     exit(EXIT_FAILURE);
835   }
836 }
837 
check_connection_status(struct thread_data * td)838 void check_connection_status(struct thread_data *td){
839   if (detected_server == SERVER_TYPE_TIDB) {
840     // Worker threads must set their tidb_snapshot in order to be safe
841     // Because no locking has been used.
842     gchar *query =
843         g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
844     if (mysql_query(td->thrconn, query)) {
845       g_critical("Failed to set tidb_snapshot: %s", mysql_error(td->thrconn));
846       exit(EXIT_FAILURE);
847     }
848     g_free(query);
849     g_message("Thread %d set to tidb_snapshot '%s'", td->thread_id,
850               tidb_snapshot);
851   }
852 
853   /* Unfortunately version before 4.1.8 did not support consistent snapshot
854    * transaction starts, so we cheat */
855   if (need_dummy_read) {
856     mysql_query(td->thrconn,
857                 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
858     MYSQL_RES *res = mysql_store_result(td->thrconn);
859     if (res)
860       mysql_free_result(res);
861   }
862   if (need_dummy_toku_read) {
863     mysql_query(td->thrconn,
864                 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
865     MYSQL_RES *res = mysql_store_result(td->thrconn);
866     if (res)
867       mysql_free_result(res);
868   }
869 }
870 
process_queue(struct thread_data * td)871 void *process_queue(struct thread_data *td) {
872   struct configuration *conf = td->conf;
873   // mysql_init is not thread safe, especially in Connector/C
874   g_mutex_lock(init_mutex);
875   td->thrconn = mysql_init(NULL);
876   g_mutex_unlock(init_mutex);
877 
878   initialize_thread(td);
879   execute_gstring(td->thrconn, set_session);
880 
881 //  if ((detected_server == SERVER_TYPE_MYSQL) &&
882 //      mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
883 //    g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
884 //  }
885 
886   if (!skip_tz && mysql_query(td->thrconn, "/*!40103 SET TIME_ZONE='+00:00' */")) {
887     g_critical("Failed to set time zone: %s", mysql_error(td->thrconn));
888   }
889   if (!td->less_locking_stage){
890     if (use_savepoints && mysql_query(td->thrconn, "SET SQL_LOG_BIN = 0")) {
891       g_critical("Failed to disable binlog for the thread: %s",
892                  mysql_error(td->thrconn));
893       exit(EXIT_FAILURE);
894     }
895     initialize_consistent_snapshot(td);
896     check_connection_status(td);
897   }
898   mysql_query(td->thrconn, set_names_str);
899 
900   g_async_queue_push(td->ready, GINT_TO_POINTER(1));
901 
902   struct job *job = NULL;
903   struct table_job *tj = NULL;
904   struct table_checksum_job *tcj = NULL;
905   struct tables_job *mj = NULL;
906   GList *glj;
907   int first = 1;
908   GString *query = g_string_new(NULL);
909   GString *prev_table = g_string_new(NULL);
910   GString *prev_database = g_string_new(NULL);
911   /* if less locking we need to wait until that threads finish
912       progressively waking up these threads */
913   if (!td->less_locking_stage && less_locking) {
914     g_mutex_lock(ll_mutex);
915 
916     while (less_locking_threads >= td->thread_id) {
917       g_cond_wait(ll_cond, ll_mutex);
918     }
919 
920     g_mutex_unlock(ll_mutex);
921   }
922 
923   for (;;) {
924     GTimeVal tv;
925     g_get_current_time(&tv);
926     g_time_val_add(&tv, 1000 * 1000 * 1);
927     job = (struct job *)g_async_queue_pop(td->queue);
928     if (shutdown_triggered && (job->type != JOB_SHUTDOWN)) {
929       continue;
930     }
931 
932     switch (job->type) {
933     case JOB_LOCK_DUMP_NON_INNODB:
934       mj = (struct tables_job *)job->job_data;
935       for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
936         tj = (struct table_job *)glj->data;
937         if (first) {
938           g_string_printf(query, "LOCK TABLES `%s`.`%s` READ LOCAL",
939                           tj->database, tj->table);
940           first = 0;
941         } else {
942           if (g_ascii_strcasecmp(prev_database->str, tj->database) ||
943               g_ascii_strcasecmp(prev_table->str, tj->table)) {
944             g_string_append_printf(query, ", `%s`.`%s` READ LOCAL",
945                                    tj->database, tj->table);
946           }
947         }
948         g_string_printf(prev_table, "%s", tj->table);
949         g_string_printf(prev_database, "%s", tj->database);
950       }
951       first = 1;
952       if (mysql_query(td->thrconn, query->str)) {
953         g_critical("Non Innodb lock tables fail: %s", mysql_error(td->thrconn));
954         exit(EXIT_FAILURE);
955       }
956       if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
957           g_atomic_int_get(&non_innodb_done)) {
958         g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
959       }
960       for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
961         tj = (struct table_job *)glj->data;
962         message_dumping_data(td,tj);
963         dump_table_data_file(td->thrconn, tj);
964         free_table_job(tj);
965         g_free(tj);
966       }
967       mysql_query(td->thrconn, "UNLOCK TABLES /* Non Innodb */");
968       g_list_free(mj->table_job_list);
969       g_free(mj);
970       g_free(job);
971       break;
972     case JOB_DUMP:
973       tj = (struct table_job *)job->job_data;
974       message_dumping_data(td,tj);
975       if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
976         g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
977       }
978       dump_table_data_file(td->thrconn, tj);
979       if (use_savepoints &&
980           mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
981         g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
982       }
983       free_table_job(tj);
984       g_free(job);
985       break;
986      case JOB_CHECKSUM:
987       tcj = (struct table_checksum_job *)job->job_data;
988         g_message("Thread %d dumping checksum for `%s`.`%s`", td->thread_id,
989                   tcj->database, tcj->table);
990       if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
991         g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
992       }
993       dump_table_checksum(td->thrconn, tcj->database, tcj->table, tcj->filename);
994       if (use_savepoints &&
995           mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
996         g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
997       }
998       free_table_checksum_job(tcj);
999       g_free(job);
1000       break;
1001     case JOB_DUMP_NON_INNODB:
1002       tj = (struct table_job *)job->job_data;
1003       message_dumping_data(td,tj);
1004       if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
1005         g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
1006       }
1007       dump_table_data_file(td->thrconn, tj);
1008       if (use_savepoints &&
1009           mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
1010         g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
1011       }
1012       free_table_job(tj);
1013       g_free(job);
1014       if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
1015           g_atomic_int_get(&non_innodb_done)) {
1016         g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
1017       }
1018       break;
1019     case JOB_DUMP_DATABASE:
1020       thd_JOB_DUMP_DATABASE(conf,td,job);
1021       break;
1022     case JOB_CREATE_DATABASE:
1023       thd_JOB_CREATE_DATABASE(td,job);
1024       break;
1025     case JOB_SCHEMA:
1026       thd_JOB_SCHEMA(td,job);
1027       break;
1028     case JOB_VIEW:
1029       thd_JOB_VIEW(td,job);
1030       break;
1031     case JOB_TRIGGERS:
1032       thd_JOB_TRIGGERS(td,job);
1033       break;
1034     case JOB_SCHEMA_POST:
1035       thd_JOB_SCHEMA_POST(td,job);
1036       break;
1037     case JOB_SHUTDOWN:
1038       g_message("Thread %d shutting down", td->thread_id);
1039       if (td->less_locking_stage){
1040         g_mutex_lock(ll_mutex);
1041         less_locking_threads--;
1042         g_cond_broadcast(ll_cond);
1043         g_mutex_unlock(ll_mutex);
1044         g_string_free(query, TRUE);
1045         g_string_free(prev_table, TRUE);
1046         g_string_free(prev_database, TRUE);
1047       }
1048       if (td->thrconn)
1049         mysql_close(td->thrconn);
1050       g_free(job);
1051       mysql_thread_end();
1052       return NULL;
1053       break;
1054     default:
1055       g_critical("Something very bad happened!");
1056       exit(EXIT_FAILURE);
1057     }
1058   }
1059   if (td->thrconn)
1060     mysql_close(td->thrconn);
1061   mysql_thread_end();
1062   return NULL;
1063 }
1064 
main(int argc,char * argv[])1065 int main(int argc, char *argv[]) {
1066   GError *error = NULL;
1067   GOptionContext *context;
1068 
1069   g_thread_init(NULL);
1070   setlocale(LC_ALL, "");
1071 
1072   ref_table_mutex = g_mutex_new();
1073   init_mutex = g_mutex_new();
1074   innodb_tables_mutex = g_mutex_new();
1075   non_innodb_table_mutex = g_mutex_new();
1076   table_schemas_mutex = g_mutex_new();
1077   view_schemas_mutex = g_mutex_new();
1078   schema_post_mutex = g_mutex_new();
1079   ll_mutex = g_mutex_new();
1080   ll_cond = g_cond_new();
1081   database_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1082   ref_table=g_hash_table_new ( g_str_hash, g_str_equal );
1083   context = g_option_context_new("multi-threaded MySQL dumping");
1084   GOptionGroup *main_group =
1085       g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
1086   g_option_group_add_entries(main_group, entries);
1087   g_option_group_add_entries(main_group, common_entries);
1088   g_option_context_set_main_group(context, main_group);
1089   gchar ** tmpargv=g_strdupv(argv);
1090   int tmpargc=argc;
1091   if (!g_option_context_parse(context, &tmpargc, &tmpargv, &error)) {
1092     g_print("option parsing failed: %s, try --help\n", error->message);
1093     exit(EXIT_FAILURE);
1094   }
1095 
1096   set_session = g_string_new(NULL);
1097 
1098   if (defaults_file != NULL){
1099     load_config_file(defaults_file, context, "mydumper");
1100   }
1101   g_option_context_free(context);
1102 
1103   if (!compress_output) {
1104     m_open=&g_fopen;
1105     m_close=(void *) &fclose;
1106     m_write=(void *)&write_file;
1107     compress_extension=g_strdup("");
1108   } else {
1109     m_open=(void *) &gzopen;
1110     m_close=(void *) &gzclose;
1111     m_write=(void *)&gzwrite;
1112 #ifdef ZWRAP_USE_ZSTD
1113     compress_extension = g_strdup(".zst");
1114 #else
1115     compress_extension = g_strdup(".gz");
1116 #endif
1117   }
1118 
1119   if (password != NULL){
1120     int i=1;
1121     for(i=1; i < argc; i++){
1122       gchar * p= g_strstr_len(argv[i],-1,password);
1123       if (p != NULL){
1124         strncpy(p, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", strlen(password));
1125       }
1126     }
1127   }
1128 
1129   // prompt for password if it's NULL
1130   if (sizeof(password) == 0 || (password == NULL && askPassword)) {
1131     password = passwordPrompt();
1132   }
1133 
1134   if (load_data){
1135     if (!fields_enclosed_by_ld){
1136     	fields_enclosed_by=g_strdup("");
1137     }else if(strlen(fields_enclosed_by_ld)>1){
1138 	    g_error("--fields-enclosed-by must be a single character");
1139       exit(EXIT_FAILURE);
1140     }else{
1141       fields_enclosed_by=fields_enclosed_by_ld;
1142     }
1143 
1144     if (fields_escaped_by){
1145       if(strlen(fields_escaped_by)>1){
1146 	      g_error("--fields-escaped-by must be a single character");
1147         exit(EXIT_FAILURE);
1148       }else if (strcmp(fields_escaped_by,"\\")==0){
1149         fields_escaped_by=g_strdup("\\\\");
1150       }
1151     }
1152   }
1153 
1154   if (!fields_terminated_by_ld){
1155     if (load_data){
1156       fields_terminated_by=g_strdup("\t");
1157 //      fields_terminated_by_ld=g_strdup("\\t")
1158     }else
1159       fields_terminated_by=g_strdup(",");
1160   }else
1161     fields_terminated_by=replace_escaped_strings(g_strdup(fields_terminated_by_ld));
1162   if (!lines_starting_by_ld){
1163     if (load_data)
1164       lines_starting_by=g_strdup("");
1165     else
1166   	  lines_starting_by=g_strdup("(");
1167   }else
1168     lines_starting_by=replace_escaped_strings(g_strdup(lines_starting_by_ld));
1169   if (!lines_terminated_by_ld){
1170     if (load_data){
1171       lines_terminated_by=g_strdup("\n");
1172       lines_terminated_by_ld=g_strdup("\\n");
1173     }else
1174   	  lines_terminated_by=g_strdup(")\n");
1175   }else
1176     lines_terminated_by=replace_escaped_strings(g_strdup(lines_terminated_by_ld));
1177   if (!statement_terminated_by_ld){
1178     if (load_data)
1179       statement_terminated_by=g_strdup("");
1180     else
1181   	  statement_terminated_by=g_strdup(";\n");
1182   }else
1183     statement_terminated_by=replace_escaped_strings(g_strdup(statement_terminated_by_ld));
1184 
1185 
1186   if (set_names_str){
1187     gchar *tmp_str=g_strdup_printf("/*!40101 SET NAMES %s*/",set_names_str);
1188     set_names_str=tmp_str;
1189   } else
1190     set_names_str=g_strdup("/*!40101 SET NAMES binary*/");
1191 
1192   if (program_version) {
1193     g_print("mydumper %s, built against MySQL %s\n", VERSION,
1194             MYSQL_VERSION_STR);
1195     exit(EXIT_SUCCESS);
1196   }
1197 
1198   set_verbose(verbose);
1199 
1200   GDateTime * datetime = g_date_time_new_now_local();
1201 
1202   g_message("MyDumper backup version: %s", VERSION);
1203 
1204   time_t t;
1205   time(&t);
1206   localtime_r(&t, &tval);
1207 
1208   // rows chunks have precedence over chunk_filesize
1209   if (rows_per_file > 0 && chunk_filesize > 0) {
1210     chunk_filesize = 0;
1211     g_warning("--chunk-filesize disabled by --rows option");
1212   }
1213 
1214   // until we have an unique option on lock types we need to ensure this
1215   if (no_locks || trx_consistency_only)
1216     less_locking = 0;
1217 
1218   /* savepoints workaround to avoid metadata locking issues
1219      doesnt work for chuncks */
1220   if (rows_per_file && use_savepoints) {
1221     use_savepoints = FALSE;
1222     g_warning("--use-savepoints disabled by --rows");
1223   }
1224 
1225   // clarify binlog coordinates with trx_consistency_only
1226   if (trx_consistency_only)
1227     g_warning("Using trx_consistency_only, binlog coordinates will not be "
1228               "accurate if you are writing to non transactional tables.");
1229 
1230   char *datetimestr;
1231 
1232   if (!output_directory_param){
1233     datetimestr=g_date_time_format(datetime,"\%Y\%m\%d-\%H\%M\%S");
1234     output_directory = g_strdup_printf("%s-%s", DIRECTORY, datetimestr);
1235     g_free(datetimestr);
1236   }else{
1237     output_directory=output_directory_param;
1238   }
1239   create_backup_dir(output_directory);
1240   if (daemon_mode) {
1241     pid_t pid, sid;
1242 
1243     pid = fork();
1244     if (pid < 0)
1245       exit(EXIT_FAILURE);
1246     else if (pid > 0)
1247       exit(EXIT_SUCCESS);
1248 
1249     umask(0037);
1250     sid = setsid();
1251 
1252     if (sid < 0)
1253       exit(EXIT_FAILURE);
1254 
1255     char *d_d;
1256     for (dump_number = 0; dump_number < snapshot_count; dump_number++) {
1257         d_d= g_strdup_printf("%s/%d", output_directory, dump_number);
1258         create_backup_dir(d_d);
1259         g_free(d_d);
1260     }
1261 
1262     GFile *last_dump = g_file_new_for_path(
1263         g_strdup_printf("%s/last_dump", output_directory)
1264     );
1265     GFileInfo *last_dump_i = g_file_query_info(
1266         last_dump,
1267         G_FILE_ATTRIBUTE_STANDARD_TYPE ","
1268         G_FILE_ATTRIBUTE_STANDARD_SYMLINK_TARGET,
1269         G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
1270         NULL, NULL
1271     );
1272     if (last_dump_i != NULL &&
1273         g_file_info_get_file_type(last_dump_i) == G_FILE_TYPE_SYMBOLIC_LINK) {
1274         dump_number = atoi(g_file_info_get_symlink_target(last_dump_i));
1275         if (dump_number >= snapshot_count-1) dump_number = 0;
1276         else dump_number++;
1277         g_object_unref(last_dump_i);
1278     } else {
1279         dump_number = 0;
1280     }
1281     g_object_unref(last_dump);
1282   }else{
1283     dump_directory = output_directory;
1284   }
1285   /* Give ourselves an array of engines to ignore */
1286   if (ignore_engines)
1287     ignore = g_strsplit(ignore_engines, ",", 0);
1288 
1289   /* Give ourselves an array of tables to dump */
1290   if (tables_list)
1291     tables = g_strsplit(tables_list, ",", 0);
1292 
1293   /* Process list of tables to omit if specified */
1294   if (tables_skiplist_file)
1295     read_tables_skiplist(tables_skiplist_file);
1296 
1297   if (daemon_mode) {
1298     GError *terror;
1299     start_scheduled_dump = g_async_queue_new();
1300     GThread *ethread =
1301         g_thread_create(exec_thread, GINT_TO_POINTER(1), FALSE, &terror);
1302     if (ethread == NULL) {
1303       g_critical("Could not create exec thread: %s", terror->message);
1304       g_error_free(terror);
1305       exit(EXIT_FAILURE);
1306     }
1307     // Run initial snapshot
1308     run_snapshot(NULL);
1309 #if GLIB_MINOR_VERSION < 14
1310     g_timeout_add(snapshot_interval * 60 * 1000, (GSourceFunc)run_snapshot,
1311                   NULL);
1312 #else
1313     g_timeout_add_seconds(snapshot_interval * 60, (GSourceFunc)run_snapshot,
1314                           NULL);
1315 #endif
1316     guint sigsource = g_unix_signal_add(SIGINT, sig_triggered, NULL);
1317     sigsource = g_unix_signal_add(SIGTERM, sig_triggered, NULL);
1318     m1 = g_main_loop_new(NULL, TRUE);
1319     g_main_loop_run(m1);
1320     g_source_remove(sigsource);
1321   } else {
1322     MYSQL *conn = create_main_connection();
1323     start_dump(conn);
1324   }
1325 
1326   // sleep(5);
1327   mysql_thread_end();
1328   mysql_library_end();
1329   g_free(output_directory);
1330   g_strfreev(ignore);
1331   g_strfreev(tables);
1332 
1333   if (logoutfile) {
1334     fclose(logoutfile);
1335   }
1336 
1337   exit(errors ? EXIT_FAILURE : EXIT_SUCCESS);
1338 }
1339 
1340 /* void initialize_session_variables(const gchar *group){
1341   gchar * group_variables=g_strdup_printf("%s_variables", group);
1342   if (set_session)
1343     g_string_set_size(set_session, 0);
1344   else
1345     set_session = g_string_new(NULL);
1346   GHashTable * set_session_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1347   if (detected_server == SERVER_TYPE_MYSQL){
1348     g_hash_table_insert(set_session_hash,g_strdup("WAIT_TIMEOUT"),g_strdup("2147483"));
1349     g_hash_table_insert(set_session_hash,g_strdup("NET_WRITE_TIMEOUT"),g_strdup("2147483"));
1350   }
1351 //  get_set_session_from_key_file(set_session, group_variables, kf);
1352   load_hash_from_key_file(set_session_hash,group_variables);
1353   refresh_set_session_from_hash(set_session,set_session_hash);
1354 }*/
1355 
create_main_connection()1356 MYSQL *create_main_connection() {
1357   MYSQL *conn;
1358   conn = mysql_init(NULL);
1359 
1360   configure_connection(conn, "mydumper");
1361 
1362   if (!mysql_real_connect(conn, hostname, username, password, db, port,
1363                           socket_path, 0)) {
1364     g_critical("Error connecting to database: %s", mysql_error(conn));
1365     exit(EXIT_FAILURE);
1366   }
1367 
1368   detected_server = detect_server(conn);
1369   initialize_session_variables("mydumper",set_session, detected_server, defaults_file);
1370   execute_gstring(conn, set_session);
1371 //  if ((detected_server == SERVER_TYPE_MYSQL) &&
1372 //      mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
1373 //    g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
1374 //  }
1375 //  if ((detected_server == SERVER_TYPE_MYSQL) &&
1376 //      mysql_query(conn, "SET SESSION net_write_timeout = 2147483")) {
1377 //    g_warning("Failed to increase net_write_timeout: %s", mysql_error(conn));
1378 //  }
1379 
1380   switch (detected_server) {
1381   case SERVER_TYPE_MYSQL:
1382     g_message("Connected to a MySQL server");
1383     break;
1384   case SERVER_TYPE_DRIZZLE:
1385     g_message("Connected to a Drizzle server");
1386     break;
1387   case SERVER_TYPE_TIDB:
1388     g_message("Connected to a TiDB server");
1389     break;
1390   default:
1391     g_critical("Cannot detect server type");
1392     exit(EXIT_FAILURE);
1393     break;
1394   }
1395 
1396   return conn;
1397 }
1398 
exec_thread(void * data)1399 void *exec_thread(void *data) {
1400   (void)data;
1401 
1402   while (1) {
1403     g_async_queue_pop(start_scheduled_dump);
1404     MYSQL *conn = create_main_connection();
1405     char *dump_number_str=g_strdup_printf("%d",dump_number);
1406     dump_directory = g_build_path( output_directory, dump_number_str, NULL);
1407     g_free(dump_number_str);
1408     clear_dump_directory(dump_directory);
1409     start_dump(conn);
1410     // start_dump already closes mysql
1411     // mysql_close(conn);
1412     mysql_thread_end();
1413 
1414     // Don't switch the symlink on shutdown because the dump is probably
1415     // incomplete.
1416     if (!shutdown_triggered) {
1417       char *dump_symlink_source= g_strdup_printf("%d", dump_number);
1418       char *dump_symlink_dest =
1419           g_strdup_printf("%s/last_dump", output_directory);
1420 
1421       // We don't care if this fails
1422       g_unlink(dump_symlink_dest);
1423 
1424       if (symlink(dump_symlink_source, dump_symlink_dest) == -1) {
1425         g_critical("error setting last good dump symlink %s, %d",
1426                    dump_symlink_dest, errno);
1427       }
1428       g_free(dump_symlink_dest);
1429 
1430       if (dump_number >= snapshot_count-1) dump_number = 0;
1431       else dump_number++;
1432     }
1433   }
1434   return NULL;
1435 }
1436 
dump_metadata(struct db_table * dbt)1437 void dump_metadata(struct db_table * dbt){
1438   char *filename = build_meta_filename(dbt->database->filename, dbt->table_filename, "metadata");
1439   FILE *table_meta = g_fopen(filename, "w");
1440   if (!table_meta) {
1441     g_critical("Couldn't write table metadata file %s (%d)", filename, errno);
1442     exit(EXIT_FAILURE);
1443   }
1444   fprintf(table_meta, "%d", dbt->rows);
1445   if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
1446   fclose(table_meta);
1447 }
1448 
start_dump(MYSQL * conn)1449 void start_dump(MYSQL *conn) {
1450   struct configuration conf = {1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0};
1451   char *p;
1452   char *p2;
1453   char *p3;
1454   char *u;
1455 
1456   guint64 nits[num_threads];
1457   GList *nitl[num_threads];
1458   int tn = 0;
1459   guint64 min = 0;
1460   struct db_table *dbt=NULL;
1461   struct schema_post *sp;
1462   guint n;
1463   FILE *nufile = NULL;
1464   guint have_backup_locks = 0;
1465 
1466   for (n = 0; n < num_threads; n++) {
1467     nits[n] = 0;
1468     nitl[n] = NULL;
1469   }
1470   if (ignore_generated_fields)
1471     g_warning("Queries related to generated fields are not going to be executed. It will lead to restoration issues if you have generated columns");
1472 
1473   p = g_strdup_printf("%s/metadata.partial", dump_directory);
1474   p2 = g_strndup(p, (unsigned)strlen(p) - 8);
1475 
1476   FILE *mdfile = g_fopen(p, "w");
1477   if (!mdfile) {
1478     g_critical("Couldn't write metadata file %s (%d)", p, errno);
1479     exit(EXIT_FAILURE);
1480   }
1481 
1482   if (updated_since > 0) {
1483     u = g_strdup_printf("%s/not_updated_tables", dump_directory);
1484     nufile = g_fopen(u, "w");
1485     if (!nufile) {
1486       g_critical("Couldn't write not_updated_tables file (%d)", errno);
1487       exit(EXIT_FAILURE);
1488     }
1489     get_not_updated(conn, nufile);
1490   }
1491 
1492   /* We check SHOW PROCESSLIST, and if there're queries
1493      larger than preset value, we terminate the process.
1494 
1495      This avoids stalling whole server with flush */
1496 
1497   if (!no_locks) {
1498 
1499     while (TRUE) {
1500       int longquery_count = 0;
1501       if (mysql_query(conn, "SHOW PROCESSLIST")) {
1502         g_warning("Could not check PROCESSLIST, no long query guard enabled: %s",
1503                   mysql_error(conn));
1504         break;
1505       } else {
1506        MYSQL_RES *res = mysql_store_result(conn);
1507         MYSQL_ROW row;
1508 
1509         /* Just in case PROCESSLIST output column order changes */
1510         MYSQL_FIELD *fields = mysql_fetch_fields(res);
1511         guint i;
1512         int tcol = -1, ccol = -1, icol = -1, ucol = -1;
1513         for (i = 0; i < mysql_num_fields(res); i++) {
1514         if (!strcasecmp(fields[i].name, "Command"))
1515             ccol = i;
1516           else if (!strcasecmp(fields[i].name, "Time"))
1517             tcol = i;
1518           else if (!strcasecmp(fields[i].name, "Id"))
1519             icol = i;
1520           else if (!strcasecmp(fields[i].name, "User"))
1521             ucol = i;
1522         }
1523         if ((tcol < 0) || (ccol < 0) || (icol < 0)) {
1524           g_critical("Error obtaining information from processlist");
1525           exit(EXIT_FAILURE);
1526         }
1527         while ((row = mysql_fetch_row(res))) {
1528           if (row[ccol] && strcmp(row[ccol], "Query"))
1529             continue;
1530           if (row[ucol] && !strcmp(row[ucol], "system user"))
1531             continue;
1532           if (row[tcol] && atoi(row[tcol]) > longquery) {
1533             if (killqueries) {
1534               if (mysql_query(conn,
1535                               p3 = g_strdup_printf("KILL %lu", atol(row[icol])))) {
1536                 g_warning("Could not KILL slow query: %s", mysql_error(conn));
1537                 longquery_count++;
1538               } else {
1539                 g_warning("Killed a query that was running for %ss", row[tcol]);
1540               }
1541               g_free(p3);
1542             } else {
1543               longquery_count++;
1544             }
1545           }
1546         }
1547         mysql_free_result(res);
1548         if (longquery_count == 0)
1549           break;
1550         else {
1551           if (longquery_retries == 0) {
1552             g_critical("There are queries in PROCESSLIST running longer than "
1553                        "%us, aborting dump,\n\t"
1554                        "use --long-query-guard to change the guard value, kill "
1555                        "queries (--kill-long-queries) or use \n\tdifferent "
1556                        "server for dump",
1557                        longquery);
1558             exit(EXIT_FAILURE);
1559           }
1560           longquery_retries--;
1561           g_warning("There are queries in PROCESSLIST running longer than "
1562                          "%us, retrying in %u seconds (%u left).",
1563                          longquery, longquery_retry_interval, longquery_retries);
1564           sleep(longquery_retry_interval);
1565         }
1566       }
1567     }
1568   }
1569 
1570   if (!no_locks && (detected_server != SERVER_TYPE_TIDB)) {
1571     // Percona Server 8 removed LOCK BINLOG so backup locks is useless for
1572     // mydumper now and we need to fail back to FTWRL
1573     mysql_query(conn, "SELECT @@version_comment, @@version");
1574     MYSQL_RES *res2 = mysql_store_result(conn);
1575     MYSQL_ROW ver;
1576     while ((ver = mysql_fetch_row(res2))) {
1577       if (g_str_has_prefix(ver[0], "Percona") &&
1578           g_str_has_prefix(ver[1], "8.")) {
1579         g_message("Disabling Percona Backup Locks for Percona Server 8");
1580         no_backup_locks = 1;
1581       }
1582     }
1583     mysql_free_result(res2);
1584 
1585     // Percona Backup Locks
1586     if (!no_backup_locks) {
1587       mysql_query(conn, "SELECT @@have_backup_locks");
1588       MYSQL_RES *rest = mysql_store_result(conn);
1589       if (rest != NULL && mysql_num_rows(rest)) {
1590         mysql_free_result(rest);
1591         g_message("Using Percona Backup Locks");
1592         have_backup_locks = 1;
1593       }
1594     }
1595 
1596     if (have_backup_locks) {
1597       if (mysql_query(conn, "LOCK TABLES FOR BACKUP")) {
1598         g_critical("Couldn't acquire LOCK TABLES FOR BACKUP, snapshots will "
1599                    "not be consistent: %s",
1600                    mysql_error(conn));
1601         errors++;
1602       }
1603 
1604       if (mysql_query(conn, "LOCK BINLOG FOR BACKUP")) {
1605         g_critical("Couldn't acquire LOCK BINLOG FOR BACKUP, snapshots will "
1606                    "not be consistent: %s",
1607                    mysql_error(conn));
1608         errors++;
1609       }
1610     } else if (lock_all_tables) {
1611       // LOCK ALL TABLES
1612       GString *query = g_string_sized_new(16777216);
1613       gchar *dbtb = NULL;
1614       gchar **dt = NULL;
1615       GList *tables_lock = NULL;
1616       GList *iter = NULL;
1617       guint success = 0;
1618       guint retry = 0;
1619       guint lock = 1;
1620       int i = 0;
1621 
1622       if (db) {
1623         g_string_printf(
1624             query,
1625             "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
1626             "WHERE TABLE_SCHEMA = '%s' AND TABLE_TYPE ='BASE TABLE' AND NOT "
1627             "(TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
1628             "TABLE_NAME = 'general_log'))",
1629             db);
1630       } else if (tables) {
1631         for (i = 0; tables[i] != NULL; i++) {
1632           dt = g_strsplit(tables[i], ".", 0);
1633           dbtb = g_strdup_printf("`%s`.`%s`", dt[0], dt[1]);
1634           tables_lock = g_list_prepend(tables_lock, dbtb);
1635         }
1636         tables_lock = g_list_reverse(tables_lock);
1637       } else {
1638         g_string_printf(
1639             query,
1640             "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
1641             "WHERE TABLE_TYPE ='BASE TABLE' AND TABLE_SCHEMA NOT IN "
1642             "('information_schema', 'performance_schema', 'data_dictionary') "
1643             "AND NOT (TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
1644             "TABLE_NAME = 'general_log'))");
1645       }
1646 
1647       if (tables_lock == NULL) {
1648         if (mysql_query(conn, query->str)) {
1649           g_critical("Couldn't get table list for lock all tables: %s",
1650                      mysql_error(conn));
1651           errors++;
1652         } else {
1653           MYSQL_RES *res = mysql_store_result(conn);
1654           MYSQL_ROW row;
1655 
1656           while ((row = mysql_fetch_row(res))) {
1657             lock = 1;
1658             if (tables) {
1659               int table_found = 0;
1660               for (i = 0; tables[i] != NULL; i++)
1661                 if (g_ascii_strcasecmp(tables[i], row[1]) == 0)
1662                   table_found = 1;
1663               if (!table_found)
1664                 lock = 0;
1665             }
1666             if (lock && tables_skiplist_file && check_skiplist(row[0], row[1]))
1667               continue;
1668             if (lock && regexstring && !check_regex(row[0], row[1]))
1669               continue;
1670 
1671             if (lock) {
1672               dbtb = g_strdup_printf("`%s`.`%s`", row[0], row[1]);
1673               tables_lock = g_list_prepend(tables_lock, dbtb);
1674             }
1675           }
1676           tables_lock = g_list_reverse(tables_lock);
1677         }
1678       }
1679 
1680       // Try three times to get the lock, this is in case of tmp tables
1681       // disappearing
1682       while (!success && retry < 4) {
1683         n = 0;
1684         for (iter = tables_lock; iter != NULL; iter = iter->next) {
1685           if (n == 0) {
1686             g_string_printf(query, "LOCK TABLE %s READ", (char *)iter->data);
1687             n = 1;
1688           } else {
1689             g_string_append_printf(query, ", %s READ", (char *)iter->data);
1690           }
1691         }
1692         if (mysql_query(conn, query->str)) {
1693           gchar *failed_table = NULL;
1694           gchar **tmp_fail;
1695 
1696           tmp_fail = g_strsplit(mysql_error(conn), "'", 0);
1697           tmp_fail = g_strsplit(tmp_fail[1], ".", 0);
1698           failed_table = g_strdup_printf("`%s`.`%s`", tmp_fail[0], tmp_fail[1]);
1699           for (iter = tables_lock; iter != NULL; iter = iter->next) {
1700             if (strcmp(iter->data, failed_table) == 0) {
1701               tables_lock = g_list_remove(tables_lock, iter->data);
1702             }
1703           }
1704           g_free(tmp_fail);
1705           g_free(failed_table);
1706         } else {
1707           success = 1;
1708         }
1709         retry += 1;
1710       }
1711       if (!success) {
1712         g_critical("Lock all tables fail: %s", mysql_error(conn));
1713         exit(EXIT_FAILURE);
1714       }
1715       g_free(query->str);
1716       g_list_free(tables_lock);
1717     } else {
1718       if (mysql_query(conn, "FLUSH TABLES WITH READ LOCK")) {
1719         g_critical("Couldn't acquire global lock, snapshots will not be "
1720                    "consistent: %s",
1721                    mysql_error(conn));
1722         errors++;
1723       }
1724     }
1725   } else if (detected_server == SERVER_TYPE_TIDB) {
1726     g_message("Skipping locks because of TiDB");
1727     if (!tidb_snapshot) {
1728 
1729       // Generate a @@tidb_snapshot to use for the worker threads since
1730       // the tidb-snapshot argument was not specified when starting mydumper
1731 
1732       if (mysql_query(conn, "SHOW MASTER STATUS")) {
1733         g_critical("Couldn't generate @@tidb_snapshot: %s", mysql_error(conn));
1734         exit(EXIT_FAILURE);
1735       } else {
1736 
1737         MYSQL_RES *result = mysql_store_result(conn);
1738         MYSQL_ROW row = mysql_fetch_row(
1739             result); /* There should never be more than one row */
1740         tidb_snapshot = g_strdup(row[1]);
1741         mysql_free_result(result);
1742       }
1743     }
1744 
1745     // Need to set the @@tidb_snapshot for the master thread
1746     gchar *query =
1747         g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
1748 
1749     g_message("Set to tidb_snapshot '%s'", tidb_snapshot);
1750 
1751     if (mysql_query(conn, query)) {
1752       g_critical("Failed to set tidb_snapshot: %s", mysql_error(conn));
1753       exit(EXIT_FAILURE);
1754     }
1755     g_free(query);
1756 
1757   } else {
1758     g_warning("Executing in no-locks mode, snapshot will not be consistent");
1759   }
1760   if (mysql_get_server_version(conn) < 40108) {
1761     mysql_query(
1762         conn,
1763         "CREATE TABLE IF NOT EXISTS mysql.mydumperdummy (a INT) ENGINE=INNODB");
1764     need_dummy_read = 1;
1765   }
1766 
1767   // tokudb do not support consistent snapshot
1768   mysql_query(conn, "SELECT @@tokudb_version");
1769   MYSQL_RES *rest = mysql_store_result(conn);
1770   if (rest != NULL && mysql_num_rows(rest)) {
1771     mysql_free_result(rest);
1772     g_message("TokuDB detected, creating dummy table for CS");
1773     mysql_query(
1774         conn,
1775         "CREATE TABLE IF NOT EXISTS mysql.tokudbdummy (a INT) ENGINE=TokuDB");
1776     need_dummy_toku_read = 1;
1777   }
1778 
1779   // Do not start a transaction when lock all tables instead of FTWRL,
1780   // since it can implicitly release read locks we hold
1781   if (!lock_all_tables) {
1782     mysql_query(conn, "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */");
1783   }
1784 
1785   if (need_dummy_read) {
1786     mysql_query(conn,
1787                 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
1788     MYSQL_RES *res = mysql_store_result(conn);
1789     if (res)
1790       mysql_free_result(res);
1791   }
1792   if (need_dummy_toku_read) {
1793     mysql_query(conn,
1794                 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
1795     MYSQL_RES *res = mysql_store_result(conn);
1796     if (res)
1797       mysql_free_result(res);
1798   }
1799   GDateTime *datetime = g_date_time_new_now_local();
1800   char *datetimestr=g_date_time_format(datetime,"\%Y-\%m-\%d \%H:\%M:\%S");
1801   fprintf(mdfile, "Started dump at: %s\n", datetimestr);
1802 
1803   g_message("Started dump at: %s", datetimestr);
1804   g_free(datetimestr);
1805 
1806   if (detected_server == SERVER_TYPE_MYSQL) {
1807 				mysql_query(conn, set_names_str);
1808 
1809     write_snapshot_info(conn, mdfile);
1810   }
1811   GThread *stream_thread = NULL;
1812   if (stream){
1813     stream_queue = g_async_queue_new();
1814     stream_thread = g_thread_create((GThreadFunc)process_stream, stream_queue, TRUE, NULL);
1815   }
1816   GThread **threads = g_new(GThread *, num_threads * (less_locking + 1));
1817   struct thread_data *td =
1818       g_new(struct thread_data, num_threads * (less_locking + 1));
1819 
1820   if (less_locking) {
1821     conf.queue_less_locking = g_async_queue_new();
1822     conf.ready_less_locking = g_async_queue_new();
1823     less_locking_threads = num_threads;
1824     for (n = num_threads; n < num_threads * 2; n++) {
1825       td[n].conf = &conf;
1826       td[n].thread_id = n + 1;
1827       td[n].queue = conf.queue_less_locking;
1828       td[n].ready = conf.ready_less_locking;
1829       td[n].less_locking_stage = TRUE;
1830       threads[n] = g_thread_create((GThreadFunc)process_queue,
1831                                    &td[n], TRUE, NULL);
1832       g_async_queue_pop(conf.ready_less_locking);
1833     }
1834     g_async_queue_unref(conf.ready_less_locking);
1835   }
1836 
1837   conf.queue = g_async_queue_new();
1838   conf.ready = g_async_queue_new();
1839   conf.unlock_tables = g_async_queue_new();
1840   conf.ready_database_dump = g_async_queue_new();
1841 
1842   for (n = 0; n < num_threads; n++) {
1843     td[n].conf = &conf;
1844     td[n].thread_id = n + 1;
1845     td[n].queue = conf.queue;
1846     td[n].ready = conf.ready;
1847     td[n].less_locking_stage = FALSE;
1848     threads[n] =
1849         g_thread_create((GThreadFunc)process_queue, &td[n], TRUE, NULL);
1850     g_async_queue_pop(conf.ready);
1851   }
1852 
1853   g_async_queue_unref(conf.ready);
1854 
1855   if (trx_consistency_only) {
1856     g_message("Transactions started, unlocking tables");
1857     mysql_query(conn, "UNLOCK TABLES /* trx-only */");
1858     if (have_backup_locks)
1859       mysql_query(conn, "UNLOCK BINLOG");
1860   }
1861 
1862   if (db) {
1863     dump_database(new_database(conn,db,TRUE), &conf);
1864     if (!no_schemas)
1865       dump_create_database(db, &conf);
1866   } else if (tables) {
1867     get_tables(conn, &conf);
1868   } else {
1869     MYSQL_RES *databases;
1870     MYSQL_ROW row;
1871     if (mysql_query(conn, "SHOW DATABASES") ||
1872         !(databases = mysql_store_result(conn))) {
1873       g_critical("Unable to list databases: %s", mysql_error(conn));
1874       exit(EXIT_FAILURE);
1875     }
1876 
1877     while ((row = mysql_fetch_row(databases))) {
1878       if (!strcasecmp(row[0], "information_schema") ||
1879           !strcasecmp(row[0], "performance_schema") ||
1880           (!strcasecmp(row[0], "data_dictionary")))
1881         continue;
1882       struct database * db_tmp=NULL;
1883       if (get_database(conn,row[0],&db_tmp) && !no_schemas && (regexstring == NULL || check_regex(row[0], NULL))){
1884         g_mutex_lock(db_tmp->ad_mutex);
1885         if (!db_tmp->already_dumped){
1886           dump_create_database(db_tmp->name, &conf);
1887           db_tmp->already_dumped=TRUE;
1888         }
1889         g_mutex_unlock(db_tmp->ad_mutex);
1890       }
1891       dump_database(db_tmp, &conf);
1892       /* Checks PCRE expressions on 'database' string */
1893 //      if (!no_schemas && (regexstring == NULL || check_regex(row[0], NULL))){
1894 //        dump_create_database(row[0], &conf);
1895 //      }
1896     }
1897     mysql_free_result(databases);
1898   }
1899   g_async_queue_pop(conf.ready_database_dump);
1900   g_async_queue_unref(conf.ready_database_dump);
1901   g_list_free(no_updated_tables);
1902 
1903   if (!non_innodb_table) {
1904     g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
1905   }
1906 
1907   GList *iter;
1908   table_schemas = g_list_reverse(table_schemas);
1909   for (iter = table_schemas; iter != NULL; iter = iter->next) {
1910     dbt = (struct db_table *)iter->data;
1911     dump_schema(conn, dbt, &conf);
1912   }
1913 
1914   non_innodb_table = g_list_reverse(non_innodb_table);
1915   if (less_locking) {
1916 
1917     for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
1918       dbt = (struct db_table *)iter->data;
1919       tn = 0;
1920       min = nits[0];
1921       for (n = 1; n < num_threads; n++) {
1922         if (nits[n] < min) {
1923           min = nits[n];
1924           tn = n;
1925         }
1926       }
1927       nitl[tn] = g_list_prepend(nitl[tn], dbt);
1928       nits[tn] += dbt->datalength;
1929     }
1930     nitl[tn] = g_list_reverse(nitl[tn]);
1931 
1932     for (n = 0; n < num_threads; n++) {
1933       if (nits[n] > 0) {
1934         g_atomic_int_inc(&non_innodb_table_counter);
1935         dump_tables(conn, nitl[n], &conf);
1936         g_list_free(nitl[n]);
1937       }
1938     }
1939     g_list_free(non_innodb_table);
1940 
1941     if (g_atomic_int_get(&non_innodb_table_counter))
1942       g_atomic_int_inc(&non_innodb_done);
1943     else
1944       g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
1945 
1946     for (n = 0; n < num_threads; n++) {
1947       struct job *j = g_new0(struct job, 1);
1948       j->type = JOB_SHUTDOWN;
1949       g_async_queue_push(conf.queue_less_locking, j);
1950     }
1951   } else {
1952     for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
1953       dbt = (struct db_table *)iter->data;
1954       if (dump_checksums) {
1955         dump_checksum(dbt, &conf);
1956       }
1957       dump_table(conn, dbt, &conf, FALSE);
1958       g_atomic_int_inc(&non_innodb_table_counter);
1959     }
1960     g_list_free(non_innodb_table);
1961     g_atomic_int_inc(&non_innodb_done);
1962   }
1963 
1964   innodb_tables = g_list_reverse(innodb_tables);
1965   for (iter = innodb_tables; iter != NULL; iter = iter->next) {
1966     dbt = (struct db_table *)iter->data;
1967     if (dump_checksums) {
1968       dump_checksum(dbt, &conf);
1969     }
1970     dump_table(conn, dbt, &conf, TRUE);
1971   }
1972   g_list_free(innodb_tables);
1973   innodb_tables=NULL;
1974 
1975 /*  table_schemas = g_list_reverse(table_schemas);
1976   for (iter = table_schemas; iter != NULL; iter = iter->next) {
1977     dbt = (struct db_table *)iter->data;
1978     dump_schema(conn, dbt, &conf);
1979   }*/
1980 
1981   view_schemas = g_list_reverse(view_schemas);
1982   for (iter = view_schemas; iter != NULL; iter = iter->next) {
1983     dbt = (struct db_table *)iter->data;
1984     dump_view(dbt, &conf);
1985     g_free(dbt->table);
1986     g_free(dbt);
1987   }
1988   g_list_free(view_schemas);
1989   view_schemas=NULL;
1990 
1991   schema_post = g_list_reverse(schema_post);
1992   for (iter = schema_post; iter != NULL; iter = iter->next) {
1993     sp = (struct schema_post *)iter->data;
1994     dump_schema_post(sp->database, &conf);
1995     g_free(sp);
1996   }
1997   g_list_free(schema_post);
1998   schema_post=NULL;
1999 
2000   if (!no_locks && !trx_consistency_only) {
2001     g_async_queue_pop(conf.unlock_tables);
2002     g_message("Non-InnoDB dump complete, unlocking tables");
2003     mysql_query(conn, "UNLOCK TABLES /* FTWRL */");
2004     if (have_backup_locks)
2005       mysql_query(conn, "UNLOCK BINLOG");
2006   }
2007   // close main connection
2008   mysql_close(conn);
2009 
2010   if (less_locking) {
2011     for (n = num_threads; n < num_threads * 2; n++) {
2012       g_thread_join(threads[n]);
2013     }
2014     g_async_queue_unref(conf.queue_less_locking);
2015   }
2016 
2017   for (n = 0; n < num_threads; n++) {
2018     struct job *j = g_new0(struct job, 1);
2019     j->type = JOB_SHUTDOWN;
2020     g_async_queue_push(conf.queue, j);
2021   }
2022 
2023   for (n = 0; n < num_threads; n++) {
2024     g_thread_join(threads[n]);
2025   }
2026 
2027   table_schemas = g_list_reverse(table_schemas);
2028   for (iter = table_schemas; iter != NULL; iter = iter->next) {
2029     dbt = (struct db_table *)iter->data;
2030     dump_metadata(dbt);
2031   }
2032   g_list_free(table_schemas);
2033   table_schemas=NULL;
2034 
2035   g_async_queue_unref(conf.queue);
2036   g_async_queue_unref(conf.unlock_tables);
2037 
2038   datetime = g_date_time_new_now_local();
2039   datetimestr=g_date_time_format(datetime,"\%Y-\%m-\%d \%H:\%M:\%S");
2040   fprintf(mdfile, "Finished dump at: %s\n", datetimestr);
2041   fclose(mdfile);
2042   if (updated_since > 0)
2043     fclose(nufile);
2044   g_rename(p, p2);
2045   if (stream) {
2046     g_async_queue_push(stream_queue, g_strdup(p2));
2047   }
2048   g_free(p);
2049   g_free(p2);
2050   g_message("Finished dump at: %s",datetimestr);
2051   g_free(datetimestr);
2052 
2053   if (stream) {
2054     g_async_queue_push(stream_queue, g_strdup(""));
2055     g_thread_join(stream_thread);
2056     if (no_delete == FALSE && output_directory_param == NULL)
2057       if (g_rmdir(output_directory) != 0)
2058         g_critical("Backup directory not removed: %s", output_directory);
2059   }
2060   g_free(td);
2061   g_free(threads);
2062 }
2063 
dump_create_database(char * database,struct configuration * conf)2064 void dump_create_database(char *database, struct configuration *conf) {
2065   struct job *j = g_new0(struct job, 1);
2066   struct create_database_job *cdj = g_new0(struct create_database_job, 1);
2067   j->job_data = (void *)cdj;
2068   gchar *d=get_ref_table(database);
2069   cdj->database = g_strdup(database);
2070   j->conf = conf;
2071   j->type = JOB_CREATE_DATABASE;
2072 
2073   cdj->filename = build_schema_filename(d, "schema-create");
2074 
2075   g_async_queue_push(conf->queue, j);
2076   return;
2077 }
2078 
/* Write "<SHOW CREATE DATABASE output>;\n" for `database` into `filename`.
 *
 * A failure to open the file or run the query is logged and counted in
 * `errors`, except that error 1146 is only warned about when
 * success_on_1146 is set. The output file is closed on every path. When
 * streaming is enabled, a successfully written file is pushed onto
 * stream_queue. */
void dump_create_database_data(MYSQL *conn, char *database, char *filename) {
  void *outfile = m_open(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  gchar *query =
      g_strdup_printf("SHOW CREATE DATABASE IF NOT EXISTS `%s`", database);
  MYSQL_RES *result = NULL;
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping create database (%s): %s", database,
                mysql_error(conn));
    } else {
      g_critical("Error dumping create database (%s): %s", database,
                 mysql_error(conn));
      errors++;
    }
    g_free(query);
    /* Do not leak the output handle when the statement can't be fetched */
    m_close(outfile);
    return;
  }
  g_free(query);

  /* There should never be more than one row; guard against none at all
   * (or a NULL statement column) instead of dereferencing it. */
  MYSQL_ROW row = mysql_fetch_row(result);
  if (row == NULL || row[1] == NULL) {
    g_critical("Error dumping create database (%s): no statement returned",
               database);
    errors++;
    mysql_free_result(result);
    m_close(outfile);
    return;
  }

  GString *statement = g_string_sized_new(statement_size);
  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write create database for %s", database);
    errors++;
  }
  g_string_free(statement, TRUE);
  mysql_free_result(result);

  m_close(outfile);
  if (stream)
    g_async_queue_push(stream_queue, g_strdup(filename));
}
2128 
/* Collect "schema.table" names of base tables whose UPDATE_TIME is older
 * than `updated_since` days. Each name is appended to the global
 * no_updated_tables list (as its own g_strdup'd copy, so the result set can
 * be freed) and written as one line to `file`.
 *
 * If the query fails, the error is logged, counted in `errors`, and nothing
 * is collected. */
void get_not_updated(MYSQL *conn, FILE *file) {
  gchar *query =
      g_strdup_printf("SELECT CONCAT(TABLE_SCHEMA,'.',TABLE_NAME) FROM "
                      "information_schema.TABLES WHERE TABLE_TYPE = 'BASE "
                      "TABLE' AND UPDATE_TIME < NOW() - INTERVAL %d DAY",
                      updated_since);
  int failed = mysql_query(conn, query);
  g_free(query);

  MYSQL_RES *res = failed ? NULL : mysql_store_result(conn);
  if (res == NULL) {
    /* mysql_fetch_row(NULL) would crash; report and keep going */
    g_critical("Couldn't get list of not updated tables: %s",
               mysql_error(conn));
    errors++;
    return;
  }

  MYSQL_ROW row;
  while ((row = mysql_fetch_row(res))) {
    if (row[0] == NULL)
      continue;
    /* Copy: row data is released by mysql_free_result() below */
    no_updated_tables = g_list_prepend(no_updated_tables, g_strdup(row[0]));
    fprintf(file, "%s\n", row[0]);
  }
  no_updated_tables = g_list_reverse(no_updated_tables);
  mysql_free_result(res);
  fflush(file);
}
2149 
detect_generated_fields(MYSQL * conn,struct db_table * dbt)2150 gboolean detect_generated_fields(MYSQL *conn, struct db_table *dbt) {
2151   MYSQL_RES *res = NULL;
2152   MYSQL_ROW row;
2153 
2154   gboolean result = FALSE;
2155   if (ignore_generated_fields)
2156     return FALSE;
2157 
2158   gchar *query = g_strdup_printf(
2159       "select COLUMN_NAME from information_schema.COLUMNS where "
2160       "TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra like '%%GENERATED%%' and extra not like '%%DEFAULT_GENERATED%%'",
2161       dbt->database->escaped, dbt->escaped_table);
2162 
2163   mysql_query(conn, query);
2164   g_free(query);
2165 
2166   res = mysql_store_result(conn);
2167   if (res == NULL){
2168   	return FALSE;
2169   }
2170 
2171   if ((row = mysql_fetch_row(res))) {
2172     result = TRUE;
2173   }
2174   mysql_free_result(res);
2175 
2176   return result;
2177 }
2178 
get_insertable_fields(MYSQL * conn,char * database,char * table)2179 GString *get_insertable_fields(MYSQL *conn, char *database, char *table) {
2180   MYSQL_RES *res = NULL;
2181   MYSQL_ROW row;
2182 
2183   GString *field_list = g_string_new("");
2184 
2185   gchar *query =
2186       g_strdup_printf("select COLUMN_NAME from information_schema.COLUMNS "
2187                       "where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra "
2188                       "not like '%%VIRTUAL GENERATED%%' and extra not like '%%STORED GENERATED%%'",
2189                       database, table);
2190   mysql_query(conn, query);
2191   g_free(query);
2192 
2193   res = mysql_store_result(conn);
2194   gboolean first = TRUE;
2195   while ((row = mysql_fetch_row(res))) {
2196     if (first) {
2197       first = FALSE;
2198     } else {
2199       g_string_append(field_list, ",");
2200     }
2201 
2202     gchar *tb = g_strdup_printf("`%s`", row[0]);
2203     g_string_append(field_list, tb);
2204     g_free(tb);
2205   }
2206   mysql_free_result(res);
2207 
2208   return field_list;
2209 }
2210 
get_primary_key_string(MYSQL * conn,char * database,char * table)2211 gchar *get_primary_key_string(MYSQL *conn, char *database, char *table) {
2212   MYSQL_RES *res = NULL;
2213   MYSQL_ROW row;
2214 
2215   GString *field_list = g_string_new("");
2216 
2217   gchar *query =
2218           g_strdup_printf("SELECT k.COLUMN_NAME, ORDINAL_POSITION "
2219                           "FROM information_schema.table_constraints t "
2220                           "LEFT JOIN information_schema.key_column_usage k "
2221                           "USING(constraint_name,table_schema,table_name) "
2222                           "WHERE t.constraint_type IN ('PRIMARY KEY', 'UNIQUE') "
2223                           "AND t.table_schema='%s' "
2224                           "AND t.table_name='%s' "
2225                           "ORDER BY t.constraint_type, ORDINAL_POSITION; ",
2226                           database, table);
2227   mysql_query(conn, query);
2228   g_free(query);
2229 
2230   res = mysql_store_result(conn);
2231   gboolean first = TRUE;
2232   while ((row = mysql_fetch_row(res))) {
2233     if (first) {
2234       first = FALSE;
2235     } else if (atoi(row[1]) > 1) {
2236       g_string_append(field_list, ",");
2237     } else {
2238       break;
2239     }
2240 
2241     gchar *tb = g_strdup_printf("`%s`", row[0]);
2242     g_string_append(field_list, tb);
2243     g_free(tb);
2244   }
2245   mysql_free_result(res);
2246   // Return NULL if we never found a PRIMARY or UNIQUE key
2247   if (first) {
2248     g_string_free(field_list, TRUE);
2249     return NULL;
2250   } else {
2251     gchar *order_string = g_string_free(field_list, FALSE);
2252     return order_string;
2253   }
2254 }
2255 
2256 /* Heuristic chunks building - based on estimates, produces list of ranges for
2257    datadumping WORK IN PROGRESS
2258 */
get_chunks_for_table(MYSQL * conn,char * database,char * table,struct configuration * conf)2259 GList *get_chunks_for_table(MYSQL *conn, char *database, char *table,
2260                             struct configuration *conf) {
2261 
2262   GList *chunks = NULL;
2263   MYSQL_RES *indexes = NULL, *minmax = NULL, *total = NULL;
2264   MYSQL_ROW row;
2265   char *field = NULL;
2266   int showed_nulls = 0;
2267 
2268   /* first have to pick index, in future should be able to preset in
2269    * configuration too */
2270   gchar *query = g_strdup_printf("SHOW INDEX FROM `%s`.`%s`", database, table);
2271   mysql_query(conn, query);
2272   g_free(query);
2273   indexes = mysql_store_result(conn);
2274 
2275   if (indexes){
2276     while ((row = mysql_fetch_row(indexes))) {
2277       if (!strcmp(row[2], "PRIMARY") && (!strcmp(row[3], "1"))) {
2278         /* Pick first column in PK, cardinality doesn't matter */
2279         field = row[4];
2280         break;
2281       }
2282     }
2283 
2284     /* If no PK found, try using first UNIQUE index */
2285     if (!field) {
2286       mysql_data_seek(indexes, 0);
2287       while ((row = mysql_fetch_row(indexes))) {
2288         if (!strcmp(row[1], "0") && (!strcmp(row[3], "1"))) {
2289           /* Again, first column of any unique index */
2290           field = row[4];
2291           break;
2292         }
2293       }
2294     }
2295     /* Still unlucky? Pick any high-cardinality index */
2296     if (!field && conf->use_any_index) {
2297       guint64 max_cardinality = 0;
2298       guint64 cardinality = 0;
2299 
2300       mysql_data_seek(indexes, 0);
2301       while ((row = mysql_fetch_row(indexes))) {
2302         if (!strcmp(row[3], "1")) {
2303           if (row[6])
2304             cardinality = strtoul(row[6], NULL, 10);
2305           if (cardinality > max_cardinality) {
2306             field = row[4];
2307             max_cardinality = cardinality;
2308           }
2309         }
2310       }
2311     }
2312   }
2313   /* Oh well, no chunks today - no suitable index */
2314   if (!field)
2315     goto cleanup;
2316 
2317   /* Get minimum/maximum */
2318   mysql_query(conn, query = g_strdup_printf(
2319                         "SELECT %s MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`",
2320                         (detected_server == SERVER_TYPE_MYSQL)
2321                             ? "/*!40001 SQL_NO_CACHE */"
2322                             : "",
2323                         field, field, database, table));
2324   g_free(query);
2325   minmax = mysql_store_result(conn);
2326 
2327   if (!minmax)
2328     goto cleanup;
2329 
2330   row = mysql_fetch_row(minmax);
2331   MYSQL_FIELD *fields = mysql_fetch_fields(minmax);
2332 
2333   /* Check if all values are NULL */
2334   if (row[0] == NULL)
2335     goto cleanup;
2336 
2337   char *min = row[0];
2338   char *max = row[1];
2339 
2340   guint64 estimated_chunks, estimated_step, nmin, nmax, cutoff, rows;
2341 
2342   /* Support just bigger INTs for now, very dumb, no verify approach */
2343   switch (fields[0].type) {
2344   case MYSQL_TYPE_LONG:
2345   case MYSQL_TYPE_LONGLONG:
2346   case MYSQL_TYPE_INT24:
2347   case MYSQL_TYPE_SHORT:
2348     /* Got total number of rows, skip chunk logic if estimates are low */
2349     rows = estimate_count(conn, database, table, field, min, max);
2350     if (rows <= rows_per_file)
2351       goto cleanup;
2352 
2353     /* This is estimate, not to use as guarantee! Every chunk would have eventual
2354      * adjustments */
2355     estimated_chunks = rows / rows_per_file;
2356     /* static stepping */
2357     nmin = strtoul(min, NULL, 10);
2358     nmax = strtoul(max, NULL, 10);
2359     estimated_step = (nmax - nmin) / estimated_chunks + 1;
2360     if (estimated_step > max_rows)
2361       estimated_step = max_rows;
2362     cutoff = nmin;
2363     while (cutoff <= nmax) {
2364       chunks = g_list_prepend(
2365           chunks,
2366           g_strdup_printf("%s%s%s%s(`%s` >= %llu AND `%s` < %llu)",
2367                           !showed_nulls ? "`" : "",
2368                           !showed_nulls ? field : "",
2369                           !showed_nulls ? "`" : "",
2370                           !showed_nulls ? " IS NULL OR " : "", field,
2371                           (unsigned long long)cutoff, field,
2372                           (unsigned long long)(cutoff + estimated_step)));
2373       cutoff += estimated_step;
2374       showed_nulls = 1;
2375     }
2376     chunks = g_list_reverse(chunks);
2377 // TODO: We need to add more chunk options for different types
2378   default:
2379     goto cleanup;
2380   }
2381 
2382 cleanup:
2383   if (indexes)
2384     mysql_free_result(indexes);
2385   if (minmax)
2386     mysql_free_result(minmax);
2387   if (total)
2388     mysql_free_result(total);
2389   return chunks;
2390 }
2391 
2392 /* Try to get EXPLAIN'ed estimates of row in resultset */
estimate_count(MYSQL * conn,char * database,char * table,char * field,char * from,char * to)2393 guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
2394                        char *from, char *to) {
2395   char *querybase, *query;
2396   int ret;
2397 
2398   g_assert(conn && database && table);
2399 
2400   querybase = g_strdup_printf("EXPLAIN SELECT `%s` FROM `%s`.`%s`",
2401                               (field ? field : "*"), database, table);
2402   if (from || to) {
2403     g_assert(field != NULL);
2404     char *fromclause = NULL, *toclause = NULL;
2405     char *escaped;
2406     if (from) {
2407       escaped = g_new(char, strlen(from) * 2 + 1);
2408       mysql_real_escape_string(conn, escaped, from, strlen(from));
2409       fromclause = g_strdup_printf(" `%s` >= %s ", field, escaped);
2410       g_free(escaped);
2411     }
2412     if (to) {
2413       escaped = g_new(char, strlen(to) * 2 + 1);
2414       mysql_real_escape_string(conn, escaped, to, strlen(to));
2415       toclause = g_strdup_printf(" `%s` <= %s", field, escaped);
2416       g_free(escaped);
2417     }
2418     query = g_strdup_printf("%s WHERE %s %s %s", querybase,
2419                             (from ? fromclause : ""),
2420                             ((from && to) ? "AND" : ""), (to ? toclause : ""));
2421 
2422     if (toclause)
2423       g_free(toclause);
2424     if (fromclause)
2425       g_free(fromclause);
2426     ret = mysql_query(conn, query);
2427     g_free(querybase);
2428     g_free(query);
2429   } else {
2430     ret = mysql_query(conn, querybase);
2431     g_free(querybase);
2432   }
2433 
2434   if (ret) {
2435     g_warning("Unable to get estimates for %s.%s: %s", database, table,
2436               mysql_error(conn));
2437   }
2438 
2439   MYSQL_RES *result = mysql_store_result(conn);
2440   MYSQL_FIELD *fields = mysql_fetch_fields(result);
2441 
2442   guint i;
2443   for (i = 0; i < mysql_num_fields(result); i++) {
2444     if (!strcmp(fields[i].name, "rows"))
2445       break;
2446   }
2447 
2448   MYSQL_ROW row = NULL;
2449 
2450   guint64 count = 0;
2451 
2452   if (result)
2453     row = mysql_fetch_row(result);
2454 
2455   if (row && row[i])
2456     count = strtoul(row[i], NULL, 10);
2457 
2458   if (result)
2459     mysql_free_result(result);
2460 
2461   return (count);
2462 }
2463 
/* Create `new_directory` with mode 0750. A directory that already exists
 * is accepted; any other mkdir failure is logged and terminates the
 * process with EXIT_FAILURE. */
void old_create_backup_dir(char *new_directory) {
  if (g_mkdir(new_directory, 0750) != -1 || errno == EEXIST)
    return;

  g_critical("Unable to create `%s': %s", new_directory, g_strerror(errno));
  exit(EXIT_FAILURE);
}
2472 
/* Return a newly allocated (g_free-able) copy of `str` escaped with
 * mysql_real_escape_string() on `conn`. The buffer is sized at the
 * worst case of 2*len+1 bytes. */
char *escape_string(MYSQL *conn, char *str) {
  size_t len = strlen(str);
  char *escaped = g_new(char, len * 2 + 1);
  mysql_real_escape_string(conn, escaped, str, len);
  return escaped;
}
2478 
/* Return the output-filename stem registered for `k` in ref_table. On the
 * first request it is computed with determine_filename() and cached. Access
 * is serialized by ref_table_mutex. The returned string is the value stored
 * in ref_table, so callers must not free it.
 *
 * The key is stored as a private g_strdup() copy. Callers pass strings they
 * may free later (for example a view's dbt->table is g_free()'d after
 * dump_view, although new_db_table registered it here). Using the caller's
 * pointer as the key would leave a dangling key that later lookups would
 * read.
 * NOTE(review): if ref_table has no key destroy function, these copies live
 * for the process lifetime, which is acceptable for a filename cache. */
gchar *get_ref_table(gchar *k) {
  g_mutex_lock(ref_table_mutex);
  gchar *val = g_hash_table_lookup(ref_table, k);
  if (val == NULL) {
    val = determine_filename(g_strdup(k));
    g_hash_table_insert(ref_table, g_strdup(k), val);
  }
  g_mutex_unlock(ref_table_mutex);
  return val;
}
2489 
/*
 * Allocate a `struct database` for `database_name`, fill in its name, output
 * filename, SQL-escaped name, dump flag and mutex, and register it in
 * database_hash keyed by its own (owned) copy of the name.
 */
struct database * new_database(MYSQL *conn, char *database_name, gboolean already_dumped){
  struct database *db = g_new(struct database, 1);
  gchar *owned_name = g_strdup(database_name);

  db->name = owned_name;
  db->filename = get_ref_table(owned_name);
  db->escaped = escape_string(conn, owned_name);
  db->already_dumped = already_dumped;
  db->ad_mutex = g_mutex_new();

  g_hash_table_insert(database_hash, owned_name, db);
  return db;
}
2500 
get_database(MYSQL * conn,char * database_name,struct database ** database)2501 gboolean get_database(MYSQL *conn, char *database_name, struct database ** database){
2502   *database=g_hash_table_lookup(database_hash,database_name);
2503   if (*database == NULL){
2504     *database=new_database(conn,database_name,FALSE);
2505     return TRUE;
2506   }
2507   return FALSE;
2508 }
2509 
/*
 * Allocate a `struct db_table` describing `table` inside `database`.
 * `datalength` is the textual data length reported by the server; NULL is
 * treated as 0. The row counter starts at 0 and no anonymizer is attached.
 */
struct db_table *new_db_table( MYSQL *conn, struct database *database, char *table, char *datalength){
  struct db_table *tbl = g_new(struct db_table, 1);
  gchar *owned_table = g_strdup(table);

  tbl->database = database;
  tbl->table = owned_table;
  tbl->table_filename = get_ref_table(owned_table);
  tbl->rows_lock = g_mutex_new();
  tbl->escaped_table = escape_string(conn, owned_table);
  tbl->anonymized_function = NULL;
  tbl->rows = 0;
  tbl->datalength = datalength ? g_ascii_strtoull(datalength, NULL, 10) : 0;

  return tbl;
}
2525 
2526 
dump_database(struct database * database,struct configuration * conf)2527 void dump_database(struct database *database, struct configuration *conf) {
2528 
2529   g_atomic_int_inc(&database_counter);
2530 
2531   struct job *j = g_new0(struct job, 1);
2532   struct dump_database_job *ddj = g_new0(struct dump_database_job, 1);
2533   j->job_data = (void *)ddj;
2534   ddj->database = database;
2535   j->conf = conf;
2536   j->type = JOB_DUMP_DATABASE;
2537 
2538   if (less_locking)
2539     g_async_queue_push(conf->queue_less_locking, j);
2540   else
2541     g_async_queue_push(conf->queue, j);
2542   return;
2543 }
2544 
2545 
/*
 * Accept one table or view row for dumping.
 *
 * First emits the CREATE DATABASE job for `database` exactly once (guarded by
 * database->ad_mutex). Then builds a db_table from (*row)[0] (name) and
 * (*row)[6] (passed as the data length) and files it:
 *   - views go to view_schemas (unless no_schemas);
 *   - tables whose engine `ecol` is known and not MRG_MYISAM are queued for
 *     data (unless no_data): into innodb_tables when trx_consistency_only is
 *     set or the engine is InnoDB/TokuDB, otherwise into non_innodb_table;
 *   - every table also goes to table_schemas (unless no_schemas).
 * NOTE(review): index 6 assumes the SHOW TABLE STATUS column layout; the
 * 3-column DATA_DICTIONARY query used for other servers has no column 6 --
 * confirm callers only pass SHOW TABLE STATUS rows.
 */
void green_light(MYSQL *conn, struct configuration *conf, gboolean is_view, struct database * database, MYSQL_ROW *row, gchar *ecol){
  g_mutex_lock(database->ad_mutex);
  if (!database->already_dumped) {
    dump_create_database(database->name, conf);
    database->already_dumped = TRUE;
  }
  g_mutex_unlock(database->ad_mutex);

  struct db_table *dbt = new_db_table(conn, database, (*row)[0], (*row)[6]);

  if (is_view) {
    /* For views only the schema matters. */
    if (!no_schemas) {
      g_mutex_lock(view_schemas_mutex);
      view_schemas = g_list_prepend(view_schemas, dbt);
      g_mutex_unlock(view_schemas_mutex);
    }
    return;
  }

  gboolean dump_rows = !no_data && ecol != NULL &&
                       g_ascii_strcasecmp("MRG_MYISAM", ecol) != 0;
  if (dump_rows) {
    /* With trx_consistency_only every table is treated as transactional. */
    gboolean transactional = trx_consistency_only ||
                             g_ascii_strcasecmp("InnoDB", ecol) == 0 ||
                             g_ascii_strcasecmp("TokuDB", ecol) == 0;
    if (transactional) {
      g_mutex_lock(innodb_tables_mutex);
      innodb_tables = g_list_prepend(innodb_tables, dbt);
      g_mutex_unlock(innodb_tables_mutex);
    } else {
      g_mutex_lock(non_innodb_table_mutex);
      non_innodb_table = g_list_prepend(non_innodb_table, dbt);
      g_mutex_unlock(non_innodb_table_mutex);
    }
  }

  if (!no_schemas) {
    g_mutex_lock(table_schemas_mutex);
    table_schemas = g_list_prepend(table_schemas, dbt);
    g_mutex_unlock(table_schemas_mutex);
  }
}
2593 
2594 
/*
 * Run `query` (a SHOW PROCEDURE STATUS / SHOW FUNCTION STATUS / SHOW EVENTS
 * statement) and report whether at least one returned object, whose name is
 * in column 1, passes the skip-list and regex filters for `database`.
 *
 * Returns 1 when such an object exists, 0 when none does, and -1 when the
 * query failed (the failure has already been logged and counted in errors).
 */
static int has_filtered_schema_object(MYSQL *conn, struct database *database,
                                      const char *query) {
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  int found = 0;

  if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
    g_critical("Error: DB: %s - Could not execute query: %s", database->name,
               mysql_error(conn));
    errors++;
    return -1;
  }

  while (!found && (row = mysql_fetch_row(result))) {
    /* Checks skip list on 'database.object' string */
    if (tables_skiplist && check_skiplist(database->name, row[1]))
      continue;
    /* Checks PCRE expressions on 'database.object' string */
    if (regexstring && !check_regex(database->name, row[1]))
      continue;
    found = 1;
  }

  mysql_free_result(result);
  return found;
}

/*
 * List every table/view of `database`, apply the engine, view, table-list,
 * special-table, skip-list, regex and "not updated" filters, and hand each
 * survivor to green_light(). Afterwards decide whether the database has any
 * stored procedure, function or event that passes the filters; if so, record
 * it in schema_post so its post-data (routines/events) gets dumped.
 */
void dump_database_thread(MYSQL *conn, struct configuration *conf, struct database *database) {
  char *query;
  MYSQL_RES *result;
  MYSQL_ROW row;
  guint i;

  mysql_select_db(conn, database->name);
  if (detected_server == SERVER_TYPE_MYSQL ||
      detected_server == SERVER_TYPE_TIDB)
    query = g_strdup("SHOW TABLE STATUS");
  else
    query =
        g_strdup_printf("SELECT TABLE_NAME, ENGINE, TABLE_TYPE as COMMENT FROM "
                        "DATA_DICTIONARY.TABLES WHERE TABLE_SCHEMA='%s'",
                        database->escaped);

  if (mysql_query(conn, query)) {
    g_critical("Error: DB: %s - Could not execute query: %s", database->name,
               mysql_error(conn));
    errors++;
    g_free(query);
    return;
  }
  g_free(query);

  /* The result must be checked before its metadata is read. */
  result = mysql_store_result(conn);
  if (!result) {
    g_critical("Could not list tables for %s: %s", database->name, mysql_error(conn));
    errors++;
    return;
  }

  MYSQL_FIELD *fields = mysql_fetch_fields(result);
  int ecol = -1, ccol = -1;
  for (i = 0; i < mysql_num_fields(result); i++) {
    if (!strcasecmp(fields[i].name, "Engine"))
      ecol = i;
    else if (!strcasecmp(fields[i].name, "Comment"))
      ccol = i;
  }

  /* Without both columns, row[ecol]/row[ccol] below would index row[-1]. */
  if (ecol < 0 || ccol < 0) {
    g_critical("Could not list tables for %s: Engine or Comment column missing",
               database->name);
    errors++;
    mysql_free_result(result);
    return;
  }

  while ((row = mysql_fetch_row(result))) {

    int dump = 1;
    int is_view = 0;

    /* We now do care about views!
            num_fields>1 kicks in only in case of 5.0 SHOW FULL TABLES or SHOW
       TABLE STATUS row[1] == NULL if it is a view in 5.0 'SHOW TABLE STATUS'
            row[1] == "VIEW" if it is a view in 5.0 'SHOW FULL TABLES'
    */
    if ((detected_server == SERVER_TYPE_MYSQL) &&
        (row[ccol] == NULL || !strcmp(row[ccol], "VIEW")))
      is_view = 1;

    /* Check for broken tables, i.e. mrg with missing source tbl */
    if (!is_view && row[ecol] == NULL) {
      g_warning("Broken table detected, please review: %s.%s", database->name,
                row[0]);
      dump = 0;
    }

    /* Skip ignored engines, handy for avoiding Merge, Federated or Blackhole
     * :-) dumps */
    if (dump && ignore && !is_view) {
      for (i = 0; ignore[i] != NULL; i++) {
        if (g_ascii_strcasecmp(ignore[i], row[ecol]) == 0) {
          dump = 0;
          break;
        }
      }
    }

    /* Skip views */
    if (is_view && no_dump_views)
      dump = 0;

    if (!dump)
      continue;

    /* In case of table-list option is enabled, check if table is part of the
     * list */
    if (tables) {
      int table_found = 0;
      for (i = 0; tables[i] != NULL; i++)
        if (g_ascii_strcasecmp(tables[i], row[0]) == 0)
          table_found = 1;

      if (!table_found)
        continue;
    }

    /* Special tables */
    if (g_ascii_strcasecmp(database->name, "mysql") == 0 &&
        (g_ascii_strcasecmp(row[0], "general_log") == 0 ||
         g_ascii_strcasecmp(row[0], "slow_log") == 0 ||
         g_ascii_strcasecmp(row[0], "innodb_index_stats") == 0 ||
         g_ascii_strcasecmp(row[0], "innodb_table_stats") == 0))
      continue;

    /* Checks skip list on 'database.table' string */
    if (tables_skiplist && check_skiplist(database->name, row[0]))
      continue;

    /* Checks PCRE expressions on 'database.table' string */
    if (regexstring && !check_regex(database->name, row[0]))
      continue;

    /* Check if the table was recently updated */
    if (no_updated_tables && !is_view) {
      /* Build the "database.table" key once per row and release it. */
      gchar *full_name = g_strdup_printf("%s.%s", database->name, row[0]);
      GList *iter;
      for (iter = no_updated_tables; iter != NULL; iter = iter->next) {
        if (g_ascii_strcasecmp(iter->data, full_name) == 0) {
          g_message("NO UPDATED TABLE: %s.%s", database->name, row[0]);
          dump = 0;
        }
      }
      g_free(full_name);
    }

    if (!dump)
      continue;

    green_light(conn, conf, is_view, database, &row, row[ecol]);
  }

  mysql_free_result(result);

  // Store Procedures and Events
  // As these are not attached to tables we need to define when we need to dump
  // or not. Having a regex filter makes this hard because we don't know if a
  // full schema is filtered or not, and this cannot be decided from the tables
  // of a schema being dumped either. So only the skip list / regex decide
  // whether SPs and EVENTS are dumped; a single match dumps all of them.

  int post_dump = 0;

  if (dump_routines) {
    // SP
    query = g_strdup_printf("SHOW PROCEDURE STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
    post_dump = has_filtered_schema_object(conn, database, query);
    g_free(query);
    if (post_dump < 0)
      return;

    if (!post_dump) {
      // FUNCTIONS
      query = g_strdup_printf("SHOW FUNCTION STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
      post_dump = has_filtered_schema_object(conn, database, query);
      g_free(query);
      if (post_dump < 0)
        return;
    }
  }

  if (dump_events && !post_dump) {
    // EVENTS
    query = g_strdup_printf("SHOW EVENTS FROM `%s`", database->name);
    post_dump = has_filtered_schema_object(conn, database, query);
    g_free(query);
    if (post_dump < 0)
      return;
  }

  if (post_dump) {
    /* NOTE(review): schema_post is updated without a lock here, unlike the
     * table lists in green_light(); confirm this function never runs on two
     * threads at once. */
    struct schema_post *sp = g_new(struct schema_post, 1);
    sp->database = database;
    schema_post = g_list_prepend(schema_post, sp);
  }
}
2854 
/*
 * Resolve each "database.table" entry of the global `tables` list with
 * SHOW TABLE STATUS and pass every matching row to green_light().
 *
 * The first time a database is seen, its CREATE DATABASE job is queued and a
 * token is pushed to conf->ready_database_dump. Entries without a "." are
 * skipped with a warning. A failing query aborts the whole scan (as before),
 * but every allocation made for the current entry is released first.
 */
void get_tables(MYSQL *conn, struct configuration *conf) {
  guint x;

  for (x = 0; tables != NULL && tables[x] != NULL; x++) {
    gchar **dt = g_strsplit(tables[x], ".", 0);

    if (dt[0] == NULL || dt[1] == NULL) {
      g_warning("Skipping '%s' from the table list: expected database.table",
                tables[x]);
      g_strfreev(dt);
      continue;
    }

    /* Backtick the schema name and escape quotes in the LIKE literal so that
     * unusual names cannot break the statement. */
    gchar *escaped_table = escape_string(conn, dt[1]);
    gchar *query = g_strdup_printf("SHOW TABLE STATUS FROM `%s` LIKE '%s'",
                                   dt[0], escaped_table);
    int failed = mysql_query(conn, query);
    g_free(escaped_table);
    g_free(query);

    if (failed) {
      g_critical("Error: DB: %s - Could not execute query: %s", dt[0],
                 mysql_error(conn));
      errors++;
      g_strfreev(dt);
      return;
    }

    /* The result must be checked before its metadata is read. */
    MYSQL_RES *result = mysql_store_result(conn);
    if (!result) {
      g_warning("Could not list table for %s.%s: %s", dt[0], dt[1],
                mysql_error(conn));
      errors++;
      g_strfreev(dt);
      return;
    }

    MYSQL_FIELD *fields = mysql_fetch_fields(result);
    int ecol = -1;
    int ccol = -1;
    guint i;
    for (i = 0; i < mysql_num_fields(result); i++) {
      if (!strcasecmp(fields[i].name, "Engine"))
        ecol = i;
      else if (!strcasecmp(fields[i].name, "Comment"))
        ccol = i;
    }

    /* A missing column would otherwise index row[-1] (or row[UINT_MAX]). */
    if (ecol < 0 || ccol < 0) {
      g_critical("Could not list table for %s.%s: Engine or Comment column missing",
                 dt[0], dt[1]);
      errors++;
      mysql_free_result(result);
      g_strfreev(dt);
      return;
    }

    struct database *database = NULL;
    if (get_database(conn, dt[0], &database)) {
      dump_create_database(database->name, conf);
      g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
    }

    MYSQL_ROW row;
    while ((row = mysql_fetch_row(result))) {
      int is_view = (detected_server == SERVER_TYPE_MYSQL) &&
                    (row[ccol] == NULL || !strcmp(row[ccol], "VIEW"));
      green_light(conn, conf, is_view, database, &row, row[ecol]);
    }

    mysql_free_result(result);
    g_strfreev(dt);
  }
}
2942 
/*
 * Replace the contents of `statement` with SQL that saves the current client
 * character set, result character set and connection collation in
 * @PREV_* session variables, then switches them to `character_set` /
 * `collation_connection`. restore_charset() emits the matching undo.
 */
void set_charset(GString *statement, char *character_set,
                 char *collation_connection) {
  g_string_assign(statement,
                  "SET @PREV_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT;\n"
                  "SET @PREV_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS;\n"
                  "SET @PREV_COLLATION_CONNECTION=@@COLLATION_CONNECTION;\n");
  g_string_append_printf(statement,
                         "SET character_set_client = %s;\n"
                         "SET character_set_results = %s;\n"
                         "SET collation_connection = %s;\n",
                         character_set, character_set, collation_connection);
}
2959 
/*
 * Append to `statement` the SQL that restores the character set and collation
 * session variables previously saved by set_charset().
 */
void restore_charset(GString *statement) {
  g_string_append(statement,
                  "SET character_set_client = @PREV_CHARACTER_SET_CLIENT;\n"
                  "SET character_set_results = @PREV_CHARACTER_SET_RESULTS;\n"
                  "SET collation_connection = @PREV_COLLATION_CONNECTION;\n");
}
2968 
dump_schema_post_data(MYSQL * conn,struct database * database,char * filename)2969 void dump_schema_post_data(MYSQL *conn, struct database *database, char *filename) {
2970   void *outfile;
2971   char *query = NULL;
2972   MYSQL_RES *result = NULL;
2973   MYSQL_RES *result2 = NULL;
2974   MYSQL_ROW row;
2975   MYSQL_ROW row2;
2976   gchar **splited_st = NULL;
2977 
2978   outfile = m_open(filename,"w");
2979 
2980   if (!outfile) {
2981     g_critical("Error: DB: %s Could not create output file %s (%d)", database->name,
2982                filename, errno);
2983     errors++;
2984     return;
2985   }
2986 
2987   GString *statement = g_string_sized_new(statement_size);
2988 
2989   if (dump_routines) {
2990     // get functions
2991     query = g_strdup_printf("SHOW FUNCTION STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
2992     if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
2993       if (success_on_1146 && mysql_errno(conn) == 1146) {
2994         g_warning("Error dumping functions from %s: %s", database->name,
2995                   mysql_error(conn));
2996       } else {
2997         g_critical("Error dumping functions from %s: %s", database->name,
2998                    mysql_error(conn));
2999         errors++;
3000       }
3001       g_free(query);
3002       return;
3003     }
3004 
3005     while ((row = mysql_fetch_row(result))) {
3006       set_charset(statement, row[8], row[9]);
3007       g_string_append_printf(statement, "DROP FUNCTION IF EXISTS `%s`;\n",
3008                              row[1]);
3009       if (!write_data((FILE *)outfile, statement)) {
3010         g_critical("Could not write stored procedure data for %s.%s", database->name,
3011                    row[1]);
3012         errors++;
3013         return;
3014       }
3015       g_string_set_size(statement, 0);
3016       query =
3017           g_strdup_printf("SHOW CREATE FUNCTION `%s`.`%s`", database->name, row[1]);
3018       mysql_query(conn, query);
3019       result2 = mysql_store_result(conn);
3020       row2 = mysql_fetch_row(result2);
3021       g_string_printf(statement, "%s", row2[2]);
3022       splited_st = g_strsplit(statement->str, ";\n", 0);
3023       g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3024       g_string_append(statement, ";\n");
3025       restore_charset(statement);
3026       if (!write_data((FILE *)outfile, statement)) {
3027         g_critical("Could not write function data for %s.%s", database->name, row[1]);
3028         errors++;
3029         return;
3030       }
3031       g_string_set_size(statement, 0);
3032     }
3033 
3034     // get sp
3035     query = g_strdup_printf("SHOW PROCEDURE STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
3036     if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
3037       if (success_on_1146 && mysql_errno(conn) == 1146) {
3038         g_warning("Error dumping stored procedures from %s: %s", database->name,
3039                   mysql_error(conn));
3040       } else {
3041         g_critical("Error dumping stored procedures from %s: %s", database->name,
3042                    mysql_error(conn));
3043         errors++;
3044       }
3045       g_free(query);
3046       return;
3047     }
3048 
3049     while ((row = mysql_fetch_row(result))) {
3050       set_charset(statement, row[8], row[9]);
3051       g_string_append_printf(statement, "DROP PROCEDURE IF EXISTS `%s`;\n",
3052                              row[1]);
3053       if (!write_data((FILE *)outfile, statement)) {
3054         g_critical("Could not write stored procedure data for %s.%s", database->name,
3055                    row[1]);
3056         errors++;
3057         return;
3058       }
3059       g_string_set_size(statement, 0);
3060       query =
3061           g_strdup_printf("SHOW CREATE PROCEDURE `%s`.`%s`", database->name, row[1]);
3062       mysql_query(conn, query);
3063       result2 = mysql_store_result(conn);
3064       row2 = mysql_fetch_row(result2);
3065       g_string_printf(statement, "%s", row2[2]);
3066       splited_st = g_strsplit(statement->str, ";\n", 0);
3067       g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3068       g_string_append(statement, ";\n");
3069       restore_charset(statement);
3070       if (!write_data((FILE *)outfile, statement)) {
3071         g_critical("Could not write stored procedure data for %s.%s", database->name,
3072                    row[1]);
3073         errors++;
3074         return;
3075       }
3076       g_string_set_size(statement, 0);
3077     }
3078   }
3079 
3080   // get events
3081   if (dump_events) {
3082     query = g_strdup_printf("SHOW EVENTS FROM `%s`", database->name);
3083     if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
3084       if (success_on_1146 && mysql_errno(conn) == 1146) {
3085         g_warning("Error dumping events from %s: %s", database->name,
3086                   mysql_error(conn));
3087       } else {
3088         g_critical("Error dumping events from %s: %s", database->name,
3089                    mysql_error(conn));
3090         errors++;
3091       }
3092       g_free(query);
3093       return;
3094     }
3095 
3096     while ((row = mysql_fetch_row(result))) {
3097       set_charset(statement, row[12], row[13]);
3098       g_string_append_printf(statement, "DROP EVENT IF EXISTS `%s`;\n", row[1]);
3099       if (!write_data((FILE *)outfile, statement)) {
3100         g_critical("Could not write stored procedure data for %s.%s", database->name,
3101                    row[1]);
3102         errors++;
3103         return;
3104       }
3105       query = g_strdup_printf("SHOW CREATE EVENT `%s`.`%s`", database->name, row[1]);
3106       mysql_query(conn, query);
3107       result2 = mysql_store_result(conn);
3108       // DROP EVENT IF EXISTS event_name
3109       row2 = mysql_fetch_row(result2);
3110       g_string_printf(statement, "%s", row2[3]);
3111       splited_st = g_strsplit(statement->str, ";\n", 0);
3112       g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3113       g_string_append(statement, ";\n");
3114       restore_charset(statement);
3115       if (!write_data((FILE *)outfile, statement)) {
3116         g_critical("Could not write event data for %s.%s", database->name, row[1]);
3117         errors++;
3118         return;
3119       }
3120       g_string_set_size(statement, 0);
3121     }
3122   }
3123 
3124   g_free(query);
3125   m_close(outfile);
3126   if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
3127   g_string_free(statement, TRUE);
3128   g_strfreev(splited_st);
3129   if (result)
3130     mysql_free_result(result);
3131   if (result2)
3132     mysql_free_result(result2);
3133 
3134   return;
3135 }
/*
 * Write the triggers of `database`.`table` to `filename`.
 * Each trigger is wrapped in set_charset()/restore_charset(), and its
 * definition has ";\n" rewritten as "; \n".
 * The output file is always closed; the filename is queued for streaming only
 * when the whole trigger list was processed.
 */
void dump_triggers_data(MYSQL *conn, char *database, char *table,
                        char *filename) {
  void *outfile;
  char *query = NULL;
  char *escaped_table;
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  GString *statement;
  gboolean completed = FALSE;

  outfile = m_open(filename, "w");
  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  statement = g_string_sized_new(statement_size);

  // get triggers; quotes in the table name must not break the LIKE literal
  escaped_table = escape_string(conn, table);
  query = g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'", database, escaped_table);
  g_free(escaped_table);
  if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping triggers (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping triggers (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  while ((row = mysql_fetch_row(result))) {
    MYSQL_RES *result2 = NULL;
    MYSQL_ROW row2 = NULL;

    /* LIKE treats '_' and '%' in the table name as wildcards; keep only the
     * triggers whose Table column (index 2) is exactly this table. */
    if (row[2] != NULL && strcmp(row[2], table) != 0)
      continue;

    set_charset(statement, row[8], row[9]);
    if (!write_data((FILE *)outfile, statement)) {
      g_critical("Could not write triggers data for %s.%s", database, table);
      errors++;
      goto cleanup;
    }
    g_string_set_size(statement, 0);

    g_free(query);
    query = g_strdup_printf("SHOW CREATE TRIGGER `%s`.`%s`", database, row[0]);
    if (!mysql_query(conn, query) && (result2 = mysql_store_result(conn)))
      row2 = mysql_fetch_row(result2);

    if (row2 != NULL && row2[2] != NULL) {
      gchar **parts = g_strsplit(row2[2], ";\n", 0);
      gchar *joined = g_strjoinv("; \n", parts);
      g_string_append(statement, joined);
      g_string_append(statement, ";\n");
      g_free(joined);
      g_strfreev(parts);
    } else {
      g_critical("Could not read definition of trigger %s.%s: %s", database,
                 row[0], mysql_error(conn));
      errors++;
    }
    if (result2)
      mysql_free_result(result2);

    /* Always restore the session charset, even if the definition was missing. */
    restore_charset(statement);
    if (!write_data((FILE *)outfile, statement)) {
      g_critical("Could not write triggers data for %s.%s", database, table);
      errors++;
      goto cleanup;
    }
    g_string_set_size(statement, 0);
  }
  completed = TRUE;

cleanup:
  g_free(query);
  if (result)
    mysql_free_result(result);
  m_close(outfile);
  if (completed && stream)
    g_async_queue_push(stream_queue, g_strdup(filename));
  g_string_free(statement, TRUE);
}
/*
 * Write the CREATE TABLE statement of `database`.`table` to `filename`,
 * preceded by a server-specific header (SET NAMES / FOREIGN_KEY_CHECKS /
 * TIME_ZONE).
 * The output file is always closed, and statement/query/result are always
 * released. The filename is queued for streaming only when the definition
 * was fetched (matching the original success path).
 */
void dump_schema_data(MYSQL *conn, char *database, char *table,
                      char *filename) {
  void *outfile;
  char *query = NULL;
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  GString *statement;
  gboolean completed = FALSE;

  outfile = m_open(filename, "w");
  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  statement = g_string_sized_new(statement_size);

  if (detected_server == SERVER_TYPE_MYSQL) {
    g_string_printf(statement, "%s;\n", set_names_str);
    g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n\n");
    if (!skip_tz) {
      g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
    }
  } else if (detected_server == SERVER_TYPE_TIDB) {
    if (!skip_tz) {
      g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
    }
  } else {
    g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;\n");
  }

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  query = g_strdup_printf("SHOW CREATE TABLE `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  g_string_set_size(statement, 0);

  /* There should never be more than one row; there may be none on error. */
  row = mysql_fetch_row(result);
  if (row == NULL || row[1] == NULL) {
    g_critical("Could not read schema for %s.%s: %s", database, table,
               mysql_error(conn));
    errors++;
    goto cleanup;
  }

  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema for %s.%s", database, table);
    errors++;
  }
  completed = TRUE;

cleanup:
  g_free(query);
  /* Freeing a mysql_use_result() result also drains any unread rows. */
  if (result)
    mysql_free_result(result);
  m_close(outfile);
  if (completed && stream)
    g_async_queue_push(stream_queue, g_strdup(filename));
  g_string_free(statement, TRUE);
}
3280 
/*
 * Dump one view into two files:
 *  - `filename`:  a placeholder "CREATE TABLE IF NOT EXISTS" with one `int`
 *                 column per view column (workaround so that views depending
 *                 on this one can be created before its real definition);
 *  - `filename2`: "DROP TABLE/DROP VIEW IF EXISTS" followed by the output of
 *                 SHOW CREATE VIEW, wrapped in set_charset()/restore_charset().
 * On MySQL servers both files start with set_names_str.
 *
 * Failures are logged and counted in `errors`; error 1146 only warns when
 * success_on_1146 is set. Both output files, the query string, the result
 * set and the statement buffer are released on every path. The files are
 * queued for streaming only when both queries were processed.
 */
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
                    char *filename2) {
  void *outfile = NULL, *outfile2 = NULL;
  char *query = NULL;
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  gboolean completed = FALSE;
  GString *statement = g_string_sized_new(statement_size);

  mysql_select_db(conn, database);

  /* Open sequentially so errno describes the open that actually failed. */
  outfile = m_open(filename, "w");
  if (outfile)
    outfile2 = m_open(filename2, "w");

  if (!outfile || !outfile2) {
    g_critical("Error: DB: %s Could not create output file (%d)", database,
               errno);
    errors++;
    goto cleanup;
  }

  if (detected_server == SERVER_TYPE_MYSQL) {
    g_string_printf(statement, "%s;\n", set_names_str);
  }

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  /* The schema-view file gets the same header followed by the drops. */
  g_string_append_printf(statement, "DROP TABLE IF EXISTS `%s`;\n", table);
  g_string_append_printf(statement, "DROP VIEW IF EXISTS `%s`;\n", table);

  if (!write_data((FILE *)outfile2, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  // we create tables as workaround
  // for view dependencies
  query = g_strdup_printf("SHOW FIELDS FROM `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }
  g_free(query);
  query = NULL;

  row = mysql_fetch_row(result);
  if (!row) {
    /* Without a single column the placeholder CREATE TABLE would be invalid. */
    g_critical("Error dumping schemas (%s.%s): no rows returned (%s)", database,
               table, mysql_error(conn));
    errors++;
    goto cleanup;
  }
  g_string_set_size(statement, 0);
  g_string_append_printf(statement, "CREATE TABLE IF NOT EXISTS `%s`(\n", table);
  g_string_append_printf(statement, "`%s` int", row[0]);
  while ((row = mysql_fetch_row(result))) {
    g_string_append(statement, ",\n");
    g_string_append_printf(statement, "`%s` int", row[0]);
  }
  g_string_append(statement, "\n);\n");

  mysql_free_result(result);
  result = NULL; /* never leave a freed result reachable from cleanup */

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write view schema for %s.%s", database, table);
    errors++;
  }

  // real view
  query = g_strdup_printf("SHOW CREATE VIEW `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  /* There should never be more than one row */
  row = mysql_fetch_row(result);
  if (!row) {
    g_critical("Error dumping schemas (%s.%s): no rows returned (%s)", database,
               table, mysql_error(conn));
    errors++;
    goto cleanup;
  }
  g_string_set_size(statement, 0);
  set_charset(statement, row[2], row[3]);
  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  restore_charset(statement);
  if (!write_data((FILE *)outfile2, statement)) {
    g_critical("Could not write schema for %s.%s", database, table);
    errors++;
  }
  completed = TRUE;

cleanup:
  g_free(query);
  if (result)
    mysql_free_result(result);
  if (outfile) {
    m_close(outfile);
    if (completed && stream)
      g_async_queue_push(stream_queue, g_strdup(filename));
  }
  if (outfile2) {
    m_close(outfile2);
    if (completed && stream)
      g_async_queue_push(stream_queue, g_strdup(filename2));
  }
  g_string_free(statement, TRUE);
}
3391 
/*
 * Create tj->filename and fill it with the rows selected by `tj`.
 * The opened handle is passed to dump_table_data(), which closes it.
 * A result with zero rows is only reported, not counted as an error.
 */
void dump_table_data_file(MYSQL *conn, struct table_job *tj) {
  void *outfile = m_open(tj->filename, "w");

  if (outfile == NULL) {
    g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)",
               tj->database, tj->table, tj->filename, errno);
    errors++;
    return;
  }

  if (dump_table_data(conn, (FILE *)outfile, tj) == 0)
    g_message("Empty table %s.%s", tj->database, tj->table);
}
3410 
/*
 * Compute the checksum of database.table via checksum_table() and write it
 * to `filename`.
 *
 * A non-zero error from checksum_table() is counted in `errors`, unless it is
 * 1146 and success_on_1146 is set. The output file and the checksum string
 * are released on every path. The file is queued for streaming only when the
 * checksum step did not fail.
 */
void dump_table_checksum(MYSQL *conn, char *database, char *table, char *filename) {
  FILE *outfile = g_fopen(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)",
               database, table, filename, errno);
    errors++;
    return;
  }

  int errn = 0;
  gchar *checksum = checksum_table(conn, database, table, &errn);

  if (errn != 0 && !(success_on_1146 && errn == 1146)) {
    errors++;
    /* Previously the handle and the checksum leaked on this path. */
    fclose(outfile);
    g_free(checksum);
    return;
  }

  /* On a tolerated 1146 the checksum may be NULL; passing NULL to %s is UB. */
  if (checksum)
    fprintf(outfile, "%s", checksum);
  fclose(outfile);

  if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
  g_free(checksum);
}
3437 
dump_checksum(struct db_table * dbt,struct configuration * conf)3438 void dump_checksum(struct db_table * dbt,
3439                  struct configuration *conf) {
3440   struct job *j = g_new0(struct job, 1);
3441   struct table_checksum_job *tcj = g_new0(struct table_checksum_job, 1);
3442   j->job_data = (void *)tcj;
3443   tcj->database = dbt->database->name;
3444   tcj->table = g_strdup(dbt->table);
3445   j->conf = conf;
3446   j->type = JOB_CHECKSUM;
3447   tcj->filename = build_meta_filename(dbt->database->filename, dbt->table_filename,"checksum");
3448   g_async_queue_push(conf->queue, j);
3449   return;
3450 }
/*
 * Queue a JOB_SCHEMA job for `dbt`, targeting its "schema" file.
 *
 * When dump_triggers is set, also run SHOW TRIGGERS for the table. If any
 * row comes back, queue a JOB_TRIGGERS job targeting "schema-triggers". A
 * failed trigger lookup is logged and counted in `errors`; the schema job has
 * already been queued at that point.
 */
void dump_schema(MYSQL *conn, struct db_table *dbt,
                 struct configuration *conf) {
  struct schema_job *sj = g_new0(struct schema_job, 1);
  sj->database = dbt->database->name;
  sj->table = g_strdup(dbt->table);
  sj->filename = build_schema_table_filename(dbt->database->filename,
                                             dbt->table_filename, "schema");

  struct job *j = g_new0(struct job, 1);
  j->type = JOB_SCHEMA;
  j->conf = conf;
  j->job_data = sj;
  g_async_queue_push(conf->queue, j);

  if (!dump_triggers)
    return;

  MYSQL_RES *res = NULL;
  char *sql = g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'",
                              dbt->database->name, dbt->escaped_table);

  if (mysql_query(conn, sql) || !(res = mysql_store_result(conn))) {
    g_critical("Error Checking triggers for %s.%s. Err: %s St: %s",
               dbt->database->name, dbt->table, mysql_error(conn), sql);
    errors++;
  } else if (mysql_num_rows(res) > 0) {
    struct schema_job *trig = g_new0(struct schema_job, 1);
    trig->database = dbt->database->name;
    trig->table = g_strdup(dbt->table);
    trig->filename = build_schema_table_filename(
        dbt->database->filename, dbt->table_filename, "schema-triggers");

    struct job *tj = g_new0(struct job, 1);
    tj->type = JOB_TRIGGERS;
    tj->conf = conf;
    tj->job_data = trig;
    g_async_queue_push(conf->queue, tj);
  }

  g_free(sql);
  if (res != NULL)
    mysql_free_result(res);
}
3493 
dump_view(struct db_table * dbt,struct configuration * conf)3494 void dump_view(struct db_table *dbt, struct configuration *conf) {
3495   struct job *j = g_new0(struct job, 1);
3496   struct view_job *vj = g_new0(struct view_job, 1);
3497   j->job_data = (void *)vj;
3498   vj->database = dbt->database->name;
3499   vj->table = g_strdup(dbt->table);
3500   j->conf = conf;
3501   j->type = JOB_VIEW;
3502   vj->filename  = build_schema_table_filename(dbt->database->filename, dbt->table_filename, "schema");
3503   vj->filename2 = build_schema_table_filename(dbt->database->filename, dbt->table_filename, "schema-view");
3504   g_async_queue_push(conf->queue, j);
3505   return;
3506 }
3507 
dump_schema_post(struct database * database,struct configuration * conf)3508 void dump_schema_post(struct database *database, struct configuration *conf) {
3509   struct job *j = g_new0(struct job, 1);
3510   struct schema_post_job *sp = g_new0(struct schema_post_job, 1);
3511   j->job_data = (void *)sp;
3512   sp->database = database;
3513   j->conf = conf;
3514   j->type = JOB_SCHEMA_POST;
3515   sp->filename = build_schema_filename(sp->database->filename,"schema-post");
3516   g_async_queue_push(conf->queue, j);
3517   return;
3518 }
3519 
/*
 * Allocate a zero-initialised table_job for chunk `nchunk` of `dbt`.
 *
 * - `where` is stored as given (not copied).
 * - The table name is duplicated; the database name is borrowed from
 *   dbt->database.
 * - The data filename is derived from the chunk number.
 * - order_by is left NULL for the caller to set.
 */
struct table_job *new_table_job(struct db_table *dbt, char *where, guint nchunk, gboolean has_generated_fields){
  struct table_job *tj = g_new0(struct table_job, 1);

  tj->dbt = dbt;
  tj->nchunk = nchunk;
  tj->where = where;
  tj->has_generated_fields = has_generated_fields;

  /* TODO(refactor): dbt->database is never freed, so borrowing its name is
   * safe; review whether copying the table name is really needed. */
  tj->database = dbt->database->name;
  tj->table = g_strdup(dbt->table);

  tj->filename = build_data_filename(dbt->database->filename,
                                     dbt->table_filename, tj->nchunk);
  return tj;
}
3534 
/*
 * Queue the data-dump job(s) for one table on conf->queue.
 *
 * When rows_per_file is set and get_chunks_for_table() returns chunks, one
 * job is queued per chunk WHERE clause. Otherwise a single job covers the
 * whole table. Each job gets its own copy of the primary-key ORDER BY string
 * (when order_by_primary_key is set). For non-InnoDB tables,
 * non_innodb_table_counter is bumped once per chunk after the first.
 */
void dump_table(MYSQL *conn, struct db_table *dbt,
                struct configuration *conf, gboolean is_innodb) {
  GList *chunks =
      rows_per_file
          ? get_chunks_for_table(conn, dbt->database->name, dbt->table, conf)
          : NULL;
  gboolean has_generated_fields = detect_generated_fields(conn, dbt);
  char *order_by =
      order_by_primary_key
          ? get_primary_key_string(conn, dbt->database->name, dbt->table)
          : NULL;

  if (chunks == NULL) {
    struct job *j = g_new0(struct job, 1);
    j->conf = conf;
    j->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
    struct table_job *tj = new_table_job(dbt, NULL, 0, has_generated_fields);
    tj->order_by = g_strdup(order_by); /* g_strdup(NULL) yields NULL */
    j->job_data = tj;
    g_async_queue_push(conf->queue, j);
  } else {
    int chunk_no = 0;
    GList *node;
    for (node = chunks; node != NULL; node = node->next, chunk_no++) {
      struct job *j = g_new0(struct job, 1);
      j->conf = conf;
      j->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
      struct table_job *tj = new_table_job(dbt, (char *)node->data, chunk_no,
                                           has_generated_fields);
      tj->order_by = g_strdup(order_by);
      j->job_data = tj;
      if (!is_innodb && chunk_no != 0)
        g_atomic_int_inc(&non_innodb_table_counter);
      g_async_queue_push(conf->queue, j);
    }
    /* The WHERE strings now belong to the jobs; only the list cells go. */
    g_list_free(chunks);
  }

  g_free(order_by);
}
3586 
/*
 * Bundle the data-dump jobs of every table in noninnodb_tables_list into a
 * single JOB_LOCK_DUMP_NON_INNODB job, pushed on conf->queue_less_locking.
 *
 * Each table contributes one table_job per chunk WHERE clause (when
 * rows_per_file is set and chunks are returned) or one table_job for the
 * whole table. When order_by_primary_key is set, every table_job gets its own
 * copy of the primary-key ORDER BY string. The resulting job list keeps the
 * input order.
 */
void dump_tables(MYSQL *conn, GList *noninnodb_tables_list,
                 struct configuration *conf) {
  struct job *j = g_new0(struct job, 1);
  struct tables_job *tjs = g_new0(struct tables_job, 1);
  j->conf = conf;
  j->type = JOB_LOCK_DUMP_NON_INNODB;
  j->job_data = (void *)tjs;

  GList *iter;
  for (iter = noninnodb_tables_list; iter != NULL; iter = iter->next) {
    struct db_table *dbt = (struct db_table *)iter->data;
    GList *chunks = NULL; /* scoped per table */

    if (rows_per_file)
      chunks = get_chunks_for_table(conn, dbt->database->name, dbt->table, conf);
    gboolean has_generated_fields = detect_generated_fields(conn, dbt);

    if (chunks) {
      int nchunk = 0;
      GList *citer;
      gchar *order_by = NULL;
      if (order_by_primary_key)
        order_by = get_primary_key_string(conn, dbt->database->name, dbt->table);
      for (citer = chunks; citer != NULL; citer = citer->next) {
        /* The chunk's WHERE clause is citer->data. Passing iter->data here
         * used the struct db_table pointer as the WHERE string. */
        struct table_job *tj = new_table_job(dbt, (char *)citer->data, nchunk,
                                             has_generated_fields);
        tj->order_by = g_strdup(order_by); /* NULL stays NULL */
        tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
        nchunk++;
      }
      g_free(order_by);
      g_list_free(chunks); /* WHERE strings are now owned by the table_jobs */
    } else {
      struct table_job *tj = new_table_job(dbt, NULL, 0, has_generated_fields);
      tj->order_by =
          order_by_primary_key
              ? get_primary_key_string(conn, dbt->database->name, dbt->table)
              : NULL;
      tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
    }
  }
  /* Prepend + reverse keeps insertion O(1) while preserving input order. */
  tjs->table_job_list = g_list_reverse(tjs->table_job_list);
  g_async_queue_push(conf->queue_less_locking, j);
}
/*
 * Append the names of the `num_fields` result columns to `statement` as a
 * comma-separated list of backtick-quoted identifiers, e.g. `a`,`b`,`c`.
 * Nothing is appended when num_fields is 0.
 */
void append_columns (GString *statement, MYSQL_FIELD *fields, guint num_fields){
  const char *sep = "";
  for (guint col = 0; col < num_fields; col++) {
    g_string_append(statement, sep);
    g_string_append_printf(statement, "`%s`", fields[col].name);
    sep = ",";
  }
}
3648 
/*
 * Overwrite `statement` with the head of an INSERT statement for `table`:
 *   condition == TRUE : INSERT [IGNORE] INTO `table` (<columns>) VALUES
 *   condition == FALSE: INSERT [IGNORE] INTO `table` VALUES
 * IGNORE is included when the insert_ignore global is set.
 */
void append_insert (gboolean condition, GString *statement, char *table, MYSQL_FIELD *fields, guint num_fields){
  if (!condition) {
    g_string_printf(statement,
                    insert_ignore ? "INSERT IGNORE INTO `%s` VALUES"
                                  : "INSERT INTO `%s` VALUES",
                    table);
    return;
  }

  g_string_printf(statement,
                  insert_ignore ? "INSERT IGNORE INTO `%s` ("
                                : "INSERT INTO `%s` (",
                  table);
  append_columns(statement, fields, num_fields);
  g_string_append(statement, ") VALUES");
}
3666 
3667 /* Do actual data chunk reading/writing magic */
dump_table_data(MYSQL * conn,FILE * file,struct table_job * tj)3668 guint64 dump_table_data(MYSQL *conn, FILE *file, struct table_job * tj){
3669   // There are 2 possible options to chunk the files:
3670   // - no chunk: this means that will be just 1 data file
3671   // - chunk_filesize: this function will be spliting the per filesize, this means that multiple files will be created
3672   // Split by row is before this step
3673   // It could write multiple INSERT statments in a data file if statement_size is reached
3674   guint i;
3675   guint fn = 0;
3676   guint st_in_file = 0;
3677   guint num_fields = 0;
3678   guint64 num_rows = 0;
3679   guint64 num_rows_st = 0;
3680   MYSQL_RES *result = NULL;
3681   char *query = NULL;
3682   gchar *fcfile = NULL;
3683   gchar *load_data_fn=NULL;
3684 //  gchar *filename_prefix = NULL;
3685   struct db_table * dbt = tj->dbt;
3686   /* Buffer for escaping field values */
3687   GString *escaped = g_string_sized_new(3000);
3688   FILE *main_file=file;
3689   if (chunk_filesize) {
3690     fcfile = build_data_filename(dbt->database->filename, dbt->table_filename, fn);
3691   }else{
3692     fcfile = g_strdup(tj->filename);
3693   }
3694 
3695   gboolean has_generated_fields = tj->has_generated_fields;
3696 
3697   /* Ghm, not sure if this should be statement_size - but default isn't too big
3698    * for now */
3699   GString *statement = g_string_sized_new(statement_size);
3700   GString *statement_row = g_string_sized_new(0);
3701 
3702   GString *select_fields;
3703 
3704   if (has_generated_fields) {
3705     select_fields = get_insertable_fields(conn, tj->database, tj->table);
3706   } else {
3707     select_fields = g_string_new("*");
3708   }
3709 
3710   /* Poor man's database code */
3711   query = g_strdup_printf(
3712       "SELECT %s %s FROM `%s`.`%s` %s %s %s %s %s %s",
3713       (detected_server == SERVER_TYPE_MYSQL) ? "/*!40001 SQL_NO_CACHE */" : "",
3714       select_fields->str, tj->database, tj->table, (tj->where || where_option ) ? "WHERE" : "",
3715       tj->where ? tj->where : "",  (tj->where && where_option ) ? "AND" : "", where_option ? where_option : "", tj->order_by ? "ORDER BY" : "",
3716       tj->order_by ? tj->order_by : "");
3717   g_string_free(select_fields, TRUE);
3718   if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
3719     // ERROR 1146
3720     if (success_on_1146 && mysql_errno(conn) == 1146) {
3721       g_warning("Error dumping table (%s.%s) data: %s ", tj->database, tj->table,
3722                 mysql_error(conn));
3723     } else {
3724       g_critical("Error dumping table (%s.%s) data: %s ", tj->database, tj->table,
3725                  mysql_error(conn));
3726       errors++;
3727     }
3728     goto cleanup;
3729   }
3730 
3731   num_fields = mysql_num_fields(result);
3732   MYSQL_FIELD *fields = mysql_fetch_fields(result);
3733 
3734   MYSQL_ROW row;
3735 
3736   g_string_set_size(statement, 0);
3737 
3738   // TODO #364: this is the place where we need to link the column between file loaded and dbt.
3739   // Currently, we are using identity_function, which return the same data.
3740     for(i=0; i< num_fields;i++){
3741       if (i>0){
3742         dbt->anonymized_function=g_list_append(dbt->anonymized_function,&identity_function);
3743       }else{
3744         dbt->anonymized_function=g_list_append(dbt->anonymized_function,&identity_function);
3745       }
3746     }
3747 
3748   gboolean first_time=TRUE;
3749   /* Poor man's data dump code */
3750   while ((row = mysql_fetch_row(result))) {
3751     gulong *lengths = mysql_fetch_lengths(result);
3752     num_rows++;
3753 
3754     if (!statement->len) {
3755 
3756       // A file can be chunked by amount of rows or file size.
3757       //
3758       if (!st_in_file) {
3759         // File Header
3760         if (detected_server == SERVER_TYPE_MYSQL) {
3761           g_string_printf(statement,"%s;\n",set_names_str);
3762           g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n");
3763           if (!skip_tz) {
3764             g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
3765           }
3766         } else if (detected_server == SERVER_TYPE_TIDB) {
3767           if (!skip_tz) {
3768             g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
3769           }
3770         } else {
3771           g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;\n");
3772         }
3773 
3774         if (!write_data(file, statement)) {
3775           g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3776           goto cleanup;
3777         }
3778       }
3779       if ( load_data ){
3780         if (first_time){
3781           load_data_fn=build_filename(dbt->database->filename, dbt->table_filename, fn, "dat");
3782 //	      load_data_fn=g_strdup_printf("%s%05d.dat%s", filename_prefix, fn,
3783 //			      (compress_output ? ".gz" : ""));
3784 	      g_string_printf(statement, "LOAD DATA LOCAL INFILE '%s' REPLACE INTO TABLE `%s` ",load_data_fn,tj->table);
3785 	      if (fields_terminated_by_ld)
3786 	      	g_string_append_printf(statement, "FIELDS TERMINATED BY '%s' ",fields_terminated_by_ld);
3787 	      if (fields_enclosed_by_ld)
3788 	        g_string_append_printf(statement, "ENCLOSED BY '%s' ",fields_enclosed_by_ld);
3789 	      if (fields_escaped_by)
3790           g_string_append_printf(statement, "ESCAPED BY '%s' ",fields_escaped_by);
3791 	      g_string_append(statement, "LINES ");
3792 	      if (lines_starting_by_ld)
3793 	        g_string_append_printf(statement, "STARTING BY '%s' ",lines_starting_by_ld);
3794 	      g_string_append_printf(statement, "TERMINATED BY '%s' (", lines_terminated_by_ld);
3795 
3796         append_columns(statement,fields,num_fields);
3797         g_string_append(statement,");\n");
3798 	      if (!write_data(main_file, statement)) {
3799 		      g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3800 		      goto cleanup;
3801 	      }else{
3802           g_string_set_size(statement, 0);
3803           g_free(fcfile);
3804           fcfile=load_data_fn;
3805 
3806           if (!compress_output) {
3807             fclose((FILE *)file);
3808             file = g_fopen(fcfile, "a");
3809           } else {
3810             gzclose((gzFile)file);
3811             file = (void *)gzopen(fcfile, "a");
3812           }
3813 	      }
3814         first_time=FALSE;
3815         }
3816       }else{
3817         append_insert ((complete_insert || has_generated_fields), statement, tj->table, fields, num_fields);
3818       }
3819       num_rows_st = 0;
3820     }
3821 
3822     if (statement_row->len) {
3823       g_string_append(statement, statement_row->str);
3824       g_string_set_size(statement_row, 0);
3825       num_rows_st++;
3826     }
3827 
3828     g_string_append(statement_row, lines_starting_by);
3829     GList *f = dbt->anonymized_function;
3830     for (i = 0; i < num_fields; i++) {
3831       gchar * (*fun_ptr)(gchar **) = f->data;
3832       f=f->next;
3833       if (load_data){
3834         if (!row[i]) {
3835 //          g_string_append(statement_row,fields_enclosed_by);
3836           g_string_append(statement_row, "NULL");
3837 //          g_string_append(statement_row,fields_enclosed_by);
3838         }else if (fields[i].type != MYSQL_TYPE_LONG && fields[i].type != MYSQL_TYPE_LONGLONG  && fields[i].type != MYSQL_TYPE_INT24  && fields[i].type != MYSQL_TYPE_SHORT ){
3839           g_string_append(statement_row,fields_enclosed_by);
3840           g_string_append(statement_row,fun_ptr(&(row[i])));
3841           g_string_append(statement_row,fields_enclosed_by);
3842         }else
3843           g_string_append(statement_row,fun_ptr(&(row[i])));
3844       }else{
3845         /* Don't escape safe formats, saves some time */
3846         if (!row[i]) {
3847           g_string_append(statement_row, "NULL");
3848         } else if (fields[i].flags & NUM_FLAG) {
3849           g_string_append(statement_row, fun_ptr(&(row[i])));
3850         } else {
3851           /* We reuse buffers for string escaping, growing is expensive just at
3852            * the beginning */
3853           g_string_set_size(escaped, lengths[i] * 2 + 1);
3854           mysql_real_escape_string(conn, escaped->str, fun_ptr(&(row[i])), lengths[i]);
3855           if (fields[i].type == MYSQL_TYPE_JSON)
3856             g_string_append(statement_row, "CONVERT(");
3857           g_string_append_c(statement_row, '\"');
3858           g_string_append(statement_row, escaped->str);
3859           g_string_append_c(statement_row, '\"');
3860           if (fields[i].type == MYSQL_TYPE_JSON)
3861             g_string_append(statement_row, " USING UTF8MB4)");
3862         }
3863       }
3864       if (i < num_fields - 1) {
3865         g_string_append(statement_row, fields_terminated_by);
3866       } else {
3867         g_string_append_printf(statement_row,"%s", lines_terminated_by);
3868 
3869         /* INSERT statement is closed before over limit */
3870         if (statement->len + statement_row->len + 1 > statement_size) {
3871           if (num_rows_st == 0) {
3872             g_string_append(statement, statement_row->str);
3873             g_string_set_size(statement_row, 0);
3874             g_warning("Row bigger than statement_size for %s.%s", tj->database,
3875                       tj->table);
3876           }
3877           g_string_append(statement, statement_terminated_by);
3878 
3879           if (!write_data(file, statement)) {
3880             g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3881             goto cleanup;
3882           } else {
3883             st_in_file++;
3884             if (chunk_filesize &&
3885                 st_in_file * (guint)ceil((float)statement_size / 1024 / 1024) >
3886                     chunk_filesize) {
3887               fn++;
3888               g_free(fcfile);
3889               m_close(file);
3890               fcfile = build_data_filename(dbt->database->filename, dbt->table_filename, fn);
3891               if (stream) g_async_queue_push(stream_queue, g_strdup(fcfile));
3892               file = m_open(fcfile,"w");
3893 
3894               st_in_file = 0;
3895             }
3896           }
3897           g_string_set_size(statement, 0);
3898         } else {
3899           if (num_rows_st && ! load_data)
3900             g_string_append_c(statement, ',');
3901           g_string_append(statement, statement_row->str);
3902           num_rows_st++;
3903           g_string_set_size(statement_row, 0);
3904         }
3905       }
3906     }
3907   }
3908   if (mysql_errno(conn)) {
3909     g_critical("Could not read data from %s.%s: %s", tj->database, tj->table,
3910                mysql_error(conn));
3911     errors++;
3912   }
3913 
3914   if (statement_row->len > 0) {
3915     /* this last row has not been written out */
3916     if (statement->len > 0) {
3917       /* strange, should not happen */
3918       g_string_append(statement, statement_row->str);
3919     } else {
3920       append_insert (complete_insert, statement, tj->table, fields, num_fields);
3921       g_string_append(statement, statement_row->str);
3922     }
3923   }
3924 
3925   if (statement->len > 0) {
3926     g_string_append(statement, statement_terminated_by);
3927     if (!write_data(file, statement)) {
3928       g_critical(
3929           "Could not write out closing newline for %s.%s, now this is sad!",
3930           tj->database, tj->table);
3931       goto cleanup;
3932     }
3933     st_in_file++;
3934   }
3935 
3936 cleanup:
3937   g_free(query);
3938 
3939   g_string_free(escaped, TRUE);
3940   g_string_free(statement, TRUE);
3941   g_string_free(statement_row, TRUE);
3942 
3943   if (result) {
3944     mysql_free_result(result);
3945   }
3946 
3947   if (file) {
3948     m_close(file);
3949   }
3950 
3951   if (!st_in_file && !build_empty_files) {
3952     // dropping the useless file
3953     if (remove(fcfile)) {
3954       g_warning("Failed to remove empty file : %s\n", fcfile);
3955     }
3956   } else if (chunk_filesize && fn == 0) {
3957     if (stream) g_async_queue_push(stream_queue, g_strdup(fcfile));
3958   }else{
3959     if (stream) g_async_queue_push(stream_queue, g_strdup(tj->filename));
3960   }
3961 
3962   g_mutex_lock(dbt->rows_lock);
3963   dbt->rows+=num_rows;
3964   g_mutex_unlock(dbt->rows_lock);
3965 
3966   g_free(fcfile);
3967 
3968   return num_rows;
3969 }
3970 
3971 
write_data(FILE * file,GString * data)3972 gboolean write_data(FILE *file, GString *data) {
3973   size_t written = 0;
3974   ssize_t r = 0;
3975   while (written < data->len) {
3976     r=m_write(file, data->str + written, data->len);
3977     if (r < 0) {
3978       g_critical("Couldn't write data to a file: %s", strerror(errno));
3979       errors++;
3980       return FALSE;
3981     }
3982     written += r;
3983   }
3984 
3985   return TRUE;
3986 }
3987