1 /*
2 This program is free software: you can redistribute it and/or modify
3 it under the terms of the GNU General Public License as published by
4 the Free Software Foundation, either version 3 of the License, or
5 (at your option) any later version.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program. If not, see <http://www.gnu.org/licenses/>.
14
15 Authors: Domas Mituzas, Facebook ( domas at fb dot com )
16 Mark Leith, Oracle Corporation (mark dot leith at oracle dot com)
17 Andrew Hutchings, SkySQL (andrew at skysql dot com)
18 Max Bubenick, Percona RDBA (max dot bubenick at percona dot com)
19 David Ducos, Percona (david dot ducos at percona dot com)
20 */
21
22 #define _LARGEFILE64_SOURCE
23 #define _FILE_OFFSET_BITS 64
24
25 #include <mysql.h>
26
27 #if defined MARIADB_CLIENT_VERSION_STR && !defined MYSQL_SERVER_VERSION
28 #define MYSQL_SERVER_VERSION MARIADB_CLIENT_VERSION_STR
29 #endif
30
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <glib.h>
35 #include <stdlib.h>
36 #include <stdarg.h>
37 #include <errno.h>
38 #include <time.h>
39 #ifdef ZWRAP_USE_ZSTD
40 #include "zstd/zstd_zlibwrapper.h"
41 #else
42 #include <zlib.h>
43 #endif
44 #include <pcre.h>
45 #include <signal.h>
46 #include <glib/gstdio.h>
47 #include <glib/gerror.h>
48 #include <gio/gio.h>
49 #include "config.h"
50 #include "mydumper.h"
51 #include "server_detect.h"
52 #include "connection.h"
53 #include "common.h"
54 #include "g_unix_signal.h"
55 #include <math.h>
56 #include "getPassword.h"
57 #include "logging.h"
58 #include "set_verbose.h"
59 # include "locale.h"
60
/* Pattern supplied through --regex; compiled lazily by check_regex(). */
char *regexstring = NULL;

const char DIRECTORY[] = "export";

/* Some earlier versions of MySQL do not yet define MYSQL_TYPE_JSON */
#ifndef MYSQL_TYPE_JSON
#define MYSQL_TYPE_JSON 245
#endif

/* init_mutex is held around mysql_init() in process_queue(), because
 * mysql_init is not thread safe. */
static GMutex *init_mutex = NULL;
static GMutex *ref_table_mutex = NULL;
/* Program options */
gchar *output_directory = NULL;
gchar *output_directory_param = NULL; /* --outputdir */
gchar *dump_directory = NULL;         /* base directory used by the build_*_filename() helpers */
guint statement_size = 1000000;       /* --statement-size, bytes */
guint rows_per_file = 0;              /* --rows */
guint chunk_filesize = 0;             /* --chunk-filesize, MB */
int longquery = 60;                   /* --long-query-guard, seconds */
int longquery_retries = 0;            /* --long-query-retries */
int longquery_retry_interval = 60;    /* --long-query-retry-interval, seconds */
int build_empty_files = 0;            /* --build-empty-files */
int skip_tz = 0;                      /* --skip-tz-utc / reversed --tz-utc */
/* When set, check_connection_status() issues a dummy SELECT on the
 * corresponding mysql.* table. */
int need_dummy_read = 0;
int need_dummy_toku_read = 0;
int compress_output = 0;              /* --compress */
int killqueries = 0;                  /* --kill-long-queries */
int lock_all_tables = 0;              /* --lock-all-tables */
int sync_wait = -1;                   /* --sync-wait; -1 means "do not set WSREP_SYNC_WAIT" */
guint snapshot_count= 2;              /* --snapshot-count */
guint snapshot_interval = 60;         /* --snapshot-interval, minutes */
gboolean daemon_mode = FALSE;         /* --daemon */
gboolean have_snapshot_cloning = FALSE;
gboolean ignore_generated_fields = FALSE; /* --no-check-generated-fields */

gchar *ignore_engines = NULL;         /* --ignore-engines, comma delimited */
char **ignore = NULL;
gchar *tables_list = NULL;            /* --tables-list, comma delimited */
gchar *tidb_snapshot = NULL;          /* --tidb-snapshot */
/* Sorted sequence of "database.table" strings filled by
 * read_tables_skiplist() and queried by check_skiplist(). */
GSequence *tables_skiplist = NULL;
gchar *tables_skiplist_file = NULL;   /* --omit-from-file */
char **tables = NULL;
GList *no_updated_tables = NULL;

/* Boolean command-line switches; see entries[] for the option names. */
gboolean no_schemas = FALSE;
gboolean dump_checksums = FALSE;
gboolean no_data = FALSE;
gboolean no_locks = FALSE;
gboolean dump_triggers = FALSE;
gboolean dump_events = FALSE;
gboolean dump_routines = FALSE;
gboolean no_dump_views = FALSE;
gboolean less_locking = FALSE;
gboolean use_savepoints = FALSE;
gboolean success_on_1146 = FALSE;
gboolean no_backup_locks = FALSE;
gboolean insert_ignore = FALSE;
gboolean load_data = FALSE;
gboolean order_by_primary_key = FALSE;

/* Shared work lists, each paired with the mutex declared right after it. */
GList *innodb_tables = NULL;
GMutex *innodb_tables_mutex = NULL;
GList *non_innodb_table = NULL;
GMutex *non_innodb_table_mutex = NULL;
GList *table_schemas = NULL;
GMutex *table_schemas_mutex = NULL;
GList *view_schemas = NULL;
GMutex *view_schemas_mutex = NULL;
GList *schema_post = NULL;
GMutex *schema_post_mutex = NULL;
/* Decremented atomically in thd_JOB_DUMP_DATABASE(); reaching zero signals
 * conf->ready_database_dump. */
gint database_counter = 0;
gint non_innodb_table_counter = 0;
gint non_innodb_done = 0;
guint less_locking_threads = 0;
guint updated_since = 0;              /* --updated-since, days */
guint trx_consistency_only = 0;       /* --trx-consistency-only */
guint complete_insert = 0;            /* --complete-insert */
gchar *set_names_str=NULL;            /* --set-names */
gchar *where_option=NULL;             /* --where */
guint64 max_rows=1000000;             /* --max-rows */
GHashTable *database_hash=NULL;
GHashTable *ref_table=NULL;
/* Counter used by determine_filename() to build "mydumper_<n>" names. */
guint table_number;

gchar *fields_terminated_by=NULL;
gchar *fields_enclosed_by=NULL;
gchar *fields_escaped_by=NULL;        /* --fields-escaped-by */
gchar *lines_starting_by=NULL;
gchar *lines_terminated_by=NULL;
gchar *statement_terminated_by=NULL;

/* Raw values of the matching command-line options (see entries[]). */
gchar *fields_enclosed_by_ld=NULL;
gchar *fields_terminated_by_ld=NULL;
gchar *lines_starting_by_ld=NULL;
gchar *lines_terminated_by_ld=NULL;
gchar *statement_terminated_by_ld=NULL;

// For daemon mode
guint dump_number = 0;
guint binlog_connect_id = 0;
/* Set by sig_triggered(); run_snapshot() stops rescheduling once it is TRUE. */
gboolean shutdown_triggered = FALSE;
/* run_snapshot() pushes a token here to request the next scheduled dump. */
GAsyncQueue *start_scheduled_dump;

/* Main loop that sig_triggered() asks to quit. */
GMainLoop *m1;
static GCond *ll_cond = NULL;
static GMutex *ll_mutex = NULL;

/* Running count of non-fatal errors reported by the helpers in this file. */
int errors;
169
170 static GOptionEntry entries[] = {
171 {"database", 'B', 0, G_OPTION_ARG_STRING, &db, "Database to dump", NULL},
172 {"tables-list", 'T', 0, G_OPTION_ARG_STRING, &tables_list,
173 "Comma delimited table list to dump (does not exclude regex option)",
174 NULL},
175 {"omit-from-file", 'O', 0, G_OPTION_ARG_STRING, &tables_skiplist_file,
176 "File containing a list of database.table entries to skip, one per line "
177 "(skips before applying regex option)",
178 NULL},
179 {"outputdir", 'o', 0, G_OPTION_ARG_FILENAME, &output_directory_param,
180 "Directory to output files to", NULL},
181 {"statement-size", 's', 0, G_OPTION_ARG_INT, &statement_size,
182 "Attempted size of INSERT statement in bytes, default 1000000", NULL},
183 {"rows", 'r', 0, G_OPTION_ARG_INT, &rows_per_file,
184 "Try to split tables into chunks of this many rows. This option turns off "
185 "--chunk-filesize",
186 NULL},
187 {"chunk-filesize", 'F', 0, G_OPTION_ARG_INT, &chunk_filesize,
188 "Split tables into chunks of this output file size. This value is in MB",
189 NULL},
190 {"max-rows", 0, 0, G_OPTION_ARG_INT64, &max_rows,
191 "Limit the amounto of rows per block after the table is estimated, default 1000000", NULL},
192 {"compress", 'c', 0, G_OPTION_ARG_NONE, &compress_output,
193 "Compress output files", NULL},
194 {"build-empty-files", 'e', 0, G_OPTION_ARG_NONE, &build_empty_files,
195 "Build dump files even if no data available from table", NULL},
196 {"regex", 'x', 0, G_OPTION_ARG_STRING, ®exstring,
197 "Regular expression for 'db.table' matching", NULL},
198 {"ignore-engines", 'i', 0, G_OPTION_ARG_STRING, &ignore_engines,
199 "Comma delimited list of storage engines to ignore", NULL},
200 {"insert-ignore", 'N', 0, G_OPTION_ARG_NONE, &insert_ignore,
201 "Dump rows with INSERT IGNORE", NULL},
202 {"no-schemas", 'm', 0, G_OPTION_ARG_NONE, &no_schemas,
203 "Do not dump table schemas with the data", NULL},
204 {"table-checksums", 'M', 0, G_OPTION_ARG_NONE, &dump_checksums,
205 "Dump table checksums with the data", NULL},
206 {"no-data", 'd', 0, G_OPTION_ARG_NONE, &no_data, "Do not dump table data",
207 NULL},
208 {"order-by-primary", 0, 0, G_OPTION_ARG_NONE, &order_by_primary_key,
209 "Sort the data by Primary Key or Unique key if no primary key exists",
210 NULL},
211 {"triggers", 'G', 0, G_OPTION_ARG_NONE, &dump_triggers, "Dump triggers",
212 NULL},
213 {"events", 'E', 0, G_OPTION_ARG_NONE, &dump_events, "Dump events", NULL},
214 {"routines", 'R', 0, G_OPTION_ARG_NONE, &dump_routines,
215 "Dump stored procedures and functions", NULL},
216 {"no-views", 'W', 0, G_OPTION_ARG_NONE, &no_dump_views, "Do not dump VIEWs",
217 NULL},
218 {"no-locks", 'k', 0, G_OPTION_ARG_NONE, &no_locks,
219 "Do not execute the temporary shared read lock. WARNING: This will cause "
220 "inconsistent backups",
221 NULL},
222 {"no-backup-locks", 0, 0, G_OPTION_ARG_NONE, &no_backup_locks,
223 "Do not use Percona backup locks", NULL},
224 {"less-locking", 0, 0, G_OPTION_ARG_NONE, &less_locking,
225 "Minimize locking time on InnoDB tables.", NULL},
226 {"long-query-retries", 0, 0, G_OPTION_ARG_INT, &longquery_retries,
227 "Retry checking for long queries, default 0 (do not retry)", NULL},
228 {"long-query-retry-interval", 0, 0, G_OPTION_ARG_INT, &longquery_retry_interval,
229 "Time to wait before retrying the long query check in seconds, default 60", NULL},
230 {"long-query-guard", 'l', 0, G_OPTION_ARG_INT, &longquery,
231 "Set long query timer in seconds, default 60", NULL},
232 {"kill-long-queries", 'K', 0, G_OPTION_ARG_NONE, &killqueries,
233 "Kill long running queries (instead of aborting)", NULL},
234 {"daemon", 'D', 0, G_OPTION_ARG_NONE, &daemon_mode, "Enable daemon mode",
235 NULL},
236 {"snapshot-count", 'X', 0, G_OPTION_ARG_INT, &snapshot_count, "number of snapshots, default 2", NULL},
237 {"snapshot-interval", 'I', 0, G_OPTION_ARG_INT, &snapshot_interval,
238 "Interval between each dump snapshot (in minutes), requires --daemon, "
239 "default 60",
240 NULL},
241 {"logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile,
242 "Log file name to use, by default stdout is used", NULL},
243 {"tz-utc", 0, G_OPTION_FLAG_REVERSE, G_OPTION_ARG_NONE, &skip_tz,
244 "SET TIME_ZONE='+00:00' at top of dump to allow dumping of TIMESTAMP data "
245 "when a server has data in different time zones or data is being moved "
246 "between servers with different time zones, defaults to on use "
247 "--skip-tz-utc to disable.",
248 NULL},
249 {"skip-tz-utc", 0, 0, G_OPTION_ARG_NONE, &skip_tz, "", NULL},
250 {"use-savepoints", 0, 0, G_OPTION_ARG_NONE, &use_savepoints,
251 "Use savepoints to reduce metadata locking issues, needs SUPER privilege",
252 NULL},
253 {"success-on-1146", 0, 0, G_OPTION_ARG_NONE, &success_on_1146,
254 "Not increment error count and Warning instead of Critical in case of "
255 "table doesn't exist",
256 NULL},
257 {"lock-all-tables", 0, 0, G_OPTION_ARG_NONE, &lock_all_tables,
258 "Use LOCK TABLE for all, instead of FTWRL", NULL},
259 {"updated-since", 'U', 0, G_OPTION_ARG_INT, &updated_since,
260 "Use Update_time to dump only tables updated in the last U days", NULL},
261 {"trx-consistency-only", 0, 0, G_OPTION_ARG_NONE, &trx_consistency_only,
262 "Transactional consistency only", NULL},
263 {"complete-insert", 0, 0, G_OPTION_ARG_NONE, &complete_insert,
264 "Use complete INSERT statements that include column names", NULL},
265 { "set-names",0, 0, G_OPTION_ARG_STRING, &set_names_str,
266 "Sets the names, use it at your own risk, default binary", NULL },
267 {"tidb-snapshot", 'z', 0, G_OPTION_ARG_STRING, &tidb_snapshot,
268 "Snapshot to use for TiDB", NULL},
269 {"load-data", 0, 0, G_OPTION_ARG_NONE, &load_data,
270 "", NULL },
271 {"fields-terminated-by", 0, 0, G_OPTION_ARG_STRING, &fields_terminated_by_ld,"", NULL },
272 {"fields-enclosed-by", 0, 0, G_OPTION_ARG_STRING, &fields_enclosed_by_ld,"", NULL },
273 {"fields-escaped-by", 0, 0, G_OPTION_ARG_STRING, &fields_escaped_by,
274 "Single character that is going to be used to escape characters in the"
275 "LOAD DATA stament, default: '\\' ", NULL },
276 {"lines-starting-by", 0, 0, G_OPTION_ARG_STRING, &lines_starting_by_ld,
277 "Adds the string at the begining of each row. When --load-data is used"
278 "it is added to the LOAD DATA statement. Its affects INSERT INTO statements"
279 "also when it is used.", NULL },
280 {"lines-terminated-by", 0, 0, G_OPTION_ARG_STRING, &lines_terminated_by_ld,
281 "Adds the string at the end of each row. When --load-data is used it is"
282 "added to the LOAD DATA statement. Its affects INSERT INTO statements"
283 "also when it is used.", NULL },
284 {"statement-terminated-by", 0, 0, G_OPTION_ARG_STRING, &statement_terminated_by_ld,
285 "This might never be used, unless you know what are you doing", NULL },
286 {"sync-wait", 0, 0, G_OPTION_ARG_INT, &sync_wait,
287 "WSREP_SYNC_WAIT value to set at SESSION level", NULL},
288 { "where", 0, 0, G_OPTION_ARG_STRING, &where_option,
289 "Dump only selected records.", NULL },
290 { "no-check-generated-fields", 0, 0, G_OPTION_ARG_NONE, &ignore_generated_fields,
291 "Queries related to generated fields are not going to be executed."
292 "It will lead to restoration issues if you have generated columns", NULL },
293 {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
294
/* Broken-down time value shared at file scope. */
struct tm tval;

/*
 * Forward declarations.
 *
 * check_regex(), check_skiplist(), tables_skiplist_cmp() and
 * read_tables_skiplist() are defined further down in this file; the others
 * are declared here so they can be referenced before their definitions.
 */

/* Writers that emit one object (schema, triggers, view, ...) into a file. */
void dump_schema_data(MYSQL *conn, char *database, char *table, char *filename);
void dump_triggers_data(MYSQL *conn, char *database, char *table,
                        char *filename);
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
                    char *filename2);
void dump_schema(MYSQL *conn, struct db_table *dbt,
                 struct configuration *conf);
void dump_checksum(struct db_table * dbt,
                   struct configuration *conf);
void dump_view(struct db_table *dbt, struct configuration *conf);
void dump_table(MYSQL *conn, struct db_table *dbt,
                struct configuration *conf, gboolean is_innodb);
void dump_tables(MYSQL *, GList *, struct configuration *);
void dump_schema_post(struct database *database, struct configuration *conf);
void restore_charset(GString *statement);
void set_charset(GString *statement, char *character_set,
                 char *collation_connection);
void dump_schema_post_data(MYSQL *conn, struct database *database, char *filename);
guint64 dump_table_data(MYSQL *conn, FILE *file, struct table_job *tj);
void dump_database(struct database *database, struct configuration *);
void dump_database_thread(MYSQL *, struct configuration*, struct database *);
void dump_create_database(char *, struct configuration *);
void dump_create_database_data(MYSQL *, char *, char *);
void get_tables(MYSQL *conn, struct configuration *);
gchar *get_primary_key_string(MYSQL *conn, char *database, char *table);
void get_not_updated(MYSQL *conn, FILE *);
GList *get_chunks_for_table(MYSQL *, char *, char *,
                            struct configuration *conf);
guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
                       char *from, char *to);
void dump_table_data_file(MYSQL *conn, struct table_job * tj);
void dump_table_checksum(MYSQL *conn, char *database, char *table, char *filename);
//void create_backup_dir(char *directory);
gboolean write_data(FILE *, GString *);
/* Table filtering helpers (regex and skip list). */
gboolean check_regex(char *database, char *table);
gboolean check_skiplist(char *database, char *table);
int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data);
void read_tables_skiplist(const gchar *filename);
void start_dump(MYSQL *conn);
MYSQL *create_main_connection();
void *exec_thread(void *data);
/* Signature matches a GLib log handler (domain, level, message, user data). */
void write_log_file(const gchar *log_domain, GLogLevelFlags log_level,
                    const gchar *message, gpointer user_data);
struct database * new_database(MYSQL *conn, char *database_name, gboolean already_dumped);
gchar *get_ref_table(gchar *k);
gboolean get_database(MYSQL *conn, char *database_name, struct database ** database);
343
check_regex_general(char * regex,char * word)344 gboolean check_regex_general(char *regex, char *word) {
345 /* This is not going to be used in threads */
346 static pcre *re = NULL;
347 int rc;
348 int ovector[9] = {0};
349 const char *error;
350 int erroroffset;
351
352 /* Let's compile the RE before we do anything */
353 if (!re) {
354 re = pcre_compile(regex, PCRE_CASELESS | PCRE_MULTILINE, &error,
355 &erroroffset, NULL);
356 if (!re) {
357 g_critical("Regular expression fail: %s", error);
358 exit(EXIT_FAILURE);
359 }
360 }
361
362 rc = pcre_exec(re, NULL, word, strlen(word), 0, 0, ovector, 9);
363 return (rc > 0) ? TRUE : FALSE;
364 }
365
determine_filename(char * table)366 char * determine_filename (char * table){
367 // https://stackoverflow.com/questions/11794144/regular-expression-for-valid-filename
368 // We might need to define a better filename alternatives
369 char * regex=strdup("^[\\w\\-_ ]+$");
370 if (check_regex_general(regex,table) && !g_strstr_len(table,-1,".") && !g_str_has_prefix(table,"mydumper_") )
371 return table;
372 else{
373 char *r = g_strdup_printf("mydumper_%d",table_number);
374 table_number++;
375 return r;
376 }
377
378 }
379
sig_triggered(gpointer user_data)380 gboolean sig_triggered(gpointer user_data) {
381 (void)user_data;
382
383 g_message("Shutting down gracefully");
384 shutdown_triggered = TRUE;
385 g_main_loop_quit(m1);
386 return FALSE;
387 }
388
389
build_meta_filename(char * database,char * table,const char * suffix)390 gchar * build_meta_filename(char *database, char *table, const char *suffix){
391 GString *filename = g_string_sized_new(20);
392 g_string_append_printf(filename, "%s.%s-%s", database, table, suffix);
393 gchar *r = g_build_filename(dump_directory, filename->str, NULL);
394 g_string_free(filename,TRUE);
395 return r;
396 }
397
build_schema_filename(char * database,const char * suffix)398 gchar * build_schema_filename(char *database, const char *suffix){
399 GString *filename = g_string_sized_new(20);
400 g_string_append_printf(filename, "%s-%s.sql%s", database, suffix, compress_extension);
401 gchar *r = g_build_filename(dump_directory, filename->str, NULL);
402 g_string_free(filename,TRUE);
403 return r;
404 }
405
build_schema_table_filename(char * database,char * table,const char * suffix)406 gchar * build_schema_table_filename(char *database, char *table, const char *suffix){
407 GString *filename = g_string_sized_new(20);
408 g_string_append_printf(filename, "%s.%s-%s.sql%s", database, table, suffix, compress_extension);
409 gchar *r = g_build_filename(dump_directory, filename->str, NULL);
410 g_string_free(filename,TRUE);
411 return r;
412 }
413
414 // Global Var used:
415 // - dump_directory
416 // - compress_extension
build_filename(char * database,char * table,guint part,const gchar * extension)417 gchar * build_filename(char *database, char *table, guint part, const gchar *extension){
418 GString *filename = g_string_sized_new(20);
419 g_string_append_printf(filename, "%s.%s.%05d.%s%s", database, table, part, extension, compress_extension);
420 gchar *r = g_build_filename(dump_directory, filename->str, NULL);
421 g_string_free(filename,TRUE);
422 return r;
423 }
424
425
build_data_filename(char * database,char * table,guint part)426 gchar * build_data_filename(char *database, char *table, guint part ){
427 return build_filename(database,table,part,"sql");
428 }
429
/*
 * Remove every entry found directly inside `directory`.
 *
 * On failure to open the directory, or on the first entry that cannot be
 * unlinked, a critical message is logged, the global `errors` counter is
 * incremented and the function stops. All resources (GError, path buffer,
 * directory handle) are released on every path.
 */
void clear_dump_directory(gchar *directory) {
  GError *error = NULL;
  GDir *dir = g_dir_open(directory, 0, &error);

  if (error) {
    g_critical("cannot open directory %s, %s\n", directory,
               error->message);
    errors++;
    g_error_free(error);
    return;
  }

  const gchar *filename = NULL;

  while ((filename = g_dir_read_name(dir))) {
    gchar *path = g_build_filename(directory, filename, NULL);
    if (g_unlink(path) == -1) {
      /* capture errno before any further library call can overwrite it */
      int unlink_errno = errno;
      g_critical("error removing file %s (%d)\n", path, unlink_errno);
      errors++;
      g_free(path);
      break;
    }
    g_free(path);
  }

  g_dir_close(dir);
}
455
run_snapshot(gpointer * data)456 gboolean run_snapshot(gpointer *data) {
457 (void)data;
458
459 g_async_queue_push(start_scheduled_dump, GINT_TO_POINTER(1));
460
461 return (shutdown_triggered) ? FALSE : TRUE;
462 }
463
464
465 /* Check database.table string against regular expression */
466
check_regex(char * database,char * table)467 gboolean check_regex(char *database, char *table) {
468 /* This is not going to be used in threads */
469 static pcre *re = NULL;
470 int rc;
471 int ovector[9] = {0};
472 const char *error;
473 int erroroffset;
474
475 char *p;
476
477 /* Let's compile the RE before we do anything */
478 if (!re) {
479 re = pcre_compile(regexstring, PCRE_CASELESS | PCRE_MULTILINE, &error,
480 &erroroffset, NULL);
481 if (!re) {
482 g_critical("Regular expression fail: %s", error);
483 exit(EXIT_FAILURE);
484 }
485 }
486
487 p = g_strdup_printf("%s.%s", database, table);
488 rc = pcre_exec(re, NULL, p, strlen(p), 0, 0, ovector, 9);
489 g_free(p);
490
491 return (rc > 0) ? TRUE : FALSE;
492 }
493
494 /* Check database.table string against skip list; returns TRUE if found */
495
check_skiplist(char * database,char * table)496 gboolean check_skiplist(char *database, char *table) {
497 if (g_sequence_lookup(tables_skiplist,
498 g_strdup_printf("%s.%s", database, table),
499 tables_skiplist_cmp, NULL)) {
500 return TRUE;
501 } else {
502 return FALSE;
503 };
504 }
505
506 /* Comparison function for skiplist sort and lookup */
507
tables_skiplist_cmp(gconstpointer a,gconstpointer b,gpointer user_data)508 int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data) {
509 /* Not using user_data, but needed for function prototype, shutting up
510 * compiler warnings about unused variable */
511 (void)user_data;
512 /* Any sorting function would work, as long as its usage is consistent
513 * between sort and lookup. strcmp should be one of the fastest. */
514 return strcmp(a, b);
515 }
516
517 /* Read the list of tables to skip from the given filename, and prepares them
518 * for future lookups. */
519
read_tables_skiplist(const gchar * filename)520 void read_tables_skiplist(const gchar *filename) {
521 GIOChannel *tables_skiplist_channel = NULL;
522 gchar *buf = NULL;
523 GError *error = NULL;
524 /* Create skiplist if it does not exist */
525 if (!tables_skiplist) {
526 tables_skiplist = g_sequence_new(NULL);
527 };
528 tables_skiplist_channel = g_io_channel_new_file(filename, "r", &error);
529
530 /* Error opening/reading the file? bail out. */
531 if (!tables_skiplist_channel) {
532 g_critical("cannot read/open file %s, %s\n", filename, error->message);
533 errors++;
534 return;
535 };
536
537 /* Read lines, push them to the list */
538 do {
539 g_io_channel_read_line(tables_skiplist_channel, &buf, NULL, NULL, NULL);
540 if (buf) {
541 g_strchomp(buf);
542 g_sequence_append(tables_skiplist, buf);
543 };
544 } while (buf);
545 g_io_channel_shutdown(tables_skiplist_channel, FALSE, NULL);
546 /* Sort the list, so that lookups work */
547 g_sequence_sort(tables_skiplist, tables_skiplist_cmp, NULL);
548 g_message("Omit list file contains %d tables to skip\n",
549 g_sequence_get_length(tables_skiplist));
550 return;
551 }
552
553 /* Write some stuff we know about snapshot, before it changes */
write_snapshot_info(MYSQL * conn,FILE * file)554 void write_snapshot_info(MYSQL *conn, FILE *file) {
555 MYSQL_RES *master = NULL, *slave = NULL, *mdb = NULL;
556 MYSQL_FIELD *fields;
557 MYSQL_ROW row;
558
559 char *masterlog = NULL;
560 char *masterpos = NULL;
561 char *mastergtid = NULL;
562
563 char *connname = NULL;
564 char *slavehost = NULL;
565 char *slavelog = NULL;
566 char *slavepos = NULL;
567 char *slavegtid = NULL;
568 guint isms;
569 guint i;
570
571 mysql_query(conn, "SHOW MASTER STATUS");
572 master = mysql_store_result(conn);
573 if (master && (row = mysql_fetch_row(master))) {
574 masterlog = row[0];
575 masterpos = row[1];
576 /* Oracle/Percona GTID */
577 if (mysql_num_fields(master) == 5) {
578 mastergtid = row[4];
579 } else {
580 /* Let's try with MariaDB 10.x */
581 /* Use gtid_binlog_pos due to issue with gtid_current_pos with galera
582 * cluster, gtid_binlog_pos works as well with normal mariadb server
583 * https://jira.mariadb.org/browse/MDEV-10279 */
584 mysql_query(conn, "SELECT @@gtid_binlog_pos");
585 mdb = mysql_store_result(conn);
586 if (mdb && (row = mysql_fetch_row(mdb))) {
587 mastergtid = row[0];
588 }
589 }
590 }
591
592 if (masterlog) {
593 fprintf(file, "SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n",
594 masterlog, masterpos, mastergtid);
595 g_message("Written master status");
596 }
597
598 isms = 0;
599 mysql_query(conn, "SELECT @@default_master_connection");
600 MYSQL_RES *rest = mysql_store_result(conn);
601 if (rest != NULL && mysql_num_rows(rest)) {
602 mysql_free_result(rest);
603 g_message("Multisource slave detected.");
604 isms = 1;
605 }
606
607 if (isms)
608 mysql_query(conn, "SHOW ALL SLAVES STATUS");
609 else
610 mysql_query(conn, "SHOW SLAVE STATUS");
611
612 guint slave_count=0;
613 slave = mysql_store_result(conn);
614 while (slave && (row = mysql_fetch_row(slave))) {
615 fields = mysql_fetch_fields(slave);
616 for (i = 0; i < mysql_num_fields(slave); i++) {
617 if (isms && !strcasecmp("connection_name", fields[i].name))
618 connname = row[i];
619 if (!strcasecmp("exec_master_log_pos", fields[i].name)) {
620 slavepos = row[i];
621 } else if (!strcasecmp("relay_master_log_file", fields[i].name)) {
622 slavelog = row[i];
623 } else if (!strcasecmp("master_host", fields[i].name)) {
624 slavehost = row[i];
625 } else if (!strcasecmp("Executed_Gtid_Set", fields[i].name) ||
626 !strcasecmp("Gtid_Slave_Pos", fields[i].name)) {
627 slavegtid = row[i];
628 }
629 }
630 if (slavehost) {
631 slave_count++;
632 fprintf(file, "SHOW SLAVE STATUS:");
633 if (isms)
634 fprintf(file, "\n\tConnection name: %s", connname);
635 fprintf(file, "\n\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n",
636 slavehost, slavelog, slavepos, slavegtid);
637 g_message("Written slave status");
638 }
639 }
640 if (slave_count > 1)
641 g_warning("Multisource replication found. Do not trust in the exec_master_log_pos as it might cause data inconsistencies. Search 'Replication and Transaction Inconsistencies' on MySQL Documentation");
642
643 fflush(file);
644 if (master)
645 mysql_free_result(master);
646 if (slave)
647 mysql_free_result(slave);
648 if (mdb)
649 mysql_free_result(mdb);
650 }
651
652 // Free structures
653
free_table_job(struct table_job * tj)654 void free_table_job(struct table_job *tj){
655 if (tj->table)
656 g_free(tj->table);
657 if (tj->where)
658 g_free(tj->where);
659 if (tj->order_by)
660 g_free(tj->order_by);
661 if (tj->filename)
662 g_free(tj->filename);
663 // g_free(tj);
664 }
665
free_schema_job(struct schema_job * sj)666 void free_schema_job(struct schema_job *sj){
667 if (sj->table)
668 g_free(sj->table);
669 if (sj->filename)
670 g_free(sj->filename);
671 // g_free(sj);
672 }
673
free_table_checksum_job(struct table_checksum_job * tcj)674 void free_table_checksum_job(struct table_checksum_job*tcj){
675 if (tcj->table)
676 g_free(tcj->table);
677 if (tcj->filename)
678 g_free(tcj->filename);
679 // g_free(tcj);
680 }
free_view_job(struct view_job * vj)681 void free_view_job(struct view_job *vj){
682 if (vj->table)
683 g_free(vj->table);
684 if (vj->filename)
685 g_free(vj->filename);
686 if (vj->filename2)
687 g_free(vj->filename2);
688 // g_free(vj);
689 }
690
free_schema_post_job(struct schema_post_job * sp)691 void free_schema_post_job(struct schema_post_job *sp){
692 if (sp->filename)
693 g_free(sp->filename);
694 // g_free(sp);
695 }
696
free_create_database_job(struct create_database_job * cdj)697 void free_create_database_job(struct create_database_job * cdj){
698 if (cdj->filename)
699 g_free(cdj->filename);
700 // g_free(cdj);
701 }
702
message_dumping_data(struct thread_data * td,struct table_job * tj)703 void message_dumping_data(struct thread_data *td, struct table_job *tj){
704 g_message("Thread %d dumping data for `%s`.`%s`%s%s%s%s%s%s | Remaining jobs: %d",
705 td->thread_id, tj->database, tj->table,
706 (tj->where || where_option ) ? " WHERE " : "", tj->where ? tj->where : "",
707 (tj->where && where_option ) ? " AND " : "", where_option ? where_option : "",
708 tj->order_by ? " ORDER BY " : "", tj->order_by ? tj->order_by : "", g_async_queue_length(td->queue));
709 }
710
process_stream(void * data)711 void *process_stream(void *data){
712 (void)data;
713 char * filename=NULL;
714 FILE * f=NULL;
715 char buf[1024];
716 int buflen;
717 ssize_t len=0;
718 for(;;){
719 filename=(char *)g_async_queue_pop(stream_queue);
720 if (strlen(filename) == 0){
721 break;
722 }
723 char *used_filemame=g_path_get_basename(filename);
724 len=m_write(stdout, "\n-- ", 4);
725 len=m_write(stdout, used_filemame, strlen(used_filemame));
726 len=m_write(stdout, "\n", 1);
727 free(used_filemame);
728 f=m_open(filename,"r");
729 while((buflen = read(fileno(f), buf, 1024)) > 0)
730 {
731 len=m_write(stdout, buf, buflen);
732 if (len != buflen){
733 g_critical("Stream failed during transmition of file: %s",filename);
734 exit(EXIT_FAILURE);
735 }
736 }
737 m_close(f);
738 if (no_delete == FALSE){
739 remove(filename);
740 }
741 }
742 return NULL;
743 }
744
745
746
thd_JOB_DUMP_DATABASE(struct configuration * conf,struct thread_data * td,struct job * job)747 void thd_JOB_DUMP_DATABASE(struct configuration *conf, struct thread_data *td, struct job *job){
748 struct dump_database_job * ddj = (struct dump_database_job *)job->job_data;
749 g_message("Thread %d dumping db information for `%s`", td->thread_id,
750 ddj->database->name);
751 dump_database_thread(td->thrconn, conf, ddj->database);
752 g_free(ddj);
753 g_free(job);
754 if (g_atomic_int_dec_and_test(&database_counter)) {
755 g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
756 }
757 }
758
thd_JOB_CREATE_DATABASE(struct thread_data * td,struct job * job)759 void thd_JOB_CREATE_DATABASE(struct thread_data *td, struct job *job){
760 struct create_database_job * cdj = (struct create_database_job *)job->job_data;
761 g_message("Thread %d dumping schema create for `%s`", td->thread_id,
762 cdj->database);
763 dump_create_database_data(td->thrconn, cdj->database, cdj->filename);
764 free_create_database_job(cdj);
765 g_free(job);
766 }
767
thd_JOB_SCHEMA_POST(struct thread_data * td,struct job * job)768 void thd_JOB_SCHEMA_POST(struct thread_data *td, struct job *job){
769 struct schema_post_job * sp = (struct schema_post_job *)job->job_data;
770 g_message("Thread %d dumping SP and VIEWs for `%s`", td->thread_id,
771 sp->database->name);
772 dump_schema_post_data(td->thrconn, sp->database, sp->filename);
773 free_schema_post_job(sp);
774 g_free(job);
775 }
776
thd_JOB_VIEW(struct thread_data * td,struct job * job)777 void thd_JOB_VIEW(struct thread_data *td, struct job *job){
778 struct view_job * vj = (struct view_job *)job->job_data;
779 g_message("Thread %d dumping view for `%s`.`%s`", td->thread_id,
780 vj->database, vj->table);
781 dump_view_data(td->thrconn, vj->database, vj->table, vj->filename,
782 vj->filename2);
783 free_view_job(vj);
784 g_free(job);
785 }
786
thd_JOB_SCHEMA(struct thread_data * td,struct job * job)787 void thd_JOB_SCHEMA(struct thread_data *td, struct job *job){
788 struct schema_job *sj = (struct schema_job *)job->job_data;
789 g_message("Thread %d dumping schema for `%s`.`%s`", td->thread_id,
790 sj->database, sj->table);
791 dump_schema_data(td->thrconn, sj->database, sj->table, sj->filename);
792 free_schema_job(sj);
793 g_free(job);
794 }
795
thd_JOB_TRIGGERS(struct thread_data * td,struct job * job)796 void thd_JOB_TRIGGERS(struct thread_data *td, struct job *job){
797 struct schema_job * sj = (struct schema_job *)job->job_data;
798 g_message("Thread %d dumping triggers for `%s`.`%s`", td->thread_id,
799 sj->database, sj->table);
800 dump_triggers_data(td->thrconn, sj->database, sj->table, sj->filename);
801 free_schema_job(sj);
802 g_free(job);
803 }
804
initialize_thread(struct thread_data * td)805 void initialize_thread(struct thread_data *td){
806 configure_connection(td->thrconn, "mydumper");
807
808 if (!mysql_real_connect(td->thrconn, hostname, username, password, NULL, port,
809 socket_path, 0)) {
810 g_critical("Failed to connect to database: %s", mysql_error(td->thrconn));
811 exit(EXIT_FAILURE);
812 } else {
813 g_message("Thread %d connected using MySQL connection ID %lu",
814 td->thread_id, mysql_thread_id(td->thrconn));
815 }
816
817 }
818
initialize_consistent_snapshot(struct thread_data * td)819 void initialize_consistent_snapshot(struct thread_data *td){
820
821 if ( sync_wait != -1 && mysql_query(td->thrconn, g_strdup_printf("SET SESSION WSREP_SYNC_WAIT = %d",sync_wait))){
822 g_critical("Failed to set wsrep_sync_wait for the thread: %s",
823 mysql_error(td->thrconn));
824 exit(EXIT_FAILURE);
825 }
826 if (mysql_query(td->thrconn,
827 "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) {
828 g_critical("Failed to set isolation level: %s", mysql_error(td->thrconn));
829 exit(EXIT_FAILURE);
830 }
831 if (mysql_query(td->thrconn,
832 "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */")) {
833 g_critical("Failed to start consistent snapshot: %s", mysql_error(td->thrconn));
834 exit(EXIT_FAILURE);
835 }
836 }
837
check_connection_status(struct thread_data * td)838 void check_connection_status(struct thread_data *td){
839 if (detected_server == SERVER_TYPE_TIDB) {
840 // Worker threads must set their tidb_snapshot in order to be safe
841 // Because no locking has been used.
842 gchar *query =
843 g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
844 if (mysql_query(td->thrconn, query)) {
845 g_critical("Failed to set tidb_snapshot: %s", mysql_error(td->thrconn));
846 exit(EXIT_FAILURE);
847 }
848 g_free(query);
849 g_message("Thread %d set to tidb_snapshot '%s'", td->thread_id,
850 tidb_snapshot);
851 }
852
853 /* Unfortunately version before 4.1.8 did not support consistent snapshot
854 * transaction starts, so we cheat */
855 if (need_dummy_read) {
856 mysql_query(td->thrconn,
857 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
858 MYSQL_RES *res = mysql_store_result(td->thrconn);
859 if (res)
860 mysql_free_result(res);
861 }
862 if (need_dummy_toku_read) {
863 mysql_query(td->thrconn,
864 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
865 MYSQL_RES *res = mysql_store_result(td->thrconn);
866 if (res)
867 mysql_free_result(res);
868 }
869 }
870
process_queue(struct thread_data * td)871 void *process_queue(struct thread_data *td) {
872 struct configuration *conf = td->conf;
873 // mysql_init is not thread safe, especially in Connector/C
874 g_mutex_lock(init_mutex);
875 td->thrconn = mysql_init(NULL);
876 g_mutex_unlock(init_mutex);
877
878 initialize_thread(td);
879 execute_gstring(td->thrconn, set_session);
880
881 // if ((detected_server == SERVER_TYPE_MYSQL) &&
882 // mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
883 // g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
884 // }
885
886 if (!skip_tz && mysql_query(td->thrconn, "/*!40103 SET TIME_ZONE='+00:00' */")) {
887 g_critical("Failed to set time zone: %s", mysql_error(td->thrconn));
888 }
889 if (!td->less_locking_stage){
890 if (use_savepoints && mysql_query(td->thrconn, "SET SQL_LOG_BIN = 0")) {
891 g_critical("Failed to disable binlog for the thread: %s",
892 mysql_error(td->thrconn));
893 exit(EXIT_FAILURE);
894 }
895 initialize_consistent_snapshot(td);
896 check_connection_status(td);
897 }
898 mysql_query(td->thrconn, set_names_str);
899
900 g_async_queue_push(td->ready, GINT_TO_POINTER(1));
901
902 struct job *job = NULL;
903 struct table_job *tj = NULL;
904 struct table_checksum_job *tcj = NULL;
905 struct tables_job *mj = NULL;
906 GList *glj;
907 int first = 1;
908 GString *query = g_string_new(NULL);
909 GString *prev_table = g_string_new(NULL);
910 GString *prev_database = g_string_new(NULL);
911 /* if less locking we need to wait until that threads finish
912 progressively waking up these threads */
913 if (!td->less_locking_stage && less_locking) {
914 g_mutex_lock(ll_mutex);
915
916 while (less_locking_threads >= td->thread_id) {
917 g_cond_wait(ll_cond, ll_mutex);
918 }
919
920 g_mutex_unlock(ll_mutex);
921 }
922
923 for (;;) {
924 GTimeVal tv;
925 g_get_current_time(&tv);
926 g_time_val_add(&tv, 1000 * 1000 * 1);
927 job = (struct job *)g_async_queue_pop(td->queue);
928 if (shutdown_triggered && (job->type != JOB_SHUTDOWN)) {
929 continue;
930 }
931
932 switch (job->type) {
933 case JOB_LOCK_DUMP_NON_INNODB:
934 mj = (struct tables_job *)job->job_data;
935 for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
936 tj = (struct table_job *)glj->data;
937 if (first) {
938 g_string_printf(query, "LOCK TABLES `%s`.`%s` READ LOCAL",
939 tj->database, tj->table);
940 first = 0;
941 } else {
942 if (g_ascii_strcasecmp(prev_database->str, tj->database) ||
943 g_ascii_strcasecmp(prev_table->str, tj->table)) {
944 g_string_append_printf(query, ", `%s`.`%s` READ LOCAL",
945 tj->database, tj->table);
946 }
947 }
948 g_string_printf(prev_table, "%s", tj->table);
949 g_string_printf(prev_database, "%s", tj->database);
950 }
951 first = 1;
952 if (mysql_query(td->thrconn, query->str)) {
953 g_critical("Non Innodb lock tables fail: %s", mysql_error(td->thrconn));
954 exit(EXIT_FAILURE);
955 }
956 if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
957 g_atomic_int_get(&non_innodb_done)) {
958 g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
959 }
960 for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
961 tj = (struct table_job *)glj->data;
962 message_dumping_data(td,tj);
963 dump_table_data_file(td->thrconn, tj);
964 free_table_job(tj);
965 g_free(tj);
966 }
967 mysql_query(td->thrconn, "UNLOCK TABLES /* Non Innodb */");
968 g_list_free(mj->table_job_list);
969 g_free(mj);
970 g_free(job);
971 break;
972 case JOB_DUMP:
973 tj = (struct table_job *)job->job_data;
974 message_dumping_data(td,tj);
975 if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
976 g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
977 }
978 dump_table_data_file(td->thrconn, tj);
979 if (use_savepoints &&
980 mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
981 g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
982 }
983 free_table_job(tj);
984 g_free(job);
985 break;
986 case JOB_CHECKSUM:
987 tcj = (struct table_checksum_job *)job->job_data;
988 g_message("Thread %d dumping checksum for `%s`.`%s`", td->thread_id,
989 tcj->database, tcj->table);
990 if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
991 g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
992 }
993 dump_table_checksum(td->thrconn, tcj->database, tcj->table, tcj->filename);
994 if (use_savepoints &&
995 mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
996 g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
997 }
998 free_table_checksum_job(tcj);
999 g_free(job);
1000 break;
1001 case JOB_DUMP_NON_INNODB:
1002 tj = (struct table_job *)job->job_data;
1003 message_dumping_data(td,tj);
1004 if (use_savepoints && mysql_query(td->thrconn, "SAVEPOINT mydumper")) {
1005 g_critical("Savepoint failed: %s", mysql_error(td->thrconn));
1006 }
1007 dump_table_data_file(td->thrconn, tj);
1008 if (use_savepoints &&
1009 mysql_query(td->thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
1010 g_critical("Rollback to savepoint failed: %s", mysql_error(td->thrconn));
1011 }
1012 free_table_job(tj);
1013 g_free(job);
1014 if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
1015 g_atomic_int_get(&non_innodb_done)) {
1016 g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
1017 }
1018 break;
1019 case JOB_DUMP_DATABASE:
1020 thd_JOB_DUMP_DATABASE(conf,td,job);
1021 break;
1022 case JOB_CREATE_DATABASE:
1023 thd_JOB_CREATE_DATABASE(td,job);
1024 break;
1025 case JOB_SCHEMA:
1026 thd_JOB_SCHEMA(td,job);
1027 break;
1028 case JOB_VIEW:
1029 thd_JOB_VIEW(td,job);
1030 break;
1031 case JOB_TRIGGERS:
1032 thd_JOB_TRIGGERS(td,job);
1033 break;
1034 case JOB_SCHEMA_POST:
1035 thd_JOB_SCHEMA_POST(td,job);
1036 break;
1037 case JOB_SHUTDOWN:
1038 g_message("Thread %d shutting down", td->thread_id);
1039 if (td->less_locking_stage){
1040 g_mutex_lock(ll_mutex);
1041 less_locking_threads--;
1042 g_cond_broadcast(ll_cond);
1043 g_mutex_unlock(ll_mutex);
1044 g_string_free(query, TRUE);
1045 g_string_free(prev_table, TRUE);
1046 g_string_free(prev_database, TRUE);
1047 }
1048 if (td->thrconn)
1049 mysql_close(td->thrconn);
1050 g_free(job);
1051 mysql_thread_end();
1052 return NULL;
1053 break;
1054 default:
1055 g_critical("Something very bad happened!");
1056 exit(EXIT_FAILURE);
1057 }
1058 }
1059 if (td->thrconn)
1060 mysql_close(td->thrconn);
1061 mysql_thread_end();
1062 return NULL;
1063 }
1064
main(int argc,char * argv[])1065 int main(int argc, char *argv[]) {
1066 GError *error = NULL;
1067 GOptionContext *context;
1068
1069 g_thread_init(NULL);
1070 setlocale(LC_ALL, "");
1071
1072 ref_table_mutex = g_mutex_new();
1073 init_mutex = g_mutex_new();
1074 innodb_tables_mutex = g_mutex_new();
1075 non_innodb_table_mutex = g_mutex_new();
1076 table_schemas_mutex = g_mutex_new();
1077 view_schemas_mutex = g_mutex_new();
1078 schema_post_mutex = g_mutex_new();
1079 ll_mutex = g_mutex_new();
1080 ll_cond = g_cond_new();
1081 database_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1082 ref_table=g_hash_table_new ( g_str_hash, g_str_equal );
1083 context = g_option_context_new("multi-threaded MySQL dumping");
1084 GOptionGroup *main_group =
1085 g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
1086 g_option_group_add_entries(main_group, entries);
1087 g_option_group_add_entries(main_group, common_entries);
1088 g_option_context_set_main_group(context, main_group);
1089 gchar ** tmpargv=g_strdupv(argv);
1090 int tmpargc=argc;
1091 if (!g_option_context_parse(context, &tmpargc, &tmpargv, &error)) {
1092 g_print("option parsing failed: %s, try --help\n", error->message);
1093 exit(EXIT_FAILURE);
1094 }
1095
1096 set_session = g_string_new(NULL);
1097
1098 if (defaults_file != NULL){
1099 load_config_file(defaults_file, context, "mydumper");
1100 }
1101 g_option_context_free(context);
1102
1103 if (!compress_output) {
1104 m_open=&g_fopen;
1105 m_close=(void *) &fclose;
1106 m_write=(void *)&write_file;
1107 compress_extension=g_strdup("");
1108 } else {
1109 m_open=(void *) &gzopen;
1110 m_close=(void *) &gzclose;
1111 m_write=(void *)&gzwrite;
1112 #ifdef ZWRAP_USE_ZSTD
1113 compress_extension = g_strdup(".zst");
1114 #else
1115 compress_extension = g_strdup(".gz");
1116 #endif
1117 }
1118
1119 if (password != NULL){
1120 int i=1;
1121 for(i=1; i < argc; i++){
1122 gchar * p= g_strstr_len(argv[i],-1,password);
1123 if (p != NULL){
1124 strncpy(p, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", strlen(password));
1125 }
1126 }
1127 }
1128
1129 // prompt for password if it's NULL
1130 if (sizeof(password) == 0 || (password == NULL && askPassword)) {
1131 password = passwordPrompt();
1132 }
1133
1134 if (load_data){
1135 if (!fields_enclosed_by_ld){
1136 fields_enclosed_by=g_strdup("");
1137 }else if(strlen(fields_enclosed_by_ld)>1){
1138 g_error("--fields-enclosed-by must be a single character");
1139 exit(EXIT_FAILURE);
1140 }else{
1141 fields_enclosed_by=fields_enclosed_by_ld;
1142 }
1143
1144 if (fields_escaped_by){
1145 if(strlen(fields_escaped_by)>1){
1146 g_error("--fields-escaped-by must be a single character");
1147 exit(EXIT_FAILURE);
1148 }else if (strcmp(fields_escaped_by,"\\")==0){
1149 fields_escaped_by=g_strdup("\\\\");
1150 }
1151 }
1152 }
1153
1154 if (!fields_terminated_by_ld){
1155 if (load_data){
1156 fields_terminated_by=g_strdup("\t");
1157 // fields_terminated_by_ld=g_strdup("\\t")
1158 }else
1159 fields_terminated_by=g_strdup(",");
1160 }else
1161 fields_terminated_by=replace_escaped_strings(g_strdup(fields_terminated_by_ld));
1162 if (!lines_starting_by_ld){
1163 if (load_data)
1164 lines_starting_by=g_strdup("");
1165 else
1166 lines_starting_by=g_strdup("(");
1167 }else
1168 lines_starting_by=replace_escaped_strings(g_strdup(lines_starting_by_ld));
1169 if (!lines_terminated_by_ld){
1170 if (load_data){
1171 lines_terminated_by=g_strdup("\n");
1172 lines_terminated_by_ld=g_strdup("\\n");
1173 }else
1174 lines_terminated_by=g_strdup(")\n");
1175 }else
1176 lines_terminated_by=replace_escaped_strings(g_strdup(lines_terminated_by_ld));
1177 if (!statement_terminated_by_ld){
1178 if (load_data)
1179 statement_terminated_by=g_strdup("");
1180 else
1181 statement_terminated_by=g_strdup(";\n");
1182 }else
1183 statement_terminated_by=replace_escaped_strings(g_strdup(statement_terminated_by_ld));
1184
1185
1186 if (set_names_str){
1187 gchar *tmp_str=g_strdup_printf("/*!40101 SET NAMES %s*/",set_names_str);
1188 set_names_str=tmp_str;
1189 } else
1190 set_names_str=g_strdup("/*!40101 SET NAMES binary*/");
1191
1192 if (program_version) {
1193 g_print("mydumper %s, built against MySQL %s\n", VERSION,
1194 MYSQL_VERSION_STR);
1195 exit(EXIT_SUCCESS);
1196 }
1197
1198 set_verbose(verbose);
1199
1200 GDateTime * datetime = g_date_time_new_now_local();
1201
1202 g_message("MyDumper backup version: %s", VERSION);
1203
1204 time_t t;
1205 time(&t);
1206 localtime_r(&t, &tval);
1207
1208 // rows chunks have precedence over chunk_filesize
1209 if (rows_per_file > 0 && chunk_filesize > 0) {
1210 chunk_filesize = 0;
1211 g_warning("--chunk-filesize disabled by --rows option");
1212 }
1213
1214 // until we have an unique option on lock types we need to ensure this
1215 if (no_locks || trx_consistency_only)
1216 less_locking = 0;
1217
1218 /* savepoints workaround to avoid metadata locking issues
1219 doesnt work for chuncks */
1220 if (rows_per_file && use_savepoints) {
1221 use_savepoints = FALSE;
1222 g_warning("--use-savepoints disabled by --rows");
1223 }
1224
1225 // clarify binlog coordinates with trx_consistency_only
1226 if (trx_consistency_only)
1227 g_warning("Using trx_consistency_only, binlog coordinates will not be "
1228 "accurate if you are writing to non transactional tables.");
1229
1230 char *datetimestr;
1231
1232 if (!output_directory_param){
1233 datetimestr=g_date_time_format(datetime,"\%Y\%m\%d-\%H\%M\%S");
1234 output_directory = g_strdup_printf("%s-%s", DIRECTORY, datetimestr);
1235 g_free(datetimestr);
1236 }else{
1237 output_directory=output_directory_param;
1238 }
1239 create_backup_dir(output_directory);
1240 if (daemon_mode) {
1241 pid_t pid, sid;
1242
1243 pid = fork();
1244 if (pid < 0)
1245 exit(EXIT_FAILURE);
1246 else if (pid > 0)
1247 exit(EXIT_SUCCESS);
1248
1249 umask(0037);
1250 sid = setsid();
1251
1252 if (sid < 0)
1253 exit(EXIT_FAILURE);
1254
1255 char *d_d;
1256 for (dump_number = 0; dump_number < snapshot_count; dump_number++) {
1257 d_d= g_strdup_printf("%s/%d", output_directory, dump_number);
1258 create_backup_dir(d_d);
1259 g_free(d_d);
1260 }
1261
1262 GFile *last_dump = g_file_new_for_path(
1263 g_strdup_printf("%s/last_dump", output_directory)
1264 );
1265 GFileInfo *last_dump_i = g_file_query_info(
1266 last_dump,
1267 G_FILE_ATTRIBUTE_STANDARD_TYPE ","
1268 G_FILE_ATTRIBUTE_STANDARD_SYMLINK_TARGET,
1269 G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
1270 NULL, NULL
1271 );
1272 if (last_dump_i != NULL &&
1273 g_file_info_get_file_type(last_dump_i) == G_FILE_TYPE_SYMBOLIC_LINK) {
1274 dump_number = atoi(g_file_info_get_symlink_target(last_dump_i));
1275 if (dump_number >= snapshot_count-1) dump_number = 0;
1276 else dump_number++;
1277 g_object_unref(last_dump_i);
1278 } else {
1279 dump_number = 0;
1280 }
1281 g_object_unref(last_dump);
1282 }else{
1283 dump_directory = output_directory;
1284 }
1285 /* Give ourselves an array of engines to ignore */
1286 if (ignore_engines)
1287 ignore = g_strsplit(ignore_engines, ",", 0);
1288
1289 /* Give ourselves an array of tables to dump */
1290 if (tables_list)
1291 tables = g_strsplit(tables_list, ",", 0);
1292
1293 /* Process list of tables to omit if specified */
1294 if (tables_skiplist_file)
1295 read_tables_skiplist(tables_skiplist_file);
1296
1297 if (daemon_mode) {
1298 GError *terror;
1299 start_scheduled_dump = g_async_queue_new();
1300 GThread *ethread =
1301 g_thread_create(exec_thread, GINT_TO_POINTER(1), FALSE, &terror);
1302 if (ethread == NULL) {
1303 g_critical("Could not create exec thread: %s", terror->message);
1304 g_error_free(terror);
1305 exit(EXIT_FAILURE);
1306 }
1307 // Run initial snapshot
1308 run_snapshot(NULL);
1309 #if GLIB_MINOR_VERSION < 14
1310 g_timeout_add(snapshot_interval * 60 * 1000, (GSourceFunc)run_snapshot,
1311 NULL);
1312 #else
1313 g_timeout_add_seconds(snapshot_interval * 60, (GSourceFunc)run_snapshot,
1314 NULL);
1315 #endif
1316 guint sigsource = g_unix_signal_add(SIGINT, sig_triggered, NULL);
1317 sigsource = g_unix_signal_add(SIGTERM, sig_triggered, NULL);
1318 m1 = g_main_loop_new(NULL, TRUE);
1319 g_main_loop_run(m1);
1320 g_source_remove(sigsource);
1321 } else {
1322 MYSQL *conn = create_main_connection();
1323 start_dump(conn);
1324 }
1325
1326 // sleep(5);
1327 mysql_thread_end();
1328 mysql_library_end();
1329 g_free(output_directory);
1330 g_strfreev(ignore);
1331 g_strfreev(tables);
1332
1333 if (logoutfile) {
1334 fclose(logoutfile);
1335 }
1336
1337 exit(errors ? EXIT_FAILURE : EXIT_SUCCESS);
1338 }
1339
1340 /* void initialize_session_variables(const gchar *group){
1341 gchar * group_variables=g_strdup_printf("%s_variables", group);
1342 if (set_session)
1343 g_string_set_size(set_session, 0);
1344 else
1345 set_session = g_string_new(NULL);
1346 GHashTable * set_session_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1347 if (detected_server == SERVER_TYPE_MYSQL){
1348 g_hash_table_insert(set_session_hash,g_strdup("WAIT_TIMEOUT"),g_strdup("2147483"));
1349 g_hash_table_insert(set_session_hash,g_strdup("NET_WRITE_TIMEOUT"),g_strdup("2147483"));
1350 }
1351 // get_set_session_from_key_file(set_session, group_variables, kf);
1352 load_hash_from_key_file(set_session_hash,group_variables);
1353 refresh_set_session_from_hash(set_session,set_session_hash);
1354 }*/
1355
create_main_connection()1356 MYSQL *create_main_connection() {
1357 MYSQL *conn;
1358 conn = mysql_init(NULL);
1359
1360 configure_connection(conn, "mydumper");
1361
1362 if (!mysql_real_connect(conn, hostname, username, password, db, port,
1363 socket_path, 0)) {
1364 g_critical("Error connecting to database: %s", mysql_error(conn));
1365 exit(EXIT_FAILURE);
1366 }
1367
1368 detected_server = detect_server(conn);
1369 initialize_session_variables("mydumper",set_session, detected_server, defaults_file);
1370 execute_gstring(conn, set_session);
1371 // if ((detected_server == SERVER_TYPE_MYSQL) &&
1372 // mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
1373 // g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
1374 // }
1375 // if ((detected_server == SERVER_TYPE_MYSQL) &&
1376 // mysql_query(conn, "SET SESSION net_write_timeout = 2147483")) {
1377 // g_warning("Failed to increase net_write_timeout: %s", mysql_error(conn));
1378 // }
1379
1380 switch (detected_server) {
1381 case SERVER_TYPE_MYSQL:
1382 g_message("Connected to a MySQL server");
1383 break;
1384 case SERVER_TYPE_DRIZZLE:
1385 g_message("Connected to a Drizzle server");
1386 break;
1387 case SERVER_TYPE_TIDB:
1388 g_message("Connected to a TiDB server");
1389 break;
1390 default:
1391 g_critical("Cannot detect server type");
1392 exit(EXIT_FAILURE);
1393 break;
1394 }
1395
1396 return conn;
1397 }
1398
exec_thread(void * data)1399 void *exec_thread(void *data) {
1400 (void)data;
1401
1402 while (1) {
1403 g_async_queue_pop(start_scheduled_dump);
1404 MYSQL *conn = create_main_connection();
1405 char *dump_number_str=g_strdup_printf("%d",dump_number);
1406 dump_directory = g_build_path( output_directory, dump_number_str, NULL);
1407 g_free(dump_number_str);
1408 clear_dump_directory(dump_directory);
1409 start_dump(conn);
1410 // start_dump already closes mysql
1411 // mysql_close(conn);
1412 mysql_thread_end();
1413
1414 // Don't switch the symlink on shutdown because the dump is probably
1415 // incomplete.
1416 if (!shutdown_triggered) {
1417 char *dump_symlink_source= g_strdup_printf("%d", dump_number);
1418 char *dump_symlink_dest =
1419 g_strdup_printf("%s/last_dump", output_directory);
1420
1421 // We don't care if this fails
1422 g_unlink(dump_symlink_dest);
1423
1424 if (symlink(dump_symlink_source, dump_symlink_dest) == -1) {
1425 g_critical("error setting last good dump symlink %s, %d",
1426 dump_symlink_dest, errno);
1427 }
1428 g_free(dump_symlink_dest);
1429
1430 if (dump_number >= snapshot_count-1) dump_number = 0;
1431 else dump_number++;
1432 }
1433 }
1434 return NULL;
1435 }
1436
dump_metadata(struct db_table * dbt)1437 void dump_metadata(struct db_table * dbt){
1438 char *filename = build_meta_filename(dbt->database->filename, dbt->table_filename, "metadata");
1439 FILE *table_meta = g_fopen(filename, "w");
1440 if (!table_meta) {
1441 g_critical("Couldn't write table metadata file %s (%d)", filename, errno);
1442 exit(EXIT_FAILURE);
1443 }
1444 fprintf(table_meta, "%d", dbt->rows);
1445 if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
1446 fclose(table_meta);
1447 }
1448
start_dump(MYSQL * conn)1449 void start_dump(MYSQL *conn) {
1450 struct configuration conf = {1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0};
1451 char *p;
1452 char *p2;
1453 char *p3;
1454 char *u;
1455
1456 guint64 nits[num_threads];
1457 GList *nitl[num_threads];
1458 int tn = 0;
1459 guint64 min = 0;
1460 struct db_table *dbt=NULL;
1461 struct schema_post *sp;
1462 guint n;
1463 FILE *nufile = NULL;
1464 guint have_backup_locks = 0;
1465
1466 for (n = 0; n < num_threads; n++) {
1467 nits[n] = 0;
1468 nitl[n] = NULL;
1469 }
1470 if (ignore_generated_fields)
1471 g_warning("Queries related to generated fields are not going to be executed. It will lead to restoration issues if you have generated columns");
1472
1473 p = g_strdup_printf("%s/metadata.partial", dump_directory);
1474 p2 = g_strndup(p, (unsigned)strlen(p) - 8);
1475
1476 FILE *mdfile = g_fopen(p, "w");
1477 if (!mdfile) {
1478 g_critical("Couldn't write metadata file %s (%d)", p, errno);
1479 exit(EXIT_FAILURE);
1480 }
1481
1482 if (updated_since > 0) {
1483 u = g_strdup_printf("%s/not_updated_tables", dump_directory);
1484 nufile = g_fopen(u, "w");
1485 if (!nufile) {
1486 g_critical("Couldn't write not_updated_tables file (%d)", errno);
1487 exit(EXIT_FAILURE);
1488 }
1489 get_not_updated(conn, nufile);
1490 }
1491
1492 /* We check SHOW PROCESSLIST, and if there're queries
1493 larger than preset value, we terminate the process.
1494
1495 This avoids stalling whole server with flush */
1496
1497 if (!no_locks) {
1498
1499 while (TRUE) {
1500 int longquery_count = 0;
1501 if (mysql_query(conn, "SHOW PROCESSLIST")) {
1502 g_warning("Could not check PROCESSLIST, no long query guard enabled: %s",
1503 mysql_error(conn));
1504 break;
1505 } else {
1506 MYSQL_RES *res = mysql_store_result(conn);
1507 MYSQL_ROW row;
1508
1509 /* Just in case PROCESSLIST output column order changes */
1510 MYSQL_FIELD *fields = mysql_fetch_fields(res);
1511 guint i;
1512 int tcol = -1, ccol = -1, icol = -1, ucol = -1;
1513 for (i = 0; i < mysql_num_fields(res); i++) {
1514 if (!strcasecmp(fields[i].name, "Command"))
1515 ccol = i;
1516 else if (!strcasecmp(fields[i].name, "Time"))
1517 tcol = i;
1518 else if (!strcasecmp(fields[i].name, "Id"))
1519 icol = i;
1520 else if (!strcasecmp(fields[i].name, "User"))
1521 ucol = i;
1522 }
1523 if ((tcol < 0) || (ccol < 0) || (icol < 0)) {
1524 g_critical("Error obtaining information from processlist");
1525 exit(EXIT_FAILURE);
1526 }
1527 while ((row = mysql_fetch_row(res))) {
1528 if (row[ccol] && strcmp(row[ccol], "Query"))
1529 continue;
1530 if (row[ucol] && !strcmp(row[ucol], "system user"))
1531 continue;
1532 if (row[tcol] && atoi(row[tcol]) > longquery) {
1533 if (killqueries) {
1534 if (mysql_query(conn,
1535 p3 = g_strdup_printf("KILL %lu", atol(row[icol])))) {
1536 g_warning("Could not KILL slow query: %s", mysql_error(conn));
1537 longquery_count++;
1538 } else {
1539 g_warning("Killed a query that was running for %ss", row[tcol]);
1540 }
1541 g_free(p3);
1542 } else {
1543 longquery_count++;
1544 }
1545 }
1546 }
1547 mysql_free_result(res);
1548 if (longquery_count == 0)
1549 break;
1550 else {
1551 if (longquery_retries == 0) {
1552 g_critical("There are queries in PROCESSLIST running longer than "
1553 "%us, aborting dump,\n\t"
1554 "use --long-query-guard to change the guard value, kill "
1555 "queries (--kill-long-queries) or use \n\tdifferent "
1556 "server for dump",
1557 longquery);
1558 exit(EXIT_FAILURE);
1559 }
1560 longquery_retries--;
1561 g_warning("There are queries in PROCESSLIST running longer than "
1562 "%us, retrying in %u seconds (%u left).",
1563 longquery, longquery_retry_interval, longquery_retries);
1564 sleep(longquery_retry_interval);
1565 }
1566 }
1567 }
1568 }
1569
1570 if (!no_locks && (detected_server != SERVER_TYPE_TIDB)) {
1571 // Percona Server 8 removed LOCK BINLOG so backup locks is useless for
1572 // mydumper now and we need to fail back to FTWRL
1573 mysql_query(conn, "SELECT @@version_comment, @@version");
1574 MYSQL_RES *res2 = mysql_store_result(conn);
1575 MYSQL_ROW ver;
1576 while ((ver = mysql_fetch_row(res2))) {
1577 if (g_str_has_prefix(ver[0], "Percona") &&
1578 g_str_has_prefix(ver[1], "8.")) {
1579 g_message("Disabling Percona Backup Locks for Percona Server 8");
1580 no_backup_locks = 1;
1581 }
1582 }
1583 mysql_free_result(res2);
1584
1585 // Percona Backup Locks
1586 if (!no_backup_locks) {
1587 mysql_query(conn, "SELECT @@have_backup_locks");
1588 MYSQL_RES *rest = mysql_store_result(conn);
1589 if (rest != NULL && mysql_num_rows(rest)) {
1590 mysql_free_result(rest);
1591 g_message("Using Percona Backup Locks");
1592 have_backup_locks = 1;
1593 }
1594 }
1595
1596 if (have_backup_locks) {
1597 if (mysql_query(conn, "LOCK TABLES FOR BACKUP")) {
1598 g_critical("Couldn't acquire LOCK TABLES FOR BACKUP, snapshots will "
1599 "not be consistent: %s",
1600 mysql_error(conn));
1601 errors++;
1602 }
1603
1604 if (mysql_query(conn, "LOCK BINLOG FOR BACKUP")) {
1605 g_critical("Couldn't acquire LOCK BINLOG FOR BACKUP, snapshots will "
1606 "not be consistent: %s",
1607 mysql_error(conn));
1608 errors++;
1609 }
1610 } else if (lock_all_tables) {
1611 // LOCK ALL TABLES
1612 GString *query = g_string_sized_new(16777216);
1613 gchar *dbtb = NULL;
1614 gchar **dt = NULL;
1615 GList *tables_lock = NULL;
1616 GList *iter = NULL;
1617 guint success = 0;
1618 guint retry = 0;
1619 guint lock = 1;
1620 int i = 0;
1621
1622 if (db) {
1623 g_string_printf(
1624 query,
1625 "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
1626 "WHERE TABLE_SCHEMA = '%s' AND TABLE_TYPE ='BASE TABLE' AND NOT "
1627 "(TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
1628 "TABLE_NAME = 'general_log'))",
1629 db);
1630 } else if (tables) {
1631 for (i = 0; tables[i] != NULL; i++) {
1632 dt = g_strsplit(tables[i], ".", 0);
1633 dbtb = g_strdup_printf("`%s`.`%s`", dt[0], dt[1]);
1634 tables_lock = g_list_prepend(tables_lock, dbtb);
1635 }
1636 tables_lock = g_list_reverse(tables_lock);
1637 } else {
1638 g_string_printf(
1639 query,
1640 "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
1641 "WHERE TABLE_TYPE ='BASE TABLE' AND TABLE_SCHEMA NOT IN "
1642 "('information_schema', 'performance_schema', 'data_dictionary') "
1643 "AND NOT (TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
1644 "TABLE_NAME = 'general_log'))");
1645 }
1646
1647 if (tables_lock == NULL) {
1648 if (mysql_query(conn, query->str)) {
1649 g_critical("Couldn't get table list for lock all tables: %s",
1650 mysql_error(conn));
1651 errors++;
1652 } else {
1653 MYSQL_RES *res = mysql_store_result(conn);
1654 MYSQL_ROW row;
1655
1656 while ((row = mysql_fetch_row(res))) {
1657 lock = 1;
1658 if (tables) {
1659 int table_found = 0;
1660 for (i = 0; tables[i] != NULL; i++)
1661 if (g_ascii_strcasecmp(tables[i], row[1]) == 0)
1662 table_found = 1;
1663 if (!table_found)
1664 lock = 0;
1665 }
1666 if (lock && tables_skiplist_file && check_skiplist(row[0], row[1]))
1667 continue;
1668 if (lock && regexstring && !check_regex(row[0], row[1]))
1669 continue;
1670
1671 if (lock) {
1672 dbtb = g_strdup_printf("`%s`.`%s`", row[0], row[1]);
1673 tables_lock = g_list_prepend(tables_lock, dbtb);
1674 }
1675 }
1676 tables_lock = g_list_reverse(tables_lock);
1677 }
1678 }
1679
1680 // Try three times to get the lock, this is in case of tmp tables
1681 // disappearing
1682 while (!success && retry < 4) {
1683 n = 0;
1684 for (iter = tables_lock; iter != NULL; iter = iter->next) {
1685 if (n == 0) {
1686 g_string_printf(query, "LOCK TABLE %s READ", (char *)iter->data);
1687 n = 1;
1688 } else {
1689 g_string_append_printf(query, ", %s READ", (char *)iter->data);
1690 }
1691 }
1692 if (mysql_query(conn, query->str)) {
1693 gchar *failed_table = NULL;
1694 gchar **tmp_fail;
1695
1696 tmp_fail = g_strsplit(mysql_error(conn), "'", 0);
1697 tmp_fail = g_strsplit(tmp_fail[1], ".", 0);
1698 failed_table = g_strdup_printf("`%s`.`%s`", tmp_fail[0], tmp_fail[1]);
1699 for (iter = tables_lock; iter != NULL; iter = iter->next) {
1700 if (strcmp(iter->data, failed_table) == 0) {
1701 tables_lock = g_list_remove(tables_lock, iter->data);
1702 }
1703 }
1704 g_free(tmp_fail);
1705 g_free(failed_table);
1706 } else {
1707 success = 1;
1708 }
1709 retry += 1;
1710 }
1711 if (!success) {
1712 g_critical("Lock all tables fail: %s", mysql_error(conn));
1713 exit(EXIT_FAILURE);
1714 }
1715 g_free(query->str);
1716 g_list_free(tables_lock);
1717 } else {
1718 if (mysql_query(conn, "FLUSH TABLES WITH READ LOCK")) {
1719 g_critical("Couldn't acquire global lock, snapshots will not be "
1720 "consistent: %s",
1721 mysql_error(conn));
1722 errors++;
1723 }
1724 }
1725 } else if (detected_server == SERVER_TYPE_TIDB) {
1726 g_message("Skipping locks because of TiDB");
1727 if (!tidb_snapshot) {
1728
1729 // Generate a @@tidb_snapshot to use for the worker threads since
1730 // the tidb-snapshot argument was not specified when starting mydumper
1731
1732 if (mysql_query(conn, "SHOW MASTER STATUS")) {
1733 g_critical("Couldn't generate @@tidb_snapshot: %s", mysql_error(conn));
1734 exit(EXIT_FAILURE);
1735 } else {
1736
1737 MYSQL_RES *result = mysql_store_result(conn);
1738 MYSQL_ROW row = mysql_fetch_row(
1739 result); /* There should never be more than one row */
1740 tidb_snapshot = g_strdup(row[1]);
1741 mysql_free_result(result);
1742 }
1743 }
1744
1745 // Need to set the @@tidb_snapshot for the master thread
1746 gchar *query =
1747 g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
1748
1749 g_message("Set to tidb_snapshot '%s'", tidb_snapshot);
1750
1751 if (mysql_query(conn, query)) {
1752 g_critical("Failed to set tidb_snapshot: %s", mysql_error(conn));
1753 exit(EXIT_FAILURE);
1754 }
1755 g_free(query);
1756
1757 } else {
1758 g_warning("Executing in no-locks mode, snapshot will not be consistent");
1759 }
1760 if (mysql_get_server_version(conn) < 40108) {
1761 mysql_query(
1762 conn,
1763 "CREATE TABLE IF NOT EXISTS mysql.mydumperdummy (a INT) ENGINE=INNODB");
1764 need_dummy_read = 1;
1765 }
1766
1767 // tokudb do not support consistent snapshot
1768 mysql_query(conn, "SELECT @@tokudb_version");
1769 MYSQL_RES *rest = mysql_store_result(conn);
1770 if (rest != NULL && mysql_num_rows(rest)) {
1771 mysql_free_result(rest);
1772 g_message("TokuDB detected, creating dummy table for CS");
1773 mysql_query(
1774 conn,
1775 "CREATE TABLE IF NOT EXISTS mysql.tokudbdummy (a INT) ENGINE=TokuDB");
1776 need_dummy_toku_read = 1;
1777 }
1778
1779 // Do not start a transaction when lock all tables instead of FTWRL,
1780 // since it can implicitly release read locks we hold
1781 if (!lock_all_tables) {
1782 mysql_query(conn, "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */");
1783 }
1784
1785 if (need_dummy_read) {
1786 mysql_query(conn,
1787 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
1788 MYSQL_RES *res = mysql_store_result(conn);
1789 if (res)
1790 mysql_free_result(res);
1791 }
1792 if (need_dummy_toku_read) {
1793 mysql_query(conn,
1794 "SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
1795 MYSQL_RES *res = mysql_store_result(conn);
1796 if (res)
1797 mysql_free_result(res);
1798 }
1799 GDateTime *datetime = g_date_time_new_now_local();
1800 char *datetimestr=g_date_time_format(datetime,"\%Y-\%m-\%d \%H:\%M:\%S");
1801 fprintf(mdfile, "Started dump at: %s\n", datetimestr);
1802
1803 g_message("Started dump at: %s", datetimestr);
1804 g_free(datetimestr);
1805
1806 if (detected_server == SERVER_TYPE_MYSQL) {
1807 mysql_query(conn, set_names_str);
1808
1809 write_snapshot_info(conn, mdfile);
1810 }
1811 GThread *stream_thread = NULL;
1812 if (stream){
1813 stream_queue = g_async_queue_new();
1814 stream_thread = g_thread_create((GThreadFunc)process_stream, stream_queue, TRUE, NULL);
1815 }
1816 GThread **threads = g_new(GThread *, num_threads * (less_locking + 1));
1817 struct thread_data *td =
1818 g_new(struct thread_data, num_threads * (less_locking + 1));
1819
1820 if (less_locking) {
1821 conf.queue_less_locking = g_async_queue_new();
1822 conf.ready_less_locking = g_async_queue_new();
1823 less_locking_threads = num_threads;
1824 for (n = num_threads; n < num_threads * 2; n++) {
1825 td[n].conf = &conf;
1826 td[n].thread_id = n + 1;
1827 td[n].queue = conf.queue_less_locking;
1828 td[n].ready = conf.ready_less_locking;
1829 td[n].less_locking_stage = TRUE;
1830 threads[n] = g_thread_create((GThreadFunc)process_queue,
1831 &td[n], TRUE, NULL);
1832 g_async_queue_pop(conf.ready_less_locking);
1833 }
1834 g_async_queue_unref(conf.ready_less_locking);
1835 }
1836
1837 conf.queue = g_async_queue_new();
1838 conf.ready = g_async_queue_new();
1839 conf.unlock_tables = g_async_queue_new();
1840 conf.ready_database_dump = g_async_queue_new();
1841
1842 for (n = 0; n < num_threads; n++) {
1843 td[n].conf = &conf;
1844 td[n].thread_id = n + 1;
1845 td[n].queue = conf.queue;
1846 td[n].ready = conf.ready;
1847 td[n].less_locking_stage = FALSE;
1848 threads[n] =
1849 g_thread_create((GThreadFunc)process_queue, &td[n], TRUE, NULL);
1850 g_async_queue_pop(conf.ready);
1851 }
1852
1853 g_async_queue_unref(conf.ready);
1854
1855 if (trx_consistency_only) {
1856 g_message("Transactions started, unlocking tables");
1857 mysql_query(conn, "UNLOCK TABLES /* trx-only */");
1858 if (have_backup_locks)
1859 mysql_query(conn, "UNLOCK BINLOG");
1860 }
1861
1862 if (db) {
1863 dump_database(new_database(conn,db,TRUE), &conf);
1864 if (!no_schemas)
1865 dump_create_database(db, &conf);
1866 } else if (tables) {
1867 get_tables(conn, &conf);
1868 } else {
1869 MYSQL_RES *databases;
1870 MYSQL_ROW row;
1871 if (mysql_query(conn, "SHOW DATABASES") ||
1872 !(databases = mysql_store_result(conn))) {
1873 g_critical("Unable to list databases: %s", mysql_error(conn));
1874 exit(EXIT_FAILURE);
1875 }
1876
1877 while ((row = mysql_fetch_row(databases))) {
1878 if (!strcasecmp(row[0], "information_schema") ||
1879 !strcasecmp(row[0], "performance_schema") ||
1880 (!strcasecmp(row[0], "data_dictionary")))
1881 continue;
1882 struct database * db_tmp=NULL;
1883 if (get_database(conn,row[0],&db_tmp) && !no_schemas && (regexstring == NULL || check_regex(row[0], NULL))){
1884 g_mutex_lock(db_tmp->ad_mutex);
1885 if (!db_tmp->already_dumped){
1886 dump_create_database(db_tmp->name, &conf);
1887 db_tmp->already_dumped=TRUE;
1888 }
1889 g_mutex_unlock(db_tmp->ad_mutex);
1890 }
1891 dump_database(db_tmp, &conf);
1892 /* Checks PCRE expressions on 'database' string */
1893 // if (!no_schemas && (regexstring == NULL || check_regex(row[0], NULL))){
1894 // dump_create_database(row[0], &conf);
1895 // }
1896 }
1897 mysql_free_result(databases);
1898 }
1899 g_async_queue_pop(conf.ready_database_dump);
1900 g_async_queue_unref(conf.ready_database_dump);
1901 g_list_free(no_updated_tables);
1902
1903 if (!non_innodb_table) {
1904 g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
1905 }
1906
1907 GList *iter;
1908 table_schemas = g_list_reverse(table_schemas);
1909 for (iter = table_schemas; iter != NULL; iter = iter->next) {
1910 dbt = (struct db_table *)iter->data;
1911 dump_schema(conn, dbt, &conf);
1912 }
1913
1914 non_innodb_table = g_list_reverse(non_innodb_table);
1915 if (less_locking) {
1916
1917 for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
1918 dbt = (struct db_table *)iter->data;
1919 tn = 0;
1920 min = nits[0];
1921 for (n = 1; n < num_threads; n++) {
1922 if (nits[n] < min) {
1923 min = nits[n];
1924 tn = n;
1925 }
1926 }
1927 nitl[tn] = g_list_prepend(nitl[tn], dbt);
1928 nits[tn] += dbt->datalength;
1929 }
1930 nitl[tn] = g_list_reverse(nitl[tn]);
1931
1932 for (n = 0; n < num_threads; n++) {
1933 if (nits[n] > 0) {
1934 g_atomic_int_inc(&non_innodb_table_counter);
1935 dump_tables(conn, nitl[n], &conf);
1936 g_list_free(nitl[n]);
1937 }
1938 }
1939 g_list_free(non_innodb_table);
1940
1941 if (g_atomic_int_get(&non_innodb_table_counter))
1942 g_atomic_int_inc(&non_innodb_done);
1943 else
1944 g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
1945
1946 for (n = 0; n < num_threads; n++) {
1947 struct job *j = g_new0(struct job, 1);
1948 j->type = JOB_SHUTDOWN;
1949 g_async_queue_push(conf.queue_less_locking, j);
1950 }
1951 } else {
1952 for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
1953 dbt = (struct db_table *)iter->data;
1954 if (dump_checksums) {
1955 dump_checksum(dbt, &conf);
1956 }
1957 dump_table(conn, dbt, &conf, FALSE);
1958 g_atomic_int_inc(&non_innodb_table_counter);
1959 }
1960 g_list_free(non_innodb_table);
1961 g_atomic_int_inc(&non_innodb_done);
1962 }
1963
1964 innodb_tables = g_list_reverse(innodb_tables);
1965 for (iter = innodb_tables; iter != NULL; iter = iter->next) {
1966 dbt = (struct db_table *)iter->data;
1967 if (dump_checksums) {
1968 dump_checksum(dbt, &conf);
1969 }
1970 dump_table(conn, dbt, &conf, TRUE);
1971 }
1972 g_list_free(innodb_tables);
1973 innodb_tables=NULL;
1974
1975 /* table_schemas = g_list_reverse(table_schemas);
1976 for (iter = table_schemas; iter != NULL; iter = iter->next) {
1977 dbt = (struct db_table *)iter->data;
1978 dump_schema(conn, dbt, &conf);
1979 }*/
1980
1981 view_schemas = g_list_reverse(view_schemas);
1982 for (iter = view_schemas; iter != NULL; iter = iter->next) {
1983 dbt = (struct db_table *)iter->data;
1984 dump_view(dbt, &conf);
1985 g_free(dbt->table);
1986 g_free(dbt);
1987 }
1988 g_list_free(view_schemas);
1989 view_schemas=NULL;
1990
1991 schema_post = g_list_reverse(schema_post);
1992 for (iter = schema_post; iter != NULL; iter = iter->next) {
1993 sp = (struct schema_post *)iter->data;
1994 dump_schema_post(sp->database, &conf);
1995 g_free(sp);
1996 }
1997 g_list_free(schema_post);
1998 schema_post=NULL;
1999
2000 if (!no_locks && !trx_consistency_only) {
2001 g_async_queue_pop(conf.unlock_tables);
2002 g_message("Non-InnoDB dump complete, unlocking tables");
2003 mysql_query(conn, "UNLOCK TABLES /* FTWRL */");
2004 if (have_backup_locks)
2005 mysql_query(conn, "UNLOCK BINLOG");
2006 }
2007 // close main connection
2008 mysql_close(conn);
2009
2010 if (less_locking) {
2011 for (n = num_threads; n < num_threads * 2; n++) {
2012 g_thread_join(threads[n]);
2013 }
2014 g_async_queue_unref(conf.queue_less_locking);
2015 }
2016
2017 for (n = 0; n < num_threads; n++) {
2018 struct job *j = g_new0(struct job, 1);
2019 j->type = JOB_SHUTDOWN;
2020 g_async_queue_push(conf.queue, j);
2021 }
2022
2023 for (n = 0; n < num_threads; n++) {
2024 g_thread_join(threads[n]);
2025 }
2026
2027 table_schemas = g_list_reverse(table_schemas);
2028 for (iter = table_schemas; iter != NULL; iter = iter->next) {
2029 dbt = (struct db_table *)iter->data;
2030 dump_metadata(dbt);
2031 }
2032 g_list_free(table_schemas);
2033 table_schemas=NULL;
2034
2035 g_async_queue_unref(conf.queue);
2036 g_async_queue_unref(conf.unlock_tables);
2037
2038 datetime = g_date_time_new_now_local();
2039 datetimestr=g_date_time_format(datetime,"\%Y-\%m-\%d \%H:\%M:\%S");
2040 fprintf(mdfile, "Finished dump at: %s\n", datetimestr);
2041 fclose(mdfile);
2042 if (updated_since > 0)
2043 fclose(nufile);
2044 g_rename(p, p2);
2045 if (stream) {
2046 g_async_queue_push(stream_queue, g_strdup(p2));
2047 }
2048 g_free(p);
2049 g_free(p2);
2050 g_message("Finished dump at: %s",datetimestr);
2051 g_free(datetimestr);
2052
2053 if (stream) {
2054 g_async_queue_push(stream_queue, g_strdup(""));
2055 g_thread_join(stream_thread);
2056 if (no_delete == FALSE && output_directory_param == NULL)
2057 if (g_rmdir(output_directory) != 0)
2058 g_critical("Backup directory not removed: %s", output_directory);
2059 }
2060 g_free(td);
2061 g_free(threads);
2062 }
2063
dump_create_database(char * database,struct configuration * conf)2064 void dump_create_database(char *database, struct configuration *conf) {
2065 struct job *j = g_new0(struct job, 1);
2066 struct create_database_job *cdj = g_new0(struct create_database_job, 1);
2067 j->job_data = (void *)cdj;
2068 gchar *d=get_ref_table(database);
2069 cdj->database = g_strdup(database);
2070 j->conf = conf;
2071 j->type = JOB_CREATE_DATABASE;
2072
2073 cdj->filename = build_schema_filename(d, "schema-create");
2074
2075 g_async_queue_push(conf->queue, j);
2076 return;
2077 }
2078
/* Write the CREATE DATABASE statement of `database` into `filename`.
 *
 * conn     - connection used to run SHOW CREATE DATABASE.
 * database - database name, interpolated between backticks in the query.
 * filename - output file, opened with m_open() and closed with m_close().
 *
 * Failures are logged and counted in `errors`, except error 1146 when
 * success_on_1146 is set, which is only warned about. When streaming is
 * enabled the file name is queued on stream_queue once the statement has been
 * handed to write_data(). Every exit path after a successful m_open() releases
 * the query text, the statement buffer, the result set and the output file.
 */
void dump_create_database_data(MYSQL *conn, char *database, char *filename) {
  void *outfile = m_open(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  GString *statement = g_string_sized_new(statement_size);
  MYSQL_RES *result = NULL;
  MYSQL_ROW row = NULL;
  gboolean statement_emitted = FALSE;
  char *query =
      g_strdup_printf("SHOW CREATE DATABASE IF NOT EXISTS `%s`", database);

  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping create database (%s): %s", database,
                mysql_error(conn));
    } else {
      g_critical("Error dumping create database (%s): %s", database,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  /* There should never be more than one row, but there may be none. */
  row = mysql_fetch_row(result);
  if (row == NULL || row[1] == NULL) {
    g_critical("Error dumping create database (%s): no definition returned",
               database);
    errors++;
    goto cleanup;
  }

  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write create database for %s", database);
    errors++;
  }
  statement_emitted = TRUE;

cleanup:
  g_free(query);
  m_close(outfile);
  /* Only announce the file to the stream once the write stage was reached. */
  if (stream && statement_emitted)
    g_async_queue_push(stream_queue, g_strdup(filename));
  g_string_free(statement, TRUE);
  if (result)
    mysql_free_result(result);
}
2128
/* Collect the base tables whose UPDATE_TIME is older than `updated_since`
 * days.
 *
 * Each "schema.table" name is prepended to the global no_updated_tables list
 * and written as one line to `file`; the list is put back in server order at
 * the end and `file` is flushed.
 *
 * The list stores its own copies of the names, so the result set can be
 * released here. Whoever disposes of the list is responsible for freeing the
 * element strings as well.
 *
 * If the query fails or yields no result set, a warning is logged and the
 * list is left untouched.
 */
void get_not_updated(MYSQL *conn, FILE *file) {
  gchar *query =
      g_strdup_printf("SELECT CONCAT(TABLE_SCHEMA,'.',TABLE_NAME) FROM "
                      "information_schema.TABLES WHERE TABLE_TYPE = 'BASE "
                      "TABLE' AND UPDATE_TIME < NOW() - INTERVAL %d DAY",
                      updated_since);
  int failed = mysql_query(conn, query);
  g_free(query);

  MYSQL_RES *res = failed ? NULL : mysql_store_result(conn);
  if (res == NULL) {
    g_warning("Unable to list tables not updated in the last %d days: %s",
              updated_since, mysql_error(conn));
    return;
  }

  MYSQL_ROW row;
  while ((row = mysql_fetch_row(res))) {
    if (row[0] == NULL)
      continue;
    /* Copy the name: row[] memory belongs to `res`, which is freed below. */
    no_updated_tables = g_list_prepend(no_updated_tables, g_strdup(row[0]));
    fprintf(file, "%s\n", row[0]);
  }
  mysql_free_result(res);

  no_updated_tables = g_list_reverse(no_updated_tables);
  fflush(file);
}
2149
detect_generated_fields(MYSQL * conn,struct db_table * dbt)2150 gboolean detect_generated_fields(MYSQL *conn, struct db_table *dbt) {
2151 MYSQL_RES *res = NULL;
2152 MYSQL_ROW row;
2153
2154 gboolean result = FALSE;
2155 if (ignore_generated_fields)
2156 return FALSE;
2157
2158 gchar *query = g_strdup_printf(
2159 "select COLUMN_NAME from information_schema.COLUMNS where "
2160 "TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra like '%%GENERATED%%' and extra not like '%%DEFAULT_GENERATED%%'",
2161 dbt->database->escaped, dbt->escaped_table);
2162
2163 mysql_query(conn, query);
2164 g_free(query);
2165
2166 res = mysql_store_result(conn);
2167 if (res == NULL){
2168 return FALSE;
2169 }
2170
2171 if ((row = mysql_fetch_row(res))) {
2172 result = TRUE;
2173 }
2174 mysql_free_result(res);
2175
2176 return result;
2177 }
2178
get_insertable_fields(MYSQL * conn,char * database,char * table)2179 GString *get_insertable_fields(MYSQL *conn, char *database, char *table) {
2180 MYSQL_RES *res = NULL;
2181 MYSQL_ROW row;
2182
2183 GString *field_list = g_string_new("");
2184
2185 gchar *query =
2186 g_strdup_printf("select COLUMN_NAME from information_schema.COLUMNS "
2187 "where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra "
2188 "not like '%%VIRTUAL GENERATED%%' and extra not like '%%STORED GENERATED%%'",
2189 database, table);
2190 mysql_query(conn, query);
2191 g_free(query);
2192
2193 res = mysql_store_result(conn);
2194 gboolean first = TRUE;
2195 while ((row = mysql_fetch_row(res))) {
2196 if (first) {
2197 first = FALSE;
2198 } else {
2199 g_string_append(field_list, ",");
2200 }
2201
2202 gchar *tb = g_strdup_printf("`%s`", row[0]);
2203 g_string_append(field_list, tb);
2204 g_free(tb);
2205 }
2206 mysql_free_result(res);
2207
2208 return field_list;
2209 }
2210
get_primary_key_string(MYSQL * conn,char * database,char * table)2211 gchar *get_primary_key_string(MYSQL *conn, char *database, char *table) {
2212 MYSQL_RES *res = NULL;
2213 MYSQL_ROW row;
2214
2215 GString *field_list = g_string_new("");
2216
2217 gchar *query =
2218 g_strdup_printf("SELECT k.COLUMN_NAME, ORDINAL_POSITION "
2219 "FROM information_schema.table_constraints t "
2220 "LEFT JOIN information_schema.key_column_usage k "
2221 "USING(constraint_name,table_schema,table_name) "
2222 "WHERE t.constraint_type IN ('PRIMARY KEY', 'UNIQUE') "
2223 "AND t.table_schema='%s' "
2224 "AND t.table_name='%s' "
2225 "ORDER BY t.constraint_type, ORDINAL_POSITION; ",
2226 database, table);
2227 mysql_query(conn, query);
2228 g_free(query);
2229
2230 res = mysql_store_result(conn);
2231 gboolean first = TRUE;
2232 while ((row = mysql_fetch_row(res))) {
2233 if (first) {
2234 first = FALSE;
2235 } else if (atoi(row[1]) > 1) {
2236 g_string_append(field_list, ",");
2237 } else {
2238 break;
2239 }
2240
2241 gchar *tb = g_strdup_printf("`%s`", row[0]);
2242 g_string_append(field_list, tb);
2243 g_free(tb);
2244 }
2245 mysql_free_result(res);
2246 // Return NULL if we never found a PRIMARY or UNIQUE key
2247 if (first) {
2248 g_string_free(field_list, TRUE);
2249 return NULL;
2250 } else {
2251 gchar *order_string = g_string_free(field_list, FALSE);
2252 return order_string;
2253 }
2254 }
2255
2256 /* Heuristic chunks building - based on estimates, produces list of ranges for
2257 datadumping WORK IN PROGRESS
2258 */
get_chunks_for_table(MYSQL * conn,char * database,char * table,struct configuration * conf)2259 GList *get_chunks_for_table(MYSQL *conn, char *database, char *table,
2260 struct configuration *conf) {
2261
2262 GList *chunks = NULL;
2263 MYSQL_RES *indexes = NULL, *minmax = NULL, *total = NULL;
2264 MYSQL_ROW row;
2265 char *field = NULL;
2266 int showed_nulls = 0;
2267
2268 /* first have to pick index, in future should be able to preset in
2269 * configuration too */
2270 gchar *query = g_strdup_printf("SHOW INDEX FROM `%s`.`%s`", database, table);
2271 mysql_query(conn, query);
2272 g_free(query);
2273 indexes = mysql_store_result(conn);
2274
2275 if (indexes){
2276 while ((row = mysql_fetch_row(indexes))) {
2277 if (!strcmp(row[2], "PRIMARY") && (!strcmp(row[3], "1"))) {
2278 /* Pick first column in PK, cardinality doesn't matter */
2279 field = row[4];
2280 break;
2281 }
2282 }
2283
2284 /* If no PK found, try using first UNIQUE index */
2285 if (!field) {
2286 mysql_data_seek(indexes, 0);
2287 while ((row = mysql_fetch_row(indexes))) {
2288 if (!strcmp(row[1], "0") && (!strcmp(row[3], "1"))) {
2289 /* Again, first column of any unique index */
2290 field = row[4];
2291 break;
2292 }
2293 }
2294 }
2295 /* Still unlucky? Pick any high-cardinality index */
2296 if (!field && conf->use_any_index) {
2297 guint64 max_cardinality = 0;
2298 guint64 cardinality = 0;
2299
2300 mysql_data_seek(indexes, 0);
2301 while ((row = mysql_fetch_row(indexes))) {
2302 if (!strcmp(row[3], "1")) {
2303 if (row[6])
2304 cardinality = strtoul(row[6], NULL, 10);
2305 if (cardinality > max_cardinality) {
2306 field = row[4];
2307 max_cardinality = cardinality;
2308 }
2309 }
2310 }
2311 }
2312 }
2313 /* Oh well, no chunks today - no suitable index */
2314 if (!field)
2315 goto cleanup;
2316
2317 /* Get minimum/maximum */
2318 mysql_query(conn, query = g_strdup_printf(
2319 "SELECT %s MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`",
2320 (detected_server == SERVER_TYPE_MYSQL)
2321 ? "/*!40001 SQL_NO_CACHE */"
2322 : "",
2323 field, field, database, table));
2324 g_free(query);
2325 minmax = mysql_store_result(conn);
2326
2327 if (!minmax)
2328 goto cleanup;
2329
2330 row = mysql_fetch_row(minmax);
2331 MYSQL_FIELD *fields = mysql_fetch_fields(minmax);
2332
2333 /* Check if all values are NULL */
2334 if (row[0] == NULL)
2335 goto cleanup;
2336
2337 char *min = row[0];
2338 char *max = row[1];
2339
2340 guint64 estimated_chunks, estimated_step, nmin, nmax, cutoff, rows;
2341
2342 /* Support just bigger INTs for now, very dumb, no verify approach */
2343 switch (fields[0].type) {
2344 case MYSQL_TYPE_LONG:
2345 case MYSQL_TYPE_LONGLONG:
2346 case MYSQL_TYPE_INT24:
2347 case MYSQL_TYPE_SHORT:
2348 /* Got total number of rows, skip chunk logic if estimates are low */
2349 rows = estimate_count(conn, database, table, field, min, max);
2350 if (rows <= rows_per_file)
2351 goto cleanup;
2352
2353 /* This is estimate, not to use as guarantee! Every chunk would have eventual
2354 * adjustments */
2355 estimated_chunks = rows / rows_per_file;
2356 /* static stepping */
2357 nmin = strtoul(min, NULL, 10);
2358 nmax = strtoul(max, NULL, 10);
2359 estimated_step = (nmax - nmin) / estimated_chunks + 1;
2360 if (estimated_step > max_rows)
2361 estimated_step = max_rows;
2362 cutoff = nmin;
2363 while (cutoff <= nmax) {
2364 chunks = g_list_prepend(
2365 chunks,
2366 g_strdup_printf("%s%s%s%s(`%s` >= %llu AND `%s` < %llu)",
2367 !showed_nulls ? "`" : "",
2368 !showed_nulls ? field : "",
2369 !showed_nulls ? "`" : "",
2370 !showed_nulls ? " IS NULL OR " : "", field,
2371 (unsigned long long)cutoff, field,
2372 (unsigned long long)(cutoff + estimated_step)));
2373 cutoff += estimated_step;
2374 showed_nulls = 1;
2375 }
2376 chunks = g_list_reverse(chunks);
2377 // TODO: We need to add more chunk options for different types
2378 default:
2379 goto cleanup;
2380 }
2381
2382 cleanup:
2383 if (indexes)
2384 mysql_free_result(indexes);
2385 if (minmax)
2386 mysql_free_result(minmax);
2387 if (total)
2388 mysql_free_result(total);
2389 return chunks;
2390 }
2391
2392 /* Try to get EXPLAIN'ed estimates of row in resultset */
estimate_count(MYSQL * conn,char * database,char * table,char * field,char * from,char * to)2393 guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
2394 char *from, char *to) {
2395 char *querybase, *query;
2396 int ret;
2397
2398 g_assert(conn && database && table);
2399
2400 querybase = g_strdup_printf("EXPLAIN SELECT `%s` FROM `%s`.`%s`",
2401 (field ? field : "*"), database, table);
2402 if (from || to) {
2403 g_assert(field != NULL);
2404 char *fromclause = NULL, *toclause = NULL;
2405 char *escaped;
2406 if (from) {
2407 escaped = g_new(char, strlen(from) * 2 + 1);
2408 mysql_real_escape_string(conn, escaped, from, strlen(from));
2409 fromclause = g_strdup_printf(" `%s` >= %s ", field, escaped);
2410 g_free(escaped);
2411 }
2412 if (to) {
2413 escaped = g_new(char, strlen(to) * 2 + 1);
2414 mysql_real_escape_string(conn, escaped, to, strlen(to));
2415 toclause = g_strdup_printf(" `%s` <= %s", field, escaped);
2416 g_free(escaped);
2417 }
2418 query = g_strdup_printf("%s WHERE %s %s %s", querybase,
2419 (from ? fromclause : ""),
2420 ((from && to) ? "AND" : ""), (to ? toclause : ""));
2421
2422 if (toclause)
2423 g_free(toclause);
2424 if (fromclause)
2425 g_free(fromclause);
2426 ret = mysql_query(conn, query);
2427 g_free(querybase);
2428 g_free(query);
2429 } else {
2430 ret = mysql_query(conn, querybase);
2431 g_free(querybase);
2432 }
2433
2434 if (ret) {
2435 g_warning("Unable to get estimates for %s.%s: %s", database, table,
2436 mysql_error(conn));
2437 }
2438
2439 MYSQL_RES *result = mysql_store_result(conn);
2440 MYSQL_FIELD *fields = mysql_fetch_fields(result);
2441
2442 guint i;
2443 for (i = 0; i < mysql_num_fields(result); i++) {
2444 if (!strcmp(fields[i].name, "rows"))
2445 break;
2446 }
2447
2448 MYSQL_ROW row = NULL;
2449
2450 guint64 count = 0;
2451
2452 if (result)
2453 row = mysql_fetch_row(result);
2454
2455 if (row && row[i])
2456 count = strtoul(row[i], NULL, 10);
2457
2458 if (result)
2459 mysql_free_result(result);
2460
2461 return (count);
2462 }
2463
/* Create `new_directory` with mode 0750.
 *
 * An already existing directory is accepted silently; any other failure is
 * logged as critical and terminates the process with EXIT_FAILURE.
 */
void old_create_backup_dir(char *new_directory) {
  int status = g_mkdir(new_directory, 0750);

  /* errno is only consulted when g_mkdir() reported a failure. */
  if (status != -1 || errno == EEXIST)
    return;

  g_critical("Unable to create `%s': %s", new_directory, g_strerror(errno));
  exit(EXIT_FAILURE);
}
2472
/* Return a newly allocated copy of `str` escaped with
 * mysql_real_escape_string() on `conn`.
 *
 * The buffer is sized for the worst case of every byte being escaped, plus
 * the terminator.
 */
char *escape_string(MYSQL *conn, char *str) {
  size_t length = strlen(str);
  char *escaped = g_new(char, length * 2 + 1);

  mysql_real_escape_string(conn, escaped, str, length);
  return escaped;
}
2478
/* Look up `k` in ref_table and return its mapped value, creating the mapping
 * with determine_filename() on first use.
 *
 * Lookup and insertion run under ref_table_mutex. The table is given its own
 * copy of the key, so the entry stays valid even if the caller later frees
 * or reuses the string it passed in.
 *
 * The returned pointer is the value stored in the table, not a copy.
 */
gchar *get_ref_table(gchar *k) {
  g_mutex_lock(ref_table_mutex);
  gchar *val = g_hash_table_lookup(ref_table, k);
  if (val == NULL) {
    val = determine_filename(g_strdup(k));
    g_hash_table_insert(ref_table, g_strdup(k), val);
  }
  g_mutex_unlock(ref_table_mutex);
  return val;
}
2489
/* Allocate a struct database for `database_name` and register it in
 * database_hash under its own copy of the name.
 *
 * The entry gets the reference-table file name for that name, the name
 * escaped via escape_string(), the given already_dumped flag and a fresh
 * mutex for guarding that flag.
 */
struct database *new_database(MYSQL *conn, char *database_name,
                              gboolean already_dumped) {
  struct database *entry = g_new(struct database, 1);

  entry->name = g_strdup(database_name);
  entry->filename = get_ref_table(entry->name);
  entry->escaped = escape_string(conn, entry->name);
  entry->already_dumped = already_dumped;
  entry->ad_mutex = g_mutex_new();

  /* The hash key is the entry's own name string. */
  g_hash_table_insert(database_hash, entry->name, entry);
  return entry;
}
2500
get_database(MYSQL * conn,char * database_name,struct database ** database)2501 gboolean get_database(MYSQL *conn, char *database_name, struct database ** database){
2502 *database=g_hash_table_lookup(database_hash,database_name);
2503 if (*database == NULL){
2504 *database=new_database(conn,database_name,FALSE);
2505 return TRUE;
2506 }
2507 return FALSE;
2508 }
2509
/* Allocate a struct db_table for `table` inside `database`.
 *
 * The entry holds a copy of the table name, its reference-table file name,
 * the name escaped via escape_string(), a fresh rows mutex, a zero row count
 * and no anonymizing function. `datalength` is a decimal string and may be
 * NULL, in which case the data length is recorded as 0.
 */
struct db_table *new_db_table(MYSQL *conn, struct database *database,
                              char *table, char *datalength) {
  struct db_table *entry = g_new(struct db_table, 1);

  entry->database = database;
  entry->table = g_strdup(table);
  entry->table_filename = get_ref_table(entry->table);
  entry->rows_lock = g_mutex_new();
  entry->escaped_table = escape_string(conn, entry->table);
  entry->anonymized_function = NULL;
  entry->rows = 0;
  entry->datalength =
      datalength ? g_ascii_strtoull(datalength, NULL, 10) : 0;

  return entry;
}
2525
2526
dump_database(struct database * database,struct configuration * conf)2527 void dump_database(struct database *database, struct configuration *conf) {
2528
2529 g_atomic_int_inc(&database_counter);
2530
2531 struct job *j = g_new0(struct job, 1);
2532 struct dump_database_job *ddj = g_new0(struct dump_database_job, 1);
2533 j->job_data = (void *)ddj;
2534 ddj->database = database;
2535 j->conf = conf;
2536 j->type = JOB_DUMP_DATABASE;
2537
2538 if (less_locking)
2539 g_async_queue_push(conf->queue_less_locking, j);
2540 else
2541 g_async_queue_push(conf->queue, j);
2542 return;
2543 }
2544
2545
/* Register one table or view that passed all filters.
 *
 * Queues the CREATE DATABASE job of `database` the first time any of its
 * objects gets here (guarded by database->ad_mutex), builds a db_table from
 * columns 0 (name) and 6 (data length) of `*row`, and files it:
 *   - views: view_schemas, unless no_schemas is set;
 *   - tables: innodb_tables or non_innodb_table for data (skipped when
 *     no_data is set, the engine is unknown, or the engine is MRG_MYISAM),
 *     and table_schemas unless no_schemas is set.
 * `ecol` is the engine name of the row and may be NULL.
 */
void green_light(MYSQL *conn, struct configuration *conf, gboolean is_view,
                 struct database *database, MYSQL_ROW *row, gchar *ecol) {
  g_mutex_lock(database->ad_mutex);
  if (!database->already_dumped) {
    dump_create_database(database->name, conf);
    database->already_dumped = TRUE;
  }
  g_mutex_unlock(database->ad_mutex);

  struct db_table *dbt = new_db_table(conn, database, (*row)[0], (*row)[6]);

  /* For a view we care only about the schema. */
  if (is_view) {
    if (!no_schemas) {
      g_mutex_lock(view_schemas_mutex);
      view_schemas = g_list_prepend(view_schemas, dbt);
      g_mutex_unlock(view_schemas_mutex);
    }
    return;
  }

  if (!no_data && ecol != NULL && g_ascii_strcasecmp("MRG_MYISAM", ecol)) {
    /* With trx_consistency_only everything is dumped like InnoDB; TokuDB
     * tables always share the InnoDB list. */
    gboolean as_innodb = trx_consistency_only ||
                         !g_ascii_strcasecmp("InnoDB", ecol) ||
                         !g_ascii_strcasecmp("TokuDB", ecol);
    if (as_innodb) {
      g_mutex_lock(innodb_tables_mutex);
      innodb_tables = g_list_prepend(innodb_tables, dbt);
      g_mutex_unlock(innodb_tables_mutex);
    } else {
      g_mutex_lock(non_innodb_table_mutex);
      non_innodb_table = g_list_prepend(non_innodb_table, dbt);
      g_mutex_unlock(non_innodb_table_mutex);
    }
  }

  if (!no_schemas) {
    g_mutex_lock(table_schemas_mutex);
    table_schemas = g_list_prepend(table_schemas, dbt);
    g_mutex_unlock(table_schemas_mutex);
  }
}
2593
2594
/* List the tables and views of `database`, hand every object that passes the
 * filters to green_light(), and decide whether the database needs a
 * post-schema dump (routines / events).
 *
 * Table filters, in order: broken tables (no engine), ignored engines,
 * views when no_dump_views is set, the table list, the mysql log and stats
 * tables, the skip list, the regex, and the not-updated list.
 *
 * For routines and events a single object passing the skip list and regex is
 * enough to add the database to schema_post.
 *
 * Query failures are logged, counted in `errors`, and end the function.
 * Every query string and result set is released as soon as it is no longer
 * needed.
 */
void dump_database_thread(MYSQL *conn, struct configuration *conf,
                          struct database *database) {

  char *query;
  MYSQL_RES *result;
  MYSQL_ROW row;
  guint i;

  mysql_select_db(conn, database->name);
  if (detected_server == SERVER_TYPE_MYSQL ||
      detected_server == SERVER_TYPE_TIDB)
    query = g_strdup("SHOW TABLE STATUS");
  else
    query =
        g_strdup_printf("SELECT TABLE_NAME, ENGINE, TABLE_TYPE as COMMENT FROM "
                        "DATA_DICTIONARY.TABLES WHERE TABLE_SCHEMA='%s'",
                        database->escaped);

  if (mysql_query(conn, (query))) {
    g_critical("Error: DB: %s - Could not execute query: %s", database->name,
               mysql_error(conn));
    errors++;
    g_free(query);
    return;
  }
  g_free(query);

  result = mysql_store_result(conn);
  if (!result) {
    g_critical("Could not list tables for %s: %s", database->name,
               mysql_error(conn));
    errors++;
    return;
  }

  /* Locate the engine and comment columns by name. */
  MYSQL_FIELD *fields = mysql_fetch_fields(result);
  int ecol = -1, ccol = -1;
  for (i = 0; i < mysql_num_fields(result); i++) {
    if (!strcasecmp(fields[i].name, "Engine"))
      ecol = i;
    else if (!strcasecmp(fields[i].name, "Comment"))
      ccol = i;
  }

  /* The comment column is only read for MySQL servers (view detection). */
  if (ecol < 0 || (ccol < 0 && detected_server == SERVER_TYPE_MYSQL)) {
    g_critical("Could not list tables for %s: Engine/Comment column missing",
               database->name);
    errors++;
    mysql_free_result(result);
    return;
  }

  while ((row = mysql_fetch_row(result))) {
    int is_view = 0;

    /* We now do care about views!
       In 'SHOW TABLE STATUS' a view has a NULL comment or the comment "VIEW".
     */
    if ((detected_server == SERVER_TYPE_MYSQL) &&
        (row[ccol] == NULL || !strcmp(row[ccol], "VIEW")))
      is_view = 1;

    /* Check for broken tables, i.e. mrg with missing source tbl */
    if (!is_view && row[ecol] == NULL) {
      g_warning("Broken table detected, please review: %s.%s", database->name,
                row[0]);
      continue;
    }

    /* Skip ignored engines, handy for avoiding Merge, Federated or Blackhole
     * :-) dumps */
    if (ignore && !is_view) {
      gboolean ignored = FALSE;
      for (i = 0; ignore[i] != NULL; i++) {
        if (g_ascii_strcasecmp(ignore[i], row[ecol]) == 0) {
          ignored = TRUE;
          break;
        }
      }
      if (ignored)
        continue;
    }

    /* Skip views */
    if (is_view && no_dump_views)
      continue;

    /* In case of table-list option is enabled, check if table is part of the
     * list */
    if (tables) {
      gboolean table_found = FALSE;
      for (i = 0; tables[i] != NULL; i++) {
        if (g_ascii_strcasecmp(tables[i], row[0]) == 0) {
          table_found = TRUE;
          break;
        }
      }
      if (!table_found)
        continue;
    }

    /* Special tables */
    if (g_ascii_strcasecmp(database->name, "mysql") == 0 &&
        (g_ascii_strcasecmp(row[0], "general_log") == 0 ||
         g_ascii_strcasecmp(row[0], "slow_log") == 0 ||
         g_ascii_strcasecmp(row[0], "innodb_index_stats") == 0 ||
         g_ascii_strcasecmp(row[0], "innodb_table_stats") == 0))
      continue;

    /* Checks skip list on 'database.table' string */
    if (tables_skiplist && check_skiplist(database->name, row[0]))
      continue;

    /* Checks PCRE expressions on 'database.table' string */
    if (regexstring && !check_regex(database->name, row[0]))
      continue;

    /* Check if the table was recently updated */
    if (no_updated_tables && !is_view) {
      gboolean not_updated = FALSE;
      /* Build the qualified name once instead of once per list element. */
      gchar *qualified = g_strdup_printf("%s.%s", database->name, row[0]);
      GList *iter;
      for (iter = no_updated_tables; iter != NULL; iter = iter->next) {
        if (g_ascii_strcasecmp(iter->data, qualified) == 0) {
          g_message("NO UPDATED TABLE: %s.%s", database->name, row[0]);
          not_updated = TRUE;
          break;
        }
      }
      g_free(qualified);
      if (not_updated)
        continue;
    }

    green_light(conn, conf, is_view, database, &row, row[ecol]);
  }

  mysql_free_result(result);

  // Store Procedures and Events
  // These are not attached to tables, and with a regex filter we cannot know
  // whether a whole schema is filtered out. So only the skip list and the
  // regex decide: one matching object is enough to dump them all.

  int post_dump = 0;

  if (dump_routines) {
    // SP
    query = g_strdup_printf(
        "SHOW PROCEDURE STATUS WHERE CAST(Db AS BINARY) = '%s'",
        database->escaped);
    if (mysql_query(conn, (query))) {
      g_critical("Error: DB: %s - Could not execute query: %s", database->name,
                 mysql_error(conn));
      errors++;
      g_free(query);
      return;
    }
    g_free(query);

    result = mysql_store_result(conn);
    if (result) {
      while (!post_dump && (row = mysql_fetch_row(result))) {
        /* Checks skip list on 'database.sp' string */
        if (tables_skiplist && check_skiplist(database->name, row[1]))
          continue;

        /* Checks PCRE expressions on 'database.sp' string */
        if (regexstring && !check_regex(database->name, row[1]))
          continue;

        post_dump = 1;
      }
      mysql_free_result(result);
    }

    if (!post_dump) {
      // FUNCTIONS
      query = g_strdup_printf(
          "SHOW FUNCTION STATUS WHERE CAST(Db AS BINARY) = '%s'",
          database->escaped);
      if (mysql_query(conn, (query))) {
        g_critical("Error: DB: %s - Could not execute query: %s",
                   database->name, mysql_error(conn));
        errors++;
        g_free(query);
        return;
      }
      g_free(query);

      result = mysql_store_result(conn);
      if (result) {
        while (!post_dump && (row = mysql_fetch_row(result))) {
          /* Checks skip list on 'database.sp' string.
           * NOTE(review): this guard tests tables_skiplist_file while the
           * procedure check above tests tables_skiplist - confirm intended. */
          if (tables_skiplist_file && check_skiplist(database->name, row[1]))
            continue;
          /* Checks PCRE expressions on 'database.sp' string */
          if (regexstring && !check_regex(database->name, row[1]))
            continue;

          post_dump = 1;
        }
        mysql_free_result(result);
      }
    }
  }

  if (dump_events && !post_dump) {
    // EVENTS
    query = g_strdup_printf("SHOW EVENTS FROM `%s`", database->name);
    if (mysql_query(conn, (query))) {
      g_critical("Error: DB: %s - Could not execute query: %s", database->name,
                 mysql_error(conn));
      errors++;
      g_free(query);
      return;
    }
    g_free(query);

    result = mysql_store_result(conn);
    if (result) {
      while (!post_dump && (row = mysql_fetch_row(result))) {
        /* Checks skip list on 'database.sp' string */
        if (tables_skiplist_file && check_skiplist(database->name, row[1]))
          continue;
        /* Checks PCRE expressions on 'database.sp' string */
        if (regexstring && !check_regex(database->name, row[1]))
          continue;

        post_dump = 1;
      }
      mysql_free_result(result);
    }
  }

  if (post_dump) {
    /* NOTE(review): schema_post is updated here without a lock; confirm this
     * cannot race with other worker threads. */
    struct schema_post *sp = g_new(struct schema_post, 1);
    sp->database = database;
    schema_post = g_list_prepend(schema_post, sp);
  }
}
2854
/*
 * Resolves every entry of the NULL-terminated global `tables` array (each
 * entry is expected to look like "database.table") and hands every row
 * returned by SHOW TABLE STATUS for it to green_light().
 *
 * The "Engine" and "Comment" columns are located by name. When
 * get_database() reports true for the database, its CREATE DATABASE dump is
 * requested and a token is pushed on conf->ready_database_dump.
 *
 * conn: connection used for all queries.
 * conf: configuration handed through to the helpers.
 *
 * A malformed entry is reported, counted in `errors` and skipped. A failed
 * query or result fetch is reported, counted in `errors`, and stops the
 * processing of the remaining entries.
 */
void get_tables(MYSQL *conn, struct configuration *conf) {
  guint i, x;

  for (x = 0; tables[x] != NULL; x++) {
    gchar **dt = g_strsplit(tables[x], ".", 0);

    /* Without both parts there is nothing valid to put in the query. */
    if (dt[0] == NULL || dt[1] == NULL) {
      g_critical("Error: '%s' in the tables list is not in database.table format",
                 tables[x]);
      errors++;
      g_strfreev(dt);
      continue;
    }

    gchar *query =
        g_strdup_printf("SHOW TABLE STATUS FROM %s LIKE '%s'", dt[0], dt[1]);

    if (mysql_query(conn, query)) {
      g_critical("Error: DB: %s - Could not execute query: %s", dt[0],
                 mysql_error(conn));
      errors++;
      g_free(query);
      g_strfreev(dt);
      return;
    }
    g_free(query);

    /* The result must be validated before any metadata is read from it. */
    MYSQL_RES *result = mysql_store_result(conn);
    if (!result) {
      g_warning("Could not list table for %s.%s: %s", dt[0], dt[1],
                mysql_error(conn));
      errors++;
      g_strfreev(dt);
      return;
    }

    MYSQL_FIELD *fields = mysql_fetch_fields(result);
    guint num_fields = mysql_num_fields(result);
    gboolean engine_found = FALSE;
    gboolean comment_found = FALSE;
    guint ecol = 0;
    guint ccol = 0;
    for (i = 0; i < num_fields; i++) {
      if (!strcasecmp(fields[i].name, "Engine")) {
        ecol = i;
        engine_found = TRUE;
      } else if (!strcasecmp(fields[i].name, "Comment")) {
        ccol = i;
        comment_found = TRUE;
      }
    }

    /* Indexing a row with a column that was never found would read out of
     * bounds, so refuse to continue. */
    if (!engine_found || !comment_found) {
      g_critical("Error: DB: %s - SHOW TABLE STATUS did not return the Engine "
                 "and Comment columns", dt[0]);
      errors++;
      mysql_free_result(result);
      g_strfreev(dt);
      return;
    }

    struct database *database = NULL;
    if (get_database(conn, dt[0], &database)) {
      dump_create_database(database->name, conf);
      g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
    }

    MYSQL_ROW row;
    while ((row = mysql_fetch_row(result))) {
      int is_view = 0;

      if ((detected_server == SERVER_TYPE_MYSQL) &&
          (row[ccol] == NULL || !strcmp(row[ccol], "VIEW")))
        is_view = 1;

      green_light(conn, conf, is_view, database, &row, row[ecol]);
    }

    mysql_free_result(result);
    /* NOTE(review): dt is deliberately not released on this path because
     * dt[0] was handed to get_database(); confirm that it copies the name
     * and, if so, add g_strfreev(dt) here. */
  }
}
2942
/*
 * Replaces the contents of `statement` with SQL that saves the session's
 * current character_set_client, character_set_results and
 * collation_connection in @PREV_* user variables and then switches them to
 * `character_set` / `collation_connection`.
 */
void set_charset(GString *statement, char *character_set,
                 char *collation_connection) {
  /* Any previous contents of the buffer are discarded. */
  g_string_set_size(statement, 0);

  g_string_append(statement,
                  "SET @PREV_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT;\n"
                  "SET @PREV_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS;\n"
                  "SET @PREV_COLLATION_CONNECTION=@@COLLATION_CONNECTION;\n");

  g_string_append_printf(statement,
                         "SET character_set_client = %s;\n"
                         "SET character_set_results = %s;\n"
                         "SET collation_connection = %s;\n",
                         character_set, character_set, collation_connection);
}
2959
/*
 * Appends to `statement` the SQL that restores the three session settings
 * from the @PREV_* user variables written by set_charset().
 */
void restore_charset(GString *statement) {
  g_string_append(statement,
                  "SET character_set_client = @PREV_CHARACTER_SET_CLIENT;\n"
                  "SET character_set_results = @PREV_CHARACTER_SET_RESULTS;\n"
                  "SET collation_connection = @PREV_COLLATION_CONNECTION;\n");
}
2968
dump_schema_post_data(MYSQL * conn,struct database * database,char * filename)2969 void dump_schema_post_data(MYSQL *conn, struct database *database, char *filename) {
2970 void *outfile;
2971 char *query = NULL;
2972 MYSQL_RES *result = NULL;
2973 MYSQL_RES *result2 = NULL;
2974 MYSQL_ROW row;
2975 MYSQL_ROW row2;
2976 gchar **splited_st = NULL;
2977
2978 outfile = m_open(filename,"w");
2979
2980 if (!outfile) {
2981 g_critical("Error: DB: %s Could not create output file %s (%d)", database->name,
2982 filename, errno);
2983 errors++;
2984 return;
2985 }
2986
2987 GString *statement = g_string_sized_new(statement_size);
2988
2989 if (dump_routines) {
2990 // get functions
2991 query = g_strdup_printf("SHOW FUNCTION STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
2992 if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
2993 if (success_on_1146 && mysql_errno(conn) == 1146) {
2994 g_warning("Error dumping functions from %s: %s", database->name,
2995 mysql_error(conn));
2996 } else {
2997 g_critical("Error dumping functions from %s: %s", database->name,
2998 mysql_error(conn));
2999 errors++;
3000 }
3001 g_free(query);
3002 return;
3003 }
3004
3005 while ((row = mysql_fetch_row(result))) {
3006 set_charset(statement, row[8], row[9]);
3007 g_string_append_printf(statement, "DROP FUNCTION IF EXISTS `%s`;\n",
3008 row[1]);
3009 if (!write_data((FILE *)outfile, statement)) {
3010 g_critical("Could not write stored procedure data for %s.%s", database->name,
3011 row[1]);
3012 errors++;
3013 return;
3014 }
3015 g_string_set_size(statement, 0);
3016 query =
3017 g_strdup_printf("SHOW CREATE FUNCTION `%s`.`%s`", database->name, row[1]);
3018 mysql_query(conn, query);
3019 result2 = mysql_store_result(conn);
3020 row2 = mysql_fetch_row(result2);
3021 g_string_printf(statement, "%s", row2[2]);
3022 splited_st = g_strsplit(statement->str, ";\n", 0);
3023 g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3024 g_string_append(statement, ";\n");
3025 restore_charset(statement);
3026 if (!write_data((FILE *)outfile, statement)) {
3027 g_critical("Could not write function data for %s.%s", database->name, row[1]);
3028 errors++;
3029 return;
3030 }
3031 g_string_set_size(statement, 0);
3032 }
3033
3034 // get sp
3035 query = g_strdup_printf("SHOW PROCEDURE STATUS WHERE CAST(Db AS BINARY) = '%s'", database->escaped);
3036 if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
3037 if (success_on_1146 && mysql_errno(conn) == 1146) {
3038 g_warning("Error dumping stored procedures from %s: %s", database->name,
3039 mysql_error(conn));
3040 } else {
3041 g_critical("Error dumping stored procedures from %s: %s", database->name,
3042 mysql_error(conn));
3043 errors++;
3044 }
3045 g_free(query);
3046 return;
3047 }
3048
3049 while ((row = mysql_fetch_row(result))) {
3050 set_charset(statement, row[8], row[9]);
3051 g_string_append_printf(statement, "DROP PROCEDURE IF EXISTS `%s`;\n",
3052 row[1]);
3053 if (!write_data((FILE *)outfile, statement)) {
3054 g_critical("Could not write stored procedure data for %s.%s", database->name,
3055 row[1]);
3056 errors++;
3057 return;
3058 }
3059 g_string_set_size(statement, 0);
3060 query =
3061 g_strdup_printf("SHOW CREATE PROCEDURE `%s`.`%s`", database->name, row[1]);
3062 mysql_query(conn, query);
3063 result2 = mysql_store_result(conn);
3064 row2 = mysql_fetch_row(result2);
3065 g_string_printf(statement, "%s", row2[2]);
3066 splited_st = g_strsplit(statement->str, ";\n", 0);
3067 g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3068 g_string_append(statement, ";\n");
3069 restore_charset(statement);
3070 if (!write_data((FILE *)outfile, statement)) {
3071 g_critical("Could not write stored procedure data for %s.%s", database->name,
3072 row[1]);
3073 errors++;
3074 return;
3075 }
3076 g_string_set_size(statement, 0);
3077 }
3078 }
3079
3080 // get events
3081 if (dump_events) {
3082 query = g_strdup_printf("SHOW EVENTS FROM `%s`", database->name);
3083 if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
3084 if (success_on_1146 && mysql_errno(conn) == 1146) {
3085 g_warning("Error dumping events from %s: %s", database->name,
3086 mysql_error(conn));
3087 } else {
3088 g_critical("Error dumping events from %s: %s", database->name,
3089 mysql_error(conn));
3090 errors++;
3091 }
3092 g_free(query);
3093 return;
3094 }
3095
3096 while ((row = mysql_fetch_row(result))) {
3097 set_charset(statement, row[12], row[13]);
3098 g_string_append_printf(statement, "DROP EVENT IF EXISTS `%s`;\n", row[1]);
3099 if (!write_data((FILE *)outfile, statement)) {
3100 g_critical("Could not write stored procedure data for %s.%s", database->name,
3101 row[1]);
3102 errors++;
3103 return;
3104 }
3105 query = g_strdup_printf("SHOW CREATE EVENT `%s`.`%s`", database->name, row[1]);
3106 mysql_query(conn, query);
3107 result2 = mysql_store_result(conn);
3108 // DROP EVENT IF EXISTS event_name
3109 row2 = mysql_fetch_row(result2);
3110 g_string_printf(statement, "%s", row2[3]);
3111 splited_st = g_strsplit(statement->str, ";\n", 0);
3112 g_string_printf(statement, "%s", g_strjoinv("; \n", splited_st));
3113 g_string_append(statement, ";\n");
3114 restore_charset(statement);
3115 if (!write_data((FILE *)outfile, statement)) {
3116 g_critical("Could not write event data for %s.%s", database->name, row[1]);
3117 errors++;
3118 return;
3119 }
3120 g_string_set_size(statement, 0);
3121 }
3122 }
3123
3124 g_free(query);
3125 m_close(outfile);
3126 if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
3127 g_string_free(statement, TRUE);
3128 g_strfreev(splited_st);
3129 if (result)
3130 mysql_free_result(result);
3131 if (result2)
3132 mysql_free_result(result2);
3133
3134 return;
3135 }
/*
 * Writes the CREATE TRIGGER statements of every trigger listed by
 * SHOW TRIGGERS for `table` into `filename`.
 *
 * conn:     connection used for the SHOW queries.
 * database: schema that owns the table.
 * table:    table name, used as the LIKE pattern of SHOW TRIGGERS.
 * filename: path of the output file, opened with m_open().
 *
 * Each definition is wrapped between the statements produced by
 * set_charset() and restore_charset(). The first failure stops the dump.
 * The output file is always closed, and the file name is queued on
 * stream_queue only when `stream` is set and everything was written.
 */
void dump_triggers_data(MYSQL *conn, char *database, char *table,
                        char *filename) {
  char *query = NULL;
  MYSQL_RES *result = NULL;
  MYSQL_RES *result2 = NULL;
  MYSQL_ROW row;
  MYSQL_ROW row2;
  gboolean completed = FALSE;

  void *outfile = m_open(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  GString *statement = g_string_sized_new(statement_size);

  // get triggers
  query = g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'", database, table);
  if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping triggers (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping triggers (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  while ((row = mysql_fetch_row(result))) {
    set_charset(statement, row[8], row[9]);
    if (!write_data((FILE *)outfile, statement)) {
      g_critical("Could not write triggers data for %s.%s", database, table);
      errors++;
      goto cleanup;
    }
    g_string_set_size(statement, 0);

    /* Release the previous query text before building the next one. */
    g_free(query);
    query = g_strdup_printf("SHOW CREATE TRIGGER `%s`.`%s`", database, row[0]);
    if (mysql_query(conn, query) || !(result2 = mysql_store_result(conn))) {
      g_critical("Error dumping trigger %s.%s: %s", database, row[0],
                 mysql_error(conn));
      errors++;
      goto cleanup;
    }

    row2 = mysql_fetch_row(result2);
    if (!row2 || !row2[2]) {
      g_critical("Error dumping trigger %s.%s: no definition returned",
                 database, row[0]);
      errors++;
      goto cleanup;
    }

    /* Every ";\n" inside the definition is rewritten as "; \n". */
    gchar **pieces = g_strsplit(row2[2], ";\n", 0);
    gchar *joined = g_strjoinv("; \n", pieces);
    g_strfreev(pieces);
    mysql_free_result(result2);
    result2 = NULL;

    g_string_printf(statement, "%s", joined);
    g_free(joined);
    g_string_append(statement, ";\n");
    restore_charset(statement);
    if (!write_data((FILE *)outfile, statement)) {
      g_critical("Could not write triggers data for %s.%s", database, table);
      errors++;
      goto cleanup;
    }
    g_string_set_size(statement, 0);
  }
  completed = TRUE;

cleanup:
  g_free(query);
  m_close(outfile);
  if (completed && stream) g_async_queue_push(stream_queue, g_strdup(filename));
  g_string_free(statement, TRUE);
  if (result)
    mysql_free_result(result);
  if (result2)
    mysql_free_result(result2);

  return;
}
/*
 * Writes the schema file of one table: a server-dependent header followed by
 * the statement returned by SHOW CREATE TABLE.
 *
 * conn:     connection used for the SHOW CREATE TABLE query.
 * database: schema that owns the table.
 * table:    table name.
 * filename: path of the output file, opened with m_open().
 *
 * A failure to write the header, to run the query or to obtain a row stops
 * the dump. The output file is always closed, and the file name is queued on
 * stream_queue only when `stream` is set and the end of the dump was reached.
 */
void dump_schema_data(MYSQL *conn, char *database, char *table,
                      char *filename) {
  char *query = NULL;
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  gboolean completed = FALSE;

  void *outfile = m_open(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s Could not create output file %s (%d)", database,
               filename, errno);
    errors++;
    return;
  }

  GString *statement = g_string_sized_new(statement_size);

  if (detected_server == SERVER_TYPE_MYSQL) {
    g_string_printf(statement, "%s;\n", set_names_str);
    g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n\n");
    if (!skip_tz) {
      g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
    }
  } else if (detected_server == SERVER_TYPE_TIDB) {
    if (!skip_tz) {
      g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
    }
  } else {
    g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;\n");
  }

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  query = g_strdup_printf("SHOW CREATE TABLE `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }

  g_string_set_size(statement, 0);

  /* There should never be more than one row, but there may be none. */
  row = mysql_fetch_row(result);
  if (!row || !row[1]) {
    g_critical("Error dumping schemas (%s.%s): no definition returned",
               database, table);
    errors++;
    goto cleanup;
  }
  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema for %s.%s", database, table);
    errors++;
  }
  completed = TRUE;

cleanup:
  g_free(query);
  m_close(outfile);
  if (completed && stream) g_async_queue_push(stream_queue, g_strdup(filename));
  g_string_free(statement, TRUE);
  if (result)
    mysql_free_result(result);

  return;
}
3280
/*
 * Writes the two files that describe a view.
 *
 * filename receives a placeholder table: CREATE TABLE IF NOT EXISTS with one
 * `int` column per field reported by SHOW FIELDS (the workaround for view
 * dependencies). filename2 receives DROP TABLE / DROP VIEW followed by the
 * real definition from SHOW CREATE VIEW, wrapped between set_charset() and
 * restore_charset().
 *
 * conn:     connection used for the queries; `database` is selected on it.
 * database: schema that owns the view.
 * table:    view name.
 *
 * A failed query, a missing row or a failed header write stops the dump.
 * Both files are always closed, and their names are queued on stream_queue
 * only when `stream` is set and the end of the dump was reached.
 */
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
                    char *filename2) {
  void *outfile = NULL;
  void *outfile2 = NULL;
  char *query = NULL;
  MYSQL_RES *result = NULL;
  MYSQL_ROW row;
  gboolean completed = FALSE;
  GString *statement = g_string_sized_new(statement_size);

  mysql_select_db(conn, database);

  outfile = m_open(filename, "w");
  outfile2 = m_open(filename2, "w");

  if (!outfile || !outfile2) {
    g_critical("Error: DB: %s Could not create output file (%d)", database,
               errno);
    errors++;
    goto cleanup;
  }

  if (detected_server == SERVER_TYPE_MYSQL) {
    g_string_printf(statement, "%s;\n", set_names_str);
  }

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  /* The header written above is kept so the second file starts with it. */
  g_string_append_printf(statement, "DROP TABLE IF EXISTS `%s`;\n", table);
  g_string_append_printf(statement, "DROP VIEW IF EXISTS `%s`;\n", table);

  if (!write_data((FILE *)outfile2, statement)) {
    g_critical("Could not write schema data for %s.%s", database, table);
    errors++;
    goto cleanup;
  }

  // we create tables as workaround
  // for view dependencies
  query = g_strdup_printf("SHOW FIELDS FROM `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }
  g_free(query);
  query = NULL;

  g_string_set_size(statement, 0);
  g_string_append_printf(statement, "CREATE TABLE IF NOT EXISTS `%s`(\n", table);
  row = mysql_fetch_row(result);
  if (!row) {
    g_critical("Error dumping schemas (%s.%s): no fields returned", database,
               table);
    errors++;
    goto cleanup;
  }
  g_string_append_printf(statement, "`%s` int", row[0]);
  while ((row = mysql_fetch_row(result))) {
    g_string_append(statement, ",\n");
    g_string_append_printf(statement, "`%s` int", row[0]);
  }
  g_string_append(statement, "\n);\n");

  /* Reset the pointer so the shared cleanup cannot free it a second time. */
  mysql_free_result(result);
  result = NULL;

  if (!write_data((FILE *)outfile, statement)) {
    g_critical("Could not write view schema for %s.%s", database, table);
    errors++;
  }

  // real view
  query = g_strdup_printf("SHOW CREATE VIEW `%s`.`%s`", database, table);
  if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
    if (success_on_1146 && mysql_errno(conn) == 1146) {
      g_warning("Error dumping schemas (%s.%s): %s", database, table,
                mysql_error(conn));
    } else {
      g_critical("Error dumping schemas (%s.%s): %s", database, table,
                 mysql_error(conn));
      errors++;
    }
    goto cleanup;
  }
  g_string_set_size(statement, 0);

  /* There should never be more than one row, but there may be none. */
  row = mysql_fetch_row(result);
  if (!row || !row[1]) {
    g_critical("Error dumping schemas (%s.%s): no view definition returned",
               database, table);
    errors++;
    goto cleanup;
  }
  set_charset(statement, row[2], row[3]);
  g_string_append(statement, row[1]);
  g_string_append(statement, ";\n");
  restore_charset(statement);
  if (!write_data((FILE *)outfile2, statement)) {
    g_critical("Could not write schema for %s.%s", database, table);
    errors++;
  }
  completed = TRUE;

cleanup:
  g_free(query);
  if (outfile)
    m_close(outfile);
  if (completed && stream) g_async_queue_push(stream_queue, g_strdup(filename));
  if (outfile2)
    m_close(outfile2);
  if (completed && stream) g_async_queue_push(stream_queue, g_strdup(filename2));
  g_string_free(statement, TRUE);
  if (result)
    mysql_free_result(result);

  return;
}
3391
/*
 * Opens the data file named by tj->filename and passes it to
 * dump_table_data(). A message is logged when that call reports zero rows.
 * If the file cannot be opened the error is logged and counted in `errors`.
 */
void dump_table_data_file(MYSQL *conn, struct table_job *tj) {
  void *data_file = m_open(tj->filename, "w");

  if (data_file == NULL) {
    g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)",
               tj->database, tj->table, tj->filename, errno);
    errors++;
    return;
  }

  if (dump_table_data(conn, (FILE *)data_file, tj) == 0)
    g_message("Empty table %s.%s", tj->database, tj->table);
}
3410
/*
 * Writes the value returned by checksum_table() for `database`.`table` into
 * `filename`.
 *
 * conn:     connection handed to checksum_table().
 * filename: path of the output file, opened with g_fopen().
 *
 * When checksum_table() reports an error number, the failure is counted in
 * `errors` and nothing is queued, unless success_on_1146 is set and the
 * error is 1146. The file and the checksum string are released on every
 * path.
 */
void dump_table_checksum(MYSQL *conn, char *database, char *table, char *filename) {
  FILE *outfile = g_fopen(filename, "w");

  if (!outfile) {
    g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)",
               database, table, filename, errno);
    errors++;
    return;
  }

  int errn = 0;
  gchar *checksum = checksum_table(conn, database, table, &errn);

  if (errn != 0 && !(success_on_1146 && errn == 1146)) {
    errors++;
    fclose(outfile);
    g_free(checksum);
    return;
  }

  /* NOTE(review): presumably checksum is NULL when the tolerated 1146 error
   * occurred; passing NULL to "%s" is undefined, so nothing is written then. */
  if (checksum)
    fprintf(outfile, "%s", checksum);
  fclose(outfile);

  if (stream) g_async_queue_push(stream_queue, g_strdup(filename));
  g_free(checksum);

  return;
}
3437
dump_checksum(struct db_table * dbt,struct configuration * conf)3438 void dump_checksum(struct db_table * dbt,
3439 struct configuration *conf) {
3440 struct job *j = g_new0(struct job, 1);
3441 struct table_checksum_job *tcj = g_new0(struct table_checksum_job, 1);
3442 j->job_data = (void *)tcj;
3443 tcj->database = dbt->database->name;
3444 tcj->table = g_strdup(dbt->table);
3445 j->conf = conf;
3446 j->type = JOB_CHECKSUM;
3447 tcj->filename = build_meta_filename(dbt->database->filename, dbt->table_filename,"checksum");
3448 g_async_queue_push(conf->queue, j);
3449 return;
3450 }
/*
 * Queues a JOB_SCHEMA job for `dbt` on conf->queue. When dump_triggers is
 * set, SHOW TRIGGERS is run for the table and, if it returns at least one
 * row, a JOB_TRIGGERS job is queued as well.
 *
 * A failing SHOW TRIGGERS query is logged and counted in `errors`; the
 * schema job has already been queued by then.
 */
void dump_schema(MYSQL *conn, struct db_table *dbt,
                 struct configuration *conf) {
  struct job *schema_task = g_new0(struct job, 1);
  struct schema_job *schema = g_new0(struct schema_job, 1);

  schema->database = dbt->database->name;
  schema->table = g_strdup(dbt->table);
  schema->filename = build_schema_table_filename(dbt->database->filename,
                                                 dbt->table_filename, "schema");
  schema_task->type = JOB_SCHEMA;
  schema_task->conf = conf;
  schema_task->job_data = (void *)schema;
  g_async_queue_push(conf->queue, schema_task);

  if (!dump_triggers)
    return;

  MYSQL_RES *result = NULL;
  char *query = g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'",
                                dbt->database->name, dbt->escaped_table);

  if (!mysql_query(conn, query) && (result = mysql_store_result(conn))) {
    /* Only tables that actually have triggers get a triggers job. */
    if (mysql_num_rows(result)) {
      struct job *trigger_task = g_new0(struct job, 1);
      struct schema_job *triggers = g_new0(struct schema_job, 1);

      triggers->database = dbt->database->name;
      triggers->table = g_strdup(dbt->table);
      triggers->filename = build_schema_table_filename(
          dbt->database->filename, dbt->table_filename, "schema-triggers");
      trigger_task->type = JOB_TRIGGERS;
      trigger_task->conf = conf;
      trigger_task->job_data = (void *)triggers;
      g_async_queue_push(conf->queue, trigger_task);
    }
  } else {
    g_critical("Error Checking triggers for %s.%s. Err: %s St: %s", dbt->database->name, dbt->table,
               mysql_error(conn), query);
    errors++;
  }

  g_free(query);
  if (result)
    mysql_free_result(result);
}
3493
dump_view(struct db_table * dbt,struct configuration * conf)3494 void dump_view(struct db_table *dbt, struct configuration *conf) {
3495 struct job *j = g_new0(struct job, 1);
3496 struct view_job *vj = g_new0(struct view_job, 1);
3497 j->job_data = (void *)vj;
3498 vj->database = dbt->database->name;
3499 vj->table = g_strdup(dbt->table);
3500 j->conf = conf;
3501 j->type = JOB_VIEW;
3502 vj->filename = build_schema_table_filename(dbt->database->filename, dbt->table_filename, "schema");
3503 vj->filename2 = build_schema_table_filename(dbt->database->filename, dbt->table_filename, "schema-view");
3504 g_async_queue_push(conf->queue, j);
3505 return;
3506 }
3507
dump_schema_post(struct database * database,struct configuration * conf)3508 void dump_schema_post(struct database *database, struct configuration *conf) {
3509 struct job *j = g_new0(struct job, 1);
3510 struct schema_post_job *sp = g_new0(struct schema_post_job, 1);
3511 j->job_data = (void *)sp;
3512 sp->database = database;
3513 j->conf = conf;
3514 j->type = JOB_SCHEMA_POST;
3515 sp->filename = build_schema_filename(sp->database->filename,"schema-post");
3516 g_async_queue_push(conf->queue, j);
3517 return;
3518 }
3519
/*
 * Allocates a zero-initialised table_job for `dbt`.
 *
 * where:  stored as-is in the job (not copied).
 * nchunk: chunk number, also used to build the data file name.
 * has_generated_fields: stored as-is in the job.
 *
 * Returns the new job.
 */
struct table_job * new_table_job(struct db_table *dbt, char *where, guint nchunk, gboolean has_generated_fields){
  struct table_job *job = g_new0(struct table_job, 1);

  job->dbt = dbt;
  job->where = where;
  job->nchunk = nchunk;
  job->has_generated_fields = has_generated_fields;

  // TODO (refactoring): the database name is shared with dbt while the table
  // name is duplicated; review whether the g_strdup is still needed, given
  // that dbt->database is not supposed to be freed.
  job->database = dbt->database->name;
  job->table = g_strdup(dbt->table);

  job->filename = build_data_filename(dbt->database->filename,
                                      dbt->table_filename, job->nchunk);
  return job;
}
3534
/*
 * Queues the data dump job(s) of one table on conf->queue.
 *
 * When rows_per_file is set and get_chunks_for_table() returns chunks, one
 * job is queued per chunk, with the chunk's data as its WHERE clause;
 * otherwise a single job without WHERE clause is queued. Jobs are of type
 * JOB_DUMP when is_innodb is true and JOB_DUMP_NON_INNODB otherwise. For
 * non-InnoDB tables, non_innodb_table_counter is incremented once for every
 * chunk after the first.
 *
 * When order_by_primary_key is set, every job gets its own copy of the
 * string returned by get_primary_key_string().
 */
void dump_table(MYSQL *conn, struct db_table *dbt,
                struct configuration *conf, gboolean is_innodb) {
  GList *chunks = NULL;
  if (rows_per_file)
    chunks = get_chunks_for_table(conn, dbt->database->name, dbt->table, conf);

  gboolean has_generated_fields = detect_generated_fields(conn, dbt);

  char *order_by = NULL;
  if (order_by_primary_key)
    order_by = get_primary_key_string(conn, dbt->database->name, dbt->table);

  if (chunks == NULL) {
    /* Whole table in a single job. */
    struct job *task = g_new0(struct job, 1);
    struct table_job *whole = new_table_job(dbt, NULL, 0, has_generated_fields);

    whole->order_by = order_by ? g_strdup(order_by) : NULL;
    task->conf = conf;
    task->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
    task->job_data = (void *)whole;
    g_async_queue_push(conf->queue, task);
  } else {
    int nchunk = 0;
    GList *node;

    for (node = chunks; node != NULL; node = node->next, nchunk++) {
      struct job *task = g_new0(struct job, 1);
      struct table_job *part =
          new_table_job(dbt, (char *)node->data, nchunk, has_generated_fields);

      part->order_by = order_by ? g_strdup(order_by) : NULL;
      task->conf = conf;
      task->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
      task->job_data = (void *)part;

      if (nchunk && !is_innodb)
        g_atomic_int_inc(&non_innodb_table_counter);
      g_async_queue_push(conf->queue, task);
    }
    g_list_free(chunks);
  }

  g_free(order_by);
}
3586
/*
 * Builds one JOB_LOCK_DUMP_NON_INNODB job holding the table jobs of every
 * table in `noninnodb_tables_list` and pushes it on
 * conf->queue_less_locking.
 *
 * conn: connection used by the chunk, generated-field and primary-key
 *       helpers.
 * noninnodb_tables_list: list whose elements are struct db_table pointers.
 *
 * When rows_per_file is set and get_chunks_for_table() returns chunks, one
 * table job is created per chunk, with the chunk's data as its WHERE
 * clause; otherwise one table job without WHERE clause is created. When
 * order_by_primary_key is set, every table job gets its own ORDER BY
 * string. The table jobs are stored in list order.
 */
void dump_tables(MYSQL *conn, GList *noninnodb_tables_list,
                 struct configuration *conf) {
  struct job *j = g_new0(struct job, 1);
  struct tables_job *tjs = g_new0(struct tables_job, 1);
  j->conf = conf;
  j->type = JOB_LOCK_DUMP_NON_INNODB;
  j->job_data = (void *)tjs;

  GList *iter;
  for (iter = noninnodb_tables_list; iter != NULL; iter = iter->next) {
    struct db_table *dbt = (struct db_table *)iter->data;
    /* Scoped per table so a freed list can never be reused. */
    GList *chunks = NULL;

    if (rows_per_file)
      chunks = get_chunks_for_table(conn, dbt->database->name, dbt->table, conf);
    gboolean has_generated_fields = detect_generated_fields(conn, dbt);

    if (chunks) {
      int nchunk = 0;
      GList *citer;
      gchar *order_by = NULL;
      if (order_by_primary_key)
        order_by = get_primary_key_string(conn, dbt->database->name, dbt->table);
      for (citer = chunks; citer != NULL; citer = citer->next) {
        /* The WHERE clause comes from the chunk list (citer), not from the
         * table list (iter), whose data is the db_table itself. */
        struct table_job *tj =
            new_table_job(dbt, (char *)citer->data, nchunk, has_generated_fields);
        tj->order_by = order_by ? g_strdup(order_by) : NULL;
        tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
        nchunk++;
      }
      g_free(order_by);
      g_list_free(chunks);
    } else {
      struct table_job *tj = new_table_job(dbt, NULL, 0, has_generated_fields);
      if (order_by_primary_key)
        tj->order_by = get_primary_key_string(conn, dbt->database->name, dbt->table);
      else
        tj->order_by = NULL;
      tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
    }
  }
  /* Jobs were prepended; restore the order in which they were created. */
  tjs->table_job_list = g_list_reverse(tjs->table_job_list);
  g_async_queue_push(conf->queue_less_locking, j);
}
/*
 * Appends the names of the first `num_fields` entries of `fields` to
 * `statement`, each wrapped in backticks and separated by commas.
 * Nothing is appended when num_fields is 0.
 */
void append_columns (GString *statement, MYSQL_FIELD *fields, guint num_fields){
  guint col;

  if (num_fields == 0)
    return;

  /* The first name has no leading separator. */
  g_string_append_printf(statement, "`%s`", fields[0].name);
  for (col = 1; col < num_fields; col++) {
    g_string_append_c(statement, ',');
    g_string_append_printf(statement, "`%s`", fields[col].name);
  }
}
3648
/*
 * Overwrites `statement` with the opening of an INSERT statement for
 * `table`, using INSERT IGNORE when insert_ignore is set.
 *
 * condition: when true the column list built by append_columns() is
 *            included before VALUES; when false it is omitted.
 */
void append_insert (gboolean condition, GString *statement, char *table, MYSQL_FIELD *fields, guint num_fields){
  if (!condition) {
    /* Short form, without column names. */
    if (!insert_ignore)
      g_string_printf(statement, "INSERT INTO `%s` VALUES", table);
    else
      g_string_printf(statement, "INSERT IGNORE INTO `%s` VALUES", table);
    return;
  }

  if (!insert_ignore)
    g_string_printf(statement, "INSERT INTO `%s` (", table);
  else
    g_string_printf(statement, "INSERT IGNORE INTO `%s` (", table);

  append_columns(statement, fields, num_fields);
  g_string_append(statement, ") VALUES");
}
3666
3667 /* Do actual data chunk reading/writing magic */
dump_table_data(MYSQL * conn,FILE * file,struct table_job * tj)3668 guint64 dump_table_data(MYSQL *conn, FILE *file, struct table_job * tj){
3669 // There are 2 possible options to chunk the files:
3670 // - no chunk: this means that will be just 1 data file
3671 // - chunk_filesize: this function will be spliting the per filesize, this means that multiple files will be created
3672 // Split by row is before this step
3673 // It could write multiple INSERT statments in a data file if statement_size is reached
3674 guint i;
3675 guint fn = 0;
3676 guint st_in_file = 0;
3677 guint num_fields = 0;
3678 guint64 num_rows = 0;
3679 guint64 num_rows_st = 0;
3680 MYSQL_RES *result = NULL;
3681 char *query = NULL;
3682 gchar *fcfile = NULL;
3683 gchar *load_data_fn=NULL;
3684 // gchar *filename_prefix = NULL;
3685 struct db_table * dbt = tj->dbt;
3686 /* Buffer for escaping field values */
3687 GString *escaped = g_string_sized_new(3000);
3688 FILE *main_file=file;
3689 if (chunk_filesize) {
3690 fcfile = build_data_filename(dbt->database->filename, dbt->table_filename, fn);
3691 }else{
3692 fcfile = g_strdup(tj->filename);
3693 }
3694
3695 gboolean has_generated_fields = tj->has_generated_fields;
3696
3697 /* Ghm, not sure if this should be statement_size - but default isn't too big
3698 * for now */
3699 GString *statement = g_string_sized_new(statement_size);
3700 GString *statement_row = g_string_sized_new(0);
3701
3702 GString *select_fields;
3703
3704 if (has_generated_fields) {
3705 select_fields = get_insertable_fields(conn, tj->database, tj->table);
3706 } else {
3707 select_fields = g_string_new("*");
3708 }
3709
3710 /* Poor man's database code */
3711 query = g_strdup_printf(
3712 "SELECT %s %s FROM `%s`.`%s` %s %s %s %s %s %s",
3713 (detected_server == SERVER_TYPE_MYSQL) ? "/*!40001 SQL_NO_CACHE */" : "",
3714 select_fields->str, tj->database, tj->table, (tj->where || where_option ) ? "WHERE" : "",
3715 tj->where ? tj->where : "", (tj->where && where_option ) ? "AND" : "", where_option ? where_option : "", tj->order_by ? "ORDER BY" : "",
3716 tj->order_by ? tj->order_by : "");
3717 g_string_free(select_fields, TRUE);
3718 if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
3719 // ERROR 1146
3720 if (success_on_1146 && mysql_errno(conn) == 1146) {
3721 g_warning("Error dumping table (%s.%s) data: %s ", tj->database, tj->table,
3722 mysql_error(conn));
3723 } else {
3724 g_critical("Error dumping table (%s.%s) data: %s ", tj->database, tj->table,
3725 mysql_error(conn));
3726 errors++;
3727 }
3728 goto cleanup;
3729 }
3730
3731 num_fields = mysql_num_fields(result);
3732 MYSQL_FIELD *fields = mysql_fetch_fields(result);
3733
3734 MYSQL_ROW row;
3735
3736 g_string_set_size(statement, 0);
3737
3738 // TODO #364: this is the place where we need to link the column between file loaded and dbt.
3739 // Currently, we are using identity_function, which return the same data.
3740 for(i=0; i< num_fields;i++){
3741 if (i>0){
3742 dbt->anonymized_function=g_list_append(dbt->anonymized_function,&identity_function);
3743 }else{
3744 dbt->anonymized_function=g_list_append(dbt->anonymized_function,&identity_function);
3745 }
3746 }
3747
3748 gboolean first_time=TRUE;
3749 /* Poor man's data dump code */
3750 while ((row = mysql_fetch_row(result))) {
3751 gulong *lengths = mysql_fetch_lengths(result);
3752 num_rows++;
3753
3754 if (!statement->len) {
3755
3756 // A file can be chunked by amount of rows or file size.
3757 //
3758 if (!st_in_file) {
3759 // File Header
3760 if (detected_server == SERVER_TYPE_MYSQL) {
3761 g_string_printf(statement,"%s;\n",set_names_str);
3762 g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n");
3763 if (!skip_tz) {
3764 g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
3765 }
3766 } else if (detected_server == SERVER_TYPE_TIDB) {
3767 if (!skip_tz) {
3768 g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;\n");
3769 }
3770 } else {
3771 g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;\n");
3772 }
3773
3774 if (!write_data(file, statement)) {
3775 g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3776 goto cleanup;
3777 }
3778 }
3779 if ( load_data ){
3780 if (first_time){
3781 load_data_fn=build_filename(dbt->database->filename, dbt->table_filename, fn, "dat");
3782 // load_data_fn=g_strdup_printf("%s%05d.dat%s", filename_prefix, fn,
3783 // (compress_output ? ".gz" : ""));
3784 g_string_printf(statement, "LOAD DATA LOCAL INFILE '%s' REPLACE INTO TABLE `%s` ",load_data_fn,tj->table);
3785 if (fields_terminated_by_ld)
3786 g_string_append_printf(statement, "FIELDS TERMINATED BY '%s' ",fields_terminated_by_ld);
3787 if (fields_enclosed_by_ld)
3788 g_string_append_printf(statement, "ENCLOSED BY '%s' ",fields_enclosed_by_ld);
3789 if (fields_escaped_by)
3790 g_string_append_printf(statement, "ESCAPED BY '%s' ",fields_escaped_by);
3791 g_string_append(statement, "LINES ");
3792 if (lines_starting_by_ld)
3793 g_string_append_printf(statement, "STARTING BY '%s' ",lines_starting_by_ld);
3794 g_string_append_printf(statement, "TERMINATED BY '%s' (", lines_terminated_by_ld);
3795
3796 append_columns(statement,fields,num_fields);
3797 g_string_append(statement,");\n");
3798 if (!write_data(main_file, statement)) {
3799 g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3800 goto cleanup;
3801 }else{
3802 g_string_set_size(statement, 0);
3803 g_free(fcfile);
3804 fcfile=load_data_fn;
3805
3806 if (!compress_output) {
3807 fclose((FILE *)file);
3808 file = g_fopen(fcfile, "a");
3809 } else {
3810 gzclose((gzFile)file);
3811 file = (void *)gzopen(fcfile, "a");
3812 }
3813 }
3814 first_time=FALSE;
3815 }
3816 }else{
3817 append_insert ((complete_insert || has_generated_fields), statement, tj->table, fields, num_fields);
3818 }
3819 num_rows_st = 0;
3820 }
3821
3822 if (statement_row->len) {
3823 g_string_append(statement, statement_row->str);
3824 g_string_set_size(statement_row, 0);
3825 num_rows_st++;
3826 }
3827
3828 g_string_append(statement_row, lines_starting_by);
3829 GList *f = dbt->anonymized_function;
3830 for (i = 0; i < num_fields; i++) {
3831 gchar * (*fun_ptr)(gchar **) = f->data;
3832 f=f->next;
3833 if (load_data){
3834 if (!row[i]) {
3835 // g_string_append(statement_row,fields_enclosed_by);
3836 g_string_append(statement_row, "NULL");
3837 // g_string_append(statement_row,fields_enclosed_by);
3838 }else if (fields[i].type != MYSQL_TYPE_LONG && fields[i].type != MYSQL_TYPE_LONGLONG && fields[i].type != MYSQL_TYPE_INT24 && fields[i].type != MYSQL_TYPE_SHORT ){
3839 g_string_append(statement_row,fields_enclosed_by);
3840 g_string_append(statement_row,fun_ptr(&(row[i])));
3841 g_string_append(statement_row,fields_enclosed_by);
3842 }else
3843 g_string_append(statement_row,fun_ptr(&(row[i])));
3844 }else{
3845 /* Don't escape safe formats, saves some time */
3846 if (!row[i]) {
3847 g_string_append(statement_row, "NULL");
3848 } else if (fields[i].flags & NUM_FLAG) {
3849 g_string_append(statement_row, fun_ptr(&(row[i])));
3850 } else {
3851 /* We reuse buffers for string escaping, growing is expensive just at
3852 * the beginning */
3853 g_string_set_size(escaped, lengths[i] * 2 + 1);
3854 mysql_real_escape_string(conn, escaped->str, fun_ptr(&(row[i])), lengths[i]);
3855 if (fields[i].type == MYSQL_TYPE_JSON)
3856 g_string_append(statement_row, "CONVERT(");
3857 g_string_append_c(statement_row, '\"');
3858 g_string_append(statement_row, escaped->str);
3859 g_string_append_c(statement_row, '\"');
3860 if (fields[i].type == MYSQL_TYPE_JSON)
3861 g_string_append(statement_row, " USING UTF8MB4)");
3862 }
3863 }
3864 if (i < num_fields - 1) {
3865 g_string_append(statement_row, fields_terminated_by);
3866 } else {
3867 g_string_append_printf(statement_row,"%s", lines_terminated_by);
3868
3869 /* INSERT statement is closed before over limit */
3870 if (statement->len + statement_row->len + 1 > statement_size) {
3871 if (num_rows_st == 0) {
3872 g_string_append(statement, statement_row->str);
3873 g_string_set_size(statement_row, 0);
3874 g_warning("Row bigger than statement_size for %s.%s", tj->database,
3875 tj->table);
3876 }
3877 g_string_append(statement, statement_terminated_by);
3878
3879 if (!write_data(file, statement)) {
3880 g_critical("Could not write out data for %s.%s", tj->database, tj->table);
3881 goto cleanup;
3882 } else {
3883 st_in_file++;
3884 if (chunk_filesize &&
3885 st_in_file * (guint)ceil((float)statement_size / 1024 / 1024) >
3886 chunk_filesize) {
3887 fn++;
3888 g_free(fcfile);
3889 m_close(file);
3890 fcfile = build_data_filename(dbt->database->filename, dbt->table_filename, fn);
3891 if (stream) g_async_queue_push(stream_queue, g_strdup(fcfile));
3892 file = m_open(fcfile,"w");
3893
3894 st_in_file = 0;
3895 }
3896 }
3897 g_string_set_size(statement, 0);
3898 } else {
3899 if (num_rows_st && ! load_data)
3900 g_string_append_c(statement, ',');
3901 g_string_append(statement, statement_row->str);
3902 num_rows_st++;
3903 g_string_set_size(statement_row, 0);
3904 }
3905 }
3906 }
3907 }
3908 if (mysql_errno(conn)) {
3909 g_critical("Could not read data from %s.%s: %s", tj->database, tj->table,
3910 mysql_error(conn));
3911 errors++;
3912 }
3913
3914 if (statement_row->len > 0) {
3915 /* this last row has not been written out */
3916 if (statement->len > 0) {
3917 /* strange, should not happen */
3918 g_string_append(statement, statement_row->str);
3919 } else {
3920 append_insert (complete_insert, statement, tj->table, fields, num_fields);
3921 g_string_append(statement, statement_row->str);
3922 }
3923 }
3924
3925 if (statement->len > 0) {
3926 g_string_append(statement, statement_terminated_by);
3927 if (!write_data(file, statement)) {
3928 g_critical(
3929 "Could not write out closing newline for %s.%s, now this is sad!",
3930 tj->database, tj->table);
3931 goto cleanup;
3932 }
3933 st_in_file++;
3934 }
3935
3936 cleanup:
3937 g_free(query);
3938
3939 g_string_free(escaped, TRUE);
3940 g_string_free(statement, TRUE);
3941 g_string_free(statement_row, TRUE);
3942
3943 if (result) {
3944 mysql_free_result(result);
3945 }
3946
3947 if (file) {
3948 m_close(file);
3949 }
3950
3951 if (!st_in_file && !build_empty_files) {
3952 // dropping the useless file
3953 if (remove(fcfile)) {
3954 g_warning("Failed to remove empty file : %s\n", fcfile);
3955 }
3956 } else if (chunk_filesize && fn == 0) {
3957 if (stream) g_async_queue_push(stream_queue, g_strdup(fcfile));
3958 }else{
3959 if (stream) g_async_queue_push(stream_queue, g_strdup(tj->filename));
3960 }
3961
3962 g_mutex_lock(dbt->rows_lock);
3963 dbt->rows+=num_rows;
3964 g_mutex_unlock(dbt->rows_lock);
3965
3966 g_free(fcfile);
3967
3968 return num_rows;
3969 }
3970
3971
write_data(FILE * file,GString * data)3972 gboolean write_data(FILE *file, GString *data) {
3973 size_t written = 0;
3974 ssize_t r = 0;
3975 while (written < data->len) {
3976 r=m_write(file, data->str + written, data->len);
3977 if (r < 0) {
3978 g_critical("Couldn't write data to a file: %s", strerror(errno));
3979 errors++;
3980 return FALSE;
3981 }
3982 written += r;
3983 }
3984
3985 return TRUE;
3986 }
3987