1 /*
2     This program is free software: you can redistribute it and/or modify
3     it under the terms of the GNU General Public License as published by
4     the Free Software Foundation, either version 3 of the License, or
5     (at your option) any later version.
6 
7     This program is distributed in the hope that it will be useful,
8     but WITHOUT ANY WARRANTY; without even the implied warranty of
9     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10     GNU General Public License for more details.
11 
12     You should have received a copy of the GNU General Public License
13     along with this program.  If not, see <http://www.gnu.org/licenses/>.
14 
15     Authors:        Andrew Hutchings, SkySQL (andrew at skysql dot com)
16                     David Ducos, Percona (david dot ducos at percona dot com)
17 
18 */
19 
20 #define _LARGEFILE64_SOURCE
21 #define _FILE_OFFSET_BITS 64
22 
23 #include <mysql.h>
24 
25 #if defined MARIADB_CLIENT_VERSION_STR && !defined MYSQL_SERVER_VERSION
26 #define MYSQL_SERVER_VERSION MARIADB_CLIENT_VERSION_STR
27 #endif
28 
29 #include <unistd.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <glib.h>
33 #include <glib/gstdio.h>
34 #include <stdlib.h>
35 #include <stdarg.h>
36 #include <errno.h>
37 #ifdef ZWRAP_USE_ZSTD
38 #include "zstd/zstd_zlibwrapper.h"
39 #else
40 #include <zlib.h>
41 #endif
42 #include "config.h"
43 #include "common.h"
44 #include "myloader.h"
45 #include "connection.h"
46 #include "getPassword.h"
47 #include "logging.h"
48 #include "set_verbose.h"
49 #include "locale.h"
50 #include "server_detect.h"
51 
52 guint commit_count = 1000;
53 gchar *input_directory = NULL;
54 gchar *directory = NULL;
55 gboolean overwrite_tables = FALSE;
56 gboolean innodb_optimize_keys = FALSE;
57 gboolean enable_binlog = FALSE;
58 gboolean disable_redo_log = FALSE;
59 guint rows = 0;
60 gchar *source_db = NULL;
61 gchar *purge_mode_str=NULL;
62 gchar *set_names_str=NULL;
63 enum purge_mode purge_mode;
64 static GMutex *init_mutex = NULL;
65 static GMutex *progress_mutex = NULL;
66 guint errors = 0;
67 guint max_threads_per_table=4;
68 unsigned long long int total_data_sql_files = 0;
69 unsigned long long int progress = 0;
70 GHashTable *db_hash=NULL;
71 GHashTable *tbl_hash=NULL;
72 
73 const char DIRECTORY[] = "import";
74 
75 gboolean read_data(FILE *file, gboolean is_compressed, GString *data,
76                    gboolean *eof);
77 int restore_data_from_file(struct thread_data *td, char *database, char *table,
78                   const char *filename, gboolean is_schema);
79 int restore_data_in_gstring_by_statement(struct thread_data *td,
80 		  GString *data, gboolean is_schema,
81 		  guint *query_counter);
82 int restore_data_in_gstring(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter);
83 void *process_queue(struct thread_data *td);
84 void checksum_table_filename(const gchar *filename, MYSQL *conn);
85 void load_directory_information(struct configuration *conf);
86 void checksum_databases(struct thread_data *td);
87 void no_log(const gchar *log_domain, GLogLevelFlags log_level,
88             const gchar *message, gpointer user_data);
89 void create_database(struct thread_data *td, gchar *database);
90 gint compare_dbt(gconstpointer a, gconstpointer b, gpointer table_hash);
91 gint compare_filename_part (gconstpointer a, gconstpointer b);
92 void get_database_table_from_file(const gchar *filename,const char *sufix,gchar **database,gchar **table);
93 void append_alter_table(GString * alter_table_statement, char *database, char *table);
94 void finish_alter_table(GString * alter_table_statement);
95 guint execute_use(struct thread_data *td, const gchar *msg);
96 int overwrite_table(MYSQL *conn,gchar * database, gchar * table);
97 gchar * get_database_name_from_content(const gchar *filename);
98 void process_restore_job(struct thread_data *td, struct restore_job *rj, int count);
99 gboolean process_job(struct thread_data *td, struct job *job, int count);
100 void *process_stream(struct configuration *conf);
101 GAsyncQueue *get_queue_for_type(struct configuration *conf, enum file_type current_ft);
102 void execute_use_if_needs_to(struct thread_data *td, gchar *database, const gchar * msg);
103 
104 static GOptionEntry entries[] = {
105     {"directory", 'd', 0, G_OPTION_ARG_STRING, &input_directory,
106      "Directory of the dump to import", NULL},
107     {"queries-per-transaction", 'q', 0, G_OPTION_ARG_INT, &commit_count,
108      "Number of queries per transaction, default 1000", NULL},
109     {"overwrite-tables", 'o', 0, G_OPTION_ARG_NONE, &overwrite_tables,
110      "Drop tables if they already exist", NULL},
111     {"database", 'B', 0, G_OPTION_ARG_STRING, &db,
112      "An alternative database to restore into", NULL},
113     {"source-db", 's', 0, G_OPTION_ARG_STRING, &source_db,
114      "Database to restore", NULL},
115     {"enable-binlog", 'e', 0, G_OPTION_ARG_NONE, &enable_binlog,
116      "Enable binary logging of the restore data", NULL},
117     {"innodb-optimize-keys", 0, 0, G_OPTION_ARG_NONE, &innodb_optimize_keys,
118      "Creates the table without the indexes and it adds them at the end", NULL},
119     { "set-names",0, 0, G_OPTION_ARG_STRING, &set_names_str,
120       "Sets the names, use it at your own risk, default binary", NULL },
121     {"logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile,
122      "Log file name to use, by default stdout is used", NULL},
123     { "purge-mode", 0, 0, G_OPTION_ARG_STRING, &purge_mode_str,
124       "This specify the truncate mode which can be: NONE, DROP, TRUNCATE and DELETE", NULL },
125     { "disable-redo-log", 0, 0, G_OPTION_ARG_NONE, &disable_redo_log,
126       "Disables the REDO_LOG and enables it after, doesn't check initial status", NULL },
127     {"rows", 'r', 0, G_OPTION_ARG_INT, &rows,
128      "Split the INSERT statement into this many rows.", NULL},
129     {"max-threads-per-table", 0, 0, G_OPTION_ARG_INT, &max_threads_per_table,
130      "Maximum number of threads per table to use, default 4", NULL},
131     {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
132 
133 
/*
 * Re-issues a multi-row INSERT held in `data` as several smaller INSERTs.
 *
 * The first line of `data` (up to the first '\n') is used as the statement
 * header and is prepended to every batch. Each following line is treated as
 * one row; up to `rows` lines are grouped per batch (at least one line per
 * batch, so rows == 0 behaves like one row per statement). The last character
 * of each batch is overwritten with ';' before it is executed through
 * restore_data_in_gstring_by_statement().
 *
 * If `data` has no header/row structure (no newline, or nothing terminated by
 * a newline after the header) it is executed unchanged.
 *
 * `data` is always emptied before returning.
 * Returns the sum of the return values of the executed statements.
 */
int split_and_restore_data_in_gstring_by_statement(struct thread_data *td,
                  GString *data, gboolean is_schema, guint *query_counter)
{
  int r=0;
  char *header_end=g_strstr_len(data->str,-1,"\n");
  gchar *current_line=header_end != NULL ? header_end+1 : NULL;
  char *next_line=current_line != NULL ? g_strstr_len(current_line, -1, "\n") : NULL;

  if (next_line == NULL){
    // Nothing to split: the pointer arithmetic below would be invalid.
    r=restore_data_in_gstring_by_statement(td, data, is_schema, query_counter);
    g_string_set_size(data, 0);
    return r;
  }

  char *insert_statement=g_strndup(data->str, header_end - data->str);
  GString * new_insert=g_string_sized_new(strlen(insert_statement));
  do {
    guint current_rows=0;
    // Start every batch from an empty buffer so that a statement left behind
    // by a previous batch can never be glued in front of this one.
    g_string_set_size(new_insert, 0);
    g_string_append(new_insert,insert_statement);
    do {
      g_string_append_len(new_insert, current_line, next_line - current_line);
      current_rows++;
      current_line=next_line+1;
      next_line=g_strstr_len(current_line, -1, "\n");
    } while (current_rows < rows && next_line != NULL);
    // Replace the trailing row separator (or terminator) with ';'.
    if (new_insert->len > 0)
      new_insert->str[new_insert->len - 1]=';';
    r+=restore_data_in_gstring_by_statement(td, new_insert, is_schema, query_counter);
  } while (next_line != NULL);
  g_free(insert_statement);
  g_string_free(new_insert,TRUE);
  g_string_set_size(data, 0);
  return r;
}
161 
new_job(enum job_type type,void * job_data,char * use_database)162 struct job * new_job (enum job_type type, void *job_data, char *use_database) {
163   struct job *j = g_new0(struct job, 1);
164   j->type = type;
165   j->use_database=use_database;
166   switch (type){
167     case JOB_WAIT:
168       j->job_data = (GAsyncQueue *)job_data;
169     case JOB_SHUTDOWN:
170       break;
171     default:
172       j->job_data = (struct restore_job *)job_data;
173   }
174   return j;
175 }
176 
/*
 * Returns the db_table registered in `table_hash` under "<database>_<table>",
 * creating and registering it on first use.
 *
 * filename               stored in the new entry as given (not copied); also
 *                        used in the error message.
 * database               key into db_hash; stored in the new entry as given
 *                        (not copied).
 * table                  copied into the new entry.
 * number_rows            row count; on an existing entry it only overrides
 *                        the stored value when > 0.
 * alter_table_statement  index statement; on an existing entry it only
 *                        overrides the stored value when non-NULL.
 *
 * Exits the process when database/table are NULL or when `database` is not
 * present in db_hash.
 */
struct db_table* append_new_db_table(char * filename, gchar * database, gchar *table, guint64 number_rows, GHashTable *table_hash, GString *alter_table_statement){
  if ( database == NULL || table == NULL){
    g_critical("It was not possible to process file: %s",filename);
    exit(EXIT_FAILURE);
  }
  char *real_database=g_hash_table_lookup(db_hash,database);
  if (real_database == NULL){
    g_critical("It was not possible to process file: %s",filename);
    exit(EXIT_FAILURE);
  }
  gchar *lkey=g_strdup_printf("%s_%s",database, table);
  struct db_table * dbt=g_hash_table_lookup(table_hash,lkey);
  if (dbt != NULL){
    g_free(lkey);
    if (number_rows>0) dbt->rows=number_rows;
    if (alter_table_statement != NULL) dbt->indexes=alter_table_statement;
    return dbt;
  }

  // Zero-initialise so that fields not assigned here (e.g. constraints)
  // start out as NULL/0 instead of holding garbage.
  dbt=g_new0(struct db_table,1);
  dbt->filename=filename;
  dbt->database=database;
  // This should be the only place where we should use `db ? db : `
  dbt->real_database = db ? db : real_database;
  dbt->table=g_strdup(table);
  dbt->real_table=dbt->table;
  dbt->rows=number_rows;
  dbt->restore_job_list = NULL;
  dbt->queue=g_async_queue_new();
  dbt->current_threads=0;
  dbt->max_threads=max_threads_per_table;
  dbt->mutex=g_mutex_new();
  dbt->indexes=alter_table_statement;
  dbt->start_time=NULL;
  dbt->start_index_time=NULL;
  dbt->finish_time=NULL;
  // The lookup key is identical to the insertion key; hand it to the table.
  g_hash_table_insert(table_hash, lkey, dbt);
  return dbt;
}
216 
/*
 * Allocates a restore job and fills it from the arguments.
 * None of the pointers are copied; the job simply stores them.
 */
struct restore_job * new_restore_job( char * filename, char * database, struct db_table * dbt, GString * statement, guint part, enum restore_job_type type, const char *object){
  struct restore_job *created = g_new(struct restore_job, 1);

  /* what to do */
  created->type = type;
  created->object = object;

  /* where the work comes from */
  created->filename = filename;
  created->statement = statement;
  created->part = part;

  /* where it goes */
  created->database = database;
  created->dbt = dbt;

  return created;
}
228 
free_restore_job(struct restore_job * rj)229 void free_restore_job(struct restore_job * rj){
230   // We consider that
231   if (rj->filename != NULL ) g_free(rj->filename);
232   if (rj->statement != NULL ) g_string_free(rj->statement,TRUE);
233   if (rj != NULL ) g_free(rj);
234 }
235 
/*
 * Rendezvous with the worker threads through `comm_queue`.
 *
 * One JOB_WAIT job per thread is queued on `comm_queue`, each carrying a
 * private queue. The caller then takes num_threads items from `ready_queue`,
 * logs `msg` at debug level, and finally pushes num_threads tokens onto the
 * private queue.
 * NOTE(review): presumably each worker answers on ready_queue and then blocks
 * on the private queue until its token arrives; the JOB_WAIT handler is not
 * visible here. The private queue is not unreferenced in this function.
 */
void sync_threads_on_queue(GAsyncQueue *ready_queue,GAsyncQueue *comm_queue,const gchar *msg){
  GAsyncQueue *release_queue = g_async_queue_new();
  guint remaining;

  remaining = num_threads;
  while (remaining > 0){
    g_async_queue_push(comm_queue, new_job(JOB_WAIT, release_queue, NULL));
    remaining--;
  }

  remaining = num_threads;
  while (remaining > 0){
    g_async_queue_pop(ready_queue);
    remaining--;
  }

  g_debug("%s",msg);

  remaining = num_threads;
  while (remaining > 0){
    g_async_queue_push(release_queue, GINT_TO_POINTER(1));
    remaining--;
  }
}
248 
sync_threads(struct configuration * conf)249 void sync_threads(struct configuration * conf){
250   sync_threads_on_queue(conf->ready, conf->data_queue,"Syncing");
251 }
252 
print_time(GTimeSpan timespan)253 gchar * print_time(GTimeSpan timespan){
254   GTimeSpan days=timespan/G_TIME_SPAN_DAY;
255   GTimeSpan hours=(timespan-(days*G_TIME_SPAN_DAY))/G_TIME_SPAN_HOUR;
256   GTimeSpan minutes=(timespan-(days*G_TIME_SPAN_HOUR)-(hours*G_TIME_SPAN_HOUR))/G_TIME_SPAN_MINUTE;
257   GTimeSpan seconds=(timespan-(days*G_TIME_SPAN_MINUTE)-(hours*G_TIME_SPAN_HOUR)-(minutes*G_TIME_SPAN_MINUTE))/G_TIME_SPAN_SECOND;
258   return g_strdup_printf("%ld %02ld:%02ld:%02ld",days,hours,minutes,seconds);
259 }
260 
restore_from_directory(struct configuration * conf)261 void restore_from_directory(struct configuration *conf){
262   // Leaving just on thread to execute the add constraints as it might cause deadlocks
263   guint n=0;
264   for (n = 0; n < num_threads-1; n++) {
265     g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
266   }
267   load_directory_information(conf);
268   sync_threads_on_queue(conf->ready,conf->database_queue,"Step 1 completed, Databases created");
269   for (n = 0; n < num_threads; n++) {
270     g_async_queue_push(conf->database_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
271   }
272   sync_threads_on_queue(conf->ready,conf->table_queue,"Step 2 completed, Tables created");
273   for (n = 0; n < num_threads; n++) {
274     g_async_queue_push(conf->table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
275   }
276   // We need to sync all the threads before continue
277   sync_threads_on_queue(conf->ready,conf->data_queue,"Step 3 completed, load data finished");
278   for (n = 0; n < num_threads; n++) {
279     g_async_queue_push(conf->data_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
280   }
281   g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
282   g_debug("Step 4 completed");
283 
284   GList * t=conf->table_list;
285   g_message("Import timings:");
286   g_message("Data      \t| Index    \t| Total   \t| Table");
287   while (t != NULL){
288     struct db_table * dbt=t->data;
289     GTimeSpan diff1=g_date_time_difference(dbt->start_index_time,dbt->start_time);
290     GTimeSpan diff2=g_date_time_difference(dbt->finish_time,dbt->start_index_time);
291     g_message("%s\t| %s\t| %s\t| `%s`.`%s`",print_time(diff1),print_time(diff2),print_time(diff1+diff2),dbt->real_database,dbt->real_table);
292     t=t->next;
293   }
294   innodb_optimize_keys=FALSE;
295 
296   for (n = 0; n < num_threads; n++) {
297     g_async_queue_push(conf->post_table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
298   }
299   // Step 5: Create remaining objects.
300   // TODO: is it possible to do it in parallel? Actually, why aren't we queuing this files?
301   g_debug("Step 5 started");
302 
303 }
304 
main(int argc,char * argv[])305 int main(int argc, char *argv[]) {
306   struct configuration conf = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0};
307 
308   GError *error = NULL;
309   GOptionContext *context;
310 
311   setlocale(LC_ALL, "");
312   g_thread_init(NULL);
313 
314   init_mutex = g_mutex_new();
315   progress_mutex = g_mutex_new();
316 
317   if (db == NULL && source_db != NULL) {
318     db = g_strdup(source_db);
319   }
320 
321   context = g_option_context_new("multi-threaded MySQL loader");
322   GOptionGroup *main_group =
323       g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
324   g_option_group_add_entries(main_group, entries);
325   g_option_group_add_entries(main_group, common_entries);
326   g_option_context_set_main_group(context, main_group);
327   gchar ** tmpargv=g_strdupv(argv);
328   int tmpargc=argc;
329   if (!g_option_context_parse(context, &tmpargc, &tmpargv, &error)) {
330     g_print("option parsing failed: %s, try --help\n", error->message);
331     exit(EXIT_FAILURE);
332   }
333 
334   if (defaults_file != NULL){
335     load_config_file(defaults_file, context, "myloader");
336   }
337   g_option_context_free(context);
338 
339   if (password != NULL){
340     int i=1;
341     for(i=1; i < argc; i++){
342       gchar * p= g_strstr_len(argv[i],-1,password);
343       if (p != NULL){
344         strncpy(p, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", strlen(password));
345       }
346     }
347   }
348   // prompt for password if it's NULL
349   if (sizeof(password) == 0 || (password == NULL && askPassword)) {
350     password = passwordPrompt();
351   }
352 
353   if (program_version) {
354     g_print("myloader %s, built against MySQL %s\n", VERSION,
355             MYSQL_VERSION_STR);
356     exit(EXIT_SUCCESS);
357   }
358 
359   set_verbose(verbose);
360 
361 #ifdef ZWRAP_USE_ZSTD
362   compress_extension = g_strdup(".zst");
363 #else
364   compress_extension = g_strdup(".gz");
365 #endif
366   if (set_names_str){
367     gchar *tmp_str=g_strdup_printf("/*!40101 SET NAMES %s*/",set_names_str);
368     set_names_str=tmp_str;
369   } else
370     set_names_str=g_strdup("/*!40101 SET NAMES binary*/");
371 
372   if (purge_mode_str){
373     if (!strcmp(purge_mode_str,"TRUNCATE")){
374       purge_mode=TRUNCATE;
375     } else if (!strcmp(purge_mode_str,"DROP")){
376       purge_mode=DROP;
377     } else if (!strcmp(purge_mode_str,"DELETE")){
378       purge_mode=DELETE;
379     } else if (!strcmp(purge_mode_str,"NONE")){
380       purge_mode=NONE;
381     } else {
382       g_error("Purge mode unknown");
383     }
384   } else if (overwrite_tables)
385     purge_mode=DROP; // Default mode is DROP when overwrite_tables is especified
386   else purge_mode=NONE;
387 
388   if (!input_directory) {
389     if (stream){
390       GDateTime * datetime = g_date_time_new_now_local();
391       char *datetimestr;
392       datetimestr=g_date_time_format(datetime,"\%Y\%m\%d-\%H\%M\%S");
393       directory = g_strdup_printf("%s-%s", DIRECTORY, datetimestr);
394       create_backup_dir(directory);
395       g_free(datetimestr);
396     }else{
397       g_critical("a directory needs to be specified, see --help\n");
398       exit(EXIT_FAILURE);
399     }
400   } else {
401     if (!g_file_test(input_directory,G_FILE_TEST_IS_DIR)){
402       g_critical("the specified directory doesn't exists\n");
403       exit(EXIT_FAILURE);
404     }
405     directory=input_directory;
406     if (!stream){
407       char *p = g_strdup_printf("%s/metadata", directory);
408       if (!g_file_test(p, G_FILE_TEST_EXISTS)) {
409         g_critical("the specified directory is not a mydumper backup\n");
410         exit(EXIT_FAILURE);
411       }
412     }
413   }
414   MYSQL *conn;
415   conn = mysql_init(NULL);
416 
417   configure_connection(conn, "myloader");
418   if (!mysql_real_connect(conn, hostname, username, password, NULL, port,
419                           socket_path, 0)) {
420     g_critical("Error connection to database: %s", mysql_error(conn));
421     exit(EXIT_FAILURE);
422   }
423 
424   detected_server = detect_server(conn);
425   initialize_session_variables("myloader",set_session, detected_server, defaults_file);
426 
427   // TODO: we need to set the variables in the initilize session varibles, not from:
428   if (mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
429     g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
430   }
431 
432   if (!enable_binlog)
433     mysql_query(conn, "SET SQL_LOG_BIN=0");
434 
435   if (disable_redo_log){
436     g_message("Disabling redologs");
437     mysql_query(conn, "ALTER INSTANCE DISABLE INNODB REDO_LOG");
438   }
439   mysql_query(conn, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/");
440   // To here.
441   conf.database_queue = g_async_queue_new();
442   conf.table_queue = g_async_queue_new();
443   conf.data_queue = g_async_queue_new();
444   conf.post_table_queue = g_async_queue_new();
445   conf.post_queue = g_async_queue_new();
446   conf.ready = g_async_queue_new();
447   conf.stream_queue = g_async_queue_new();
448   db_hash=g_hash_table_new ( g_str_hash, g_str_equal );
449   tbl_hash=g_hash_table_new ( g_str_hash, g_str_equal );
450   guint n;
451   GThread **threads = g_new(GThread *, num_threads);
452   struct thread_data *td = g_new(struct thread_data, num_threads);
453   for (n = 0; n < num_threads; n++) {
454     td[n].conf = &conf;
455     td[n].thread_id = n + 1;
456     threads[n] =
457         g_thread_create((GThreadFunc)process_queue, &td[n], TRUE, NULL);
458     // Here, the ready queue is being used to serialize the connection to the database.
459     // We don't want all the threads try to connect at the same time
460     g_async_queue_pop(conf.ready);
461   }
462 
463   struct thread_data t;
464   t.thread_id = 0;
465   t.conf = &conf;
466   t.thrconn = conn;
467   t.current_database=NULL;
468 
469   // Step 1: Create databases | single threaded
470   if (db)
471     create_database(&t, db);
472   if (stream){
473     GThread *stream_thread = g_thread_create((GThreadFunc)process_stream, &conf, TRUE, NULL);
474     g_thread_join(stream_thread);
475   }else{
476     restore_from_directory(&conf);
477   }
478 
479   for (n = 0; n < num_threads; n++) {
480     g_thread_join(threads[n]);
481   }
482   g_async_queue_unref(conf.ready);
483   if (disable_redo_log)
484     mysql_query(conn, "ALTER INSTANCE ENABLE INNODB REDO_LOG");
485 
486   g_async_queue_unref(conf.data_queue);
487 
488   checksum_databases(&t);
489 
490   if (stream && no_delete == FALSE && input_directory == NULL){
491     // remove metadata files
492     GList *e=conf.metadata_list;
493     gchar *path = NULL;
494     while (e) {
495       path = g_build_filename(directory, e->data, NULL);
496       remove(path);
497       g_free(path);
498       e=e->next;
499     }
500     path = g_build_filename(directory, "metadata", NULL);
501     remove(path);
502     g_free(path);
503     if (g_rmdir(directory) != 0)
504         g_critical("Restore directory not removed: %s", directory);
505   }
506   mysql_close(conn);
507   mysql_thread_end();
508   mysql_library_end();
509   g_free(directory);
510   g_free(td);
511   g_free(threads);
512 
513   if (logoutfile) {
514     fclose(logoutfile);
515   }
516 
517   return errors ? EXIT_FAILURE : EXIT_SUCCESS;
518 }
519 
/*
 * Splits a CREATE TABLE statement line by line into three buffers.
 *
 *   - Index lines (starting with "  KEY", "  UNIQUE", "  SPATIAL",
 *     "  FULLTEXT" or "  INDEX") are appended to alter_table_statement as
 *     "ADD ..." clauses, unless the line mentions the AUTO_INCREMENT column,
 *     in which case it stays in create_table_statement.
 *   - "  CONSTRAINT" lines are appended to alter_table_constraint_statement.
 *   - Every other line is appended to create_table_statement.
 *
 * A second FULLTEXT index closes the current ALTER TABLE and opens a new
 * one, so each ALTER carries at most one FULLTEXT index.
 *
 * Returns a bitmask of IS_ALTER_TABLE_PRESENT, INCLUDE_CONSTRAINT and
 * IS_INNODB_TABLE. The flags are OR-ed in: the original added them, so a
 * table with several indexes or constraints produced a corrupted mask.
 */
int process_create_table_statement (gchar * statement, GString *create_table_statement, GString *alter_table_statement, GString *alter_table_constraint_statement, struct db_table *dbt){
  int flag=0;
  gchar** split_file= g_strsplit(statement, "\n", -1);
  gchar *autoinc_column=NULL;
  append_alter_table(alter_table_statement, dbt->real_database,dbt->real_table);
  append_alter_table(alter_table_constraint_statement, dbt->real_database,dbt->real_table);
  int fulltext_counter=0;
  int i=0;
  // Walk to the NULL terminator instead of recomputing the length each pass.
  for (i=0; split_file != NULL && split_file[i] != NULL; i++){
    gchar *line=split_file[i];
    if ( g_strstr_len(line,5,"  KEY")
      || g_strstr_len(line,8,"  UNIQUE")
      || g_strstr_len(line,9,"  SPATIAL")
      || g_strstr_len(line,10,"  FULLTEXT")
      || g_strstr_len(line,7,"  INDEX")
      ){
      // Keep the index in the CREATE TABLE if it mentions the AUTO_INCREMENT column
      if ((autoinc_column != NULL) && (g_strrstr(line,autoinc_column))){
        g_string_append(create_table_statement, line);
        g_string_append_c(create_table_statement,'\n');
      }else{
        flag|=IS_ALTER_TABLE_PRESENT;
        if (g_strrstr(line,"  FULLTEXT")) fulltext_counter++;
        if (fulltext_counter>1){
          fulltext_counter=1;
          finish_alter_table(alter_table_statement);
          append_alter_table(alter_table_statement,dbt->real_database,dbt->real_table);
        }
        g_string_append(alter_table_statement,"\n ADD");
        g_string_append(alter_table_statement, line);
      }
    }else{
      if (g_strstr_len(line,12,"  CONSTRAINT")){
        flag|=INCLUDE_CONSTRAINT;
        g_string_append(alter_table_constraint_statement,"\n ADD");
        g_string_append(alter_table_constraint_statement, line);
      }else{
        if (g_strrstr(line,"AUTO_INCREMENT")){
          // The column name is the text between the first pair of backticks.
          // Lines without backticks (e.g. a table option AUTO_INCREMENT=n)
          // leave the remembered column untouched.
          gchar** autoinc_split=g_strsplit(line,"`",3);
          if (autoinc_split[0] != NULL && autoinc_split[1] != NULL){
            g_free(autoinc_column);
            autoinc_column=g_strdup_printf("(`%s`", autoinc_split[1]);
          }
          g_strfreev(autoinc_split);
        }
        g_string_append(create_table_statement, line);
        g_string_append_c(create_table_statement,'\n');
      }
    }
    if (g_strrstr(line,"ENGINE=InnoDB")) flag|=IS_INNODB_TABLE;
  }
  g_free(autoinc_column);
  g_strfreev(split_file);
  return flag;
}
568 
569 
570 
load_schema(struct configuration * conf,struct db_table * dbt,const gchar * filename)571 void load_schema(struct configuration *conf, struct db_table *dbt, const gchar *filename){
572 //  g_message("Loading schema for file: %s", filename);
573   void *infile;
574   gboolean is_compressed = FALSE;
575   gboolean eof = FALSE;
576   GString *data=g_string_sized_new(512);
577   GString *create_table_statement=g_string_sized_new(512);
578   GString *alter_table_statement=g_string_sized_new(512);
579   GString *alter_table_constraint_statement=g_string_sized_new(512);
580   if (!g_str_has_suffix(filename, compress_extension)) {
581     infile = g_fopen(filename, "r");
582     is_compressed = FALSE;
583   } else {
584     infile = (void *)gzopen(filename, "r");
585     is_compressed = TRUE;
586   }
587   if (!infile) {
588     g_critical("cannot open file %s (%d)", filename, errno);
589     errors++;
590     return;
591   }
592   while (eof == FALSE) {
593     if (read_data(infile, is_compressed, data, &eof)) {
594       if (g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n")) {
595         if (g_strrstr(data->str,"CREATE ")){
596           gchar** create_table= g_strsplit(data->str, "`", 3);
597 	        dbt->real_table=g_strdup(create_table[1]);
598           if ( g_str_has_prefix(dbt->table,"mydumper_")){
599             g_hash_table_insert(tbl_hash, dbt->table, dbt->real_table);
600           }else{
601             g_hash_table_insert(tbl_hash, dbt->real_table, dbt->real_table);
602           }
603           g_strfreev(create_table);
604         }
605         if (innodb_optimize_keys){
606           // Check if it is a /*!40  SET
607           if (g_strrstr(data->str,"/*!40")){
608             g_string_append(alter_table_statement,data->str);
609             g_string_append(create_table_statement,data->str);
610 	        }else{
611             // Processing CREATE TABLE statement
612             GString *new_create_table_statement=g_string_sized_new(512);
613             int flag = process_create_table_statement(data->str, new_create_table_statement, alter_table_statement, alter_table_constraint_statement, dbt);
614             if (flag & IS_INNODB_TABLE){
615               if (flag & IS_ALTER_TABLE_PRESENT){
616                 finish_alter_table(alter_table_statement);
617                 g_message("Fast index creation will be use for table: %s.%s",dbt->real_database,dbt->real_table);
618               }else{
619                 g_string_free(alter_table_statement,TRUE);
620                 alter_table_statement=NULL;
621               }
622               g_string_append(create_table_statement,g_strjoinv("\n)",g_strsplit(new_create_table_statement->str,",\n)",-1)));
623               dbt->indexes=alter_table_statement;
624               if (stream){
625                 struct restore_job *rj = new_restore_job(dbt->filename, dbt->real_database, dbt, dbt->indexes, 0, JOB_RESTORE_STRING, "indexes");
626                 g_async_queue_push(conf->post_table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
627               }
628               if (flag & INCLUDE_CONSTRAINT){
629                 struct restore_job *rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, alter_table_constraint_statement, 0, JOB_RESTORE_STRING, "constraint");
630                 g_async_queue_push(conf->post_table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
631                 dbt->constraints=alter_table_constraint_statement;
632               }else{
633                  g_string_free(alter_table_constraint_statement,TRUE);
634               }
635               g_string_set_size(data, 0);
636             }else{
637               g_string_free(alter_table_statement,TRUE);
638               g_string_free(alter_table_constraint_statement,TRUE);
639       	      g_string_append(create_table_statement,data->str);
640             }
641           }
642     	  }else{
643           g_string_append(create_table_statement,data->str);
644       	}
645       	g_string_set_size(data, 0);
646       }
647     }
648   }
649   struct restore_job * rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, create_table_statement, 0,JOB_RESTORE_SCHEMA_STRING, "");
650   g_async_queue_push(conf->table_queue, new_job(JOB_RESTORE,rj,dbt->real_database));
651   if (!is_compressed) {
652     fclose(infile);
653   } else {
654     gzclose((gzFile)infile);
655   }
656   if (stream && no_delete == FALSE){
657     g_message("Removing file: %s", filename);
658     remove(filename);
659   }
660 }
661 
get_database_name_from_filename(const gchar * filename)662 gchar * get_database_name_from_filename(const gchar *filename){
663   gchar **split_file = g_strsplit(filename, "-schema-create.sql", 2);
664   gchar *db_name=g_strdup(split_file[0]);
665   g_strfreev(split_file);
666   return db_name;
667 }
668 
/*
 * Parses "<database>.<table>.<part><suffix>" style names.
 *
 * The text before the first `suffix` is split on '.'. With three or more
 * fields the first two are copied into *database and *table (newly
 * allocated) and the third is parsed as a base-10 number into *part.
 * Otherwise *database and *table are set to NULL and *part to 0.
 */
void get_database_table_part_name_from_filename(const gchar *filename, const gchar * suffix, gchar **database, gchar **table, guint *part){
  gchar **before_suffix = g_strsplit(filename, suffix, 3);
  gchar **fields = g_strsplit(before_suffix[0], ".", -1);
  g_strfreev(before_suffix);

  /* Pessimistic defaults, overwritten when the name is well formed. */
  *database = NULL;
  *table = NULL;
  *part = 0;

  if (g_strv_length(fields) >= 3){
    *database = g_strdup(fields[0]);
    *table = g_strdup(fields[1]);
    *part = g_ascii_strtoull(fields[2], NULL, 10);
  }

  g_strfreev(fields);
}
684 
/*
 * Parses "<database>.<table><suffix>" style names.
 *
 * The text before the first `suffix` is split on '.'. Only when that gives
 * exactly two fields are they copied into *database and *table (newly
 * allocated); in every other case both outputs are set to NULL.
 */
void get_database_table_name_from_filename(const gchar *filename, const gchar * suffix, gchar **database, gchar **table){
  gchar **before_suffix = g_strsplit(filename, suffix, 2);
  gchar **fields = g_strsplit(before_suffix[0], ".", -1);
  g_strfreev(before_suffix);

  if (g_strv_length(fields) != 2){
    *database = NULL;
    *table = NULL;
  }else{
    *database = g_strdup(fields[0]);
    *table = g_strdup(fields[1]);
  }

  g_strfreev(fields);
}
698 
process_database_filename(struct configuration * conf,char * filename,const char * object)699 void process_database_filename(struct configuration *conf, char * filename, const char *object) {
700   gchar *db_kname,*db_vname;
701   db_vname=db_kname=get_database_name_from_filename(filename);
702 
703   if (db_kname!=NULL && g_str_has_prefix(db_kname,"mydumper_")){
704     db_vname=get_database_name_from_content(g_build_filename(directory,filename,NULL));
705   }
706   g_debug("Adding database: %s -> %s", db_kname, db ? db : db_vname);
707   g_hash_table_insert(db_hash, db_kname, db ? db : db_vname);
708   if (!db){
709     struct restore_job *rj = new_restore_job(g_strdup(filename), db_vname, NULL, NULL, 0 , JOB_RESTORE_SCHEMA_FILENAME, object);
710     g_async_queue_push(conf->database_queue, new_job(JOB_RESTORE,rj,NULL));
711   }
712 }
713 
/*
 * Handles a "<database>.<table>-schema.sql" file: registers the table in
 * `table_hash` and loads its schema from <directory>/<filename>.
 *
 * Exits the process when the file name cannot be parsed or when the database
 * is not present in db_hash.
 */
void process_table_filename(struct configuration *conf, GHashTable *table_hash, char * filename){
  gchar *db_name, *table_name;
  struct db_table *dbt=NULL;
  get_database_table_name_from_filename(filename,"-schema.sql",&db_name,&table_name);
  if (db_name == NULL || table_name == NULL){
      g_critical("It was not possible to process file: %s (1)",filename);
      exit(EXIT_FAILURE);
  }
  char *real_db_name=g_hash_table_lookup(db_hash,db_name);
  if (real_db_name==NULL){
    g_critical("It was not possible to process file: %s (2) because real_db_name isn't found",filename);
    exit(EXIT_FAILURE);
  }
  dbt=append_new_db_table(filename, db_name, table_name,0,table_hash,NULL);
  // append_new_db_table() copies the table name, and keeps the database
  // pointer only when it creates a new entry; release what was not adopted.
  g_free(table_name);
  if (dbt->database != db_name) g_free(db_name);
  // load_schema() duplicates the path wherever it needs to keep it.
  gchar *schema_path=g_build_filename(directory,filename,NULL);
  load_schema(conf, dbt,schema_path);
  g_free(schema_path);
}
730 
/*
 * Handles a data file: counts it, resolves its table entry and creates a
 * JOB_RESTORE_FILENAME restore job for it.
 *
 * In stream mode the job is pushed straight onto conf->data_queue; otherwise
 * it is inserted into the table's restore_job_list, ordered by part number.
 * Exits the process when the file name cannot be parsed.
 */
void process_data_filename(struct configuration *conf, GHashTable *table_hash, char * filename){
  gchar *data_db, *data_table;
  guint part;

  total_data_sql_files++;
  // TODO: check if it is a data file
  // TODO: we need to count sections of the data file to determine if it is ok.
  get_database_table_part_name_from_filename(filename, ".sql", &data_db, &data_table, &part);
  if (!data_db || !data_table){
    g_critical("It was not possible to process file: %s (3)",filename);
    exit(EXIT_FAILURE);
  }

  char *real_db_name = g_hash_table_lookup(db_hash, data_db);
  struct db_table *dbt = append_new_db_table(real_db_name, data_db, data_table, 0, table_hash, NULL);
  struct restore_job *rj = new_restore_job(g_strdup(filename), dbt->real_database, dbt, NULL, part, JOB_RESTORE_FILENAME, "");

  if (!stream){
    // Directory mode: keep the jobs of each table sorted by part.
    dbt->restore_job_list = g_list_insert_sorted(dbt->restore_job_list, rj, &compare_filename_part);
    return;
  }
  // in stream mode, there is no need to sort. We can enqueue directly.
  g_async_queue_push(conf->data_queue, new_job(JOB_RESTORE, rj, dbt->real_database));
}
753 
process_schema_filename(struct configuration * conf,const gchar * filename,const char * object)754 void process_schema_filename(struct configuration *conf, const gchar *filename, const char * object) {
755     gchar *database=NULL, *table=NULL, *real_db_name=NULL;
756     get_database_table_from_file(filename,"-schema",&database,&table);
757     if (database == NULL){
758       g_critical("Database is null on: %s",filename);
759     }
760     real_db_name=g_hash_table_lookup(db_hash,database);
761     struct restore_job *rj = new_restore_job(g_strdup(filename), real_db_name , NULL , NULL, 0 , JOB_RESTORE_SCHEMA_FILENAME, object);
762     g_async_queue_push(conf->post_queue, new_job(JOB_RESTORE,rj,real_db_name));
763 }
764 
get_file_type(const char * filename)765 enum file_type get_file_type (const char * filename){
766   if (g_strrstr(filename, "-schema.sql")) {
767     return SCHEMA_TABLE;
768   } else if (g_str_has_suffix(filename, "-metadata")) {
769     return METADATA_TABLE;
770   } else if ( strcmp(filename, "metadata") == 0 ){
771     return METADATA_GLOBAL;
772   } else if (g_str_has_suffix(filename, "-checksum") || g_str_has_suffix(filename, "-checksum.gz")) {
773     return CHECKSUM;
774   } else if ( g_strrstr(filename, "-schema-view.sql") ){
775     return SCHEMA_VIEW;
776   } else if ( g_strrstr(filename, "-schema-triggers.sql") ){
777     return SCHEMA_TRIGGER;
778   } else if ( g_strrstr(filename, "-schema-post.sql") ){
779     return SCHEMA_POST;
780   } else if ( g_strrstr(filename, "-schema-create.sql") ){
781     return SCHEMA_CREATE;
782   }else if (g_str_has_suffix(filename, ".dat"))
783     return LOAD_DATA;
784   return DATA;
785 }
786 
787 
/*
 * Classifies one file and dispatches it to the matching process_* handler.
 *
 * When source_db is set, only files whose name starts with "<source_db>."
 * are dispatched. The detected file type is returned in every case.
 */
enum file_type process_filename(struct configuration *conf,GHashTable *table_hash, char *filename){
  enum file_type ft= get_file_type(filename);
  gboolean selected = TRUE;

  if (source_db){
    // Build the "<source_db>." prefix once and release it after the test.
    gchar *source_prefix = g_strdup_printf("%s.", source_db);
    selected = g_str_has_prefix(filename, source_prefix);
    g_free(source_prefix);
  }
  if (!selected){
    return ft;
  }

  switch (ft){
    case INIT:
      /* fallthrough */
    case SCHEMA_CREATE:
      if (db){
        g_warning("Skipping database creation on file: %s",filename);
      }else{
        process_database_filename(conf, filename, "create database");
      }
      break;
    case SCHEMA_TABLE:
      process_table_filename(conf,table_hash,filename);
      break;
    case SCHEMA_VIEW:
      process_schema_filename(conf, filename,"view");
      break;
    case SCHEMA_TRIGGER:
      process_schema_filename(conf, filename,"trigger");
      break;
    case SCHEMA_POST:
      // can be enqueued in any order
      process_schema_filename(conf, filename,"post");
      break;
    case CHECKSUM:
      conf->checksum_list=g_list_insert(conf->checksum_list,g_strdup(filename),-1);
      break;
    case METADATA_GLOBAL:
      break;
    case METADATA_TABLE:
      // TODO: we need to process this info
      conf->metadata_list=g_list_insert(conf->metadata_list,g_strdup(filename),-1);
      break;
    case DATA:
      process_data_filename(conf,table_hash,filename);
      break;
    case LOAD_DATA:
      g_message("Load data file found: %s", filename);
      break;
    default:
      break;
  }
  return ft;
}
833 
load_directory_information(struct configuration * conf)834 void load_directory_information(struct configuration *conf) {
835   GError *error = NULL;
836   GDir *dir = g_dir_open(directory, 0, &error);
837 
838   if (error) {
839     g_critical("cannot open directory %s, %s\n", directory, error->message);
840     errors++;
841     return;
842   }
843 
844   const gchar *filename = NULL;
845   GList *create_table_list=NULL,
846         *metadata_list= NULL,
847         *data_files_list=NULL,
848         *schema_create_list=NULL,
849         *view_list=NULL,
850         *trigger_list=NULL,
851         *post_list=NULL;
852 
853   while ((filename = g_dir_read_name(dir))) {
854     enum file_type ft= get_file_type(filename);
855     if (ft == SCHEMA_POST){
856           post_list=g_list_insert(post_list,g_strdup(filename),-1);
857     } else if (ft ==  SCHEMA_CREATE ){
858           schema_create_list=g_list_insert(schema_create_list,g_strdup(filename),-1);
859           conf->schema_create_list=g_list_insert(conf->schema_create_list,g_strdup(filename),-1);
860     } else if (!source_db ||
861       g_str_has_prefix(filename, g_strdup_printf("%s.", source_db))||
862       g_str_has_prefix(filename, "mydumper_")) {
863         switch (ft){
864           case INIT:
865             break;
866           case SCHEMA_TABLE:
867             create_table_list=g_list_append(create_table_list,g_strdup(filename));
868             break;
869           case SCHEMA_VIEW:
870             view_list=g_list_append(view_list,g_strdup(filename));
871             break;
872           case SCHEMA_TRIGGER:
873             trigger_list=g_list_append(trigger_list,g_strdup(filename));
874             break;
875           case CHECKSUM:
876             conf->checksum_list=g_list_append(conf->checksum_list,g_strdup(filename));
877             break;
878           case METADATA_GLOBAL:
879             break;
880           case METADATA_TABLE:
881             // TODO: we need to process this info
882             metadata_list=g_list_append(metadata_list,g_strdup(filename));
883             break;
884           case DATA:
885             data_files_list=g_list_append(data_files_list,g_strdup(filename));
886             break;
887           case LOAD_DATA:
888             g_message("Load data file found: %s", filename);
889             break;
890           default:
891             g_warning("File ignored: %s", filename);
892             break;
893         }
894       }
895     }
896   g_dir_close(dir);
897 
898   gchar *f = NULL;
899   // CREATE DATABASE
900   while (schema_create_list){
901     f = schema_create_list->data;
902     process_database_filename(conf, f, "create database");
903     schema_create_list=schema_create_list->next;
904   }
905 
906   // CREATE TABLE
907   GHashTable *table_hash = g_hash_table_new ( g_str_hash, g_str_equal );
908   while (create_table_list != NULL){
909     f = create_table_list->data;
910     process_table_filename(conf,table_hash,f);
911     create_table_list=create_table_list->next;
912   }
913 
914   // DATA FILES
915   while (data_files_list != NULL){
916     f = data_files_list->data;
917     process_data_filename(conf,table_hash,f);
918 
919     data_files_list=data_files_list->next;
920   }
921 
922 
923   while (view_list != NULL){
924     f = view_list->data;
925     process_schema_filename(conf, f,"view");
926     view_list=view_list->next;
927   }
928 
929   while (trigger_list != NULL){
930     f = trigger_list->data;
931     process_schema_filename(conf, f, "trigger");
932     trigger_list=trigger_list->next;
933   }
934 
935   while (post_list != NULL){
936     f = post_list->data;
937     process_schema_filename(conf, f,"post");
938     post_list=post_list->next;
939   }
940   // SORT DATA FILES TO ENQUEUE
941   // iterates over the dbt to create the jobs in the dbt->queue
942   // and sorts the dbt for the conf->table_list
943   // in stream mode, it is not possible to sort the tables as
944   // we don't know the amount the rows, .metadata are sent at the end.
945   GList * table_list=NULL;
946   GHashTableIter iter;
947   gchar * lkey;
948   g_hash_table_iter_init ( &iter, table_hash );
949     struct db_table *dbt=NULL;
950   while ( g_hash_table_iter_next ( &iter, (gpointer *) &lkey, (gpointer *) &dbt ) ) {
951     table_list=g_list_insert_sorted_with_data (table_list,dbt,&compare_dbt,table_hash);
952     GList *i=dbt->restore_job_list;
953     while (i) {
954       g_async_queue_push(dbt->queue, new_job(JOB_RESTORE ,i->data,dbt->real_database));
955       i=i->next;
956     }
957     dbt->count=g_async_queue_length(dbt->queue);
958   }
959   g_hash_table_destroy(table_hash);
960   conf->table_list=table_list;
961   // conf->table needs to be set.
962 }
963 
964 
965 // this can be moved to the table structure and executed before index creation.
checksum_databases(struct thread_data * td)966 void checksum_databases(struct thread_data *td) {
967   g_message("Starting table checksum verification");
968 
969   const gchar *filename = NULL;
970   GList *e = td->conf->checksum_list;
971   while (e){
972     filename=e->data;
973     checksum_table_filename(filename, td->thrconn);
974     e=e->next;
975   }
976 }
977 
/*
 * Creates `database` on the server.
 *
 * If "<database>-schema-create.sql" (plain, or with compress_extension)
 * exists in `directory`, it is restored; otherwise a
 * "CREATE DATABASE IF NOT EXISTS" statement is issued and a warning is
 * logged if it fails.
 */
void create_database(struct thread_data *td, gchar *database) {
  gchar *query = NULL;

  gchar *filename =
      g_strdup_printf("%s-schema-create.sql", database);
  gchar *filenamegz =
      g_strdup_printf("%s-schema-create.sql%s", database, compress_extension);
  gchar *filepath = g_strdup_printf("%s/%s-schema-create.sql",
                                    directory, database);
  gchar *filepathgz = g_strdup_printf("%s/%s-schema-create.sql%s",
                                      directory, database, compress_extension);

  if (g_file_test(filepath, G_FILE_TEST_EXISTS)) {
    restore_data_from_file(td, database, NULL, filename, TRUE);
  } else if (g_file_test(filepathgz, G_FILE_TEST_EXISTS)) {
    restore_data_from_file(td, database, NULL, filenamegz, TRUE);
  } else {
    query = g_strdup_printf("CREATE DATABASE IF NOT EXISTS `%s`", database);
    if (mysql_query(td->thrconn, query)){
      g_warning("Fail to create database: %s", database);
    }
  }

  // All of these strings are local to this call; release them.
  g_free(query);
  g_free(filename);
  g_free(filenamegz);
  g_free(filepath);
  g_free(filepathgz);
  return;
}
1004 
/*
 * Parses "<database>.<table><sufix>..." out of a file name.
 *
 * The text before the first `sufix` is split on '.':
 *   - more than two pieces: a warning is logged and *database / *table are
 *     left untouched (callers are expected to pre-initialise them);
 *   - otherwise *database receives a copy of the first piece and *table a
 *     copy of the second one, or NULL when the piece is missing.
 * Returned strings are newly allocated; release them with g_free.
 */
void get_database_table_from_file(const gchar *filename,const char *sufix,gchar **database,gchar **table){
  gchar **split_filename = g_strsplit(filename, sufix, 0);
  gchar **split = NULL;
  guint count = 0;

  // An empty result means there is nothing before the suffix to parse.
  if (split_filename != NULL && split_filename[0] != NULL){
    split = g_strsplit(split_filename[0],".",0);
    count = g_strv_length(split);
  }
  g_strfreev(split_filename);

  if (count > 2){
    g_warning("We need to get the db and table name from the create table statement");
    g_strfreev(split);
    return;
  }
  // Only index pieces that exist; a missing piece yields NULL.
  *table = count > 1 ? g_strdup(split[1]) : NULL;
  *database = count > 0 ? g_strdup(split[0]) : NULL;
  g_strfreev(split);
}
1016 
1017 
/*
 * Clears an existing table according to the global purge_mode.
 *
 *   DROP     - drops the table and the view of that name (results ignored).
 *   TRUNCATE - truncates the table.
 *   DELETE   - deletes all rows.
 *
 * Returns the mysql_query() result of the TRUNCATE/DELETE statement
 * (non-zero on failure); always 0 for DROP and for any other mode.
 */
int overwrite_table(MYSQL *conn,gchar * database, gchar * table){
  int truncate_or_delete_failed=0;
  gchar *query=NULL;
  if (purge_mode == DROP) {
    g_message("Dropping table or view (if exists) `%s`.`%s`",
              database, table);
    query = g_strdup_printf("DROP TABLE IF EXISTS `%s`.`%s`",
                            database, table);
    mysql_query(conn, query);
    // Release the first statement before building the second one.
    g_free(query);
    query = g_strdup_printf("DROP VIEW IF EXISTS `%s`.`%s`", database,
                            table);
    mysql_query(conn, query);
  } else if (purge_mode == TRUNCATE) {
    g_message("Truncating table `%s`.`%s`", database, table);
    query= g_strdup_printf("TRUNCATE TABLE `%s`.`%s`", database, table);
    truncate_or_delete_failed= mysql_query(conn, query);
    if (truncate_or_delete_failed)
      g_warning("Truncate failed, we are going to try to create table or view");
  } else if (purge_mode == DELETE) {
    g_message("Deleting content of table `%s`.`%s`", database, table);
    query= g_strdup_printf("DELETE FROM `%s`.`%s`", database, table);
    truncate_or_delete_failed= mysql_query(conn, query);
    if (truncate_or_delete_failed)
      g_warning("Delete failed, we are going to try to create table or view");
  }
  g_free(query);
  return truncate_or_delete_failed;
}
1046 
compare_dbt(gconstpointer a,gconstpointer b,gpointer table_hash)1047 gint compare_dbt(gconstpointer a, gconstpointer b, gpointer table_hash){
1048   gchar *a_key=g_strdup_printf("%s_%s",((struct db_table *)a)->database,((struct db_table *)a)->table);
1049   gchar *b_key=g_strdup_printf("%s_%s",((struct db_table *)b)->database,((struct db_table *)b)->table);
1050   struct db_table * a_val=g_hash_table_lookup(table_hash,a_key);
1051   struct db_table * b_val=g_hash_table_lookup(table_hash,b_key);
1052   g_free(a_key);
1053   g_free(b_key);
1054   return a_val->rows < b_val->rows;
1055 }
compare_filename_part(gconstpointer a,gconstpointer b)1056 gint compare_filename_part (gconstpointer a, gconstpointer b){
1057   return ((struct restore_job *)a)->part > ((struct restore_job *)b)->part;
1058 }
1059 
/*
 * Compares the checksum stored in a "-checksum" file with the checksum
 * computed for the corresponding table on the server.
 *
 * The database and table are parsed from `filename` and mapped through
 * db_hash / tbl_hash (the global `db` override replaces the database when
 * set). A mismatch, an unreadable file or an unparsable name is logged and
 * counted in `errors`.
 */
void checksum_table_filename(const gchar *filename, MYSQL *conn) {
  gchar *database = NULL, *table = NULL;
  get_database_table_from_file(filename,"-checksum",&database,&table);
  if (database == NULL || table == NULL){
    // Without both names there is nothing to look up or verify.
    g_critical("It was not possible to process file: %s (database or table name not found)", filename);
    errors++;
    g_free(database);
    g_free(table);
    return;
  }
  gchar *real_database=g_hash_table_lookup(db_hash,database);
  gchar *real_table=g_hash_table_lookup(tbl_hash,table);
  // The parsed copies were only needed as lookup keys.
  g_free(database);
  g_free(table);

  int errn=0;
  char * row=checksum_table(conn, db ? db : real_database, real_table, &errn);
  if (row == NULL){
    g_critical("cannot compute checksum for `%s`.`%s` (%d)", db ? db : real_database, real_table, errn);
    errors++;
    return;
  }

  gchar *checksum_path = g_build_filename(directory, filename, NULL);
  gboolean is_compressed = g_str_has_suffix(checksum_path, compress_extension);
  void *infile = is_compressed ? (void *)gzopen(checksum_path, "r")
                               : (void *)g_fopen(checksum_path, "r");

  if (!infile) {
    g_critical("cannot open file %s (%d)", filename, errno);
    errors++;
    g_free(checksum_path);
    return;
  }

  char checksum[256];
  char * cs= !is_compressed ? fgets(checksum, sizeof checksum, infile)
                            : gzgets((gzFile)infile, checksum, sizeof checksum);
  if (cs == NULL) {
    g_critical("error reading file %s (%d)", filename, errno);
    errors++;
  } else if (strcmp(checksum, row) != 0) {
    g_warning("Checksum mismatch found for `%s`.`%s`. Got '%s', expecting '%s'", db ? db : real_database, real_table, row, checksum);
    errors++;
  } else {
    g_message("Checksum confirmed for `%s`.`%s`", db ? db : real_database, real_table);
  }

  // Close the file and release the path on every path, including read errors.
  if (!is_compressed) {
    fclose(infile);
  } else {
    gzclose((gzFile)infile);
  }
  g_free(checksum_path);
}
1106 
process_job(struct thread_data * td,struct job * job,int count)1107 gboolean process_job(struct thread_data *td, struct job *job, int count){
1108   switch (job->type) {
1109     case JOB_RESTORE:
1110       process_restore_job(td,job->job_data,count);
1111       break;
1112     case JOB_WAIT:
1113       g_async_queue_push(td->conf->ready, GINT_TO_POINTER(1));
1114       GAsyncQueue *queue=job->job_data;
1115       g_async_queue_pop(queue);
1116       break;
1117     case JOB_SHUTDOWN:
1118 //      g_message("Thread %d shutting down", thread_id);
1119       g_free(job);
1120       return FALSE;
1121       break;
1122     default:
1123       g_critical("Something very bad happened!");
1124       exit(EXIT_FAILURE);
1125   }
1126   return TRUE;
1127 }
1128 
1129 
process_restore_job(struct thread_data * td,struct restore_job * rj,int count)1130 void process_restore_job(struct thread_data *td, struct restore_job *rj, int count){
1131   struct db_table *dbt=rj->dbt;
1132   dbt=rj->dbt;
1133 
1134   switch (rj->type) {
1135     case JOB_RESTORE_STRING:
1136       g_message("Thread %d restoring %s `%s`.`%s` from %s", td->thread_id, rj->object,
1137                 dbt->real_database, dbt->real_table, rj->filename);
1138       guint query_counter=0;
1139       restore_data_in_gstring(td, rj->statement, FALSE, &query_counter);
1140       break;
1141     case JOB_RESTORE_SCHEMA_STRING:
1142       g_message("Thread %d restoring table `%s`.`%s` from %s", td->thread_id,
1143                 dbt->real_database, dbt->real_table, rj->filename);
1144       int truncate_or_delete_failed=0;
1145       if (overwrite_tables)
1146         truncate_or_delete_failed=overwrite_table(td->thrconn,dbt->real_database, dbt->real_table);
1147       if ((purge_mode == TRUNCATE || purge_mode == DELETE) && !truncate_or_delete_failed){
1148         g_message("Skipping table creation `%s`.`%s` from %s", dbt->real_database, dbt->real_table, rj->filename);
1149       }else{
1150         g_message("Creating table `%s`.`%s` from %s", dbt->real_database, dbt->real_table, rj->filename);
1151         if (restore_data_in_gstring(td, rj->statement, FALSE, &query_counter)){
1152           g_critical("Thread %d issue restoring %s: %s",td->thread_id,rj->filename, mysql_error(td->thrconn));
1153         }
1154       }
1155       break;
1156     case JOB_RESTORE_FILENAME:
1157       g_mutex_lock(progress_mutex);
1158       progress++;
1159       g_message("Thread %d restoring `%s`.`%s` part %d of %d from %s. Progress %llu of %llu .", td->thread_id,
1160                 dbt->real_database, dbt->real_table, rj->part, count, rj->filename, progress,total_data_sql_files);
1161       g_mutex_unlock(progress_mutex);
1162       if (restore_data_from_file(td, dbt->real_database, dbt->real_table, rj->filename, FALSE) > 0){
1163         g_critical("Thread %d issue restoring %s: %s",td->thread_id,rj->filename, mysql_error(td->thrconn));
1164       }
1165       break;
1166     case JOB_RESTORE_SCHEMA_FILENAME:
1167       g_message("Thread %d restoring %s on `%s` from %s", td->thread_id, rj->object,
1168                 rj->database, rj->filename);
1169       restore_data_from_file(td, rj->database, NULL, rj->filename, TRUE );
1170       break;
1171     default:
1172       g_critical("Something very bad happened!");
1173       exit(EXIT_FAILURE);
1174     }
1175   free_restore_job(rj);
1176 }
1177 
process_stream_queue(struct thread_data * td)1178 void *process_stream_queue(struct thread_data * td) {
1179   struct job *job = NULL;
1180   int count =0;
1181   gboolean cont=TRUE;
1182 
1183   enum file_type ft;
1184   while (cont){
1185 
1186     ft = (enum file_type )g_async_queue_pop(td->conf->stream_queue);
1187 //    g_message("Thread %d: Processing file type: %d",td->thread_id,ft);
1188     GAsyncQueue *q= get_queue_for_type(td->conf,ft);
1189     if (q != NULL){
1190     job = (struct job *)g_async_queue_pop(q);
1191 
1192     execute_use_if_needs_to(td, job->use_database, "Restoring from stream");
1193     cont=process_job(td, job, count);
1194     }
1195   }
1196   return NULL;
1197 }
1198 
1199 
process_directory_queue(struct thread_data * td)1200 void *process_directory_queue(struct thread_data * td) {
1201   struct db_table *dbt=NULL;
1202   struct job *job = NULL;
1203   gboolean cont=TRUE;
1204   int count =0;
1205 
1206   // Step 1: creating databases
1207   while (cont){
1208     job = (struct job *)g_async_queue_pop(td->conf->database_queue);
1209     cont=process_job(td, job, count);
1210   }
1211   // Step 2: Create tables
1212   cont=TRUE;
1213   while (cont){
1214     job = (struct job *)g_async_queue_pop(td->conf->table_queue);
1215     execute_use_if_needs_to(td, job->use_database, "Restoring tables");
1216     cont=process_job(td, job, count);
1217   }
1218 
1219   // Is this correct in a streaming scenario ?
1220   GList *table_list=td->conf->table_list;
1221   if (table_list == NULL ) {
1222     dbt=NULL;
1223   }else{
1224     dbt=table_list->data;
1225     g_mutex_lock(dbt->mutex);
1226     dbt->current_threads++;
1227     if (dbt->start_time==NULL)
1228       dbt->start_time=g_date_time_new_now_local();
1229     g_mutex_unlock(dbt->mutex);
1230   }
1231 
1232 
1233   // Step 3: Load data
1234   cont=TRUE;
1235   while (cont){
1236     if (dbt != NULL){
1237       g_mutex_lock(dbt->mutex);
1238       if (dbt->current_threads > dbt->max_threads){
1239         dbt->current_threads--;
1240         g_mutex_unlock(dbt->mutex);
1241         table_list=table_list->next;
1242         if (table_list == NULL ){
1243           dbt=NULL;
1244           continue;
1245         }
1246         dbt=table_list->data;
1247         g_mutex_lock(dbt->mutex);
1248         count=dbt->count;
1249         if (dbt->start_time==NULL) dbt->start_time=g_date_time_new_now_local();
1250         dbt->current_threads++;
1251         g_mutex_unlock(dbt->mutex);
1252         continue;
1253       }
1254       g_mutex_unlock(dbt->mutex);
1255 
1256       job = (struct job *)g_async_queue_try_pop(dbt->queue);
1257 
1258       if (job == NULL){
1259         g_mutex_lock(dbt->mutex);
1260         dbt->current_threads--;
1261         if (dbt->current_threads == 0){
1262           dbt->current_threads--;
1263           dbt->start_index_time=g_date_time_new_now_local();
1264           g_mutex_unlock(dbt->mutex);
1265           if (dbt->indexes != NULL) {
1266             g_message("Thread %d restoring indexes `%s`.`%s`", td->thread_id,
1267                   dbt->real_database, dbt->real_table);
1268             guint query_counter=0;
1269             restore_data_in_gstring(td, dbt->indexes, FALSE, &query_counter);
1270           }
1271           dbt->finish_time=g_date_time_new_now_local();
1272         }else{
1273           g_mutex_unlock(dbt->mutex);
1274         }
1275         guint max=dbt->max_threads;
1276         table_list=table_list->next;
1277         if (table_list == NULL ){
1278           dbt=NULL;
1279           continue;
1280         }
1281         dbt=table_list->data;
1282         g_mutex_lock(dbt->mutex);
1283         if (dbt->start_time==NULL) dbt->start_time=g_date_time_new_now_local();
1284         dbt->max_threads = max;
1285         dbt->current_threads++;
1286         g_mutex_unlock(dbt->mutex);
1287         continue;
1288       }
1289     }else{
1290      job = (struct job *)g_async_queue_pop(td->conf->data_queue);
1291     }
1292     execute_use_if_needs_to(td, job->use_database, "Restoring data");
1293     cont=process_job(td, job, count);
1294   }
1295   return NULL;
1296 }
1297 
1298 
process_queue(struct thread_data * td)1299 void *process_queue(struct thread_data *td) {
1300   struct configuration *conf = td->conf;
1301   g_mutex_lock(init_mutex);
1302   td->thrconn = mysql_init(NULL);
1303   g_mutex_unlock(init_mutex);
1304   td->current_database=NULL;
1305 
1306   configure_connection(td->thrconn, "myloader");
1307 
1308   if (!mysql_real_connect(td->thrconn, hostname, username, password, NULL, port,
1309                           socket_path, 0)) {
1310     g_critical("Failed to connect to MySQL server: %s", mysql_error(td->thrconn));
1311     exit(EXIT_FAILURE);
1312   }
1313 
1314   if (mysql_query(td->thrconn, "SET SESSION wait_timeout = 2147483")) {
1315     g_warning("Failed to increase wait_timeout: %s", mysql_error(td->thrconn));
1316   }
1317 
1318   if (!enable_binlog)
1319     mysql_query(td->thrconn, "SET SQL_LOG_BIN=0");
1320 
1321   mysql_query(td->thrconn, set_names_str);
1322   mysql_query(td->thrconn, "/*!40101 SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */");
1323   mysql_query(td->thrconn, "/*!40014 SET UNIQUE_CHECKS=0 */");
1324   mysql_query(td->thrconn, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/");
1325   if (commit_count > 1)
1326     mysql_query(td->thrconn, "SET autocommit=0");
1327 
1328   execute_gstring(td->thrconn, set_session);
1329   g_async_queue_push(conf->ready, GINT_TO_POINTER(1));
1330 
1331   if (db){
1332     td->current_database=db;
1333     execute_use(td, "Initializing thread");
1334     if (stream)
1335       g_async_queue_push(conf->database_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1336   }
1337   g_debug("Thread %d: Starting import", td->thread_id);
1338   if (stream){
1339     process_stream_queue(td);
1340   }else{
1341     process_directory_queue(td);
1342   }
1343   struct job *job = NULL;
1344   gboolean cont=TRUE;
1345   int count =0;
1346 
1347 //  g_message("Thread %d: Starting post import task over table", td->thread_id);
1348   cont=TRUE;
1349   while (cont){
1350     job = (struct job *)g_async_queue_pop(conf->post_table_queue);
1351 //    g_message("%s",((struct restore_job *)job->job_data)->object);
1352     execute_use_if_needs_to(td, job->use_database, "Restoring post table");
1353     cont=process_job(td, job, count);
1354   }
1355 //  g_message("Thread %d: Starting post import task: triggers, procedures and triggers", td->thread_id);
1356   cont=TRUE;
1357   while (cont){
1358     job = (struct job *)g_async_queue_pop(conf->post_queue);
1359     execute_use_if_needs_to(td, job->use_database, "Restoring post tasks");
1360     cont=process_job(td, job, count);
1361   }
1362 
1363   if (td->thrconn)
1364     mysql_close(td->thrconn);
1365   mysql_thread_end();
1366   g_debug("Thread %d ending", td->thread_id);
1367   return NULL;
1368 }
1369 
1370 
/*
 * Sends the statement held in `data` to the server.
 *
 * On success *query_counter is incremented; for non-schema statements, once
 * it reaches commit_count (and commit_count > 1) it is reset, a COMMIT is
 * issued and a new transaction is started. `data` is emptied on success.
 *
 * Returns 0 on success, 1 if the statement failed, 2 if the COMMIT failed.
 * Both failures increment `errors` and leave `data` untouched.
 */
int restore_data_in_gstring_by_statement(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter)
{
  if (mysql_real_query(td->thrconn, data->str, data->len) != 0) {
    errors++;
    return 1;
  }

  (*query_counter)++;

  gboolean commit_due = !is_schema && (commit_count > 1) && (*query_counter == commit_count);
  if (commit_due) {
    *query_counter = 0;
    if (mysql_query(td->thrconn, "COMMIT") != 0) {
      errors++;
      return 2;
    }
    mysql_query(td->thrconn, "START TRANSACTION");
  }

  g_string_set_size(data, 0);
  return 0;
}
1390 
/*
 * Executes every statement contained in `data`.
 *
 * The text is split on ";\n"; each piece longer than two characters gets a
 * trailing ';' and is sent through restore_data_in_gstring_by_statement().
 * `data` itself is not modified. Nothing is executed when `data` is NULL or
 * holds four characters or fewer.
 *
 * Returns the sum of the per-statement results (0 when all succeeded).
 */
int restore_data_in_gstring(struct thread_data *td, GString *data, gboolean is_schema, guint *query_counter)
{
  int r=0;
  if (data == NULL || data->len <= 4){
    return r;
  }

  gchar **statements=g_strsplit(data->str, ";\n", -1);
  guint i;
  for (i=0; statements[i] != NULL; i++){
    if (strlen(statements[i])>2){
      GString *str=g_string_new(statements[i]);
      g_string_append_c(str,';');
      r+=restore_data_in_gstring_by_statement(td, str, is_schema, query_counter);
      // The temporary statement buffer is ours; release it.
      g_string_free(str, TRUE);
    }
  }
  g_strfreev(statements);
  return r;
}
1407 
1408 
/*
 * Appends "ALTER TABLE `<database>`.`<table>` " to alter_table_statement.
 */
void append_alter_table(GString * alter_table_statement, char *database, char *table){
  const gchar *pieces[5];
  guint i;

  pieces[0] = "ALTER TABLE `";
  pieces[1] = database;
  pieces[2] = "`.`";
  pieces[3] = table;
  pieces[4] = "` ";

  for (i = 0; i < G_N_ELEMENTS(pieces); i++){
    g_string_append(alter_table_statement, pieces[i]);
  }
}
1416 
/*
 * Terminates an ALTER TABLE statement.
 *
 * If the last ',' lies within the final characters of the statement (offset
 * greater than len - 5), that comma is replaced by ';' and a newline is
 * appended; otherwise, including when there is no comma at all, ";\n" is
 * appended.
 */
void finish_alter_table(GString * alter_table_statement){
  gchar * str=g_strrstr_len(alter_table_statement->str,alter_table_statement->len,",");
  // str is NULL when the statement holds no comma; do not do pointer
  // arithmetic on it.
  if (str != NULL &&
      (str - alter_table_statement->str) > (long int)(alter_table_statement->len - 5)){
    *str=';';
    g_string_append_c(alter_table_statement,'\n');
  }else{
    g_string_append(alter_table_statement,";\n");
  }
}
1425 
/*
 * Switches the thread's connection to `database` when that is required.
 *
 * Nothing happens when `database` is NULL, when the global `db` override is
 * set, or when the thread is already on `database`. Otherwise
 * td->current_database is updated and a USE is executed; the process exits
 * if that fails. `msg` is forwarded to execute_use() for its error message.
 */
void execute_use_if_needs_to(struct thread_data *td, gchar *database, const gchar * msg){
  if (database == NULL || db != NULL){
    return;
  }
  if (td->current_database != NULL && g_strcmp0(database, td->current_database) == 0){
    return;
  }

  td->current_database = database;
  if (execute_use(td, msg)){
    exit(EXIT_FAILURE);
  }
}
1436 
execute_use(struct thread_data * td,const gchar * msg)1437 guint execute_use(struct thread_data *td, const gchar * msg){
1438   gchar *query = g_strdup_printf("USE `%s`", td->current_database);
1439   if (mysql_query(td->thrconn, query)) {
1440     g_critical("Error switching to database `%s` %s", td->current_database, msg);
1441     g_free(query);
1442     return 1;
1443   }
1444   g_free(query);
1445   return 0;
1446 }
1447 
get_table_name_from_content(const gchar * filename)1448 gchar * get_table_name_from_content(const gchar *filename){
1449   void *infile;
1450   gboolean is_compressed = FALSE;
1451   gboolean eof = FALSE;
1452   GString *data=g_string_sized_new(512);
1453   if (!g_str_has_suffix(filename, compress_extension)) {
1454     infile = g_fopen(filename, "r");
1455     is_compressed = FALSE;
1456   } else {
1457     infile = (void *)gzopen(filename, "r");
1458     is_compressed = TRUE;
1459   }
1460   if (!infile) {
1461     g_critical("cannot open file %s (%d)", filename, errno);
1462     errors++;
1463     return NULL;
1464   }
1465   gchar *real_database=NULL;
1466   while (eof == FALSE) {
1467     if (read_data(infile, is_compressed, data, &eof)) {
1468       if (g_str_has_prefix(data->str,"INSERT ")){
1469         gchar** create= g_strsplit(data->str, "`", 3);
1470         real_database=g_strdup(create[1]);
1471         g_strfreev(create);
1472         break;
1473       }
1474       g_string_set_size(data, 0);
1475     }
1476   }
1477 
1478   if (!is_compressed) {
1479     fclose(infile);
1480   } else {
1481     gzclose((gzFile)infile);
1482   }
1483   return real_database;
1484 }
1485 
get_database_name_from_content(const gchar * filename)1486 gchar * get_database_name_from_content(const gchar *filename){
1487   void *infile;
1488   gboolean is_compressed = FALSE;
1489   gboolean eof = FALSE;
1490   GString *data=g_string_sized_new(512);
1491   if (!g_str_has_suffix(filename, compress_extension)) {
1492     infile = g_fopen(filename, "r");
1493     is_compressed = FALSE;
1494   } else {
1495     infile = (void *)gzopen(filename, "r");
1496     is_compressed = TRUE;
1497   }
1498   if (!infile) {
1499     g_critical("cannot open file %s (%d)", filename, errno);
1500     errors++;
1501     return NULL;
1502   }
1503   gchar *real_database=NULL;
1504   while (eof == FALSE) {
1505     if (read_data(infile, is_compressed, data, &eof)) {
1506       if (g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n")) {
1507         if (g_str_has_prefix(data->str,"CREATE ")){
1508           gchar** create= g_strsplit(data->str, "`", 3);
1509           real_database=g_strdup(create[1]);
1510           g_strfreev(create);
1511           break;
1512         }
1513       }
1514     }
1515   }
1516 
1517   if (!is_compressed) {
1518     fclose(infile);
1519   } else {
1520     gzclose((gzFile)infile);
1521   }
1522   return real_database;
1523 }
1524 
/*
 * Executes the statements stored in directory/filename on td->thrconn.
 *
 * td:        worker context; its thrconn connection runs the queries.
 * database:  database name, used in the commit error message when the global
 *            db is not set.
 * table:     table name, used in the commit error message.
 * filename:  file name relative to the global directory. Names ending in
 *            compress_extension are read through zlib.
 * is_schema: forwarded to the statement runners; when FALSE and
 *            commit_count > 1 the load is wrapped in START TRANSACTION/COMMIT.
 *
 * Input is accumulated until it ends a statement (";\n" within the last five
 * bytes) and then handed to the statement runner. When the global rows is
 * positive, INSERT statements go through the splitting runner instead.
 *
 * Returns 1 when the file cannot be opened; otherwise the sum of the values
 * returned by restore_data_in_gstring_by_statement(). Open and read failures
 * are logged and counted in the global errors counter. After a read failure
 * the function stops without committing and without removing the file.
 */
int restore_data_from_file(struct thread_data *td, char *database, char *table,
                  const char *filename, gboolean is_schema){
  void *infile;
  int r=0;
  gboolean is_compressed = FALSE;
  gboolean eof = FALSE;
  gboolean read_failed = FALSE;
  guint query_counter = 0;
  guint line=0,preline=0;
  gchar *path = g_build_filename(directory, filename, NULL);

  if (!g_str_has_suffix(path, compress_extension)) {
    infile = g_fopen(path, "r");
  } else {
    infile = (void *)gzopen(path, "r");
    is_compressed = TRUE;
  }

  if (!infile) {
    g_critical("cannot open file %s (%d)", filename, errno);
    errors++;
    g_free(path);
    return 1;
  }

  /* Allocated only after a successful open so the early return cannot leak it. */
  GString *data = g_string_sized_new(512);

  if (!is_schema && (commit_count > 1) )
    mysql_query(td->thrconn, "START TRANSACTION");

  while (eof == FALSE) {
    if (!read_data(infile, is_compressed, data, &eof)) {
      g_critical("error reading file %s (%d)", filename, errno);
      errors++;
      /* Fall through to the shared cleanup instead of returning with the
       * file, the buffer and the path still allocated. */
      read_failed = TRUE;
      break;
    }
    /* Keep accumulating until the buffer ends a statement. */
    if (!g_strrstr(&data->str[data->len >= 5 ? data->len - 5 : 0], ";\n"))
      continue;

    preline=line;
    line+=strcount(data->str);
    if (rows > 0 && g_strrstr_len(data->str,6,"INSERT"))
      split_and_restore_data_in_gstring_by_statement(td,
        data, is_schema, &query_counter);
    else{
      guint tr=restore_data_in_gstring_by_statement(td, data, is_schema, &query_counter);
      r+=tr;
      if (tr > 0){
        g_critical("Error occours between lines: %d and %d on file %s",preline,line,filename);
      }
    }
    g_string_set_size(data, 0);
  }

  /* No COMMIT is sent after a read failure. */
  if (!read_failed && !is_schema && (commit_count > 1) &&
      mysql_query(td->thrconn, "COMMIT")) {
    g_critical("Error committing data for %s.%s from file %s: %s",
               db ? db : database, table, filename, mysql_error(td->thrconn));
    errors++;
  }

  g_string_free(data, TRUE);
  if (!is_compressed) {
    fclose(infile);
  } else {
    gzclose((gzFile)infile);
  }

  /* In stream mode the file is removed once consumed; a file that could not
   * be read completely is kept. */
  if (!read_failed && stream && no_delete == FALSE){
    g_message("Removing file: %s", path);
    remove(path);
  }
  g_free(path);
  return r;
}
1593 
/*
 * Appends the next chunk of input to data.
 *
 * A chunk is whatever a single fgets()/gzgets() call delivers into a
 * 256-byte buffer: at most 255 bytes, ending after a newline if one is met
 * first. Exactly one chunk is read per call, so a long line takes several
 * calls and callers are expected to keep accumulating into data. (The former
 * do/while tested buffer[strlen(buffer)] != '\0', which is never true, so it
 * never ran a second iteration; it is written as straight-line code here.)
 *
 * file:          input handle; a FILE* when is_compressed is FALSE, a gzFile
 *                passed through this pointer when it is TRUE.
 * is_compressed: selects zlib (gzgets/gzeof) or stdio (fgets/feof).
 * data:          string the chunk is appended to.
 * eof:           set to TRUE when the read hits end of input; left untouched
 *                otherwise.
 *
 * Returns TRUE on success, including at end of input (nothing is appended
 * then). Returns FALSE when the read fails for a reason other than end of
 * input; data and *eof are left unchanged in that case.
 */
gboolean read_data(FILE *file, gboolean is_compressed, GString *data,
                   gboolean *eof) {
  char buffer[256];
  gboolean got_chunk;

  if (is_compressed) {
    got_chunk = gzgets((gzFile)file, buffer, (int)sizeof(buffer)) != NULL;
  } else {
    got_chunk = fgets(buffer, (int)sizeof(buffer), file) != NULL;
  }

  if (!got_chunk) {
    gboolean at_end = is_compressed ? gzeof((gzFile)file) : feof(file);
    if (!at_end)
      return FALSE;
    *eof = TRUE;
    buffer[0] = '\0'; /* nothing was read, so append an empty string */
  }

  g_string_append(data, buffer);
  return TRUE;
}
1623 
get_queue_for_type(struct configuration * conf,enum file_type current_ft)1624 GAsyncQueue *get_queue_for_type(struct configuration *conf, enum file_type current_ft){
1625   switch (current_ft){
1626     case INIT:
1627     case SCHEMA_CREATE:
1628       return conf->database_queue;
1629     case SCHEMA_TABLE:
1630       return conf->table_queue;
1631     case SCHEMA_VIEW:
1632     case SCHEMA_TRIGGER:
1633     case SCHEMA_POST:
1634     case CHECKSUM:
1635       return conf->post_queue;
1636     case METADATA_GLOBAL:
1637       return NULL;
1638     case METADATA_TABLE:
1639       return NULL;
1640       return conf->post_table_queue;
1641     case DATA:
1642       return conf->data_queue;
1643       break;
1644     case LOAD_DATA:
1645       break;
1646 
1647   }
1648   return NULL;
1649 }
1650 
/*
 * Pushes JOB_SHUTDOWN jobs onto queue, one per count of the global
 * num_threads.
 */
void send_shutdown_jobs(GAsyncQueue * queue){
  guint pushed = 0;

  while (pushed < num_threads) {
    g_async_queue_push(queue, new_job(JOB_SHUTDOWN,NULL,NULL));
    pushed++;
  }
}
1657 
/*
 * Hands filename to process_filename() and publishes the resulting file type
 * on conf->stream_queue, except when the type is SCHEMA_VIEW, SCHEMA_TRIGGER,
 * SCHEMA_POST or CHECKSUM, in which case nothing is pushed.
 */
void process_stream_filename(struct configuration *conf,GHashTable *table_hash, gchar * filename){
  const enum file_type current_ft = process_filename(conf,table_hash,filename);
  const gboolean skip_stream_queue = (current_ft == SCHEMA_VIEW ||
                                      current_ft == SCHEMA_TRIGGER ||
                                      current_ft == SCHEMA_POST ||
                                      current_ft == CHECKSUM);

  if (skip_stream_queue) {
    return;
  }
  g_async_queue_push(conf->stream_queue, GINT_TO_POINTER(current_ft));
}
1666 
1667 
process_stream(struct configuration * conf)1668 void *process_stream(struct configuration *conf){
1669   char * filename=NULL,*real_filename=NULL;
1670   char buffer[1024];
1671   long unsigned int len=0;
1672   FILE *file=NULL;
1673   gboolean eof=FALSE;
1674   GHashTable *table_hash=g_hash_table_new ( g_str_hash, g_str_equal );
1675   do {
1676     if(fgets(buffer, 1024, stdin) == NULL){
1677       if (file && feof(file)){
1678         eof = TRUE;
1679         buffer[0] = '\0';
1680         fclose(file);
1681       }else{
1682         break;
1683       }
1684     }else{
1685       if (g_str_has_prefix(buffer,"-- ")){
1686         if (file){
1687           fclose(file);
1688           process_stream_filename(conf,table_hash,filename);
1689         }
1690         buffer[strlen(buffer)-1]='\0';
1691         real_filename = g_build_filename(directory,&(buffer[3]),NULL);
1692         filename = g_build_filename(&(buffer[3]),NULL);
1693         file = g_fopen(real_filename, "w");
1694       }else{
1695         if (file){
1696           len=write(fileno(file),buffer,strlen(buffer));
1697           if (len != strlen(buffer)) {
1698             g_critical("File size not the same");
1699             exit(EXIT_FAILURE);
1700           }
1701         }
1702       }
1703     }
1704   } while (eof == FALSE);
1705   fclose(file);
1706   process_stream_filename(conf,table_hash,filename);
1707   guint n=0;
1708   for (n = 0; n < num_threads *2 ; n++) {
1709     g_async_queue_push(conf->data_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1710     g_async_queue_push(conf->post_table_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1711     g_async_queue_push(conf->post_queue, new_job(JOB_SHUTDOWN,NULL,NULL));
1712     g_async_queue_push(conf->stream_queue, GINT_TO_POINTER(DATA));
1713   }
1714 
1715   return NULL;
1716 }
1717