1 /* Copyright (c) 2007, 2019, Oracle and/or its affiliates.
2 Copyright (c) 2009, 2019, MariaDB
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #ifndef MYSQL_CLIENT
20 #include "unireg.h"
21 #endif
22 #include "log_event.h"
23 #ifndef MYSQL_CLIENT
24 #include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
25 #include "sql_base.h" // close_tables_for_reopen
26 #include "key.h" // key_copy
27 #include "lock.h" // mysql_unlock_tables
28 #include "rpl_rli.h"
29 #include "rpl_utility.h"
30 #endif
31 #include "log_event_old.h"
32 #include "rpl_record_old.h"
33 #include "transaction.h"
34
35 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
36
37 // Old implementation of do_apply_event()
38 int
do_apply_event(Old_rows_log_event * ev,rpl_group_info * rgi)39 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
40 {
41 DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
42 int error= 0;
43 THD *ev_thd= ev->thd;
44 uchar const *row_start= ev->m_rows_buf;
45 const Relay_log_info *rli= rgi->rli;
46
47 /*
48 If m_table_id == ~0UL, then we have a dummy event that does not
49 contain any data. In that case, we just remove all tables in the
50 tables_to_lock list, close the thread tables, and return with
51 success.
52 */
53 if (ev->m_table_id == ~0UL)
54 {
55 /*
56 This one is supposed to be set: just an extra check so that
57 nothing strange has happened.
58 */
59 DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
60
61 rgi->slave_close_thread_tables(ev_thd);
62 ev_thd->clear_error();
63 DBUG_RETURN(0);
64 }
65
66 /*
67 'ev_thd' has been set by exec_relay_log_event(), just before calling
68 do_apply_event(). We still check here to prevent future coding
69 errors.
70 */
71 DBUG_ASSERT(rgi->thd == ev_thd);
72
73 /*
74 If there is no locks taken, this is the first binrow event seen
75 after the table map events. We should then lock all the tables
76 used in the transaction and proceed with execution of the actual
77 event.
78 */
79 if (!ev_thd->lock)
80 {
81 /*
82 Lock_tables() reads the contents of ev_thd->lex, so they must be
83 initialized.
84
85 We also call the THD::reset_for_next_command(), since this
86 is the logical start of the next "statement". Note that this
87 call might reset the value of current_stmt_binlog_format, so
88 we need to do any changes to that value after this function.
89 */
90 delete_explain_query(thd->lex);
91 lex_start(ev_thd);
92 ev_thd->reset_for_next_command();
93
94 /*
95 This is a row injection, so we flag the "statement" as
96 such. Note that this code is called both when the slave does row
97 injections and when the BINLOG statement is used to do row
98 injections.
99 */
100 ev_thd->lex->set_stmt_row_injection();
101
102 if (unlikely(open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)))
103 {
104 if (ev_thd->is_error())
105 {
106 /*
107 Error reporting borrowed from Query_log_event with many excessive
108 simplifications.
109 We should not honour --slave-skip-errors at this point as we are
110 having severe errors which should not be skipped.
111 */
112 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
113 "Error '%s' on opening tables",
114 ev_thd->get_stmt_da()->message());
115 ev_thd->is_slave_error= 1;
116 }
117 DBUG_RETURN(1);
118 }
119
120 /*
121 When the open and locking succeeded, we check all tables to
122 ensure that they still have the correct type.
123 */
124
125 {
126 TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
127 for (uint i=0 ; table_list_ptr&& (i< rgi->tables_to_lock_count);
128 table_list_ptr= table_list_ptr->next_global, i++)
129 {
130 /*
131 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
132 function for the explanation of the below if condition
133 */
134 if (table_list_ptr->parent_l)
135 continue;
136 /*
137 We can use a down cast here since we know that every table added
138 to the tables_to_lock is a RPL_TABLE_LIST(or child table which is
139 skipped above).
140 */
141 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
142 DBUG_ASSERT(ptr->m_tabledef_valid);
143 TABLE *conv_table;
144 if (!ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
145 {
146 ev_thd->is_slave_error= 1;
147 rgi->slave_close_thread_tables(ev_thd);
148 DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
149 }
150 DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
151 " - conv_table: %p",
152 ptr->table->s->db.str,
153 ptr->table->s->table_name.str, conv_table));
154 ptr->m_conv_table= conv_table;
155 }
156 }
157
158 /*
159 ... and then we add all the tables to the table map and remove
160 them from tables to lock.
161
162 We also invalidate the query cache for all the tables, since
163 they will now be changed.
164
165 TODO [/Matz]: Maybe the query cache should not be invalidated
166 here? It might be that a table is not changed, even though it
167 was locked for the statement. We do know that each
168 Old_rows_log_event contain at least one row, so after processing one
169 Old_rows_log_event, we can invalidate the query cache for the
170 associated table.
171 */
172 TABLE_LIST *ptr= rgi->tables_to_lock;
173 for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
174 {
175 /*
176 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
177 function for the explanation of the below if condition
178 */
179 if (ptr->parent_l)
180 continue;
181 rgi->m_table_map.set_table(ptr->table_id, ptr->table);
182 }
183 #ifdef HAVE_QUERY_CACHE
184 query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
185 #endif
186 }
187
188 TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);
189
190 if (table)
191 {
192 /*
193 table == NULL means that this table should not be replicated
194 (this was set up by Table_map_log_event::do_apply_event()
195 which tested replicate-* rules).
196 */
197
198 /*
199 It's not needed to set_time() but
200 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
201 much slave is behind
202 2) it will be needed when we allow replication from a table with no
203 TIMESTAMP column to a table with one.
204 So we call set_time(), like in SBR. Presently it changes nothing.
205 */
206 ev_thd->set_time(ev->when, ev->when_sec_part);
207 /*
208 There are a few flags that are replicated with each row event.
209 Make sure to set/clear them before executing the main body of
210 the event.
211 */
212 if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
213 ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
214 else
215 ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
216
217 if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
218 ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
219 else
220 ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
221 /* A small test to verify that objects have consistent types */
222 DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
223
224 table->rpl_write_set= table->write_set;
225
226 error= do_before_row_operations(table);
227 while (error == 0 && row_start < ev->m_rows_end)
228 {
229 uchar const *row_end= NULL;
230 if (unlikely((error= do_prepare_row(ev_thd, rgi, table, row_start,
231 &row_end))))
232 break; // We should perform the after-row operation even in
233 // the case of error
234
235 DBUG_ASSERT(row_end != NULL); // cannot happen
236 DBUG_ASSERT(row_end <= ev->m_rows_end);
237
238 /* in_use can have been set to NULL in close_tables_for_reopen */
239 THD* old_thd= table->in_use;
240 if (!table->in_use)
241 table->in_use= ev_thd;
242 error= do_exec_row(table);
243 table->in_use = old_thd;
244 switch (error)
245 {
246 /* Some recoverable errors */
247 case HA_ERR_RECORD_CHANGED:
248 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
249 tuple does not exist */
250 error= 0;
251 case 0:
252 break;
253
254 default:
255 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
256 "Error in %s event: row application failed. %s",
257 ev->get_type_str(),
258 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
259 thd->is_slave_error= 1;
260 break;
261 }
262
263 row_start= row_end;
264 }
265 DBUG_EXECUTE_IF("stop_slave_middle_group",
266 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
267 error= do_after_row_operations(table, error);
268 }
269
270 if (unlikely(error))
271 { /* error has occurred during the transaction */
272 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
273 "Error in %s event: error during transaction execution "
274 "on table %s.%s. %s",
275 ev->get_type_str(), table->s->db.str,
276 table->s->table_name.str,
277 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
278
279 /*
280 If one day we honour --skip-slave-errors in row-based replication, and
281 the error should be skipped, then we would clear mappings, rollback,
282 close tables, but the slave SQL thread would not stop and then may
283 assume the mapping is still available, the tables are still open...
284 So then we should clear mappings/rollback/close here only if this is a
285 STMT_END_F.
286 For now we code, knowing that error is not skippable and so slave SQL
287 thread is certainly going to stop.
288 rollback at the caller along with sbr.
289 */
290 ev_thd->reset_current_stmt_binlog_format_row();
291 rgi->cleanup_context(ev_thd, error);
292 ev_thd->is_slave_error= 1;
293 DBUG_RETURN(error);
294 }
295
296 DBUG_RETURN(0);
297 }
298 #endif
299
300
301 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
302
303 /*
304 Check if there are more UNIQUE keys after the given key.
305 */
306 static int
last_uniq_key(TABLE * table,uint keyno)307 last_uniq_key(TABLE *table, uint keyno)
308 {
309 while (++keyno < table->s->keys)
310 if (table->key_info[keyno].flags & HA_NOSAME)
311 return 0;
312 return 1;
313 }
314
315
316 /*
317 Compares table->record[0] and table->record[1]
318
319 Returns TRUE if different.
320 */
record_compare(TABLE * table)321 static bool record_compare(TABLE *table)
322 {
323 bool result= FALSE;
324 if (table->s->blob_fields + table->s->varchar_fields == 0)
325 {
326 result= cmp_record(table,record[1]);
327 goto record_compare_exit;
328 }
329
330 /* Compare null bits */
331 if (memcmp(table->null_flags,
332 table->null_flags+table->s->rec_buff_length,
333 table->s->null_bytes))
334 {
335 result= TRUE; // Diff in NULL value
336 goto record_compare_exit;
337 }
338
339 /* Compare updated fields */
340 for (Field **ptr=table->field ; *ptr ; ptr++)
341 {
342 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
343 {
344 result= TRUE;
345 goto record_compare_exit;
346 }
347 }
348
349 record_compare_exit:
350 return result;
351 }
352
353
354 /*
355 Copy "extra" columns from record[1] to record[0].
356
357 Copy the extra fields that are not present on the master but are
358 present on the slave from record[1] to record[0]. This is used
359 after fetching a record that are to be updated, either inside
360 replace_record() or as part of executing an update_row().
361 */
362 static int
copy_extra_record_fields(TABLE * table,size_t master_reclength,my_ptrdiff_t master_fields)363 copy_extra_record_fields(TABLE *table,
364 size_t master_reclength,
365 my_ptrdiff_t master_fields)
366 {
367 DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
368 DBUG_PRINT("info", ("Copying to %p "
369 "from field %lu at offset %lu "
370 "to field %d at offset %lu",
371 table->record[0],
372 (ulong) master_fields, (ulong) master_reclength,
373 table->s->fields, table->s->reclength));
374 /*
375 Copying the extra fields of the slave that does not exist on
376 master into record[0] (which are basically the default values).
377 */
378
379 if (table->s->fields < (uint) master_fields)
380 DBUG_RETURN(0);
381
382 DBUG_ASSERT(master_reclength <= table->s->reclength);
383 if (master_reclength < table->s->reclength)
384 memcpy(table->record[0] + master_reclength,
385 table->record[1] + master_reclength,
386 table->s->reclength - master_reclength);
387
388 /*
389 Bit columns are special. We iterate over all the remaining
390 columns and copy the "extra" bits to the new record. This is
391 not a very good solution: it should be refactored on
392 opportunity.
393
394 REFACTORING SUGGESTION (Matz). Introduce a member function
395 similar to move_field_offset() called copy_field_offset() to
396 copy field values and implement it for all Field subclasses. Use
397 this function to copy data from the found record to the record
398 that are going to be inserted.
399
400 The copy_field_offset() function need to be a virtual function,
401 which in this case will prevent copying an entire range of
402 fields efficiently.
403 */
404 {
405 Field **field_ptr= table->field + master_fields;
406 for ( ; *field_ptr ; ++field_ptr)
407 {
408 /*
409 Set the null bit according to the values in record[1]
410 */
411 if ((*field_ptr)->maybe_null() &&
412 (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
413 (*field_ptr)->set_null();
414 else
415 (*field_ptr)->set_notnull();
416
417 /*
418 Do the extra work for special columns.
419 */
420 switch ((*field_ptr)->real_type())
421 {
422 default:
423 /* Nothing to do */
424 break;
425
426 case MYSQL_TYPE_BIT:
427 Field_bit *f= static_cast<Field_bit*>(*field_ptr);
428 if (f->bit_len > 0)
429 {
430 my_ptrdiff_t const offset= table->record[1] - table->record[0];
431 uchar const bits=
432 get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
433 set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
434 }
435 break;
436 }
437 }
438 }
439 DBUG_RETURN(0); // All OK
440 }
441
442
443 /*
444 Replace the provided record in the database.
445
446 SYNOPSIS
447 replace_record()
448 thd Thread context for writing the record.
449 table Table to which record should be written.
450 master_reclength
451 Offset to first column that is not present on the master,
452 alternatively the length of the record on the master
453 side.
454
455 RETURN VALUE
456 Error code on failure, 0 on success.
457
458 DESCRIPTION
459 Similar to how it is done in mysql_insert(), we first try to do
460 a ha_write_row() and of that fails due to duplicated keys (or
461 indices), we do an ha_update_row() or a ha_delete_row() instead.
462 */
463 static int
replace_record(THD * thd,TABLE * table,ulong const master_reclength,uint const master_fields)464 replace_record(THD *thd, TABLE *table,
465 ulong const master_reclength,
466 uint const master_fields)
467 {
468 DBUG_ENTER("replace_record");
469 DBUG_ASSERT(table != NULL && thd != NULL);
470
471 int error;
472 int keynum;
473 auto_afree_ptr<char> key(NULL);
474
475 #ifndef DBUG_OFF
476 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
477 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
478 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
479 #endif
480
481 while (unlikely(error= table->file->ha_write_row(table->record[0])))
482 {
483 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
484 {
485 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
486 DBUG_RETURN(error);
487 }
488 if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
489 {
490 table->file->print_error(error, MYF(0));
491 /*
492 We failed to retrieve the duplicate key
493 - either because the error was not "duplicate key" error
494 - or because the information which key is not available
495 */
496 DBUG_RETURN(error);
497 }
498
499 /*
500 We need to retrieve the old row into record[1] to be able to
501 either update or delete the offending record. We either:
502
503 - use rnd_pos() with a row-id (available as dupp_row) to the
504 offending row, if that is possible (MyISAM and Blackhole), or else
505
506 - use index_read_idx() with the key that is duplicated, to
507 retrieve the offending row.
508 */
509 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
510 {
511 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
512 if (unlikely(error))
513 {
514 DBUG_PRINT("info",("rnd_pos() returns error %d",error));
515 table->file->print_error(error, MYF(0));
516 DBUG_RETURN(error);
517 }
518 }
519 else
520 {
521 if (unlikely(table->file->extra(HA_EXTRA_FLUSH_CACHE)))
522 {
523 DBUG_RETURN(my_errno);
524 }
525
526 if (key.get() == NULL)
527 {
528 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
529 if (unlikely(key.get() == NULL))
530 DBUG_RETURN(ENOMEM);
531 }
532
533 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
534 0);
535 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
536 (const uchar*)key.get(),
537 HA_WHOLE_KEY,
538 HA_READ_KEY_EXACT);
539 if (unlikely(error))
540 {
541 DBUG_PRINT("info", ("index_read_idx() returns error %d", error));
542 table->file->print_error(error, MYF(0));
543 DBUG_RETURN(error);
544 }
545 }
546
547 /*
548 Now, table->record[1] should contain the offending row. That
549 will enable us to update it or, alternatively, delete it (so
550 that we can insert the new row afterwards).
551
552 First we copy the columns into table->record[0] that are not
553 present on the master from table->record[1], if there are any.
554 */
555 copy_extra_record_fields(table, master_reclength, master_fields);
556
557 /*
558 REPLACE is defined as either INSERT or DELETE + INSERT. If
559 possible, we can replace it with an UPDATE, but that will not
560 work on InnoDB if FOREIGN KEY checks are necessary.
561
562 I (Matz) am not sure of the reason for the last_uniq_key()
563 check as, but I'm guessing that it's something along the
564 following lines.
565
566 Suppose that we got the duplicate key to be a key that is not
567 the last unique key for the table and we perform an update:
568 then there might be another key for which the unique check will
569 fail, so we're better off just deleting the row and inserting
570 the correct row.
571 */
572 if (last_uniq_key(table, keynum) &&
573 !table->file->referenced_by_foreign_key())
574 {
575 error=table->file->ha_update_row(table->record[1],
576 table->record[0]);
577 if (unlikely(error) && error != HA_ERR_RECORD_IS_THE_SAME)
578 table->file->print_error(error, MYF(0));
579 else
580 error= 0;
581 DBUG_RETURN(error);
582 }
583 else
584 {
585 if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
586 {
587 table->file->print_error(error, MYF(0));
588 DBUG_RETURN(error);
589 }
590 /* Will retry ha_write_row() with the offending row removed. */
591 }
592 }
593
594 DBUG_RETURN(error);
595 }
596
597
598 /**
599 Find the row given by 'key', if the table has keys, or else use a table scan
600 to find (and fetch) the row.
601
602 If the engine allows random access of the records, a combination of
603 position() and rnd_pos() will be used.
604
605 @param table Pointer to table to search
606 @param key Pointer to key to use for search, if table has key
607
608 @pre <code>table->record[0]</code> shall contain the row to locate
609 and <code>key</code> shall contain a key to use for searching, if
610 the engine has a key.
611
612 @post If the return value is zero, <code>table->record[1]</code>
613 will contain the fetched row and the internal "cursor" will refer to
614 the row. If the return value is non-zero,
615 <code>table->record[1]</code> is undefined. In either case,
616 <code>table->record[0]</code> is undefined.
617
618 @return Zero if the row was successfully fetched into
619 <code>table->record[1]</code>, error code otherwise.
620 */
621
find_and_fetch_row(TABLE * table,uchar * key)622 static int find_and_fetch_row(TABLE *table, uchar *key)
623 {
624 DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
625 DBUG_PRINT("enter", ("table: %p, key: %p record: %p",
626 table, key, table->record[1]));
627
628 DBUG_ASSERT(table->in_use != NULL);
629
630 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
631
632 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
633 table->s->primary_key < MAX_KEY)
634 {
635 /*
636 Use a more efficient method to fetch the record given by
637 table->record[0] if the engine allows it. We first compute a
638 row reference using the position() member function (it will be
639 stored in table->file->ref) and the use rnd_pos() to position
640 the "cursor" (i.e., record[0] in this case) at the correct row.
641
642 TODO: Add a check that the correct record has been fetched by
643 comparing with the original record. Take into account that the
644 record on the master and slave can be of different
645 length. Something along these lines should work:
646
647 ADD>>> store_record(table,record[1]);
648 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
649 ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
650 table->s->reclength) == 0);
651
652 */
653 table->file->position(table->record[0]);
654 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
655 /*
656 rnd_pos() returns the record in table->record[0], so we have to
657 move it to table->record[1].
658 */
659 memcpy(table->record[1], table->record[0], table->s->reclength);
660 DBUG_RETURN(error);
661 }
662
663 /* We need to retrieve all fields */
664 /* TODO: Move this out from this function to main loop */
665 table->use_all_columns();
666
667 if (table->s->keys > 0)
668 {
669 int error;
670 /* We have a key: search the table using the index */
671 if (!table->file->inited &&
672 unlikely(error= table->file->ha_index_init(0, FALSE)))
673 {
674 table->file->print_error(error, MYF(0));
675 DBUG_RETURN(error);
676 }
677
678 /*
679 Don't print debug messages when running valgrind since they can
680 trigger false warnings.
681 */
682 #ifndef HAVE_valgrind
683 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
684 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
685 #endif
686
687 /*
688 We need to set the null bytes to ensure that the filler bit are
689 all set when returning. There are storage engines that just set
690 the necessary bits on the bytes and don't set the filler bits
691 correctly.
692 */
693 my_ptrdiff_t const pos=
694 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
695 table->record[1][pos]= 0xFF;
696 if (unlikely((error= table->file->ha_index_read_map(table->record[1], key,
697 HA_WHOLE_KEY,
698 HA_READ_KEY_EXACT))))
699 {
700 table->file->print_error(error, MYF(0));
701 table->file->ha_index_end();
702 DBUG_RETURN(error);
703 }
704
705 /*
706 Don't print debug messages when running valgrind since they can
707 trigger false warnings.
708 */
709 #ifndef HAVE_valgrind
710 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
711 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
712 #endif
713 /*
714 Below is a minor "optimization". If the key (i.e., key number
715 0) has the HA_NOSAME flag set, we know that we have found the
716 correct record (since there can be no duplicates); otherwise, we
717 have to compare the record with the one found to see if it is
718 the correct one.
719
720 CAVEAT! This behaviour is essential for the replication of,
721 e.g., the mysql.proc table since the correct record *shall* be
722 found using the primary key *only*. There shall be no
723 comparison of non-PK columns to decide if the correct record is
724 found. I can see no scenario where it would be incorrect to
725 chose the row to change only using a PK or an UNNI.
726 */
727 if (table->key_info->flags & HA_NOSAME)
728 {
729 table->file->ha_index_end();
730 DBUG_RETURN(0);
731 }
732
733 while (record_compare(table))
734 {
735 int error;
736
737 while ((error= table->file->ha_index_next(table->record[1])))
738 {
739 table->file->print_error(error, MYF(0));
740 table->file->ha_index_end();
741 DBUG_RETURN(error);
742 }
743 }
744
745 /*
746 Have to restart the scan to be able to fetch the next row.
747 */
748 table->file->ha_index_end();
749 }
750 else
751 {
752 int restart_count= 0; // Number of times scanning has restarted from top
753 int error;
754
755 /* We don't have a key: search the table using rnd_next() */
756 if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
757 return error;
758
759 /* Continue until we find the right record or have made a full loop */
760 do
761 {
762 error= table->file->ha_rnd_next(table->record[1]);
763
764 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
765 DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
766
767 switch (error) {
768 case 0:
769 break;
770
771 case HA_ERR_END_OF_FILE:
772 if (++restart_count < 2)
773 {
774 int error2;
775 if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
776 DBUG_RETURN(error2);
777 }
778 break;
779
780 default:
781 table->file->print_error(error, MYF(0));
782 DBUG_PRINT("info", ("Record not found"));
783 (void) table->file->ha_rnd_end();
784 DBUG_RETURN(error);
785 }
786 }
787 while (restart_count < 2 && record_compare(table));
788
789 /*
790 Have to restart the scan to be able to fetch the next row.
791 */
792 DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
793 table->file->ha_rnd_end();
794
795 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
796 DBUG_RETURN(error);
797 }
798
799 DBUG_RETURN(0);
800 }
801
802
803 /**********************************************************
804 Row handling primitives for Write_rows_log_event_old
805 **********************************************************/
806
do_before_row_operations(TABLE * table)807 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
808 {
809 int error= 0;
810
811 /*
812 We are using REPLACE semantics and not INSERT IGNORE semantics
813 when writing rows, that is: new rows replace old rows. We need to
814 inform the storage engine that it should use this behaviour.
815 */
816
817 /* Tell the storage engine that we are using REPLACE semantics. */
818 thd->lex->duplicates= DUP_REPLACE;
819
820 thd->lex->sql_command= SQLCOM_REPLACE;
821 /*
822 Do not raise the error flag in case of hitting to an unique attribute
823 */
824 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
825 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
826 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
827 table->file->ha_start_bulk_insert(0);
828 return error;
829 }
830
831
do_after_row_operations(TABLE * table,int error)832 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
833 {
834 int local_error= 0;
835 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
836 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
837 /*
838 resetting the extra with
839 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
840 fires bug#27077
841 todo: explain or fix
842 */
843 if (unlikely((local_error= table->file->ha_end_bulk_insert())))
844 {
845 table->file->print_error(local_error, MYF(0));
846 }
847 return error? error : local_error;
848 }
849
850
851 int
do_prepare_row(THD * thd_arg,rpl_group_info * rgi,TABLE * table,uchar const * row_start,uchar const ** row_end)852 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
853 rpl_group_info *rgi,
854 TABLE *table,
855 uchar const *row_start,
856 uchar const **row_end)
857 {
858 DBUG_ASSERT(table != NULL);
859 DBUG_ASSERT(row_start && row_end);
860
861 int error;
862 error= unpack_row_old(rgi,
863 table, m_width, table->record[0],
864 row_start, m_rows_end,
865 &m_cols, row_end, &m_master_reclength,
866 table->write_set, PRE_GA_WRITE_ROWS_EVENT);
867 bitmap_copy(table->read_set, table->write_set);
868 return error;
869 }
870
871
do_exec_row(TABLE * table)872 int Write_rows_log_event_old::do_exec_row(TABLE *table)
873 {
874 DBUG_ASSERT(table != NULL);
875 int error= replace_record(thd, table, m_master_reclength, m_width);
876 return error;
877 }
878
879
880 /**********************************************************
881 Row handling primitives for Delete_rows_log_event_old
882 **********************************************************/
883
do_before_row_operations(TABLE * table)884 int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
885 {
886 DBUG_ASSERT(m_memory == NULL);
887
888 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
889 table->s->primary_key < MAX_KEY)
890 {
891 /*
892 We don't need to allocate any memory for m_after_image and
893 m_key since they are not used.
894 */
895 return 0;
896 }
897
898 int error= 0;
899
900 if (table->s->keys > 0)
901 {
902 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
903 &m_after_image,
904 (uint) table->s->reclength,
905 &m_key,
906 (uint) table->key_info->key_length,
907 NullS);
908 }
909 else
910 {
911 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
912 m_memory= (uchar*)m_after_image;
913 m_key= NULL;
914 }
915 if (!m_memory)
916 return HA_ERR_OUT_OF_MEM;
917
918 return error;
919 }
920
921
do_after_row_operations(TABLE * table,int error)922 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
923 {
924 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
925 table->file->ha_index_or_rnd_end();
926 my_free(m_memory); // Free for multi_malloc
927 m_memory= NULL;
928 m_after_image= NULL;
929 m_key= NULL;
930
931 return error;
932 }
933
934
935 int
do_prepare_row(THD * thd_arg,rpl_group_info * rgi,TABLE * table,uchar const * row_start,uchar const ** row_end)936 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
937 rpl_group_info *rgi,
938 TABLE *table,
939 uchar const *row_start,
940 uchar const **row_end)
941 {
942 int error;
943 DBUG_ASSERT(row_start && row_end);
944 /*
945 This assertion actually checks that there is at least as many
946 columns on the slave as on the master.
947 */
948 DBUG_ASSERT(table->s->fields >= m_width);
949
950 error= unpack_row_old(rgi,
951 table, m_width, table->record[0],
952 row_start, m_rows_end,
953 &m_cols, row_end, &m_master_reclength,
954 table->read_set, PRE_GA_DELETE_ROWS_EVENT);
955 /*
956 If we will access rows using the random access method, m_key will
957 be set to NULL, so we do not need to make a key copy in that case.
958 */
959 if (m_key)
960 {
961 KEY *const key_info= table->key_info;
962
963 key_copy(m_key, table->record[0], key_info, 0);
964 }
965
966 return error;
967 }
968
969
do_exec_row(TABLE * table)970 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
971 {
972 int error;
973 DBUG_ASSERT(table != NULL);
974
975 if (likely(!(error= ::find_and_fetch_row(table, m_key))))
976 {
977 /*
978 Now we should have the right row to delete. We are using
979 record[0] since it is guaranteed to point to a record with the
980 correct value.
981 */
982 error= table->file->ha_delete_row(table->record[0]);
983 }
984 return error;
985 }
986
987
988 /**********************************************************
989 Row handling primitives for Update_rows_log_event_old
990 **********************************************************/
991
do_before_row_operations(TABLE * table)992 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
993 {
994 DBUG_ASSERT(m_memory == NULL);
995
996 int error= 0;
997
998 if (table->s->keys > 0)
999 {
1000 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1001 &m_after_image,
1002 (uint) table->s->reclength,
1003 &m_key,
1004 (uint) table->key_info->key_length,
1005 NullS);
1006 }
1007 else
1008 {
1009 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1010 m_memory= m_after_image;
1011 m_key= NULL;
1012 }
1013 if (!m_memory)
1014 return HA_ERR_OUT_OF_MEM;
1015
1016 return error;
1017 }
1018
1019
do_after_row_operations(TABLE * table,int error)1020 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1021 {
1022 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1023 table->file->ha_index_or_rnd_end();
1024 my_free(m_memory);
1025 m_memory= NULL;
1026 m_after_image= NULL;
1027 m_key= NULL;
1028
1029 return error;
1030 }
1031
1032
do_prepare_row(THD * thd_arg,rpl_group_info * rgi,TABLE * table,uchar const * row_start,uchar const ** row_end)1033 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1034 rpl_group_info *rgi,
1035 TABLE *table,
1036 uchar const *row_start,
1037 uchar const **row_end)
1038 {
1039 int error;
1040 DBUG_ASSERT(row_start && row_end);
1041 /*
1042 This assertion actually checks that there is at least as many
1043 columns on the slave as on the master.
1044 */
1045 DBUG_ASSERT(table->s->fields >= m_width);
1046
1047 /* record[0] is the before image for the update */
1048 error= unpack_row_old(rgi,
1049 table, m_width, table->record[0],
1050 row_start, m_rows_end,
1051 &m_cols, row_end, &m_master_reclength,
1052 table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1053 row_start = *row_end;
1054 /* m_after_image is the after image for the update */
1055 error= unpack_row_old(rgi,
1056 table, m_width, m_after_image,
1057 row_start, m_rows_end,
1058 &m_cols, row_end, &m_master_reclength,
1059 table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1060
1061 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1062 DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1063
1064 /*
1065 If we will access rows using the random access method, m_key will
1066 be set to NULL, so we do not need to make a key copy in that case.
1067 */
1068 if (m_key)
1069 {
1070 KEY *const key_info= table->key_info;
1071
1072 key_copy(m_key, table->record[0], key_info, 0);
1073 }
1074
1075 return error;
1076 }
1077
1078
do_exec_row(TABLE * table)1079 int Update_rows_log_event_old::do_exec_row(TABLE *table)
1080 {
1081 DBUG_ASSERT(table != NULL);
1082
1083 int error= ::find_and_fetch_row(table, m_key);
1084 if (unlikely(error))
1085 return error;
1086
1087 /*
1088 We have to ensure that the new record (i.e., the after image) is
1089 in record[0] and the old record (i.e., the before image) is in
1090 record[1]. This since some storage engines require this (for
1091 example, the partition engine).
1092
1093 Since find_and_fetch_row() puts the fetched record (i.e., the old
1094 record) in record[1], we can keep it there. We put the new record
1095 (i.e., the after image) into record[0], and copy the fields that
1096 are on the slave (i.e., in record[1]) into record[0], effectively
1097 overwriting the default values that where put there by the
1098 unpack_row() function.
1099 */
1100 memcpy(table->record[0], m_after_image, table->s->reclength);
1101 copy_extra_record_fields(table, m_master_reclength, m_width);
1102
1103 /*
1104 Now we have the right row to update. The old row (the one we're
1105 looking for) is in record[1] and the new row has is in record[0].
1106 We also have copied the original values already in the slave's
1107 database into the after image delivered from the master.
1108 */
1109 error= table->file->ha_update_row(table->record[1], table->record[0]);
1110 if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
1111 error= 0;
1112
1113 return error;
1114 }
1115
1116 #endif
1117
1118
1119 /**************************************************************************
1120 Rows_log_event member functions
1121 **************************************************************************/
1122
1123 #ifndef MYSQL_CLIENT
Old_rows_log_event(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)1124 Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1125 MY_BITMAP const *cols,
1126 bool is_transactional)
1127 : Log_event(thd_arg, 0, is_transactional),
1128 m_row_count(0),
1129 m_table(tbl_arg),
1130 m_table_id(tid),
1131 m_width(tbl_arg ? tbl_arg->s->fields : 1),
1132 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1133 #ifdef HAVE_REPLICATION
1134 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1135 #endif
1136 {
1137
1138 // This constructor should not be reached.
1139 assert(0);
1140
1141 /*
1142 We allow a special form of dummy event when the table, and cols
1143 are null and the table id is ~0UL. This is a temporary
1144 solution, to be able to terminate a started statement in the
1145 binary log: the extraneous events will be removed in the future.
1146 */
1147 DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1148 (!tbl_arg && !cols && tid == ~0UL));
1149
1150 if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1151 set_flags(NO_FOREIGN_KEY_CHECKS_F);
1152 if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1153 set_flags(RELAXED_UNIQUE_CHECKS_F);
1154 /* if my_bitmap_init fails, caught in is_valid() */
1155 if (likely(!my_bitmap_init(&m_cols,
1156 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1157 m_width,
1158 false)))
1159 {
1160 /* Cols can be zero if this is a dummy binrows event */
1161 if (likely(cols != NULL))
1162 {
1163 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1164 create_last_word_mask(&m_cols);
1165 }
1166 }
1167 else
1168 {
1169 // Needed because my_bitmap_init() does not set it to null on failure
1170 m_cols.bitmap= 0;
1171 }
1172 }
1173 #endif
1174
1175
Old_rows_log_event(const char * buf,uint event_len,Log_event_type event_type,const Format_description_log_event * description_event)1176 Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
1177 Log_event_type event_type,
1178 const Format_description_log_event
1179 *description_event)
1180 : Log_event(buf, description_event),
1181 m_row_count(0),
1182 #ifndef MYSQL_CLIENT
1183 m_table(NULL),
1184 #endif
1185 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1186 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1187 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1188 #endif
1189 {
1190 DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1191 uint8 const common_header_len= description_event->common_header_len;
1192 uint8 const post_header_len= description_event->post_header_len[event_type-1];
1193
1194 DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
1195 "post_header_len: %d",
1196 event_len, common_header_len,
1197 post_header_len));
1198
1199 const char *post_start= buf + common_header_len;
1200 DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1201 post_start+= RW_MAPID_OFFSET;
1202 if (post_header_len == 6)
1203 {
1204 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1205 m_table_id= uint4korr(post_start);
1206 post_start+= 4;
1207 }
1208 else
1209 {
1210 m_table_id= (ulong) uint6korr(post_start);
1211 post_start+= RW_FLAGS_OFFSET;
1212 }
1213
1214 m_flags= uint2korr(post_start);
1215
1216 uchar const *const var_start=
1217 (const uchar *)buf + common_header_len + post_header_len;
1218 uchar const *const ptr_width= var_start;
1219 uchar *ptr_after_width= (uchar*) ptr_width;
1220 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1221 m_width = net_field_length(&ptr_after_width);
1222 DBUG_PRINT("debug", ("m_width=%lu", m_width));
1223 /* Avoid reading out of buffer */
1224 if (ptr_after_width + m_width > (uchar *)buf + event_len)
1225 {
1226 m_cols.bitmap= NULL;
1227 DBUG_VOID_RETURN;
1228 }
1229
1230 /* if my_bitmap_init fails, caught in is_valid() */
1231 if (likely(!my_bitmap_init(&m_cols,
1232 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1233 m_width,
1234 false)))
1235 {
1236 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1237 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1238 create_last_word_mask(&m_cols);
1239 ptr_after_width+= (m_width + 7) / 8;
1240 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1241 }
1242 else
1243 {
1244 // Needed because my_bitmap_init() does not set it to null on failure
1245 m_cols.bitmap= NULL;
1246 DBUG_VOID_RETURN;
1247 }
1248
1249 const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1250 size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1251 DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %zu",
1252 m_table_id, m_flags, m_width, data_size));
1253 DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1254
1255 m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1256 if (likely((bool)m_rows_buf))
1257 {
1258 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1259 m_curr_row= m_rows_buf;
1260 #endif
1261 m_rows_end= m_rows_buf + data_size;
1262 m_rows_cur= m_rows_end;
1263 memcpy(m_rows_buf, ptr_rows_data, data_size);
1264 }
1265 else
1266 m_cols.bitmap= 0; // to not free it
1267
1268 DBUG_VOID_RETURN;
1269 }
1270
1271
~Old_rows_log_event()1272 Old_rows_log_event::~Old_rows_log_event()
1273 {
1274 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1275 m_cols.bitmap= 0; // so no my_free in my_bitmap_free
1276 my_bitmap_free(&m_cols); // To pair with my_bitmap_init().
1277 my_free(m_rows_buf);
1278 }
1279
1280
get_data_size()1281 int Old_rows_log_event::get_data_size()
1282 {
1283 uchar buf[MAX_INT_WIDTH];
1284 uchar *end= net_store_length(buf, (m_width + 7) / 8);
1285
1286 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1287 return (int)(6 + no_bytes_in_map(&m_cols) + (end - buf) +
1288 m_rows_cur - m_rows_buf););
1289 int data_size= ROWS_HEADER_LEN;
1290 data_size+= no_bytes_in_map(&m_cols);
1291 data_size+= (uint) (end - buf);
1292
1293 data_size+= (uint) (m_rows_cur - m_rows_buf);
1294 return data_size;
1295 }
1296
1297
1298 #ifndef MYSQL_CLIENT
do_add_row_data(uchar * row_data,size_t length)1299 int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1300 {
1301 /*
1302 When the table has a primary key, we would probably want, by default, to
1303 log only the primary key value instead of the entire "before image". This
1304 would save binlog space. TODO
1305 */
1306 DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1307 DBUG_PRINT("enter", ("row_data: %p length: %zu",row_data,
1308 length));
1309 /*
1310 Don't print debug messages when running valgrind since they can
1311 trigger false warnings.
1312 */
1313 #ifndef HAVE_valgrind
1314 DBUG_DUMP("row_data", row_data, MY_MIN(length, 32));
1315 #endif
1316
1317 DBUG_ASSERT(m_rows_buf <= m_rows_cur);
1318 DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1319 DBUG_ASSERT(m_rows_cur <= m_rows_end);
1320
1321 /* The cast will always work since m_rows_cur <= m_rows_end */
1322 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1323 {
1324 size_t const block_size= 1024;
1325 my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1326 my_ptrdiff_t const new_alloc=
1327 block_size * ((cur_size + length + block_size - 1) / block_size);
1328
1329 uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
1330 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1331 if (unlikely(!new_buf))
1332 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1333
1334 /* If the memory moved, we need to move the pointers */
1335 if (new_buf != m_rows_buf)
1336 {
1337 m_rows_buf= new_buf;
1338 m_rows_cur= m_rows_buf + cur_size;
1339 }
1340
1341 /*
1342 The end pointer should always be changed to point to the end of
1343 the allocated memory.
1344 */
1345 m_rows_end= m_rows_buf + new_alloc;
1346 }
1347
1348 DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
1349 memcpy(m_rows_cur, row_data, length);
1350 m_rows_cur+= length;
1351 m_row_count++;
1352 DBUG_RETURN(0);
1353 }
1354 #endif
1355
1356
1357 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
do_apply_event(rpl_group_info * rgi)1358 int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
1359 {
1360 DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1361 int error= 0;
1362 Relay_log_info const *rli= rgi->rli;
1363
1364 /*
1365 If m_table_id == ~0UL, then we have a dummy event that does not
1366 contain any data. In that case, we just remove all tables in the
1367 tables_to_lock list, close the thread tables, and return with
1368 success.
1369 */
1370 if (m_table_id == ~0UL)
1371 {
1372 /*
1373 This one is supposed to be set: just an extra check so that
1374 nothing strange has happened.
1375 */
1376 DBUG_ASSERT(get_flags(STMT_END_F));
1377
1378 rgi->slave_close_thread_tables(thd);
1379 thd->clear_error();
1380 DBUG_RETURN(0);
1381 }
1382
1383 /*
1384 'thd' has been set by exec_relay_log_event(), just before calling
1385 do_apply_event(). We still check here to prevent future coding
1386 errors.
1387 */
1388 DBUG_ASSERT(rgi->thd == thd);
1389
1390 /*
1391 If there is no locks taken, this is the first binrow event seen
1392 after the table map events. We should then lock all the tables
1393 used in the transaction and proceed with execution of the actual
1394 event.
1395 */
1396 if (!thd->lock)
1397 {
1398 /*
1399 lock_tables() reads the contents of thd->lex, so they must be
1400 initialized. Contrary to in
1401 Table_map_log_event::do_apply_event() we don't call
1402 mysql_init_query() as that may reset the binlog format.
1403 */
1404 lex_start(thd);
1405
1406 if (unlikely((error= lock_tables(thd, rgi->tables_to_lock,
1407 rgi->tables_to_lock_count, 0))))
1408 {
1409 if (thd->is_slave_error || thd->is_fatal_error)
1410 {
1411 /*
1412 Error reporting borrowed from Query_log_event with many excessive
1413 simplifications (we don't honour --slave-skip-errors)
1414 */
1415 uint actual_error= thd->net.last_errno;
1416 rli->report(ERROR_LEVEL, actual_error, NULL,
1417 "Error '%s' in %s event: when locking tables",
1418 (actual_error ? thd->net.last_error :
1419 "unexpected success or fatal error"),
1420 get_type_str());
1421 thd->is_fatal_error= 1;
1422 }
1423 else
1424 {
1425 rli->report(ERROR_LEVEL, error, NULL,
1426 "Error in %s event: when locking tables",
1427 get_type_str());
1428 }
1429 rgi->slave_close_thread_tables(thd);
1430 DBUG_RETURN(error);
1431 }
1432
1433 /*
1434 When the open and locking succeeded, we check all tables to
1435 ensure that they still have the correct type.
1436 */
1437
1438 {
1439 TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
1440 for (uint i=0; table_list_ptr&& (i< rgi->tables_to_lock_count);
1441 table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1442 {
1443 /*
1444 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
1445 function for the explanation of the below if condition
1446 */
1447 if (table_list_ptr->parent_l)
1448 continue;
1449 /*
1450 We can use a down cast here since we know that every table added
1451 to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1452 skipped above).
1453 */
1454 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1455 TABLE *conv_table;
1456 if (ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
1457 {
1458 thd->is_slave_error= 1;
1459 rgi->slave_close_thread_tables(thd);
1460 DBUG_RETURN(ERR_BAD_TABLE_DEF);
1461 }
1462 ptr->m_conv_table= conv_table;
1463 }
1464 }
1465
1466 /*
1467 ... and then we add all the tables to the table map but keep
1468 them in the tables to lock list.
1469
1470
1471 We also invalidate the query cache for all the tables, since
1472 they will now be changed.
1473
1474 TODO [/Matz]: Maybe the query cache should not be invalidated
1475 here? It might be that a table is not changed, even though it
1476 was locked for the statement. We do know that each
1477 Old_rows_log_event contain at least one row, so after processing one
1478 Old_rows_log_event, we can invalidate the query cache for the
1479 associated table.
1480 */
1481 for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
1482 {
1483 rgi->m_table_map.set_table(ptr->table_id, ptr->table);
1484 }
1485 #ifdef HAVE_QUERY_CACHE
1486 query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
1487 #endif
1488 }
1489
1490 TABLE*
1491 table=
1492 m_table= rgi->m_table_map.get_table(m_table_id);
1493
1494 if (table)
1495 {
1496 /*
1497 table == NULL means that this table should not be replicated
1498 (this was set up by Table_map_log_event::do_apply_event()
1499 which tested replicate-* rules).
1500 */
1501
1502 /*
1503 It's not needed to set_time() but
1504 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
1505 much slave is behind
1506 2) it will be needed when we allow replication from a table with no
1507 TIMESTAMP column to a table with one.
1508 So we call set_time(), like in SBR. Presently it changes nothing.
1509 */
1510 thd->set_time(when, when_sec_part);
1511 /*
1512 There are a few flags that are replicated with each row event.
1513 Make sure to set/clear them before executing the main body of
1514 the event.
1515 */
1516 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1517 thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1518 else
1519 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1520
1521 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1522 thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1523 else
1524 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1525 /* A small test to verify that objects have consistent types */
1526 DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1527
1528 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1529 set_flags(COMPLETE_ROWS_F);
1530
1531 /*
1532 Set tables write and read sets.
1533
1534 Read_set contains all slave columns (in case we are going to fetch
1535 a complete record from slave)
1536
1537 Write_set equals the m_cols bitmap sent from master but it can be
1538 longer if slave has extra columns.
1539 */
1540
1541 DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1542
1543 bitmap_set_all(table->read_set);
1544 bitmap_set_all(table->write_set);
1545 if (!get_flags(COMPLETE_ROWS_F))
1546 bitmap_intersect(table->write_set,&m_cols);
1547 table->rpl_write_set= table->write_set;
1548
1549 // Do event specific preparations
1550
1551 error= do_before_row_operations(rli);
1552
1553 // row processing loop
1554
1555 while (error == 0 && m_curr_row < m_rows_end)
1556 {
1557 /* in_use can have been set to NULL in close_tables_for_reopen */
1558 THD* old_thd= table->in_use;
1559 if (!table->in_use)
1560 table->in_use= thd;
1561
1562 error= do_exec_row(rgi);
1563
1564 DBUG_PRINT("info", ("error: %d", error));
1565 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1566
1567 table->in_use = old_thd;
1568 switch (error)
1569 {
1570 case 0:
1571 break;
1572
1573 /* Some recoverable errors */
1574 case HA_ERR_RECORD_CHANGED:
1575 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
1576 tuple does not exist */
1577 error= 0;
1578 break;
1579
1580 default:
1581 rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1582 "Error in %s event: row application failed. %s",
1583 get_type_str(), thd->net.last_error);
1584 thd->is_slave_error= 1;
1585 break;
1586 }
1587
1588 /*
1589 If m_curr_row_end was not set during event execution (e.g., because
1590 of errors) we can't proceed to the next row. If the error is transient
1591 (i.e., error==0 at this point) we must call unpack_current_row() to set
1592 m_curr_row_end.
1593 */
1594
1595 DBUG_PRINT("info", ("error: %d", error));
1596 DBUG_PRINT("info", ("curr_row: %p; curr_row_end:%p; rows_end: %p",
1597 m_curr_row, m_curr_row_end, m_rows_end));
1598
1599 if (!m_curr_row_end && likely(!error))
1600 unpack_current_row(rgi);
1601
1602 // at this moment m_curr_row_end should be set
1603 DBUG_ASSERT(error || m_curr_row_end != NULL);
1604 DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1605 DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1606
1607 m_curr_row= m_curr_row_end;
1608
1609 } // row processing loop
1610
1611 DBUG_EXECUTE_IF("stop_slave_middle_group",
1612 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1613 error= do_after_row_operations(rli, error);
1614 } // if (table)
1615
1616 if (unlikely(error))
1617 { /* error has occurred during the transaction */
1618 rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1619 "Error in %s event: error during transaction execution "
1620 "on table %s.%s. %s",
1621 get_type_str(), table->s->db.str,
1622 table->s->table_name.str,
1623 thd->net.last_error);
1624
1625 /*
1626 If one day we honour --skip-slave-errors in row-based replication, and
1627 the error should be skipped, then we would clear mappings, rollback,
1628 close tables, but the slave SQL thread would not stop and then may
1629 assume the mapping is still available, the tables are still open...
1630 So then we should clear mappings/rollback/close here only if this is a
1631 STMT_END_F.
1632 For now we code, knowing that error is not skippable and so slave SQL
1633 thread is certainly going to stop.
1634 rollback at the caller along with sbr.
1635 */
1636 thd->reset_current_stmt_binlog_format_row();
1637 rgi->cleanup_context(thd, error);
1638 thd->is_slave_error= 1;
1639 DBUG_RETURN(error);
1640 }
1641
1642 /*
1643 This code would ideally be placed in do_update_pos() instead, but
1644 since we have no access to table there, we do the setting of
1645 last_event_start_time here instead.
1646 */
1647 if (table && (table->s->primary_key == MAX_KEY) &&
1648 !use_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1649 {
1650 /*
1651 ------------ Temporary fix until WL#2975 is implemented ---------
1652
1653 This event is not the last one (no STMT_END_F). If we stop now
1654 (in case of terminate_slave_thread()), how will we restart? We
1655 have to restart from Table_map_log_event, but as this table is
1656 not transactional, the rows already inserted will still be
1657 present, and idempotency is not guaranteed (no PK) so we risk
1658 that repeating leads to double insert. So we desperately try to
1659 continue, hope we'll eventually leave this buggy situation (by
1660 executing the final Old_rows_log_event). If we are in a hopeless
1661 wait (reached end of last relay log and nothing gets appended
1662 there), we timeout after one minute, and notify DBA about the
1663 problem. When WL#2975 is implemented, just remove the member
1664 Relay_log_info::last_event_start_time and all its occurrences.
1665 */
1666 rgi->last_event_start_time= my_time(0);
1667 }
1668
1669 if (get_flags(STMT_END_F))
1670 {
1671 /*
1672 This is the end of a statement or transaction, so close (and
1673 unlock) the tables we opened when processing the
1674 Table_map_log_event starting the statement.
1675
1676 OBSERVER. This will clear *all* mappings, not only those that
1677 are open for the table. There is not good handle for on-close
1678 actions for tables.
1679
1680 NOTE. Even if we have no table ('table' == 0) we still need to be
1681 here, so that we increase the group relay log position. If we didn't, we
1682 could have a group relay log position which lags behind "forever"
1683 (assume the last master's transaction is ignored by the slave because of
1684 replicate-ignore rules).
1685 */
1686 int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1687
1688 /*
1689 If this event is not in a transaction, the call below will, if some
1690 transactional storage engines are involved, commit the statement into
1691 them and flush the pending event to binlog.
1692 If this event is in a transaction, the call will do nothing, but a
1693 Xid_log_event will come next which will, if some transactional engines
1694 are involved, commit the transaction and flush the pending event to the
1695 binlog.
1696 If there was a deadlock the transaction should have been rolled back
1697 already. So there should be no need to rollback the transaction.
1698 */
1699 DBUG_ASSERT(! thd->transaction_rollback_request);
1700 if (unlikely((error= (binlog_error ?
1701 trans_rollback_stmt(thd) :
1702 trans_commit_stmt(thd)))))
1703 rli->report(ERROR_LEVEL, error, NULL,
1704 "Error in %s event: commit of row events failed, "
1705 "table `%s`.`%s`",
1706 get_type_str(), m_table->s->db.str,
1707 m_table->s->table_name.str);
1708 error|= binlog_error;
1709
1710 /*
1711 Now what if this is not a transactional engine? we still need to
1712 flush the pending event to the binlog; we did it with
1713 thd->binlog_flush_pending_rows_event(). Note that we imitate
1714 what is done for real queries: a call to
1715 ha_autocommit_or_rollback() (sometimes only if involves a
1716 transactional engine), and a call to be sure to have the pending
1717 event flushed.
1718 */
1719
1720 thd->reset_current_stmt_binlog_format_row();
1721 rgi->cleanup_context(thd, 0);
1722 }
1723
1724 DBUG_RETURN(error);
1725 }
1726
1727
1728 Log_event::enum_skip_reason
do_shall_skip(rpl_group_info * rgi)1729 Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
1730 {
1731 /*
1732 If the slave skip counter is 1 and this event does not end a
1733 statement, then we should not start executing on the next event.
1734 Otherwise, we defer the decision to the normal skipping logic.
1735 */
1736 if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1737 return Log_event::EVENT_SKIP_IGNORE;
1738 else
1739 return Log_event::do_shall_skip(rgi);
1740 }
1741
1742 int
do_update_pos(rpl_group_info * rgi)1743 Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
1744 {
1745 Relay_log_info *rli= rgi->rli;
1746 int error= 0;
1747 DBUG_ENTER("Old_rows_log_event::do_update_pos");
1748
1749 DBUG_PRINT("info", ("flags: %s",
1750 get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1751
1752 if (get_flags(STMT_END_F))
1753 {
1754 /*
1755 Indicate that a statement is finished.
1756 Step the group log position if we are not in a transaction,
1757 otherwise increase the event log position.
1758 */
1759 error= rli->stmt_done(log_pos, thd, rgi);
1760 /*
1761 Clear any errors in thd->net.last_err*. It is not known if this is
1762 needed or not. It is believed that any errors that may exist in
1763 thd->net.last_err* are allowed. Examples of errors are "key not
1764 found", which is produced in the test case rpl_row_conflicts.test
1765 */
1766 thd->clear_error();
1767 }
1768 else
1769 {
1770 rgi->inc_event_relay_log_pos();
1771 }
1772
1773 DBUG_RETURN(error);
1774 }
1775
1776 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1777
1778
1779 #ifndef MYSQL_CLIENT
write_data_header()1780 bool Old_rows_log_event::write_data_header()
1781 {
1782 uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
1783
1784 // This method should not be reached.
1785 assert(0);
1786
1787 DBUG_ASSERT(m_table_id != ~0UL);
1788 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1789 {
1790 int4store(buf + 0, m_table_id);
1791 int2store(buf + 4, m_flags);
1792 return write_data(buf, 6);
1793 });
1794 int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
1795 int2store(buf + RW_FLAGS_OFFSET, m_flags);
1796 return write_data(buf, ROWS_HEADER_LEN);
1797 }
1798
1799
write_data_body()1800 bool Old_rows_log_event::write_data_body()
1801 {
1802 /*
1803 Note that this should be the number of *bits*, not the number of
1804 bytes.
1805 */
1806 uchar sbuf[MAX_INT_WIDTH];
1807 my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1808
1809 // This method should not be reached.
1810 assert(0);
1811
1812 bool res= false;
1813 uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1814 DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1815
1816 DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1817 res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
1818
1819 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1820 res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
1821 DBUG_DUMP("rows", m_rows_buf, data_size);
1822 res= res || write_data(m_rows_buf, (size_t) data_size);
1823
1824 return res;
1825
1826 }
1827 #endif
1828
1829
1830 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
pack_info(Protocol * protocol)1831 void Old_rows_log_event::pack_info(Protocol *protocol)
1832 {
1833 char buf[256];
1834 char const *const flagstr=
1835 get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1836 size_t bytes= my_snprintf(buf, sizeof(buf),
1837 "table_id: %lu%s", m_table_id, flagstr);
1838 protocol->store(buf, bytes, &my_charset_bin);
1839 }
1840 #endif
1841
1842
1843 #ifdef MYSQL_CLIENT
1844 /* Method duplicates Rows_log_event's one */
print_helper(FILE * file,PRINT_EVENT_INFO * print_event_info,char const * const name)1845 bool Old_rows_log_event::print_helper(FILE *file,
1846 PRINT_EVENT_INFO *print_event_info,
1847 char const *const name)
1848 {
1849 IO_CACHE *const head= &print_event_info->head_cache;
1850 IO_CACHE *const body= &print_event_info->body_cache;
1851 IO_CACHE *const tail= &print_event_info->tail_cache;
1852 bool do_print_encoded=
1853 print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
1854 print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
1855 !print_event_info->short_form;
1856
1857 if (!print_event_info->short_form)
1858 {
1859 if (print_header(head, print_event_info, !do_print_encoded) ||
1860 my_b_printf(head, "\t%s: table id %lu%s\n",
1861 name, m_table_id,
1862 do_print_encoded ? " flags: STMT_END_F" : "") ||
1863 print_base64(body, print_event_info, do_print_encoded))
1864 goto err;
1865 }
1866
1867 if (get_flags(STMT_END_F))
1868 {
1869 if (copy_event_cache_to_file_and_reinit(head, file) ||
1870 copy_cache_to_file_wrapped(body, file, do_print_encoded,
1871 print_event_info->delimiter,
1872 print_event_info->verbose) ||
1873 copy_event_cache_to_file_and_reinit(tail, file))
1874 goto err;
1875 }
1876 return 0;
1877 err:
1878 return 1;
1879 }
1880 #endif
1881
1882
1883 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1884 /**
1885 Write the current row into event's table.
1886
1887 The row is located in the row buffer, pointed by @c m_curr_row member.
1888 Number of columns of the row is stored in @c m_width member (it can be
1889 different from the number of columns in the table to which we insert).
1890 Bitmap @c m_cols indicates which columns are present in the row. It is assumed
1891 that event's table is already open and pointed by @c m_table.
1892
1893 If the same record already exists in the table it can be either overwritten
1894 or an error is reported depending on the value of @c overwrite flag
1895 (error reporting not yet implemented). Note that the matching record can be
1896 different from the row we insert if we use primary keys to identify records in
1897 the table.
1898
1899 The row to be inserted can contain values only for selected columns. The
1900 missing columns are filled with default values using @c prepare_record()
  function. If a matching record is found in the table and @c overwrite is
1902 true, the missing columns are taken from it.
1903
  @param  rgi   Replication group info (needed for row unpacking).
1905 @param overwrite
1906 Shall we overwrite if the row already exists or signal
1907 error (currently ignored).
1908
1909 @returns Error code on failure, 0 on success.
1910
1911 This method, if successful, sets @c m_curr_row_end pointer to point at the
1912 next row in the rows buffer. This is done when unpacking the row to be
1913 inserted.
1914
1915 @note If a matching record is found, it is either updated using
1916 @c ha_update_row() or first deleted and then new record written.
1917 */
1918
1919 int
write_row(rpl_group_info * rgi,const bool overwrite)1920 Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
1921 {
1922 DBUG_ENTER("write_row");
1923 DBUG_ASSERT(m_table != NULL && thd != NULL);
1924
1925 TABLE *table= m_table; // pointer to event's table
1926 int error;
1927 int keynum;
1928 auto_afree_ptr<char> key(NULL);
1929
1930 /* fill table->record[0] with default values */
1931
1932 if (unlikely((error=
1933 prepare_record(table, m_width,
1934 TRUE /* check if columns have def. values */))))
1935 DBUG_RETURN(error);
1936
1937 /* unpack row into table->record[0] */
1938 if ((error= unpack_current_row(rgi)))
1939 DBUG_RETURN(error);
1940
1941 #ifndef DBUG_OFF
1942 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1943 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
1944 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
1945 #endif
1946
1947 /*
1948 Try to write record. If a corresponding record already exists in the table,
1949 we try to change it using ha_update_row() if possible. Otherwise we delete
1950 it and repeat the whole process again.
1951
1952 TODO: Add safety measures against infinite looping.
1953 */
1954
1955 while (unlikely(error= table->file->ha_write_row(table->record[0])))
1956 {
1957 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
1958 {
1959 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
1960 DBUG_RETURN(error);
1961 }
1962 if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
1963 {
1964 DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
1965 table->file->print_error(error, MYF(0));
1966 /*
1967 We failed to retrieve the duplicate key
1968 - either because the error was not "duplicate key" error
1969 - or because the information which key is not available
1970 */
1971 DBUG_RETURN(error);
1972 }
1973
1974 /*
1975 We need to retrieve the old row into record[1] to be able to
1976 either update or delete the offending record. We either:
1977
1978 - use rnd_pos() with a row-id (available as dupp_row) to the
1979 offending row, if that is possible (MyISAM and Blackhole), or else
1980
1981 - use index_read_idx() with the key that is duplicated, to
1982 retrieve the offending row.
1983 */
1984 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1985 {
1986 DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
1987 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
1988 if (unlikely(error))
1989 {
1990 DBUG_PRINT("info",("rnd_pos() returns error %d",error));
1991 table->file->print_error(error, MYF(0));
1992 DBUG_RETURN(error);
1993 }
1994 }
1995 else
1996 {
1997 DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
1998
1999 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2000 {
2001 DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
2002 DBUG_RETURN(my_errno);
2003 }
2004
2005 if (key.get() == NULL)
2006 {
2007 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
2008 if (unlikely(key.get() == NULL))
2009 {
2010 DBUG_PRINT("info",("Can't allocate key buffer"));
2011 DBUG_RETURN(ENOMEM);
2012 }
2013 }
2014
2015 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
2016 0);
2017 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2018 (const uchar*)key.get(),
2019 HA_WHOLE_KEY,
2020 HA_READ_KEY_EXACT);
2021 if (unlikely(error))
2022 {
2023 DBUG_PRINT("info",("index_read_idx() returns error %d", error));
2024 table->file->print_error(error, MYF(0));
2025 DBUG_RETURN(error);
2026 }
2027 }
2028
2029 /*
2030 Now, record[1] should contain the offending row. That
2031 will enable us to update it or, alternatively, delete it (so
2032 that we can insert the new row afterwards).
2033 */
2034
2035 /*
2036 If row is incomplete we will use the record found to fill
2037 missing columns.
2038 */
2039 if (!get_flags(COMPLETE_ROWS_F))
2040 {
2041 restore_record(table,record[1]);
2042 error= unpack_current_row(rgi);
2043 }
2044
2045 #ifndef DBUG_OFF
2046 DBUG_PRINT("debug",("preparing for update: before and after image"));
2047 DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2048 DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2049 #endif
2050
2051 /*
2052 REPLACE is defined as either INSERT or DELETE + INSERT. If
2053 possible, we can replace it with an UPDATE, but that will not
2054 work on InnoDB if FOREIGN KEY checks are necessary.
2055
2056 I (Matz) am not sure of the reason for the last_uniq_key()
2057 check as, but I'm guessing that it's something along the
2058 following lines.
2059
2060 Suppose that we got the duplicate key to be a key that is not
2061 the last unique key for the table and we perform an update:
2062 then there might be another key for which the unique check will
2063 fail, so we're better off just deleting the row and inserting
2064 the correct row.
2065 */
2066 if (last_uniq_key(table, keynum) &&
2067 !table->file->referenced_by_foreign_key())
2068 {
2069 DBUG_PRINT("info",("Updating row using ha_update_row()"));
2070 error=table->file->ha_update_row(table->record[1],
2071 table->record[0]);
2072 switch (error) {
2073
2074 case HA_ERR_RECORD_IS_THE_SAME:
2075 DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2076 " ha_update_row()"));
2077 error= 0;
2078
2079 case 0:
2080 break;
2081
2082 default:
2083 DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2084 table->file->print_error(error, MYF(0));
2085 }
2086
2087 DBUG_RETURN(error);
2088 }
2089 else
2090 {
2091 DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2092 if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
2093 {
2094 DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2095 table->file->print_error(error, MYF(0));
2096 DBUG_RETURN(error);
2097 }
2098 /* Will retry ha_write_row() with the offending row removed. */
2099 }
2100 }
2101
2102 DBUG_RETURN(error);
2103 }
2104
2105
2106 /**
2107 Locate the current row in event's table.
2108
2109 The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2110 columns are there in the row (this can be differnet from the number of columns
2111 in the table). It is assumed that event's table is already open and pointed
2112 by @c m_table.
2113
2114 If a corresponding record is found in the table it is stored in
2115 @c m_table->record[0]. Note that when record is located based on a primary
2116 key, it is possible that the record found differs from the row being located.
2117
2118 If no key is specified or table does not have keys, a table scan is used to
2119 find the row. In that case the row should be complete and contain values for
2120 all columns. However, it can still be shorter than the table, i.e. the table
2121 can contain extra columns not present in the row. It is also possible that
2122 the table has fewer columns than the row being located.
2123
2124 @returns Error code on failure, 0 on success.
2125
2126 @post In case of success @c m_table->record[0] contains the record found.
2127 Also, the internal "cursor" of the table is positioned at the record found.
2128
2129 @note If the engine allows random access of the records, a combination of
2130 @c position() and @c rnd_pos() will be used.
2131
2132 Note that one MUST call ha_index_or_rnd_end() after this function if
2133 it returns 0 as we must leave the row position in the handler intact
2134 for any following update/delete command.
2135 */
2136
find_row(rpl_group_info * rgi)2137 int Old_rows_log_event::find_row(rpl_group_info *rgi)
2138 {
2139 DBUG_ENTER("find_row");
2140
2141 DBUG_ASSERT(m_table && m_table->in_use != NULL);
2142
2143 TABLE *table= m_table;
2144 int error;
2145
2146 /* unpack row - missing fields get default values */
2147
2148 // TODO: shall we check and report errors here?
2149 prepare_record(table, m_width, FALSE /* don't check errors */);
2150 error= unpack_current_row(rgi);
2151
2152 #ifndef DBUG_OFF
2153 DBUG_PRINT("info",("looking for the following record"));
2154 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2155 #endif
2156
2157 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2158 table->s->primary_key < MAX_KEY)
2159 {
2160 /*
2161 Use a more efficient method to fetch the record given by
2162 table->record[0] if the engine allows it. We first compute a
2163 row reference using the position() member function (it will be
2164 stored in table->file->ref) and the use rnd_pos() to position
2165 the "cursor" (i.e., record[0] in this case) at the correct row.
2166
2167 TODO: Add a check that the correct record has been fetched by
2168 comparing with the original record. Take into account that the
2169 record on the master and slave can be of different
2170 length. Something along these lines should work:
2171
2172 ADD>>> store_record(table,record[1]);
2173 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
2174 ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
2175 table->s->reclength) == 0);
2176
2177 */
2178 DBUG_PRINT("info",("locating record using primary key (position)"));
2179 int error= table->file->ha_rnd_pos_by_record(table->record[0]);
2180 if (unlikely(error))
2181 {
2182 DBUG_PRINT("info",("rnd_pos returns error %d",error));
2183 table->file->print_error(error, MYF(0));
2184 }
2185 DBUG_RETURN(error);
2186 }
2187
2188 // We can't use position() - try other methods.
2189
2190 /*
2191 We need to retrieve all fields
2192 TODO: Move this out from this function to main loop
2193 */
2194 table->use_all_columns();
2195
2196 /*
2197 Save copy of the record in table->record[1]. It might be needed
2198 later if linear search is used to find exact match.
2199 */
2200 store_record(table,record[1]);
2201
2202 if (table->s->keys > 0)
2203 {
2204 DBUG_PRINT("info",("locating record using primary key (index_read)"));
2205
2206 /* We have a key: search the table using the index */
2207 if (!table->file->inited &&
2208 unlikely(error= table->file->ha_index_init(0, FALSE)))
2209 {
2210 DBUG_PRINT("info",("ha_index_init returns error %d",error));
2211 table->file->print_error(error, MYF(0));
2212 DBUG_RETURN(error);
2213 }
2214
2215 /* Fill key data for the row */
2216
2217 DBUG_ASSERT(m_key);
2218 key_copy(m_key, table->record[0], table->key_info, 0);
2219
2220 /*
2221 Don't print debug messages when running valgrind since they can
2222 trigger false warnings.
2223 */
2224 #ifndef HAVE_valgrind
2225 DBUG_DUMP("key data", m_key, table->key_info->key_length);
2226 #endif
2227
2228 /*
2229 We need to set the null bytes to ensure that the filler bit are
2230 all set when returning. There are storage engines that just set
2231 the necessary bits on the bytes and don't set the filler bits
2232 correctly.
2233 */
2234 my_ptrdiff_t const pos=
2235 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2236 table->record[0][pos]= 0xFF;
2237
2238 if (unlikely((error= table->file->ha_index_read_map(table->record[0],
2239 m_key,
2240 HA_WHOLE_KEY,
2241 HA_READ_KEY_EXACT))))
2242 {
2243 DBUG_PRINT("info",("no record matching the key found in the table"));
2244 table->file->print_error(error, MYF(0));
2245 table->file->ha_index_end();
2246 DBUG_RETURN(error);
2247 }
2248
2249 /*
2250 Don't print debug messages when running valgrind since they can
2251 trigger false warnings.
2252 */
2253 #ifndef HAVE_valgrind
2254 DBUG_PRINT("info",("found first matching record"));
2255 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2256 #endif
2257 /*
2258 Below is a minor "optimization". If the key (i.e., key number
2259 0) has the HA_NOSAME flag set, we know that we have found the
2260 correct record (since there can be no duplicates); otherwise, we
2261 have to compare the record with the one found to see if it is
2262 the correct one.
2263
2264 CAVEAT! This behaviour is essential for the replication of,
2265 e.g., the mysql.proc table since the correct record *shall* be
2266 found using the primary key *only*. There shall be no
2267 comparison of non-PK columns to decide if the correct record is
2268 found. I can see no scenario where it would be incorrect to
2269 chose the row to change only using a PK or an UNNI.
2270 */
2271 if (table->key_info->flags & HA_NOSAME)
2272 {
2273 /* Unique does not have non nullable part */
2274 if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2275 {
2276 DBUG_RETURN(0);
2277 }
2278 else
2279 {
2280 KEY *keyinfo= table->key_info;
2281 /*
2282 Unique has nullable part. We need to check if there is any
2283 field in the BI image that is null and part of UNNI.
2284 */
2285 bool null_found= FALSE;
2286 for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2287 {
2288 uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2289 Field **f= table->field+fieldnr;
2290 null_found= (*f)->is_null();
2291 }
2292
2293 if (!null_found)
2294 {
2295 DBUG_RETURN(0);
2296 }
2297
2298 /* else fall through to index scan */
2299 }
2300 }
2301
2302 /*
2303 In case key is not unique, we still have to iterate over records found
2304 and find the one which is identical to the row given. A copy of the
2305 record we are looking for is stored in record[1].
2306 */
2307 DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2308
2309 while (record_compare(table))
2310 {
2311 while (unlikely(error= table->file->ha_index_next(table->record[0])))
2312 {
2313 DBUG_PRINT("info",("no record matching the given row found"));
2314 table->file->print_error(error, MYF(0));
2315 (void) table->file->ha_index_end();
2316 DBUG_RETURN(error);
2317 }
2318 }
2319 }
2320 else
2321 {
2322 DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
2323
2324 int restart_count= 0; // Number of times scanning has restarted from top
2325
2326 /* We don't have a key: search the table using rnd_next() */
2327 if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
2328 {
2329 DBUG_PRINT("info",("error initializing table scan"
2330 " (ha_rnd_init returns %d)",error));
2331 DBUG_RETURN(error);
2332 }
2333
2334 /* Continue until we find the right record or have made a full loop */
2335 do
2336 {
2337 restart_rnd_next:
2338 error= table->file->ha_rnd_next(table->record[0]);
2339
2340 switch (error) {
2341
2342 case 0:
2343 break;
2344
2345 case HA_ERR_END_OF_FILE:
2346 if (++restart_count < 2)
2347 {
2348 int error2;
2349 table->file->ha_rnd_end();
2350 if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
2351 DBUG_RETURN(error2);
2352 goto restart_rnd_next;
2353 }
2354 break;
2355
2356 default:
2357 DBUG_PRINT("info", ("Failed to get next record"
2358 " (rnd_next returns %d)",error));
2359 table->file->print_error(error, MYF(0));
2360 table->file->ha_rnd_end();
2361 DBUG_RETURN(error);
2362 }
2363 }
2364 while (restart_count < 2 && record_compare(table));
2365
2366 /*
2367 Note: above record_compare will take into accout all record fields
2368 which might be incorrect in case a partial row was given in the event
2369 */
2370
2371 /*
2372 Have to restart the scan to be able to fetch the next row.
2373 */
2374 if (restart_count == 2)
2375 DBUG_PRINT("info", ("Record not found"));
2376 else
2377 DBUG_DUMP("record found", table->record[0], table->s->reclength);
2378 if (error)
2379 table->file->ha_rnd_end();
2380
2381 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
2382 DBUG_RETURN(error);
2383 }
2384
2385 DBUG_RETURN(0);
2386 }
2387
2388 #endif
2389
2390
2391 /**************************************************************************
2392 Write_rows_log_event member functions
2393 **************************************************************************/
2394
2395 /*
2396 Constructor used to build an event for writing to the binary log.
2397 */
2398 #if !defined(MYSQL_CLIENT)
Write_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid_arg,MY_BITMAP const * cols,bool is_transactional)2399 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2400 TABLE *tbl_arg,
2401 ulong tid_arg,
2402 MY_BITMAP const *cols,
2403 bool is_transactional)
2404 : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2405 {
2406
2407 // This constructor should not be reached.
2408 assert(0);
2409
2410 }
2411 #endif
2412
2413
2414 /*
2415 Constructor used by slave to read the event from the binary log.
2416 */
2417 #ifdef HAVE_REPLICATION
Write_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2418 Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
2419 uint event_len,
2420 const Format_description_log_event
2421 *description_event)
2422 : Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
2423 description_event)
2424 {
2425 }
2426 #endif
2427
2428
2429 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2430 int
do_before_row_operations(const Slave_reporting_capability * const)2431 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2432 {
2433 int error= 0;
2434
2435 /*
2436 We are using REPLACE semantics and not INSERT IGNORE semantics
2437 when writing rows, that is: new rows replace old rows. We need to
2438 inform the storage engine that it should use this behaviour.
2439 */
2440
2441 /* Tell the storage engine that we are using REPLACE semantics. */
2442 thd->lex->duplicates= DUP_REPLACE;
2443
2444 thd->lex->sql_command= SQLCOM_REPLACE;
2445 /*
2446 Do not raise the error flag in case of hitting to an unique attribute
2447 */
2448 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2449 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2450 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2451 m_table->file->ha_start_bulk_insert(0);
2452 return error;
2453 }
2454
2455
2456 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2457 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2458 int error)
2459 {
2460 int local_error= 0;
2461 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2462 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2463 /*
2464 resetting the extra with
2465 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2466 fires bug#27077
2467 todo: explain or fix
2468 */
2469 if (unlikely((local_error= m_table->file->ha_end_bulk_insert())))
2470 {
2471 m_table->file->print_error(local_error, MYF(0));
2472 }
2473 return error? error : local_error;
2474 }
2475
2476
2477 int
do_exec_row(rpl_group_info * rgi)2478 Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2479 {
2480 DBUG_ASSERT(m_table != NULL);
2481 int error= write_row(rgi, TRUE /* overwrite */);
2482
2483 if (unlikely(error) && !thd->net.last_errno)
2484 thd->net.last_errno= error;
2485
2486 return error;
2487 }
2488
2489 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2490
2491
2492 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2493 bool Write_rows_log_event_old::print(FILE *file,
2494 PRINT_EVENT_INFO* print_event_info)
2495 {
2496 return Old_rows_log_event::print_helper(file, print_event_info,
2497 "Write_rows_old");
2498 }
2499 #endif
2500
2501
2502 /**************************************************************************
2503 Delete_rows_log_event member functions
2504 **************************************************************************/
2505
2506 /*
2507 Constructor used to build an event for writing to the binary log.
2508 */
2509
2510 #ifndef MYSQL_CLIENT
Delete_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2511 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2512 TABLE *tbl_arg,
2513 ulong tid,
2514 MY_BITMAP const *cols,
2515 bool is_transactional)
2516 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2517 m_after_image(NULL), m_memory(NULL)
2518 {
2519
2520 // This constructor should not be reached.
2521 assert(0);
2522
2523 }
2524 #endif /* #if !defined(MYSQL_CLIENT) */
2525
2526
2527 /*
2528 Constructor used by slave to read the event from the binary log.
2529 */
2530 #ifdef HAVE_REPLICATION
Delete_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2531 Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2532 uint event_len,
2533 const Format_description_log_event
2534 *description_event)
2535 : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2536 description_event),
2537 m_after_image(NULL), m_memory(NULL)
2538 {
2539 }
2540 #endif
2541
2542
2543 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2544
2545 int
do_before_row_operations(const Slave_reporting_capability * const)2546 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2547 {
2548 if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2549 m_table->s->primary_key < MAX_KEY)
2550 {
2551 /*
2552 We don't need to allocate any memory for m_key since it is not used.
2553 */
2554 return 0;
2555 }
2556
2557 if (m_table->s->keys > 0)
2558 {
2559 // Allocate buffer for key searches
2560 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2561 if (!m_key)
2562 return HA_ERR_OUT_OF_MEM;
2563 }
2564 return 0;
2565 }
2566
2567
2568 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2569 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2570 int error)
2571 {
2572 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2573 m_table->file->ha_index_or_rnd_end();
2574 my_free(m_key);
2575 m_key= NULL;
2576
2577 return error;
2578 }
2579
2580
do_exec_row(rpl_group_info * rgi)2581 int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2582 {
2583 int error;
2584 DBUG_ASSERT(m_table != NULL);
2585
2586 if (likely(!(error= find_row(rgi))) )
2587 {
2588 /*
2589 Delete the record found, located in record[0]
2590 */
2591 error= m_table->file->ha_delete_row(m_table->record[0]);
2592 m_table->file->ha_index_or_rnd_end();
2593 }
2594 return error;
2595 }
2596
2597 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2598
2599
2600 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2601 bool Delete_rows_log_event_old::print(FILE *file,
2602 PRINT_EVENT_INFO* print_event_info)
2603 {
2604 return Old_rows_log_event::print_helper(file, print_event_info,
2605 "Delete_rows_old");
2606 }
2607 #endif
2608
2609
2610 /**************************************************************************
2611 Update_rows_log_event member functions
2612 **************************************************************************/
2613
2614 /*
2615 Constructor used to build an event for writing to the binary log.
2616 */
2617 #if !defined(MYSQL_CLIENT)
Update_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2618 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2619 TABLE *tbl_arg,
2620 ulong tid,
2621 MY_BITMAP const *cols,
2622 bool is_transactional)
2623 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2624 m_after_image(NULL), m_memory(NULL)
2625 {
2626
2627 // This constructor should not be reached.
2628 assert(0);
2629 }
2630 #endif /* !defined(MYSQL_CLIENT) */
2631
2632
2633 /*
2634 Constructor used by slave to read the event from the binary log.
2635 */
2636 #ifdef HAVE_REPLICATION
Update_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2637 Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2638 uint event_len,
2639 const
2640 Format_description_log_event
2641 *description_event)
2642 : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2643 description_event),
2644 m_after_image(NULL), m_memory(NULL)
2645 {
2646 }
2647 #endif
2648
2649
2650 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2651
2652 int
do_before_row_operations(const Slave_reporting_capability * const)2653 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2654 {
2655 if (m_table->s->keys > 0)
2656 {
2657 // Allocate buffer for key searches
2658 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2659 if (!m_key)
2660 return HA_ERR_OUT_OF_MEM;
2661 }
2662
2663 return 0;
2664 }
2665
2666
2667 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2668 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2669 int error)
2670 {
2671 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2672 m_table->file->ha_index_or_rnd_end();
2673 my_free(m_key); // Free for multi_malloc
2674 m_key= NULL;
2675
2676 return error;
2677 }
2678
2679
2680 int
do_exec_row(rpl_group_info * rgi)2681 Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2682 {
2683 DBUG_ASSERT(m_table != NULL);
2684
2685 int error= find_row(rgi);
2686 if (unlikely(error))
2687 {
2688 /*
2689 We need to read the second image in the event of error to be
2690 able to skip to the next pair of updates
2691 */
2692 m_curr_row= m_curr_row_end;
2693 unpack_current_row(rgi);
2694 return error;
2695 }
2696
2697 /*
2698 This is the situation after locating BI:
2699
2700 ===|=== before image ====|=== after image ===|===
2701 ^ ^
2702 m_curr_row m_curr_row_end
2703
2704 BI found in the table is stored in record[0]. We copy it to record[1]
2705 and unpack AI to record[0].
2706 */
2707
2708 store_record(m_table,record[1]);
2709
2710 m_curr_row= m_curr_row_end;
2711 error= unpack_current_row(rgi); // this also updates m_curr_row_end
2712
2713 /*
2714 Now we have the right row to update. The old row (the one we're
2715 looking for) is in record[1] and the new row is in record[0].
2716 */
2717 #ifndef HAVE_valgrind
2718 /*
2719 Don't print debug messages when running valgrind since they can
2720 trigger false warnings.
2721 */
2722 DBUG_PRINT("info",("Updating row in table"));
2723 DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2724 DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2725 #endif
2726
2727 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2728 m_table->file->ha_index_or_rnd_end();
2729
2730 if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
2731 error= 0;
2732
2733 return error;
2734 }
2735
2736 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2737
2738
2739 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2740 bool Update_rows_log_event_old::print(FILE *file,
2741 PRINT_EVENT_INFO* print_event_info)
2742 {
2743 return Old_rows_log_event::print_helper(file, print_event_info,
2744 "Update_rows_old");
2745 }
2746 #endif
2747