1 /* Copyright (c) 2007, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
24 #include "log_event.h"
25 #ifndef MYSQL_CLIENT
26 #include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
27 #include "sql_base.h" // close_tables_for_reopen
28 #include "key.h" // key_copy
29 #include "lock.h" // mysql_unlock_tables
30 #include "sql_parse.h" // mysql_reset_thd_for_next_command
31 #include "rpl_rli.h"
32 #include "rpl_utility.h"
33 #endif
34 #include "log_event_old.h"
35 #include "rpl_record_old.h"
36 #include "transaction.h"
37
38 #include <algorithm>
39
40 using std::min;
41 using std::max;
42
43 PSI_memory_key key_memory_log_event_old;
44
45 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
46
47 // Old implementation of do_apply_event()
48 int
do_apply_event(Old_rows_log_event * ev,const Relay_log_info * rli)49 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
50 {
51 DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
52 int error= 0;
53 THD *ev_thd= ev->thd;
54 uchar const *row_start= ev->m_rows_buf;
55
56 /*
57 If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
58 does not contain any data. In that case, we just remove all tables in the
59 tables_to_lock list, close the thread tables, and return with
60 success.
61 */
62 if ((ev->m_table_id.id() == ~0U || ev->m_table_id.id() == (~0ULL >> 16)) &&
63 ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
64
65 {
66 /*
67 This one is supposed to be set: just an extra check so that
68 nothing strange has happened.
69 */
70 assert(ev->get_flags(Old_rows_log_event::STMT_END_F));
71
72 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
73 ev_thd->clear_error();
74 DBUG_RETURN(0);
75 }
76
77 /*
78 'ev_thd' has been set by exec_relay_log_event(), just before calling
79 do_apply_event(). We still check here to prevent future coding
80 errors.
81 */
82 assert(rli->info_thd == ev_thd);
83
84 /*
85 If there is no locks taken, this is the first binrow event seen
86 after the table map events. We should then lock all the tables
87 used in the transaction and proceed with execution of the actual
88 event.
89 */
90 if (!ev_thd->lock)
91 {
92 /*
93 Lock_tables() reads the contents of ev_thd->lex, so they must be
94 initialized.
95
96 We also call the mysql_reset_thd_for_next_command(), since this
97 is the logical start of the next "statement". Note that this
98 call might reset the value of current_stmt_binlog_format, so
99 we need to do any changes to that value after this function.
100 */
101 lex_start(ev_thd);
102 mysql_reset_thd_for_next_command(ev_thd);
103
104 /*
105 This is a row injection, so we flag the "statement" as
106 such. Note that this code is called both when the slave does row
107 injections and when the BINLOG statement is used to do row
108 injections.
109 */
110 ev_thd->lex->set_stmt_row_injection();
111
112 if (open_and_lock_tables(ev_thd, rli->tables_to_lock, 0))
113 {
114 if (ev_thd->is_error())
115 {
116 /*
117 Error reporting borrowed from Query_log_event with many excessive
118 simplifications (we don't honour --slave-skip-errors)
119 */
120 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
121 "Error '%s' on opening tables",
122 ev_thd->get_stmt_da()->message_text());
123 ev_thd->is_slave_error= 1;
124 }
125 DBUG_RETURN(1);
126 }
127
128 /*
129 When the open and locking succeeded, we check all tables to
130 ensure that they still have the correct type.
131 */
132
133 {
134 TABLE_LIST *table_list_ptr= rli->tables_to_lock;
135 for (uint i=0 ; table_list_ptr&& (i< rli->tables_to_lock_count);
136 table_list_ptr= table_list_ptr->next_global, i++)
137 {
138 /*
139 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
140 function for the explanation of the below if condition
141 */
142 if (table_list_ptr->parent_l)
143 continue;
144 /*
145 We can use a down cast here since we know that every table added
146 to the tables_to_lock is a RPL_TABLE_LIST(or child table which is
147 skipped above).
148 */
149 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
150 assert(ptr->m_tabledef_valid);
151 TABLE *conv_table;
152 if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
153 ptr->table, &conv_table))
154 {
155 ev_thd->is_slave_error= 1;
156 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
157 DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
158 }
159 DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
160 " - conv_table: %p",
161 ptr->table->s->db.str,
162 ptr->table->s->table_name.str, conv_table));
163 ptr->m_conv_table= conv_table;
164 }
165 }
166
167 /*
168 ... and then we add all the tables to the table map and remove
169 them from tables to lock.
170
171 We also invalidate the query cache for all the tables, since
172 they will now be changed.
173
174 TODO [/Matz]: Maybe the query cache should not be invalidated
175 here? It might be that a table is not changed, even though it
176 was locked for the statement. We do know that each
177 Old_rows_log_event contain at least one row, so after processing one
178 Old_rows_log_event, we can invalidate the query cache for the
179 associated table.
180 */
181 TABLE_LIST *ptr= rli->tables_to_lock;
182 for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
183 {
184 /*
185 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
186 function for the explanation of the below if condition
187 */
188 if (ptr->parent_l)
189 continue;
190 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
191 }
192 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
193 }
194
195 TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
196
197 if (table)
198 {
199 /*
200 table == NULL means that this table should not be replicated
201 (this was set up by Table_map_log_event::do_apply_event()
202 which tested replicate-* rules).
203 */
204
205 /*
206 It's not needed to set_time() but
207 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
208 much slave is behind
209 2) it will be needed when we allow replication from a table with no
210 TIMESTAMP column to a table with one.
211 So we call set_time(), like in SBR. Presently it changes nothing.
212 */
213 ev_thd->set_time(&ev->common_header->when);
214 /*
215 There are a few flags that are replicated with each row event.
216 Make sure to set/clear them before executing the main body of
217 the event.
218 */
219 if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
220 ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
221 else
222 ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
223
224 if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
225 ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
226 else
227 ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
228 /* A small test to verify that objects have consistent types */
229 assert(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
230
231 /*
232 Now we are in a statement and will stay in a statement until we
233 see a STMT_END_F.
234
235 We set this flag here, before actually applying any rows, in
236 case the SQL thread is stopped and we need to detect that we're
237 inside a statement and halting abruptly might cause problems
238 when restarting.
239 */
240 const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
241
242 error= do_before_row_operations(table);
243 while (error == 0 && row_start < ev->m_rows_end)
244 {
245 uchar const *row_end= NULL;
246 if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
247 break; // We should perform the after-row operation even in
248 // the case of error
249
250 assert(row_end != NULL); // cannot happen
251 assert(row_end <= ev->m_rows_end);
252
253 /* in_use can have been set to NULL in close_tables_for_reopen */
254 THD* old_thd= table->in_use;
255 if (!table->in_use)
256 table->in_use= ev_thd;
257 error= do_exec_row(table);
258 table->in_use = old_thd;
259 switch (error)
260 {
261 /* Some recoverable errors */
262 case HA_ERR_RECORD_CHANGED:
263 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
264 tuple does not exist */
265 error= 0;
266 case 0:
267 break;
268
269 default:
270 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
271 "Error in %s event: row application failed. %s",
272 ev->get_type_str(),
273 (ev_thd->is_error() ?
274 ev_thd->get_stmt_da()->message_text() :
275 ""));
276 thd->is_slave_error= 1;
277 break;
278 }
279
280 row_start= row_end;
281 }
282 DBUG_EXECUTE_IF("stop_slave_middle_group",
283 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
284 error= do_after_row_operations(table, error);
285 }
286
287 if (error)
288 { /* error has occured during the transaction */
289 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
290 "Error in %s event: error during transaction execution "
291 "on table %s.%s. %s",
292 ev->get_type_str(), table->s->db.str,
293 table->s->table_name.str,
294 ev_thd->is_error() ? ev_thd->get_stmt_da()->message_text() : "");
295
296 /*
297 If one day we honour --skip-slave-errors in row-based replication, and
298 the error should be skipped, then we would clear mappings, rollback,
299 close tables, but the slave SQL thread would not stop and then may
300 assume the mapping is still available, the tables are still open...
301 So then we should clear mappings/rollback/close here only if this is a
302 STMT_END_F.
303 For now we code, knowing that error is not skippable and so slave SQL
304 thread is certainly going to stop.
305 rollback at the caller along with sbr.
306 */
307 ev_thd->reset_current_stmt_binlog_format_row();
308 const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
309 ev_thd->is_slave_error= 1;
310 DBUG_RETURN(error);
311 }
312
313 DBUG_RETURN(0);
314 }
315 #endif
316
317
318 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
319
320 /*
321 Check if there are more UNIQUE keys after the given key.
322 */
323 static int
last_uniq_key(TABLE * table,uint keyno)324 last_uniq_key(TABLE *table, uint keyno)
325 {
326 while (++keyno < table->s->keys)
327 if (table->key_info[keyno].flags & HA_NOSAME)
328 return 0;
329 return 1;
330 }
331
332
333 /*
334 Compares table->record[0] and table->record[1]
335
336 Returns TRUE if different.
337 */
record_compare(TABLE * table)338 static bool record_compare(TABLE *table)
339 {
340 /*
341 Need to set the X bit and the filler bits in both records since
342 there are engines that do not set it correctly.
343
344 In addition, since MyISAM checks that one hasn't tampered with the
345 record, it is necessary to restore the old bytes into the record
346 after doing the comparison.
347
348 TODO[record format ndb]: Remove it once NDB returns correct
349 records. Check that the other engines also return correct records.
350 */
351
352 bool result= FALSE;
353 uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
354
355 if (table->s->null_bytes > 0)
356 {
357 for (int i = 0 ; i < 2 ; ++i)
358 {
359 /*
360 If we have an X bit then we need to take care of it.
361 */
362 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
363 {
364 saved_x[i]= table->record[i][0];
365 table->record[i][0]|= 1U;
366 }
367
368 /*
369 If (last_null_bit_pos == 0 && null_bytes > 1), then:
370
371 X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
372
373 Ie, the entire byte is used.
374 */
375 if (table->s->last_null_bit_pos > 0)
376 {
377 saved_filler[i]= table->record[i][table->s->null_bytes - 1];
378 table->record[i][table->s->null_bytes - 1]|=
379 256U - (1U << table->s->last_null_bit_pos);
380 }
381 }
382 }
383
384 if (table->s->blob_fields + table->s->varchar_fields == 0)
385 {
386 result= cmp_record(table,record[1]);
387 goto record_compare_exit;
388 }
389
390 /* Compare null bits */
391 if (memcmp(table->null_flags,
392 table->null_flags+table->s->rec_buff_length,
393 table->s->null_bytes))
394 {
395 result= TRUE; // Diff in NULL value
396 goto record_compare_exit;
397 }
398
399 /* Compare updated fields */
400 for (Field **ptr=table->field ; *ptr ; ptr++)
401 {
402 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
403 {
404 result= TRUE;
405 goto record_compare_exit;
406 }
407 }
408
409 record_compare_exit:
410 /*
411 Restore the saved bytes.
412
413 TODO[record format ndb]: Remove this code once NDB returns the
414 correct record format.
415 */
416 if (table->s->null_bytes > 0)
417 {
418 for (int i = 0 ; i < 2 ; ++i)
419 {
420 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
421 table->record[i][0]= saved_x[i];
422
423 if (table->s->last_null_bit_pos > 0)
424 table->record[i][table->s->null_bytes - 1]= saved_filler[i];
425 }
426 }
427
428 return result;
429 }
430
431
432 /*
433 Copy "extra" columns from record[1] to record[0].
434
435 Copy the extra fields that are not present on the master but are
436 present on the slave from record[1] to record[0]. This is used
437 after fetching a record that are to be updated, either inside
438 replace_record() or as part of executing an update_row().
439 */
440 static int
copy_extra_record_fields(TABLE * table,size_t master_reclength,my_ptrdiff_t master_fields)441 copy_extra_record_fields(TABLE *table,
442 size_t master_reclength,
443 my_ptrdiff_t master_fields)
444 {
445 DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
446 DBUG_PRINT("info", ("Copying to 0x%lx "
447 "from field %lu at offset %lu "
448 "to field %d at offset %lu",
449 (long) table->record[0],
450 (ulong) master_fields, (ulong) master_reclength,
451 table->s->fields, table->s->reclength));
452 /*
453 Copying the extra fields of the slave that does not exist on
454 master into record[0] (which are basically the default values).
455 */
456
457 if (table->s->fields < (uint) master_fields)
458 DBUG_RETURN(0);
459
460 assert(master_reclength <= table->s->reclength);
461 if (master_reclength < table->s->reclength)
462 memcpy(table->record[0] + master_reclength,
463 table->record[1] + master_reclength,
464 table->s->reclength - master_reclength);
465
466 /*
467 Bit columns are special. We iterate over all the remaining
468 columns and copy the "extra" bits to the new record. This is
469 not a very good solution: it should be refactored on
470 opportunity.
471
472 REFACTORING SUGGESTION (Matz). Introduce a member function
473 similar to move_field_offset() called copy_field_offset() to
474 copy field values and implement it for all Field subclasses. Use
475 this function to copy data from the found record to the record
476 that are going to be inserted.
477
478 The copy_field_offset() function need to be a virtual function,
479 which in this case will prevent copying an entire range of
480 fields efficiently.
481 */
482 {
483 Field **field_ptr= table->field + master_fields;
484 for ( ; *field_ptr ; ++field_ptr)
485 {
486 /*
487 Set the null bit according to the values in record[1]
488 */
489 if ((*field_ptr)->maybe_null() &&
490 (*field_ptr)->is_null_in_record(table->record[1]))
491 (*field_ptr)->set_null();
492 else
493 (*field_ptr)->set_notnull();
494
495 /*
496 Do the extra work for special columns.
497 */
498 switch ((*field_ptr)->real_type())
499 {
500 default:
501 /* Nothing to do */
502 break;
503
504 case MYSQL_TYPE_BIT:
505 Field_bit *f= static_cast<Field_bit*>(*field_ptr);
506 if (f->bit_len > 0)
507 {
508 my_ptrdiff_t const offset= table->record[1] - table->record[0];
509 uchar const bits=
510 get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
511 set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
512 }
513 break;
514 }
515 }
516 }
517 DBUG_RETURN(0); // All OK
518 }
519
520
521 /*
522 Replace the provided record in the database.
523
524 SYNOPSIS
525 replace_record()
526 thd Thread context for writing the record.
527 table Table to which record should be written.
528 master_reclength
529 Offset to first column that is not present on the master,
530 alternatively the length of the record on the master
531 side.
532
533 RETURN VALUE
534 Error code on failure, 0 on success.
535
536 DESCRIPTION
537 Similar to how it is done in mysql_insert(), we first try to do
538 a ha_write_row() and of that fails due to duplicated keys (or
539 indices), we do an ha_update_row() or a ha_delete_row() instead.
540 */
541 static int
replace_record(THD * thd,TABLE * table,ulong const master_reclength,uint const master_fields)542 replace_record(THD *thd, TABLE *table,
543 ulong const master_reclength,
544 uint const master_fields)
545 {
546 DBUG_ENTER("replace_record");
547 assert(table != NULL && thd != NULL);
548
549 int error;
550 int keynum;
551 char *key= NULL;
552
553 #ifndef NDEBUG
554 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
555 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
556 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
557 #endif
558
559 while ((error= table->file->ha_write_row(table->record[0])))
560 {
561 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
562 {
563 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
564 DBUG_RETURN(error);
565 }
566 if ((keynum= table->file->get_dup_key(error)) < 0)
567 {
568 table->file->print_error(error, MYF(0));
569 /*
570 We failed to retrieve the duplicate key
571 - either because the error was not "duplicate key" error
572 - or because the information which key is not available
573 */
574 DBUG_RETURN(error);
575 }
576
577 /*
578 key index value is either valid in the range [0-MAX_KEY) or
579 has value MAX_KEY as a marker for the case when no information
580 about key can be found. In the last case we have to require
581 that storage engine has the flag HA_DUPLICATE_POS turned on.
582 If this invariant is false then assert will crash
583 the server built in debug mode. For the server that was built
584 without DEBUG we have additional check for the value of key index
585 in the code below in order to report about error in any case.
586 */
587 assert(keynum != MAX_KEY ||
588 (keynum == MAX_KEY &&
589 (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
590
591 /*
592 We need to retrieve the old row into record[1] to be able to
593 either update or delete the offending record. We either:
594
595 - use ha_rnd_pos() with a row-id (available as dupp_row) to the
596 offending row, if that is possible (MyISAM and Blackhole), or else
597
598 - use ha_index_read_idx_map() with the key that is duplicated, to
599 retrieve the offending row.
600 */
601 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
602 {
603 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
604 if (error)
605 {
606 DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
607 if (error == HA_ERR_RECORD_DELETED)
608 error= HA_ERR_KEY_NOT_FOUND;
609 table->file->print_error(error, MYF(0));
610 DBUG_RETURN(error);
611 }
612 }
613 else
614 {
615 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
616 {
617 DBUG_RETURN(my_errno());
618 }
619
620 if (key == NULL)
621 {
622 key= static_cast<char*>(my_alloca(table->s->max_unique_length));
623 if (key == NULL)
624 DBUG_RETURN(ENOMEM);
625 }
626
627 if ((uint)keynum < MAX_KEY)
628 {
629 key_copy((uchar*)key, table->record[0], table->key_info + keynum,
630 0);
631 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
632 (const uchar*)key,
633 HA_WHOLE_KEY,
634 HA_READ_KEY_EXACT);
635 }
636 else
637 /*
638 For the server built in non-debug mode returns error if
639 handler::get_dup_key() returned MAX_KEY as the value of key index.
640 */
641 error= HA_ERR_FOUND_DUPP_KEY;
642
643 if (error)
644 {
645 DBUG_PRINT("info", ("ha_index_read_idx_map() returns error %d", error));
646 if (error == HA_ERR_RECORD_DELETED)
647 error= HA_ERR_KEY_NOT_FOUND;
648 table->file->print_error(error, MYF(0));
649 DBUG_RETURN(error);
650 }
651 }
652
653 /*
654 Now, table->record[1] should contain the offending row. That
655 will enable us to update it or, alternatively, delete it (so
656 that we can insert the new row afterwards).
657
658 First we copy the columns into table->record[0] that are not
659 present on the master from table->record[1], if there are any.
660 */
661 copy_extra_record_fields(table, master_reclength, master_fields);
662
663 /*
664 REPLACE is defined as either INSERT or DELETE + INSERT. If
665 possible, we can replace it with an UPDATE, but that will not
666 work on InnoDB if FOREIGN KEY checks are necessary.
667
668 I (Matz) am not sure of the reason for the last_uniq_key()
669 check as, but I'm guessing that it's something along the
670 following lines.
671
672 Suppose that we got the duplicate key to be a key that is not
673 the last unique key for the table and we perform an update:
674 then there might be another key for which the unique check will
675 fail, so we're better off just deleting the row and inserting
676 the correct row.
677 */
678 if (last_uniq_key(table, keynum) &&
679 !table->file->referenced_by_foreign_key())
680 {
681 error=table->file->ha_update_row(table->record[1],
682 table->record[0]);
683 if (error && error != HA_ERR_RECORD_IS_THE_SAME)
684 table->file->print_error(error, MYF(0));
685 else
686 error= 0;
687 DBUG_RETURN(error);
688 }
689 else
690 {
691 if ((error= table->file->ha_delete_row(table->record[1])))
692 {
693 table->file->print_error(error, MYF(0));
694 DBUG_RETURN(error);
695 }
696 /* Will retry ha_write_row() with the offending row removed. */
697 }
698 }
699
700 DBUG_RETURN(error);
701 }
702
703
704 /**
705 Find the row given by 'key', if the table has keys, or else use a table scan
706 to find (and fetch) the row.
707
708 If the engine allows random access of the records, a combination of
709 position() and rnd_pos() will be used.
710
711 @param table Pointer to table to search
712 @param key Pointer to key to use for search, if table has key
713
714 @pre <code>table->record[0]</code> shall contain the row to locate
715 and <code>key</code> shall contain a key to use for searching, if
716 the engine has a key.
717
718 @post If the return value is zero, <code>table->record[1]</code>
719 will contain the fetched row and the internal "cursor" will refer to
720 the row. If the return value is non-zero,
721 <code>table->record[1]</code> is undefined. In either case,
722 <code>table->record[0]</code> is undefined.
723
724 @return Zero if the row was successfully fetched into
725 <code>table->record[1]</code>, error code otherwise.
726 */
727
find_and_fetch_row(TABLE * table,uchar * key)728 static int find_and_fetch_row(TABLE *table, uchar *key)
729 {
730 DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
731 DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx",
732 (long) table, (long) key, (long) table->record[1]));
733
734 assert(table->in_use != NULL);
735
736 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
737
738 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
739 table->s->primary_key < MAX_KEY)
740 {
741 /*
742 Use a more efficient method to fetch the record given by
743 table->record[0] if the engine allows it. We first compute a
744 row reference using the position() member function (it will be
745 stored in table->file->ref) and the use rnd_pos() to position
746 the "cursor" (i.e., record[0] in this case) at the correct row.
747
748 TODO: Add a check that the correct record has been fetched by
749 comparing with the original record. Take into account that the
750 record on the master and slave can be of different
751 length. Something along these lines should work:
752
753 ADD>>> store_record(table,record[1]);
754 int error= table->file->rnd_pos(table->record[0], table->file->ref);
755 ADD>>> assert(memcmp(table->record[1], table->record[0],
756 table->s->reclength) == 0);
757
758 */
759 table->file->position(table->record[0]);
760 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
761 /*
762 ha_rnd_pos() returns the record in table->record[0], so we have to
763 move it to table->record[1].
764 */
765 memcpy(table->record[1], table->record[0], table->s->reclength);
766 DBUG_RETURN(error);
767 }
768
769 /* We need to retrieve all fields */
770 /* TODO: Move this out from this function to main loop */
771 table->use_all_columns();
772
773 if (table->s->keys > 0)
774 {
775 int error;
776 /* We have a key: search the table using the index */
777 if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
778 {
779 table->file->print_error(error, MYF(0));
780 DBUG_RETURN(error);
781 }
782
783 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
784 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
785
786 /*
787 We need to set the null bytes to ensure that the filler bit are
788 all set when returning. There are storage engines that just set
789 the necessary bits on the bytes and don't set the filler bits
790 correctly.
791 */
792 my_ptrdiff_t const pos=
793 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
794 table->record[1][pos]= 0xFF;
795 if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
796 HA_READ_KEY_EXACT)))
797 {
798 table->file->print_error(error, MYF(0));
799 table->file->ha_index_end();
800 DBUG_RETURN(error);
801 }
802
803 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
804 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
805 /*
806 Below is a minor "optimization". If the key (i.e., key number
807 0) has the HA_NOSAME flag set, we know that we have found the
808 correct record (since there can be no duplicates); otherwise, we
809 have to compare the record with the one found to see if it is
810 the correct one.
811
812 CAVEAT! This behaviour is essential for the replication of,
813 e.g., the mysql.proc table since the correct record *shall* be
814 found using the primary key *only*. There shall be no
815 comparison of non-PK columns to decide if the correct record is
816 found. I can see no scenario where it would be incorrect to
817 chose the row to change only using a PK or an UNNI.
818 */
819 if (table->key_info->flags & HA_NOSAME)
820 {
821 table->file->ha_index_end();
822 DBUG_RETURN(0);
823 }
824
825 while (record_compare(table))
826 {
827 int error;
828
829 /*
830 We need to set the null bytes to ensure that the filler bit
831 are all set when returning. There are storage engines that
832 just set the necessary bits on the bytes and don't set the
833 filler bits correctly.
834
835 TODO[record format ndb]: Remove this code once NDB returns the
836 correct record format.
837 */
838 if (table->s->null_bytes > 0)
839 {
840 table->record[1][table->s->null_bytes - 1]|=
841 256U - (1U << table->s->last_null_bit_pos);
842 }
843
844 while ((error= table->file->ha_index_next(table->record[1])))
845 {
846 /* We just skip records that has already been deleted */
847 if (error == HA_ERR_RECORD_DELETED)
848 continue;
849 table->file->print_error(error, MYF(0));
850 table->file->ha_index_end();
851 DBUG_RETURN(error);
852 }
853 }
854
855 /*
856 Have to restart the scan to be able to fetch the next row.
857 */
858 table->file->ha_index_end();
859 }
860 else
861 {
862 int restart_count= 0; // Number of times scanning has restarted from top
863 int error;
864
865 /* We don't have a key: search the table using ha_rnd_next() */
866 if ((error= table->file->ha_rnd_init(1)))
867 {
868 table->file->print_error(error, MYF(0));
869 DBUG_RETURN(error);
870 }
871
872 /* Continue until we find the right record or have made a full loop */
873 do
874 {
875 restart_ha_rnd_next:
876 error= table->file->ha_rnd_next(table->record[1]);
877
878 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
879 DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
880
881 switch (error) {
882 case 0:
883 break;
884
885 /*
886 If the record was deleted, we pick the next one without doing
887 any comparisons.
888 */
889 case HA_ERR_RECORD_DELETED:
890 goto restart_ha_rnd_next;
891
892 case HA_ERR_END_OF_FILE:
893 if (++restart_count < 2)
894 {
895 if ((error= table->file->ha_rnd_init(1)))
896 {
897 table->file->print_error(error, MYF(0));
898 DBUG_RETURN(error);
899 }
900 }
901 break;
902
903 default:
904 table->file->print_error(error, MYF(0));
905 DBUG_PRINT("info", ("Record not found"));
906 (void) table->file->ha_rnd_end();
907 DBUG_RETURN(error);
908 }
909 }
910 while (restart_count < 2 && record_compare(table));
911
912 /*
913 Have to restart the scan to be able to fetch the next row.
914 */
915 DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
916 table->file->ha_rnd_end();
917
918 assert(error == HA_ERR_END_OF_FILE || error == 0);
919 DBUG_RETURN(error);
920 }
921
922 DBUG_RETURN(0);
923 }
924
925
926 /**********************************************************
927 Row handling primitives for Write_rows_log_event_old
928 **********************************************************/
929
do_before_row_operations(TABLE * table)930 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
931 {
932 int error= 0;
933
934 /*
935 We are using REPLACE semantics and not INSERT IGNORE semantics
936 when writing rows, that is: new rows replace old rows. We need to
937 inform the storage engine that it should use this behaviour.
938 */
939
940 /* Tell the storage engine that we are using REPLACE semantics. */
941 thd->lex->duplicates= DUP_REPLACE;
942
943 /*
944 Pretend we're executing a REPLACE command: this is needed for
945 InnoDB and NDB Cluster since they are not (properly) checking the
946 lex->duplicates flag.
947 */
948 thd->lex->sql_command= SQLCOM_REPLACE;
949 /*
950 Do not raise the error flag in case of hitting to an unique attribute
951 */
952 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
953 /*
954 NDB specific: update from ndb master wrapped as Write_rows
955 */
956 /*
957 so that the event should be applied to replace slave's row
958 */
959 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
960 /*
961 NDB specific: if update from ndb master wrapped as Write_rows
962 does not find the row it's assumed idempotent binlog applying
963 is taking place; don't raise the error.
964 */
965 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
966 /*
967 TODO: the cluster team (Tomas?) says that it's better if the engine knows
968 how many rows are going to be inserted, then it can allocate needed memory
969 from the start.
970 */
971 table->file->ha_start_bulk_insert(0);
972 return error;
973 }
974
975
do_after_row_operations(TABLE * table,int error)976 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
977 {
978 int local_error= 0;
979 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
980 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
981 /*
982 reseting the extra with
983 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
984 fires bug#27077
985 todo: explain or fix
986 */
987 if ((local_error= table->file->ha_end_bulk_insert()))
988 {
989 table->file->print_error(local_error, MYF(0));
990 }
991 return error? error : local_error;
992 }
993
994
995 int
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)996 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
997 Relay_log_info const *rli,
998 TABLE *table,
999 uchar const *row_start,
1000 uchar const **row_end)
1001 {
1002 assert(table != NULL);
1003 assert(row_start && row_end);
1004
1005 int error;
1006 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1007 table, m_width, table->record[0],
1008 row_start, &m_cols, row_end, &m_master_reclength,
1009 table->write_set, binary_log::PRE_GA_WRITE_ROWS_EVENT);
1010 bitmap_copy(table->read_set, table->write_set);
1011 return error;
1012 }
1013
1014
do_exec_row(TABLE * table)1015 int Write_rows_log_event_old::do_exec_row(TABLE *table)
1016 {
1017 assert(table != NULL);
1018 int error= replace_record(thd, table, m_master_reclength, m_width);
1019 return error;
1020 }
1021
1022
1023 /**********************************************************
1024 Row handling primitives for Delete_rows_log_event_old
1025 **********************************************************/
1026
do_before_row_operations(TABLE * table)1027 int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
1028 {
1029 assert(m_memory == NULL);
1030
1031 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
1032 table->s->primary_key < MAX_KEY)
1033 {
1034 /*
1035 We don't need to allocate any memory for m_after_image and
1036 m_key since they are not used.
1037 */
1038 return 0;
1039 }
1040
1041 int error= 0;
1042
1043 if (table->s->keys > 0)
1044 {
1045 m_memory= (uchar*) my_multi_malloc(key_memory_log_event_old,
1046 MYF(MY_WME),
1047 &m_after_image,
1048 (uint) table->s->reclength,
1049 &m_key,
1050 (uint) table->key_info->key_length,
1051 NullS);
1052 }
1053 else
1054 {
1055 m_after_image= (uchar*) my_malloc(key_memory_log_event_old,
1056 table->s->reclength, MYF(MY_WME));
1057 m_memory= m_after_image;
1058 m_key= NULL;
1059 }
1060 if (!m_memory)
1061 return HA_ERR_OUT_OF_MEM;
1062
1063 return error;
1064 }
1065
1066
do_after_row_operations(TABLE * table,int error)1067 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1068 {
1069 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1070 table->file->ha_index_or_rnd_end();
1071 my_free(m_memory); // Free for multi_malloc
1072 m_memory= NULL;
1073 m_after_image= NULL;
1074 m_key= NULL;
1075
1076 return error;
1077 }
1078
1079
1080 int
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)1081 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
1082 Relay_log_info const *rli,
1083 TABLE *table,
1084 uchar const *row_start,
1085 uchar const **row_end)
1086 {
1087 int error;
1088 assert(row_start && row_end);
1089 /*
1090 This assertion actually checks that there is at least as many
1091 columns on the slave as on the master.
1092 */
1093 assert(table->s->fields >= m_width);
1094
1095 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1096 table, m_width, table->record[0],
1097 row_start, &m_cols, row_end, &m_master_reclength,
1098 table->read_set, binary_log::PRE_GA_DELETE_ROWS_EVENT);
1099 /*
1100 If we will access rows using the random access method, m_key will
1101 be set to NULL, so we do not need to make a key copy in that case.
1102 */
1103 if (m_key)
1104 {
1105 KEY *const key_info= table->key_info;
1106
1107 key_copy(m_key, table->record[0], key_info, 0);
1108 }
1109
1110 return error;
1111 }
1112
1113
do_exec_row(TABLE * table)1114 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
1115 {
1116 int error;
1117 assert(table != NULL);
1118
1119 if (!(error= ::find_and_fetch_row(table, m_key)))
1120 {
1121 /*
1122 Now we should have the right row to delete. We are using
1123 record[0] since it is guaranteed to point to a record with the
1124 correct value.
1125 */
1126 error= table->file->ha_delete_row(table->record[0]);
1127 }
1128 return error;
1129 }
1130
1131
1132 /**********************************************************
1133 Row handling primitives for Update_rows_log_event_old
1134 **********************************************************/
1135
do_before_row_operations(TABLE * table)1136 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
1137 {
1138 assert(m_memory == NULL);
1139
1140 int error= 0;
1141
1142 if (table->s->keys > 0)
1143 {
1144 m_memory= (uchar*) my_multi_malloc(key_memory_log_event_old,
1145 MYF(MY_WME),
1146 &m_after_image,
1147 (uint) table->s->reclength,
1148 &m_key,
1149 (uint) table->key_info->key_length,
1150 NullS);
1151 }
1152 else
1153 {
1154 m_after_image= (uchar*) my_malloc(key_memory_log_event_old,
1155 table->s->reclength, MYF(MY_WME));
1156 m_memory= m_after_image;
1157 m_key= NULL;
1158 }
1159 if (!m_memory)
1160 return HA_ERR_OUT_OF_MEM;
1161
1162 return error;
1163 }
1164
1165
do_after_row_operations(TABLE * table,int error)1166 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1167 {
1168 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1169 table->file->ha_index_or_rnd_end();
1170 my_free(m_memory);
1171 m_memory= NULL;
1172 m_after_image= NULL;
1173 m_key= NULL;
1174
1175 return error;
1176 }
1177
1178
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)1179 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1180 Relay_log_info const *rli,
1181 TABLE *table,
1182 uchar const *row_start,
1183 uchar const **row_end)
1184 {
1185 int error;
1186 assert(row_start && row_end);
1187 /*
1188 This assertion actually checks that there is at least as many
1189 columns on the slave as on the master.
1190 */
1191 assert(table->s->fields >= m_width);
1192
1193 /* record[0] is the before image for the update */
1194 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1195 table, m_width, table->record[0],
1196 row_start, &m_cols, row_end, &m_master_reclength,
1197 table->read_set, binary_log::PRE_GA_UPDATE_ROWS_EVENT);
1198 row_start = *row_end;
1199 /* m_after_image is the after image for the update */
1200 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1201 table, m_width, m_after_image,
1202 row_start, &m_cols, row_end, &m_master_reclength,
1203 table->write_set, binary_log::PRE_GA_UPDATE_ROWS_EVENT);
1204
1205 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1206 DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1207
1208 /*
1209 If we will access rows using the random access method, m_key will
1210 be set to NULL, so we do not need to make a key copy in that case.
1211 */
1212 if (m_key)
1213 {
1214 KEY *const key_info= table->key_info;
1215
1216 key_copy(m_key, table->record[0], key_info, 0);
1217 }
1218
1219 return error;
1220 }
1221
1222
do_exec_row(TABLE * table)1223 int Update_rows_log_event_old::do_exec_row(TABLE *table)
1224 {
1225 assert(table != NULL);
1226
1227 int error= ::find_and_fetch_row(table, m_key);
1228 if (error)
1229 return error;
1230
1231 /*
1232 We have to ensure that the new record (i.e., the after image) is
1233 in record[0] and the old record (i.e., the before image) is in
1234 record[1]. This since some storage engines require this (for
1235 example, the partition engine).
1236
1237 Since find_and_fetch_row() puts the fetched record (i.e., the old
1238 record) in record[1], we can keep it there. We put the new record
1239 (i.e., the after image) into record[0], and copy the fields that
1240 are on the slave (i.e., in record[1]) into record[0], effectively
1241 overwriting the default values that where put there by the
1242 unpack_row() function.
1243 */
1244 memcpy(table->record[0], m_after_image, table->s->reclength);
1245 copy_extra_record_fields(table, m_master_reclength, m_width);
1246
1247 /*
1248 Now we have the right row to update. The old row (the one we're
1249 looking for) is in record[1] and the new row has is in record[0].
1250 We also have copied the original values already in the slave's
1251 database into the after image delivered from the master.
1252 */
1253 error= table->file->ha_update_row(table->record[1], table->record[0]);
1254 if (error == HA_ERR_RECORD_IS_THE_SAME)
1255 error= 0;
1256
1257 return error;
1258 }
1259
1260 #endif
1261
1262
1263 /**************************************************************************
1264 Rows_log_event member functions
1265 **************************************************************************/
1266
1267 #ifndef MYSQL_CLIENT
1268 /**
1269 The event header and footer is constructed before constructing an object of
1270 a specific event type. The header and footer are constructed in the base
1271 class, Binary_log_event. They are accessed using the const methods header()
1272 and footer() defined in Binary_log_event.
1273
1274 This constructor calls the header() and footer() methods to initialize
1275 Log_event::common_header and Log_event::common_footer respectively.
1276
1277 We don't know the exact type of the event when we call
1278 this constructor, so passing ENUM_END_EVENT as the type here.
1279 */
Old_rows_log_event(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool using_trans)1280 Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1281 MY_BITMAP const *cols,
1282 bool using_trans)
1283 : Binary_log_event(binary_log::ENUM_END_EVENT),
1284 Log_event(thd_arg, 0,
1285 using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
1286 Log_event::EVENT_STMT_CACHE,
1287 Log_event::EVENT_NORMAL_LOGGING, header(),
1288 footer()),
1289 m_row_count(0),
1290 m_table(tbl_arg),
1291 m_table_id(tid),
1292 m_width(tbl_arg ? tbl_arg->s->fields : 1),
1293 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1294 #ifdef HAVE_REPLICATION
1295 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1296 #endif
1297 {
1298
1299 // This constructor should not be reached.
1300 assert(0);
1301
1302 /*
1303 We allow a special form of dummy event when the table, and cols
1304 are null and the table id is ~0UL. This is a temporary
1305 solution, to be able to terminate a started statement in the
1306 binary log: the extraneous events will be removed in the future.
1307 */
1308 assert((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1309 (!tbl_arg && !cols && tid == ~0UL));
1310
1311 if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1312 set_flags(NO_FOREIGN_KEY_CHECKS_F);
1313 if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1314 set_flags(RELAXED_UNIQUE_CHECKS_F);
1315 /* if bitmap_init fails, caught in is_valid() */
1316 if (likely(!bitmap_init(&m_cols,
1317 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1318 m_width,
1319 false)))
1320 {
1321 /* Cols can be zero if this is a dummy binrows event */
1322 if (likely(cols != NULL))
1323 {
1324 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1325 create_last_word_mask(&m_cols);
1326 }
1327 }
1328 else
1329 {
1330 // Needed because bitmap_init() does not set it to null on failure
1331 m_cols.bitmap= 0;
1332 }
1333 }
1334 #endif
1335
1336 Old_rows_log_event::
Old_rows_log_event(const char * buf,uint event_len,Log_event_type event_type,const Format_description_event * description_event)1337 Old_rows_log_event(const char *buf, uint event_len, Log_event_type event_type,
1338 const Format_description_event *description_event)
1339 : Binary_log_event(&buf, description_event->binlog_version,
1340 description_event->server_version),
1341 Log_event(header(), footer()),
1342 m_row_count(0),
1343 #ifndef MYSQL_CLIENT
1344 m_table(NULL),
1345 #endif
1346 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1347 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1348 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1349 #endif
1350 {
1351 DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1352 uint8 const common_header_len= description_event->common_header_len;
1353 uint8 const post_header_len= description_event->post_header_len[event_type-1];
1354
1355 DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
1356 "post_header_len: %d",
1357 event_len, common_header_len,
1358 post_header_len));
1359
1360 const char *post_start= buf + common_header_len;
1361 DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1362 post_start+= ROWS_MAPID_OFFSET;
1363 if (post_header_len == 6)
1364 {
1365 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1366 m_table_id= uint4korr(post_start);
1367 post_start+= 4;
1368 }
1369 else
1370 {
1371 m_table_id= (ulong) uint6korr(post_start);
1372 post_start+= ROWS_FLAGS_OFFSET;
1373 }
1374
1375 m_flags= uint2korr(post_start);
1376
1377 uchar const *const var_start=
1378 (const uchar *)buf + common_header_len + post_header_len;
1379 uchar const *const ptr_width= var_start;
1380 uchar *ptr_after_width= (uchar*) ptr_width;
1381 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1382 m_width = net_field_length(&ptr_after_width);
1383 DBUG_PRINT("debug", ("m_width=%lu", m_width));
1384 /* Avoid reading out of buffer */
1385 if (m_width + (ptr_after_width - (const uchar *)buf) > event_len)
1386 {
1387 m_cols.bitmap= NULL;
1388 DBUG_VOID_RETURN;
1389 }
1390
1391 /* if bitmap_init fails, catched in is_valid() */
1392 if (likely(!bitmap_init(&m_cols,
1393 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1394 m_width,
1395 false)))
1396 {
1397 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1398 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1399 create_last_word_mask(&m_cols);
1400 ptr_after_width+= (m_width + 7) / 8;
1401 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1402 }
1403 else
1404 {
1405 // Needed because bitmap_init() does not set it to null on failure
1406 m_cols.bitmap= NULL;
1407 DBUG_VOID_RETURN;
1408 }
1409
1410 const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1411 size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1412 DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
1413 m_table_id.id(), m_flags, m_width, (ulong) data_size));
1414 DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1415
1416 m_rows_buf= (uchar*) my_malloc(key_memory_log_event_old,
1417 data_size, MYF(MY_WME));
1418 if (likely((bool)m_rows_buf))
1419 {
1420 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1421 m_curr_row= m_rows_buf;
1422 #endif
1423 m_rows_end= m_rows_buf + data_size;
1424 m_rows_cur= m_rows_end;
1425 memcpy(m_rows_buf, ptr_rows_data, data_size);
1426 }
1427 else
1428 m_cols.bitmap= 0; // to not free it
1429
1430 DBUG_VOID_RETURN;
1431 }
1432
1433
~Old_rows_log_event()1434 Old_rows_log_event::~Old_rows_log_event()
1435 {
1436 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1437 m_cols.bitmap= 0; // so no my_free in bitmap_free
1438 bitmap_free(&m_cols); // To pair with bitmap_init().
1439 my_free(m_rows_buf);
1440 }
1441
1442
get_data_size()1443 size_t Old_rows_log_event::get_data_size()
1444 {
1445 uchar buf[sizeof(m_width)+1];
1446 uchar *end= net_store_length(buf, (m_width + 7) / 8);
1447
1448 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1449 return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1450 (m_rows_cur - m_rows_buf););
1451 int data_size= Binary_log_event::ROWS_HEADER_LEN;
1452 data_size+= no_bytes_in_map(&m_cols);
1453 data_size+= (uint) (end - buf);
1454
1455 data_size+= (uint) (m_rows_cur - m_rows_buf);
1456 return data_size;
1457 }
1458
1459
1460 #ifndef MYSQL_CLIENT
do_add_row_data(uchar * row_data,size_t length)1461 int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1462 {
1463 /*
1464 When the table has a primary key, we would probably want, by default, to
1465 log only the primary key value instead of the entire "before image". This
1466 would save binlog space. TODO
1467 */
1468 DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1469 DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data,
1470 (ulong) length));
1471 DBUG_DUMP("row_data", row_data, min<size_t>(length, 32));
1472
1473 assert(m_rows_buf <= m_rows_cur);
1474 assert(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1475 assert(m_rows_cur <= m_rows_end);
1476
1477 /* The cast will always work since m_rows_cur <= m_rows_end */
1478 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1479 {
1480 size_t const block_size= 1024;
1481 my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1482 my_ptrdiff_t const new_alloc=
1483 block_size * ((cur_size + length + block_size - 1) / block_size);
1484
1485 uchar* const new_buf= (uchar*)my_realloc(key_memory_log_event_old,
1486 m_rows_buf, (uint) new_alloc,
1487 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1488 if (unlikely(!new_buf))
1489 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1490
1491 /* If the memory moved, we need to move the pointers */
1492 if (new_buf != m_rows_buf)
1493 {
1494 m_rows_buf= new_buf;
1495 m_rows_cur= m_rows_buf + cur_size;
1496 }
1497
1498 /*
1499 The end pointer should always be changed to point to the end of
1500 the allocated memory.
1501 */
1502 m_rows_end= m_rows_buf + new_alloc;
1503 }
1504
1505 assert(m_rows_cur + length <= m_rows_end);
1506 memcpy(m_rows_cur, row_data, length);
1507 m_rows_cur+= length;
1508 m_row_count++;
1509 DBUG_RETURN(0);
1510 }
1511 #endif
1512
1513
1514 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
do_apply_event(Relay_log_info const * rli)1515 int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
1516 {
1517 DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1518 int error= 0;
1519
1520 /*
1521 If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
1522 does not contain any data. In that case, we just remove all tables in the
1523 tables_to_lock list, close the thread tables, and return with
1524 success.
1525 */
1526 if ((m_table_id.id() == ~0U || m_table_id.id() == (~0ULL >> 16)) &&
1527 m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
1528 {
1529 /*
1530 This one is supposed to be set: just an extra check so that
1531 nothing strange has happened.
1532 */
1533 assert(get_flags(STMT_END_F));
1534
1535 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1536 thd->clear_error();
1537 DBUG_RETURN(0);
1538 }
1539
1540 /*
1541 'thd' has been set by exec_relay_log_event(), just before calling
1542 do_apply_event(). We still check here to prevent future coding
1543 errors.
1544 */
1545 assert(rli->info_thd == thd);
1546
1547 /*
1548 If there is no locks taken, this is the first binrow event seen
1549 after the table map events. We should then lock all the tables
1550 used in the transaction and proceed with execution of the actual
1551 event.
1552 */
1553 if (!thd->lock)
1554 {
1555 /*
1556 lock_tables() reads the contents of thd->lex, so they must be
1557 initialized. Contrary to in
1558 Table_map_log_event::do_apply_event() we don't call
1559 mysql_init_query() as that may reset the binlog format.
1560 */
1561 lex_start(thd);
1562
1563 if ((error= lock_tables(thd, rli->tables_to_lock,
1564 rli->tables_to_lock_count, 0)))
1565 {
1566 if (thd->is_slave_error || thd->is_fatal_error)
1567 {
1568 /*
1569 Error reporting borrowed from Query_log_event with many excessive
1570 simplifications (we don't honour --slave-skip-errors)
1571 */
1572 uint actual_error= thd->get_protocol_classic()->get_last_errno();
1573 rli->report(ERROR_LEVEL, actual_error,
1574 "Error '%s' in %s event: when locking tables",
1575 (actual_error ?
1576 thd->get_protocol_classic()->get_last_error() :
1577 "unexpected success or fatal error"),
1578 get_type_str());
1579 thd->is_fatal_error= 1;
1580 }
1581 else
1582 {
1583 rli->report(ERROR_LEVEL, error,
1584 "Error in %s event: when locking tables",
1585 get_type_str());
1586 }
1587 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1588 DBUG_RETURN(error);
1589 }
1590
1591 /*
1592 When the open and locking succeeded, we check all tables to
1593 ensure that they still have the correct type.
1594 */
1595
1596 {
1597 TABLE_LIST *table_list_ptr= rli->tables_to_lock;
1598 for (uint i=0; table_list_ptr&& (i< rli->tables_to_lock_count);
1599 table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1600 {
1601 /*
1602 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
1603 function for the explanation of the below if condition
1604 */
1605 if (table_list_ptr->parent_l)
1606 continue;
1607 /*
1608 We can use a down cast here since we know that every table added
1609 to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1610 skipped above).
1611 */
1612 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1613 TABLE *conv_table;
1614 if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
1615 ptr->table, &conv_table))
1616 {
1617 thd->is_slave_error= 1;
1618 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1619 DBUG_RETURN(ERR_BAD_TABLE_DEF);
1620 }
1621 ptr->m_conv_table= conv_table;
1622 }
1623 }
1624
1625 /*
1626 ... and then we add all the tables to the table map but keep
1627 them in the tables to lock list.
1628
1629
1630 We also invalidate the query cache for all the tables, since
1631 they will now be changed.
1632
1633 TODO [/Matz]: Maybe the query cache should not be invalidated
1634 here? It might be that a table is not changed, even though it
1635 was locked for the statement. We do know that each
1636 Old_rows_log_event contain at least one row, so after processing one
1637 Old_rows_log_event, we can invalidate the query cache for the
1638 associated table.
1639 */
1640 for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
1641 {
1642 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
1643 }
1644 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
1645 }
1646
1647 TABLE*
1648 table=
1649 m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
1650
1651 if (table)
1652 {
1653 /*
1654 table == NULL means that this table should not be replicated
1655 (this was set up by Table_map_log_event::do_apply_event()
1656 which tested replicate-* rules).
1657 */
1658
1659 /*
1660 It's not needed to set_time() but
1661 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
1662 much slave is behind
1663 2) it will be needed when we allow replication from a table with no
1664 TIMESTAMP column to a table with one.
1665 So we call set_time(), like in SBR. Presently it changes nothing.
1666 */
1667 thd->set_time(&(common_header->when));
1668 /*
1669 There are a few flags that are replicated with each row event.
1670 Make sure to set/clear them before executing the main body of
1671 the event.
1672 */
1673 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1674 thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1675 else
1676 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1677
1678 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1679 thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1680 else
1681 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1682 /* A small test to verify that objects have consistent types */
1683 assert(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1684
1685 /*
1686 Now we are in a statement and will stay in a statement until we
1687 see a STMT_END_F.
1688
1689 We set this flag here, before actually applying any rows, in
1690 case the SQL thread is stopped and we need to detect that we're
1691 inside a statement and halting abruptly might cause problems
1692 when restarting.
1693 */
1694 const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
1695
1696 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1697 set_flags(COMPLETE_ROWS_F);
1698
1699 /*
1700 Set tables write and read sets.
1701
1702 Read_set contains all slave columns (in case we are going to fetch
1703 a complete record from slave)
1704
1705 Write_set equals the m_cols bitmap sent from master but it can be
1706 longer if slave has extra columns.
1707 */
1708
1709 DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1710
1711 bitmap_set_all(table->read_set);
1712 bitmap_set_all(table->write_set);
1713 if (!get_flags(COMPLETE_ROWS_F))
1714 bitmap_intersect(table->write_set,&m_cols);
1715
1716 // Do event specific preparations
1717
1718 error= do_before_row_operations(rli);
1719
1720 // row processing loop
1721
1722 while (error == 0 && m_curr_row < m_rows_end)
1723 {
1724 /* in_use can have been set to NULL in close_tables_for_reopen */
1725 THD* old_thd= table->in_use;
1726 if (!table->in_use)
1727 table->in_use= thd;
1728
1729 error= do_exec_row(rli);
1730
1731 DBUG_PRINT("info", ("error: %d", error));
1732 assert(error != HA_ERR_RECORD_DELETED);
1733
1734 table->in_use = old_thd;
1735 switch (error)
1736 {
1737 case 0:
1738 break;
1739
1740 /* Some recoverable errors */
1741 case HA_ERR_RECORD_CHANGED:
1742 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
1743 tuple does not exist */
1744 error= 0;
1745 break;
1746
1747 default:
1748 rli->report(ERROR_LEVEL,
1749 thd->get_protocol_classic()->get_last_errno(),
1750 "Error in %s event: row application failed. %s",
1751 get_type_str(), thd->get_protocol_classic()->get_last_error());
1752 thd->is_slave_error= 1;
1753 break;
1754 }
1755
1756 /*
1757 If m_curr_row_end was not set during event execution (e.g., because
1758 of errors) we can't proceed to the next row. If the error is transient
1759 (i.e., error==0 at this point) we must call unpack_current_row() to set
1760 m_curr_row_end.
1761 */
1762
1763 DBUG_PRINT("info", ("error: %d", error));
1764 DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
1765 (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
1766
1767 if (!m_curr_row_end && !error)
1768 unpack_current_row(rli);
1769
1770 // at this moment m_curr_row_end should be set
1771 assert(error || m_curr_row_end != NULL);
1772 assert(error || m_curr_row < m_curr_row_end);
1773 assert(error || m_curr_row_end <= m_rows_end);
1774
1775 m_curr_row= m_curr_row_end;
1776
1777 } // row processing loop
1778
1779 DBUG_EXECUTE_IF("stop_slave_middle_group",
1780 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1781 error= do_after_row_operations(rli, error);
1782 } // if (table)
1783
1784 if (error)
1785 { /* error has occured during the transaction */
1786 rli->report(ERROR_LEVEL,
1787 thd->get_protocol_classic()->get_last_errno(),
1788 "Error in %s event: error during transaction execution "
1789 "on table %s.%s. %s",
1790 get_type_str(), table->s->db.str, table->s->table_name.str,
1791 thd->get_protocol_classic()->get_last_error());
1792
1793 /*
1794 If one day we honour --skip-slave-errors in row-based replication, and
1795 the error should be skipped, then we would clear mappings, rollback,
1796 close tables, but the slave SQL thread would not stop and then may
1797 assume the mapping is still available, the tables are still open...
1798 So then we should clear mappings/rollback/close here only if this is a
1799 STMT_END_F.
1800 For now we code, knowing that error is not skippable and so slave SQL
1801 thread is certainly going to stop.
1802 rollback at the caller along with sbr.
1803 */
1804 thd->reset_current_stmt_binlog_format_row();
1805 const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
1806 thd->is_slave_error= 1;
1807 DBUG_RETURN(error);
1808 }
1809
1810 /*
1811 This code would ideally be placed in do_update_pos() instead, but
1812 since we have no access to table there, we do the setting of
1813 last_event_start_time here instead.
1814 */
1815 if (table && (table->s->primary_key == MAX_KEY) &&
1816 !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1817 {
1818 /*
1819 ------------ Temporary fix until WL#2975 is implemented ---------
1820
1821 This event is not the last one (no STMT_END_F). If we stop now
1822 (in case of terminate_slave_thread()), how will we restart? We
1823 have to restart from Table_map_log_event, but as this table is
1824 not transactional, the rows already inserted will still be
1825 present, and idempotency is not guaranteed (no PK) so we risk
1826 that repeating leads to double insert. So we desperately try to
1827 continue, hope we'll eventually leave this buggy situation (by
1828 executing the final Old_rows_log_event). If we are in a hopeless
1829 wait (reached end of last relay log and nothing gets appended
1830 there), we timeout after one minute, and notify DBA about the
1831 problem. When WL#2975 is implemented, just remove the member
1832 Relay_log_info::last_event_start_time and all its occurrences.
1833 */
1834 const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
1835 }
1836
1837 if (get_flags(STMT_END_F))
1838 {
1839 /*
1840 This is the end of a statement or transaction, so close (and
1841 unlock) the tables we opened when processing the
1842 Table_map_log_event starting the statement.
1843
1844 OBSERVER. This will clear *all* mappings, not only those that
1845 are open for the table. There is not good handle for on-close
1846 actions for tables.
1847
1848 NOTE. Even if we have no table ('table' == 0) we still need to be
1849 here, so that we increase the group relay log position. If we didn't, we
1850 could have a group relay log position which lags behind "forever"
1851 (assume the last master's transaction is ignored by the slave because of
1852 replicate-ignore rules).
1853 */
1854 int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1855
1856 /*
1857 If this event is not in a transaction, the call below will, if some
1858 transactional storage engines are involved, commit the statement into
1859 them and flush the pending event to binlog.
1860 If this event is in a transaction, the call will do nothing, but a
1861 Xid_log_event will come next which will, if some transactional engines
1862 are involved, commit the transaction and flush the pending event to the
1863 binlog.
1864 If there was a deadlock the transaction should have been rolled back
1865 already. So there should be no need to rollback the transaction.
1866 */
1867 assert(! thd->transaction_rollback_request);
1868 if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
1869 rli->report(ERROR_LEVEL, error,
1870 "Error in %s event: commit of row events failed, "
1871 "table `%s`.`%s`",
1872 get_type_str(), m_table->s->db.str,
1873 m_table->s->table_name.str);
1874 error|= binlog_error;
1875
1876 /*
1877 Now what if this is not a transactional engine? we still need to
1878 flush the pending event to the binlog; we did it with
1879 thd->binlog_flush_pending_rows_event(). Note that we imitate
1880 what is done for real queries: a call to
1881 ha_autocommit_or_rollback() (sometimes only if involves a
1882 transactional engine), and a call to be sure to have the pending
1883 event flushed.
1884 */
1885
1886 thd->reset_current_stmt_binlog_format_row();
1887 const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
1888 }
1889
1890 DBUG_RETURN(error);
1891 }
1892
1893
1894 Log_event::enum_skip_reason
do_shall_skip(Relay_log_info * rli)1895 Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
1896 {
1897 /*
1898 If the slave skip counter is 1 and this event does not end a
1899 statement, then we should not start executing on the next event.
1900 Otherwise, we defer the decision to the normal skipping logic.
1901 */
1902 if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1903 return Log_event::EVENT_SKIP_IGNORE;
1904 else
1905 return Log_event::do_shall_skip(rli);
1906 }
1907
1908 int
do_update_pos(Relay_log_info * rli)1909 Old_rows_log_event::do_update_pos(Relay_log_info *rli)
1910 {
1911 DBUG_ENTER("Old_rows_log_event::do_update_pos");
1912 int error= 0;
1913
1914 DBUG_PRINT("info", ("flags: %s",
1915 get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1916
1917 if (get_flags(STMT_END_F))
1918 {
1919 /*
1920 Indicate that a statement is finished.
1921 Step the group log position if we are not in a transaction,
1922 otherwise increase the event log position.
1923 */
1924 rli->stmt_done(common_header->log_pos);
1925 /*
1926 Clear any errors in thd->net.last_err*. It is not known if this is
1927 needed or not. It is believed that any errors that may exist in
1928 thd->net.last_err* are allowed. Examples of errors are "key not
1929 found", which is produced in the test case rpl_row_conflicts.test
1930 */
1931 thd->clear_error();
1932 }
1933 else
1934 {
1935 rli->inc_event_relay_log_pos();
1936 }
1937
1938 DBUG_RETURN(error);
1939 }
1940
1941 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1942
1943
1944 #ifndef MYSQL_CLIENT
write_data_header(IO_CACHE * file)1945 bool Old_rows_log_event::write_data_header(IO_CACHE *file)
1946 {
1947 // This method should not be reached.
1948 assert(0);
1949 return TRUE;
1950 }
1951
1952
write_data_body(IO_CACHE * file)1953 bool Old_rows_log_event::write_data_body(IO_CACHE*file)
1954 {
1955 /*
1956 Note that this should be the number of *bits*, not the number of
1957 bytes.
1958 */
1959 uchar sbuf[sizeof(m_width)];
1960 my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1961
1962 // This method should not be reached.
1963 assert(0);
1964
1965 bool res= false;
1966 uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1967 assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1968
1969 DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1970 res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
1971
1972 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1973 res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1974 no_bytes_in_map(&m_cols));
1975 DBUG_DUMP("rows", m_rows_buf, data_size);
1976 res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
1977
1978 return res;
1979
1980 }
1981 #endif
1982
1983
1984 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
pack_info(Protocol * protocol)1985 int Old_rows_log_event::pack_info(Protocol *protocol)
1986 {
1987 char buf[256];
1988 char const *const flagstr=
1989 get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1990 size_t bytes= my_snprintf(buf, sizeof(buf),
1991 "table_id: %llu%s", m_table_id.id(), flagstr);
1992 protocol->store(buf, bytes, &my_charset_bin);
1993 return 0;
1994 }
1995 #endif
1996
1997
1998 #ifdef MYSQL_CLIENT
print_helper(FILE * file,PRINT_EVENT_INFO * print_event_info,char const * const name)1999 void Old_rows_log_event::print_helper(FILE *file,
2000 PRINT_EVENT_INFO *print_event_info,
2001 char const *const name)
2002 {
2003 IO_CACHE *const head= &print_event_info->head_cache;
2004 IO_CACHE *const body= &print_event_info->body_cache;
2005 if (!print_event_info->short_form)
2006 {
2007 bool const last_stmt_event= get_flags(STMT_END_F);
2008 print_header(head, print_event_info, !last_stmt_event);
2009 my_b_printf(head, "\t%s: table id %llu%s\n",
2010 name, m_table_id.id(),
2011 last_stmt_event ? " flags: STMT_END_F" : "");
2012 print_base64(body, print_event_info, !last_stmt_event);
2013 }
2014 }
2015 #endif
2016
2017
2018 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2019 /**
2020 Write the current row into event's table.
2021
2022 The row is located in the row buffer, pointed by @c m_curr_row member.
2023 Number of columns of the row is stored in @c m_width member (it can be
2024 different from the number of columns in the table to which we insert).
2025 Bitmap @c m_cols indicates which columns are present in the row. It is assumed
2026 that event's table is already open and pointed by @c m_table.
2027
2028 If the same record already exists in the table it can be either overwritten
2029 or an error is reported depending on the value of @c overwrite flag
2030 (error reporting not yet implemented). Note that the matching record can be
2031 different from the row we insert if we use primary keys to identify records in
2032 the table.
2033
2034 The row to be inserted can contain values only for selected columns. The
2035 missing columns are filled with default values using @c prepare_record()
2036 function. If a matching record is found in the table and @c overwritte is
2037 true, the missing columns are taken from it.
2038
2039 @param rli Relay log info (needed for row unpacking).
2040 @param overwrite
2041 Shall we overwrite if the row already exists or signal
2042 error (currently ignored).
2043
2044 @returns Error code on failure, 0 on success.
2045
2046 This method, if successful, sets @c m_curr_row_end pointer to point at the
2047 next row in the rows buffer. This is done when unpacking the row to be
2048 inserted.
2049
2050 @note If a matching record is found, it is either updated using
2051 @c ha_update_row() or first deleted and then new record written.
2052 */
2053
2054 int
write_row(const Relay_log_info * const rli,const bool overwrite)2055 Old_rows_log_event::write_row(const Relay_log_info *const rli,
2056 const bool overwrite)
2057 {
2058 DBUG_ENTER("write_row");
2059 assert(m_table != NULL && thd != NULL);
2060
2061 TABLE *table= m_table; // pointer to event's table
2062 int error;
2063 int keynum;
2064 char *key= NULL;
2065
2066 /* fill table->record[0] with default values */
2067
2068 if ((error= prepare_record(table, table->write_set,
2069 TRUE /* check if columns have def. values */)))
2070 DBUG_RETURN(error);
2071
2072 /* unpack row into table->record[0] */
2073 error= unpack_current_row(rli); // TODO: how to handle errors?
2074
2075 #ifndef NDEBUG
2076 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2077 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
2078 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
2079 #endif
2080
2081 /*
2082 Try to write record. If a corresponding record already exists in the table,
2083 we try to change it using ha_update_row() if possible. Otherwise we delete
2084 it and repeat the whole process again.
2085
2086 TODO: Add safety measures against infinite looping.
2087 */
2088
2089 while ((error= table->file->ha_write_row(table->record[0])))
2090 {
2091 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
2092 {
2093 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
2094 DBUG_RETURN(error);
2095 }
2096 if ((keynum= table->file->get_dup_key(error)) < 0)
2097 {
2098 DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
2099 table->file->print_error(error, MYF(0));
2100 /*
2101 We failed to retrieve the duplicate key
2102 - either because the error was not "duplicate key" error
2103 - or because the information which key is not available
2104 */
2105 DBUG_RETURN(error);
2106 }
2107 /*
2108 key index value is either valid in the range [0-MAX_KEY) or
2109 has value MAX_KEY as a marker for the case when no information
2110 about key can be found. In the last case we have to require
2111 that storage engine has the flag HA_DUPLICATE_POS turned on.
2112 If this invariant is false then assert will crash
2113 the server built in debug mode. For the server that was built
2114 without DEBUG we have additional check for the value of key index
2115 in the code below in order to report about error in any case.
2116 */
2117 assert(keynum != MAX_KEY ||
2118 (keynum == MAX_KEY &&
2119 (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
2120 /*
2121 We need to retrieve the old row into record[1] to be able to
2122 either update or delete the offending record. We either:
2123
2124 - use ha_rnd_pos() with a row-id (available as dupp_row) to the
2125 offending row, if that is possible (MyISAM and Blackhole), or else
2126
2127 - use ha_index_read_idx_map() with the key that is duplicated, to
2128 retrieve the offending row.
2129 */
2130 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
2131 {
2132 DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
2133 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
2134 if (error)
2135 {
2136 DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
2137 if (error == HA_ERR_RECORD_DELETED)
2138 error= HA_ERR_KEY_NOT_FOUND;
2139 table->file->print_error(error, MYF(0));
2140 DBUG_RETURN(error);
2141 }
2142 }
2143 else
2144 {
2145 DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
2146
2147 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2148 {
2149 DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
2150 DBUG_RETURN(my_errno());
2151 }
2152
2153 if (key == NULL)
2154 {
2155 key= static_cast<char*>(my_alloca(table->s->max_unique_length));
2156 if (key == NULL)
2157 {
2158 DBUG_PRINT("info",("Can't allocate key buffer"));
2159 DBUG_RETURN(ENOMEM);
2160 }
2161 }
2162
2163 if ((uint)keynum < MAX_KEY)
2164 {
2165 key_copy((uchar*)key, table->record[0], table->key_info + keynum,
2166 0);
2167 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2168 (const uchar*)key,
2169 HA_WHOLE_KEY,
2170 HA_READ_KEY_EXACT);
2171 }
2172 else
2173 /*
2174 For the server built in non-debug mode returns error if
2175 handler::get_dup_key() returned MAX_KEY as the value of key index.
2176 */
2177 error= HA_ERR_FOUND_DUPP_KEY;
2178
2179 if (error)
2180 {
2181 DBUG_PRINT("info",("ha_index_read_idx_map() returns error %d", error));
2182 if (error == HA_ERR_RECORD_DELETED)
2183 error= HA_ERR_KEY_NOT_FOUND;
2184 table->file->print_error(error, MYF(0));
2185 DBUG_RETURN(error);
2186 }
2187 }
2188
2189 /*
2190 Now, record[1] should contain the offending row. That
2191 will enable us to update it or, alternatively, delete it (so
2192 that we can insert the new row afterwards).
2193 */
2194
2195 /*
2196 If row is incomplete we will use the record found to fill
2197 missing columns.
2198 */
2199 if (!get_flags(COMPLETE_ROWS_F))
2200 {
2201 restore_record(table,record[1]);
2202 error= unpack_current_row(rli);
2203 }
2204
2205 #ifndef NDEBUG
2206 DBUG_PRINT("debug",("preparing for update: before and after image"));
2207 DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2208 DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2209 #endif
2210
2211 /*
2212 REPLACE is defined as either INSERT or DELETE + INSERT. If
2213 possible, we can replace it with an UPDATE, but that will not
2214 work on InnoDB if FOREIGN KEY checks are necessary.
2215
2216 I (Matz) am not sure of the reason for the last_uniq_key()
2217 check as, but I'm guessing that it's something along the
2218 following lines.
2219
2220 Suppose that we got the duplicate key to be a key that is not
2221 the last unique key for the table and we perform an update:
2222 then there might be another key for which the unique check will
2223 fail, so we're better off just deleting the row and inserting
2224 the correct row.
2225 */
2226 if (last_uniq_key(table, keynum) &&
2227 !table->file->referenced_by_foreign_key())
2228 {
2229 DBUG_PRINT("info",("Updating row using ha_update_row()"));
2230 error=table->file->ha_update_row(table->record[1],
2231 table->record[0]);
2232 switch (error) {
2233
2234 case HA_ERR_RECORD_IS_THE_SAME:
2235 DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2236 " ha_update_row()"));
2237 error= 0;
2238
2239 case 0:
2240 break;
2241
2242 default:
2243 DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2244 table->file->print_error(error, MYF(0));
2245 }
2246
2247 DBUG_RETURN(error);
2248 }
2249 else
2250 {
2251 DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2252 if ((error= table->file->ha_delete_row(table->record[1])))
2253 {
2254 DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2255 table->file->print_error(error, MYF(0));
2256 DBUG_RETURN(error);
2257 }
2258 /* Will retry ha_write_row() with the offending row removed. */
2259 }
2260 }
2261
2262 DBUG_RETURN(error);
2263 }
2264
2265
2266 /**
2267 Locate the current row in event's table.
2268
2269 The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2270 columns are there in the row (this can be differnet from the number of columns
2271 in the table). It is assumed that event's table is already open and pointed
2272 by @c m_table.
2273
2274 If a corresponding record is found in the table it is stored in
2275 @c m_table->record[0]. Note that when record is located based on a primary
2276 key, it is possible that the record found differs from the row being located.
2277
2278 If no key is specified or table does not have keys, a table scan is used to
2279 find the row. In that case the row should be complete and contain values for
2280 all columns. However, it can still be shorter than the table, i.e. the table
2281 can contain extra columns not present in the row. It is also possible that
2282 the table has fewer columns than the row being located.
2283
2284 @returns Error code on failure, 0 on success.
2285
2286 @post In case of success @c m_table->record[0] contains the record found.
2287 Also, the internal "cursor" of the table is positioned at the record found.
2288
2289 @note If the engine allows random access of the records, a combination of
2290 @c position() and @c rnd_pos() will be used.
2291 */
2292
find_row(const Relay_log_info * rli)2293 int Old_rows_log_event::find_row(const Relay_log_info *rli)
2294 {
2295 DBUG_ENTER("find_row");
2296
2297 assert(m_table && m_table->in_use != NULL);
2298
2299 TABLE *table= m_table;
2300 int error;
2301
2302 /* unpack row - missing fields get default values */
2303
2304 // TODO: shall we check and report errors here?
2305 prepare_record(table, table->read_set, FALSE /* don't check errors */);
2306 error= unpack_current_row(rli);
2307
2308 #ifndef NDEBUG
2309 DBUG_PRINT("info",("looking for the following record"));
2310 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2311 #endif
2312
2313 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2314 table->s->primary_key < MAX_KEY)
2315 {
2316 /*
2317 Use a more efficient method to fetch the record given by
2318 table->record[0] if the engine allows it. We first compute a
2319 row reference using the position() member function (it will be
2320 stored in table->file->ref) and the use rnd_pos() to position
2321 the "cursor" (i.e., record[0] in this case) at the correct row.
2322
2323 TODO: Add a check that the correct record has been fetched by
2324 comparing with the original record. Take into account that the
2325 record on the master and slave can be of different
2326 length. Something along these lines should work:
2327
2328 ADD>>> store_record(table,record[1]);
2329 int error= table->file->rnd_pos(table->record[0], table->file->ref);
2330 ADD>>> assert(memcmp(table->record[1], table->record[0],
2331 table->s->reclength) == 0);
2332
2333 */
2334 DBUG_PRINT("info",("locating record using primary key (position)"));
2335 int error= table->file->rnd_pos_by_record(table->record[0]);
2336 if (error)
2337 {
2338 DBUG_PRINT("info",("rnd_pos returns error %d",error));
2339 if (error == HA_ERR_RECORD_DELETED)
2340 error= HA_ERR_KEY_NOT_FOUND;
2341 table->file->print_error(error, MYF(0));
2342 }
2343 DBUG_RETURN(error);
2344 }
2345
2346 // We can't use position() - try other methods.
2347
2348 /*
2349 We need to retrieve all fields
2350 TODO: Move this out from this function to main loop
2351 */
2352 table->use_all_columns();
2353
2354 /*
2355 Save copy of the record in table->record[1]. It might be needed
2356 later if linear search is used to find exact match.
2357 */
2358 store_record(table,record[1]);
2359
2360 if (table->s->keys > 0)
2361 {
2362 DBUG_PRINT("info",("locating record using primary key (index_read)"));
2363
2364 /* We have a key: search the table using the index */
2365 if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
2366 {
2367 DBUG_PRINT("info",("ha_index_init returns error %d",error));
2368 table->file->print_error(error, MYF(0));
2369 DBUG_RETURN(error);
2370 }
2371
2372 /* Fill key data for the row */
2373
2374 assert(m_key);
2375 key_copy(m_key, table->record[0], table->key_info, 0);
2376
2377 DBUG_DUMP("key data", m_key, table->key_info->key_length);
2378
2379 /*
2380 We need to set the null bytes to ensure that the filler bit are
2381 all set when returning. There are storage engines that just set
2382 the necessary bits on the bytes and don't set the filler bits
2383 correctly.
2384 */
2385 my_ptrdiff_t const pos=
2386 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2387 table->record[0][pos]= 0xFF;
2388
2389 if ((error= table->file->ha_index_read_map(table->record[0], m_key,
2390 HA_WHOLE_KEY,
2391 HA_READ_KEY_EXACT)))
2392 {
2393 DBUG_PRINT("info",("no record matching the key found in the table"));
2394 if (error == HA_ERR_RECORD_DELETED)
2395 error= HA_ERR_KEY_NOT_FOUND;
2396 table->file->print_error(error, MYF(0));
2397 table->file->ha_index_end();
2398 DBUG_RETURN(error);
2399 }
2400
2401 DBUG_PRINT("info",("found first matching record"));
2402 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2403 /*
2404 Below is a minor "optimization". If the key (i.e., key number
2405 0) has the HA_NOSAME flag set, we know that we have found the
2406 correct record (since there can be no duplicates); otherwise, we
2407 have to compare the record with the one found to see if it is
2408 the correct one.
2409
2410 CAVEAT! This behaviour is essential for the replication of,
2411 e.g., the mysql.proc table since the correct record *shall* be
2412 found using the primary key *only*. There shall be no
2413 comparison of non-PK columns to decide if the correct record is
2414 found. I can see no scenario where it would be incorrect to
2415 chose the row to change only using a PK or an UNNI.
2416 */
2417 if (table->key_info->flags & HA_NOSAME)
2418 {
2419 /* Unique does not have non nullable part */
2420 if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2421 {
2422 table->file->ha_index_end();
2423 DBUG_RETURN(0);
2424 }
2425 else
2426 {
2427 KEY *keyinfo= table->key_info;
2428 /*
2429 Unique has nullable part. We need to check if there is any field in the
2430 BI image that is null and part of UNNI.
2431 */
2432 bool null_found= FALSE;
2433 for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2434 {
2435 uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2436 Field **f= table->field+fieldnr;
2437 null_found= (*f)->is_null();
2438 }
2439
2440 if (!null_found)
2441 {
2442 table->file->ha_index_end();
2443 DBUG_RETURN(0);
2444 }
2445
2446 /* else fall through to index scan */
2447 }
2448 }
2449
2450 /*
2451 In case key is not unique, we still have to iterate over records found
2452 and find the one which is identical to the row given. A copy of the
2453 record we are looking for is stored in record[1].
2454 */
2455 DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2456
2457 while (record_compare(table))
2458 {
2459 /*
2460 We need to set the null bytes to ensure that the filler bit
2461 are all set when returning. There are storage engines that
2462 just set the necessary bits on the bytes and don't set the
2463 filler bits correctly.
2464
2465 TODO[record format ndb]: Remove this code once NDB returns the
2466 correct record format.
2467 */
2468 if (table->s->null_bytes > 0)
2469 {
2470 table->record[0][table->s->null_bytes - 1]|=
2471 256U - (1U << table->s->last_null_bit_pos);
2472 }
2473
2474 while ((error= table->file->ha_index_next(table->record[0])))
2475 {
2476 /* We just skip records that has already been deleted */
2477 if (error == HA_ERR_RECORD_DELETED)
2478 continue;
2479 DBUG_PRINT("info",("no record matching the given row found"));
2480 table->file->print_error(error, MYF(0));
2481 (void) table->file->ha_index_end();
2482 DBUG_RETURN(error);
2483 }
2484 }
2485
2486 /*
2487 Have to restart the scan to be able to fetch the next row.
2488 */
2489 table->file->ha_index_end();
2490 }
2491 else
2492 {
2493 DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
2494
2495 int restart_count= 0; // Number of times scanning has restarted from top
2496
2497 /* We don't have a key: search the table using ha_rnd_next() */
2498 if ((error= table->file->ha_rnd_init(1)))
2499 {
2500 DBUG_PRINT("info",("error initializing table scan"
2501 " (ha_rnd_init returns %d)",error));
2502 table->file->print_error(error, MYF(0));
2503 DBUG_RETURN(error);
2504 }
2505
2506 /* Continue until we find the right record or have made a full loop */
2507 do
2508 {
2509 restart_ha_rnd_next:
2510 error= table->file->ha_rnd_next(table->record[0]);
2511
2512 switch (error) {
2513
2514 case 0:
2515 break;
2516
2517 case HA_ERR_RECORD_DELETED:
2518 goto restart_ha_rnd_next;
2519
2520 case HA_ERR_END_OF_FILE:
2521 if (++restart_count < 2)
2522 {
2523 if ((error= table->file->ha_rnd_init(1)))
2524 {
2525 table->file->print_error(error, MYF(0));
2526 DBUG_RETURN(error);
2527 }
2528 goto restart_ha_rnd_next;
2529 }
2530 break;
2531
2532 default:
2533 DBUG_PRINT("info", ("Failed to get next record"
2534 " (ha_rnd_next returns %d)",error));
2535 table->file->print_error(error, MYF(0));
2536 table->file->ha_rnd_end();
2537 DBUG_RETURN(error);
2538 }
2539 }
2540 while (restart_count < 2 && record_compare(table));
2541
2542 /*
2543 Note: above record_compare will take into accout all record fields
2544 which might be incorrect in case a partial row was given in the event
2545 */
2546
2547 /*
2548 Have to restart the scan to be able to fetch the next row.
2549 */
2550 if (restart_count == 2)
2551 DBUG_PRINT("info", ("Record not found"));
2552 else
2553 DBUG_DUMP("record found", table->record[0], table->s->reclength);
2554 table->file->ha_rnd_end();
2555
2556 assert(error == HA_ERR_END_OF_FILE || error == 0);
2557 DBUG_RETURN(error);
2558 }
2559
2560 DBUG_RETURN(0);
2561 }
2562
2563 #endif
2564
2565
2566 /**************************************************************************
2567 Write_rows_log_event member functions
2568 **************************************************************************/
2569
2570 /*
2571 Constructor used to build an event for writing to the binary log.
2572 */
2573 #if !defined(MYSQL_CLIENT)
Write_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid_arg,MY_BITMAP const * cols,bool is_transactional)2574 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2575 TABLE *tbl_arg,
2576 ulong tid_arg,
2577 MY_BITMAP const *cols,
2578 bool is_transactional)
2579 : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2580 {
2581
2582 // This constructor should not be reached.
2583 assert(0);
2584
2585 }
2586 #endif
2587
2588
2589 /*
2590 Constructor used by slave to read the event from the binary log.
2591 */
2592 #ifdef HAVE_REPLICATION
2593 Write_rows_log_event_old::
Write_rows_log_event_old(const char * buf,uint event_len,const Format_description_event * description_event)2594 Write_rows_log_event_old(const char *buf, uint event_len,
2595 const Format_description_event *description_event)
2596 : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_WRITE_ROWS_EVENT,
2597 description_event)
2598 {
2599 }
2600 #endif
2601
2602
2603 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2604 int
do_before_row_operations(const Slave_reporting_capability * const)2605 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2606 {
2607 int error= 0;
2608
2609 /*
2610 We are using REPLACE semantics and not INSERT IGNORE semantics
2611 when writing rows, that is: new rows replace old rows. We need to
2612 inform the storage engine that it should use this behaviour.
2613 */
2614
2615 /* Tell the storage engine that we are using REPLACE semantics. */
2616 thd->lex->duplicates= DUP_REPLACE;
2617
2618 /*
2619 Pretend we're executing a REPLACE command: this is needed for
2620 InnoDB and NDB Cluster since they are not (properly) checking the
2621 lex->duplicates flag.
2622 */
2623 thd->lex->sql_command= SQLCOM_REPLACE;
2624 /*
2625 Do not raise the error flag in case of hitting to an unique attribute
2626 */
2627 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2628 /*
2629 NDB specific: update from ndb master wrapped as Write_rows
2630 */
2631 /*
2632 so that the event should be applied to replace slave's row
2633 */
2634 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2635 /*
2636 NDB specific: if update from ndb master wrapped as Write_rows
2637 does not find the row it's assumed idempotent binlog applying
2638 is taking place; don't raise the error.
2639 */
2640 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2641 /*
2642 TODO: the cluster team (Tomas?) says that it's better if the engine knows
2643 how many rows are going to be inserted, then it can allocate needed memory
2644 from the start.
2645 */
2646 m_table->file->ha_start_bulk_insert(0);
2647 return error;
2648 }
2649
2650
2651 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2652 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2653 int error)
2654 {
2655 int local_error= 0;
2656 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2657 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2658 /*
2659 reseting the extra with
2660 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2661 fires bug#27077
2662 todo: explain or fix
2663 */
2664 if ((local_error= m_table->file->ha_end_bulk_insert()))
2665 {
2666 m_table->file->print_error(local_error, MYF(0));
2667 }
2668 return error? error : local_error;
2669 }
2670
2671
2672 int
do_exec_row(const Relay_log_info * const rli)2673 Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2674 {
2675 assert(m_table != NULL);
2676 int error= write_row(rli, TRUE /* overwrite */);
2677
2678 if (error && !thd->get_protocol_classic()->get_last_errno())
2679 thd->get_protocol_classic()->set_last_errno(error);
2680
2681 return error;
2682 }
2683
2684 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2685
2686
2687 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2688 void Write_rows_log_event_old::print(FILE *file,
2689 PRINT_EVENT_INFO* print_event_info)
2690 {
2691 Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old");
2692 }
2693 #endif
2694
2695
2696 /**************************************************************************
2697 Delete_rows_log_event member functions
2698 **************************************************************************/
2699
2700 /*
2701 Constructor used to build an event for writing to the binary log.
2702 */
2703
2704 #ifndef MYSQL_CLIENT
Delete_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2705 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2706 TABLE *tbl_arg,
2707 ulong tid,
2708 MY_BITMAP const *cols,
2709 bool is_transactional)
2710 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2711 m_after_image(NULL), m_memory(NULL)
2712 {
2713
2714 // This constructor should not be reached.
2715 assert(0);
2716
2717 }
2718 #endif /* #if !defined(MYSQL_CLIENT) */
2719
2720
2721 /*
2722 Constructor used by slave to read the event from the binary log.
2723 */
2724 #ifdef HAVE_REPLICATION
2725 Delete_rows_log_event_old::
Delete_rows_log_event_old(const char * buf,uint event_len,const Format_description_event * description_event)2726 Delete_rows_log_event_old(const char *buf, uint event_len,
2727 const Format_description_event *description_event)
2728 : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_DELETE_ROWS_EVENT,
2729 description_event),
2730 m_after_image(NULL), m_memory(NULL)
2731 {
2732 }
2733 #endif
2734
2735
2736 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2737
2738 int
do_before_row_operations(const Slave_reporting_capability * const)2739 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2740 {
2741 if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2742 m_table->s->primary_key < MAX_KEY)
2743 {
2744 /*
2745 We don't need to allocate any memory for m_key since it is not used.
2746 */
2747 return 0;
2748 }
2749
2750 if (m_table->s->keys > 0)
2751 {
2752 // Allocate buffer for key searches
2753 m_key= (uchar*)my_malloc(key_memory_log_event_old,
2754 m_table->key_info->key_length, MYF(MY_WME));
2755 if (!m_key)
2756 return HA_ERR_OUT_OF_MEM;
2757 }
2758 return 0;
2759 }
2760
2761
2762 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2763 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2764 int error)
2765 {
2766 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2767 m_table->file->ha_index_or_rnd_end();
2768 my_free(m_key);
2769 m_key= NULL;
2770
2771 return error;
2772 }
2773
2774
do_exec_row(const Relay_log_info * const rli)2775 int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2776 {
2777 int error;
2778 assert(m_table != NULL);
2779
2780 if (!(error= find_row(rli)))
2781 {
2782 /*
2783 Delete the record found, located in record[0]
2784 */
2785 error= m_table->file->ha_delete_row(m_table->record[0]);
2786 }
2787 return error;
2788 }
2789
2790 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2791
2792
2793 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2794 void Delete_rows_log_event_old::print(FILE *file,
2795 PRINT_EVENT_INFO* print_event_info)
2796 {
2797 Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old");
2798 }
2799 #endif
2800
2801
2802 /**************************************************************************
2803 Update_rows_log_event member functions
2804 **************************************************************************/
2805
2806 /*
2807 Constructor used to build an event for writing to the binary log.
2808 */
2809 #if !defined(MYSQL_CLIENT)
Update_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2810 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2811 TABLE *tbl_arg,
2812 ulong tid,
2813 MY_BITMAP const *cols,
2814 bool is_transactional)
2815 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2816 m_after_image(NULL), m_memory(NULL)
2817 {
2818
2819 // This constructor should not be reached.
2820 assert(0);
2821 }
2822 #endif /* !defined(MYSQL_CLIENT) */
2823
2824
2825 /*
2826 Constructor used by slave to read the event from the binary log.
2827 */
2828 #ifdef HAVE_REPLICATION
2829 Update_rows_log_event_old::
Update_rows_log_event_old(const char * buf,uint event_len,const Format_description_event * description_event)2830 Update_rows_log_event_old(const char *buf, uint event_len,
2831 const Format_description_event
2832 *description_event)
2833 : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_UPDATE_ROWS_EVENT,
2834 description_event),
2835 m_after_image(NULL), m_memory(NULL)
2836 {
2837 }
2838 #endif
2839
2840
2841 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2842
2843 int
do_before_row_operations(const Slave_reporting_capability * const)2844 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2845 {
2846 if (m_table->s->keys > 0)
2847 {
2848 // Allocate buffer for key searches
2849 m_key= (uchar*)my_malloc(key_memory_log_event_old,
2850 m_table->key_info->key_length, MYF(MY_WME));
2851 if (!m_key)
2852 return HA_ERR_OUT_OF_MEM;
2853 }
2854
2855 return 0;
2856 }
2857
2858
2859 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2860 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2861 int error)
2862 {
2863 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2864 m_table->file->ha_index_or_rnd_end();
2865 my_free(m_key); // Free for multi_malloc
2866 m_key= NULL;
2867
2868 return error;
2869 }
2870
2871
2872 int
do_exec_row(const Relay_log_info * const rli)2873 Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2874 {
2875 assert(m_table != NULL);
2876
2877 int error= find_row(rli);
2878 if (error)
2879 {
2880 /*
2881 We need to read the second image in the event of error to be
2882 able to skip to the next pair of updates
2883 */
2884 m_curr_row= m_curr_row_end;
2885 unpack_current_row(rli);
2886 return error;
2887 }
2888
2889 /*
2890 This is the situation after locating BI:
2891
2892 ===|=== before image ====|=== after image ===|===
2893 ^ ^
2894 m_curr_row m_curr_row_end
2895
2896 BI found in the table is stored in record[0]. We copy it to record[1]
2897 and unpack AI to record[0].
2898 */
2899
2900 store_record(m_table,record[1]);
2901
2902 m_curr_row= m_curr_row_end;
2903 error= unpack_current_row(rli); // this also updates m_curr_row_end
2904
2905 /*
2906 Now we have the right row to update. The old row (the one we're
2907 looking for) is in record[1] and the new row is in record[0].
2908 */
2909 DBUG_PRINT("info",("Updating row in table"));
2910 DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2911 DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2912
2913 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2914 if (error == HA_ERR_RECORD_IS_THE_SAME)
2915 error= 0;
2916
2917 return error;
2918 }
2919
2920 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2921
2922
2923 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2924 void Update_rows_log_event_old::print(FILE *file,
2925 PRINT_EVENT_INFO* print_event_info)
2926 {
2927 Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old");
2928 }
2929 #endif
2930