1 /* Copyright (c) 2007, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql_priv.h"
24 #ifndef MYSQL_CLIENT
25 #include "unireg.h"
26 #endif
27 #include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
28 #include "log_event.h"
29 #ifndef MYSQL_CLIENT
30 #include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
31 #include "sql_base.h" // close_tables_for_reopen
32 #include "key.h" // key_copy
33 #include "lock.h" // mysql_unlock_tables
34 #include "sql_parse.h" // mysql_reset_thd_for_next_command
35 #include "rpl_rli.h"
36 #include "rpl_utility.h"
37 #endif
38 #include "log_event_old.h"
39 #include "rpl_record_old.h"
40 #include "transaction.h"
41
42 #include <algorithm>
43
44 using std::min;
45 using std::max;
46
47 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
48
49 // Old implementation of do_apply_event()
50 int
do_apply_event(Old_rows_log_event * ev,const Relay_log_info * rli)51 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
52 {
53 DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
54 int error= 0;
55 THD *ev_thd= ev->thd;
56 uchar const *row_start= ev->m_rows_buf;
57
58 /*
59 If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
60 does not contain any data. In that case, we just remove all tables in the
61 tables_to_lock list, close the thread tables, and return with
62 success.
63 */
64 if ((ev->m_table_id.id() == ~0U || ev->m_table_id.id() == (~0ULL >> 16)) &&
65 ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
66
67 {
68 /*
69 This one is supposed to be set: just an extra check so that
70 nothing strange has happened.
71 */
72 DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
73
74 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
75 ev_thd->clear_error();
76 DBUG_RETURN(0);
77 }
78
79 /*
80 'ev_thd' has been set by exec_relay_log_event(), just before calling
81 do_apply_event(). We still check here to prevent future coding
82 errors.
83 */
84 DBUG_ASSERT(rli->info_thd == ev_thd);
85
86 /*
87 If there is no locks taken, this is the first binrow event seen
88 after the table map events. We should then lock all the tables
89 used in the transaction and proceed with execution of the actual
90 event.
91 */
92 if (!ev_thd->lock)
93 {
94 /*
95 Lock_tables() reads the contents of ev_thd->lex, so they must be
96 initialized.
97
98 We also call the mysql_reset_thd_for_next_command(), since this
99 is the logical start of the next "statement". Note that this
100 call might reset the value of current_stmt_binlog_format, so
101 we need to do any changes to that value after this function.
102 */
103 lex_start(ev_thd);
104 mysql_reset_thd_for_next_command(ev_thd);
105
106 /*
107 This is a row injection, so we flag the "statement" as
108 such. Note that this code is called both when the slave does row
109 injections and when the BINLOG statement is used to do row
110 injections.
111 */
112 ev_thd->lex->set_stmt_row_injection();
113
114 if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
115 {
116 if (ev_thd->is_error())
117 {
118 /*
119 Error reporting borrowed from Query_log_event with many excessive
120 simplifications (we don't honour --slave-skip-errors)
121 */
122 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
123 "Error '%s' on opening tables",
124 ev_thd->get_stmt_da()->message());
125 ev_thd->is_slave_error= 1;
126 }
127 DBUG_RETURN(1);
128 }
129
130 /*
131 When the open and locking succeeded, we check all tables to
132 ensure that they still have the correct type.
133 */
134
135 {
136 TABLE_LIST *table_list_ptr= rli->tables_to_lock;
137 for (uint i=0 ; table_list_ptr&& (i< rli->tables_to_lock_count);
138 table_list_ptr= table_list_ptr->next_global, i++)
139 {
140 /*
141 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
142 function for the explanation of the below if condition
143 */
144 if (table_list_ptr->parent_l)
145 continue;
146 /*
147 We can use a down cast here since we know that every table added
148 to the tables_to_lock is a RPL_TABLE_LIST(or child table which is
149 skipped above).
150 */
151 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
152 DBUG_ASSERT(ptr->m_tabledef_valid);
153 TABLE *conv_table;
154 if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
155 ptr->table, &conv_table))
156 {
157 ev_thd->is_slave_error= 1;
158 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
159 DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
160 }
161 DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
162 " - conv_table: %p",
163 ptr->table->s->db.str,
164 ptr->table->s->table_name.str, conv_table));
165 ptr->m_conv_table= conv_table;
166 }
167 }
168
169 /*
170 ... and then we add all the tables to the table map and remove
171 them from tables to lock.
172
173 We also invalidate the query cache for all the tables, since
174 they will now be changed.
175
176 TODO [/Matz]: Maybe the query cache should not be invalidated
177 here? It might be that a table is not changed, even though it
178 was locked for the statement. We do know that each
179 Old_rows_log_event contain at least one row, so after processing one
180 Old_rows_log_event, we can invalidate the query cache for the
181 associated table.
182 */
183 TABLE_LIST *ptr= rli->tables_to_lock;
184 for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
185 {
186 /*
187 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
188 function for the explanation of the below if condition
189 */
190 if (ptr->parent_l)
191 continue;
192 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
193 }
194 #ifdef HAVE_QUERY_CACHE
195 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
196 #endif
197 }
198
199 TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
200
201 if (table)
202 {
203 /*
204 table == NULL means that this table should not be replicated
205 (this was set up by Table_map_log_event::do_apply_event()
206 which tested replicate-* rules).
207 */
208
209 /*
210 It's not needed to set_time() but
211 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
212 much slave is behind
213 2) it will be needed when we allow replication from a table with no
214 TIMESTAMP column to a table with one.
215 So we call set_time(), like in SBR. Presently it changes nothing.
216 */
217 ev_thd->set_time(&ev->when);
218 /*
219 There are a few flags that are replicated with each row event.
220 Make sure to set/clear them before executing the main body of
221 the event.
222 */
223 if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
224 ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
225 else
226 ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
227
228 if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
229 ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
230 else
231 ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
232 /* A small test to verify that objects have consistent types */
233 DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
234
235 /*
236 Now we are in a statement and will stay in a statement until we
237 see a STMT_END_F.
238
239 We set this flag here, before actually applying any rows, in
240 case the SQL thread is stopped and we need to detect that we're
241 inside a statement and halting abruptly might cause problems
242 when restarting.
243 */
244 const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
245
246 error= do_before_row_operations(table);
247 while (error == 0 && row_start < ev->m_rows_end)
248 {
249 uchar const *row_end= NULL;
250 if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
251 break; // We should perform the after-row operation even in
252 // the case of error
253
254 DBUG_ASSERT(row_end != NULL); // cannot happen
255 DBUG_ASSERT(row_end <= ev->m_rows_end);
256
257 /* in_use can have been set to NULL in close_tables_for_reopen */
258 THD* old_thd= table->in_use;
259 if (!table->in_use)
260 table->in_use= ev_thd;
261 error= do_exec_row(table);
262 table->in_use = old_thd;
263 switch (error)
264 {
265 /* Some recoverable errors */
266 case HA_ERR_RECORD_CHANGED:
267 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
268 tuple does not exist */
269 error= 0;
270 case 0:
271 break;
272
273 default:
274 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
275 "Error in %s event: row application failed. %s",
276 ev->get_type_str(),
277 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
278 thd->is_slave_error= 1;
279 break;
280 }
281
282 row_start= row_end;
283 }
284 DBUG_EXECUTE_IF("stop_slave_middle_group",
285 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
286 error= do_after_row_operations(table, error);
287 }
288
289 if (error)
290 { /* error has occured during the transaction */
291 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
292 "Error in %s event: error during transaction execution "
293 "on table %s.%s. %s",
294 ev->get_type_str(), table->s->db.str,
295 table->s->table_name.str,
296 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
297
298 /*
299 If one day we honour --skip-slave-errors in row-based replication, and
300 the error should be skipped, then we would clear mappings, rollback,
301 close tables, but the slave SQL thread would not stop and then may
302 assume the mapping is still available, the tables are still open...
303 So then we should clear mappings/rollback/close here only if this is a
304 STMT_END_F.
305 For now we code, knowing that error is not skippable and so slave SQL
306 thread is certainly going to stop.
307 rollback at the caller along with sbr.
308 */
309 ev_thd->reset_current_stmt_binlog_format_row();
310 const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
311 ev_thd->is_slave_error= 1;
312 DBUG_RETURN(error);
313 }
314
315 DBUG_RETURN(0);
316 }
317 #endif
318
319
320 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
321
322 /*
323 Check if there are more UNIQUE keys after the given key.
324 */
325 static int
last_uniq_key(TABLE * table,uint keyno)326 last_uniq_key(TABLE *table, uint keyno)
327 {
328 while (++keyno < table->s->keys)
329 if (table->key_info[keyno].flags & HA_NOSAME)
330 return 0;
331 return 1;
332 }
333
334
335 /*
336 Compares table->record[0] and table->record[1]
337
338 Returns TRUE if different.
339 */
record_compare(TABLE * table)340 static bool record_compare(TABLE *table)
341 {
342 /*
343 Need to set the X bit and the filler bits in both records since
344 there are engines that do not set it correctly.
345
346 In addition, since MyISAM checks that one hasn't tampered with the
347 record, it is necessary to restore the old bytes into the record
348 after doing the comparison.
349
350 TODO[record format ndb]: Remove it once NDB returns correct
351 records. Check that the other engines also return correct records.
352 */
353
354 bool result= FALSE;
355 uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
356
357 if (table->s->null_bytes > 0)
358 {
359 for (int i = 0 ; i < 2 ; ++i)
360 {
361 /*
362 If we have an X bit then we need to take care of it.
363 */
364 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
365 {
366 saved_x[i]= table->record[i][0];
367 table->record[i][0]|= 1U;
368 }
369
370 /*
371 If (last_null_bit_pos == 0 && null_bytes > 1), then:
372
373 X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
374
375 Ie, the entire byte is used.
376 */
377 if (table->s->last_null_bit_pos > 0)
378 {
379 saved_filler[i]= table->record[i][table->s->null_bytes - 1];
380 table->record[i][table->s->null_bytes - 1]|=
381 256U - (1U << table->s->last_null_bit_pos);
382 }
383 }
384 }
385
386 if (table->s->blob_fields + table->s->varchar_fields == 0)
387 {
388 result= cmp_record(table,record[1]);
389 goto record_compare_exit;
390 }
391
392 /* Compare null bits */
393 if (memcmp(table->null_flags,
394 table->null_flags+table->s->rec_buff_length,
395 table->s->null_bytes))
396 {
397 result= TRUE; // Diff in NULL value
398 goto record_compare_exit;
399 }
400
401 /* Compare updated fields */
402 for (Field **ptr=table->field ; *ptr ; ptr++)
403 {
404 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
405 {
406 result= TRUE;
407 goto record_compare_exit;
408 }
409 }
410
411 record_compare_exit:
412 /*
413 Restore the saved bytes.
414
415 TODO[record format ndb]: Remove this code once NDB returns the
416 correct record format.
417 */
418 if (table->s->null_bytes > 0)
419 {
420 for (int i = 0 ; i < 2 ; ++i)
421 {
422 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
423 table->record[i][0]= saved_x[i];
424
425 if (table->s->last_null_bit_pos > 0)
426 table->record[i][table->s->null_bytes - 1]= saved_filler[i];
427 }
428 }
429
430 return result;
431 }
432
433
434 /*
435 Copy "extra" columns from record[1] to record[0].
436
437 Copy the extra fields that are not present on the master but are
438 present on the slave from record[1] to record[0]. This is used
439 after fetching a record that are to be updated, either inside
440 replace_record() or as part of executing an update_row().
441 */
442 static int
copy_extra_record_fields(TABLE * table,size_t master_reclength,my_ptrdiff_t master_fields)443 copy_extra_record_fields(TABLE *table,
444 size_t master_reclength,
445 my_ptrdiff_t master_fields)
446 {
447 DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
448 DBUG_PRINT("info", ("Copying to 0x%lx "
449 "from field %lu at offset %lu "
450 "to field %d at offset %lu",
451 (long) table->record[0],
452 (ulong) master_fields, (ulong) master_reclength,
453 table->s->fields, table->s->reclength));
454 /*
455 Copying the extra fields of the slave that does not exist on
456 master into record[0] (which are basically the default values).
457 */
458
459 if (table->s->fields < (uint) master_fields)
460 DBUG_RETURN(0);
461
462 DBUG_ASSERT(master_reclength <= table->s->reclength);
463 if (master_reclength < table->s->reclength)
464 memcpy(table->record[0] + master_reclength,
465 table->record[1] + master_reclength,
466 table->s->reclength - master_reclength);
467
468 /*
469 Bit columns are special. We iterate over all the remaining
470 columns and copy the "extra" bits to the new record. This is
471 not a very good solution: it should be refactored on
472 opportunity.
473
474 REFACTORING SUGGESTION (Matz). Introduce a member function
475 similar to move_field_offset() called copy_field_offset() to
476 copy field values and implement it for all Field subclasses. Use
477 this function to copy data from the found record to the record
478 that are going to be inserted.
479
480 The copy_field_offset() function need to be a virtual function,
481 which in this case will prevent copying an entire range of
482 fields efficiently.
483 */
484 {
485 Field **field_ptr= table->field + master_fields;
486 for ( ; *field_ptr ; ++field_ptr)
487 {
488 /*
489 Set the null bit according to the values in record[1]
490 */
491 if ((*field_ptr)->maybe_null() &&
492 (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
493 (*field_ptr)->set_null();
494 else
495 (*field_ptr)->set_notnull();
496
497 /*
498 Do the extra work for special columns.
499 */
500 switch ((*field_ptr)->real_type())
501 {
502 default:
503 /* Nothing to do */
504 break;
505
506 case MYSQL_TYPE_BIT:
507 Field_bit *f= static_cast<Field_bit*>(*field_ptr);
508 if (f->bit_len > 0)
509 {
510 my_ptrdiff_t const offset= table->record[1] - table->record[0];
511 uchar const bits=
512 get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
513 set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
514 }
515 break;
516 }
517 }
518 }
519 DBUG_RETURN(0); // All OK
520 }
521
522
/*
  Replace the provided record in the database.

  SYNOPSIS
      replace_record()
      thd    Thread context for writing the record.
      table  Table to which record should be written.
      master_reclength
             Offset to first column that is not present on the master,
             alternatively the length of the record on the master
             side.
      master_fields
             Passed on to copy_extra_record_fields() as the index of
             the first field that is not present on the master.

  RETURN VALUE
      Error code on failure, 0 on success.

  DESCRIPTION
      Similar to how it is done in mysql_insert(), we first try to do
      a ha_write_row() and if that fails due to duplicated keys (or
      indices), we do an ha_update_row() or a ha_delete_row() instead.

      The row to write is expected in table->record[0]; record[1] is
      used as scratch space for the conflicting row.
 */
static int
replace_record(THD *thd, TABLE *table,
               ulong const master_reclength,
               uint const master_fields)
{
  DBUG_ENTER("replace_record");
  DBUG_ASSERT(table != NULL && thd != NULL);

  int error;
  int keynum;
  /*
    Lazily allocated buffer for the duplicate key; allocated at most
    once and reused across retries.
    NOTE(review): presumably auto_afree_ptr releases the my_alloca()
    memory when it goes out of scope -- confirm against its definition.
  */
  auto_afree_ptr<char> key(NULL);

#ifndef DBUG_OFF
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif

  /*
    Keep trying to write the row.  Each failed attempt either returns,
    or deletes the conflicting row and loops to retry the write.
  */
  while ((error= table->file->ha_write_row(table->record[0])))
  {
    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
    {
      table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
      DBUG_RETURN(error);
    }
    if ((keynum= table->file->get_dup_key(error)) < 0)
    {
      table->file->print_error(error, MYF(0));
      /*
        We failed to retrieve the duplicate key
        - either because the error was not "duplicate key" error
        - or because the information which key is not available
      */
      DBUG_RETURN(error);
    }

    /*
       We need to retrieve the old row into record[1] to be able to
       either update or delete the offending record.  We either:

       - use ha_rnd_pos() with a row-id (available as dup_ref) to the
         offending row, if that is possible (MyISAM and Blackhole), or else

       - use ha_index_read_idx_map() with the key that is duplicated, to
         retrieve the offending row.
     */
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
      if (error)
      {
        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
        /* A deleted row is reported to the caller as "key not found". */
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }
    else
    {
      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
      {
        DBUG_RETURN(my_errno);
      }

      if (key.get() == NULL)
      {
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
        if (key.get() == NULL)
          DBUG_RETURN(ENOMEM);
      }

      /* Build the duplicated key from the row we tried to write. */
      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
               0);
      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
                                                (const uchar*)key.get(),
                                                HA_WHOLE_KEY,
                                                HA_READ_KEY_EXACT);
      if (error)
      {
        DBUG_PRINT("info", ("ha_index_read_idx_map() returns error %d", error));
        /* A deleted row is reported to the caller as "key not found". */
        if (error == HA_ERR_RECORD_DELETED)
          error= HA_ERR_KEY_NOT_FOUND;
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }

    /*
       Now, table->record[1] should contain the offending row.  That
       will enable us to update it or, alternatively, delete it (so
       that we can insert the new row afterwards).

       First we copy the columns into table->record[0] that are not
       present on the master from table->record[1], if there are any.
       (The return value is not checked.)
    */
    copy_extra_record_fields(table, master_reclength, master_fields);

    /*
       REPLACE is defined as either INSERT or DELETE + INSERT.  If
       possible, we can replace it with an UPDATE, but that will not
       work on InnoDB if FOREIGN KEY checks are necessary.

       I (Matz) am not sure of the reason for the last_uniq_key()
       check as, but I'm guessing that it's something along the
       following lines.

       Suppose that we got the duplicate key to be a key that is not
       the last unique key for the table and we perform an update:
       then there might be another key for which the unique check will
       fail, so we're better off just deleting the row and inserting
       the correct row.
     */
    if (last_uniq_key(table, keynum) &&
        !table->file->referenced_by_foreign_key())
    {
      error=table->file->ha_update_row(table->record[1],
                                       table->record[0]);
      /* HA_ERR_RECORD_IS_THE_SAME is treated as success. */
      if (error && error != HA_ERR_RECORD_IS_THE_SAME)
        table->file->print_error(error, MYF(0));
      else
        error= 0;
      DBUG_RETURN(error);
    }
    else
    {
      if ((error= table->file->ha_delete_row(table->record[1])))
      {
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
      /* Will retry ha_write_row() with the offending row removed. */
    }
  }

  /* Reached only when ha_write_row() succeeded, so error is 0 here. */
  DBUG_RETURN(error);
}
680
681
/**
  Find the row given by 'key', if the table has keys, or else use a table scan
  to find (and fetch) the row.

  If the engine allows random access of the records, a combination of
  position() and rnd_pos() will be used.

  Three strategies are tried, in this order:
   -# engine has HA_PRIMARY_KEY_REQUIRED_FOR_POSITION and the table has a
      primary key: position() + ha_rnd_pos();
   -# the table has at least one key: index lookup on key number 0,
      followed by ha_index_next() until record_compare() reports a match
      (skipped when key 0 is unique);
   -# otherwise: table scan with ha_rnd_next(), restarted once from the
      top when the end of the table is reached.

  @param table Pointer to table to search
  @param key   Pointer to key to use for search, if table has key

  @pre <code>table->record[0]</code> shall contain the row to locate
  and <code>key</code> shall contain a key to use for searching, if
  the engine has a key.

  @post If the return value is zero, <code>table->record[1]</code>
  will contain the fetched row and the internal "cursor" will refer to
  the row. If the return value is non-zero,
  <code>table->record[1]</code> is undefined.  In either case,
  <code>table->record[0]</code> is undefined.

  @return Zero if the row was successfully fetched into
  <code>table->record[1]</code>, error code otherwise.
 */

static int find_and_fetch_row(TABLE *table, uchar *key)
{
  /* The trace string mentions a 'record' argument this function does not have. */
  DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
  DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
           (long) table, (long) key, (long) table->record[1]));

  DBUG_ASSERT(table->in_use != NULL);

  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      Use a more efficient method to fetch the record given by
      table->record[0] if the engine allows it.  We first compute a
      row reference using the position() member function (it will be
      stored in table->file->ref) and then use rnd_pos() to position
      the "cursor" (i.e., record[0] in this case) at the correct row.

      TODO: Add a check that the correct record has been fetched by
      comparing with the original record. Take into account that the
      record on the master and slave can be of different
      length. Something along these lines should work:

      ADD>>>  store_record(table,record[1]);
              int error= table->file->rnd_pos(table->record[0], table->file->ref);
      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
                                 table->s->reclength) == 0);

    */
    table->file->position(table->record[0]);
    int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
    /*
      ha_rnd_pos() returns the record in table->record[0], so we have to
      move it to table->record[1].  The copy is done whether or not
      ha_rnd_pos() succeeded, and the error is returned without calling
      print_error().
     */
    memcpy(table->record[1], table->record[0], table->s->reclength);
    DBUG_RETURN(error);
  }

  /* We need to retrieve all fields */
  /* TODO: Move this out from this function to main loop */
  table->use_all_columns();

  if (table->s->keys > 0)
  {
    int error;
    /* We have a key: search the table using the index (key number 0) */
    if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
    {
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_purify
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif

    /*
      We need to set the null bytes to ensure that the filler bits are
      all set when returning.  There are storage engines that just set
      the necessary bits on the bytes and don't set the filler bits
      correctly.

      Note that byte 0 of record[1] is overwritten even when the table
      has no null bytes (pos falls back to 0).
    */
    my_ptrdiff_t const pos=
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
    table->record[1][pos]= 0xFF;
    if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
                                               HA_READ_KEY_EXACT)))
    {
      table->file->print_error(error, MYF(0));
      table->file->ha_index_end();
      DBUG_RETURN(error);
    }

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_purify
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif
    /*
      Below is a minor "optimization".  If the key (i.e., key number
      0) has the HA_NOSAME flag set, we know that we have found the
      correct record (since there can be no duplicates); otherwise, we
      have to compare the record with the one found to see if it is
      the correct one.

      CAVEAT! This behaviour is essential for the replication of,
      e.g., the mysql.proc table since the correct record *shall* be
      found using the primary key *only*.  There shall be no
      comparison of non-PK columns to decide if the correct record is
      found.  I can see no scenario where it would be incorrect to
      choose the row to change only using a PK or an UNNI.
    */
    if (table->key_info->flags & HA_NOSAME)
    {
      table->file->ha_index_end();
      DBUG_RETURN(0);
    }

    /*
      Non-unique key: step through the index until record[1] equals
      record[0] (record_compare() returns TRUE while they differ).
    */
    while (record_compare(table))
    {
      /* Shadows the 'error' declared at the top of this branch. */
      int error;

      /*
        We need to set the null bytes to ensure that the filler bits
        are all set when returning.  There are storage engines that
        just set the necessary bits on the bytes and don't set the
        filler bits correctly.

        TODO[record format ndb]: Remove this code once NDB returns the
        correct record format.
      */
      if (table->s->null_bytes > 0)
      {
        table->record[1][table->s->null_bytes - 1]|=
          256U - (1U << table->s->last_null_bit_pos);
      }

      while ((error= table->file->ha_index_next(table->record[1])))
      {
        /* We just skip records that have already been deleted */
        if (error == HA_ERR_RECORD_DELETED)
          continue;
        table->file->print_error(error, MYF(0));
        table->file->ha_index_end();
        DBUG_RETURN(error);
      }
    }

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    table->file->ha_index_end();
  }
  else
  {
    int restart_count= 0; // Number of times scanning has restarted from top
    int error;

    /* We don't have a key: search the table using ha_rnd_next() */
    if ((error= table->file->ha_rnd_init(1)))
    {
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /* Continue until we find the right record or have made a full loop */
    do
    {
  restart_ha_rnd_next:
      error= table->file->ha_rnd_next(table->record[1]);

      DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
      DBUG_DUMP("record[1]", table->record[1], table->s->reclength);

      switch (error) {
      case 0:
        break;

      /*
        If the record was deleted, we pick the next one without doing
        any comparisons.
      */
      case HA_ERR_RECORD_DELETED:
        goto restart_ha_rnd_next;

      case HA_ERR_END_OF_FILE:
        /*
          First time we hit the end: restart the scan from the top.
          The second time restart_count becomes 2, which ends the loop
          with error still set to HA_ERR_END_OF_FILE.
          NOTE(review): if this re-init fails we return without calling
          ha_rnd_end(), unlike the default branch below.
        */
        if (++restart_count < 2)
        {
          if ((error= table->file->ha_rnd_init(1)))
          {
            table->file->print_error(error, MYF(0));
            DBUG_RETURN(error);
          }
        }
        break;

      default:
        table->file->print_error(error, MYF(0));
        DBUG_PRINT("info", ("Record not found"));
        (void) table->file->ha_rnd_end();
        DBUG_RETURN(error);
      }
    }
    while (restart_count < 2 && record_compare(table));

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
    table->file->ha_rnd_end();

    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
914
915
916 /**********************************************************
917 Row handling primitives for Write_rows_log_event_old
918 **********************************************************/
919
do_before_row_operations(TABLE * table)920 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
921 {
922 int error= 0;
923
924 /*
925 We are using REPLACE semantics and not INSERT IGNORE semantics
926 when writing rows, that is: new rows replace old rows. We need to
927 inform the storage engine that it should use this behaviour.
928 */
929
930 /* Tell the storage engine that we are using REPLACE semantics. */
931 thd->lex->duplicates= DUP_REPLACE;
932
933 /*
934 Pretend we're executing a REPLACE command: this is needed for
935 InnoDB and NDB Cluster since they are not (properly) checking the
936 lex->duplicates flag.
937 */
938 thd->lex->sql_command= SQLCOM_REPLACE;
939 /*
940 Do not raise the error flag in case of hitting to an unique attribute
941 */
942 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
943 /*
944 NDB specific: update from ndb master wrapped as Write_rows
945 */
946 /*
947 so that the event should be applied to replace slave's row
948 */
949 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
950 /*
951 NDB specific: if update from ndb master wrapped as Write_rows
952 does not find the row it's assumed idempotent binlog applying
953 is taking place; don't raise the error.
954 */
955 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
956 /*
957 TODO: the cluster team (Tomas?) says that it's better if the engine knows
958 how many rows are going to be inserted, then it can allocate needed memory
959 from the start.
960 */
961 table->file->ha_start_bulk_insert(0);
962 return error;
963 }
964
965
do_after_row_operations(TABLE * table,int error)966 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
967 {
968 int local_error= 0;
969 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
970 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
971 /*
972 reseting the extra with
973 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
974 fires bug#27077
975 todo: explain or fix
976 */
977 if ((local_error= table->file->ha_end_bulk_insert()))
978 {
979 table->file->print_error(local_error, MYF(0));
980 }
981 return error? error : local_error;
982 }
983
984
985 int
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)986 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
987 Relay_log_info const *rli,
988 TABLE *table,
989 uchar const *row_start,
990 uchar const **row_end)
991 {
992 DBUG_ASSERT(table != NULL);
993 DBUG_ASSERT(row_start && row_end);
994
995 int error;
996 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
997 table, m_width, table->record[0],
998 row_start, &m_cols, row_end, &m_master_reclength,
999 table->write_set, PRE_GA_WRITE_ROWS_EVENT);
1000 bitmap_copy(table->read_set, table->write_set);
1001 return error;
1002 }
1003
1004
do_exec_row(TABLE * table)1005 int Write_rows_log_event_old::do_exec_row(TABLE *table)
1006 {
1007 DBUG_ASSERT(table != NULL);
1008 int error= replace_record(thd, table, m_master_reclength, m_width);
1009 return error;
1010 }
1011
1012
1013 /**********************************************************
1014 Row handling primitives for Delete_rows_log_event_old
1015 **********************************************************/
1016
do_before_row_operations(TABLE * table)1017 int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
1018 {
1019 DBUG_ASSERT(m_memory == NULL);
1020
1021 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
1022 table->s->primary_key < MAX_KEY)
1023 {
1024 /*
1025 We don't need to allocate any memory for m_after_image and
1026 m_key since they are not used.
1027 */
1028 return 0;
1029 }
1030
1031 int error= 0;
1032
1033 if (table->s->keys > 0)
1034 {
1035 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1036 &m_after_image,
1037 (uint) table->s->reclength,
1038 &m_key,
1039 (uint) table->key_info->key_length,
1040 NullS);
1041 }
1042 else
1043 {
1044 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1045 m_memory= (uchar*)m_after_image;
1046 m_key= NULL;
1047 }
1048 if (!m_memory)
1049 return HA_ERR_OUT_OF_MEM;
1050
1051 return error;
1052 }
1053
1054
do_after_row_operations(TABLE * table,int error)1055 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1056 {
1057 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1058 table->file->ha_index_or_rnd_end();
1059 my_free(m_memory); // Free for multi_malloc
1060 m_memory= NULL;
1061 m_after_image= NULL;
1062 m_key= NULL;
1063
1064 return error;
1065 }
1066
1067
1068 int
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)1069 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
1070 Relay_log_info const *rli,
1071 TABLE *table,
1072 uchar const *row_start,
1073 uchar const **row_end)
1074 {
1075 int error;
1076 DBUG_ASSERT(row_start && row_end);
1077 /*
1078 This assertion actually checks that there is at least as many
1079 columns on the slave as on the master.
1080 */
1081 DBUG_ASSERT(table->s->fields >= m_width);
1082
1083 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1084 table, m_width, table->record[0],
1085 row_start, &m_cols, row_end, &m_master_reclength,
1086 table->read_set, PRE_GA_DELETE_ROWS_EVENT);
1087 /*
1088 If we will access rows using the random access method, m_key will
1089 be set to NULL, so we do not need to make a key copy in that case.
1090 */
1091 if (m_key)
1092 {
1093 KEY *const key_info= table->key_info;
1094
1095 key_copy(m_key, table->record[0], key_info, 0);
1096 }
1097
1098 return error;
1099 }
1100
1101
do_exec_row(TABLE * table)1102 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
1103 {
1104 int error;
1105 DBUG_ASSERT(table != NULL);
1106
1107 if (!(error= ::find_and_fetch_row(table, m_key)))
1108 {
1109 /*
1110 Now we should have the right row to delete. We are using
1111 record[0] since it is guaranteed to point to a record with the
1112 correct value.
1113 */
1114 error= table->file->ha_delete_row(table->record[0]);
1115 }
1116 return error;
1117 }
1118
1119
1120 /**********************************************************
1121 Row handling primitives for Update_rows_log_event_old
1122 **********************************************************/
1123
do_before_row_operations(TABLE * table)1124 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
1125 {
1126 DBUG_ASSERT(m_memory == NULL);
1127
1128 int error= 0;
1129
1130 if (table->s->keys > 0)
1131 {
1132 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1133 &m_after_image,
1134 (uint) table->s->reclength,
1135 &m_key,
1136 (uint) table->key_info->key_length,
1137 NullS);
1138 }
1139 else
1140 {
1141 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1142 m_memory= m_after_image;
1143 m_key= NULL;
1144 }
1145 if (!m_memory)
1146 return HA_ERR_OUT_OF_MEM;
1147
1148 return error;
1149 }
1150
1151
do_after_row_operations(TABLE * table,int error)1152 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1153 {
1154 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1155 table->file->ha_index_or_rnd_end();
1156 my_free(m_memory);
1157 m_memory= NULL;
1158 m_after_image= NULL;
1159 m_key= NULL;
1160
1161 return error;
1162 }
1163
1164
do_prepare_row(THD * thd_arg,Relay_log_info const * rli,TABLE * table,uchar const * row_start,uchar const ** row_end)1165 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1166 Relay_log_info const *rli,
1167 TABLE *table,
1168 uchar const *row_start,
1169 uchar const **row_end)
1170 {
1171 int error;
1172 DBUG_ASSERT(row_start && row_end);
1173 /*
1174 This assertion actually checks that there is at least as many
1175 columns on the slave as on the master.
1176 */
1177 DBUG_ASSERT(table->s->fields >= m_width);
1178
1179 /* record[0] is the before image for the update */
1180 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1181 table, m_width, table->record[0],
1182 row_start, &m_cols, row_end, &m_master_reclength,
1183 table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1184 row_start = *row_end;
1185 /* m_after_image is the after image for the update */
1186 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1187 table, m_width, m_after_image,
1188 row_start, &m_cols, row_end, &m_master_reclength,
1189 table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1190
1191 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1192 DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1193
1194 /*
1195 If we will access rows using the random access method, m_key will
1196 be set to NULL, so we do not need to make a key copy in that case.
1197 */
1198 if (m_key)
1199 {
1200 KEY *const key_info= table->key_info;
1201
1202 key_copy(m_key, table->record[0], key_info, 0);
1203 }
1204
1205 return error;
1206 }
1207
1208
do_exec_row(TABLE * table)1209 int Update_rows_log_event_old::do_exec_row(TABLE *table)
1210 {
1211 DBUG_ASSERT(table != NULL);
1212
1213 int error= ::find_and_fetch_row(table, m_key);
1214 if (error)
1215 return error;
1216
1217 /*
1218 We have to ensure that the new record (i.e., the after image) is
1219 in record[0] and the old record (i.e., the before image) is in
1220 record[1]. This since some storage engines require this (for
1221 example, the partition engine).
1222
1223 Since find_and_fetch_row() puts the fetched record (i.e., the old
1224 record) in record[1], we can keep it there. We put the new record
1225 (i.e., the after image) into record[0], and copy the fields that
1226 are on the slave (i.e., in record[1]) into record[0], effectively
1227 overwriting the default values that where put there by the
1228 unpack_row() function.
1229 */
1230 memcpy(table->record[0], m_after_image, table->s->reclength);
1231 copy_extra_record_fields(table, m_master_reclength, m_width);
1232
1233 /*
1234 Now we have the right row to update. The old row (the one we're
1235 looking for) is in record[1] and the new row has is in record[0].
1236 We also have copied the original values already in the slave's
1237 database into the after image delivered from the master.
1238 */
1239 error= table->file->ha_update_row(table->record[1], table->record[0]);
1240 if (error == HA_ERR_RECORD_IS_THE_SAME)
1241 error= 0;
1242
1243 return error;
1244 }
1245
1246 #endif
1247
1248
1249 /**************************************************************************
1250 Rows_log_event member functions
1251 **************************************************************************/
1252
1253 #ifndef MYSQL_CLIENT
Old_rows_log_event(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool using_trans)1254 Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1255 MY_BITMAP const *cols,
1256 bool using_trans)
1257 : Log_event(thd_arg, 0,
1258 using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
1259 Log_event::EVENT_STMT_CACHE,
1260 Log_event::EVENT_NORMAL_LOGGING),
1261 m_row_count(0),
1262 m_table(tbl_arg),
1263 m_table_id(tid),
1264 m_width(tbl_arg ? tbl_arg->s->fields : 1),
1265 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1266 #ifdef HAVE_REPLICATION
1267 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1268 #endif
1269 {
1270
1271 // This constructor should not be reached.
1272 assert(0);
1273
1274 /*
1275 We allow a special form of dummy event when the table, and cols
1276 are null and the table id is ~0UL. This is a temporary
1277 solution, to be able to terminate a started statement in the
1278 binary log: the extraneous events will be removed in the future.
1279 */
1280 DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1281 (!tbl_arg && !cols && tid == ~0UL));
1282
1283 if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1284 set_flags(NO_FOREIGN_KEY_CHECKS_F);
1285 if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1286 set_flags(RELAXED_UNIQUE_CHECKS_F);
1287 /* if bitmap_init fails, caught in is_valid() */
1288 if (likely(!bitmap_init(&m_cols,
1289 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1290 m_width,
1291 false)))
1292 {
1293 /* Cols can be zero if this is a dummy binrows event */
1294 if (likely(cols != NULL))
1295 {
1296 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1297 create_last_word_mask(&m_cols);
1298 }
1299 }
1300 else
1301 {
1302 // Needed because bitmap_init() does not set it to null on failure
1303 m_cols.bitmap= 0;
1304 }
1305 }
1306 #endif
1307
1308
Old_rows_log_event(const char * buf,uint event_len,Log_event_type event_type,const Format_description_log_event * description_event)1309 Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
1310 Log_event_type event_type,
1311 const Format_description_log_event
1312 *description_event)
1313 : Log_event(buf, description_event),
1314 m_row_count(0),
1315 #ifndef MYSQL_CLIENT
1316 m_table(NULL),
1317 #endif
1318 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1319 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1320 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1321 #endif
1322 {
1323 DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1324 uint8 const common_header_len= description_event->common_header_len;
1325 uint8 const post_header_len= description_event->post_header_len[event_type-1];
1326
1327 DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
1328 "post_header_len: %d",
1329 event_len, common_header_len,
1330 post_header_len));
1331
1332 const char *post_start= buf + common_header_len;
1333 DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1334 post_start+= RW_MAPID_OFFSET;
1335 if (post_header_len == 6)
1336 {
1337 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1338 m_table_id= uint4korr(post_start);
1339 post_start+= 4;
1340 }
1341 else
1342 {
1343 m_table_id= (ulong) uint6korr(post_start);
1344 post_start+= RW_FLAGS_OFFSET;
1345 }
1346
1347 m_flags= uint2korr(post_start);
1348
1349 uchar const *const var_start=
1350 (const uchar *)buf + common_header_len + post_header_len;
1351 uchar const *const ptr_width= var_start;
1352 uchar *ptr_after_width= (uchar*) ptr_width;
1353 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1354 m_width = net_field_length(&ptr_after_width);
1355 DBUG_PRINT("debug", ("m_width=%lu", m_width));
1356 /* Avoid reading out of buffer */
1357 if (static_cast<unsigned int>(m_width +
1358 (ptr_after_width -
1359 (const uchar *)buf)) > event_len)
1360 {
1361 m_cols.bitmap= NULL;
1362 DBUG_VOID_RETURN;
1363 }
1364
1365 /* if bitmap_init fails, catched in is_valid() */
1366 if (likely(!bitmap_init(&m_cols,
1367 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1368 m_width,
1369 false)))
1370 {
1371 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1372 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1373 create_last_word_mask(&m_cols);
1374 ptr_after_width+= (m_width + 7) / 8;
1375 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1376 }
1377 else
1378 {
1379 // Needed because bitmap_init() does not set it to null on failure
1380 m_cols.bitmap= NULL;
1381 DBUG_VOID_RETURN;
1382 }
1383
1384 const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1385 size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1386 DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
1387 m_table_id.id(), m_flags, m_width, (ulong) data_size));
1388 DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1389
1390 m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1391 if (likely((bool)m_rows_buf))
1392 {
1393 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1394 m_curr_row= m_rows_buf;
1395 #endif
1396 m_rows_end= m_rows_buf + data_size;
1397 m_rows_cur= m_rows_end;
1398 memcpy(m_rows_buf, ptr_rows_data, data_size);
1399 }
1400 else
1401 m_cols.bitmap= 0; // to not free it
1402
1403 DBUG_VOID_RETURN;
1404 }
1405
1406
~Old_rows_log_event()1407 Old_rows_log_event::~Old_rows_log_event()
1408 {
1409 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1410 m_cols.bitmap= 0; // so no my_free in bitmap_free
1411 bitmap_free(&m_cols); // To pair with bitmap_init().
1412 my_free(m_rows_buf);
1413 }
1414
1415
get_data_size()1416 int Old_rows_log_event::get_data_size()
1417 {
1418 uchar buf[sizeof(m_width)+1];
1419 uchar *end= net_store_length(buf, (m_width + 7) / 8);
1420
1421 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1422 return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1423 (m_rows_cur - m_rows_buf););
1424 int data_size= ROWS_HEADER_LEN;
1425 data_size+= no_bytes_in_map(&m_cols);
1426 data_size+= (uint) (end - buf);
1427
1428 data_size+= (uint) (m_rows_cur - m_rows_buf);
1429 return data_size;
1430 }
1431
1432
1433 #ifndef MYSQL_CLIENT
do_add_row_data(uchar * row_data,size_t length)1434 int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1435 {
1436 /*
1437 When the table has a primary key, we would probably want, by default, to
1438 log only the primary key value instead of the entire "before image". This
1439 would save binlog space. TODO
1440 */
1441 DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1442 DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data,
1443 (ulong) length));
1444 /*
1445 Don't print debug messages when running valgrind since they can
1446 trigger false warnings.
1447 */
1448 #ifndef HAVE_purify
1449 DBUG_DUMP("row_data", row_data, min<size_t>(length, 32));
1450 #endif
1451
1452 DBUG_ASSERT(m_rows_buf <= m_rows_cur);
1453 DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1454 DBUG_ASSERT(m_rows_cur <= m_rows_end);
1455
1456 /* The cast will always work since m_rows_cur <= m_rows_end */
1457 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1458 {
1459 size_t const block_size= 1024;
1460 my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1461 my_ptrdiff_t const new_alloc=
1462 block_size * ((cur_size + length + block_size - 1) / block_size);
1463
1464 uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
1465 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1466 if (unlikely(!new_buf))
1467 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1468
1469 /* If the memory moved, we need to move the pointers */
1470 if (new_buf != m_rows_buf)
1471 {
1472 m_rows_buf= new_buf;
1473 m_rows_cur= m_rows_buf + cur_size;
1474 }
1475
1476 /*
1477 The end pointer should always be changed to point to the end of
1478 the allocated memory.
1479 */
1480 m_rows_end= m_rows_buf + new_alloc;
1481 }
1482
1483 DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
1484 memcpy(m_rows_cur, row_data, length);
1485 m_rows_cur+= length;
1486 m_row_count++;
1487 DBUG_RETURN(0);
1488 }
1489 #endif
1490
1491
1492 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
do_apply_event(Relay_log_info const * rli)1493 int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
1494 {
1495 DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1496 int error= 0;
1497
1498 /*
1499 If m_table_id == ~0U or max 6 Bytes integer, then we have a dummy event that
1500 does not contain any data. In that case, we just remove all tables in the
1501 tables_to_lock list, close the thread tables, and return with
1502 success.
1503 */
1504 if ((m_table_id.id() == ~0U || m_table_id.id() == (~0ULL >> 16)) &&
1505 m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
1506 {
1507 /*
1508 This one is supposed to be set: just an extra check so that
1509 nothing strange has happened.
1510 */
1511 DBUG_ASSERT(get_flags(STMT_END_F));
1512
1513 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1514 thd->clear_error();
1515 DBUG_RETURN(0);
1516 }
1517
1518 /*
1519 'thd' has been set by exec_relay_log_event(), just before calling
1520 do_apply_event(). We still check here to prevent future coding
1521 errors.
1522 */
1523 DBUG_ASSERT(rli->info_thd == thd);
1524
1525 /*
1526 If there is no locks taken, this is the first binrow event seen
1527 after the table map events. We should then lock all the tables
1528 used in the transaction and proceed with execution of the actual
1529 event.
1530 */
1531 if (!thd->lock)
1532 {
1533 /*
1534 lock_tables() reads the contents of thd->lex, so they must be
1535 initialized. Contrary to in
1536 Table_map_log_event::do_apply_event() we don't call
1537 mysql_init_query() as that may reset the binlog format.
1538 */
1539 lex_start(thd);
1540
1541 if ((error= lock_tables(thd, rli->tables_to_lock,
1542 rli->tables_to_lock_count, 0)))
1543 {
1544 if (thd->is_slave_error || thd->is_fatal_error)
1545 {
1546 /*
1547 Error reporting borrowed from Query_log_event with many excessive
1548 simplifications (we don't honour --slave-skip-errors)
1549 */
1550 uint actual_error= thd->net.last_errno;
1551 rli->report(ERROR_LEVEL, actual_error,
1552 "Error '%s' in %s event: when locking tables",
1553 (actual_error ? thd->net.last_error :
1554 "unexpected success or fatal error"),
1555 get_type_str());
1556 thd->is_fatal_error= 1;
1557 }
1558 else
1559 {
1560 rli->report(ERROR_LEVEL, error,
1561 "Error in %s event: when locking tables",
1562 get_type_str());
1563 }
1564 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1565 DBUG_RETURN(error);
1566 }
1567
1568 /*
1569 When the open and locking succeeded, we check all tables to
1570 ensure that they still have the correct type.
1571 */
1572
1573 {
1574 TABLE_LIST *table_list_ptr= rli->tables_to_lock;
1575 for (uint i=0; table_list_ptr&& (i< rli->tables_to_lock_count);
1576 table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1577 {
1578 /*
1579 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
1580 function for the explanation of the below if condition
1581 */
1582 if (table_list_ptr->parent_l)
1583 continue;
1584 /*
1585 We can use a down cast here since we know that every table added
1586 to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1587 skipped above).
1588 */
1589 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1590 TABLE *conv_table;
1591 if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
1592 ptr->table, &conv_table))
1593 {
1594 thd->is_slave_error= 1;
1595 const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1596 DBUG_RETURN(ERR_BAD_TABLE_DEF);
1597 }
1598 ptr->m_conv_table= conv_table;
1599 }
1600 }
1601
1602 /*
1603 ... and then we add all the tables to the table map but keep
1604 them in the tables to lock list.
1605
1606
1607 We also invalidate the query cache for all the tables, since
1608 they will now be changed.
1609
1610 TODO [/Matz]: Maybe the query cache should not be invalidated
1611 here? It might be that a table is not changed, even though it
1612 was locked for the statement. We do know that each
1613 Old_rows_log_event contain at least one row, so after processing one
1614 Old_rows_log_event, we can invalidate the query cache for the
1615 associated table.
1616 */
1617 for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
1618 {
1619 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
1620 }
1621 #ifdef HAVE_QUERY_CACHE
1622 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
1623 #endif
1624 }
1625
1626 TABLE*
1627 table=
1628 m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
1629
1630 if (table)
1631 {
1632 /*
1633 table == NULL means that this table should not be replicated
1634 (this was set up by Table_map_log_event::do_apply_event()
1635 which tested replicate-* rules).
1636 */
1637
1638 /*
1639 It's not needed to set_time() but
1640 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
1641 much slave is behind
1642 2) it will be needed when we allow replication from a table with no
1643 TIMESTAMP column to a table with one.
1644 So we call set_time(), like in SBR. Presently it changes nothing.
1645 */
1646 thd->set_time(&when);
1647 /*
1648 There are a few flags that are replicated with each row event.
1649 Make sure to set/clear them before executing the main body of
1650 the event.
1651 */
1652 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1653 thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1654 else
1655 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1656
1657 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1658 thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1659 else
1660 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1661 /* A small test to verify that objects have consistent types */
1662 DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1663
1664 /*
1665 Now we are in a statement and will stay in a statement until we
1666 see a STMT_END_F.
1667
1668 We set this flag here, before actually applying any rows, in
1669 case the SQL thread is stopped and we need to detect that we're
1670 inside a statement and halting abruptly might cause problems
1671 when restarting.
1672 */
1673 const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
1674
1675 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1676 set_flags(COMPLETE_ROWS_F);
1677
1678 /*
1679 Set tables write and read sets.
1680
1681 Read_set contains all slave columns (in case we are going to fetch
1682 a complete record from slave)
1683
1684 Write_set equals the m_cols bitmap sent from master but it can be
1685 longer if slave has extra columns.
1686 */
1687
1688 DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1689
1690 bitmap_set_all(table->read_set);
1691 bitmap_set_all(table->write_set);
1692 if (!get_flags(COMPLETE_ROWS_F))
1693 bitmap_intersect(table->write_set,&m_cols);
1694
1695 // Do event specific preparations
1696
1697 error= do_before_row_operations(rli);
1698
1699 // row processing loop
1700
1701 while (error == 0 && m_curr_row < m_rows_end)
1702 {
1703 /* in_use can have been set to NULL in close_tables_for_reopen */
1704 THD* old_thd= table->in_use;
1705 if (!table->in_use)
1706 table->in_use= thd;
1707
1708 error= do_exec_row(rli);
1709
1710 DBUG_PRINT("info", ("error: %d", error));
1711 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1712
1713 table->in_use = old_thd;
1714 switch (error)
1715 {
1716 case 0:
1717 break;
1718
1719 /* Some recoverable errors */
1720 case HA_ERR_RECORD_CHANGED:
1721 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
1722 tuple does not exist */
1723 error= 0;
1724 break;
1725
1726 default:
1727 rli->report(ERROR_LEVEL, thd->net.last_errno,
1728 "Error in %s event: row application failed. %s",
1729 get_type_str(),
1730 thd->net.last_error);
1731 thd->is_slave_error= 1;
1732 break;
1733 }
1734
1735 /*
1736 If m_curr_row_end was not set during event execution (e.g., because
1737 of errors) we can't proceed to the next row. If the error is transient
1738 (i.e., error==0 at this point) we must call unpack_current_row() to set
1739 m_curr_row_end.
1740 */
1741
1742 DBUG_PRINT("info", ("error: %d", error));
1743 DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
1744 (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
1745
1746 if (!m_curr_row_end && !error)
1747 unpack_current_row(rli);
1748
1749 // at this moment m_curr_row_end should be set
1750 DBUG_ASSERT(error || m_curr_row_end != NULL);
1751 DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1752 DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1753
1754 m_curr_row= m_curr_row_end;
1755
1756 } // row processing loop
1757
1758 DBUG_EXECUTE_IF("stop_slave_middle_group",
1759 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1760 error= do_after_row_operations(rli, error);
1761 } // if (table)
1762
1763 if (error)
1764 { /* error has occured during the transaction */
1765 rli->report(ERROR_LEVEL, thd->net.last_errno,
1766 "Error in %s event: error during transaction execution "
1767 "on table %s.%s. %s",
1768 get_type_str(), table->s->db.str,
1769 table->s->table_name.str,
1770 thd->net.last_error);
1771
1772 /*
1773 If one day we honour --skip-slave-errors in row-based replication, and
1774 the error should be skipped, then we would clear mappings, rollback,
1775 close tables, but the slave SQL thread would not stop and then may
1776 assume the mapping is still available, the tables are still open...
1777 So then we should clear mappings/rollback/close here only if this is a
1778 STMT_END_F.
1779 For now we code, knowing that error is not skippable and so slave SQL
1780 thread is certainly going to stop.
1781 rollback at the caller along with sbr.
1782 */
1783 thd->reset_current_stmt_binlog_format_row();
1784 const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
1785 thd->is_slave_error= 1;
1786 DBUG_RETURN(error);
1787 }
1788
1789 /*
1790 This code would ideally be placed in do_update_pos() instead, but
1791 since we have no access to table there, we do the setting of
1792 last_event_start_time here instead.
1793 */
1794 if (table && (table->s->primary_key == MAX_KEY) &&
1795 !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1796 {
1797 /*
1798 ------------ Temporary fix until WL#2975 is implemented ---------
1799
1800 This event is not the last one (no STMT_END_F). If we stop now
1801 (in case of terminate_slave_thread()), how will we restart? We
1802 have to restart from Table_map_log_event, but as this table is
1803 not transactional, the rows already inserted will still be
1804 present, and idempotency is not guaranteed (no PK) so we risk
1805 that repeating leads to double insert. So we desperately try to
1806 continue, hope we'll eventually leave this buggy situation (by
1807 executing the final Old_rows_log_event). If we are in a hopeless
1808 wait (reached end of last relay log and nothing gets appended
1809 there), we timeout after one minute, and notify DBA about the
1810 problem. When WL#2975 is implemented, just remove the member
1811 Relay_log_info::last_event_start_time and all its occurrences.
1812 */
1813 const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
1814 }
1815
1816 if (get_flags(STMT_END_F))
1817 {
1818 /*
1819 This is the end of a statement or transaction, so close (and
1820 unlock) the tables we opened when processing the
1821 Table_map_log_event starting the statement.
1822
1823 OBSERVER. This will clear *all* mappings, not only those that
1824 are open for the table. There is not good handle for on-close
1825 actions for tables.
1826
1827 NOTE. Even if we have no table ('table' == 0) we still need to be
1828 here, so that we increase the group relay log position. If we didn't, we
1829 could have a group relay log position which lags behind "forever"
1830 (assume the last master's transaction is ignored by the slave because of
1831 replicate-ignore rules).
1832 */
1833 int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1834
1835 /*
1836 If this event is not in a transaction, the call below will, if some
1837 transactional storage engines are involved, commit the statement into
1838 them and flush the pending event to binlog.
1839 If this event is in a transaction, the call will do nothing, but a
1840 Xid_log_event will come next which will, if some transactional engines
1841 are involved, commit the transaction and flush the pending event to the
1842 binlog.
1843 If there was a deadlock the transaction should have been rolled back
1844 already. So there should be no need to rollback the transaction.
1845 */
1846 DBUG_ASSERT(! thd->transaction_rollback_request);
1847 if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
1848 rli->report(ERROR_LEVEL, error,
1849 "Error in %s event: commit of row events failed, "
1850 "table `%s`.`%s`",
1851 get_type_str(), m_table->s->db.str,
1852 m_table->s->table_name.str);
1853 error|= binlog_error;
1854
1855 /*
1856 Now what if this is not a transactional engine? we still need to
1857 flush the pending event to the binlog; we did it with
1858 thd->binlog_flush_pending_rows_event(). Note that we imitate
1859 what is done for real queries: a call to
1860 ha_autocommit_or_rollback() (sometimes only if involves a
1861 transactional engine), and a call to be sure to have the pending
1862 event flushed.
1863 */
1864
1865 thd->reset_current_stmt_binlog_format_row();
1866 const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
1867 }
1868
1869 DBUG_RETURN(error);
1870 }
1871
1872
1873 Log_event::enum_skip_reason
do_shall_skip(Relay_log_info * rli)1874 Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
1875 {
1876 /*
1877 If the slave skip counter is 1 and this event does not end a
1878 statement, then we should not start executing on the next event.
1879 Otherwise, we defer the decision to the normal skipping logic.
1880 */
1881 if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1882 return Log_event::EVENT_SKIP_IGNORE;
1883 else
1884 return Log_event::do_shall_skip(rli);
1885 }
1886
1887 int
do_update_pos(Relay_log_info * rli)1888 Old_rows_log_event::do_update_pos(Relay_log_info *rli)
1889 {
1890 DBUG_ENTER("Old_rows_log_event::do_update_pos");
1891 int error= 0;
1892
1893 DBUG_PRINT("info", ("flags: %s",
1894 get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1895
1896 if (get_flags(STMT_END_F))
1897 {
1898 /*
1899 Indicate that a statement is finished.
1900 Step the group log position if we are not in a transaction,
1901 otherwise increase the event log position.
1902 */
1903 rli->stmt_done(log_pos);
1904 /*
1905 Clear any errors in thd->net.last_err*. It is not known if this is
1906 needed or not. It is believed that any errors that may exist in
1907 thd->net.last_err* are allowed. Examples of errors are "key not
1908 found", which is produced in the test case rpl_row_conflicts.test
1909 */
1910 thd->clear_error();
1911 }
1912 else
1913 {
1914 rli->inc_event_relay_log_pos();
1915 }
1916
1917 DBUG_RETURN(error);
1918 }
1919
1920 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1921
1922
1923 #ifndef MYSQL_CLIENT
write_data_header(IO_CACHE * file)1924 bool Old_rows_log_event::write_data_header(IO_CACHE *file)
1925 {
1926 // This method should not be reached.
1927 assert(0);
1928 return TRUE;
1929 }
1930
1931
write_data_body(IO_CACHE * file)1932 bool Old_rows_log_event::write_data_body(IO_CACHE*file)
1933 {
1934 /*
1935 Note that this should be the number of *bits*, not the number of
1936 bytes.
1937 */
1938 uchar sbuf[sizeof(m_width)];
1939 my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1940
1941 // This method should not be reached.
1942 assert(0);
1943
1944 bool res= false;
1945 uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1946 DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1947
1948 DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1949 res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
1950
1951 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1952 res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1953 no_bytes_in_map(&m_cols));
1954 DBUG_DUMP("rows", m_rows_buf, data_size);
1955 res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
1956
1957 return res;
1958
1959 }
1960 #endif
1961
1962
1963 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
pack_info(Protocol * protocol)1964 int Old_rows_log_event::pack_info(Protocol *protocol)
1965 {
1966 char buf[256];
1967 char const *const flagstr=
1968 get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1969 size_t bytes= my_snprintf(buf, sizeof(buf),
1970 "table_id: %llu%s", m_table_id.id(), flagstr);
1971 protocol->store(buf, bytes, &my_charset_bin);
1972 return 0;
1973 }
1974 #endif
1975
1976
1977 #ifdef MYSQL_CLIENT
print_helper(FILE * file,PRINT_EVENT_INFO * print_event_info,char const * const name)1978 void Old_rows_log_event::print_helper(FILE *file,
1979 PRINT_EVENT_INFO *print_event_info,
1980 char const *const name)
1981 {
1982 IO_CACHE *const head= &print_event_info->head_cache;
1983 IO_CACHE *const body= &print_event_info->body_cache;
1984 if (!print_event_info->short_form)
1985 {
1986 bool const last_stmt_event= get_flags(STMT_END_F);
1987 print_header(head, print_event_info, !last_stmt_event);
1988 my_b_printf(head, "\t%s: table id %llu%s\n",
1989 name, m_table_id.id(),
1990 last_stmt_event ? " flags: STMT_END_F" : "");
1991 print_base64(body, print_event_info, !last_stmt_event);
1992 }
1993 }
1994 #endif
1995
1996
1997 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1998 /**
1999 Write the current row into event's table.
2000
2001 The row is located in the row buffer, pointed by @c m_curr_row member.
2002 Number of columns of the row is stored in @c m_width member (it can be
2003 different from the number of columns in the table to which we insert).
2004 Bitmap @c m_cols indicates which columns are present in the row. It is assumed
2005 that event's table is already open and pointed by @c m_table.
2006
2007 If the same record already exists in the table it can be either overwritten
2008 or an error is reported depending on the value of @c overwrite flag
2009 (error reporting not yet implemented). Note that the matching record can be
2010 different from the row we insert if we use primary keys to identify records in
2011 the table.
2012
2013 The row to be inserted can contain values only for selected columns. The
2014 missing columns are filled with default values using @c prepare_record()
2015 function. If a matching record is found in the table and @c overwritte is
2016 true, the missing columns are taken from it.
2017
2018 @param rli Relay log info (needed for row unpacking).
2019 @param overwrite
2020 Shall we overwrite if the row already exists or signal
2021 error (currently ignored).
2022
2023 @returns Error code on failure, 0 on success.
2024
2025 This method, if successful, sets @c m_curr_row_end pointer to point at the
2026 next row in the rows buffer. This is done when unpacking the row to be
2027 inserted.
2028
2029 @note If a matching record is found, it is either updated using
2030 @c ha_update_row() or first deleted and then new record written.
2031 */
2032
2033 int
write_row(const Relay_log_info * const rli,const bool overwrite)2034 Old_rows_log_event::write_row(const Relay_log_info *const rli,
2035 const bool overwrite)
2036 {
2037 DBUG_ENTER("write_row");
2038 DBUG_ASSERT(m_table != NULL && thd != NULL);
2039
2040 TABLE *table= m_table; // pointer to event's table
2041 int error;
2042 int keynum;
2043 auto_afree_ptr<char> key(NULL);
2044
2045 /* fill table->record[0] with default values */
2046
2047 if ((error= prepare_record(table, table->write_set,
2048 TRUE /* check if columns have def. values */)))
2049 DBUG_RETURN(error);
2050
2051 /* unpack row into table->record[0] */
2052 error= unpack_current_row(rli); // TODO: how to handle errors?
2053
2054 #ifndef DBUG_OFF
2055 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2056 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
2057 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
2058 #endif
2059
2060 /*
2061 Try to write record. If a corresponding record already exists in the table,
2062 we try to change it using ha_update_row() if possible. Otherwise we delete
2063 it and repeat the whole process again.
2064
2065 TODO: Add safety measures against infinite looping.
2066 */
2067
2068 while ((error= table->file->ha_write_row(table->record[0])))
2069 {
2070 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
2071 {
2072 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
2073 DBUG_RETURN(error);
2074 }
2075 if ((keynum= table->file->get_dup_key(error)) < 0)
2076 {
2077 DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
2078 table->file->print_error(error, MYF(0));
2079 /*
2080 We failed to retrieve the duplicate key
2081 - either because the error was not "duplicate key" error
2082 - or because the information which key is not available
2083 */
2084 DBUG_RETURN(error);
2085 }
2086
2087 /*
2088 We need to retrieve the old row into record[1] to be able to
2089 either update or delete the offending record. We either:
2090
2091 - use ha_rnd_pos() with a row-id (available as dupp_row) to the
2092 offending row, if that is possible (MyISAM and Blackhole), or else
2093
2094 - use ha_index_read_idx_map() with the key that is duplicated, to
2095 retrieve the offending row.
2096 */
2097 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
2098 {
2099 DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
2100 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
2101 if (error)
2102 {
2103 DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
2104 if (error == HA_ERR_RECORD_DELETED)
2105 error= HA_ERR_KEY_NOT_FOUND;
2106 table->file->print_error(error, MYF(0));
2107 DBUG_RETURN(error);
2108 }
2109 }
2110 else
2111 {
2112 DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
2113
2114 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2115 {
2116 DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
2117 DBUG_RETURN(my_errno);
2118 }
2119
2120 if (key.get() == NULL)
2121 {
2122 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
2123 if (key.get() == NULL)
2124 {
2125 DBUG_PRINT("info",("Can't allocate key buffer"));
2126 DBUG_RETURN(ENOMEM);
2127 }
2128 }
2129
2130 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
2131 0);
2132 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2133 (const uchar*)key.get(),
2134 HA_WHOLE_KEY,
2135 HA_READ_KEY_EXACT);
2136 if (error)
2137 {
2138 DBUG_PRINT("info",("ha_index_read_idx_map() returns error %d", error));
2139 if (error == HA_ERR_RECORD_DELETED)
2140 error= HA_ERR_KEY_NOT_FOUND;
2141 table->file->print_error(error, MYF(0));
2142 DBUG_RETURN(error);
2143 }
2144 }
2145
2146 /*
2147 Now, record[1] should contain the offending row. That
2148 will enable us to update it or, alternatively, delete it (so
2149 that we can insert the new row afterwards).
2150 */
2151
2152 /*
2153 If row is incomplete we will use the record found to fill
2154 missing columns.
2155 */
2156 if (!get_flags(COMPLETE_ROWS_F))
2157 {
2158 restore_record(table,record[1]);
2159 error= unpack_current_row(rli);
2160 }
2161
2162 #ifndef DBUG_OFF
2163 DBUG_PRINT("debug",("preparing for update: before and after image"));
2164 DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2165 DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2166 #endif
2167
2168 /*
2169 REPLACE is defined as either INSERT or DELETE + INSERT. If
2170 possible, we can replace it with an UPDATE, but that will not
2171 work on InnoDB if FOREIGN KEY checks are necessary.
2172
2173 I (Matz) am not sure of the reason for the last_uniq_key()
2174 check as, but I'm guessing that it's something along the
2175 following lines.
2176
2177 Suppose that we got the duplicate key to be a key that is not
2178 the last unique key for the table and we perform an update:
2179 then there might be another key for which the unique check will
2180 fail, so we're better off just deleting the row and inserting
2181 the correct row.
2182 */
2183 if (last_uniq_key(table, keynum) &&
2184 !table->file->referenced_by_foreign_key())
2185 {
2186 DBUG_PRINT("info",("Updating row using ha_update_row()"));
2187 error=table->file->ha_update_row(table->record[1],
2188 table->record[0]);
2189 switch (error) {
2190
2191 case HA_ERR_RECORD_IS_THE_SAME:
2192 DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2193 " ha_update_row()"));
2194 error= 0;
2195
2196 case 0:
2197 break;
2198
2199 default:
2200 DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2201 table->file->print_error(error, MYF(0));
2202 }
2203
2204 DBUG_RETURN(error);
2205 }
2206 else
2207 {
2208 DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2209 if ((error= table->file->ha_delete_row(table->record[1])))
2210 {
2211 DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2212 table->file->print_error(error, MYF(0));
2213 DBUG_RETURN(error);
2214 }
2215 /* Will retry ha_write_row() with the offending row removed. */
2216 }
2217 }
2218
2219 DBUG_RETURN(error);
2220 }
2221
2222
2223 /**
2224 Locate the current row in event's table.
2225
2226 The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2227 columns are there in the row (this can be differnet from the number of columns
2228 in the table). It is assumed that event's table is already open and pointed
2229 by @c m_table.
2230
2231 If a corresponding record is found in the table it is stored in
2232 @c m_table->record[0]. Note that when record is located based on a primary
2233 key, it is possible that the record found differs from the row being located.
2234
2235 If no key is specified or table does not have keys, a table scan is used to
2236 find the row. In that case the row should be complete and contain values for
2237 all columns. However, it can still be shorter than the table, i.e. the table
2238 can contain extra columns not present in the row. It is also possible that
2239 the table has fewer columns than the row being located.
2240
2241 @returns Error code on failure, 0 on success.
2242
2243 @post In case of success @c m_table->record[0] contains the record found.
2244 Also, the internal "cursor" of the table is positioned at the record found.
2245
2246 @note If the engine allows random access of the records, a combination of
2247 @c position() and @c rnd_pos() will be used.
2248 */
2249
find_row(const Relay_log_info * rli)2250 int Old_rows_log_event::find_row(const Relay_log_info *rli)
2251 {
2252 DBUG_ENTER("find_row");
2253
2254 DBUG_ASSERT(m_table && m_table->in_use != NULL);
2255
2256 TABLE *table= m_table;
2257 int error;
2258
2259 /* unpack row - missing fields get default values */
2260
2261 // TODO: shall we check and report errors here?
2262 prepare_record(table, table->read_set, FALSE /* don't check errors */);
2263 error= unpack_current_row(rli);
2264
2265 #ifndef DBUG_OFF
2266 DBUG_PRINT("info",("looking for the following record"));
2267 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2268 #endif
2269
2270 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2271 table->s->primary_key < MAX_KEY)
2272 {
2273 /*
2274 Use a more efficient method to fetch the record given by
2275 table->record[0] if the engine allows it. We first compute a
2276 row reference using the position() member function (it will be
2277 stored in table->file->ref) and the use rnd_pos() to position
2278 the "cursor" (i.e., record[0] in this case) at the correct row.
2279
2280 TODO: Add a check that the correct record has been fetched by
2281 comparing with the original record. Take into account that the
2282 record on the master and slave can be of different
2283 length. Something along these lines should work:
2284
2285 ADD>>> store_record(table,record[1]);
2286 int error= table->file->rnd_pos(table->record[0], table->file->ref);
2287 ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
2288 table->s->reclength) == 0);
2289
2290 */
2291 DBUG_PRINT("info",("locating record using primary key (position)"));
2292 int error= table->file->rnd_pos_by_record(table->record[0]);
2293 if (error)
2294 {
2295 DBUG_PRINT("info",("rnd_pos returns error %d",error));
2296 if (error == HA_ERR_RECORD_DELETED)
2297 error= HA_ERR_KEY_NOT_FOUND;
2298 table->file->print_error(error, MYF(0));
2299 }
2300 DBUG_RETURN(error);
2301 }
2302
2303 // We can't use position() - try other methods.
2304
2305 /*
2306 We need to retrieve all fields
2307 TODO: Move this out from this function to main loop
2308 */
2309 table->use_all_columns();
2310
2311 /*
2312 Save copy of the record in table->record[1]. It might be needed
2313 later if linear search is used to find exact match.
2314 */
2315 store_record(table,record[1]);
2316
2317 if (table->s->keys > 0)
2318 {
2319 DBUG_PRINT("info",("locating record using primary key (index_read)"));
2320
2321 /* We have a key: search the table using the index */
2322 if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
2323 {
2324 DBUG_PRINT("info",("ha_index_init returns error %d",error));
2325 table->file->print_error(error, MYF(0));
2326 DBUG_RETURN(error);
2327 }
2328
2329 /* Fill key data for the row */
2330
2331 DBUG_ASSERT(m_key);
2332 key_copy(m_key, table->record[0], table->key_info, 0);
2333
2334 /*
2335 Don't print debug messages when running valgrind since they can
2336 trigger false warnings.
2337 */
2338 #ifndef HAVE_purify
2339 DBUG_DUMP("key data", m_key, table->key_info->key_length);
2340 #endif
2341
2342 /*
2343 We need to set the null bytes to ensure that the filler bit are
2344 all set when returning. There are storage engines that just set
2345 the necessary bits on the bytes and don't set the filler bits
2346 correctly.
2347 */
2348 my_ptrdiff_t const pos=
2349 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2350 table->record[0][pos]= 0xFF;
2351
2352 if ((error= table->file->ha_index_read_map(table->record[0], m_key,
2353 HA_WHOLE_KEY,
2354 HA_READ_KEY_EXACT)))
2355 {
2356 DBUG_PRINT("info",("no record matching the key found in the table"));
2357 if (error == HA_ERR_RECORD_DELETED)
2358 error= HA_ERR_KEY_NOT_FOUND;
2359 table->file->print_error(error, MYF(0));
2360 table->file->ha_index_end();
2361 DBUG_RETURN(error);
2362 }
2363
2364 /*
2365 Don't print debug messages when running valgrind since they can
2366 trigger false warnings.
2367 */
2368 #ifndef HAVE_purify
2369 DBUG_PRINT("info",("found first matching record"));
2370 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2371 #endif
2372 /*
2373 Below is a minor "optimization". If the key (i.e., key number
2374 0) has the HA_NOSAME flag set, we know that we have found the
2375 correct record (since there can be no duplicates); otherwise, we
2376 have to compare the record with the one found to see if it is
2377 the correct one.
2378
2379 CAVEAT! This behaviour is essential for the replication of,
2380 e.g., the mysql.proc table since the correct record *shall* be
2381 found using the primary key *only*. There shall be no
2382 comparison of non-PK columns to decide if the correct record is
2383 found. I can see no scenario where it would be incorrect to
2384 chose the row to change only using a PK or an UNNI.
2385 */
2386 if (table->key_info->flags & HA_NOSAME)
2387 {
2388 /* Unique does not have non nullable part */
2389 if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2390 {
2391 table->file->ha_index_end();
2392 DBUG_RETURN(0);
2393 }
2394 else
2395 {
2396 KEY *keyinfo= table->key_info;
2397 /*
2398 Unique has nullable part. We need to check if there is any field in the
2399 BI image that is null and part of UNNI.
2400 */
2401 bool null_found= FALSE;
2402 for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2403 {
2404 uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2405 Field **f= table->field+fieldnr;
2406 null_found= (*f)->is_null();
2407 }
2408
2409 if (!null_found)
2410 {
2411 table->file->ha_index_end();
2412 DBUG_RETURN(0);
2413 }
2414
2415 /* else fall through to index scan */
2416 }
2417 }
2418
2419 /*
2420 In case key is not unique, we still have to iterate over records found
2421 and find the one which is identical to the row given. A copy of the
2422 record we are looking for is stored in record[1].
2423 */
2424 DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2425
2426 while (record_compare(table))
2427 {
2428 /*
2429 We need to set the null bytes to ensure that the filler bit
2430 are all set when returning. There are storage engines that
2431 just set the necessary bits on the bytes and don't set the
2432 filler bits correctly.
2433
2434 TODO[record format ndb]: Remove this code once NDB returns the
2435 correct record format.
2436 */
2437 if (table->s->null_bytes > 0)
2438 {
2439 table->record[0][table->s->null_bytes - 1]|=
2440 256U - (1U << table->s->last_null_bit_pos);
2441 }
2442
2443 while ((error= table->file->ha_index_next(table->record[0])))
2444 {
2445 /* We just skip records that has already been deleted */
2446 if (error == HA_ERR_RECORD_DELETED)
2447 continue;
2448 DBUG_PRINT("info",("no record matching the given row found"));
2449 table->file->print_error(error, MYF(0));
2450 (void) table->file->ha_index_end();
2451 DBUG_RETURN(error);
2452 }
2453 }
2454
2455 /*
2456 Have to restart the scan to be able to fetch the next row.
2457 */
2458 table->file->ha_index_end();
2459 }
2460 else
2461 {
2462 DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
2463
2464 int restart_count= 0; // Number of times scanning has restarted from top
2465
2466 /* We don't have a key: search the table using ha_rnd_next() */
2467 if ((error= table->file->ha_rnd_init(1)))
2468 {
2469 DBUG_PRINT("info",("error initializing table scan"
2470 " (ha_rnd_init returns %d)",error));
2471 table->file->print_error(error, MYF(0));
2472 DBUG_RETURN(error);
2473 }
2474
2475 /* Continue until we find the right record or have made a full loop */
2476 do
2477 {
2478 restart_ha_rnd_next:
2479 error= table->file->ha_rnd_next(table->record[0]);
2480
2481 switch (error) {
2482
2483 case 0:
2484 break;
2485
2486 case HA_ERR_RECORD_DELETED:
2487 goto restart_ha_rnd_next;
2488
2489 case HA_ERR_END_OF_FILE:
2490 if (++restart_count < 2)
2491 {
2492 if ((error= table->file->ha_rnd_init(1)))
2493 {
2494 table->file->print_error(error, MYF(0));
2495 DBUG_RETURN(error);
2496 }
2497 goto restart_ha_rnd_next;
2498 }
2499 break;
2500
2501 default:
2502 DBUG_PRINT("info", ("Failed to get next record"
2503 " (ha_rnd_next returns %d)",error));
2504 table->file->print_error(error, MYF(0));
2505 table->file->ha_rnd_end();
2506 DBUG_RETURN(error);
2507 }
2508 }
2509 while (restart_count < 2 && record_compare(table));
2510
2511 /*
2512 Note: above record_compare will take into accout all record fields
2513 which might be incorrect in case a partial row was given in the event
2514 */
2515
2516 /*
2517 Have to restart the scan to be able to fetch the next row.
2518 */
2519 if (restart_count == 2)
2520 DBUG_PRINT("info", ("Record not found"));
2521 else
2522 DBUG_DUMP("record found", table->record[0], table->s->reclength);
2523 table->file->ha_rnd_end();
2524
2525 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
2526 DBUG_RETURN(error);
2527 }
2528
2529 DBUG_RETURN(0);
2530 }
2531
2532 #endif
2533
2534
/**************************************************************************
	Write_rows_log_event_old member functions
**************************************************************************/
2538
2539 /*
2540 Constructor used to build an event for writing to the binary log.
2541 */
2542 #if !defined(MYSQL_CLIENT)
Write_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid_arg,MY_BITMAP const * cols,bool is_transactional)2543 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2544 TABLE *tbl_arg,
2545 ulong tid_arg,
2546 MY_BITMAP const *cols,
2547 bool is_transactional)
2548 : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2549 {
2550
2551 // This constructor should not be reached.
2552 assert(0);
2553
2554 }
2555 #endif
2556
2557
2558 /*
2559 Constructor used by slave to read the event from the binary log.
2560 */
2561 #ifdef HAVE_REPLICATION
Write_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2562 Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
2563 uint event_len,
2564 const Format_description_log_event
2565 *description_event)
2566 : Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
2567 description_event)
2568 {
2569 }
2570 #endif
2571
2572
2573 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2574 int
do_before_row_operations(const Slave_reporting_capability * const)2575 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2576 {
2577 int error= 0;
2578
2579 /*
2580 We are using REPLACE semantics and not INSERT IGNORE semantics
2581 when writing rows, that is: new rows replace old rows. We need to
2582 inform the storage engine that it should use this behaviour.
2583 */
2584
2585 /* Tell the storage engine that we are using REPLACE semantics. */
2586 thd->lex->duplicates= DUP_REPLACE;
2587
2588 /*
2589 Pretend we're executing a REPLACE command: this is needed for
2590 InnoDB and NDB Cluster since they are not (properly) checking the
2591 lex->duplicates flag.
2592 */
2593 thd->lex->sql_command= SQLCOM_REPLACE;
2594 /*
2595 Do not raise the error flag in case of hitting to an unique attribute
2596 */
2597 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2598 /*
2599 NDB specific: update from ndb master wrapped as Write_rows
2600 */
2601 /*
2602 so that the event should be applied to replace slave's row
2603 */
2604 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2605 /*
2606 NDB specific: if update from ndb master wrapped as Write_rows
2607 does not find the row it's assumed idempotent binlog applying
2608 is taking place; don't raise the error.
2609 */
2610 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2611 /*
2612 TODO: the cluster team (Tomas?) says that it's better if the engine knows
2613 how many rows are going to be inserted, then it can allocate needed memory
2614 from the start.
2615 */
2616 m_table->file->ha_start_bulk_insert(0);
2617 return error;
2618 }
2619
2620
2621 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2622 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2623 int error)
2624 {
2625 int local_error= 0;
2626 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2627 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2628 /*
2629 reseting the extra with
2630 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2631 fires bug#27077
2632 todo: explain or fix
2633 */
2634 if ((local_error= m_table->file->ha_end_bulk_insert()))
2635 {
2636 m_table->file->print_error(local_error, MYF(0));
2637 }
2638 return error? error : local_error;
2639 }
2640
2641
2642 int
do_exec_row(const Relay_log_info * const rli)2643 Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2644 {
2645 DBUG_ASSERT(m_table != NULL);
2646 int error= write_row(rli, TRUE /* overwrite */);
2647
2648 if (error && !thd->net.last_errno)
2649 thd->net.last_errno= error;
2650
2651 return error;
2652 }
2653
2654 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2655
2656
2657 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2658 void Write_rows_log_event_old::print(FILE *file,
2659 PRINT_EVENT_INFO* print_event_info)
2660 {
2661 Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old");
2662 }
2663 #endif
2664
2665
/**************************************************************************
	Delete_rows_log_event_old member functions
**************************************************************************/
2669
2670 /*
2671 Constructor used to build an event for writing to the binary log.
2672 */
2673
2674 #ifndef MYSQL_CLIENT
Delete_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2675 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2676 TABLE *tbl_arg,
2677 ulong tid,
2678 MY_BITMAP const *cols,
2679 bool is_transactional)
2680 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2681 m_after_image(NULL), m_memory(NULL)
2682 {
2683
2684 // This constructor should not be reached.
2685 assert(0);
2686
2687 }
2688 #endif /* #if !defined(MYSQL_CLIENT) */
2689
2690
2691 /*
2692 Constructor used by slave to read the event from the binary log.
2693 */
2694 #ifdef HAVE_REPLICATION
Delete_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2695 Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2696 uint event_len,
2697 const Format_description_log_event
2698 *description_event)
2699 : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2700 description_event),
2701 m_after_image(NULL), m_memory(NULL)
2702 {
2703 }
2704 #endif
2705
2706
2707 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2708
2709 int
do_before_row_operations(const Slave_reporting_capability * const)2710 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2711 {
2712 if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2713 m_table->s->primary_key < MAX_KEY)
2714 {
2715 /*
2716 We don't need to allocate any memory for m_key since it is not used.
2717 */
2718 return 0;
2719 }
2720
2721 if (m_table->s->keys > 0)
2722 {
2723 // Allocate buffer for key searches
2724 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2725 if (!m_key)
2726 return HA_ERR_OUT_OF_MEM;
2727 }
2728 return 0;
2729 }
2730
2731
2732 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2733 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2734 int error)
2735 {
2736 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2737 m_table->file->ha_index_or_rnd_end();
2738 my_free(m_key);
2739 m_key= NULL;
2740
2741 return error;
2742 }
2743
2744
do_exec_row(const Relay_log_info * const rli)2745 int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2746 {
2747 int error;
2748 DBUG_ASSERT(m_table != NULL);
2749
2750 if (!(error= find_row(rli)))
2751 {
2752 /*
2753 Delete the record found, located in record[0]
2754 */
2755 error= m_table->file->ha_delete_row(m_table->record[0]);
2756 }
2757 return error;
2758 }
2759
2760 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2761
2762
2763 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2764 void Delete_rows_log_event_old::print(FILE *file,
2765 PRINT_EVENT_INFO* print_event_info)
2766 {
2767 Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old");
2768 }
2769 #endif
2770
2771
/**************************************************************************
	Update_rows_log_event_old member functions
**************************************************************************/
2775
2776 /*
2777 Constructor used to build an event for writing to the binary log.
2778 */
2779 #if !defined(MYSQL_CLIENT)
Update_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2780 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2781 TABLE *tbl_arg,
2782 ulong tid,
2783 MY_BITMAP const *cols,
2784 bool is_transactional)
2785 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2786 m_after_image(NULL), m_memory(NULL)
2787 {
2788
2789 // This constructor should not be reached.
2790 assert(0);
2791 }
2792 #endif /* !defined(MYSQL_CLIENT) */
2793
2794
2795 /*
2796 Constructor used by slave to read the event from the binary log.
2797 */
2798 #ifdef HAVE_REPLICATION
Update_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2799 Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2800 uint event_len,
2801 const
2802 Format_description_log_event
2803 *description_event)
2804 : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2805 description_event),
2806 m_after_image(NULL), m_memory(NULL)
2807 {
2808 }
2809 #endif
2810
2811
2812 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2813
2814 int
do_before_row_operations(const Slave_reporting_capability * const)2815 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2816 {
2817 if (m_table->s->keys > 0)
2818 {
2819 // Allocate buffer for key searches
2820 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2821 if (!m_key)
2822 return HA_ERR_OUT_OF_MEM;
2823 }
2824
2825 return 0;
2826 }
2827
2828
2829 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2830 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2831 int error)
2832 {
2833 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2834 m_table->file->ha_index_or_rnd_end();
2835 my_free(m_key); // Free for multi_malloc
2836 m_key= NULL;
2837
2838 return error;
2839 }
2840
2841
2842 int
do_exec_row(const Relay_log_info * const rli)2843 Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2844 {
2845 DBUG_ASSERT(m_table != NULL);
2846
2847 int error= find_row(rli);
2848 if (error)
2849 {
2850 /*
2851 We need to read the second image in the event of error to be
2852 able to skip to the next pair of updates
2853 */
2854 m_curr_row= m_curr_row_end;
2855 unpack_current_row(rli);
2856 return error;
2857 }
2858
2859 /*
2860 This is the situation after locating BI:
2861
2862 ===|=== before image ====|=== after image ===|===
2863 ^ ^
2864 m_curr_row m_curr_row_end
2865
2866 BI found in the table is stored in record[0]. We copy it to record[1]
2867 and unpack AI to record[0].
2868 */
2869
2870 store_record(m_table,record[1]);
2871
2872 m_curr_row= m_curr_row_end;
2873 error= unpack_current_row(rli); // this also updates m_curr_row_end
2874
2875 /*
2876 Now we have the right row to update. The old row (the one we're
2877 looking for) is in record[1] and the new row is in record[0].
2878 */
2879 #ifndef HAVE_purify
2880 /*
2881 Don't print debug messages when running valgrind since they can
2882 trigger false warnings.
2883 */
2884 DBUG_PRINT("info",("Updating row in table"));
2885 DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2886 DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2887 #endif
2888
2889 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2890 if (error == HA_ERR_RECORD_IS_THE_SAME)
2891 error= 0;
2892
2893 return error;
2894 }
2895
2896 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2897
2898
2899 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2900 void Update_rows_log_event_old::print(FILE *file,
2901 PRINT_EVENT_INFO* print_event_info)
2902 {
2903 Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old");
2904 }
2905 #endif
2906