1 /*
2     This program is free software: you can redistribute it and/or modify
3     it under the terms of the GNU General Public License as published by
4     the Free Software Foundation, either version 3 of the License, or
5     (at your option) any later version.
6 
7     This program is distributed in the hope that it will be useful,
8     but WITHOUT ANY WARRANTY; without even the implied warranty of
9     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10     GNU General Public License for more details.
11 
12     You should have received a copy of the GNU General Public License
13     along with this program.  If not, see <http://www.gnu.org/licenses/>.
14 
15         Authors: 	Domas Mituzas, Facebook ( domas at fb dot com )
16                         Mark Leith, Oracle Corporation (mark dot leith at oracle
17    dot com) Andrew Hutchings, SkySQL (andrew at skysql dot com)
18 
19 */
20 
21 #include <glib.h>
22 #include <glib/gstdio.h>
23 #include <my_global.h>
24 #include <mysql.h>
25 #include <my_sys.h>
26 #include <mysqld_error.h>
27 #include <sql_common.h>
28 #include <string.h>
29 #include <zlib.h>
30 #include "mydumper.h"
31 #include "binlog.h"
32 #include "set_verbose.h"
33 
34 #define BINLOG_MAGIC "\xfe\x62\x69\x6e"
35 
36 #define EVENT_HEADER_LENGTH 19
37 #define EVENT_ROTATE_FIXED_LENGTH 8
38 
39 enum event_postions {
40   EVENT_TIMESTAMP_POSITION = 0,
41   EVENT_TYPE_POSITION = 4,
42   EVENT_SERVERID_POSITION = 5,
43   EVENT_LENGTH_POSITION = 9,
44   EVENT_NEXT_POSITION = 13,
45   EVENT_FLAGS_POSITION = 17,
46   EVENT_EXTRA_FLAGS_POSITION =
47       19 // currently unused in v4 binlogs, but a good marker for end of header
48 };
49 
50 enum event_type {
51   ROTATE_EVENT = 4,
52   FORMAT_DESCRIPTION_EVENT = 15,
53   EVENT_TOO_SHORT = 254 // arbitrary high number, in 5.1 the max event type
54                         // number is 27 so this should be fine for a while
55 };
56 
57 extern int compress_output;
58 extern gboolean daemon_mode;
59 extern gboolean shutdown_triggered;
60 
61 FILE *new_binlog_file(char *binlog_file, const char *binlog_dir);
62 void close_binlog_file(FILE *outfile);
63 char *rotate_file_name(const char *buf);
64 
get_binlogs(MYSQL * conn,struct configuration * conf)65 void get_binlogs(MYSQL *conn, struct configuration *conf) {
66   // TODO: find logs we already have, use start position based on position of
67   // last log.
68   MYSQL_RES *result;
69   MYSQL_ROW row;
70   char *last_filename = NULL;
71   guint64 last_position;
72 
73   // Only snapshot dump the binlogs once in daemon mode
74   static gboolean got_binlogs = FALSE;
75   if (got_binlogs)
76     return;
77   else
78     got_binlogs = TRUE;
79 
80   if (mysql_query(conn, "SHOW MASTER STATUS")) {
81     g_critical("Error: Could not execute query: %s", mysql_error(conn));
82     return;
83   }
84 
85   result = mysql_store_result(conn);
86   if ((row = mysql_fetch_row(result))) {
87     last_filename = g_strdup(row[0]);
88     last_position = strtoll(row[1], NULL, 10);
89   } else {
90     g_critical("Error: Could not obtain binary log stop position");
91     if (last_filename != NULL)
92       g_free(last_filename);
93     return;
94   }
95   mysql_free_result(result);
96 
97   if (mysql_query(conn, "SHOW BINARY LOGS")) {
98     g_critical("Error: Could not execute query: %s", mysql_error(conn));
99     if (last_filename != NULL)
100       g_free(last_filename);
101     return;
102   }
103 
104   result = mysql_store_result(conn);
105   while ((row = mysql_fetch_row(result))) {
106     struct job *j = g_new0(struct job, 1);
107     struct binlog_job *bj = g_new0(struct binlog_job, 1);
108     j->job_data = (void *)bj;
109     bj->filename = g_strdup(row[0]);
110     bj->start_position = 4;
111     bj->stop_position =
112         (!strcasecmp(row[0], last_filename)) ? last_position : 0;
113     j->conf = conf;
114     j->type = JOB_BINLOG;
115     g_async_queue_push(conf->queue, j);
116   }
117   mysql_free_result(result);
118   if (last_filename != NULL)
119     g_free(last_filename);
120 }
121 
get_binlog_file(MYSQL * conn,char * binlog_file,const char * binlog_directory,guint64 start_position,guint64 stop_position,gboolean continuous)122 void get_binlog_file(MYSQL *conn, char *binlog_file,
123                      const char *binlog_directory, guint64 start_position,
124                      guint64 stop_position, gboolean continuous) {
125   // set serverID = max serverID - threadID to try an eliminate conflicts,
126   // 0 is bad because mysqld will disconnect at the end of the last log
127   // dupes aren't too bad since it is up to the client to check for them
128   uchar buf[128];
129   // We need to read the raw network packets
130   NET *net;
131   net = &conn->net;
132   unsigned long len;
133   FILE *outfile;
134   guint32 event_type;
135   gboolean read_error = FALSE;
136   gboolean read_end = FALSE;
137   gboolean rotated = FALSE;
138   guint32 server_id = G_MAXUINT32 - mysql_thread_id(conn);
139   guint64 pos_counter = 0;
140 
141   int4store(buf, (guint32)start_position);
142   // Binlog flags (2 byte int)
143   int2store(buf + 4, 0);
144   // ServerID
145   int4store(buf + 6, server_id);
146   memcpy(buf + 10, binlog_file, strlen(binlog_file));
147 #if MYSQL_VERSION_ID < 50100
148   if (simple_command(conn, COM_BINLOG_DUMP, (const char *)buf,
149 #else
150   if (simple_command(conn, COM_BINLOG_DUMP, buf,
151 #endif
152                      strlen(binlog_file) + 10, 1)) {
153     g_critical("Error: binlog: Critical error whilst requesting binary log");
154   }
155 
156   while (1) {
157     outfile = new_binlog_file(binlog_file, binlog_directory);
158     if (outfile == NULL) {
159       g_critical("Error: binlog: Could not create binlog file '%s', %d",
160                  binlog_file, errno);
161       return;
162     }
163 
164     write_binlog(outfile, BINLOG_MAGIC, 4);
165     while (1) {
166       len = 0;
167       if (net->vio != 0)
168         len = my_net_read(net);
169       if ((len == 0) || (len == ~(unsigned long)0)) {
170         // Net timeout (set to 1 second)
171         if (mysql_errno(conn) == ER_NET_READ_INTERRUPTED) {
172           if (shutdown_triggered) {
173             close_binlog_file(outfile);
174             return;
175           } else {
176             continue;
177           }
178           // A real error
179         } else {
180           g_critical("Error: binlog: Network packet read error getting binlog "
181                      "file: %s",
182                      binlog_file);
183           close_binlog_file(outfile);
184           return;
185         }
186       }
187       if (len < 8 && net->read_pos[0]) {
188         // end of data
189         break;
190       }
191       pos_counter += len;
192       event_type = get_event((const char *)net->read_pos + 1, len - 1);
193       switch (event_type) {
194       case EVENT_TOO_SHORT:
195         g_critical("Error: binlog: Event too short in binlog file: %s",
196                    binlog_file);
197         read_error = TRUE;
198         break;
199       case ROTATE_EVENT:
200         if (rotated) {
201           read_end = TRUE;
202         } else {
203           len = 1;
204           rotated = TRUE;
205         }
206         break;
207       default:
208         // if we get this far this is a normal event to record
209         break;
210       }
211       if (read_error)
212         break;
213       write_binlog(outfile, (const char *)net->read_pos + 1, len - 1);
214       if (read_end) {
215         if (!continuous) {
216           break;
217         } else {
218           g_free(binlog_file);
219           binlog_file = rotate_file_name((const char *)net->read_pos + 1);
220           break;
221         }
222       }
223       // stop if we are at requested end of last log
224       if ((stop_position > 0) && (pos_counter >= stop_position))
225         break;
226     }
227     close_binlog_file(outfile);
228     if ((!continuous) || (!read_end))
229       break;
230 
231     if (continuous && read_end) {
232       read_end = FALSE;
233       rotated = FALSE;
234     }
235   }
236 }
237 
rotate_file_name(const char * buf)238 char *rotate_file_name(const char *buf) {
239   guint32 event_length = 0;
240 
241   // event length is 4 bytes at position 9
242   event_length = uint4korr(&buf[EVENT_LENGTH_POSITION]);
243   // event length includes the header, plus a rotate event has a fixed 8byte
244   // part we don't need
245   event_length = event_length - EVENT_HEADER_LENGTH - EVENT_ROTATE_FIXED_LENGTH;
246 
247   return g_strndup(&buf[EVENT_HEADER_LENGTH + EVENT_ROTATE_FIXED_LENGTH],
248                    event_length);
249 }
250 
new_binlog_file(char * binlog_file,const char * binlog_dir)251 FILE *new_binlog_file(char *binlog_file, const char *binlog_dir) {
252   FILE *outfile;
253   char *filename;
254 
255   if (!compress_output) {
256     filename = g_strdup_printf("%s/%s", binlog_dir, binlog_file);
257     outfile = g_fopen(filename, "w");
258   } else {
259     filename = g_strdup_printf("%s/%s.gz", binlog_dir, binlog_file);
260     outfile = (void *)gzopen(filename, "w");
261   }
262   g_free(filename);
263 
264   return outfile;
265 }
266 
close_binlog_file(FILE * outfile)267 void close_binlog_file(FILE *outfile) {
268   if (!compress_output)
269     fclose(outfile);
270   else
271     gzclose((gzFile)outfile);
272 }
273 
get_event(const char * buf,unsigned int len)274 unsigned int get_event(const char *buf, unsigned int len) {
275   if (len < EVENT_TYPE_POSITION)
276     return EVENT_TOO_SHORT;
277   return buf[EVENT_TYPE_POSITION];
278 
279   // TODO: Would be good if we can check for valid event type, unfortunately
280   // this check can change from version to version
281 }
282 
write_binlog(FILE * file,const char * data,guint64 len)283 void write_binlog(FILE *file, const char *data, guint64 len) {
284   int err;
285 
286   if (len > 0) {
287     int write_result;
288 
289     if (!compress_output)
290       write_result = write(fileno(file), data, len);
291     else
292       write_result = gzwrite((gzFile)file, data, len);
293 
294     if (write_result <= 0) {
295       if (!compress_output)
296         g_critical("Error: binlog: Error writing binary log: %s",
297                    strerror(errno));
298       else
299         g_critical("Error: binlog: Error writing compressed binary log: %s",
300                    gzerror((gzFile)file, &err));
301     }
302   }
303 }
304