1 /*
2 This program is free software: you can redistribute it and/or modify
3 it under the terms of the GNU General Public License as published by
4 the Free Software Foundation, either version 3 of the License, or
5 (at your option) any later version.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program. If not, see <http://www.gnu.org/licenses/>.
14
15 Authors: Andrew Hutchings, SkySQL (andrew at skysql dot com)
16 David Ducos, Percona (david dot ducos at percona dot com)
17
18 */
19
20 #define _LARGEFILE64_SOURCE
21 #define _FILE_OFFSET_BITS 64
22
23 #include <mysql.h>
24
25 #if defined MARIADB_CLIENT_VERSION_STR && !defined MYSQL_SERVER_VERSION
26 #define MYSQL_SERVER_VERSION MARIADB_CLIENT_VERSION_STR
27 #endif
28
29 #include <unistd.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <glib.h>
33 #include <glib/gstdio.h>
34 #include <stdlib.h>
35 #include <stdarg.h>
36 #include <errno.h>
37 #ifdef ZWRAP_USE_ZSTD
38 #include "zstd/zstd_zlibwrapper.h"
39 #else
40 #include <zlib.h>
41 #endif
42 #include "config.h"
43 #include "common.h"
44 #include "myloader.h"
45 #include "connection.h"
46 #include "getPassword.h"
47 #include "logging.h"
48 #include "set_verbose.h"
49 #include "locale.h"
50 #include "server_detect.h"
51
52 guint commit_count = 1000;
53 gchar *input_directory = NULL;
54 gchar *directory = NULL;
55 gboolean overwrite_tables = FALSE;
56 gboolean innodb_optimize_keys = FALSE;
57 gboolean enable_binlog = FALSE;
58 gboolean disable_redo_log = FALSE;
59 guint rows = 0;
60 gchar *source_db = NULL;
61 gchar *purge_mode_str=NULL;
62 gchar *set_names_str=NULL;
63 enum purge_mode purge_mode;
64 static GMutex *init_mutex = NULL;
65 static GMutex *progress_mutex = NULL;
66 guint errors = 0;
67 guint max_threads_per_table=4;
68 unsigned long long int total_data_sql_files = 0;
69 unsigned long long int progress = 0;
70 GHashTable *db_hash=NULL;
71 GHashTable *tbl_hash=NULL;
72
73 const char DIRECTORY[] = "import";
74
75 gboolean read_data(FILE *file, gboolean is_compressed, GString *data,
76 gboolean *eof);
77 int restore_data_from_file(struct thread_data *td, char *database, char *table,
78 const char *filename, gboolean is_schema);
79 int restore_data_in_gstring_by_statement(struct thread_data *td,
80 GString *data, gboolean is_schema,
81 guint *query_counter);
82 int restore_data_in_gstring(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter);
83 void *process_queue(struct thread_data *td);
84 void checksum_table_filename(const gchar *filename, MYSQL *conn);
85 void load_directory_information(struct configuration *conf);
86 void checksum_databases(struct thread_data *td);
87 void no_log(const gchar *log_domain, GLogLevelFlags log_level,
88 const gchar *message, gpointer user_data);
89 void create_database(struct thread_data *td, gchar *database);
90 gint compare_dbt(gconstpointer a, gconstpointer b, gpointer table_hash);
91 gint compare_filename_part (gconstpointer a, gconstpointer b);
92 void get_database_table_from_file(const gchar *filename,const char *sufix,gchar **database,gchar **table);
93 void append_alter_table(GString * alter_table_statement, char *database, char *table);
94 void finish_alter_table(GString * alter_table_statement);
95 guint execute_use(struct thread_data *td, const gchar *msg);
96 int overwrite_table(MYSQL *conn,gchar * database, gchar * table);
97 gchar * get_database_name_from_content(const gchar *filename);
98 void process_restore_job(struct thread_data *td, struct restore_job *rj, int count);
99 gboolean process_job(struct thread_data *td, struct job *job, int count);
100 void *process_stream(struct configuration *conf);
101 GAsyncQueue *get_queue_for_type(struct configuration *conf, enum file_type current_ft);
102 void execute_use_if_needs_to(struct thread_data *td, gchar *database, const gchar * msg);
103
104 static GOptionEntry entries[] = {
105 {"directory", 'd', 0, G_OPTION_ARG_STRING, &input_directory,
106 "Directory of the dump to import", NULL},
107 {"queries-per-transaction", 'q', 0, G_OPTION_ARG_INT, &commit_count,
108 "Number of queries per transaction, default 1000", NULL},
109 {"overwrite-tables", 'o', 0, G_OPTION_ARG_NONE, &overwrite_tables,
110 "Drop tables if they already exist", NULL},
111 {"database", 'B', 0, G_OPTION_ARG_STRING, &db,
112 "An alternative database to restore into", NULL},
113 {"source-db", 's', 0, G_OPTION_ARG_STRING, &source_db,
114 "Database to restore", NULL},
115 {"enable-binlog", 'e', 0, G_OPTION_ARG_NONE, &enable_binlog,
116 "Enable binary logging of the restore data", NULL},
117 {"innodb-optimize-keys", 0, 0, G_OPTION_ARG_NONE, &innodb_optimize_keys,
118 "Creates the table without the indexes and it adds them at the end", NULL},
119 { "set-names",0, 0, G_OPTION_ARG_STRING, &set_names_str,
120 "Sets the names, use it at your own risk, default binary", NULL },
121 {"logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile,
122 "Log file name to use, by default stdout is used", NULL},
123 { "purge-mode", 0, 0, G_OPTION_ARG_STRING, &purge_mode_str,
124 "This specify the truncate mode which can be: NONE, DROP, TRUNCATE and DELETE", NULL },
125 { "disable-redo-log", 0, 0, G_OPTION_ARG_NONE, &disable_redo_log,
126 "Disables the REDO_LOG and enables it after, doesn't check initial status", NULL },
127 {"rows", 'r', 0, G_OPTION_ARG_INT, &rows,
128 "Split the INSERT statement into this many rows.", NULL},
129 {"max-threads-per-table", 0, 0, G_OPTION_ARG_INT, &max_threads_per_table,
130 "Maximum number of threads per table to use, default 4", NULL},
131 {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
132
133
split_and_restore_data_in_gstring_by_statement(struct thread_data * td,GString * data,gboolean is_schema,guint * query_counter)134 int split_and_restore_data_in_gstring_by_statement(struct thread_data *td,
135 GString *data, gboolean is_schema, guint *query_counter)
136 {
137 char *next_line=g_strstr_len(data->str,-1,"\n");
138 char *insert_statement=g_strndup(data->str, next_line - data->str);
139 int r=0;
140 gchar *current_line=next_line+1;
141 next_line=g_strstr_len(current_line, -1, "\n");
142 GString * new_insert=g_string_sized_new(strlen(insert_statement));
143 do {
144 guint current_rows=0;
145 new_insert=g_string_append(new_insert,insert_statement);
146 do {
147 char *line=g_strndup(current_line, next_line - current_line);
148 g_string_append(new_insert, line);
149 g_free(line);
150 current_rows++;
151 current_line=next_line+1;
152 next_line=g_strstr_len(current_line, -1, "\n");
153 } while (current_rows < rows && next_line != NULL);
154 new_insert->str[new_insert->len - 1]=';';
155 r+=restore_data_in_gstring_by_statement(td, new_insert, is_schema, query_counter);
156 } while (next_line != NULL);
157 g_string_free(new_insert,TRUE);
158 g_string_set_size(data, 0);
159 return r;
160 }
161
new_job(enum job_type type,void * job_data,char * use_database)162 struct job * new_job (enum job_type type, void *job_data, char *use_database) {
163 struct job *j = g_new0(struct job, 1);
164 j->type = type;
165 j->use_database=use_database;
166 switch (type){
167 case JOB_WAIT:
168 j->job_data = (GAsyncQueue *)job_data;
169 case JOB_SHUTDOWN:
170 break;
171 default:
172 j->job_data = (struct restore_job *)job_data;
173 }
174 return j;
175 }
176
append_new_db_table(char * filename,gchar * database,gchar * table,guint64 number_rows,GHashTable * table_hash,GString * alter_table_statement)177 struct db_table* append_new_db_table(char * filename, gchar * database, gchar *table, guint64 number_rows, GHashTable *table_hash, GString *alter_table_statement){
178 if ( database == NULL || table == NULL){
179 g_critical("It was not possible to process file: %s",filename);
180 exit(EXIT_FAILURE);
181 }
182 char *real_database=g_hash_table_lookup(db_hash,database);
183 if (real_database == NULL){
184 g_critical("It was not possible to process file: %s",filename);
185 exit(EXIT_FAILURE);
186 }
187 gchar *lkey=g_strdup_printf("%s_%s",database, table);
188 struct db_table * dbt=g_hash_table_lookup(table_hash,lkey);
189 g_free(lkey);
190 if (dbt == NULL){
191 dbt=g_new(struct db_table,1);
192 dbt->filename=filename;
193 dbt->database=database;
194 // This should be the only place where we should use `db ? db : `
195 dbt->real_database = db ? db : real_database;
196 dbt->table=g_strdup(table);
197 dbt->real_table=dbt->table;
198 dbt->rows=number_rows;
199 dbt->restore_job_list = NULL;
200 dbt->queue=g_async_queue_new();
201 dbt->current_threads=0;
202 dbt->max_threads=max_threads_per_table;
203 dbt->mutex=g_mutex_new();
204 dbt->indexes=alter_table_statement;
205 dbt->start_time=NULL;
206 dbt->start_index_time=NULL;
207 dbt->finish_time=NULL;
208 g_hash_table_insert(table_hash, g_strdup_printf("%s_%s",dbt->database,dbt->table),dbt);
209 }else{
210 if (number_rows>0) dbt->rows=number_rows;
211 if (alter_table_statement != NULL) dbt->indexes=alter_table_statement;
212 // if (real_table != NULL) dbt->real_table=g_strdup(real_table);
213 }
214 return dbt;
215 }
216
new_restore_job(char * filename,char * database,struct db_table * dbt,GString * statement,guint part,enum restore_job_type type,const char * object)217 struct restore_job * new_restore_job( char * filename, char * database, struct db_table * dbt, GString * statement, guint part, enum restore_job_type type, const char *object){
218 struct restore_job *rj = g_new(struct restore_job, 1);
219 rj->type=type;
220 rj->filename= filename;
221 rj->database= database;
222 rj->statement = statement;
223 rj->dbt = dbt;
224 rj->part = part;
225 rj->object = object;
226 return rj;
227 }
228
free_restore_job(struct restore_job * rj)229 void free_restore_job(struct restore_job * rj){
230 // We consider that
231 if (rj->filename != NULL ) g_free(rj->filename);
232 if (rj->statement != NULL ) g_string_free(rj->statement,TRUE);
233 if (rj != NULL ) g_free(rj);
234 }
235
sync_threads_on_queue(GAsyncQueue * ready_queue,GAsyncQueue * comm_queue,const gchar * msg)236 void sync_threads_on_queue(GAsyncQueue *ready_queue,GAsyncQueue *comm_queue,const gchar *msg){
237 guint n;
238 GAsyncQueue * queue = g_async_queue_new();
239 for (n = 0; n < num_threads; n++){
240 g_async_queue_push(comm_queue, new_job(JOB_WAIT, queue, NULL));
241 }
242 for (n = 0; n < num_threads; n++)
243 g_async_queue_pop(ready_queue);
244 g_debug("%s",msg);
245 for (n = 0; n < num_threads; n++)
246 g_async_queue_push(queue, GINT_TO_POINTER(1));
247 }
248
sync_threads(struct configuration * conf)249 void sync_threads(struct configuration * conf){
250 sync_threads_on_queue(conf->ready, conf->data_queue,"Syncing");
251 }
252
print_time(GTimeSpan timespan)253 gchar * print_time(GTimeSpan timespan){
254 GTimeSpan days=timespan/G_TIME_SPAN_DAY;
255 GTimeSpan hours=(timespan-(days*G_TIME_SPAN_DAY))/G_TIME_SPAN_HOUR;
256 GTimeSpan minutes=(timespan-(days*G_TIME_SPAN_HOUR)-(hours*G_TIME_SPAN_HOUR))/G_TIME_SPAN_MINUTE;
257 GTimeSpan seconds=(timespan-(days*G_TIME_SPAN_MINUTE)-(hours*G_TIME_SPAN_HOUR)-(minutes*G_TIME_SPAN_MINUTE))/G_TIME_SPAN_SECOND;
258 return g_strdup_printf("%ld %02ld:%02ld:%02ld",days,hours,minutes,seconds);
259 }
260
restore_from_directory(struct configuration * conf)261 void restore_from_directory(struct configuration *conf){
262 // Leaving just on thread to execute the add constraints as it might cause deadlocks
263 guint n=0;
264 for (n = 0; n < num_threads-1; n++) {
265 g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
266 }
267 load_directory_information(conf);
268 sync_threads_on_queue(conf->ready,conf->database_queue,"Step 1 completed, Databases created");
269 for (n = 0; n < num_threads; n++) {
270 g_async_queue_push(conf->database_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
271 }
272 sync_threads_on_queue(conf->ready,conf->table_queue,"Step 2 completed, Tables created");
273 for (n = 0; n < num_threads; n++) {
274 g_async_queue_push(conf->table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
275 }
276 // We need to sync all the threads before continue
277 sync_threads_on_queue(conf->ready,conf->data_queue,"Step 3 completed, load data finished");
278 for (n = 0; n < num_threads; n++) {
279 g_async_queue_push(conf->data_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
280 }
281 g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
282 g_debug("Step 4 completed");
283
284 GList * t=conf->table_list;
285 g_message("Import timings:");
286 g_message("Data \t| Index \t| Total \t| Table");
287 while (t != NULL){
288 struct db_table * dbt=t->data;
289 GTimeSpan diff1=g_date_time_difference(dbt->start_index_time,dbt->start_time);
290 GTimeSpan diff2=g_date_time_difference(dbt->finish_time,dbt->start_index_time);
291 g_message("%s\t| %s\t| %s\t| `%s`.`%s`",print_time(diff1),print_time(diff2),print_time(diff1+diff2),dbt->real_database,dbt->real_table);
292 t=t->next;
293 }
294 innodb_optimize_keys=FALSE;
295
296 for (n = 0; n < num_threads; n++) {
297 g_async_queue_push(conf->post_table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
298 }
299 // Step 5: Create remaining objects.
300 // TODO: is it possible to do it in parallel? Actually, why aren't we queuing this files?
301 g_debug("Step 5 started");
302
303 }
304
main(int argc,char * argv[])305 int main(int argc, char *argv[]) {
306 struct configuration conf = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0};
307
308 GError *error = NULL;
309 GOptionContext *context;
310
311 setlocale(LC_ALL, "");
312 g_thread_init(NULL);
313
314 init_mutex = g_mutex_new();
315 progress_mutex = g_mutex_new();
316
317 if (db == NULL && source_db != NULL) {
318 db = g_strdup(source_db);
319 }
320
321 context = g_option_context_new("multi-threaded MySQL loader");
322 GOptionGroup *main_group =
323 g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
324 g_option_group_add_entries(main_group, entries);
325 g_option_group_add_entries(main_group, common_entries);
326 g_option_context_set_main_group(context, main_group);
327 gchar ** tmpargv=g_strdupv(argv);
328 int tmpargc=argc;
329 if (!g_option_context_parse(context, &tmpargc, &tmpargv, &error)) {
330 g_print("option parsing failed: %s, try --help\n", error->message);
331 exit(EXIT_FAILURE);
332 }
333
334 if (defaults_file != NULL){
335 load_config_file(defaults_file, context, "myloader");
336 }
337 g_option_context_free(context);
338
339 if (password != NULL){
340 int i=1;
341 for(i=1; i < argc; i++){
342 gchar * p= g_strstr_len(argv[i],-1,password);
343 if (p != NULL){
344 strncpy(p, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", strlen(password));
345 }
346 }
347 }
348 // prompt for password if it's NULL
349 if (sizeof(password) == 0 || (password == NULL && askPassword)) {
350 password = passwordPrompt();
351 }
352
353 if (program_version) {
354 g_print("myloader %s, built against MySQL %s\n", VERSION,
355 MYSQL_VERSION_STR);
356 exit(EXIT_SUCCESS);
357 }
358
359 set_verbose(verbose);
360
361 #ifdef ZWRAP_USE_ZSTD
362 compress_extension = g_strdup(".zst");
363 #else
364 compress_extension = g_strdup(".gz");
365 #endif
366 if (set_names_str){
367 gchar *tmp_str=g_strdup_printf("/*!40101 SET NAMES %s*/",set_names_str);
368 set_names_str=tmp_str;
369 } else
370 set_names_str=g_strdup("/*!40101 SET NAMES binary*/");
371
372 if (purge_mode_str){
373 if (!strcmp(purge_mode_str,"TRUNCATE")){
374 purge_mode=TRUNCATE;
375 } else if (!strcmp(purge_mode_str,"DROP")){
376 purge_mode=DROP;
377 } else if (!strcmp(purge_mode_str,"DELETE")){
378 purge_mode=DELETE;
379 } else if (!strcmp(purge_mode_str,"NONE")){
380 purge_mode=NONE;
381 } else {
382 g_error("Purge mode unknown");
383 }
384 } else if (overwrite_tables)
385 purge_mode=DROP; // Default mode is DROP when overwrite_tables is especified
386 else purge_mode=NONE;
387
388 if (!input_directory) {
389 if (stream){
390 GDateTime * datetime = g_date_time_new_now_local();
391 char *datetimestr;
392 datetimestr=g_date_time_format(datetime,"\%Y\%m\%d-\%H\%M\%S");
393 directory = g_strdup_printf("%s-%s", DIRECTORY, datetimestr);
394 create_backup_dir(directory);
395 g_free(datetimestr);
396 }else{
397 g_critical("a directory needs to be specified, see --help\n");
398 exit(EXIT_FAILURE);
399 }
400 } else {
401 if (!g_file_test(input_directory,G_FILE_TEST_IS_DIR)){
402 g_critical("the specified directory doesn't exists\n");
403 exit(EXIT_FAILURE);
404 }
405 directory=input_directory;
406 if (!stream){
407 char *p = g_strdup_printf("%s/metadata", directory);
408 if (!g_file_test(p, G_FILE_TEST_EXISTS)) {
409 g_critical("the specified directory is not a mydumper backup\n");
410 exit(EXIT_FAILURE);
411 }
412 }
413 }
414 MYSQL *conn;
415 conn = mysql_init(NULL);
416
417 configure_connection(conn, "myloader");
418 if (!mysql_real_connect(conn, hostname, username, password, NULL, port,
419 socket_path, 0)) {
420 g_critical("Error connection to database: %s", mysql_error(conn));
421 exit(EXIT_FAILURE);
422 }
423
424 detected_server = detect_server(conn);
425 initialize_session_variables("myloader",set_session, detected_server, defaults_file);
426
427 // TODO: we need to set the variables in the initilize session varibles, not from:
428 if (mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
429 g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
430 }
431
432 if (!enable_binlog)
433 mysql_query(conn, "SET SQL_LOG_BIN=0");
434
435 if (disable_redo_log){
436 g_message("Disabling redologs");
437 mysql_query(conn, "ALTER INSTANCE DISABLE INNODB REDO_LOG");
438 }
439 mysql_query(conn, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/");
440 // To here.
441 conf.database_queue = g_async_queue_new();
442 conf.table_queue = g_async_queue_new();
443 conf.data_queue = g_async_queue_new();
444 conf.post_table_queue = g_async_queue_new();
445 conf.post_queue = g_async_queue_new();
446 conf.ready = g_async_queue_new();
447 conf.stream_queue = g_async_queue_new();
448 db_hash=g_hash_table_new ( g_str_hash, g_str_equal );
449 tbl_hash=g_hash_table_new ( g_str_hash, g_str_equal );
450 guint n;
451 GThread **threads = g_new(GThread *, num_threads);
452 struct thread_data *td = g_new(struct thread_data, num_threads);
453 for (n = 0; n < num_threads; n++) {
454 td[n].conf = &conf;
455 td[n].thread_id = n + 1;
456 threads[n] =
457 g_thread_create((GThreadFunc)process_queue, &td[n], TRUE, NULL);
458 // Here, the ready queue is being used to serialize the connection to the database.
459 // We don't want all the threads try to connect at the same time
460 g_async_queue_pop(conf.ready);
461 }
462
463 struct thread_data t;
464 t.thread_id = 0;
465 t.conf = &conf;
466 t.thrconn = conn;
467 t.current_database=NULL;
468
469 // Step 1: Create databases | single threaded
470 if (db)
471 create_database(&t, db);
472 if (stream){
473 GThread *stream_thread = g_thread_create((GThreadFunc)process_stream, &conf, TRUE, NULL);
474 g_thread_join(stream_thread);
475 }else{
476 restore_from_directory(&conf);
477 }
478
479 for (n = 0; n < num_threads; n++) {
480 g_thread_join(threads[n]);
481 }
482 g_async_queue_unref(conf.ready);
483 if (disable_redo_log)
484 mysql_query(conn, "ALTER INSTANCE ENABLE INNODB REDO_LOG");
485
486 g_async_queue_unref(conf.data_queue);
487
488 checksum_databases(&t);
489
490 if (stream && no_delete == FALSE && input_directory == NULL){
491 // remove metadata files
492 GList *e=conf.metadata_list;
493 gchar *path = NULL;
494 while (e) {
495 path = g_build_filename(directory, e->data, NULL);
496 remove(path);
497 g_free(path);
498 e=e->next;
499 }
500 path = g_build_filename(directory, "metadata", NULL);
501 remove(path);
502 g_free(path);
503 if (g_rmdir(directory) != 0)
504 g_critical("Restore directory not removed: %s", directory);
505 }
506 mysql_close(conn);
507 mysql_thread_end();
508 mysql_library_end();
509 g_free(directory);
510 g_free(td);
511 g_free(threads);
512
513 if (logoutfile) {
514 fclose(logoutfile);
515 }
516
517 return errors ? EXIT_FAILURE : EXIT_SUCCESS;
518 }
519
process_create_table_statement(gchar * statement,GString * create_table_statement,GString * alter_table_statement,GString * alter_table_constraint_statement,struct db_table * dbt)520 int process_create_table_statement (gchar * statement, GString *create_table_statement, GString *alter_table_statement, GString *alter_table_constraint_statement, struct db_table *dbt){
521 int flag=0;
522 gchar** split_file= g_strsplit(statement, "\n", -1);
523 gchar *autoinc_column=NULL;
524 append_alter_table(alter_table_statement, dbt->real_database,dbt->real_table);
525 append_alter_table(alter_table_constraint_statement, dbt->real_database,dbt->real_table);
526 int fulltext_counter=0;
527 int i=0;
528 for (i=0; i < (int)g_strv_length(split_file);i++){
529 if ( g_strstr_len(split_file[i],5," KEY")
530 || g_strstr_len(split_file[i],8," UNIQUE")
531 || g_strstr_len(split_file[i],9," SPATIAL")
532 || g_strstr_len(split_file[i],10," FULLTEXT")
533 || g_strstr_len(split_file[i],7," INDEX")
534 ){
535 // Ignore if the first column of the index is the AUTO_INCREMENT column
536 if ((autoinc_column != NULL) && (g_strrstr(split_file[i],autoinc_column))){
537 g_string_append(create_table_statement, split_file[i]);
538 g_string_append_c(create_table_statement,'\n');
539 }else{
540 flag+=IS_ALTER_TABLE_PRESENT;
541 if (g_strrstr(split_file[i]," FULLTEXT")) fulltext_counter++;
542 if (fulltext_counter>1){
543 fulltext_counter=1;
544 finish_alter_table(alter_table_statement);
545 append_alter_table(alter_table_statement,dbt->real_database,dbt->real_table);
546 }
547 g_string_append(alter_table_statement,"\n ADD");
548 g_string_append(alter_table_statement, split_file[i]);
549 }
550 }else{
551 if (g_strstr_len(split_file[i],12," CONSTRAINT")){
552 flag+=INCLUDE_CONSTRAINT;
553 g_string_append(alter_table_constraint_statement,"\n ADD");
554 g_string_append(alter_table_constraint_statement, split_file[i]);
555 }else{
556 if (g_strrstr(split_file[i],"AUTO_INCREMENT")){
557 gchar** autoinc_split=g_strsplit(split_file[i],"`",3);
558 autoinc_column=g_strdup_printf("(`%s`", autoinc_split[1]);
559 }
560 g_string_append(create_table_statement, split_file[i]);
561 g_string_append_c(create_table_statement,'\n');
562 }
563 }
564 if (g_strrstr(split_file[i],"ENGINE=InnoDB")) flag+=IS_INNODB_TABLE;
565 }
566 return flag;
567 }
568
569
570
load_schema(struct configuration * conf,struct db_table * dbt,const gchar * filename)571 void load_schema(struct configuration *conf, struct db_table *dbt, const gchar *filename){
572 // g_message("Loading schema for file: %s", filename);
573 void *infile;
574 gboolean is_compressed = FALSE;
575 gboolean eof = FALSE;
576 GString *data=g_string_sized_new(512);
577 GString *create_table_statement=g_string_sized_new(512);
578 GString *alter_table_statement=g_string_sized_new(512);
579 GString *alter_table_constraint_statement=g_string_sized_new(512);
580 if (!g_str_has_suffix(filename, compress_extension)) {
581 infile = g_fopen(filename, "r");
582 is_compressed = FALSE;
583 } else {
584 infile = (void *)gzopen(filename, "r");
585 is_compressed = TRUE;
586 }
587 if (!infile) {
588 g_critical("cannot open file %s (%d)", filename, errno);
589 errors++;
590 return;
591 }
592 while (eof == FALSE) {
593 if (read_data(infile, is_compressed, data, &eof)) {
594 if (g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n")) {
595 if (g_strrstr(data->str,"CREATE ")){
596 gchar** create_table= g_strsplit(data->str, "`", 3);
597 dbt->real_table=g_strdup(create_table[1]);
598 if ( g_str_has_prefix(dbt->table,"mydumper_")){
599 g_hash_table_insert(tbl_hash, dbt->table, dbt->real_table);
600 }else{
601 g_hash_table_insert(tbl_hash, dbt->real_table, dbt->real_table);
602 }
603 g_strfreev(create_table);
604 }
605 if (innodb_optimize_keys){
606 // Check if it is a /*!40 SET
607 if (g_strrstr(data->str,"/*!40")){
608 g_string_append(alter_table_statement,data->str);
609 g_string_append(create_table_statement,data->str);
610 }else{
611 // Processing CREATE TABLE statement
612 GString *new_create_table_statement=g_string_sized_new(512);
613 int flag = process_create_table_statement(data->str, new_create_table_statement, alter_table_statement, alter_table_constraint_statement, dbt);
614 if (flag & IS_INNODB_TABLE){
615 if (flag & IS_ALTER_TABLE_PRESENT){
616 finish_alter_table(alter_table_statement);
617 g_message("Fast index creation will be use for table: %s.%s",dbt->real_database,dbt->real_table);
618 }else{
619 g_string_free(alter_table_statement,TRUE);
620 alter_table_statement=NULL;
621 }
622 g_string_append(create_table_statement,g_strjoinv("\n)",g_strsplit(new_create_table_statement->str,",\n)",-1)));
623 dbt->indexes=alter_table_statement;
624 if (stream){
625 struct restore_job *rj = new_restore_job(dbt->filename, dbt->real_database, dbt, dbt->indexes, 0, JOB_RESTORE_STRING, "indexes");
626 g_async_queue_push(conf->post_table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
627 }
628 if (flag & INCLUDE_CONSTRAINT){
629 struct restore_job *rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, alter_table_constraint_statement, 0, JOB_RESTORE_STRING, "constraint");
630 g_async_queue_push(conf->post_table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
631 dbt->constraints=alter_table_constraint_statement;
632 }else{
633 g_string_free(alter_table_constraint_statement,TRUE);
634 }
635 g_string_set_size(data, 0);
636 }else{
637 g_string_free(alter_table_statement,TRUE);
638 g_string_free(alter_table_constraint_statement,TRUE);
639 g_string_append(create_table_statement,data->str);
640 }
641 }
642 }else{
643 g_string_append(create_table_statement,data->str);
644 }
645 g_string_set_size(data, 0);
646 }
647 }
648 }
649 struct restore_job * rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, create_table_statement, 0,JOB_RESTORE_SCHEMA_STRING, "");
650 g_async_queue_push(conf->table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
651 if (!is_compressed) {
652 fclose(infile);
653 } else {
654 gzclose((gzFile)infile);
655 }
656 if (stream && no_delete == FALSE){
657 g_message("Removing file: %s", filename);
658 remove(filename);
659 }
660 }
661
get_database_name_from_filename(const gchar * filename)662 gchar * get_database_name_from_filename(const gchar *filename){
663 gchar **split_file = g_strsplit(filename, "-schema-create.sql", 2);
664 gchar *db_name=g_strdup(split_file[0]);
665 g_strfreev(split_file);
666 return db_name;
667 }
668
get_database_table_part_name_from_filename(const gchar * filename,const gchar * suffix,gchar ** database,gchar ** table,guint * part)669 void get_database_table_part_name_from_filename(const gchar *filename, const gchar * suffix, gchar **database, gchar **table, guint *part){
670 gchar **split_file = g_strsplit(filename, suffix, 3);
671 gchar **split_db_tbl = g_strsplit(split_file[0], ".", -1);
672 g_strfreev(split_file);
673 if (g_strv_length(split_db_tbl)>=3){
674 *database=g_strdup(split_db_tbl[0]);
675 *table=g_strdup(split_db_tbl[1]);
676 *part=g_ascii_strtoull(split_db_tbl[2], NULL, 10);
677 }else{
678 *database=NULL;
679 *table=NULL;
680 *part=0;
681 }
682 g_strfreev(split_db_tbl);
683 }
684
get_database_table_name_from_filename(const gchar * filename,const gchar * suffix,gchar ** database,gchar ** table)685 void get_database_table_name_from_filename(const gchar *filename, const gchar * suffix, gchar **database, gchar **table){
686 gchar **split_file = g_strsplit(filename, suffix, 2);
687 gchar **split_db_tbl = g_strsplit(split_file[0], ".", -1);
688 g_strfreev(split_file);
689 if (g_strv_length(split_db_tbl)==2){
690 *database=g_strdup(split_db_tbl[0]);
691 *table=g_strdup(split_db_tbl[1]);
692 }else{
693 *database=NULL;
694 *table=NULL;
695 }
696 g_strfreev(split_db_tbl);
697 }
698
process_database_filename(struct configuration * conf,char * filename,const char * object)699 void process_database_filename(struct configuration *conf, char * filename, const char *object) {
700 gchar *db_kname,*db_vname;
701 db_vname=db_kname=get_database_name_from_filename(filename);
702
703 if (db_kname!=NULL && g_str_has_prefix(db_kname,"mydumper_")){
704 db_vname=get_database_name_from_content(g_build_filename(directory,filename,NULL));
705 }
706 g_debug("Adding database: %s -> %s", db_kname, db ? db : db_vname);
707 g_hash_table_insert(db_hash, db_kname, db ? db : db_vname);
708 if (!db){
709 struct restore_job *rj = new_restore_job(g_strdup(filename), db_vname, NULL, NULL, 0 , JOB_RESTORE_SCHEMA_FILENAME, object);
710 g_async_queue_push(conf->database_queue, new_job(JOB_RESTORE,rj,NULL));
711 }
712 }
713
process_table_filename(struct configuration * conf,GHashTable * table_hash,char * filename)714 void process_table_filename(struct configuration *conf, GHashTable *table_hash, char * filename){
715 gchar *db_name, *table_name;
716 struct db_table *dbt=NULL;
717 get_database_table_name_from_filename(filename,"-schema.sql",&db_name,&table_name);
718 if (db_name == NULL || table_name == NULL){
719 g_critical("It was not possible to process file: %s (1)",filename);
720 exit(EXIT_FAILURE);
721 }
722 char *real_db_name=g_hash_table_lookup(db_hash,db_name);
723 if (real_db_name==NULL){
724 g_critical("It was not possible to process file: %s (2) because real_db_name isn't found",filename);
725 exit(EXIT_FAILURE);
726 }
727 dbt=append_new_db_table(filename, db_name, table_name,0,table_hash,NULL);
728 load_schema(conf, dbt,g_build_filename(directory,filename,NULL));
729 }
730
process_data_filename(struct configuration * conf,GHashTable * table_hash,char * filename)731 void process_data_filename(struct configuration *conf, GHashTable *table_hash, char * filename){
732 gchar *db_name, *table_name;
733 total_data_sql_files++;
734 // TODO: check if it is a data file
735 // TODO: we need to count sections of the data file to determine if it is ok.
736 guint part;
737 get_database_table_part_name_from_filename(filename,".sql",&db_name,&table_name,&part);
738 if (db_name == NULL || table_name == NULL){
739 g_critical("It was not possible to process file: %s (3)",filename);
740 exit(EXIT_FAILURE);
741 }
742 char *real_db_name=g_hash_table_lookup(db_hash,db_name);
743 struct db_table *dbt=append_new_db_table(real_db_name,db_name, table_name,0,table_hash,NULL);
744 //struct db_table *dbt=append_new_db_table(filename,db_name, table_name,0,table_hash,NULL);
745 struct restore_job *rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, NULL, part , JOB_RESTORE_FILENAME, "");
746 // in stream mode, there is no need to sort. We can enqueue directly, where? queue maybe?.
747 if (stream){
748 g_async_queue_push(conf->data_queue, new_job(JOB_RESTORE ,rj,dbt->real_database));
749 }else{
750 dbt->restore_job_list=g_list_insert_sorted(dbt->restore_job_list,rj,&compare_filename_part);
751 }
752 }
753
process_schema_filename(struct configuration * conf,const gchar * filename,const char * object)754 void process_schema_filename(struct configuration *conf, const gchar *filename, const char * object) {
755 gchar *database=NULL, *table=NULL, *real_db_name=NULL;
756 get_database_table_from_file(filename,"-schema",&database,&table);
757 if (database == NULL){
758 g_critical("Database is null on: %s",filename);
759 }
760 real_db_name=g_hash_table_lookup(db_hash,database);
761 struct restore_job *rj = new_restore_job(g_strdup(filename), real_db_name , NULL , NULL, 0 , JOB_RESTORE_SCHEMA_FILENAME, object);
762 g_async_queue_push(conf->post_queue, new_job(JOB_RESTORE,rj,real_db_name));
763 }
764
get_file_type(const char * filename)765 enum file_type get_file_type (const char * filename){
766 if (g_strrstr(filename, "-schema.sql")) {
767 return SCHEMA_TABLE;
768 } else if (g_str_has_suffix(filename, "-metadata")) {
769 return METADATA_TABLE;
770 } else if ( strcmp(filename, "metadata") == 0 ){
771 return METADATA_GLOBAL;
772 } else if (g_str_has_suffix(filename, "-checksum") || g_str_has_suffix(filename, "-checksum.gz")) {
773 return CHECKSUM;
774 } else if ( g_strrstr(filename, "-schema-view.sql") ){
775 return SCHEMA_VIEW;
776 } else if ( g_strrstr(filename, "-schema-triggers.sql") ){
777 return SCHEMA_TRIGGER;
778 } else if ( g_strrstr(filename, "-schema-post.sql") ){
779 return SCHEMA_POST;
780 } else if ( g_strrstr(filename, "-schema-create.sql") ){
781 return SCHEMA_CREATE;
782 }else if (g_str_has_suffix(filename, ".dat"))
783 return LOAD_DATA;
784 return DATA;
785 }
786
787
process_filename(struct configuration * conf,GHashTable * table_hash,char * filename)788 enum file_type process_filename(struct configuration *conf,GHashTable *table_hash, char *filename){
789 enum file_type ft= get_file_type(filename);
790 if (!source_db ||
791 g_str_has_prefix(filename, g_strdup_printf("%s.", source_db))) {
792 switch (ft){
793 case INIT:
794 case SCHEMA_CREATE:
795 if (db){
796 g_warning("Skipping database creation on file: %s",filename);
797 }else{
798 process_database_filename(conf, filename, "create database");
799 }
800 break;
801 case SCHEMA_TABLE:
802 process_table_filename(conf,table_hash,filename);
803 break;
804 case SCHEMA_VIEW:
805 process_schema_filename(conf, filename,"view");
806 break;
807 case SCHEMA_TRIGGER:
808 process_schema_filename(conf, filename,"trigger");
809 break;
810 case SCHEMA_POST:
811 // can be enqueued in any order
812 process_schema_filename(conf, filename,"post");
813 break;
814 case CHECKSUM:
815 conf->checksum_list=g_list_insert(conf->checksum_list,g_strdup(filename),-1);
816 break;
817 case METADATA_GLOBAL:
818 break;
819 case METADATA_TABLE:
820 // TODO: we need to process this info
821 conf->metadata_list=g_list_insert(conf->metadata_list,g_strdup(filename),-1);
822 break;
823 case DATA:
824 process_data_filename(conf,table_hash,filename);
825 break;
826 case LOAD_DATA:
827 g_message("Load data file found: %s", filename);
828 break;
829 }
830 }
831 return ft;
832 }
833
load_directory_information(struct configuration * conf)834 void load_directory_information(struct configuration *conf) {
835 GError *error = NULL;
836 GDir *dir = g_dir_open(directory, 0, &error);
837
838 if (error) {
839 g_critical("cannot open directory %s, %s\n", directory, error->message);
840 errors++;
841 return;
842 }
843
844 const gchar *filename = NULL;
845 GList *create_table_list=NULL,
846 *metadata_list= NULL,
847 *data_files_list=NULL,
848 *schema_create_list=NULL,
849 *view_list=NULL,
850 *trigger_list=NULL,
851 *post_list=NULL;
852
853 while ((filename = g_dir_read_name(dir))) {
854 enum file_type ft= get_file_type(filename);
855 if (ft == SCHEMA_POST){
856 post_list=g_list_insert(post_list,g_strdup(filename),-1);
857 } else if (ft == SCHEMA_CREATE ){
858 schema_create_list=g_list_insert(schema_create_list,g_strdup(filename),-1);
859 conf->schema_create_list=g_list_insert(conf->schema_create_list,g_strdup(filename),-1);
860 } else if (!source_db ||
861 g_str_has_prefix(filename, g_strdup_printf("%s.", source_db))||
862 g_str_has_prefix(filename, "mydumper_")) {
863 switch (ft){
864 case INIT:
865 break;
866 case SCHEMA_TABLE:
867 create_table_list=g_list_append(create_table_list,g_strdup(filename));
868 break;
869 case SCHEMA_VIEW:
870 view_list=g_list_append(view_list,g_strdup(filename));
871 break;
872 case SCHEMA_TRIGGER:
873 trigger_list=g_list_append(trigger_list,g_strdup(filename));
874 break;
875 case CHECKSUM:
876 conf->checksum_list=g_list_append(conf->checksum_list,g_strdup(filename));
877 break;
878 case METADATA_GLOBAL:
879 break;
880 case METADATA_TABLE:
881 // TODO: we need to process this info
882 metadata_list=g_list_append(metadata_list,g_strdup(filename));
883 break;
884 case DATA:
885 data_files_list=g_list_append(data_files_list,g_strdup(filename));
886 break;
887 case LOAD_DATA:
888 g_message("Load data file found: %s", filename);
889 break;
890 default:
891 g_warning("File ignored: %s", filename);
892 break;
893 }
894 }
895 }
896 g_dir_close(dir);
897
898 gchar *f = NULL;
899 // CREATE DATABASE
900 while (schema_create_list){
901 f = schema_create_list->data;
902 process_database_filename(conf, f, "create database");
903 schema_create_list=schema_create_list->next;
904 }
905
906 // CREATE TABLE
907 GHashTable *table_hash = g_hash_table_new ( g_str_hash, g_str_equal );
908 while (create_table_list != NULL){
909 f = create_table_list->data;
910 process_table_filename(conf,table_hash,f);
911 create_table_list=create_table_list->next;
912 }
913
914 // DATA FILES
915 while (data_files_list != NULL){
916 f = data_files_list->data;
917 process_data_filename(conf,table_hash,f);
918
919 data_files_list=data_files_list->next;
920 }
921
922
923 while (view_list != NULL){
924 f = view_list->data;
925 process_schema_filename(conf, f,"view");
926 view_list=view_list->next;
927 }
928
929 while (trigger_list != NULL){
930 f = trigger_list->data;
931 process_schema_filename(conf, f, "trigger");
932 trigger_list=trigger_list->next;
933 }
934
935 while (post_list != NULL){
936 f = post_list->data;
937 process_schema_filename(conf, f,"post");
938 post_list=post_list->next;
939 }
940 // SORT DATA FILES TO ENQUEUE
941 // iterates over the dbt to create the jobs in the dbt->queue
942 // and sorts the dbt for the conf->table_list
943 // in stream mode, it is not possible to sort the tables as
944 // we don't know the amount the rows, .metadata are sent at the end.
945 GList * table_list=NULL;
946 GHashTableIter iter;
947 gchar * lkey;
948 g_hash_table_iter_init ( &iter, table_hash );
949 struct db_table *dbt=NULL;
950 while ( g_hash_table_iter_next ( &iter, (gpointer *) &lkey, (gpointer *) &dbt ) ) {
951 table_list=g_list_insert_sorted_with_data (table_list,dbt,&compare_dbt,table_hash);
952 GList *i=dbt->restore_job_list;
953 while (i) {
954 g_async_queue_push(dbt->queue, new_job(JOB_RESTORE ,i->data,dbt->real_database));
955 i=i->next;
956 }
957 dbt->count=g_async_queue_length(dbt->queue);
958 }
959 g_hash_table_destroy(table_hash);
960 conf->table_list=table_list;
961 // conf->table needs to be set.
962 }
963
964
965 // this can be moved to the table structure and executed before index creation.
checksum_databases(struct thread_data * td)966 void checksum_databases(struct thread_data *td) {
967 g_message("Starting table checksum verification");
968
969 const gchar *filename = NULL;
970 GList *e = td->conf->checksum_list;
971 while (e){
972 filename=e->data;
973 checksum_table_filename(filename, td->thrconn);
974 e=e->next;
975 }
976 }
977
create_database(struct thread_data * td,gchar * database)978 void create_database(struct thread_data *td, gchar *database) {
979 gchar *query = NULL;
980
981 const gchar *filename =
982 g_strdup_printf("%s-schema-create.sql", database);
983 const gchar *filenamegz =
984 g_strdup_printf("%s-schema-create.sql%s", database, compress_extension);
985 const gchar *filepath = g_strdup_printf("%s/%s-schema-create.sql",
986 directory, database);
987 const gchar *filepathgz = g_strdup_printf("%s/%s-schema-create.sql%s",
988 directory, database, compress_extension);
989
990 if (g_file_test(filepath, G_FILE_TEST_EXISTS)) {
991 restore_data_from_file(td, database, NULL, filename, TRUE);
992 } else if (g_file_test(filepathgz, G_FILE_TEST_EXISTS)) {
993 restore_data_from_file(td, database, NULL, filenamegz, TRUE);
994 } else {
995 query = g_strdup_printf("CREATE DATABASE IF NOT EXISTS `%s`", database);
996 if (mysql_query(td->thrconn, query)){
997 g_warning("Fail to create database: %s", database);
998 }
999 }
1000
1001 g_free(query);
1002 return;
1003 }
1004
get_database_table_from_file(const gchar * filename,const char * sufix,gchar ** database,gchar ** table)1005 void get_database_table_from_file(const gchar *filename,const char *sufix,gchar **database,gchar **table){
1006 gchar **split_filename = g_strsplit(filename, sufix, 0);
1007 gchar **split = g_strsplit(split_filename[0],".",0);
1008 guint count=g_strv_length(split);
1009 if (count > 2){
1010 g_warning("We need to get the db and table name from the create table statement");
1011 return;
1012 }
1013 *table=g_strdup(split[1]);
1014 *database=g_strdup(split[0]);
1015 }
1016
1017
overwrite_table(MYSQL * conn,gchar * database,gchar * table)1018 int overwrite_table(MYSQL *conn,gchar * database, gchar * table){
1019 int truncate_or_delete_failed=0;
1020 gchar *query=NULL;
1021 if (purge_mode == DROP) {
1022 g_message("Dropping table or view (if exists) `%s`.`%s`",
1023 database, table);
1024 query = g_strdup_printf("DROP TABLE IF EXISTS `%s`.`%s`",
1025 database, table);
1026 mysql_query(conn, query);
1027 query = g_strdup_printf("DROP VIEW IF EXISTS `%s`.`%s`", database,
1028 table);
1029 mysql_query(conn, query);
1030 } else if (purge_mode == TRUNCATE) {
1031 g_message("Truncating table `%s`.`%s`", database, table);
1032 query= g_strdup_printf("TRUNCATE TABLE `%s`.`%s`", database, table);
1033 truncate_or_delete_failed= mysql_query(conn, query);
1034 if (truncate_or_delete_failed)
1035 g_warning("Truncate failed, we are going to try to create table or view");
1036 } else if (purge_mode == DELETE) {
1037 g_message("Deleting content of table `%s`.`%s`", database, table);
1038 query= g_strdup_printf("DELETE FROM `%s`.`%s`", database, table);
1039 truncate_or_delete_failed= mysql_query(conn, query);
1040 if (truncate_or_delete_failed)
1041 g_warning("Delete failed, we are going to try to create table or view");
1042 }
1043 g_free(query);
1044 return truncate_or_delete_failed;
1045 }
1046
compare_dbt(gconstpointer a,gconstpointer b,gpointer table_hash)1047 gint compare_dbt(gconstpointer a, gconstpointer b, gpointer table_hash){
1048 gchar *a_key=g_strdup_printf("%s_%s",((struct db_table *)a)->database,((struct db_table *)a)->table);
1049 gchar *b_key=g_strdup_printf("%s_%s",((struct db_table *)b)->database,((struct db_table *)b)->table);
1050 struct db_table * a_val=g_hash_table_lookup(table_hash,a_key);
1051 struct db_table * b_val=g_hash_table_lookup(table_hash,b_key);
1052 g_free(a_key);
1053 g_free(b_key);
1054 return a_val->rows < b_val->rows;
1055 }
compare_filename_part(gconstpointer a,gconstpointer b)1056 gint compare_filename_part (gconstpointer a, gconstpointer b){
1057 return ((struct restore_job *)a)->part > ((struct restore_job *)b)->part;
1058 }
1059
checksum_table_filename(const gchar * filename,MYSQL * conn)1060 void checksum_table_filename(const gchar *filename, MYSQL *conn) {
1061 gchar *database = NULL, *table = NULL;
1062 get_database_table_from_file(filename,"-checksum",&database,&table);
1063 gchar *real_database=g_hash_table_lookup(db_hash,database);
1064 gchar *real_table=g_hash_table_lookup(tbl_hash,table);
1065 void *infile;
1066 char checksum[256];
1067 int errn=0;
1068 char * row=checksum_table(conn, db ? db : real_database, real_table, &errn);
1069 gboolean is_compressed = FALSE;
1070 gchar *path = g_build_filename(directory, filename, NULL);
1071
1072 if (!g_str_has_suffix(path, compress_extension)) {
1073 infile = g_fopen(path, "r");
1074 is_compressed = FALSE;
1075 } else {
1076 infile = (void *)gzopen(path, "r");
1077 is_compressed=TRUE;
1078 }
1079
1080 if (!infile) {
1081 g_critical("cannot open file %s (%d)", filename, errno);
1082 errors++;
1083 return;
1084 }
1085
1086 char * cs= !is_compressed ? fgets(checksum, 256, infile) :gzgets((gzFile)infile, checksum, 256);
1087 if (cs != NULL) {
1088 if(strcmp(checksum, row) != 0) {
1089 g_warning("Checksum mismatch found for `%s`.`%s`. Got '%s', expecting '%s'", db ? db : real_database, real_table, row, checksum);
1090 errors++;
1091 }
1092 else {
1093 g_message("Checksum confirmed for `%s`.`%s`", db ? db : real_database, real_table);
1094 }
1095 } else {
1096 g_critical("error reading file %s (%d)", filename, errno);
1097 errors++;
1098 return;
1099 }
1100 if (!is_compressed) {
1101 fclose(infile);
1102 } else {
1103 gzclose((gzFile)infile);
1104 }
1105 }
1106
process_job(struct thread_data * td,struct job * job,int count)1107 gboolean process_job(struct thread_data *td, struct job *job, int count){
1108 switch (job->type) {
1109 case JOB_RESTORE:
1110 process_restore_job(td,job->job_data,count);
1111 break;
1112 case JOB_WAIT:
1113 g_async_queue_push(td->conf->ready, GINT_TO_POINTER(1));
1114 GAsyncQueue *queue=job->job_data;
1115 g_async_queue_pop(queue);
1116 break;
1117 case JOB_SHUTDOWN:
1118 // g_message("Thread %d shutting down", thread_id);
1119 g_free(job);
1120 return FALSE;
1121 break;
1122 default:
1123 g_critical("Something very bad happened!");
1124 exit(EXIT_FAILURE);
1125 }
1126 return TRUE;
1127 }
1128
1129
process_restore_job(struct thread_data * td,struct restore_job * rj,int count)1130 void process_restore_job(struct thread_data *td, struct restore_job *rj, int count){
1131 struct db_table *dbt=rj->dbt;
1132 dbt=rj->dbt;
1133
1134 switch (rj->type) {
1135 case JOB_RESTORE_STRING:
1136 g_message("Thread %d restoring %s `%s`.`%s` from %s", td->thread_id, rj->object,
1137 dbt->real_database, dbt->real_table, rj->filename);
1138 guint query_counter=0;
1139 restore_data_in_gstring(td, rj->statement, FALSE, &query_counter);
1140 break;
1141 case JOB_RESTORE_SCHEMA_STRING:
1142 g_message("Thread %d restoring table `%s`.`%s` from %s", td->thread_id,
1143 dbt->real_database, dbt->real_table, rj->filename);
1144 int truncate_or_delete_failed=0;
1145 if (overwrite_tables)
1146 truncate_or_delete_failed=overwrite_table(td->thrconn,dbt->real_database, dbt->real_table);
1147 if ((purge_mode == TRUNCATE || purge_mode == DELETE) && !truncate_or_delete_failed){
1148 g_message("Skipping table creation `%s`.`%s` from %s", dbt->real_database, dbt->real_table, rj->filename);
1149 }else{
1150 g_message("Creating table `%s`.`%s` from %s", dbt->real_database, dbt->real_table, rj->filename);
1151 if (restore_data_in_gstring(td, rj->statement, FALSE, &query_counter)){
1152 g_critical("Thread %d issue restoring %s: %s",td->thread_id,rj->filename, mysql_error(td->thrconn));
1153 }
1154 }
1155 break;
1156 case JOB_RESTORE_FILENAME:
1157 g_mutex_lock(progress_mutex);
1158 progress++;
1159 g_message("Thread %d restoring `%s`.`%s` part %d of %d from %s. Progress %llu of %llu .", td->thread_id,
1160 dbt->real_database, dbt->real_table, rj->part, count, rj->filename, progress,total_data_sql_files);
1161 g_mutex_unlock(progress_mutex);
1162 if (restore_data_from_file(td, dbt->real_database, dbt->real_table, rj->filename, FALSE) > 0){
1163 g_critical("Thread %d issue restoring %s: %s",td->thread_id,rj->filename, mysql_error(td->thrconn));
1164 }
1165 break;
1166 case JOB_RESTORE_SCHEMA_FILENAME:
1167 g_message("Thread %d restoring %s on `%s` from %s", td->thread_id, rj->object,
1168 rj->database, rj->filename);
1169 restore_data_from_file(td, rj->database, NULL, rj->filename, TRUE );
1170 break;
1171 default:
1172 g_critical("Something very bad happened!");
1173 exit(EXIT_FAILURE);
1174 }
1175 free_restore_job(rj);
1176 }
1177
process_stream_queue(struct thread_data * td)1178 void *process_stream_queue(struct thread_data * td) {
1179 struct job *job = NULL;
1180 int count =0;
1181 gboolean cont=TRUE;
1182
1183 enum file_type ft;
1184 while (cont){
1185
1186 ft = (enum file_type )g_async_queue_pop(td->conf->stream_queue);
1187 // g_message("Thread %d: Processing file type: %d",td->thread_id,ft);
1188 GAsyncQueue *q= get_queue_for_type(td->conf,ft);
1189 if (q != NULL){
1190 job = (struct job *)g_async_queue_pop(q);
1191
1192 execute_use_if_needs_to(td, job->use_database, "Restoring from stream");
1193 cont=process_job(td, job, count);
1194 }
1195 }
1196 return NULL;
1197 }
1198
1199
process_directory_queue(struct thread_data * td)1200 void *process_directory_queue(struct thread_data * td) {
1201 struct db_table *dbt=NULL;
1202 struct job *job = NULL;
1203 gboolean cont=TRUE;
1204 int count =0;
1205
1206 // Step 1: creating databases
1207 while (cont){
1208 job = (struct job *)g_async_queue_pop(td->conf->database_queue);
1209 cont=process_job(td, job, count);
1210 }
1211 // Step 2: Create tables
1212 cont=TRUE;
1213 while (cont){
1214 job = (struct job *)g_async_queue_pop(td->conf->table_queue);
1215 execute_use_if_needs_to(td, job->use_database, "Restoring tables");
1216 cont=process_job(td, job, count);
1217 }
1218
1219 // Is this correct in a streaming scenario ?
1220 GList *table_list=td->conf->table_list;
1221 if (table_list == NULL ) {
1222 dbt=NULL;
1223 }else{
1224 dbt=table_list->data;
1225 g_mutex_lock(dbt->mutex);
1226 dbt->current_threads++;
1227 if (dbt->start_time==NULL)
1228 dbt->start_time=g_date_time_new_now_local();
1229 g_mutex_unlock(dbt->mutex);
1230 }
1231
1232
1233 // Step 3: Load data
1234 cont=TRUE;
1235 while (cont){
1236 if (dbt != NULL){
1237 g_mutex_lock(dbt->mutex);
1238 if (dbt->current_threads > dbt->max_threads){
1239 dbt->current_threads--;
1240 g_mutex_unlock(dbt->mutex);
1241 table_list=table_list->next;
1242 if (table_list == NULL ){
1243 dbt=NULL;
1244 continue;
1245 }
1246 dbt=table_list->data;
1247 g_mutex_lock(dbt->mutex);
1248 count=dbt->count;
1249 if (dbt->start_time==NULL) dbt->start_time=g_date_time_new_now_local();
1250 dbt->current_threads++;
1251 g_mutex_unlock(dbt->mutex);
1252 continue;
1253 }
1254 g_mutex_unlock(dbt->mutex);
1255
1256 job = (struct job *)g_async_queue_try_pop(dbt->queue);
1257
1258 if (job == NULL){
1259 g_mutex_lock(dbt->mutex);
1260 dbt->current_threads--;
1261 if (dbt->current_threads == 0){
1262 dbt->current_threads--;
1263 dbt->start_index_time=g_date_time_new_now_local();
1264 g_mutex_unlock(dbt->mutex);
1265 if (dbt->indexes != NULL) {
1266 g_message("Thread %d restoring indexes `%s`.`%s`", td->thread_id,
1267 dbt->real_database, dbt->real_table);
1268 guint query_counter=0;
1269 restore_data_in_gstring(td, dbt->indexes, FALSE, &query_counter);
1270 }
1271 dbt->finish_time=g_date_time_new_now_local();
1272 }else{
1273 g_mutex_unlock(dbt->mutex);
1274 }
1275 guint max=dbt->max_threads;
1276 table_list=table_list->next;
1277 if (table_list == NULL ){
1278 dbt=NULL;
1279 continue;
1280 }
1281 dbt=table_list->data;
1282 g_mutex_lock(dbt->mutex);
1283 if (dbt->start_time==NULL) dbt->start_time=g_date_time_new_now_local();
1284 dbt->max_threads = max;
1285 dbt->current_threads++;
1286 g_mutex_unlock(dbt->mutex);
1287 continue;
1288 }
1289 }else{
1290 job = (struct job *)g_async_queue_pop(td->conf->data_queue);
1291 }
1292 execute_use_if_needs_to(td, job->use_database, "Restoring data");
1293 cont=process_job(td, job, count);
1294 }
1295 return NULL;
1296 }
1297
1298
process_queue(struct thread_data * td)1299 void *process_queue(struct thread_data *td) {
1300 struct configuration *conf = td->conf;
1301 g_mutex_lock(init_mutex);
1302 td->thrconn = mysql_init(NULL);
1303 g_mutex_unlock(init_mutex);
1304 td->current_database=NULL;
1305
1306 configure_connection(td->thrconn, "myloader");
1307
1308 if (!mysql_real_connect(td->thrconn, hostname, username, password, NULL, port,
1309 socket_path, 0)) {
1310 g_critical("Failed to connect to MySQL server: %s", mysql_error(td->thrconn));
1311 exit(EXIT_FAILURE);
1312 }
1313
1314 if (mysql_query(td->thrconn, "SET SESSION wait_timeout = 2147483")) {
1315 g_warning("Failed to increase wait_timeout: %s", mysql_error(td->thrconn));
1316 }
1317
1318 if (!enable_binlog)
1319 mysql_query(td->thrconn, "SET SQL_LOG_BIN=0");
1320
1321 mysql_query(td->thrconn, set_names_str);
1322 mysql_query(td->thrconn, "/*!40101 SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */");
1323 mysql_query(td->thrconn, "/*!40014 SET UNIQUE_CHECKS=0 */");
1324 mysql_query(td->thrconn, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/");
1325 if (commit_count > 1)
1326 mysql_query(td->thrconn, "SET autocommit=0");
1327
1328 execute_gstring(td->thrconn, set_session);
1329 g_async_queue_push(conf->ready, GINT_TO_POINTER(1));
1330
1331 if (db){
1332 td->current_database=db;
1333 execute_use(td, "Initializing thread");
1334 if (stream)
1335 g_async_queue_push(conf->database_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1336 }
1337 g_debug("Thread %d: Starting import", td->thread_id);
1338 if (stream){
1339 process_stream_queue(td);
1340 }else{
1341 process_directory_queue(td);
1342 }
1343 struct job *job = NULL;
1344 gboolean cont=TRUE;
1345 int count =0;
1346
1347 // g_message("Thread %d: Starting post import task over table", td->thread_id);
1348 cont=TRUE;
1349 while (cont){
1350 job = (struct job *)g_async_queue_pop(conf->post_table_queue);
1351 // g_message("%s",((struct restore_job *)job->job_data)->object);
1352 execute_use_if_needs_to(td, job->use_database, "Restoring post table");
1353 cont=process_job(td, job, count);
1354 }
1355 // g_message("Thread %d: Starting post import task: triggers, procedures and triggers", td->thread_id);
1356 cont=TRUE;
1357 while (cont){
1358 job = (struct job *)g_async_queue_pop(conf->post_queue);
1359 execute_use_if_needs_to(td, job->use_database, "Restoring post tasks");
1360 cont=process_job(td, job, count);
1361 }
1362
1363 if (td->thrconn)
1364 mysql_close(td->thrconn);
1365 mysql_thread_end();
1366 g_debug("Thread %d ending", td->thread_id);
1367 return NULL;
1368 }
1369
1370
restore_data_in_gstring_by_statement(struct thread_data * td,GString * data,gboolean is_schema,guint * query_counter)1371 int restore_data_in_gstring_by_statement(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter)
1372 {
1373 if (mysql_real_query(td->thrconn, data->str, data->len)) {
1374 //g_critical("Error restoring: %s %s", data->str, mysql_error(conn));
1375 errors++;
1376 return 1;
1377 }
1378 *query_counter=*query_counter+1;
1379 if (!is_schema && (commit_count > 1) &&(*query_counter == commit_count)) {
1380 *query_counter= 0;
1381 if (mysql_query(td->thrconn, "COMMIT")) {
1382 errors++;
1383 return 2;
1384 }
1385 mysql_query(td->thrconn, "START TRANSACTION");
1386 }
1387 g_string_set_size(data, 0);
1388 return 0;
1389 }
1390
restore_data_in_gstring(struct thread_data * td,GString * data,gboolean is_schema,guint * query_counter)1391 int restore_data_in_gstring(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter)
1392 {
1393 int i=0;
1394 int r=0;
1395 if (data != NULL && data->len > 4){
1396 gchar** line=g_strsplit(data->str, ";\n", -1);
1397 for (i=0; i < (int)g_strv_length(line);i++){
1398 if (strlen(line[i])>2){
1399 GString *str=g_string_new(line[i]);
1400 g_string_append_c(str,';');
1401 r+=restore_data_in_gstring_by_statement(td, str, is_schema, query_counter);
1402 }
1403 }
1404 }
1405 return r;
1406 }
1407
1408
append_alter_table(GString * alter_table_statement,char * database,char * table)1409 void append_alter_table(GString * alter_table_statement, char *database, char *table){
1410 g_string_append(alter_table_statement,"ALTER TABLE `");
1411 g_string_append(alter_table_statement, database);
1412 g_string_append(alter_table_statement,"`.`");
1413 g_string_append(alter_table_statement,table);
1414 g_string_append(alter_table_statement,"` ");
1415 }
1416
finish_alter_table(GString * alter_table_statement)1417 void finish_alter_table(GString * alter_table_statement){
1418 gchar * str=g_strrstr_len(alter_table_statement->str,alter_table_statement->len,",");
1419 if ((str - alter_table_statement->str) > (long int)(alter_table_statement->len - 5)){
1420 *str=';';
1421 g_string_append_c(alter_table_statement,'\n');
1422 }else
1423 g_string_append(alter_table_statement,";\n");
1424 }
1425
execute_use_if_needs_to(struct thread_data * td,gchar * database,const gchar * msg)1426 void execute_use_if_needs_to(struct thread_data *td, gchar *database, const gchar * msg){
1427 if ( database != NULL && db == NULL ){
1428 if (td->current_database==NULL || g_strcmp0(database, td->current_database) != 0){
1429 td->current_database=database;
1430 if (execute_use(td, msg)){
1431 exit(EXIT_FAILURE);
1432 }
1433 }
1434 }
1435 }
1436
execute_use(struct thread_data * td,const gchar * msg)1437 guint execute_use(struct thread_data *td, const gchar * msg){
1438 gchar *query = g_strdup_printf("USE `%s`", td->current_database);
1439 if (mysql_query(td->thrconn, query)) {
1440 g_critical("Error switching to database `%s` %s", td->current_database, msg);
1441 g_free(query);
1442 return 1;
1443 }
1444 g_free(query);
1445 return 0;
1446 }
1447
get_table_name_from_content(const gchar * filename)1448 gchar * get_table_name_from_content(const gchar *filename){
1449 void *infile;
1450 gboolean is_compressed = FALSE;
1451 gboolean eof = FALSE;
1452 GString *data=g_string_sized_new(512);
1453 if (!g_str_has_suffix(filename, compress_extension)) {
1454 infile = g_fopen(filename, "r");
1455 is_compressed = FALSE;
1456 } else {
1457 infile = (void *)gzopen(filename, "r");
1458 is_compressed = TRUE;
1459 }
1460 if (!infile) {
1461 g_critical("cannot open file %s (%d)", filename, errno);
1462 errors++;
1463 return NULL;
1464 }
1465 gchar *real_database=NULL;
1466 while (eof == FALSE) {
1467 if (read_data(infile, is_compressed, data, &eof)) {
1468 if (g_str_has_prefix(data->str,"INSERT ")){
1469 gchar** create= g_strsplit(data->str, "`", 3);
1470 real_database=g_strdup(create[1]);
1471 g_strfreev(create);
1472 break;
1473 }
1474 g_string_set_size(data, 0);
1475 }
1476 }
1477
1478 if (!is_compressed) {
1479 fclose(infile);
1480 } else {
1481 gzclose((gzFile)infile);
1482 }
1483 return real_database;
1484 }
1485
get_database_name_from_content(const gchar * filename)1486 gchar * get_database_name_from_content(const gchar *filename){
1487 void *infile;
1488 gboolean is_compressed = FALSE;
1489 gboolean eof = FALSE;
1490 GString *data=g_string_sized_new(512);
1491 if (!g_str_has_suffix(filename, compress_extension)) {
1492 infile = g_fopen(filename, "r");
1493 is_compressed = FALSE;
1494 } else {
1495 infile = (void *)gzopen(filename, "r");
1496 is_compressed = TRUE;
1497 }
1498 if (!infile) {
1499 g_critical("cannot open file %s (%d)", filename, errno);
1500 errors++;
1501 return NULL;
1502 }
1503 gchar *real_database=NULL;
1504 while (eof == FALSE) {
1505 if (read_data(infile, is_compressed, data, &eof)) {
1506 if (g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n")) {
1507 if (g_str_has_prefix(data->str,"CREATE ")){
1508 gchar** create= g_strsplit(data->str, "`", 3);
1509 real_database=g_strdup(create[1]);
1510 g_strfreev(create);
1511 break;
1512 }
1513 }
1514 }
1515 }
1516
1517 if (!is_compressed) {
1518 fclose(infile);
1519 } else {
1520 gzclose((gzFile)infile);
1521 }
1522 return real_database;
1523 }
1524
restore_data_from_file(struct thread_data * td,char * database,char * table,const char * filename,gboolean is_schema)1525 int restore_data_from_file(struct thread_data *td, char *database, char *table,
1526 const char *filename, gboolean is_schema){
1527 void *infile;
1528 int r=0;
1529 gboolean is_compressed = FALSE;
1530 gboolean eof = FALSE;
1531 guint query_counter = 0;
1532 GString *data = g_string_sized_new(512);
1533 guint line=0,preline=0;
1534 gchar *path = g_build_filename(directory, filename, NULL);
1535
1536 if (!g_str_has_suffix(path, compress_extension)) {
1537 infile = g_fopen(path, "r");
1538 is_compressed = FALSE;
1539 } else {
1540 infile = (void *)gzopen(path, "r");
1541 is_compressed = TRUE;
1542 }
1543
1544 if (!infile) {
1545 g_critical("cannot open file %s (%d)", filename, errno);
1546 errors++;
1547 return 1;
1548 }
1549 if (!is_schema && (commit_count > 1) )
1550 mysql_query(td->thrconn, "START TRANSACTION");
1551 while (eof == FALSE) {
1552 if (read_data(infile, is_compressed, data, &eof)) {
1553 if (g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n")) {
1554 preline=line;
1555 line+=strcount(data->str);
1556 if (rows > 0 && g_strrstr_len(data->str,6,"INSERT"))
1557 split_and_restore_data_in_gstring_by_statement(td,
1558 data, is_schema, &query_counter);
1559 else{
1560 guint tr=restore_data_in_gstring_by_statement(td, data, is_schema, &query_counter);
1561 r+=tr;
1562 if (tr > 0){
1563 g_critical("Error occours between lines: %d and %d on file %s",preline,line,filename);
1564 }
1565 }
1566 g_string_set_size(data, 0);
1567 }
1568 } else {
1569 g_critical("error reading file %s (%d)", filename, errno);
1570 errors++;
1571 return r;
1572 }
1573 }
1574 if (!is_schema && (commit_count > 1) && mysql_query(td->thrconn, "COMMIT")) {
1575 g_critical("Error committing data for %s.%s from file %s: %s",
1576 db ? db : database, table, filename, mysql_error(td->thrconn));
1577 errors++;
1578 }
1579 g_string_free(data, TRUE);
1580 if (!is_compressed) {
1581 fclose(infile);
1582 } else {
1583 gzclose((gzFile)infile);
1584 }
1585
1586 if (stream && no_delete == FALSE){
1587 g_message("Removing file: %s", path);
1588 remove(path);
1589 }
1590 g_free(path);
1591 return r;
1592 }
1593
read_data(FILE * file,gboolean is_compressed,GString * data,gboolean * eof)1594 gboolean read_data(FILE *file, gboolean is_compressed, GString *data,
1595 gboolean *eof) {
1596 char buffer[256];
1597
1598 do {
1599 if (!is_compressed) {
1600 if (fgets(buffer, 256, file) == NULL) {
1601 if (feof(file)) {
1602 *eof = TRUE;
1603 buffer[0] = '\0';
1604 } else {
1605 return FALSE;
1606 }
1607 }
1608 } else {
1609 if (!gzgets((gzFile)file, buffer, 256)) {
1610 if (gzeof((gzFile)file)) {
1611 *eof = TRUE;
1612 buffer[0] = '\0';
1613 } else {
1614 return FALSE;
1615 }
1616 }
1617 }
1618 g_string_append(data, buffer);
1619 } while ((buffer[strlen(buffer)] != '\0') && *eof == FALSE);
1620
1621 return TRUE;
1622 }
1623
get_queue_for_type(struct configuration * conf,enum file_type current_ft)1624 GAsyncQueue *get_queue_for_type(struct configuration *conf, enum file_type current_ft){
1625 switch (current_ft){
1626 case INIT:
1627 case SCHEMA_CREATE:
1628 return conf->database_queue;
1629 case SCHEMA_TABLE:
1630 return conf->table_queue;
1631 case SCHEMA_VIEW:
1632 case SCHEMA_TRIGGER:
1633 case SCHEMA_POST:
1634 case CHECKSUM:
1635 return conf->post_queue;
1636 case METADATA_GLOBAL:
1637 return NULL;
1638 case METADATA_TABLE:
1639 return NULL;
1640 return conf->post_table_queue;
1641 case DATA:
1642 return conf->data_queue;
1643 break;
1644 case LOAD_DATA:
1645 break;
1646
1647 }
1648 return NULL;
1649 }
1650
send_shutdown_jobs(GAsyncQueue * queue)1651 void send_shutdown_jobs(GAsyncQueue * queue){
1652 guint n=0;
1653 for (n = 0; n < num_threads; n++) {
1654 g_async_queue_push(queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1655 }
1656 }
1657
process_stream_filename(struct configuration * conf,GHashTable * table_hash,gchar * filename)1658 void process_stream_filename(struct configuration *conf,GHashTable *table_hash, gchar * filename){
1659 enum file_type current_ft=process_filename(conf,table_hash,filename);
1660 if (current_ft != SCHEMA_VIEW &&
1661 current_ft != SCHEMA_TRIGGER &&
1662 current_ft != SCHEMA_POST &&
1663 current_ft != CHECKSUM )
1664 g_async_queue_push(conf->stream_queue, GINT_TO_POINTER(current_ft));
1665 }
1666
1667
process_stream(struct configuration * conf)1668 void *process_stream(struct configuration *conf){
1669 char * filename=NULL,*real_filename=NULL;
1670 char buffer[1024];
1671 long unsigned int len=0;
1672 FILE *file=NULL;
1673 gboolean eof=FALSE;
1674 GHashTable *table_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1675 do {
1676 if(fgets(buffer, 1024, stdin) == NULL){
1677 if (file && feof(file)){
1678 eof = TRUE;
1679 buffer[0] = '\0';
1680 fclose(file);
1681 }else{
1682 break;
1683 }
1684 }else{
1685 if (g_str_has_prefix(buffer,"-- ")){
1686 if (file){
1687 fclose(file);
1688 process_stream_filename(conf,table_hash,filename);
1689 }
1690 buffer[strlen(buffer)-1]='\0';
1691 real_filename = g_build_filename(directory,&(buffer[3]),NULL);
1692 filename = g_build_filename(&(buffer[3]),NULL);
1693 file = g_fopen(real_filename, "w");
1694 }else{
1695 if (file){
1696 len=write(fileno(file),buffer,strlen(buffer));
1697 if (len != strlen(buffer)) {
1698 g_critical("File size not the same");
1699 exit(EXIT_FAILURE);
1700 }
1701 }
1702 }
1703 }
1704 } while (eof == FALSE);
1705 fclose(file);
1706 process_stream_filename(conf,table_hash,filename);
1707 guint n=0;
1708 for (n = 0; n < num_threads *2 ; n++) {
1709 g_async_queue_push(conf->data_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1710 g_async_queue_push(conf->post_table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1711 g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1712 g_async_queue_push(conf->stream_queue, GINT_TO_POINTER(DATA));
1713 }
1714
1715 return NULL;
1716 }
1717