1 /*
2 This program is free software: you can redistribute it and/or modify
3 it under the terms of the GNU General Public License as published by
4 the Free Software Foundation, either version 3 of the License, or
5 (at your option) any later version.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program. If not, see <http://www.gnu.org/licenses/>.
14
Authors: Domas Mituzas, Facebook (domas at fb dot com)
         Mark Leith, Oracle Corporation (mark dot leith at oracle dot com)
         Andrew Hutchings, SkySQL (andrew at skysql dot com)
18
19 */
20
21 #include <glib.h>
22 #include <glib/gstdio.h>
23 #include <my_global.h>
24 #include <mysql.h>
25 #include <my_sys.h>
26 #include <mysqld_error.h>
27 #include <sql_common.h>
28 #include <string.h>
29 #include <zlib.h>
30 #include "mydumper.h"
31 #include "binlog.h"
32 #include "set_verbose.h"
33
/* Four-byte signature written at the very start of every output file. */
#define BINLOG_MAGIC "\xfe\x62\x69\x6e"

/* Size in bytes of the common header that precedes every event. */
#define EVENT_HEADER_LENGTH 19
/* Size in bytes of the fixed part of a rotate event; the name of the next
 * log file follows it. */
#define EVENT_ROTATE_FIXED_LENGTH 8
38
/* Byte offsets of the individual fields inside an event header. */
enum event_postions {
  EVENT_TIMESTAMP_POSITION = 0, /* first field of the header */
  EVENT_TYPE_POSITION = 4,      /* single byte, see enum event_type */
  EVENT_SERVERID_POSITION = 5,
  EVENT_LENGTH_POSITION = 9,    /* 4-byte total event length */
  EVENT_NEXT_POSITION = 13,
  EVENT_FLAGS_POSITION = 17,
  /* currently unused in v4 binlogs, but a good marker for end of header */
  EVENT_EXTRA_FLAGS_POSITION = 19
};
49
/* Event type codes this file reacts to, plus one internal sentinel. */
enum event_type {
  ROTATE_EVENT = 4,
  FORMAT_DESCRIPTION_EVENT = 15,
  /* Sentinel returned by get_event() for truncated input: an arbitrary high
   * number, in 5.1 the max event type number is 27 so this should be fine
   * for a while. */
  EVENT_TOO_SHORT = 254
};
56
57 extern int compress_output;
58 extern gboolean daemon_mode;
59 extern gboolean shutdown_triggered;
60
61 FILE *new_binlog_file(char *binlog_file, const char *binlog_dir);
62 void close_binlog_file(FILE *outfile);
63 char *rotate_file_name(const char *buf);
64
/*
 * Queue one JOB_BINLOG job for every binary log the server lists.
 *
 * conn: open connection used to run SHOW MASTER STATUS and SHOW BINARY LOGS.
 * conf: configuration whose queue receives the jobs; also stored in each job.
 *
 * The log named by SHOW MASTER STATUS gets the reported position as its stop
 * position; all other logs get a stop position of 0. Every job starts at
 * position 4. Errors are reported through g_critical() and end the call
 * without queueing further jobs.
 */
void get_binlogs(MYSQL *conn, struct configuration *conf) {
  // TODO: find logs we already have, use start position based on position of
  // last log.

  // Only snapshot dump the binlogs once in daemon mode
  static gboolean got_binlogs = FALSE;
  MYSQL_RES *result;
  MYSQL_ROW row;
  char *last_filename = NULL;
  guint64 last_position = 0;

  if (got_binlogs)
    return;
  got_binlogs = TRUE;

  if (mysql_query(conn, "SHOW MASTER STATUS")) {
    g_critical("Error: Could not execute query: %s", mysql_error(conn));
    return;
  }

  // mysql_store_result() may return NULL, and either column may be NULL, so
  // validate everything before touching the row.
  result = mysql_store_result(conn);
  row = (result != NULL) ? mysql_fetch_row(result) : NULL;
  if (row != NULL && row[0] != NULL && row[1] != NULL) {
    last_filename = g_strdup(row[0]);
    last_position = (guint64)strtoll(row[1], NULL, 10);
  }
  // Release the result set on every path, including the failure below.
  if (result != NULL)
    mysql_free_result(result);

  if (last_filename == NULL) {
    g_critical("Error: Could not obtain binary log stop position");
    return;
  }

  if (mysql_query(conn, "SHOW BINARY LOGS")) {
    g_critical("Error: Could not execute query: %s", mysql_error(conn));
    g_free(last_filename);
    return;
  }

  result = mysql_store_result(conn);
  if (result == NULL) {
    g_critical("Error: Could not retrieve binary log list: %s",
               mysql_error(conn));
    g_free(last_filename);
    return;
  }

  while ((row = mysql_fetch_row(result))) {
    struct job *j;
    struct binlog_job *bj;

    // A row without a log name cannot be turned into a job.
    if (row[0] == NULL)
      continue;

    j = g_new0(struct job, 1);
    bj = g_new0(struct binlog_job, 1);
    j->job_data = (void *)bj;
    bj->filename = g_strdup(row[0]);
    bj->start_position = 4;
    // Only the log that was current at SHOW MASTER STATUS time has a stop
    // position; 0 means "no stop position" for the others.
    bj->stop_position =
        (!strcasecmp(row[0], last_filename)) ? last_position : 0;
    j->conf = conf;
    j->type = JOB_BINLOG;
    g_async_queue_push(conf->queue, j);
  }
  mysql_free_result(result);
  g_free(last_filename);
}
121
/*
 * Request a binary log from the server with COM_BINLOG_DUMP and write the
 * received events to a file in binlog_directory.
 *
 * conn:             connection to stream from; raw packets are read from
 *                   its NET structure.
 * binlog_file:      name of the log to request. In continuous mode this
 *                   pointer is g_free()d and replaced when the server
 *                   rotates to a new log.
 * binlog_directory: directory the output files are created in.
 * start_position:   position sent to the server in the dump request.
 * stop_position:    when non-zero, reading stops once the running byte
 *                   counter reaches this value.
 * continuous:       when TRUE, follow rotate events into the next log
 *                   instead of stopping at the end of this one.
 *
 * Errors are reported through g_critical(); nothing is returned.
 *
 * NOTE(review): in continuous mode the caller's binlog_file pointer is freed
 * here and the last rotated name is never freed by this function - confirm
 * the ownership contract with the callers.
 */
void get_binlog_file(MYSQL *conn, char *binlog_file,
                     const char *binlog_directory, guint64 start_position,
                     guint64 stop_position, gboolean continuous) {
  // Fixed part of the dump request: 4-byte start position, 2-byte flags and
  // 4-byte server id. The log name follows without a terminator.
  enum { BINLOG_DUMP_FIXED_LENGTH = 10 };
  // We need to read the raw network packets
  NET *net = &conn->net;
  unsigned long len;
  FILE *outfile;
  guint32 event_type;
  gboolean read_error = FALSE;
  gboolean read_end = FALSE;
  gboolean rotated = FALSE;
  // set serverID = max serverID - threadID to try and eliminate conflicts,
  // 0 is bad because mysqld will disconnect at the end of the last log
  // dupes aren't too bad since it is up to the client to check for them
  guint32 server_id = G_MAXUINT32 - mysql_thread_id(conn);
  guint64 pos_counter = 0;
  const size_t name_length = strlen(binlog_file);
  const size_t request_length = BINLOG_DUMP_FIXED_LENGTH + name_length;
  // Sized from the actual name so a long log name cannot overflow the
  // request buffer.
  uchar *request = g_malloc(request_length);
  int command_failed;

  int4store(request, (guint32)start_position);
  // Binlog flags (2 byte int)
  int2store(request + 4, 0);
  // ServerID
  int4store(request + 6, server_id);
  memcpy(request + BINLOG_DUMP_FIXED_LENGTH, binlog_file, name_length);
#if MYSQL_VERSION_ID < 50100
  command_failed = simple_command(conn, COM_BINLOG_DUMP,
                                  (const char *)request, request_length, 1);
#else
  command_failed =
      simple_command(conn, COM_BINLOG_DUMP, request, request_length, 1);
#endif
  g_free(request);
  if (command_failed) {
    g_critical("Error: binlog: Critical error whilst requesting binary log");
    // Nothing will be streamed, so do not create an output file.
    return;
  }

  while (1) {
    outfile = new_binlog_file(binlog_file, binlog_directory);
    if (outfile == NULL) {
      g_critical("Error: binlog: Could not create binlog file '%s', %d",
                 binlog_file, errno);
      return;
    }

    write_binlog(outfile, BINLOG_MAGIC, 4);
    while (1) {
      len = 0;
      if (net->vio != 0)
        len = my_net_read(net);
      if ((len == 0) || (len == ~(unsigned long)0)) {
        // Net timeout (set to 1 second)
        if (mysql_errno(conn) == ER_NET_READ_INTERRUPTED) {
          if (shutdown_triggered) {
            close_binlog_file(outfile);
            return;
          }
          continue;
        }
        // A real error
        g_critical("Error: binlog: Network packet read error getting binlog "
                   "file: %s",
                   binlog_file);
        close_binlog_file(outfile);
        return;
      }
      if (len < 8 && net->read_pos[0]) {
        // end of data
        break;
      }
      // NOTE(review): pos_counter sums raw packet lengths (status byte and
      // the skipped first rotate event included) and starts at 0, so it only
      // approximates the file offset compared with stop_position - confirm
      // this is intended.
      pos_counter += len;
      // Byte 0 of the packet is a status byte; the event starts at byte 1.
      event_type = get_event((const char *)net->read_pos + 1, len - 1);
      switch (event_type) {
      case EVENT_TOO_SHORT:
        g_critical("Error: binlog: Event too short in binlog file: %s",
                   binlog_file);
        read_error = TRUE;
        break;
      case ROTATE_EVENT:
        if (rotated) {
          read_end = TRUE;
        } else {
          // First rotate event of a stream: force a zero-byte write below so
          // it is not recorded in the output file.
          len = 1;
          rotated = TRUE;
        }
        break;
      default:
        // if we get this far this is a normal event to record
        break;
      }
      if (read_error)
        break;
      write_binlog(outfile, (const char *)net->read_pos + 1, len - 1);
      if (read_end) {
        if (continuous) {
          if (len - 1 < EVENT_HEADER_LENGTH + EVENT_ROTATE_FIXED_LENGTH) {
            // The packet cannot hold a file name; do not parse past its end.
            g_critical(
                "Error: binlog: Rotate event too short in binlog file: %s",
                binlog_file);
            // Clearing read_end makes the outer loop stop instead of
            // reopening (and truncating) the current file.
            read_end = FALSE;
          } else {
            g_free(binlog_file);
            binlog_file = rotate_file_name((const char *)net->read_pos + 1);
          }
        }
        break;
      }
      // stop if we are at requested end of last log
      if ((stop_position > 0) && (pos_counter >= stop_position))
        break;
    }
    close_binlog_file(outfile);
    if ((!continuous) || (!read_end))
      break;

    // Continuous mode and the log was rotated: reset the per-file state and
    // start writing the next file.
    read_end = FALSE;
    rotated = FALSE;
  }
}
237
rotate_file_name(const char * buf)238 char *rotate_file_name(const char *buf) {
239 guint32 event_length = 0;
240
241 // event length is 4 bytes at position 9
242 event_length = uint4korr(&buf[EVENT_LENGTH_POSITION]);
243 // event length includes the header, plus a rotate event has a fixed 8byte
244 // part we don't need
245 event_length = event_length - EVENT_HEADER_LENGTH - EVENT_ROTATE_FIXED_LENGTH;
246
247 return g_strndup(&buf[EVENT_HEADER_LENGTH + EVENT_ROTATE_FIXED_LENGTH],
248 event_length);
249 }
250
new_binlog_file(char * binlog_file,const char * binlog_dir)251 FILE *new_binlog_file(char *binlog_file, const char *binlog_dir) {
252 FILE *outfile;
253 char *filename;
254
255 if (!compress_output) {
256 filename = g_strdup_printf("%s/%s", binlog_dir, binlog_file);
257 outfile = g_fopen(filename, "w");
258 } else {
259 filename = g_strdup_printf("%s/%s.gz", binlog_dir, binlog_file);
260 outfile = (void *)gzopen(filename, "w");
261 }
262 g_free(filename);
263
264 return outfile;
265 }
266
close_binlog_file(FILE * outfile)267 void close_binlog_file(FILE *outfile) {
268 if (!compress_output)
269 fclose(outfile);
270 else
271 gzclose((gzFile)outfile);
272 }
273
get_event(const char * buf,unsigned int len)274 unsigned int get_event(const char *buf, unsigned int len) {
275 if (len < EVENT_TYPE_POSITION)
276 return EVENT_TOO_SHORT;
277 return buf[EVENT_TYPE_POSITION];
278
279 // TODO: Would be good if we can check for valid event type, unfortunately
280 // this check can change from version to version
281 }
282
/*
 * Append len bytes from data to an output file.
 *
 * file: a stdio stream when compress_output is zero, otherwise a gzFile
 *       cast to FILE *.
 * data: bytes to write.
 * len:  number of bytes; 0 is a no-op.
 *
 * Keeps writing until every byte has been handed over or an error occurs.
 * Errors are reported through g_critical(); nothing is returned.
 */
void write_binlog(FILE *file, const char *data, guint64 len) {
  while (len > 0) {
    // Cap each request so the count fits gzwrite()'s unsigned parameter and
    // int return value instead of being silently truncated.
    const unsigned int chunk =
        (len > (guint64)G_MAXINT) ? (unsigned int)G_MAXINT : (unsigned int)len;
    gint64 written;

    if (!compress_output) {
      written = write(fileno(file), data, chunk);
      if (written < 0 && errno == EINTR)
        continue; // interrupted before anything was written: retry
      if (written <= 0) {
        g_critical("Error: binlog: Error writing binary log: %s",
                   strerror(errno));
        return;
      }
    } else {
      written = gzwrite((gzFile)file, data, chunk);
      if (written <= 0) {
        int err;

        g_critical("Error: binlog: Error writing compressed binary log: %s",
                   gzerror((gzFile)file, &err));
        return;
      }
    }

    // write() may accept fewer bytes than requested; continue with the rest.
    data += written;
    len -= (guint64)written;
  }
}
304