1 /*
2 Copyright (c) 2000, 2018, Oracle and/or its affiliates.
3 Copyright (c) 2009, 2021, MariaDB
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
17
18
19 #include "mariadb.h"
20 #include "sql_priv.h"
21 #include "handler.h"
22 #ifndef MYSQL_CLIENT
23 #include "unireg.h"
24 #include "log_event.h"
25 #include "sql_base.h" // close_thread_tables
26 #include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
27 #include "sql_locale.h" // MY_LOCALE, my_locale_by_number, my_locale_en_US
28 #include "key.h" // key_copy
29 #include "lock.h" // mysql_unlock_tables
30 #include "sql_parse.h" // mysql_test_parse_for_slave
31 #include "tztime.h" // struct Time_zone
32 #include "sql_load.h" // mysql_load
33 #include "sql_db.h" // load_db_opt_by_name
34 #include "slave.h"
35 #include "rpl_rli.h"
36 #include "rpl_mi.h"
37 #include "rpl_filter.h"
38 #include "rpl_record.h"
39 #include "transaction.h"
40 #include <my_dir.h>
41 #include "sql_show.h" // append_identifier
42 #include <mysql/psi/mysql_statement.h>
43 #include <strfunc.h>
44 #include "compat56.h"
45 #include "sql_insert.h"
46 #else
47 #include "mysqld_error.h"
48 #endif /* MYSQL_CLIENT */
49
50 #include <my_bitmap.h>
51 #include "rpl_utility.h"
52 #include "rpl_constants.h"
53 #include "sql_digest.h"
54 #include "zlib.h"
55
/*
  Write the string B, without its terminating NUL, to the IO_CACHE A.

  The length is taken with sizeof(B), so B must be an array (normally a
  string literal); a plain char pointer would yield sizeof(char*) - 1.
*/
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
57
/*
  PSI memory keys defined by this file.

  key_memory_log_event is not defined here when _AIX is set
  (presumably it is defined elsewhere for that platform - TODO confirm).
*/
#ifndef _AIX
PSI_memory_key key_memory_log_event;
#endif
PSI_memory_key key_memory_Incident_log_event_message;
PSI_memory_key key_memory_Rows_query_log_event_rows_query;
63
/**
  BINLOG_CHECKSUM variable.

  Names of the accepted values; the list is terminated by NullS.
*/
const char *binlog_checksum_type_names[]= {
  "NONE",
  "CRC32",
  NullS
};

/*
  Length of each name above (without the trailing NUL), in the same order.
  The last entry (0) pairs with the NullS terminator.
*/
unsigned int binlog_checksum_type_length[]= {
  sizeof("NONE") - 1,
  sizeof("CRC32") - 1,
  0
};

/*
  TYPELIB tying the two arrays together. The element count excludes the
  NullS terminator, hence the "- 1".
*/
TYPELIB binlog_checksum_typelib=
{
  array_elements(binlog_checksum_type_names) - 1, "",
  binlog_checksum_type_names,
  binlog_checksum_type_length
};
85
86
/*
  Expands to the spelling of F followed by one space when any bit of F is
  set in V, and to the empty string otherwise.
*/
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")

/*
  Size of buffer for printing a double in format %.<PREC>g

  optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
  exponent digits + '\0'

  In the expression below: 3 covers '-', the zero and '.'; 5 covers 'e',
  the sign and the exponent digits; 1 covers the terminating '\0'.
*/
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
96
/*
  Replication event checksums were introduced in the following
  "checksum-home" versions. Checksum-aware servers extract the version
  from the Format_description (FD) event to decide whether that FD event
  carries checksum information.

  TODO: correct the constants when it has been determined
  (which main tree to push and when)
*/
const Version checksum_version_split_mysql(5, 6, 1);
const Version checksum_version_split_mariadb(5, 3, 0);

// First MySQL version with fractional seconds
const Version fsp_version_split_mysql(5, 6, 0);
110
111 /*
112 Cache that will automatically be written to a dedicated file on
113 destruction.
114
115 DESCRIPTION
116
117 */
118 class Write_on_release_cache
119 {
120 public:
121 enum flag
122 {
123 FLUSH_F
124 };
125
126 typedef unsigned short flag_set;
127
128 /*
129 Constructor.
130
131 SYNOPSIS
132 Write_on_release_cache
133 cache Pointer to cache to use
134 file File to write cache to upon destruction
135 flags Flags for the cache
136
137 DESCRIPTION
138 Cache common parameters and ensure common flush_data() code
139 on successful copy of the cache, the cache will be reinited as a
140 WRITE_CACHE.
141
142 Currently, a pointer to the cache is provided in the
143 constructor, but it would be possible to create a subclass
144 holding the IO_CACHE itself.
145 */
Write_on_release_cache(IO_CACHE * cache,FILE * file,flag_set flags=0,Log_event * ev=NULL)146 Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0, Log_event *ev = NULL)
147 : m_cache(cache), m_file(file), m_flags(flags), m_ev(ev)
148 {
149 reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE);
150 }
151
~Write_on_release_cache()152 ~Write_on_release_cache() {}
153
flush_data()154 bool flush_data()
155 {
156 #ifdef MYSQL_CLIENT
157 if (m_ev == NULL)
158 {
159 if (copy_event_cache_to_file_and_reinit(m_cache, m_file))
160 return 1;
161 if ((m_flags & FLUSH_F) && fflush(m_file))
162 return 1;
163 }
164 else // if m_ev<>NULL, then storing the output in output_buf
165 {
166 LEX_STRING tmp_str;
167 bool res;
168 if (copy_event_cache_to_string_and_reinit(m_cache, &tmp_str))
169 return 1;
170 /* use 2 argument append as tmp_str is not \0 terminated */
171 res= m_ev->output_buf.append(tmp_str.str, tmp_str.length);
172 my_free(tmp_str.str);
173 return res ? res : 0;
174 }
175 #else /* MySQL_SERVER */
176 if (copy_event_cache_to_file_and_reinit(m_cache, m_file))
177 return 1;
178 if ((m_flags & FLUSH_F) && fflush(m_file))
179 return 1;
180 #endif
181 return 0;
182 }
183
184 /*
185 Return a pointer to the internal IO_CACHE.
186
187 SYNOPSIS
188 operator&()
189
190 DESCRIPTION
191
192 Function to return a pointer to the internal cache, so that the
193 object can be treated as a IO_CACHE and used with the my_b_*
194 IO_CACHE functions
195
196 RETURN VALUE
197 A pointer to the internal IO_CACHE.
198 */
operator &()199 IO_CACHE *operator&()
200 {
201 return m_cache;
202 }
203
204 private:
205 // Hidden, to prevent usage.
206 Write_on_release_cache(Write_on_release_cache const&);
207
208 IO_CACHE *m_cache;
209 FILE *m_file;
210 flag_set m_flags;
211 Log_event *m_ev; // Used for Flashback
212 };
213
/*
  Debug helper: trace the event held in buffer B of length L.

  When at least LOG_EVENT_MINIMAL_HEADER_LEN bytes are available the
  common header fields are printed and the remainder is dumped; shorter
  buffers are dumped raw. Expands to nothing when DBUG_OFF is defined.

  The flags field is two bytes wide (the Log_event constructor reads it
  with uint2korr), so it is read with uint2korr here as well; a four-byte
  read would run two bytes past the field.
*/
#ifndef DBUG_OFF
#define DBUG_DUMP_EVENT_BUF(B,L)                                          \
  do {                                                                    \
    const uchar *_buf=(uchar*)(B);                                        \
    size_t _len=(L);                                                      \
    if (_len >= LOG_EVENT_MINIMAL_HEADER_LEN)                             \
    {                                                                     \
      DBUG_PRINT("data", ("header: timestamp:%u type:%u server_id:%u len:%u log_pos:%u flags:%u", \
                          uint4korr(_buf), _buf[EVENT_TYPE_OFFSET],       \
                          uint4korr(_buf+SERVER_ID_OFFSET),               \
                          uint4korr(_buf+EVENT_LEN_OFFSET),               \
                          uint4korr(_buf+LOG_POS_OFFSET),                 \
                          (uint) uint2korr(_buf+FLAGS_OFFSET)));          \
      DBUG_DUMP("data", _buf+LOG_EVENT_MINIMAL_HEADER_LEN,                \
                _len-LOG_EVENT_MINIMAL_HEADER_LEN);                       \
    }                                                                     \
    else                                                                  \
      DBUG_DUMP("data", _buf, _len);                                      \
  } while(0)
#else
#define DBUG_DUMP_EVENT_BUF(B,L) do { } while(0)
#endif
236
237 /*
238 read_str()
239 */
240
read_str(const char ** buf,const char * buf_end,const char ** str,uint8 * len)241 static inline int read_str(const char **buf, const char *buf_end,
242 const char **str, uint8 *len)
243 {
244 if (*buf + ((uint) (uchar) **buf) >= buf_end)
245 return 1;
246 *len= (uint8) **buf;
247 *str= (*buf)+1;
248 (*buf)+= (uint) *len+1;
249 return 0;
250 }
251
252
253 /**
254 Transforms a string into "" or its expression in X'HHHH' form.
255 */
256
str_to_hex(char * to,const char * from,size_t len)257 char *str_to_hex(char *to, const char *from, size_t len)
258 {
259 if (len)
260 {
261 *to++= 'X';
262 *to++= '\'';
263 to= octet2hex(to, from, len);
264 *to++= '\'';
265 *to= '\0';
266 }
267 else
268 to= strmov(to, "\"\"");
269 return to; // pointer to end 0 of 'to'
270 }
271
#define BINLOG_COMPRESSED_HEADER_LEN 1
#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4
/**
  Compressed Record
    Record Header: 1 Byte (BINLOG_COMPRESSED_HEADER_LEN)
      7 Bit: Always 1, means compressed;
      4-6 Bit: Compression algorithm - Always 0, means zlib.
               Other compression algorithms may be supported in the future.
      0-3 Bit: Number of bytes of "Record Original Length"
               (the functions in this file extract it with the mask 0x07)
    Record Original Length: 1-4 Bytes, most significant byte first
               (at most BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES)
    Compressed Buf: the compressed data
*/
284
285 /**
286 Get the length of compress content.
287 */
288
binlog_get_compress_len(uint32 len)289 uint32 binlog_get_compress_len(uint32 len)
290 {
291 /* 5 for the begin content, 1 reserved for a '\0'*/
292 return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES)
293 + compressBound(len) + 1);
294 }
295
296 /**
297 Compress buf from 'src' to 'dst'.
298
299 Note: 1) Then the caller should guarantee the length of 'dst', which
300 can be got by binlog_get_uncompress_len, is enough to hold
301 the content uncompressed.
302 2) The 'comlen' should stored the length of 'dst', and it will
303 be set as the size of compressed content after return.
304
305 return zero if successful, others otherwise.
306 */
binlog_buf_compress(const char * src,char * dst,uint32 len,uint32 * comlen)307 int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
308 {
309 uchar lenlen;
310 if (len & 0xFF000000)
311 {
312 dst[1] = uchar(len >> 24);
313 dst[2] = uchar(len >> 16);
314 dst[3] = uchar(len >> 8);
315 dst[4] = uchar(len);
316 lenlen = 4;
317 }
318 else if (len & 0x00FF0000)
319 {
320 dst[1] = uchar(len >> 16);
321 dst[2] = uchar(len >> 8);
322 dst[3] = uchar(len);
323 lenlen = 3;
324 }
325 else if (len & 0x0000FF00)
326 {
327 dst[1] = uchar(len >> 8);
328 dst[2] = uchar(len);
329 lenlen = 2;
330 }
331 else
332 {
333 dst[1] = uchar(len);
334 lenlen = 1;
335 }
336 dst[0] = 0x80 | (lenlen & 0x07);
337
338 uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1;
339 if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen,
340 (const Bytef *)src, (uLongf)len) != Z_OK)
341 {
342 return 1;
343 }
344 *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen;
345 return 0;
346 }
347
348 /**
349 Convert a query_compressed_log_event to query_log_event
350 from 'src' to 'dst', the size after compression stored in 'newlen'.
351
352 @Note:
353 1) The caller should call my_free to release 'dst' if *is_malloc is
354 returned as true.
355 2) If *is_malloc is retuened as false, then 'dst' reuses the passed-in
356 'buf'.
357
358 return zero if successful, non-zero otherwise.
359 */
360
361 int
query_event_uncompress(const Format_description_log_event * description_event,bool contain_checksum,const char * src,ulong src_len,char * buf,ulong buf_size,bool * is_malloc,char ** dst,ulong * newlen)362 query_event_uncompress(const Format_description_log_event *description_event,
363 bool contain_checksum, const char *src, ulong src_len,
364 char* buf, ulong buf_size, bool* is_malloc, char **dst,
365 ulong *newlen)
366 {
367 ulong len = uint4korr(src + EVENT_LEN_OFFSET);
368 const char *tmp = src;
369 const char *end = src + len;
370
371 // bad event
372 if (src_len < len )
373 return 1;
374
375 DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT);
376
377 uint8 common_header_len= description_event->common_header_len;
378 uint8 post_header_len=
379 description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
380
381 *is_malloc = false;
382
383 tmp += common_header_len;
384 // bad event
385 if (end <= tmp)
386 return 1;
387
388 uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
389 uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
390
391 tmp += post_header_len + status_vars_len + db_len + 1;
392 // bad event
393 if (end <= tmp)
394 return 1;
395
396 int32 comp_len = (int32)(len - (tmp - src) -
397 (contain_checksum ? BINLOG_CHECKSUM_LEN : 0));
398 uint32 un_len = binlog_get_uncompress_len(tmp);
399
400 // bad event
401 if (comp_len < 0 || un_len == 0)
402 return 1;
403
404 *newlen = (ulong)(tmp - src) + un_len;
405 if(contain_checksum)
406 *newlen += BINLOG_CHECKSUM_LEN;
407
408 uint32 alloc_size = (uint32)ALIGN_SIZE(*newlen);
409 char *new_dst = NULL;
410
411
412 if (alloc_size <= buf_size)
413 {
414 new_dst = buf;
415 }
416 else
417 {
418 new_dst = (char *)my_malloc(PSI_INSTRUMENT_ME, alloc_size, MYF(MY_WME));
419 if (!new_dst)
420 return 1;
421
422 *is_malloc = true;
423 }
424
425 /* copy the head*/
426 memcpy(new_dst, src , tmp - src);
427 if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
428 comp_len, &un_len))
429 {
430 if (*is_malloc)
431 my_free(new_dst);
432
433 *is_malloc = false;
434
435 return 1;
436 }
437
438 new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
439 int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
440 if(contain_checksum)
441 {
442 ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
443 int4store(new_dst + clear_len,
444 my_checksum(0L, (uchar *)new_dst, clear_len));
445 }
446 *dst = new_dst;
447 return 0;
448 }
449
450 int
row_log_event_uncompress(const Format_description_log_event * description_event,bool contain_checksum,const char * src,ulong src_len,char * buf,ulong buf_size,bool * is_malloc,char ** dst,ulong * newlen)451 row_log_event_uncompress(const Format_description_log_event *description_event,
452 bool contain_checksum, const char *src, ulong src_len,
453 char* buf, ulong buf_size, bool* is_malloc, char **dst,
454 ulong *newlen)
455 {
456 Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
457 ulong len = uint4korr(src + EVENT_LEN_OFFSET);
458 const char *tmp = src;
459 char *new_dst = NULL;
460 const char *end = tmp + len;
461
462 // bad event
463 if (src_len < len)
464 return 1;
465
466 DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
467
468 uint8 common_header_len= description_event->common_header_len;
469 uint8 post_header_len= description_event->post_header_len[type-1];
470
471 tmp += common_header_len + ROWS_HEADER_LEN_V1;
472 if (post_header_len == ROWS_HEADER_LEN_V2)
473 {
474 /*
475 Have variable length header, check length,
476 which includes length bytes
477 */
478
479 // bad event
480 if (end - tmp <= 2)
481 return 1;
482
483 uint16 var_header_len= uint2korr(tmp);
484 DBUG_ASSERT(var_header_len >= 2);
485
486 /* skip over var-len header, extracting 'chunks' */
487 tmp += var_header_len;
488
489 /* get the uncompressed event type */
490 type=
491 (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
492 }
493 else
494 {
495 /* get the uncompressed event type */
496 type= (Log_event_type)
497 (type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
498 }
499
500 //bad event
501 if (end <= tmp)
502 return 1;
503
504 ulong m_width = net_field_length((uchar **)&tmp);
505 tmp += (m_width + 7) / 8;
506
507 if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT)
508 {
509 tmp += (m_width + 7) / 8;
510 }
511
512 //bad event
513 if (end <= tmp)
514 return 1;
515
516 uint32 un_len = binlog_get_uncompress_len(tmp);
517 //bad event
518 if (un_len == 0)
519 return 1;
520
521 int32 comp_len = (int32)(len - (tmp - src) -
522 (contain_checksum ? BINLOG_CHECKSUM_LEN : 0));
523 //bad event
524 if (comp_len <=0)
525 return 1;
526
527 *newlen = ulong(tmp - src) + un_len;
528 if(contain_checksum)
529 *newlen += BINLOG_CHECKSUM_LEN;
530
531 size_t alloc_size = ALIGN_SIZE(*newlen);
532
533 *is_malloc = false;
534 if (alloc_size <= buf_size)
535 {
536 new_dst = buf;
537 }
538 else
539 {
540 new_dst = (char *)my_malloc(PSI_INSTRUMENT_ME, alloc_size, MYF(MY_WME));
541 if (!new_dst)
542 return 1;
543
544 *is_malloc = true;
545 }
546
547 /* Copy the head. */
548 memcpy(new_dst, src , tmp - src);
549 /* Uncompress the body. */
550 if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
551 comp_len, &un_len))
552 {
553 if (*is_malloc)
554 my_free(new_dst);
555
556 return 1;
557 }
558
559 new_dst[EVENT_TYPE_OFFSET] = type;
560 int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
561 if(contain_checksum){
562 ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
563 int4store(new_dst + clear_len,
564 my_checksum(0L, (uchar *)new_dst, clear_len));
565 }
566 *dst = new_dst;
567 return 0;
568 }
569
570 /**
571 Get the length of uncompress content.
572 return 0 means error.
573 */
574
binlog_get_uncompress_len(const char * buf)575 uint32 binlog_get_uncompress_len(const char *buf)
576 {
577 uint32 len = 0;
578 uint32 lenlen = 0;
579
580 if ((buf == NULL) || ((buf[0] & 0xe0) != 0x80))
581 return len;
582
583 lenlen = buf[0] & 0x07;
584
585 switch(lenlen)
586 {
587 case 1:
588 len = uchar(buf[1]);
589 break;
590 case 2:
591 len = uchar(buf[1]) << 8 | uchar(buf[2]);
592 break;
593 case 3:
594 len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
595 break;
596 case 4:
597 len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 |
598 uchar(buf[3]) << 8 | uchar(buf[4]);
599 break;
600 default:
601 DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
602 break;
603 }
604 return len;
605 }
606
607 /**
608 Uncompress the content in 'src' with length of 'len' to 'dst'.
609
610 Note: 1) Then the caller should guarantee the length of 'dst' (which
611 can be got by statement_get_uncompress_len) is enough to hold
612 the content uncompressed.
613 2) The 'newlen' should stored the length of 'dst', and it will
614 be set as the size of uncompressed content after return.
615
616 return zero if successful, others otherwise.
617 */
binlog_buf_uncompress(const char * src,char * dst,uint32 len,uint32 * newlen)618 int binlog_buf_uncompress(const char *src, char *dst, uint32 len,
619 uint32 *newlen)
620 {
621 if((src[0] & 0x80) == 0)
622 {
623 return 1;
624 }
625
626 uint32 lenlen= src[0] & 0x07;
627 uLongf buflen= *newlen;
628
629 uint32 alg = (src[0] & 0x70) >> 4;
630 switch(alg)
631 {
632 case 0:
633 // zlib
634 if(uncompress((Bytef *)dst, &buflen,
635 (const Bytef*)src + 1 + lenlen, len - 1 - lenlen) != Z_OK)
636 {
637 return 1;
638 }
639 break;
640 default:
641 //TODO
642 //bad algorithm
643 return 1;
644 }
645
646 DBUG_ASSERT(*newlen == (uint32)buflen);
647 *newlen = (uint32)buflen;
648 return 0;
649 }
650
651
652 /**************************************************************************
653 Log_event methods (= the parent class of all events)
654 **************************************************************************/
655
656 /**
657 @return
658 returns the human readable name of the event's type
659 */
660
get_type_str(Log_event_type type)661 const char* Log_event::get_type_str(Log_event_type type)
662 {
663 switch(type) {
664 case START_EVENT_V3: return "Start_v3";
665 case STOP_EVENT: return "Stop";
666 case QUERY_EVENT: return "Query";
667 case ROTATE_EVENT: return "Rotate";
668 case INTVAR_EVENT: return "Intvar";
669 case LOAD_EVENT: return "Load";
670 case NEW_LOAD_EVENT: return "New_load";
671 case SLAVE_EVENT: return "Slave";
672 case CREATE_FILE_EVENT: return "Create_file";
673 case APPEND_BLOCK_EVENT: return "Append_block";
674 case DELETE_FILE_EVENT: return "Delete_file";
675 case EXEC_LOAD_EVENT: return "Exec_load";
676 case RAND_EVENT: return "RAND";
677 case XID_EVENT: return "Xid";
678 case USER_VAR_EVENT: return "User var";
679 case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
680 case TABLE_MAP_EVENT: return "Table_map";
681 case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
682 case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
683 case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
684 case WRITE_ROWS_EVENT_V1: return "Write_rows_v1";
685 case UPDATE_ROWS_EVENT_V1: return "Update_rows_v1";
686 case DELETE_ROWS_EVENT_V1: return "Delete_rows_v1";
687 case WRITE_ROWS_EVENT: return "Write_rows";
688 case UPDATE_ROWS_EVENT: return "Update_rows";
689 case DELETE_ROWS_EVENT: return "Delete_rows";
690 case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
691 case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
692 case INCIDENT_EVENT: return "Incident";
693 case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
694 case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
695 case GTID_EVENT: return "Gtid";
696 case GTID_LIST_EVENT: return "Gtid_list";
697 case START_ENCRYPTION_EVENT: return "Start_encryption";
698
699 /* The following is only for mysqlbinlog */
700 case IGNORABLE_LOG_EVENT: return "Ignorable log event";
701 case ROWS_QUERY_LOG_EVENT: return "MySQL Rows_query";
702 case GTID_LOG_EVENT: return "MySQL Gtid";
703 case ANONYMOUS_GTID_LOG_EVENT: return "MySQL Anonymous_Gtid";
704 case PREVIOUS_GTIDS_LOG_EVENT: return "MySQL Previous_gtids";
705 case HEARTBEAT_LOG_EVENT: return "Heartbeat";
706 case TRANSACTION_CONTEXT_EVENT: return "Transaction_context";
707 case VIEW_CHANGE_EVENT: return "View_change";
708 case XA_PREPARE_LOG_EVENT: return "XA_prepare";
709 case QUERY_COMPRESSED_EVENT: return "Query_compressed";
710 case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed";
711 case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed";
712 case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed";
713 case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1";
714 case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1";
715 case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1";
716
717 default: return "Unknown"; /* impossible */
718 }
719 }
720
get_type_str()721 const char* Log_event::get_type_str()
722 {
723 return get_type_str(get_type_code());
724 }
725
726
727 /*
728 Log_event::Log_event()
729 */
730
Log_event(const char * buf,const Format_description_log_event * description_event)731 Log_event::Log_event(const char* buf,
732 const Format_description_log_event* description_event)
733 :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
734 checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
735 {
736 #ifndef MYSQL_CLIENT
737 thd = 0;
738 #endif
739 when = uint4korr(buf);
740 when_sec_part= ~0UL;
741 server_id = uint4korr(buf + SERVER_ID_OFFSET);
742 data_written= uint4korr(buf + EVENT_LEN_OFFSET);
743 if (description_event->binlog_version==1)
744 {
745 log_pos= 0;
746 flags= 0;
747 return;
748 }
749 /* 4.0 or newer */
750 log_pos= uint4korr(buf + LOG_POS_OFFSET);
751 /*
752 If the log is 4.0 (so here it can only be a 4.0 relay log read by
753 the SQL thread or a 4.0 master binlog read by the I/O thread),
754 log_pos is the beginning of the event: we transform it into the end
755 of the event, which is more useful.
756 But how do you know that the log is 4.0: you know it if
757 description_event is version 3 *and* you are not reading a
758 Format_desc (remember that mysqlbinlog starts by assuming that 5.0
759 logs are in 4.0 format, until it finds a Format_desc).
760 */
761 if (description_event->binlog_version==3 &&
762 (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
763 {
764 /*
765 If log_pos=0, don't change it. log_pos==0 is a marker to mean
766 "don't change rli->group_master_log_pos" (see
767 inc_group_relay_log_pos()). As it is unreal log_pos, adding the
768 event len's is nonsense. For example, a fake Rotate event should
769 not have its log_pos (which is 0) changed or it will modify
770 Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
771 value of (a non-zero offset which does not exist in the master's
772 binlog, so which will cause problems if the user uses this value
773 in CHANGE MASTER).
774 */
775 log_pos+= data_written; /* purecov: inspected */
776 }
777 DBUG_PRINT("info", ("log_pos: %llu", log_pos));
778
779 flags= uint2korr(buf + FLAGS_OFFSET);
780 if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
781 ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
782 {
783 /*
784 These events always have a header which stops here (i.e. their
785 header is FROZEN).
786 */
787 /*
788 Initialization to zero of all other Log_event members as they're
789 not specified. Currently there are no such members; in the future
790 there will be an event UID (but Format_description and Rotate
791 don't need this UID, as they are not propagated through
792 --log-slave-updates (remember the UID is used to not play a query
793 twice when you have two masters which are slaves of a 3rd master).
794 Then we are done.
795 */
796 return;
797 }
798 /* otherwise, go on with reading the header from buf (nothing now) */
799 }
800
801
802 /**
803 This needn't be format-tolerant, because we only parse the first
804 LOG_EVENT_MINIMAL_HEADER_LEN bytes (just need the event's length).
805 */
806
read_log_event(IO_CACHE * file,String * packet,const Format_description_log_event * fdle,enum enum_binlog_checksum_alg checksum_alg_arg)807 int Log_event::read_log_event(IO_CACHE* file, String* packet,
808 const Format_description_log_event *fdle,
809 enum enum_binlog_checksum_alg checksum_alg_arg)
810 {
811 ulong data_len;
812 char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
813 uchar ev_offset= packet->length();
814 #if !defined(MYSQL_CLIENT)
815 THD *thd=current_thd;
816 ulong max_allowed_packet= thd ? thd->slave_thread ? slave_max_allowed_packet
817 : thd->variables.max_allowed_packet
818 : ~(uint)0;
819 #endif
820 DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,String*...)");
821
822 if (my_b_read(file, (uchar*) buf, sizeof(buf)))
823 {
824 /*
825 If the read hits eof, we must report it as eof so the caller
826 will know it can go into cond_wait to be woken up on the next
827 update to the log.
828 */
829 DBUG_PRINT("error",("file->error: %d", file->error));
830 DBUG_RETURN(file->error == 0 ? LOG_READ_EOF :
831 file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
832 }
833 data_len= uint4korr(buf + EVENT_LEN_OFFSET);
834
835 /* Append the log event header to packet */
836 if (packet->append(buf, sizeof(buf)))
837 DBUG_RETURN(LOG_READ_MEM);
838
839 if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN)
840 DBUG_RETURN(LOG_READ_BOGUS);
841
842 if (data_len > MY_MAX(max_allowed_packet,
843 opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
844 DBUG_RETURN(LOG_READ_TOO_LARGE);
845
846 if (likely(data_len > LOG_EVENT_MINIMAL_HEADER_LEN))
847 {
848 /* Append rest of event, read directly from file into packet */
849 if (packet->append(file, data_len - LOG_EVENT_MINIMAL_HEADER_LEN))
850 {
851 /*
852 Fatal error occurred when appending rest of the event
853 to packet, possible failures:
854 1. EOF occurred when reading from file, it's really an error
855 as there's supposed to be more bytes available.
856 file->error will have been set to number of bytes left to read
857 2. Read was interrupted, file->error would normally be set to -1
858 3. Failed to allocate memory for packet, my_errno
859 will be ENOMEM(file->error should be 0, but since the
860 memory allocation occurs before the call to read it might
861 be uninitialized)
862 */
863 DBUG_RETURN(my_errno == ENOMEM ? LOG_READ_MEM :
864 (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
865 }
866 }
867
868 if (fdle->crypto_data.scheme)
869 {
870 uchar iv[BINLOG_IV_LENGTH];
871 fdle->crypto_data.set_iv(iv, (uint32) (my_b_tell(file) - data_len));
872 size_t sz= data_len + ev_offset + 1;
873 #ifdef HAVE_WOLFSSL
874 /*
875 Workaround for MDEV-19582.
876 WolfSSL reads memory out of bounds with decryption/NOPAD)
877 We allocate a little more memory therefore.
878 */
879 sz += MY_AES_BLOCK_SIZE;
880 #endif
881 char *newpkt= (char*)my_malloc(PSI_INSTRUMENT_ME, sz, MYF(MY_WME));
882 if (!newpkt)
883 DBUG_RETURN(LOG_READ_MEM);
884 memcpy(newpkt, packet->ptr(), ev_offset);
885
886 uint dstlen;
887 uchar *src= (uchar*)packet->ptr() + ev_offset;
888 uchar *dst= (uchar*)newpkt + ev_offset;
889 memcpy(src + EVENT_LEN_OFFSET, src, 4);
890 if (encryption_crypt(src + 4, data_len - 4, dst + 4, &dstlen,
891 fdle->crypto_data.key, fdle->crypto_data.key_length, iv,
892 sizeof(iv), ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD,
893 ENCRYPTION_KEY_SYSTEM_DATA, fdle->crypto_data.key_version))
894 {
895 my_free(newpkt);
896 DBUG_RETURN(LOG_READ_DECRYPT);
897 }
898 DBUG_ASSERT(dstlen == data_len - 4);
899 memcpy(dst, dst + EVENT_LEN_OFFSET, 4);
900 int4store(dst + EVENT_LEN_OFFSET, data_len);
901 packet->reset(newpkt, data_len + ev_offset, data_len + ev_offset + 1,
902 &my_charset_bin);
903 }
904
905 /*
906 CRC verification of the Dump thread
907 */
908 if (data_len > LOG_EVENT_MINIMAL_HEADER_LEN)
909 {
910 /* Corrupt the event for Dump thread*/
911 DBUG_EXECUTE_IF("corrupt_read_log_event2",
912 uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
913 if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
914 {
915 int debug_cor_pos = rand() % (data_len - BINLOG_CHECKSUM_LEN);
916 debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
917 DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
918 DBUG_SET("-d,corrupt_read_log_event2");
919 }
920 );
921 if (event_checksum_test((uchar*) packet->ptr() + ev_offset,
922 data_len, checksum_alg_arg))
923 DBUG_RETURN(LOG_READ_CHECKSUM_FAILURE);
924 }
925 DBUG_RETURN(0);
926 }
927
read_log_event(IO_CACHE * file,const Format_description_log_event * fdle,my_bool crc_check)928 Log_event* Log_event::read_log_event(IO_CACHE* file,
929 const Format_description_log_event *fdle,
930 my_bool crc_check)
931 {
932 DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,Format_description_log_event*...)");
933 DBUG_ASSERT(fdle != 0);
934 String event;
935 const char *error= 0;
936 Log_event *res= 0;
937
938 switch (read_log_event(file, &event, fdle, BINLOG_CHECKSUM_ALG_OFF))
939 {
940 case 0:
941 break;
942 case LOG_READ_EOF: // no error here; we are at the file's end
943 goto err;
944 case LOG_READ_BOGUS:
945 error= "Event invalid";
946 goto err;
947 case LOG_READ_IO:
948 error= "read error";
949 goto err;
950 case LOG_READ_MEM:
951 error= "Out of memory";
952 goto err;
953 case LOG_READ_TRUNC:
954 error= "Event truncated";
955 goto err;
956 case LOG_READ_TOO_LARGE:
957 error= "Event too big";
958 goto err;
959 case LOG_READ_DECRYPT:
960 error= "Event decryption failure";
961 goto err;
962 case LOG_READ_CHECKSUM_FAILURE:
963 default:
964 DBUG_ASSERT(0);
965 error= "internal error";
966 goto err;
967 }
968
969 if ((res= read_log_event(event.ptr(), event.length(),
970 &error, fdle, crc_check)))
971 res->register_temp_buf(event.release(), true);
972
973 err:
974 if (unlikely(error))
975 {
976 DBUG_ASSERT(!res);
977 #ifdef MYSQL_CLIENT
978 if (force_opt)
979 DBUG_RETURN(new Unknown_log_event());
980 #endif
981 if (event.length() >= OLD_HEADER_LEN)
982 sql_print_error("Error in Log_event::read_log_event(): '%s',"
983 " data_len: %lu, event_type: %u", error,
984 (ulong) uint4korr(&event[EVENT_LEN_OFFSET]),
985 (uint) (uchar)event[EVENT_TYPE_OFFSET]);
986 else
987 sql_print_error("Error in Log_event::read_log_event(): '%s'", error);
988 /*
989 The SQL slave thread will check if file->error<0 to know
990 if there was an I/O error. Even if there is no "low-level" I/O errors
991 with 'file', any of the high-level above errors is worrying
992 enough to stop the SQL thread now ; as we are skipping the current event,
993 going on with reading and successfully executing other events can
994 only corrupt the slave's databases. So stop.
995 */
996 file->error= -1;
997 }
998 DBUG_RETURN(res);
999 }
1000
1001
/**
  Build a Log_event object from an event that is already held in memory.

  Binlog format tolerance is in (buf, event_len, fdle)
  constructors.

  @param buf        Start of the event (common header first).
  @param event_len  Number of bytes available at @c buf, including the
                    checksum trailer when one is present.
  @param error      Out: set to a static message when NULL is returned
                    because of a sanity, checksum or validity failure.
  @param fdle       Format description in effect for this event; must not
                    be NULL. Its checksum_alg is reset to
                    BINLOG_CHECKSUM_ALG_OFF (through a const_cast) when a
                    START_EVENT_V3 is seen.
  @param crc_check  When true, the event checksum is verified with
                    event_checksum_test() before the event is constructed.

  @return The newly allocated event, or NULL on failure. In a MYSQL_CLIENT
          build with force_opt set, an Unknown_log_event is returned
          instead of NULL for checksum failures and invalid events.
*/

Log_event* Log_event::read_log_event(const char* buf, uint event_len,
                                     const char **error,
                                     const Format_description_log_event *fdle,
                                     my_bool crc_check)
{
  Log_event* ev;
  enum enum_binlog_checksum_alg alg;
  DBUG_ENTER("Log_event::read_log_event(char*,...)");
  DBUG_ASSERT(fdle != 0);
  DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version));
  DBUG_DUMP_EVENT_BUF(buf, event_len);

  /*
    Check the integrity; This is needed because handle_slave_io() doesn't
    check if packet is of proper length.
  */
  if (event_len < EVENT_LEN_OFFSET)
  {
    *error="Sanity check failed"; // Needed to free buffer
    DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
  }

  uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
  // all following START events in the current file are without checksum
  if (event_type == START_EVENT_V3)
    (const_cast< Format_description_log_event *>(fdle))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
  /*
    CRC verification by SQL and Show-Binlog-Events master side.
    The caller has to provide @fdle->checksum_alg to
    be the last seen FD's (A) descriptor.
    If event is FD the descriptor is in it.
    Notice, FD of the binlog can be only in one instance and therefore
    Show-Binlog-Events executing master side thread needs just to know
    the only FD's (A) value - whereas RL can contain more.
    In the RL case, the alg is kept in FD_e (@fdle) which is reset
    to the newer read-out event after its execution with possibly new alg descriptor.
    Therefore in a typical sequence of RL:
    {FD_s^0, FD_m, E_m^1} E_m^1
    will be verified with (A) of FD_m.

    See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg docs
    lines (log.h).

    Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
  */
  alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
    fdle->checksum_alg : get_checksum_alg(buf, event_len);
  // Emulate the corruption during reading an event
  DBUG_EXECUTE_IF("corrupt_read_log_event_char",
    if (event_type != FORMAT_DESCRIPTION_EVENT)
    {
      char *debug_event_buf_c = (char *)buf;
      int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
      debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
      DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos));
      DBUG_SET("-d,corrupt_read_log_event_char");
    }
  );
  /* A non-zero result from event_checksum_test() is treated as corruption. */
  if (crc_check &&
      event_checksum_test((uchar *) buf, event_len, alg))
  {
#ifdef MYSQL_CLIENT
    *error= "Event crc check failed! Most likely there is event corruption.";
    if (force_opt)
    {
      /* The client was asked to carry on: hand back a placeholder event. */
      ev= new Unknown_log_event(buf, fdle);
      DBUG_RETURN(ev);
    }
    else
      DBUG_RETURN(NULL);
#else
    *error= ER_THD_OR_DEFAULT(current_thd, ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
    sql_print_error("%s", *error);
    DBUG_RETURN(NULL);
#endif
  }

  if (event_type > fdle->number_of_event_types &&
      event_type != FORMAT_DESCRIPTION_EVENT)
  {
    /*
      It is unsafe to use the fdle if its post_header_len
      array does not include the event type.
    */
    DBUG_PRINT("error", ("event type %d found, but the current "
                         "Format_description_log_event supports only %d event "
                         "types", event_type,
                         fdle->number_of_event_types));
    ev= NULL;
  }
  else
  {
    /*
      In some previous versions (see comment in
      Format_description_log_event::Format_description_log_event(char*,...)),
      event types were assigned different id numbers than in the
      present version. In order to replicate from such versions to the
      present version, we must map those event type id's to our event
      type id's. The mapping is done with the event_type_permutation
      array, which was set up when the Format_description_log_event
      was read.
    */
    if (fdle->event_type_permutation)
    {
      int new_event_type= fdle->event_type_permutation[event_type];
      DBUG_PRINT("info", ("converting event type %d to %d (%s)",
                          event_type, new_event_type,
                          get_type_str((Log_event_type)new_event_type)));
      event_type= new_event_type;
    }

    /*
      From here on event_len excludes the checksum trailer, so the
      event constructors below only see the event proper.
    */
    if (alg != BINLOG_CHECKSUM_ALG_UNDEF &&
        (event_type == FORMAT_DESCRIPTION_EVENT ||
         alg != BINLOG_CHECKSUM_ALG_OFF))
      event_len= event_len - BINLOG_CHECKSUM_LEN;

    /*
      Create an object of Ignorable_log_event for unrecognized sub-class.
      So that SLAVE SQL THREAD will only update the position and continue.
      We should look for this flag first instead of judging by event_type
      Any event can be Ignorable_log_event if it has this flag on.
      look into @note of Ignorable_log_event

      NOTE(review): only event_len >= EVENT_LEN_OFFSET has been verified
      in this function before the flags field is read here - confirm that
      callers always supply a complete common header.
    */
    if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)
    {
      ev= new Ignorable_log_event(buf, fdle,
                                  get_type_str((Log_event_type) event_type));
      goto exit;
    }
    /* Dispatch on the (possibly remapped) type to the matching constructor. */
    switch(event_type) {
    case QUERY_EVENT:
      ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
      break;
    case QUERY_COMPRESSED_EVENT:
      ev = new Query_compressed_log_event(buf, event_len, fdle,
                                          QUERY_COMPRESSED_EVENT);
      break;
    case LOAD_EVENT:
      ev = new Load_log_event(buf, event_len, fdle);
      break;
    case NEW_LOAD_EVENT:
      ev = new Load_log_event(buf, event_len, fdle);
      break;
    case ROTATE_EVENT:
      ev = new Rotate_log_event(buf, event_len, fdle);
      break;
    case BINLOG_CHECKPOINT_EVENT:
      ev = new Binlog_checkpoint_log_event(buf, event_len, fdle);
      break;
    case GTID_EVENT:
      ev = new Gtid_log_event(buf, event_len, fdle);
      break;
    case GTID_LIST_EVENT:
      ev = new Gtid_list_log_event(buf, event_len, fdle);
      break;
    case CREATE_FILE_EVENT:
      ev = new Create_file_log_event(buf, event_len, fdle);
      break;
    case APPEND_BLOCK_EVENT:
      ev = new Append_block_log_event(buf, event_len, fdle);
      break;
    case DELETE_FILE_EVENT:
      ev = new Delete_file_log_event(buf, event_len, fdle);
      break;
    case EXEC_LOAD_EVENT:
      ev = new Execute_load_log_event(buf, event_len, fdle);
      break;
    case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
      ev = new Start_log_event_v3(buf, event_len, fdle);
      break;
    case STOP_EVENT:
      ev = new Stop_log_event(buf, fdle);
      break;
    case INTVAR_EVENT:
      ev = new Intvar_log_event(buf, fdle);
      break;
    case XID_EVENT:
      ev = new Xid_log_event(buf, fdle);
      break;
    case XA_PREPARE_LOG_EVENT:
      ev = new XA_prepare_log_event(buf, fdle);
      break;
    case RAND_EVENT:
      ev = new Rand_log_event(buf, fdle);
      break;
    case USER_VAR_EVENT:
      ev = new User_var_log_event(buf, event_len, fdle);
      break;
    case FORMAT_DESCRIPTION_EVENT:
      ev = new Format_description_log_event(buf, event_len, fdle);
      break;
#if defined(HAVE_REPLICATION)
    case PRE_GA_WRITE_ROWS_EVENT:
      ev = new Write_rows_log_event_old(buf, event_len, fdle);
      break;
    case PRE_GA_UPDATE_ROWS_EVENT:
      ev = new Update_rows_log_event_old(buf, event_len, fdle);
      break;
    case PRE_GA_DELETE_ROWS_EVENT:
      ev = new Delete_rows_log_event_old(buf, event_len, fdle);
      break;
    case WRITE_ROWS_EVENT_V1:
    case WRITE_ROWS_EVENT:
      ev = new Write_rows_log_event(buf, event_len, fdle);
      break;
    case UPDATE_ROWS_EVENT_V1:
    case UPDATE_ROWS_EVENT:
      ev = new Update_rows_log_event(buf, event_len, fdle);
      break;
    case DELETE_ROWS_EVENT_V1:
    case DELETE_ROWS_EVENT:
      ev = new Delete_rows_log_event(buf, event_len, fdle);
      break;

    case WRITE_ROWS_COMPRESSED_EVENT:
    case WRITE_ROWS_COMPRESSED_EVENT_V1:
      ev = new Write_rows_compressed_log_event(buf, event_len, fdle);
      break;
    case UPDATE_ROWS_COMPRESSED_EVENT:
    case UPDATE_ROWS_COMPRESSED_EVENT_V1:
      ev = new Update_rows_compressed_log_event(buf, event_len, fdle);
      break;
    case DELETE_ROWS_COMPRESSED_EVENT:
    case DELETE_ROWS_COMPRESSED_EVENT_V1:
      ev = new Delete_rows_compressed_log_event(buf, event_len, fdle);
      break;

      /* MySQL GTID events are ignored */
    case GTID_LOG_EVENT:
    case ANONYMOUS_GTID_LOG_EVENT:
    case PREVIOUS_GTIDS_LOG_EVENT:
    case TRANSACTION_CONTEXT_EVENT:
    case VIEW_CHANGE_EVENT:
      ev= new Ignorable_log_event(buf, fdle,
                                  get_type_str((Log_event_type) event_type));
      break;

    case TABLE_MAP_EVENT:
      ev = new Table_map_log_event(buf, event_len, fdle);
      break;
#endif
    case BEGIN_LOAD_QUERY_EVENT:
      ev = new Begin_load_query_log_event(buf, event_len, fdle);
      break;
    case EXECUTE_LOAD_QUERY_EVENT:
      ev= new Execute_load_query_log_event(buf, event_len, fdle);
      break;
    case INCIDENT_EVENT:
      ev = new Incident_log_event(buf, event_len, fdle);
      break;
    case ANNOTATE_ROWS_EVENT:
      ev = new Annotate_rows_log_event(buf, event_len, fdle);
      break;
    case START_ENCRYPTION_EVENT:
      ev = new Start_encryption_log_event(buf, event_len, fdle);
      break;
    default:
      DBUG_PRINT("error",("Unknown event code: %d",
                          (uchar) buf[EVENT_TYPE_OFFSET]));
      ev= NULL;
      break;
    }
  }
exit:

  if (ev)
  {
    /* Remember which checksum algorithm the event was read with. */
    ev->checksum_alg= alg;
#ifdef MYSQL_CLIENT
    /*
      event_len no longer includes the checksum at this point, so the
      stored CRC is read from just past the event proper.
    */
    if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
        ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
      ev->crc= uint4korr(buf + (event_len));
#endif
  }

  DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)",
                            ev ? ev->get_type_str() : "<unknown>",
                            (uchar)buf[EVENT_TYPE_OFFSET],
                            event_len));
  /*
    is_valid() are small event-specific sanity tests which are
    important; for example there are some my_malloc() in constructors
    (e.g. Query_log_event::Query_log_event(char*...)); when these
    my_malloc() fail we can't return an error out of the constructor
    (because constructor is "void") ; so instead we leave the pointer we
    wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
    Same for Format_description_log_event, member 'post_header_len'.

    SLAVE_EVENT is never used, so it should not be read ever.
  */
  if (!ev || !ev->is_valid() || (event_type == SLAVE_EVENT))
  {
    DBUG_PRINT("error",("Found invalid event in binary log"));

    delete ev;
#ifdef MYSQL_CLIENT
    if (!force_opt) /* then mysqlbinlog dies */
    {
      *error= "Found invalid event in binary log";
      DBUG_RETURN(0);
    }
    /* Forced mode: substitute a placeholder so the client can continue. */
    ev= new Unknown_log_event(buf, fdle);
#else
    *error= "Found invalid event in binary log";
    DBUG_RETURN(0);
#endif
  }
  DBUG_RETURN(ev);
}
1316
1317
1318
1319 /* 2 utility functions for the next method */
1320
1321 /**
1322 Read a string with length from memory.
1323
1324 This function reads the string-with-length stored at
1325 <code>src</code> and extract the length into <code>*len</code> and
1326 a pointer to the start of the string into <code>*dst</code>. The
1327 string can then be copied using <code>memcpy()</code> with the
1328 number of bytes given in <code>*len</code>.
1329
1330 @param src Pointer to variable holding a pointer to the memory to
1331 read the string from.
1332 @param dst Pointer to variable holding a pointer where the actual
1333 string starts. Starting from this position, the string
1334 can be copied using @c memcpy().
1335 @param len Pointer to variable where the length will be stored.
1336 @param end One-past-the-end of the memory where the string is
1337 stored.
1338
1339 @return Zero if the entire string can be copied successfully,
1340 @c UINT_MAX if the length could not be read from memory
1341 (that is, if <code>*src >= end</code>), otherwise the
1342 number of bytes that are missing to read the full
1343 string, which happends <code>*dst + *len >= end</code>.
1344 */
1345 static int
get_str_len_and_pointer(const Log_event::Byte ** src,const char ** dst,uint * len,const Log_event::Byte * end)1346 get_str_len_and_pointer(const Log_event::Byte **src,
1347 const char **dst,
1348 uint *len,
1349 const Log_event::Byte *end)
1350 {
1351 if (*src >= end)
1352 return -1; // Will be UINT_MAX in two-complement arithmetics
1353 uint length= **src;
1354 if (length > 0)
1355 {
1356 if (*src + length >= end)
1357 return (int)(*src + length - end + 1); // Number of bytes missing
1358 *dst= (char *)*src + 1; // Will be copied later
1359 }
1360 *len= length;
1361 *src+= length + 1;
1362 return 0;
1363 }
1364
copy_str_and_move(const char ** src,Log_event::Byte ** dst,size_t len)1365 static void copy_str_and_move(const char **src,
1366 Log_event::Byte **dst,
1367 size_t len)
1368 {
1369 memcpy(*dst, *src, len);
1370 *src= (const char *)*dst;
1371 (*dst)+= len;
1372 *(*dst)++= 0;
1373 }
1374
1375
1376 #ifndef DBUG_OFF
1377 static char const *
code_name(int code)1378 code_name(int code)
1379 {
1380 static char buf[255];
1381 switch (code) {
1382 case Q_FLAGS2_CODE: return "Q_FLAGS2_CODE";
1383 case Q_SQL_MODE_CODE: return "Q_SQL_MODE_CODE";
1384 case Q_CATALOG_CODE: return "Q_CATALOG_CODE";
1385 case Q_AUTO_INCREMENT: return "Q_AUTO_INCREMENT";
1386 case Q_CHARSET_CODE: return "Q_CHARSET_CODE";
1387 case Q_TIME_ZONE_CODE: return "Q_TIME_ZONE_CODE";
1388 case Q_CATALOG_NZ_CODE: return "Q_CATALOG_NZ_CODE";
1389 case Q_LC_TIME_NAMES_CODE: return "Q_LC_TIME_NAMES_CODE";
1390 case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
1391 case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
1392 case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
1393 case Q_HRNOW: return "Q_HRNOW";
1394 }
1395 sprintf(buf, "CODE#%d", code);
1396 return buf;
1397 }
1398 #endif
1399
/**
  Macro that leaves the enclosing function once EVENT_LEN bytes (or
  more) have been consumed.

  The number of bytes consumed is computed as CUR_POS - START. It is
  printed through DBUG_PRINT and, when it is greater than or equal to
  EVENT_LEN, the macro executes DBUG_VOID_RETURN, i.e. it returns from
  the function in which it is expanded.

  @param CUR_POS    Current read position.
  @param START      Position where reading started.
  @param EVENT_LEN  Number of bytes that may be read.

  @note The expansion declares locals named cur_pos, start, len and
        bytes_read inside its own do/while scope.
  @note Presumably only usable in a void function that has executed
        DBUG_ENTER - verify at each use site.
*/
#define VALIDATE_BYTES_READ(CUR_POS, START, EVENT_LEN)      \
  do {                                                      \
       uchar *cur_pos= (uchar *)CUR_POS;                    \
       uchar *start= (uchar *)START;                        \
       uint len= EVENT_LEN;                                 \
       uint bytes_read= (uint)(cur_pos - start);            \
       DBUG_PRINT("info", ("Bytes read: %u event_len:%u.\n",\
             bytes_read, len));                             \
       if (bytes_read >= len)                               \
         DBUG_VOID_RETURN;                                  \
  } while (0)
1411
/**
  Macro to check that there is enough space to read from memory.

  When fewer than CNT bytes are available between PTR and END the macro
  sets the variable @c query to 0 and executes DBUG_VOID_RETURN, i.e. it
  returns from the function in which it is expanded.

  @param PTR Pointer to memory
  @param END End of memory
  @param CNT Number of bytes that should be read.

  @note The expansion refers to the names @c pos and @c query directly
        (they are not macro arguments), so both must be visible where
        the macro is used. The debug trace prints the name of the
        status-variable code stored at pos[-1].
*/
#define CHECK_SPACE(PTR,END,CNT)                      \
  do {                                                \
    DBUG_PRINT("info", ("Read %s", code_name(pos[-1]))); \
    if ((PTR) + (CNT) > (END)) {                      \
      DBUG_PRINT("info", ("query= 0"));               \
      query= 0;                                       \
      DBUG_VOID_RETURN;                               \
    }                                                 \
  } while (0)
1428
1429
/**
  This is used by the SQL slave thread to prepare the event before execution.

  Parses a query event from a memory buffer: the fixed post-header, the
  status variables, and finally the database name and query text, which
  are copied into a freshly allocated buffer (data_buf) together with
  the catalog, time zone, user and host strings.

  A constructor cannot report failure, so on any parse or allocation
  error it returns early leaving @c query as NULL.

  @param buf                Start of the event (common header first).
  @param event_len          Length of the event in bytes.
  @param description_event  Format description supplying
                            common_header_len and post_header_len[].
  @param event_type         Event type; selects the post-header length
                            via post_header_len[event_type-1].
*/
Query_log_event::Query_log_event(const char* buf, uint event_len,
                                 const Format_description_log_event
                                 *description_event,
                                 Log_event_type event_type)
  :Log_event(buf, description_event), data_buf(0), query(NullS),
   db(NullS), catalog_len(0), status_vars_len(0),
   flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
   auto_increment_increment(1), auto_increment_offset(1),
   time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
   table_map_for_update(0), master_data_written(0)
{
  ulong data_len;
  uint32 tmp;
  uint8 common_header_len, post_header_len;
  Log_event::Byte *start;
  const Log_event::Byte *end;
  bool catalog_nz= 1;
  DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");

  memset(&user, 0, sizeof(user));
  memset(&host, 0, sizeof(host));
  common_header_len= description_event->common_header_len;
  post_header_len= description_event->post_header_len[event_type-1];
  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
                     event_len, common_header_len, post_header_len));

  /*
    We test if the event's length is sensible, and if so we compute data_len.
    We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
    We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
  */
  if (event_len < (uint)(common_header_len + post_header_len))
    DBUG_VOID_RETURN;
  data_len = event_len - (common_header_len + post_header_len);
  /* From here on buf points at the post-header. */
  buf+= common_header_len;

  thread_id = slave_proxy_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
  db_len = (uchar)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
  error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);

  /*
    5.0 format starts here.
    Depending on the format, we may or not have affected/warnings etc
    The remnant post-header to be parsed has length:
  */
  tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
  if (tmp)
  {
    status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
    /*
      Check if status variable length is corrupt and will lead to very
      wrong data. We could be even more strict and require data_len to
      be even bigger, but this will suffice to catch most corruption
      errors that can lead to a crash.
    */
    if (status_vars_len > MY_MIN(data_len, MAX_SIZE_LOG_EVENT_STATUS))
    {
      DBUG_PRINT("info", ("status_vars_len (%u) > data_len (%lu); query= 0",
                          status_vars_len, data_len));
      query= 0;
      DBUG_VOID_RETURN;
    }
    /* data_len now covers only the db name, its terminator and the query. */
    data_len-= status_vars_len;
    DBUG_PRINT("info", ("Query_log_event has status_vars_len: %u",
                        (uint) status_vars_len));
    /* tmp is only adjusted here; it is not read again in this constructor. */
    tmp-= 2;
  }
  else
  {
    /*
      server version < 5.0 / binlog_version < 4 master's event is
      relay-logged with storing the original size of the event in
      Q_MASTER_DATA_WRITTEN_CODE status variable.
      The size is to be restored at reading Q_MASTER_DATA_WRITTEN_CODE-marked
      event from the relay log.
    */
    DBUG_ASSERT(description_event->binlog_version < 4);
    master_data_written= (uint32)data_written;
  }
  /*
    We have parsed everything we know in the post header for QUERY_EVENT,
    the rest of post header either comes from older version MySQL or
    is dedicated to derived events (e.g. Execute_load_query...)
  */

  /* variable-part: the status vars; only in MySQL 5.0  */

  start= (Log_event::Byte*) (buf+post_header_len);
  end= (const Log_event::Byte*) (start+status_vars_len);
  /*
    Each status variable is a one-byte code followed by a code-specific
    payload. CHECK_SPACE returns from the constructor with query= 0 when
    the payload would run past 'end'.
  */
  for (const Log_event::Byte* pos= start; pos < end;)
  {
    switch (*pos++) {
    case Q_FLAGS2_CODE:
      CHECK_SPACE(pos, end, 4);
      flags2_inited= 1;
      flags2= uint4korr(pos);
      DBUG_PRINT("info",("In Query_log_event, read flags2: %lu", (ulong) flags2));
      pos+= 4;
      break;
    case Q_SQL_MODE_CODE:
    {
      CHECK_SPACE(pos, end, 8);
      sql_mode_inited= 1;
      sql_mode= (sql_mode_t) uint8korr(pos);
      DBUG_PRINT("info",("In Query_log_event, read sql_mode: %llu", sql_mode));
      pos+= 8;
      break;
    }
    case Q_CATALOG_NZ_CODE:
      DBUG_PRINT("info", ("case Q_CATALOG_NZ_CODE; pos:%p; end:%p",
                          pos, end));
      if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
      {
        DBUG_PRINT("info", ("query= 0"));
        query= 0;
        DBUG_VOID_RETURN;
      }
      break;
    case Q_AUTO_INCREMENT:
      CHECK_SPACE(pos, end, 4);
      auto_increment_increment= uint2korr(pos);
      auto_increment_offset=    uint2korr(pos+2);
      pos+= 4;
      break;
    case Q_CHARSET_CODE:
    {
      CHECK_SPACE(pos, end, 6);
      charset_inited= 1;
      memcpy(charset, pos, 6);
      pos+= 6;
      break;
    }
    case Q_TIME_ZONE_CODE:
    {
      if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
      {
        DBUG_PRINT("info", ("Q_TIME_ZONE_CODE: query= 0"));
        query= 0;
        DBUG_VOID_RETURN;
      }
      break;
    }
    case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
      CHECK_SPACE(pos, end, 1);
      if ((catalog_len= *pos))
        catalog= (char*) pos+1;                           // Will be copied later
      /* Length byte + catalog + its terminating zero. */
      CHECK_SPACE(pos, end, catalog_len + 2);
      pos+= catalog_len+2; // leap over end 0
      catalog_nz= 0; // catalog has end 0 in event
      break;
    case Q_LC_TIME_NAMES_CODE:
      CHECK_SPACE(pos, end, 2);
      lc_time_names_number= uint2korr(pos);
      pos+= 2;
      break;
    case Q_CHARSET_DATABASE_CODE:
      CHECK_SPACE(pos, end, 2);
      charset_database_number= uint2korr(pos);
      pos+= 2;
      break;
    case Q_TABLE_MAP_FOR_UPDATE_CODE:
      CHECK_SPACE(pos, end, 8);
      table_map_for_update= uint8korr(pos);
      pos+= 8;
      break;
    case Q_MASTER_DATA_WRITTEN_CODE:
      CHECK_SPACE(pos, end, 4);
      data_written= master_data_written= uint4korr(pos);
      pos+= 4;
      break;
    case Q_INVOKER:
    {
      /* Two length-prefixed strings without terminators: user, then host. */
      CHECK_SPACE(pos, end, 1);
      user.length= *pos++;
      CHECK_SPACE(pos, end, user.length);
      user.str= (char *)pos;
      pos+= user.length;

      CHECK_SPACE(pos, end, 1);
      host.length= *pos++;
      CHECK_SPACE(pos, end, host.length);
      host.str= (char *)pos;
      pos+= host.length;
      break;
    }
    case Q_HRNOW:
    {
      CHECK_SPACE(pos, end, 3);
      when_sec_part= uint3korr(pos);
      pos+= 3;
      break;
    }
    default:
      /* That's why you must write status vars in growing order of code */
      DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\
 code: %u), skipping the rest of them", (uint) *(pos-1)));
      pos= (const uchar*) end;                         // Break loop
    }
  }

#if !defined(MYSQL_CLIENT)
  /*
    NOTE(review): sql_mode is read below even when no Q_SQL_MODE_CODE
    status variable was seen (sql_mode_inited == 0), and it is not in
    this constructor's initializer list - confirm it is initialised
    elsewhere.
  */
  if (description_event->server_version_split.kind ==
      Format_description_log_event::master_version_split::KIND_MYSQL)
  {
    // Handle MariaDB/MySQL incompatible sql_mode bits
    sql_mode_t mysql_sql_mode= sql_mode;
    sql_mode&= MODE_MASK_MYSQL_COMPATIBLE; // Unset MySQL specific bits

    /*
      sql_mode flags related to fraction second rounding/truncation
      have opposite meaning in MySQL vs MariaDB.
      MySQL:
       - rounds fractional seconds by default
       - truncates if TIME_TRUNCATE_FRACTIONAL is set
      MariaDB:
       - truncates fractional seconds by default
       - rounds if TIME_ROUND_FRACTIONAL is set
    */
    if (description_event->server_version_split >= fsp_version_split_mysql &&
       !(mysql_sql_mode & MODE_MYSQL80_TIME_TRUNCATE_FRACTIONAL))
      sql_mode|= MODE_TIME_ROUND_FRACTIONAL;
  }
#endif

  /**
    Layout for the data buffer is as follows
    +---------+-----------+------+------+---------+----+-------+
    | catalog | time_zone | user | host | db name | \0 | Query |
    +---------+-----------+------+------+---------+----+-------+

    To support the query cache we append the following buffer to the above
    +-------+---------------------------------------+-------+
    |db len | uninitialized space of size of db len | FLAGS |
    +-------+---------------------------------------+-------+

    The area of buffer starting from Query field all the way to the end belongs
    to the Query buffer and its structure is described in alloc_query() in
    sql_parse.cc
  */

#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
  if (!(start= data_buf = (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
                                                       catalog_len + 1
                                                    +  time_zone_len + 1
                                                    +  user.length + 1
                                                    +  host.length + 1
                                                    +  data_len + 1
                                                    +  sizeof(size_t)//for db_len
                                                    +  db_len + 1
                                                    +  QUERY_CACHE_DB_LENGTH_SIZE
                                                    +  QUERY_CACHE_FLAGS_SIZE,
                                                       MYF(MY_WME))))
#else
  if (!(start= data_buf = (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
                                                       catalog_len + 1
                                                    +  time_zone_len + 1
                                                    +  user.length + 1
                                                    +  host.length + 1
                                                    +  data_len + 1,
                                                       MYF(MY_WME))))
#endif
      DBUG_VOID_RETURN;
  if (catalog_len)                                  // If catalog is given
  {
    /**
      @todo we should clean up and do only copy_str_and_move; it
      works for both cases.  Then we can remove the catalog_nz
      flag. /sven
    */
    if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
      copy_str_and_move(&catalog, &start, catalog_len);
    else
    {
      memcpy(start, catalog, catalog_len+1); // copy end 0
      catalog= (const char *)start;
      start+= catalog_len+1;
    }
  }
  if (time_zone_len)
    copy_str_and_move(&time_zone_str, &start, time_zone_len);

  /* user and host always end up as zero-terminated strings in data_buf. */
  if (user.length)
  {
    copy_str_and_move(&user.str, &start, user.length);
  }
  else
  {
    user.str= (char*) start;
    *(start++)= 0;
  }

  if (host.length)
    copy_str_and_move(&host.str, &start, host.length);
  else
  {
    host.str= (char*) start;
    *(start++)= 0;
  }

  /**
    if time_zone_len or catalog_len are 0, then time_zone and catalog
    are uninitialized at this point.  shouldn't they point to the
    zero-length null-terminated strings we allocated space for in the
    my_alloc call above? /sven
  */

  /* A 2nd variable part; this is common to all versions */
  /* 'end' points just past the status vars, i.e. at the db name. */
  memcpy((char*) start, end, data_len);          // Copy db and query
  start[data_len]= '\0';              // End query with \0 (For safety)
  db= (char *)start;
  query= (char *)(start + db_len + 1);
  q_len= data_len - db_len -1;

  /* Reject events whose db/query lengths do not add up to data_len. */
  if (data_len && (data_len < db_len ||
                   data_len < q_len ||
                   data_len != (db_len + q_len + 1)))
  {
    q_len= 0;
    query= NULL;
    DBUG_VOID_RETURN;
  }

  /*
    Cross-check q_len against the number of bytes remaining in the
    event after the headers, status vars, db name and its terminator.
  */
  uint32 max_length= uint32(event_len - ((const char*)(end + db_len + 1) -
                                         (buf - common_header_len)));
  if (q_len != max_length ||
      (event_len < uint((const char*)(end + db_len + 1) -
                        (buf - common_header_len))))
  {
    q_len= 0;
    query= NULL;
    DBUG_VOID_RETURN;
  }
  /**
    Append the db length at the end of the buffer. This will be used by
    Query_cache::send_result_to_client() in case the query cache is On.
   */
#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
  size_t db_length= (size_t)db_len;
  memcpy(start + data_len + 1, &db_length, sizeof(size_t));
#endif
  DBUG_VOID_RETURN;
}
1776
Query_compressed_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event,Log_event_type event_type)1777 Query_compressed_log_event::Query_compressed_log_event(const char *buf,
1778 uint event_len,
1779 const Format_description_log_event
1780 *description_event,
1781 Log_event_type event_type)
1782 :Query_log_event(buf, event_len, description_event, event_type),
1783 query_buf(NULL)
1784 {
1785 if(query)
1786 {
1787 uint32 un_len=binlog_get_uncompress_len(query);
1788 if (!un_len)
1789 {
1790 query = 0;
1791 return;
1792 }
1793
1794 /* Reserve one byte for '\0' */
1795 query_buf = (Log_event::Byte*)my_malloc(PSI_INSTRUMENT_ME,
1796 ALIGN_SIZE(un_len + 1), MYF(MY_WME));
1797 if(query_buf &&
1798 !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
1799 {
1800 query_buf[un_len] = 0;
1801 query = (const char *)query_buf;
1802 q_len = un_len;
1803 }
1804 else
1805 {
1806 query= 0;
1807 }
1808 }
1809 }
1810
1811 /*
1812 Replace a binlog event read into a packet with a dummy event. Either a
1813 Query_log_event that has just a comment, or if that will not fit in the
1814 space used for the event to be replaced, then a NULL user_var event.
1815
1816 This is used when sending binlog data to a slave which does not understand
1817 this particular event and which is too old to support informational events
1818 or holes in the event stream.
1819
1820 This allows to write such events into the binlog on the master and still be
1821 able to replicate against old slaves without them breaking.
1822
1823 Clears the flag LOG_EVENT_THREAD_SPECIFIC_F and set LOG_EVENT_SUPPRESS_USE_F.
1824 Overwrites the type with QUERY_EVENT (or USER_VAR_EVENT), and replaces the
1825 body with a minimal query / NULL user var.
1826
1827 Returns zero on success, -1 if error due to too little space in original
1828 event. A minimum of 25 bytes (19 bytes fixed header + 6 bytes in the body)
1829 is needed in any event to be replaced with a dummy event.
1830 */
1831 int
dummy_event(String * packet,ulong ev_offset,enum enum_binlog_checksum_alg checksum_alg)1832 Query_log_event::dummy_event(String *packet, ulong ev_offset,
1833 enum enum_binlog_checksum_alg checksum_alg)
1834 {
1835 uchar *p= (uchar *)packet->ptr() + ev_offset;
1836 size_t data_len= packet->length() - ev_offset;
1837 uint16 flags;
1838 static const size_t min_user_var_event_len=
1839 LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + 1 + UV_VAL_IS_NULL; // 25
1840 static const size_t min_query_event_len=
1841 LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN + 1 + 1; // 34
1842
1843 if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1844 data_len-= BINLOG_CHECKSUM_LEN;
1845 else
1846 DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1847 checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
1848
1849 if (data_len < min_user_var_event_len)
1850 /* Cannot replace with dummy, event too short. */
1851 return -1;
1852
1853 flags= uint2korr(p + FLAGS_OFFSET);
1854 flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
1855 flags|= LOG_EVENT_SUPPRESS_USE_F;
1856 int2store(p + FLAGS_OFFSET, flags);
1857
1858 if (data_len < min_query_event_len)
1859 {
1860 /*
1861 Have to use dummy user_var event for such a short packet.
1862
1863 This works, but the event will be considered part of an event group with
1864 the following event. So for example @@global.sql_slave_skip_counter=1
1865 will skip not only the dummy event, but also the immediately following
1866 event.
1867
1868 We write a NULL user var with the name @`!dummyvar` (or as much
1869 as that as will fit within the size of the original event - so
1870 possibly just @`!`).
1871 */
1872 static const char var_name[]= "!dummyvar";
1873 size_t name_len= data_len - (min_user_var_event_len - 1);
1874
1875 p[EVENT_TYPE_OFFSET]= USER_VAR_EVENT;
1876 int4store(p + LOG_EVENT_HEADER_LEN, name_len);
1877 memcpy(p + LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE, var_name, name_len);
1878 p[LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + name_len]= 1; // indicates NULL
1879 }
1880 else
1881 {
1882 /*
1883 Use a dummy query event, just a comment.
1884 */
1885 static const char message[]=
1886 "# Dummy event replacing event type %u that slave cannot handle.";
1887 char buf[sizeof(message)+1]; /* +1, as %u can expand to 3 digits. */
1888 uchar old_type= p[EVENT_TYPE_OFFSET];
1889 uchar *q= p + LOG_EVENT_HEADER_LEN;
1890 size_t comment_len, len;
1891
1892 p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
1893 int4store(q + Q_THREAD_ID_OFFSET, 0);
1894 int4store(q + Q_EXEC_TIME_OFFSET, 0);
1895 q[Q_DB_LEN_OFFSET]= 0;
1896 int2store(q + Q_ERR_CODE_OFFSET, 0);
1897 int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
1898 q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
1899 q+= Q_DATA_OFFSET + 1;
1900 len= my_snprintf(buf, sizeof(buf), message, old_type);
1901 comment_len= data_len - (min_query_event_len - 1);
1902 if (comment_len <= len)
1903 memcpy(q, buf, comment_len);
1904 else
1905 {
1906 memcpy(q, buf, len);
1907 memset(q+len, ' ', comment_len - len);
1908 }
1909 }
1910
1911 if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1912 {
1913 ha_checksum crc= my_checksum(0, p, data_len);
1914 int4store(p + data_len, crc);
1915 }
1916 return 0;
1917 }
1918
1919 /*
1920 Replace an event (GTID event) with a BEGIN query event, to be compatible
1921 with an old slave.
1922 */
1923 int
begin_event(String * packet,ulong ev_offset,enum enum_binlog_checksum_alg checksum_alg)1924 Query_log_event::begin_event(String *packet, ulong ev_offset,
1925 enum enum_binlog_checksum_alg checksum_alg)
1926 {
1927 uchar *p= (uchar *)packet->ptr() + ev_offset;
1928 uchar *q= p + LOG_EVENT_HEADER_LEN;
1929 size_t data_len= packet->length() - ev_offset;
1930 uint16 flags;
1931
1932 if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1933 data_len-= BINLOG_CHECKSUM_LEN;
1934 else
1935 DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1936 checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
1937
1938 /*
1939 Currently we only need to replace GTID event.
1940 The length of GTID differs depending on whether it contains commit id.
1941 */
1942 DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN ||
1943 data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
1944 if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN &&
1945 data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2)
1946 return 1;
1947
1948 flags= uint2korr(p + FLAGS_OFFSET);
1949 flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
1950 flags|= LOG_EVENT_SUPPRESS_USE_F;
1951 int2store(p + FLAGS_OFFSET, flags);
1952
1953 p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
1954 int4store(q + Q_THREAD_ID_OFFSET, 0);
1955 int4store(q + Q_EXEC_TIME_OFFSET, 0);
1956 q[Q_DB_LEN_OFFSET]= 0;
1957 int2store(q + Q_ERR_CODE_OFFSET, 0);
1958 if (data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
1959 {
1960 int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
1961 q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
1962 q+= Q_DATA_OFFSET + 1;
1963 }
1964 else
1965 {
1966 DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
1967 /* Put in an empty time_zone_str to take up the extra 2 bytes. */
1968 int2store(q + Q_STATUS_VARS_LEN_OFFSET, 2);
1969 q[Q_DATA_OFFSET]= Q_TIME_ZONE_CODE;
1970 q[Q_DATA_OFFSET+1]= 0; /* Zero length for empty time_zone_str */
1971 q[Q_DATA_OFFSET+2]= 0; /* Zero terminator for empty db */
1972 q+= Q_DATA_OFFSET + 3;
1973 }
1974 memcpy(q, "BEGIN", 5);
1975
1976 if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1977 {
1978 ha_checksum crc= my_checksum(0, p, data_len);
1979 int4store(p + data_len, crc);
1980 }
1981 return 0;
1982 }
1983
1984
1985 /**************************************************************************
1986 Start_log_event_v3 methods
1987 **************************************************************************/
1988
1989
Start_log_event_v3(const char * buf,uint event_len,const Format_description_log_event * description_event)1990 Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len,
1991 const Format_description_log_event
1992 *description_event)
1993 :Log_event(buf, description_event), binlog_version(BINLOG_VERSION)
1994 {
1995 if (event_len < LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET)
1996 {
1997 server_version[0]= 0;
1998 return;
1999 }
2000 buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2001 binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
2002 memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
2003 ST_SERVER_VER_LEN);
2004 // prevent overrun if log is corrupted on disk
2005 server_version[ST_SERVER_VER_LEN-1]= 0;
2006 created= uint4korr(buf+ST_CREATED_OFFSET);
2007 dont_set_created= 1;
2008 }
2009
2010
2011 /***************************************************************************
2012 Format_description_log_event methods
2013 ****************************************************************************/
2014
2015 /**
2016 Format_description_log_event 1st ctor.
2017
2018 Ctor. Can be used to create the event to write to the binary log (when the
2019 server starts or when FLUSH LOGS), or to create artificial events to parse
2020 binlogs from MySQL 3.23 or 4.x.
2021 When in a client, only the 2nd use is possible.
2022
2023 @param binlog_version the binlog version for which we want to build
2024 an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2025 x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2026 old 4.0 (binlog version 2) is not supported;
2027 it should not be used for replication with
2028 5.0.
2029 @param server_ver a string containing the server version.
2030 */
2031
2032 Format_description_log_event::
Format_description_log_event(uint8 binlog_ver,const char * server_ver)2033 Format_description_log_event(uint8 binlog_ver, const char* server_ver)
2034 :Start_log_event_v3(), event_type_permutation(0)
2035 {
2036 binlog_version= binlog_ver;
2037 switch (binlog_ver) {
2038 case 4: /* MySQL 5.0 */
2039 memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2040 DBUG_EXECUTE_IF("pretend_version_50034_in_binlog",
2041 strmov(server_version, "5.0.34"););
2042 common_header_len= LOG_EVENT_HEADER_LEN;
2043 number_of_event_types= LOG_EVENT_TYPES;
2044 /* we'll catch my_malloc() error in is_valid() */
2045 post_header_len=(uint8*) my_malloc(PSI_INSTRUMENT_ME,
2046 number_of_event_types*sizeof(uint8)
2047 + BINLOG_CHECKSUM_ALG_DESC_LEN,
2048 MYF(0));
2049 /*
2050 This long list of assignments is not beautiful, but I see no way to
2051 make it nicer, as the right members are #defines, not array members, so
2052 it's impossible to write a loop.
2053 */
2054 if (post_header_len)
2055 {
2056 #ifndef DBUG_OFF
2057 // Allows us to sanity-check that all events initialized their
2058 // events (see the end of this 'if' block).
2059 memset(post_header_len, 255, number_of_event_types*sizeof(uint8));
2060 #endif
2061
2062 /* Note: all event types must explicitly fill in their lengths here. */
2063 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2064 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2065 post_header_len[STOP_EVENT-1]= STOP_HEADER_LEN;
2066 post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2067 post_header_len[INTVAR_EVENT-1]= INTVAR_HEADER_LEN;
2068 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2069 post_header_len[SLAVE_EVENT-1]= SLAVE_HEADER_LEN;
2070 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2071 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2072 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2073 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2074 post_header_len[NEW_LOAD_EVENT-1]= NEW_LOAD_HEADER_LEN;
2075 post_header_len[RAND_EVENT-1]= RAND_HEADER_LEN;
2076 post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN;
2077 post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2078 post_header_len[XID_EVENT-1]= XID_HEADER_LEN;
2079 post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN;
2080 post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN;
2081 post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2082 /*
2083 The PRE_GA events are never be written to any binlog, but
2084 their lengths are included in Format_description_log_event.
2085 Hence, we need to be assign some value here, to avoid reading
2086 uninitialized memory when the array is written to disk.
2087 */
2088 post_header_len[PRE_GA_WRITE_ROWS_EVENT-1] = 0;
2089 post_header_len[PRE_GA_UPDATE_ROWS_EVENT-1] = 0;
2090 post_header_len[PRE_GA_DELETE_ROWS_EVENT-1] = 0;
2091
2092 post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2093 post_header_len[WRITE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2094 post_header_len[UPDATE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2095 post_header_len[DELETE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2096 /*
2097 We here have the possibility to simulate a master of before we changed
2098 the table map id to be stored in 6 bytes: when it was stored in 4
2099 bytes (=> post_header_len was 6). This is used to test backward
2100 compatibility.
2101 This code can be removed after a few months (today is Dec 21st 2005),
2102 when we know that the 4-byte masters are not deployed anymore (check
2103 with Tomas Ulin first!), and the accompanying test (rpl_row_4_bytes)
2104 too.
2105 */
2106 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
2107 post_header_len[TABLE_MAP_EVENT-1]=
2108 post_header_len[WRITE_ROWS_EVENT_V1-1]=
2109 post_header_len[UPDATE_ROWS_EVENT_V1-1]=
2110 post_header_len[DELETE_ROWS_EVENT_V1-1]= 6;);
2111 post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2112 post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2113 post_header_len[IGNORABLE_LOG_EVENT-1]= 0;
2114 post_header_len[ROWS_QUERY_LOG_EVENT-1]= 0;
2115 post_header_len[GTID_LOG_EVENT-1]= 0;
2116 post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]= 0;
2117 post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= 0;
2118 post_header_len[TRANSACTION_CONTEXT_EVENT-1]= 0;
2119 post_header_len[VIEW_CHANGE_EVENT-1]= 0;
2120 post_header_len[XA_PREPARE_LOG_EVENT-1]= 0;
2121 post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
2122 post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
2123 post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
2124
2125 // Set header length of the reserved events to 0
2126 memset(post_header_len + MYSQL_EVENTS_END - 1, 0,
2127 (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8));
2128
2129 // Set header lengths of Maria events
2130 post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
2131 post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
2132 BINLOG_CHECKPOINT_HEADER_LEN;
2133 post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
2134 post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
2135 post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN;
2136
2137 //compressed event
2138 post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN;
2139 post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
2140 post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
2141 post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
2142 post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2143 post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2144 post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
2145
2146 // Sanity-check that all post header lengths are initialized.
2147 int i;
2148 for (i=0; i<number_of_event_types; i++)
2149 DBUG_ASSERT(post_header_len[i] != 255);
2150 }
2151 break;
2152
2153 case 1: /* 3.23 */
2154 case 3: /* 4.0.x x>=2 */
2155 /*
2156 We build an artificial (i.e. not sent by the master) event, which
2157 describes what those old master versions send.
2158 */
2159 if (binlog_ver==1)
2160 strmov(server_version, server_ver ? server_ver : "3.23");
2161 else
2162 strmov(server_version, server_ver ? server_ver : "4.0");
2163 common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2164 LOG_EVENT_MINIMAL_HEADER_LEN;
2165 /*
2166 The first new event in binlog version 4 is Format_desc. So any event type
2167 after that does not exist in older versions. We use the events known by
2168 version 3, even if version 1 had only a subset of them (this is not a
2169 problem: it uses a few bytes for nothing but unifies code; it does not
2170 make the slave detect less corruptions).
2171 */
2172 number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2173 post_header_len=(uint8*) my_malloc(PSI_INSTRUMENT_ME,
2174 number_of_event_types*sizeof(uint8), MYF(0));
2175 if (post_header_len)
2176 {
2177 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2178 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2179 post_header_len[STOP_EVENT-1]= 0;
2180 post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2181 post_header_len[INTVAR_EVENT-1]= 0;
2182 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2183 post_header_len[SLAVE_EVENT-1]= 0;
2184 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2185 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2186 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2187 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2188 post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2189 post_header_len[RAND_EVENT-1]= 0;
2190 post_header_len[USER_VAR_EVENT-1]= 0;
2191 }
2192 break;
2193 default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2194 post_header_len= 0; /* will make is_valid() fail */
2195 break;
2196 }
2197 calc_server_version_split();
2198 checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
2199 reset_crypto();
2200 }
2201
2202
2203 /**
2204 The problem with this constructor is that the fixed header may have a
2205 length different from this version, but we don't know this length as we
2206 have not read the Format_description_log_event which says it, yet. This
2207 length is in the post-header of the event, but we don't know where the
2208 post-header starts.
2209
2210 So this type of event HAS to:
2211 - either have the header's length at the beginning (in the header, at a
2212 fixed position which will never be changed), not in the post-header. That
2213 would make the header be "shifted" compared to other events.
2214 - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2215 versions, so that we know for sure.
2216
2217 I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2218 it is sent before Format_description_log_event).
2219 */
2220
2221 Format_description_log_event::
Format_description_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2222 Format_description_log_event(const char* buf,
2223 uint event_len,
2224 const
2225 Format_description_log_event*
2226 description_event)
2227 :Start_log_event_v3(buf, event_len, description_event),
2228 common_header_len(0), post_header_len(NULL), event_type_permutation(0)
2229 {
2230 DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)");
2231 if (!Start_log_event_v3::is_valid())
2232 DBUG_VOID_RETURN; /* sanity check */
2233 buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2234 if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2235 DBUG_VOID_RETURN; /* sanity check */
2236 number_of_event_types=
2237 event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1);
2238 DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d",
2239 common_header_len, number_of_event_types));
2240 /* If alloc fails, we'll detect it in is_valid() */
2241
2242 post_header_len= (uint8*) my_memdup(PSI_INSTRUMENT_ME,
2243 buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2244 number_of_event_types*
2245 sizeof(*post_header_len),
2246 MYF(0));
2247 calc_server_version_split();
2248 if (!is_version_before_checksum(&server_version_split))
2249 {
2250 /* the last bytes are the checksum alg desc and value (or value's room) */
2251 number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
2252 checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types];
2253 }
2254 else
2255 {
2256 checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
2257 }
2258 reset_crypto();
2259
2260 DBUG_VOID_RETURN;
2261 }
2262
start_decryption(Start_encryption_log_event * sele)2263 bool Format_description_log_event::start_decryption(Start_encryption_log_event* sele)
2264 {
2265 DBUG_ASSERT(crypto_data.scheme == 0);
2266
2267 if (!sele->is_valid())
2268 return 1;
2269
2270 memcpy(crypto_data.nonce, sele->nonce, BINLOG_NONCE_LENGTH);
2271 return crypto_data.init(sele->crypto_scheme, sele->key_version);
2272 }
2273
2274
Version(const char * version,const char ** endptr)2275 Version::Version(const char *version, const char **endptr)
2276 {
2277 const char *p= version;
2278 ulong number;
2279 for (uint i= 0; i<=2; i++)
2280 {
2281 char *r;
2282 number= strtoul(p, &r, 10);
2283 /*
2284 It is an invalid version if any version number greater than 255 or
2285 first number is not followed by '.'.
2286 */
2287 if (number < 256 && (*r == '.' || i != 0))
2288 m_ver[i]= (uchar) number;
2289 else
2290 {
2291 *this= Version();
2292 break;
2293 }
2294
2295 p= r;
2296 if (*r == '.')
2297 p++; // skip the dot
2298 }
2299 endptr[0]= p;
2300 }
2301
2302
2303 Format_description_log_event::
master_version_split(const char * version)2304 master_version_split::master_version_split(const char *version)
2305 {
2306 const char *p;
2307 static_cast<Version*>(this)[0]= Version(version, &p);
2308 if (strstr(p, "MariaDB") != 0 || strstr(p, "-maria-") != 0)
2309 kind= KIND_MARIADB;
2310 else
2311 kind= KIND_MYSQL;
2312 }
2313
2314
2315 /**
2316 Splits the event's 'server_version' string into three numeric pieces stored
2317 into 'server_version_split':
2318 X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2319 X.Yabc -> {X,Y,0}
2320 'server_version_split' is then used for lookups to find if the server which
2321 created this event has some known bug.
2322 */
calc_server_version_split()2323 void Format_description_log_event::calc_server_version_split()
2324 {
2325 server_version_split= master_version_split(server_version);
2326
2327 DBUG_PRINT("info",("Format_description_log_event::server_version_split:"
2328 " '%s' %d %d %d", server_version,
2329 server_version_split[0],
2330 server_version_split[1], server_version_split[2]));
2331 }
2332
2333
2334 /**
2335 @return TRUE is the event's version is earlier than one that introduced
2336 the replication event checksum. FALSE otherwise.
2337 */
2338 bool
is_version_before_checksum(const master_version_split * version_split)2339 Format_description_log_event::is_version_before_checksum(const master_version_split
2340 *version_split)
2341 {
2342 return *version_split <
2343 (version_split->kind == master_version_split::KIND_MARIADB ?
2344 checksum_version_split_mariadb : checksum_version_split_mysql);
2345 }
2346
2347 /**
2348 @param buf buffer holding serialized FD event
2349 @param len netto (possible checksum is stripped off) length of the event buf
2350
2351 @return the version-safe checksum alg descriptor where zero
2352 designates no checksum, 255 - the orginator is
2353 checksum-unaware (effectively no checksum) and the actuall
2354 [1-254] range alg descriptor.
2355 */
get_checksum_alg(const char * buf,ulong len)2356 enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len)
2357 {
2358 enum enum_binlog_checksum_alg ret;
2359 char version[ST_SERVER_VER_LEN];
2360
2361 DBUG_ENTER("get_checksum_alg");
2362 DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
2363
2364 memcpy(version,
2365 buf + LOG_EVENT_MINIMAL_HEADER_LEN + ST_SERVER_VER_OFFSET,
2366 ST_SERVER_VER_LEN);
2367 version[ST_SERVER_VER_LEN - 1]= 0;
2368
2369 Format_description_log_event::master_version_split version_split(version);
2370 ret= Format_description_log_event::is_version_before_checksum(&version_split)
2371 ? BINLOG_CHECKSUM_ALG_UNDEF
2372 : (enum_binlog_checksum_alg)buf[len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN];
2373 DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
2374 ret == BINLOG_CHECKSUM_ALG_UNDEF ||
2375 ret == BINLOG_CHECKSUM_ALG_CRC32);
2376 DBUG_RETURN(ret);
2377 }
2378
Start_encryption_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2379 Start_encryption_log_event::Start_encryption_log_event(
2380 const char* buf, uint event_len,
2381 const Format_description_log_event* description_event)
2382 :Log_event(buf, description_event)
2383 {
2384 if ((int)event_len ==
2385 LOG_EVENT_MINIMAL_HEADER_LEN + Start_encryption_log_event::get_data_size())
2386 {
2387 buf += LOG_EVENT_MINIMAL_HEADER_LEN;
2388 crypto_scheme = *(uchar*)buf;
2389 key_version = uint4korr(buf + BINLOG_CRYPTO_SCHEME_LENGTH);
2390 memcpy(nonce,
2391 buf + BINLOG_CRYPTO_SCHEME_LENGTH + BINLOG_KEY_VERSION_LENGTH,
2392 BINLOG_NONCE_LENGTH);
2393 }
2394 else
2395 crypto_scheme= ~0; // invalid
2396 }
2397
2398
2399 /**************************************************************************
2400 Load_log_event methods
2401 General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2402 going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2403 However, the 5.0 slave could still have to read such events (from a 4.x
2404 master), convert them (which just means maybe expand the header, when 5.0
2405 servers have a UID in events) (remember that whatever is after the header
2406 will be like in 4.x, as this event's format is not modified in 5.0 as we
2407 will use new types of events to log the new LOAD DATA INFILE features).
2408 To be able to read/convert, we just need to not assume that the common
2409 header is of length LOG_EVENT_HEADER_LEN (we must use the description
2410 event).
2411 Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2412 between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2413 positions displayed in SHOW SLAVE STATUS then are fine too).
2414 **************************************************************************/
2415
2416
2417 /**
2418 @note
2419 The caller must do buf[event_len] = 0 before he starts using the
2420 constructed event.
2421 */
Load_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2422 Load_log_event::Load_log_event(const char *buf, uint event_len,
2423 const Format_description_log_event *description_event)
2424 :Log_event(buf, description_event), num_fields(0), fields(0),
2425 field_lens(0),field_block_len(0),
2426 table_name(0), db(0), fname(0), local_fname(FALSE),
2427 /*
2428 Load_log_event which comes from the binary log does not contain
2429 information about the type of insert which was used on the master.
2430 Assume that it was an ordinary, non-concurrent LOAD DATA.
2431 */
2432 is_concurrent(FALSE)
2433 {
2434 DBUG_ENTER("Load_log_event");
2435 /*
2436 I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2437 4.0->5.0 and 5.0->5.0 and it works.
2438 */
2439 if (event_len)
2440 copy_log_event(buf, event_len,
2441 (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2442 LOAD_HEADER_LEN +
2443 description_event->common_header_len :
2444 LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2445 description_event);
2446 /* otherwise it's a derived class, will call copy_log_event() itself */
2447 DBUG_VOID_RETURN;
2448 }
2449
2450
2451 /*
2452 Load_log_event::copy_log_event()
2453 */
2454
copy_log_event(const char * buf,ulong event_len,int body_offset,const Format_description_log_event * description_event)2455 int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2456 int body_offset,
2457 const Format_description_log_event *description_event)
2458 {
2459 DBUG_ENTER("Load_log_event::copy_log_event");
2460 uint data_len;
2461 if ((int) event_len <= body_offset)
2462 DBUG_RETURN(1);
2463 char* buf_end = (char*)buf + event_len;
2464 /* this is the beginning of the post-header */
2465 const char* data_head = buf + description_event->common_header_len;
2466 thread_id= slave_proxy_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2467 exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2468 skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2469 table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2470 db_len = (uint)data_head[L_DB_LEN_OFFSET];
2471 num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2472
2473 /*
2474 Sql_ex.init() on success returns the pointer to the first byte after
2475 the sql_ex structure, which is the start of field lengths array.
2476 */
2477 if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
2478 buf_end,
2479 (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2480 DBUG_RETURN(1);
2481
2482 data_len = event_len - body_offset;
2483 if (num_fields > data_len) // simple sanity check against corruption
2484 DBUG_RETURN(1);
2485 for (uint i = 0; i < num_fields; i++)
2486 field_block_len += (uint)field_lens[i] + 1;
2487
2488 fields = (char*)field_lens + num_fields;
2489 table_name = fields + field_block_len;
2490 if (strlen(table_name) > NAME_LEN)
2491 goto err;
2492
2493 db = table_name + table_name_len + 1;
2494 DBUG_EXECUTE_IF ("simulate_invalid_address",
2495 db_len = data_len;);
2496 fname = db + db_len + 1;
2497 if ((db_len > data_len) || (fname > buf_end))
2498 goto err;
2499 fname_len = (uint) strlen(fname);
2500 if ((fname_len > data_len) || (fname + fname_len > buf_end))
2501 goto err;
2502 // null termination is accomplished by the caller doing buf[event_len]=0
2503
2504 DBUG_RETURN(0);
2505
2506 err:
2507 // Invalid event.
2508 table_name = 0;
2509 DBUG_RETURN(1);
2510 }
2511
2512
2513 /**************************************************************************
2514 Rotate_log_event methods
2515 **************************************************************************/
2516
Rotate_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2517 Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
2518 const Format_description_log_event* description_event)
2519 :Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
2520 {
2521 DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)");
2522 // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2523 uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
2524 uint ident_offset;
2525 if (event_len < (uint)(LOG_EVENT_MINIMAL_HEADER_LEN + post_header_len))
2526 DBUG_VOID_RETURN;
2527 buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2528 pos= post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2529 ident_len= (uint)(event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + post_header_len));
2530 ident_offset= post_header_len;
2531 set_if_smaller(ident_len,FN_REFLEN-1);
2532 new_log_ident= my_strndup(PSI_INSTRUMENT_ME, buf + ident_offset, (uint) ident_len, MYF(MY_WME));
2533 DBUG_PRINT("debug", ("new_log_ident: '%s'", new_log_ident));
2534 DBUG_VOID_RETURN;
2535 }
2536
2537
2538 /**************************************************************************
2539 Binlog_checkpoint_log_event methods
2540 **************************************************************************/
2541
Binlog_checkpoint_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2542 Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
2543 const char *buf, uint event_len,
2544 const Format_description_log_event *description_event)
2545 :Log_event(buf, description_event), binlog_file_name(0)
2546 {
2547 uint8 header_size= description_event->common_header_len;
2548 uint8 post_header_len=
2549 description_event->post_header_len[BINLOG_CHECKPOINT_EVENT-1];
2550 if (event_len < (uint) header_size + (uint) post_header_len ||
2551 post_header_len < BINLOG_CHECKPOINT_HEADER_LEN)
2552 return;
2553 buf+= header_size;
2554 /* See uint4korr and int4store below */
2555 compile_time_assert(BINLOG_CHECKPOINT_HEADER_LEN == 4);
2556 binlog_file_len= uint4korr(buf);
2557 if (event_len - (header_size + post_header_len) < binlog_file_len)
2558 return;
2559 binlog_file_name= my_strndup(PSI_INSTRUMENT_ME, buf + post_header_len, binlog_file_len,
2560 MYF(MY_WME));
2561 return;
2562 }
2563
2564
2565 /**************************************************************************
2566 Global transaction ID stuff
2567 **************************************************************************/
2568
Gtid_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2569 Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
2570 const Format_description_log_event *description_event)
2571 : Log_event(buf, description_event), seq_no(0), commit_id(0)
2572 {
2573 uint8 header_size= description_event->common_header_len;
2574 uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
2575 if (event_len < (uint) header_size + (uint) post_header_len ||
2576 post_header_len < GTID_HEADER_LEN)
2577 return;
2578
2579 buf+= header_size;
2580 seq_no= uint8korr(buf);
2581 buf+= 8;
2582 domain_id= uint4korr(buf);
2583 buf+= 4;
2584 flags2= *(buf++);
2585 if (flags2 & FL_GROUP_COMMIT_ID)
2586 {
2587 if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
2588 {
2589 seq_no= 0; // So is_valid() returns false
2590 return;
2591 }
2592 commit_id= uint8korr(buf);
2593 buf+= 8;
2594 }
2595 if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
2596 {
2597 xid.formatID= uint4korr(buf);
2598 buf+= 4;
2599
2600 xid.gtrid_length= (long) buf[0];
2601 xid.bqual_length= (long) buf[1];
2602 buf+= 2;
2603
2604 long data_length= xid.bqual_length + xid.gtrid_length;
2605 memcpy(xid.data, buf, data_length);
2606 buf+= data_length;
2607 }
2608 }
2609
2610
2611 /* GTID list. */
2612
Gtid_list_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2613 Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
2614 const Format_description_log_event *description_event)
2615 : Log_event(buf, description_event), count(0), list(0), sub_id_list(0)
2616 {
2617 uint32 i;
2618 uint32 val;
2619 uint8 header_size= description_event->common_header_len;
2620 uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
2621 if (event_len < (uint) header_size + (uint) post_header_len ||
2622 post_header_len < GTID_LIST_HEADER_LEN)
2623 return;
2624
2625 buf+= header_size;
2626 val= uint4korr(buf);
2627 count= val & ((1<<28)-1);
2628 gl_flags= val & ((uint32)0xf << 28);
2629 buf+= 4;
2630 if (event_len - (header_size + post_header_len) < count*element_size ||
2631 (!(list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
2632 count*sizeof(*list) + (count == 0), MYF(MY_WME)))))
2633 return;
2634
2635 for (i= 0; i < count; ++i)
2636 {
2637 list[i].domain_id= uint4korr(buf);
2638 buf+= 4;
2639 list[i].server_id= uint4korr(buf);
2640 buf+= 4;
2641 list[i].seq_no= uint8korr(buf);
2642 buf+= 8;
2643 }
2644
2645 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2646 if ((gl_flags & FLAG_IGN_GTIDS))
2647 {
2648 uint32 i;
2649 if (!(sub_id_list= (uint64 *)my_malloc(PSI_INSTRUMENT_ME,
2650 count*sizeof(uint64), MYF(MY_WME))))
2651 {
2652 my_free(list);
2653 list= NULL;
2654 return;
2655 }
2656 for (i= 0; i < count; ++i)
2657 {
2658 if (!(sub_id_list[i]=
2659 rpl_global_gtid_slave_state->next_sub_id(list[i].domain_id)))
2660 {
2661 my_free(list);
2662 my_free(sub_id_list);
2663 list= NULL;
2664 sub_id_list= NULL;
2665 return;
2666 }
2667 }
2668 }
2669 #endif
2670 }
2671
2672
2673 /*
2674 Used to record gtid_list event while sending binlog to slave, without having to
2675 fully contruct the event object.
2676 */
2677 bool
peek(const char * event_start,size_t event_len,enum enum_binlog_checksum_alg checksum_alg,rpl_gtid ** out_gtid_list,uint32 * out_list_len,const Format_description_log_event * fdev)2678 Gtid_list_log_event::peek(const char *event_start, size_t event_len,
2679 enum enum_binlog_checksum_alg checksum_alg,
2680 rpl_gtid **out_gtid_list, uint32 *out_list_len,
2681 const Format_description_log_event *fdev)
2682 {
2683 const char *p;
2684 uint32 count_field, count;
2685 rpl_gtid *gtid_list;
2686
2687 if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
2688 {
2689 if (event_len > BINLOG_CHECKSUM_LEN)
2690 event_len-= BINLOG_CHECKSUM_LEN;
2691 else
2692 event_len= 0;
2693 }
2694 else
2695 DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
2696 checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
2697
2698 if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN)
2699 return true;
2700 p= event_start + fdev->common_header_len;
2701 count_field= uint4korr(p);
2702 p+= 4;
2703 count= count_field & ((1<<28)-1);
2704 if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN +
2705 16 * count)
2706 return true;
2707 if (!(gtid_list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
2708 sizeof(rpl_gtid)*count + (count == 0), MYF(MY_WME))))
2709 return true;
2710 *out_gtid_list= gtid_list;
2711 *out_list_len= count;
2712 while (count--)
2713 {
2714 gtid_list->domain_id= uint4korr(p);
2715 p+= 4;
2716 gtid_list->server_id= uint4korr(p);
2717 p+= 4;
2718 gtid_list->seq_no= uint8korr(p);
2719 p+= 8;
2720 ++gtid_list;
2721 }
2722
2723 return false;
2724 }
2725
2726
2727 /**************************************************************************
2728 Intvar_log_event methods
2729 **************************************************************************/
2730
2731 /*
2732 Intvar_log_event::Intvar_log_event()
2733 */
2734
Intvar_log_event(const char * buf,const Format_description_log_event * description_event)2735 Intvar_log_event::Intvar_log_event(const char* buf,
2736 const Format_description_log_event* description_event)
2737 :Log_event(buf, description_event)
2738 {
2739 /* The Post-Header is empty. The Variable Data part begins immediately. */
2740 buf+= description_event->common_header_len +
2741 description_event->post_header_len[INTVAR_EVENT-1];
2742 type= buf[I_TYPE_OFFSET];
2743 val= uint8korr(buf+I_VAL_OFFSET);
2744 }
2745
2746
2747 /*
2748 Intvar_log_event::get_var_type_name()
2749 */
2750
get_var_type_name()2751 const char* Intvar_log_event::get_var_type_name()
2752 {
2753 switch(type) {
2754 case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
2755 case INSERT_ID_EVENT: return "INSERT_ID";
2756 default: /* impossible */ return "UNKNOWN";
2757 }
2758 }
2759
2760
2761 /**************************************************************************
2762 Rand_log_event methods
2763 **************************************************************************/
2764
Rand_log_event(const char * buf,const Format_description_log_event * description_event)2765 Rand_log_event::Rand_log_event(const char* buf,
2766 const Format_description_log_event* description_event)
2767 :Log_event(buf, description_event)
2768 {
2769 /* The Post-Header is empty. The Variable Data part begins immediately. */
2770 buf+= description_event->common_header_len +
2771 description_event->post_header_len[RAND_EVENT-1];
2772 seed1= uint8korr(buf+RAND_SEED1_OFFSET);
2773 seed2= uint8korr(buf+RAND_SEED2_OFFSET);
2774 }
2775
2776
2777 /**************************************************************************
2778 Xid_log_event methods
2779 **************************************************************************/
2780
/**
  @note
  It's ok not to use int8store here,
  as long as xid_t::set(ulonglong) and
  xid_t::get_my_xid don't do it either.
  We don't care about the actual values of xids as long as
  identical numbers compare identically.
*/
2789
2790 Xid_log_event::
Xid_log_event(const char * buf,const Format_description_log_event * description_event)2791 Xid_log_event(const char* buf,
2792 const Format_description_log_event *description_event)
2793 :Xid_apply_log_event(buf, description_event)
2794 {
2795 /* The Post-Header is empty. The Variable Data part begins immediately. */
2796 buf+= description_event->common_header_len +
2797 description_event->post_header_len[XID_EVENT-1];
2798 memcpy((char*) &xid, buf, sizeof(xid));
2799 }
2800
2801 /**************************************************************************
2802 XA_prepare_log_event methods
2803 **************************************************************************/
2804 XA_prepare_log_event::
XA_prepare_log_event(const char * buf,const Format_description_log_event * description_event)2805 XA_prepare_log_event(const char* buf,
2806 const Format_description_log_event *description_event)
2807 :Xid_apply_log_event(buf, description_event)
2808 {
2809 buf+= description_event->common_header_len +
2810 description_event->post_header_len[XA_PREPARE_LOG_EVENT-1];
2811 one_phase= * (bool *) buf;
2812 buf+= 1;
2813
2814 m_xid.formatID= uint4korr(buf);
2815 buf+= 4;
2816 m_xid.gtrid_length= uint4korr(buf);
2817 buf+= 4;
2818 // Todo: validity here and elsewhere checks to be replaced by MDEV-21839 fixes
2819 if (m_xid.gtrid_length <= 0 || m_xid.gtrid_length > MAXGTRIDSIZE)
2820 {
2821 m_xid.formatID= -1;
2822 return;
2823 }
2824 m_xid.bqual_length= uint4korr(buf);
2825 buf+= 4;
2826 if (m_xid.bqual_length < 0 || m_xid.bqual_length > MAXBQUALSIZE)
2827 {
2828 m_xid.formatID= -1;
2829 return;
2830 }
2831 DBUG_ASSERT(m_xid.gtrid_length + m_xid.bqual_length <= XIDDATASIZE);
2832
2833 memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length);
2834
2835 xid= NULL;
2836 }
2837
2838
2839 /**************************************************************************
2840 User_var_log_event methods
2841 **************************************************************************/
2842
2843 User_var_log_event::
User_var_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)2844 User_var_log_event(const char* buf, uint event_len,
2845 const Format_description_log_event* description_event)
2846 :Log_event(buf, description_event)
2847 #ifndef MYSQL_CLIENT
2848 , deferred(false), query_id(0)
2849 #endif
2850 {
2851 bool error= false;
2852 const char* buf_start= buf, *buf_end= buf + event_len;
2853
2854 /* The Post-Header is empty. The Variable Data part begins immediately. */
2855 buf+= description_event->common_header_len +
2856 description_event->post_header_len[USER_VAR_EVENT-1];
2857 name_len= uint4korr(buf);
2858 /* Avoid reading out of buffer */
2859 if ((buf - buf_start) + UV_NAME_LEN_SIZE + name_len > event_len)
2860 {
2861 error= true;
2862 goto err;
2863 }
2864
2865 name= (char *) buf + UV_NAME_LEN_SIZE;
2866
2867 /*
2868 We don't know yet is_null value, so we must assume that name_len
2869 may have the bigger value possible, is_null= True and there is no
2870 payload for val, or even that name_len is 0.
2871 */
2872 if (name + name_len + UV_VAL_IS_NULL > buf_end)
2873 {
2874 error= true;
2875 goto err;
2876 }
2877
2878 buf+= UV_NAME_LEN_SIZE + name_len;
2879 is_null= (bool) *buf;
2880 flags= User_var_log_event::UNDEF_F; // defaults to UNDEF_F
2881 if (is_null)
2882 {
2883 type= STRING_RESULT;
2884 charset_number= my_charset_bin.number;
2885 val_len= 0;
2886 val= 0;
2887 }
2888 else
2889 {
2890 val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
2891 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
2892
2893 if (val > buf_end)
2894 {
2895 error= true;
2896 goto err;
2897 }
2898
2899 type= (Item_result) buf[UV_VAL_IS_NULL];
2900 charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
2901 val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
2902 UV_CHARSET_NUMBER_SIZE);
2903
2904 /**
2905 We need to check if this is from an old server
2906 that did not pack information for flags.
2907 We do this by checking if there are extra bytes
2908 after the packed value. If there are we take the
2909 extra byte and it's value is assumed to contain
2910 the flags value.
2911
2912 Old events will not have this extra byte, thence,
2913 we keep the flags set to UNDEF_F.
2914 */
2915 size_t bytes_read= (val + val_len) - buf_start;
2916 if (bytes_read > event_len)
2917 {
2918 error= true;
2919 goto err;
2920 }
2921 if ((data_written - bytes_read) > 0)
2922 {
2923 flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
2924 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE +
2925 val_len);
2926 }
2927 }
2928
2929 err:
2930 if (unlikely(error))
2931 name= 0;
2932 }
2933
2934
2935 /**************************************************************************
2936 Create_file_log_event methods
2937 **************************************************************************/
2938
2939 /*
2940 Create_file_log_event ctor
2941 */
2942
Create_file_log_event(const char * buf,uint len,const Format_description_log_event * description_event)2943 Create_file_log_event::Create_file_log_event(const char* buf, uint len,
2944 const Format_description_log_event* description_event)
2945 :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
2946 {
2947 DBUG_ENTER("Create_file_log_event::Create_file_log_event(char*,...)");
2948 uint block_offset;
2949 uint header_len= description_event->common_header_len;
2950 uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1];
2951 uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
2952 if (!(event_buf= (char*) my_memdup(PSI_INSTRUMENT_ME, buf, len, MYF(MY_WME))) ||
2953 copy_log_event(event_buf,len,
2954 (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2955 load_header_len + header_len :
2956 (fake_base ? (header_len+load_header_len) :
2957 (header_len+load_header_len) +
2958 create_file_header_len)),
2959 description_event))
2960 DBUG_VOID_RETURN;
2961 if (description_event->binlog_version!=1)
2962 {
2963 file_id= uint4korr(buf +
2964 header_len +
2965 load_header_len + CF_FILE_ID_OFFSET);
2966 /*
2967 Note that it's ok to use get_data_size() below, because it is computed
2968 with values we have already read from this event (because we called
2969 copy_log_event()); we are not using slave's format info to decode
2970 master's format, we are really using master's format info.
2971 Anyway, both formats should be identical (except the common_header_len)
2972 as these Load events are not changed between 4.0 and 5.0 (as logging of
2973 LOAD DATA INFILE does not use Load_log_event in 5.0).
2974
2975 The + 1 is for \0 terminating fname
2976 */
2977 block_offset= (description_event->common_header_len +
2978 Load_log_event::get_data_size() +
2979 create_file_header_len + 1);
2980 if (len < block_offset)
2981 DBUG_VOID_RETURN;
2982 block = (uchar*)buf + block_offset;
2983 block_len = len - block_offset;
2984 }
2985 else
2986 {
2987 sql_ex.force_new_format();
2988 inited_from_old = 1;
2989 }
2990 DBUG_VOID_RETURN;
2991 }
2992
2993
2994 /**************************************************************************
2995 Append_block_log_event methods
2996 **************************************************************************/
2997
2998 /*
2999 Append_block_log_event ctor
3000 */
3001
Append_block_log_event(const char * buf,uint len,const Format_description_log_event * description_event)3002 Append_block_log_event::Append_block_log_event(const char* buf, uint len,
3003 const Format_description_log_event* description_event)
3004 :Log_event(buf, description_event),block(0)
3005 {
3006 DBUG_ENTER("Append_block_log_event::Append_block_log_event(char*,...)");
3007 uint8 common_header_len= description_event->common_header_len;
3008 uint8 append_block_header_len=
3009 description_event->post_header_len[APPEND_BLOCK_EVENT-1];
3010 uint total_header_len= common_header_len+append_block_header_len;
3011 if (len < total_header_len)
3012 DBUG_VOID_RETURN;
3013 file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
3014 block= (uchar*)buf + total_header_len;
3015 block_len= len - total_header_len;
3016 DBUG_VOID_RETURN;
3017 }
3018
3019
3020 /**************************************************************************
3021 Delete_file_log_event methods
3022 **************************************************************************/
3023
3024 /*
3025 Delete_file_log_event ctor
3026 */
3027
Delete_file_log_event(const char * buf,uint len,const Format_description_log_event * description_event)3028 Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
3029 const Format_description_log_event* description_event)
3030 :Log_event(buf, description_event),file_id(0)
3031 {
3032 uint8 common_header_len= description_event->common_header_len;
3033 uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
3034 if (len < (uint)(common_header_len + delete_file_header_len))
3035 return;
3036 file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
3037 }
3038
3039
3040 /**************************************************************************
3041 Execute_load_log_event methods
3042 **************************************************************************/
3043
3044 /*
3045 Execute_load_log_event ctor
3046 */
3047
Execute_load_log_event(const char * buf,uint len,const Format_description_log_event * description_event)3048 Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
3049 const Format_description_log_event* description_event)
3050 :Log_event(buf, description_event), file_id(0)
3051 {
3052 uint8 common_header_len= description_event->common_header_len;
3053 uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
3054 if (len < (uint)(common_header_len+exec_load_header_len))
3055 return;
3056 file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
3057 }
3058
3059
3060 /**************************************************************************
3061 Begin_load_query_log_event methods
3062 **************************************************************************/
3063
3064 Begin_load_query_log_event::
Begin_load_query_log_event(const char * buf,uint len,const Format_description_log_event * desc_event)3065 Begin_load_query_log_event(const char* buf, uint len,
3066 const Format_description_log_event* desc_event)
3067 :Append_block_log_event(buf, len, desc_event)
3068 {
3069 }
3070
3071
3072 /**************************************************************************
3073 Execute_load_query_log_event methods
3074 **************************************************************************/
3075
3076
3077 Execute_load_query_log_event::
Execute_load_query_log_event(const char * buf,uint event_len,const Format_description_log_event * desc_event)3078 Execute_load_query_log_event(const char* buf, uint event_len,
3079 const Format_description_log_event* desc_event):
3080 Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
3081 file_id(0), fn_pos_start(0), fn_pos_end(0)
3082 {
3083 if (!Query_log_event::is_valid())
3084 return;
3085
3086 buf+= desc_event->common_header_len;
3087
3088 fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
3089 fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
3090 dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
3091
3092 if (fn_pos_start > q_len || fn_pos_end > q_len ||
3093 dup_handling > LOAD_DUP_REPLACE)
3094 return;
3095
3096 file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
3097 }
3098
3099
get_post_header_size_for_derived()3100 ulong Execute_load_query_log_event::get_post_header_size_for_derived()
3101 {
3102 return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
3103 }
3104
3105
3106 /**************************************************************************
3107 sql_ex_info methods
3108 **************************************************************************/
3109
3110 /*
3111 sql_ex_info::init()
3112 */
3113
init(const char * buf,const char * buf_end,bool use_new_format)3114 const char *sql_ex_info::init(const char *buf, const char *buf_end,
3115 bool use_new_format)
3116 {
3117 cached_new_format = use_new_format;
3118 if (use_new_format)
3119 {
3120 empty_flags=0;
3121 /*
3122 The code below assumes that buf will not disappear from
3123 under our feet during the lifetime of the event. This assumption
3124 holds true in the slave thread if the log is in new format, but is not
3125 the case when we have old format because we will be reusing net buffer
3126 to read the actual file before we write out the Create_file event.
3127 */
3128 if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
3129 read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
3130 read_str(&buf, buf_end, &line_term, &line_term_len) ||
3131 read_str(&buf, buf_end, &line_start, &line_start_len) ||
3132 read_str(&buf, buf_end, &escaped, &escaped_len))
3133 return 0;
3134 opt_flags = *buf++;
3135 }
3136 else
3137 {
3138 if (buf_end - buf < 7)
3139 return 0; // Wrong data
3140 field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
3141 field_term = buf++; // Use first byte in string
3142 enclosed= buf++;
3143 line_term= buf++;
3144 line_start= buf++;
3145 escaped= buf++;
3146 opt_flags = *buf++;
3147 empty_flags= *buf++;
3148 if (empty_flags & FIELD_TERM_EMPTY)
3149 field_term_len=0;
3150 if (empty_flags & ENCLOSED_EMPTY)
3151 enclosed_len=0;
3152 if (empty_flags & LINE_TERM_EMPTY)
3153 line_term_len=0;
3154 if (empty_flags & LINE_START_EMPTY)
3155 line_start_len=0;
3156 if (empty_flags & ESCAPED_EMPTY)
3157 escaped_len=0;
3158 }
3159 return buf;
3160 }
3161
3162
3163
3164 /**************************************************************************
3165 Rows_log_event member functions
3166 **************************************************************************/
3167
3168
Rows_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3169 Rows_log_event::Rows_log_event(const char *buf, uint event_len,
3170 const Format_description_log_event
3171 *description_event)
3172 : Log_event(buf, description_event),
3173 m_row_count(0),
3174 #ifndef MYSQL_CLIENT
3175 m_table(NULL),
3176 #endif
3177 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
3178 m_extra_row_data(0)
3179 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
3180 , m_curr_row(NULL), m_curr_row_end(NULL),
3181 m_key(NULL), m_key_info(NULL), m_key_nr(0),
3182 master_had_triggers(0)
3183 #endif
3184 {
3185 DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
3186 uint8 const common_header_len= description_event->common_header_len;
3187 Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
3188 m_type= event_type;
3189 m_cols_ai.bitmap= 0;
3190
3191 uint8 const post_header_len= description_event->post_header_len[event_type-1];
3192
3193 if (event_len < (uint)(common_header_len + post_header_len))
3194 {
3195 m_cols.bitmap= 0;
3196 DBUG_VOID_RETURN;
3197 }
3198
3199 DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
3200 "post_header_len: %d",
3201 event_len, common_header_len,
3202 post_header_len));
3203
3204 const char *post_start= buf + common_header_len;
3205 post_start+= RW_MAPID_OFFSET;
3206 if (post_header_len == 6)
3207 {
3208 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
3209 m_table_id= uint4korr(post_start);
3210 post_start+= 4;
3211 }
3212 else
3213 {
3214 m_table_id= (ulong) uint6korr(post_start);
3215 post_start+= RW_FLAGS_OFFSET;
3216 }
3217
3218 m_flags_pos= post_start - buf;
3219 m_flags= uint2korr(post_start);
3220 post_start+= 2;
3221
3222 uint16 var_header_len= 0;
3223 if (post_header_len == ROWS_HEADER_LEN_V2)
3224 {
3225 /*
3226 Have variable length header, check length,
3227 which includes length bytes
3228 */
3229 var_header_len= uint2korr(post_start);
3230 /* Check length and also avoid out of buffer read */
3231 if (var_header_len < 2 ||
3232 event_len < static_cast<unsigned int>(var_header_len +
3233 (post_start - buf)))
3234 {
3235 m_cols.bitmap= 0;
3236 DBUG_VOID_RETURN;
3237 }
3238 var_header_len-= 2;
3239
3240 /* Iterate over var-len header, extracting 'chunks' */
3241 const char* start= post_start + 2;
3242 const char* end= start + var_header_len;
3243 for (const char* pos= start; pos < end;)
3244 {
3245 switch(*pos++)
3246 {
3247 case RW_V_EXTRAINFO_TAG:
3248 {
3249 /* Have an 'extra info' section, read it in */
3250 assert((end - pos) >= EXTRA_ROW_INFO_HDR_BYTES);
3251 uint8 infoLen= pos[EXTRA_ROW_INFO_LEN_OFFSET];
3252 assert((end - pos) >= infoLen);
3253 /* Just store/use the first tag of this type, skip others */
3254 if (likely(!m_extra_row_data))
3255 {
3256 m_extra_row_data= (uchar*) my_malloc(PSI_INSTRUMENT_ME, infoLen,
3257 MYF(MY_WME));
3258 if (likely(m_extra_row_data != NULL))
3259 {
3260 memcpy(m_extra_row_data, pos, infoLen);
3261 }
3262 }
3263 pos+= infoLen;
3264 break;
3265 }
3266 default:
3267 /* Unknown code, we will not understand anything further here */
3268 pos= end; /* Break loop */
3269 }
3270 }
3271 }
3272
3273 uchar const *const var_start=
3274 (const uchar *)buf + common_header_len + post_header_len + var_header_len;
3275 uchar const *const ptr_width= var_start;
3276 uchar *ptr_after_width= (uchar*) ptr_width;
3277 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
3278 m_width = net_field_length(&ptr_after_width);
3279 DBUG_PRINT("debug", ("m_width=%lu", m_width));
3280
3281 /* Avoid reading out of buffer */
3282 if (ptr_after_width + (m_width + 7) / 8 > (uchar*)buf + event_len)
3283 {
3284 m_cols.bitmap= NULL;
3285 DBUG_VOID_RETURN;
3286 }
3287
3288 /* if my_bitmap_init fails, caught in is_valid() */
3289 if (likely(!my_bitmap_init(&m_cols,
3290 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
3291 m_width,
3292 false)))
3293 {
3294 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
3295 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
3296 create_last_word_mask(&m_cols);
3297 ptr_after_width+= (m_width + 7) / 8;
3298 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
3299 }
3300 else
3301 {
3302 // Needed because my_bitmap_init() does not set it to null on failure
3303 m_cols.bitmap= NULL;
3304 DBUG_VOID_RETURN;
3305 }
3306
3307 m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
3308
3309 if (LOG_EVENT_IS_UPDATE_ROW(event_type))
3310 {
3311 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
3312
3313 /* if my_bitmap_init fails, caught in is_valid() */
3314 if (likely(!my_bitmap_init(&m_cols_ai,
3315 m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
3316 m_width,
3317 false)))
3318 {
3319 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
3320 memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
3321 create_last_word_mask(&m_cols_ai);
3322 ptr_after_width+= (m_width + 7) / 8;
3323 DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
3324 no_bytes_in_map(&m_cols_ai));
3325 }
3326 else
3327 {
3328 // Needed because my_bitmap_init() does not set it to null on failure
3329 m_cols_ai.bitmap= 0;
3330 DBUG_VOID_RETURN;
3331 }
3332 }
3333
3334 const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
3335
3336 size_t const read_size= ptr_rows_data - (const unsigned char *) buf;
3337 if (read_size > event_len)
3338 {
3339 DBUG_VOID_RETURN;
3340 }
3341 size_t const data_size= event_len - read_size;
3342 DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
3343 m_table_id, m_flags, m_width, (ulong) data_size));
3344
3345 m_rows_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, data_size, MYF(MY_WME));
3346 if (likely((bool)m_rows_buf))
3347 {
3348 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
3349 m_curr_row= m_rows_buf;
3350 #endif
3351 m_rows_end= m_rows_buf + data_size;
3352 m_rows_cur= m_rows_end;
3353 memcpy(m_rows_buf, ptr_rows_data, data_size);
3354 m_rows_before_size= ptr_rows_data - (const uchar *) buf; // Get the size that before SET part
3355 }
3356 else
3357 m_cols.bitmap= 0; // to not free it
3358
3359 DBUG_VOID_RETURN;
3360 }
3361
uncompress_buf()3362 void Rows_log_event::uncompress_buf()
3363 {
3364 uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf);
3365 if (!un_len)
3366 return;
3367
3368 uchar *new_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, ALIGN_SIZE(un_len), MYF(MY_WME));
3369 if (new_buf)
3370 {
3371 if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf,
3372 (uint32)(m_rows_cur - m_rows_buf), &un_len))
3373 {
3374 my_free(m_rows_buf);
3375 m_rows_buf = new_buf;
3376 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
3377 m_curr_row= m_rows_buf;
3378 #endif
3379 m_rows_end= m_rows_buf + un_len;
3380 m_rows_cur= m_rows_end;
3381 return;
3382 }
3383 else
3384 {
3385 my_free(new_buf);
3386 }
3387 }
3388 m_cols.bitmap= 0; // catch it in is_valid
3389 }
3390
~Rows_log_event()3391 Rows_log_event::~Rows_log_event()
3392 {
3393 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
3394 m_cols.bitmap= 0; // so no my_free in my_bitmap_free
3395 my_bitmap_free(&m_cols); // To pair with my_bitmap_init().
3396 my_free(m_rows_buf);
3397 my_free(m_extra_row_data);
3398 }
3399
get_data_size()3400 int Rows_log_event::get_data_size()
3401 {
3402 int const general_type_code= get_general_type_code();
3403
3404 uchar buf[MAX_INT_WIDTH];
3405 uchar *end= net_store_length(buf, m_width);
3406
3407 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
3408 return (int)(6 + no_bytes_in_map(&m_cols) + (end - buf) +
3409 (general_type_code == UPDATE_ROWS_EVENT ? no_bytes_in_map(&m_cols_ai) : 0) +
3410 m_rows_cur - m_rows_buf););
3411 int data_size= 0;
3412 Log_event_type type = get_type_code();
3413 bool is_v2_event= LOG_EVENT_IS_ROW_V2(type);
3414 if (is_v2_event)
3415 {
3416 data_size= ROWS_HEADER_LEN_V2 +
3417 (m_extra_row_data ?
3418 RW_V_TAG_LEN + m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]:
3419 0);
3420 }
3421 else
3422 {
3423 data_size= ROWS_HEADER_LEN_V1;
3424 }
3425 data_size+= no_bytes_in_map(&m_cols);
3426 data_size+= (uint) (end - buf);
3427
3428 if (general_type_code == UPDATE_ROWS_EVENT)
3429 data_size+= no_bytes_in_map(&m_cols_ai);
3430
3431 data_size+= (uint) (m_rows_cur - m_rows_buf);
3432 return data_size;
3433 }
3434
3435
3436 /**************************************************************************
3437 Annotate_rows_log_event member functions
3438 **************************************************************************/
3439
Annotate_rows_log_event(const char * buf,uint event_len,const Format_description_log_event * desc)3440 Annotate_rows_log_event::Annotate_rows_log_event(const char *buf,
3441 uint event_len,
3442 const Format_description_log_event *desc)
3443 : Log_event(buf, desc),
3444 m_save_thd_query_txt(0),
3445 m_save_thd_query_len(0),
3446 m_saved_thd_query(false),
3447 m_used_query_txt(0)
3448 {
3449 m_query_len= event_len - desc->common_header_len;
3450 m_query_txt= (char*) buf + desc->common_header_len;
3451 }
3452
~Annotate_rows_log_event()3453 Annotate_rows_log_event::~Annotate_rows_log_event()
3454 {
3455 DBUG_ENTER("Annotate_rows_log_event::~Annotate_rows_log_event");
3456 #ifndef MYSQL_CLIENT
3457 if (m_saved_thd_query)
3458 thd->set_query(m_save_thd_query_txt, m_save_thd_query_len);
3459 else if (m_used_query_txt)
3460 thd->reset_query();
3461 #endif
3462 DBUG_VOID_RETURN;
3463 }
3464
get_data_size()3465 int Annotate_rows_log_event::get_data_size()
3466 {
3467 return m_query_len;
3468 }
3469
get_type_code()3470 Log_event_type Annotate_rows_log_event::get_type_code()
3471 {
3472 return ANNOTATE_ROWS_EVENT;
3473 }
3474
is_valid() const3475 bool Annotate_rows_log_event::is_valid() const
3476 {
3477 return (m_query_txt != NULL && m_query_len != 0);
3478 }
3479
3480
3481 /**************************************************************************
3482 Table_map_log_event member functions and support functions
3483 **************************************************************************/
3484
3485 /**
3486 @page How replication of field metadata works.
3487
3488 When a table map is created, the master first calls
3489 Table_map_log_event::save_field_metadata() which calculates how many
3490 values will be in the field metadata. Only those fields that require the
3491 extra data are added. The method also loops through all of the fields in
3492 the table calling the method Field::save_field_metadata() which returns the
3493 values for the field that will be saved in the metadata and replicated to
3494 the slave. Once all fields have been processed, the table map is written to
3495 the binlog adding the size of the field metadata and the field metadata to
3496 the end of the body of the table map.
3497
3498 When a table map is read on the slave, the field metadata is read from the
3499 table map and passed to the table_def class constructor which saves the
3500 field metadata from the table map into an array based on the type of the
3501 field. Field metadata values not present (those fields that do not use extra
3502 data) in the table map are initialized as zero (0). The array size is the
3503 same as the columns for the table on the slave.
3504
3505 Additionally, values saved for field metadata on the master are saved as a
3506 string of bytes (uchar) in the binlog. A field may require 1 or more bytes
3507 to store the information. In cases where values require multiple bytes
3508 (e.g. values > 255), the endian-safe methods are used to properly encode
3509 the values on the master and decode them on the slave. When the field
3510 metadata values are captured on the slave, they are stored in an array of
3511 type uint16. This allows the least number of casts to prevent casting bugs
3512 when the field metadata is used in comparisons of field attributes. When
3513 the field metadata is used for calculating addresses in pointer math, the
3514 type used is uint32.
3515 */
3516
3517 /*
3518 Constructor used by slave to read the event from the binary log.
3519 */
#if defined(HAVE_REPLICATION)
/**
  Decode a table map event from its serialized form.

  Reads the table id and flags from the post-header, then the database
  name, table name, column count and column types from the variable part.
  When present, the field metadata and the null bits are copied into
  m_meta_memory, and whatever follows them is kept as optional metadata.

  Failure is reported through the object state: when a size is
  inconsistent or an allocation fails, m_memory is freed and reset to NULL
  (the allocation comment below states this is caught in is_valid()).

  @param buf                Start of the serialized event.
  @param event_len          Number of valid bytes at buf.
  @param description_event  Format description supplying the header lengths.
*/
Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
                                         const Format_description_log_event
                                         *description_event)

  : Log_event(buf, description_event),
#ifndef MYSQL_CLIENT
    m_table(NULL),
#endif
    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
    m_colcnt(0), m_coltype(0),
    m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
    m_null_bits(0), m_meta_memory(NULL),
    m_optional_metadata_len(0), m_optional_metadata(NULL)
{
  unsigned int bytes_read= 0;
  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");

  uint8 common_header_len= description_event->common_header_len;
  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
                     event_len, common_header_len, post_header_len));

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_valgrind
  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
#endif

  if (event_len < (uint)(common_header_len + post_header_len))
    DBUG_VOID_RETURN;

  /* Read the post-header */
  const char *post_start= buf + common_header_len;

  post_start+= TM_MAPID_OFFSET;
  VALIDATE_BYTES_READ(post_start, buf, event_len);
  if (post_header_len == 6)
  {
    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
    m_table_id= uint4korr(post_start);
    post_start+= 4;
  }
  else
  {
    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
    m_table_id= (ulong) uint6korr(post_start);
    post_start+= TM_FLAGS_OFFSET;
  }

  DBUG_ASSERT(m_table_id != ~0ULL);

  m_flags= uint2korr(post_start);

  /* Read the variable part of the event */
  const char *const vpart= buf + common_header_len + post_header_len;

  /* Extract the length of the various parts from the buffer */
  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
  VALIDATE_BYTES_READ(ptr_dblen, buf, event_len);
  m_dblen= *(uchar*) ptr_dblen;

  /* Length of database name + counter + terminating null */
  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
  VALIDATE_BYTES_READ(ptr_tbllen, buf, event_len);
  m_tbllen= *(uchar*) ptr_tbllen;

  /* Length of table name + counter + terminating null */
  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
  VALIDATE_BYTES_READ(ptr_after_colcnt, buf, event_len);
  m_colcnt= net_field_length(&ptr_after_colcnt);

  DBUG_PRINT("info",("m_dblen: %lu  off: %ld  m_tbllen: %lu  off: %ld  m_colcnt: %lu  off: %ld",
                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart),
                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));

  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
  m_memory= (uchar*) my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
                                     &m_dbnam, (uint) m_dblen + 1,
                                     &m_tblnam, (uint) m_tbllen + 1,
                                     &m_coltype, (uint) m_colcnt,
                                     NullS);

  if (m_memory)
  {
    /* Copy the different parts into their memory */
    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen  + 1, m_dblen + 1);
    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);

    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
    VALIDATE_BYTES_READ(ptr_after_colcnt, buf, event_len);
    m_field_metadata_size= net_field_length(&ptr_after_colcnt);
    if (m_field_metadata_size <= (m_colcnt * 2))
    {
      uint num_null_bytes= (m_colcnt + 7) / 8;
      m_meta_memory= (uchar *)my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
                                     &m_null_bits, num_null_bytes,
                                     &m_field_metadata, m_field_metadata_size,
                                     NULL);
      if (unlikely(!m_meta_memory))
      {
        /*
          Out of memory: the copies below would write through unset
          pointers. Invalidate the event the same way as for an
          inconsistent metadata size.
        */
        m_coltype= NULL;
        my_free(m_memory);
        m_memory= NULL;
        DBUG_VOID_RETURN;
      }
      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
      ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
      ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + num_null_bytes;
    }
    else
    {
      /* Metadata cannot be larger than two bytes per column: bad event */
      m_coltype= NULL;
      my_free(m_memory);
      m_memory= NULL;
      DBUG_VOID_RETURN;
    }

    bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);

    /* After null_bits field, there are some new fields for extra metadata. */
    if (bytes_read < event_len)
    {
      m_optional_metadata_len= event_len - bytes_read;
      m_optional_metadata=
        static_cast<unsigned char*>(my_malloc(PSI_INSTRUMENT_ME, m_optional_metadata_len, MYF(MY_WME)));
      if (unlikely(!m_optional_metadata))
      {
        /* Out of memory: do not copy into NULL, invalidate the event */
        m_optional_metadata_len= 0;
        m_coltype= NULL;
        my_free(m_memory);
        m_memory= NULL;
        DBUG_VOID_RETURN;
      }
      memcpy(m_optional_metadata, ptr_after_colcnt, m_optional_metadata_len);
    }
  }
#ifdef MYSQL_SERVER
  if (!m_table)
    DBUG_VOID_RETURN;
  binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
                                                         sizeof(Binlog_type_info));
  for (uint i= 0; i <  m_table->s->fields; i++)
    binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
#endif

  DBUG_VOID_RETURN;
}
#endif
3661
~Table_map_log_event()3662 Table_map_log_event::~Table_map_log_event()
3663 {
3664 my_free(m_meta_memory);
3665 my_free(m_memory);
3666 my_free(m_optional_metadata);
3667 m_optional_metadata= NULL;
3668 }
3669
/**
  Decodes the SIGNEDNESS field into one boolean flag per bit.

  Every byte of the field contributes eight flags, appended to @c vec in
  order from the most significant bit (0x80) down to the least significant
  bit (0x01). A flag is true when the corresponding bit is set.

  @param[out] vec     receives the flags; elements already present are kept.
  @param[in]  field   SIGNEDNESS field in table_map_event.
  @param[in]  length  number of bytes in the field.
*/
static void parse_signedness(std::vector<bool> &vec,
                             unsigned char *field, unsigned int length)
{
  const unsigned char *const field_end= field + length;

  for (const unsigned char *byte= field; byte != field_end; byte++)
  {
    /* Walk the byte from bit 7 down to bit 0. */
    for (int bit= 7; bit >= 0; bit--)
      vec.push_back(((*byte >> bit) & 1) != 0);
  }
}
3686
3687 /**
3688 Parses DEFAULT_CHARSET field.
3689
3690 @param[out] default_charset stores collation numbers extracted from field.
3691 @param[in] field DEFAULT_CHARSET field in table_map_event.
3692 @param[in] length length of the field
3693 */
parse_default_charset(Table_map_log_event::Optional_metadata_fields::Default_charset & default_charset,unsigned char * field,unsigned int length)3694 static void parse_default_charset(Table_map_log_event::Optional_metadata_fields::
3695 Default_charset &default_charset,
3696 unsigned char *field, unsigned int length)
3697 {
3698 unsigned char* p= field;
3699
3700 default_charset.default_charset= net_field_length(&p);
3701 while (p < field + length)
3702 {
3703 unsigned int col_index= net_field_length(&p);
3704 unsigned int col_charset= net_field_length(&p);
3705
3706 default_charset.charset_pairs.push_back(std::make_pair(col_index,
3707 col_charset));
3708 }
3709 }
3710
3711 /**
3712 Parses COLUMN_CHARSET field.
3713
3714 @param[out] vec stores collation numbers extracted from field.
3715 @param[in] field COLUMN_CHARSET field in table_map_event.
3716 @param[in] length length of the field
3717 */
parse_column_charset(std::vector<unsigned int> & vec,unsigned char * field,unsigned int length)3718 static void parse_column_charset(std::vector<unsigned int> &vec,
3719 unsigned char *field, unsigned int length)
3720 {
3721 unsigned char* p= field;
3722
3723 while (p < field + length)
3724 vec.push_back(net_field_length(&p));
3725 }
3726
3727 /**
3728 Parses COLUMN_NAME field.
3729
3730 @param[out] vec stores column names extracted from field.
3731 @param[in] field COLUMN_NAME field in table_map_event.
3732 @param[in] length length of the field
3733 */
parse_column_name(std::vector<std::string> & vec,unsigned char * field,unsigned int length)3734 static void parse_column_name(std::vector<std::string> &vec,
3735 unsigned char *field, unsigned int length)
3736 {
3737 unsigned char* p= field;
3738
3739 while (p < field + length)
3740 {
3741 unsigned len= net_field_length(&p);
3742 vec.push_back(std::string(reinterpret_cast<char *>(p), len));
3743 p+= len;
3744 }
3745 }
3746
3747 /**
3748 Parses SET_STR_VALUE/ENUM_STR_VALUE field.
3749
3750 @param[out] vec stores SET/ENUM column's string values extracted from
3751 field. Each SET/ENUM column's string values are stored
3752 into a string separate vector. All of them are stored
3753 in 'vec'.
3754 @param[in] field COLUMN_NAME field in table_map_event.
3755 @param[in] length length of the field
3756 */
parse_set_str_value(std::vector<Table_map_log_event::Optional_metadata_fields::str_vector> & vec,unsigned char * field,unsigned int length)3757 static void parse_set_str_value(std::vector<Table_map_log_event::
3758 Optional_metadata_fields::str_vector> &vec,
3759 unsigned char *field, unsigned int length)
3760 {
3761 unsigned char* p= field;
3762
3763 while (p < field + length)
3764 {
3765 unsigned int count= net_field_length(&p);
3766
3767 vec.push_back(std::vector<std::string>());
3768 for (unsigned int i= 0; i < count; i++)
3769 {
3770 unsigned len1= net_field_length(&p);
3771 vec.back().push_back(std::string(reinterpret_cast<char *>(p), len1));
3772 p+= len1;
3773 }
3774 }
3775 }
3776
3777 /**
3778 Parses GEOMETRY_TYPE field.
3779
3780 @param[out] vec stores geometry column's types extracted from field.
3781 @param[in] field GEOMETRY_TYPE field in table_map_event.
3782 @param[in] length length of the field
3783 */
parse_geometry_type(std::vector<unsigned int> & vec,unsigned char * field,unsigned int length)3784 static void parse_geometry_type(std::vector<unsigned int> &vec,
3785 unsigned char *field, unsigned int length)
3786 {
3787 unsigned char* p= field;
3788
3789 while (p < field + length)
3790 vec.push_back(net_field_length(&p));
3791 }
3792
3793 /**
3794 Parses SIMPLE_PRIMARY_KEY field.
3795
3796 @param[out] vec stores primary key's column information extracted from
3797 field. Each column has an index and a prefix which are
3798 stored as a unit_pair. prefix is always 0 for
3799 SIMPLE_PRIMARY_KEY field.
3800 @param[in] field SIMPLE_PRIMARY_KEY field in table_map_event.
3801 @param[in] length length of the field
3802 */
parse_simple_pk(std::vector<Table_map_log_event::Optional_metadata_fields::uint_pair> & vec,unsigned char * field,unsigned int length)3803 static void parse_simple_pk(std::vector<Table_map_log_event::
3804 Optional_metadata_fields::uint_pair> &vec,
3805 unsigned char *field, unsigned int length)
3806 {
3807 unsigned char* p= field;
3808
3809 while (p < field + length)
3810 vec.push_back(std::make_pair(net_field_length(&p), 0));
3811 }
3812
3813 /**
3814 Parses PRIMARY_KEY_WITH_PREFIX field.
3815
3816 @param[out] vec stores primary key's column information extracted from
3817 field. Each column has an index and a prefix which are
3818 stored as a unit_pair.
3819 @param[in] field PRIMARY_KEY_WITH_PREFIX field in table_map_event.
3820 @param[in] length length of the field
3821 */
3822
parse_pk_with_prefix(std::vector<Table_map_log_event::Optional_metadata_fields::uint_pair> & vec,unsigned char * field,unsigned int length)3823 static void parse_pk_with_prefix(std::vector<Table_map_log_event::
3824 Optional_metadata_fields::uint_pair> &vec,
3825 unsigned char *field, unsigned int length)
3826 {
3827 unsigned char* p= field;
3828
3829 while (p < field + length)
3830 {
3831 unsigned int col_index= net_field_length(&p);
3832 unsigned int col_prefix= net_field_length(&p);
3833 vec.push_back(std::make_pair(col_index, col_prefix));
3834 }
3835 }
3836
3837 Table_map_log_event::Optional_metadata_fields::
Optional_metadata_fields(unsigned char * optional_metadata,unsigned int optional_metadata_len)3838 Optional_metadata_fields(unsigned char* optional_metadata,
3839 unsigned int optional_metadata_len)
3840 {
3841 unsigned char* field= optional_metadata;
3842
3843 if (optional_metadata == NULL)
3844 return;
3845
3846 while (field < optional_metadata + optional_metadata_len)
3847 {
3848 unsigned int len;
3849 Optional_metadata_field_type type=
3850 static_cast<Optional_metadata_field_type>(field[0]);
3851
3852 // Get length and move field to the value.
3853 field++;
3854 len= net_field_length(&field);
3855
3856 switch(type)
3857 {
3858 case SIGNEDNESS:
3859 parse_signedness(m_signedness, field, len);
3860 break;
3861 case DEFAULT_CHARSET:
3862 parse_default_charset(m_default_charset, field, len);
3863 break;
3864 case COLUMN_CHARSET:
3865 parse_column_charset(m_column_charset, field, len);
3866 break;
3867 case COLUMN_NAME:
3868 parse_column_name(m_column_name, field, len);
3869 break;
3870 case SET_STR_VALUE:
3871 parse_set_str_value(m_set_str_value, field, len);
3872 break;
3873 case ENUM_STR_VALUE:
3874 parse_set_str_value(m_enum_str_value, field, len);
3875 break;
3876 case GEOMETRY_TYPE:
3877 parse_geometry_type(m_geometry_type, field, len);
3878 break;
3879 case SIMPLE_PRIMARY_KEY:
3880 parse_simple_pk(m_primary_key, field, len);
3881 break;
3882 case PRIMARY_KEY_WITH_PREFIX:
3883 parse_pk_with_prefix(m_primary_key, field, len);
3884 break;
3885 case ENUM_AND_SET_DEFAULT_CHARSET:
3886 parse_default_charset(m_enum_and_set_default_charset, field, len);
3887 break;
3888 case ENUM_AND_SET_COLUMN_CHARSET:
3889 parse_column_charset(m_enum_and_set_column_charset, field, len);
3890 break;
3891 default:
3892 DBUG_ASSERT(0);
3893 }
3894 // next field
3895 field+= len;
3896 }
3897 }
3898
3899
3900 /**************************************************************************
3901 Write_rows_log_event member functions
3902 **************************************************************************/
3903
3904
3905 /*
3906 Constructor used by slave to read the event from the binary log.
3907 */
3908 #ifdef HAVE_REPLICATION
Write_rows_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3909 Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
3910 const Format_description_log_event
3911 *description_event)
3912 : Rows_log_event(buf, event_len, description_event)
3913 {
3914 }
3915
Write_rows_compressed_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3916 Write_rows_compressed_log_event::Write_rows_compressed_log_event(
3917 const char *buf, uint event_len,
3918 const Format_description_log_event
3919 *description_event)
3920 : Write_rows_log_event(buf, event_len, description_event)
3921 {
3922 uncompress_buf();
3923 }
3924 #endif
3925
3926
3927 /**************************************************************************
3928 Delete_rows_log_event member functions
3929 **************************************************************************/
3930
3931 /*
3932 Constructor used by slave to read the event from the binary log.
3933 */
3934 #ifdef HAVE_REPLICATION
Delete_rows_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3935 Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
3936 const Format_description_log_event
3937 *description_event)
3938 : Rows_log_event(buf, event_len, description_event)
3939 {
3940 }
3941
Delete_rows_compressed_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3942 Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
3943 const char *buf, uint event_len,
3944 const Format_description_log_event
3945 *description_event)
3946 : Delete_rows_log_event(buf, event_len, description_event)
3947 {
3948 uncompress_buf();
3949 }
3950 #endif
3951
3952 /**************************************************************************
3953 Update_rows_log_event member functions
3954 **************************************************************************/
3955
~Update_rows_log_event()3956 Update_rows_log_event::~Update_rows_log_event()
3957 {
3958 if (m_cols_ai.bitmap)
3959 {
3960 if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
3961 m_cols_ai.bitmap= 0; // so no my_free in my_bitmap_free
3962 my_bitmap_free(&m_cols_ai); // To pair with my_bitmap_init().
3963 }
3964 }
3965
3966
3967 /*
3968 Constructor used by slave to read the event from the binary log.
3969 */
3970 #ifdef HAVE_REPLICATION
Update_rows_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3971 Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
3972 const
3973 Format_description_log_event
3974 *description_event)
3975 : Rows_log_event(buf, event_len, description_event)
3976 {
3977 }
3978
Update_rows_compressed_log_event(const char * buf,uint event_len,const Format_description_log_event * description_event)3979 Update_rows_compressed_log_event::Update_rows_compressed_log_event(
3980 const char *buf, uint event_len,
3981 const Format_description_log_event
3982 *description_event)
3983 : Update_rows_log_event(buf, event_len, description_event)
3984 {
3985 uncompress_buf();
3986 }
3987 #endif
3988
Incident_log_event(const char * buf,uint event_len,const Format_description_log_event * descr_event)3989 Incident_log_event::Incident_log_event(const char *buf, uint event_len,
3990 const Format_description_log_event *descr_event)
3991 : Log_event(buf, descr_event)
3992 {
3993 DBUG_ENTER("Incident_log_event::Incident_log_event");
3994 uint8 const common_header_len=
3995 descr_event->common_header_len;
3996 uint8 const post_header_len=
3997 descr_event->post_header_len[INCIDENT_EVENT-1];
3998
3999 DBUG_PRINT("info",("event_len: %u; common_header_len: %d; post_header_len: %d",
4000 event_len, common_header_len, post_header_len));
4001
4002 m_message.str= NULL;
4003 m_message.length= 0;
4004 int incident_number= uint2korr(buf + common_header_len);
4005 if (incident_number >= INCIDENT_COUNT ||
4006 incident_number <= INCIDENT_NONE)
4007 {
4008 // If the incident is not recognized, this binlog event is
4009 // invalid. If we set incident_number to INCIDENT_NONE, the
4010 // invalidity will be detected by is_valid().
4011 m_incident= INCIDENT_NONE;
4012 DBUG_VOID_RETURN;
4013 }
4014 m_incident= static_cast<Incident>(incident_number);
4015 char const *ptr= buf + common_header_len + post_header_len;
4016 char const *const str_end= buf + event_len;
4017 uint8 len= 0; // Assignment to keep compiler happy
4018 const char *str= NULL; // Assignment to keep compiler happy
4019 if (read_str(&ptr, str_end, &str, &len))
4020 {
4021 /* Mark this event invalid */
4022 m_incident= INCIDENT_NONE;
4023 DBUG_VOID_RETURN;
4024 }
4025 if (!(m_message.str= (char*) my_malloc(key_memory_log_event, len+1, MYF(MY_WME))))
4026 {
4027 /* Mark this event invalid */
4028 m_incident= INCIDENT_NONE;
4029 DBUG_VOID_RETURN;
4030 }
4031 strmake(m_message.str, str, len);
4032 m_message.length= len;
4033 DBUG_PRINT("info", ("m_incident: %d", m_incident));
4034 DBUG_VOID_RETURN;
4035 }
4036
4037
~Incident_log_event()4038 Incident_log_event::~Incident_log_event()
4039 {
4040 if (m_message.str)
4041 my_free(m_message.str);
4042 }
4043
4044
4045 const char *
description() const4046 Incident_log_event::description() const
4047 {
4048 static const char *const description[]= {
4049 "NOTHING", // Not used
4050 "LOST_EVENTS"
4051 };
4052
4053 DBUG_PRINT("info", ("m_incident: %d", m_incident));
4054 return description[m_incident];
4055 }
4056
4057
Ignorable_log_event(const char * buf,const Format_description_log_event * descr_event,const char * event_name)4058 Ignorable_log_event::Ignorable_log_event(const char *buf,
4059 const Format_description_log_event
4060 *descr_event,
4061 const char *event_name)
4062 :Log_event(buf, descr_event), number((int) (uchar) buf[EVENT_TYPE_OFFSET]),
4063 description(event_name)
4064 {
4065 DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
4066 DBUG_VOID_RETURN;
4067 }
4068
~Ignorable_log_event()4069 Ignorable_log_event::~Ignorable_log_event()
4070 {
4071 }
4072
copy_event_cache_to_file_and_reinit(IO_CACHE * cache,FILE * file)4073 bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file)
4074 {
4075 return (my_b_copy_all_to_file(cache, file) ||
4076 reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE));
4077 }
4078