1 /* Copyright (c) 2007, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql_priv.h"
24 #ifndef MYSQL_CLIENT
25 #include "unireg.h"
26 #endif
27 #include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
28 #include "log_event.h"
29 #ifndef MYSQL_CLIENT
30 #include "sql_cache.h"                       // QUERY_CACHE_FLAGS_SIZE
31 #include "sql_base.h"                       // close_tables_for_reopen
32 #include "key.h"                            // key_copy
33 #include "lock.h"                           // mysql_unlock_tables
34 #include "sql_parse.h"             // mysql_reset_thd_for_next_command
35 #include "rpl_rli.h"
36 #include "rpl_utility.h"
37 #endif
38 #include "log_event_old.h"
39 #include "rpl_record_old.h"
40 #include "transaction.h"
41 
42 #include <algorithm>
43 
44 using std::min;
45 using std::max;
46 
47 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
48 
49 // Old implementation of do_apply_event()
50 int
51 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
52 {
53   DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
54   int error= 0;
55   THD *ev_thd= ev->thd;
56   uchar const *row_start= ev->m_rows_buf;
57 
58   /*
59     If m_table_id is ~0U or the maximum 6-byte integer, then we have a dummy
60     event that does not contain any data.  In that case, we just remove all
61     tables in the tables_to_lock list, close the thread tables, and return
62     with success.
63    */
64   if ((ev->m_table_id.id() == ~0U || ev->m_table_id.id() == (~0ULL >> 16)) &&
65       ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
66 
67   {
68     /*
69        This one is supposed to be set: just an extra check so that
70        nothing strange has happened.
71      */
72     DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
73 
74     const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
75     ev_thd->clear_error();
76     DBUG_RETURN(0);
77   }
78 
79   /*
80     'ev_thd' has been set by exec_relay_log_event(), just before calling
81     do_apply_event(). We still check here to prevent future coding
82     errors.
83   */
84   DBUG_ASSERT(rli->info_thd == ev_thd);
85 
86   /*
87     If there are no locks taken, this is the first binrow event seen
88     after the table map events.  We should then lock all the tables
89     used in the transaction and proceed with execution of the actual
90     event.
91   */
92   if (!ev_thd->lock)
93   {
94     /*
95       lock_tables() reads the contents of ev_thd->lex, so they must be
96       initialized.
97 
98       We also call mysql_reset_thd_for_next_command(), since this
99       is the logical start of the next "statement". Note that this
100       call might reset the value of current_stmt_binlog_format, so
101       we need to do any changes to that value after this function.
102     */
103     lex_start(ev_thd);
104     mysql_reset_thd_for_next_command(ev_thd);
105 
106     /*
107       This is a row injection, so we flag the "statement" as
108       such. Note that this code is called both when the slave does row
109       injections and when the BINLOG statement is used to do row
110       injections.
111     */
112     ev_thd->lex->set_stmt_row_injection();
113 
114     if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
115     {
116       if (ev_thd->is_error())
117       {
118         /*
119           Error reporting borrowed from Query_log_event with many excessive
120           simplifications (we don't honour --slave-skip-errors)
121         */
122         rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
123                     "Error '%s' on opening tables",
124                     ev_thd->get_stmt_da()->message());
125         ev_thd->is_slave_error= 1;
126       }
127       DBUG_RETURN(1);
128     }
129 
130     /*
131       When the open and locking succeeded, we check all tables to
132       ensure that they still have the correct type.
133     */
134 
135     {
136       TABLE_LIST *table_list_ptr= rli->tables_to_lock;
137       for (uint i=0 ; table_list_ptr && (i < rli->tables_to_lock_count);
138            table_list_ptr= table_list_ptr->next_global, i++)
139       {
140         /*
141           Please see comment in log_event.cc-Rows_log_event::do_apply_event()
142           function for the explanation of the below if condition
143         */
144         if (table_list_ptr->parent_l)
145           continue;
146         /*
147           We can use a down cast here since we know that every table added
148           to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
149           skipped above).
150         */
151         RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
152         DBUG_ASSERT(ptr->m_tabledef_valid);
153         TABLE *conv_table;
154         if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
155                                              ptr->table, &conv_table))
156         {
157           ev_thd->is_slave_error= 1;
158           const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
159           DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
160         }
161         DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
162                              " - conv_table: %p",
163                              ptr->table->s->db.str,
164                              ptr->table->s->table_name.str, conv_table));
165         ptr->m_conv_table= conv_table;
166       }
167     }
168 
169     /*
170       ... and then we add all the tables to the table map and remove
171       them from tables to lock.
172 
173       We also invalidate the query cache for all the tables, since
174       they will now be changed.
175 
176       TODO [/Matz]: Maybe the query cache should not be invalidated
177       here? It might be that a table is not changed, even though it
178       was locked for the statement.  We do know that each
179       Old_rows_log_event contains at least one row, so after processing one
180       Old_rows_log_event, we can invalidate the query cache for the
181       associated table.
182      */
183     TABLE_LIST *ptr= rli->tables_to_lock;
184     for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
185     {
186       /*
187         Please see comment in log_event.cc-Rows_log_event::do_apply_event()
188         function for the explanation of the below if condition
189        */
190       if (ptr->parent_l)
191         continue;
192       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
193     }
194 #ifdef HAVE_QUERY_CACHE
195     query_cache.invalidate_locked_for_write(rli->tables_to_lock);
196 #endif
197   }
198 
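  /* Fetch the table registered for m_table_id by the preceding Table_map event. */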
199   TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
200 
201   if (table)
202   {
203     /*
204       table == NULL means that this table should not be replicated
205       (this was set up by Table_map_log_event::do_apply_event()
206       which tested replicate-* rules).
207     */
208 
209     /*
210       Calling set_time() is not strictly needed, but
211       1) it preserves the property that "Time" in SHOW PROCESSLIST shows how
212       much the slave is behind
213       2) it will be needed when we allow replication from a table with no
214       TIMESTAMP column to a table with one.
215       So we call set_time(), like in SBR. Presently it changes nothing.
216     */
217     ev_thd->set_time(&ev->when);
218     /*
219       There are a few flags that are replicated with each row event.
220       Make sure to set/clear them before executing the main body of
221       the event.
222     */
223     if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
224         ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
225     else
226         ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
227 
228     if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
229         ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
230     else
231         ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
232     /* A small test to verify that objects have consistent types */
233     DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
234 
235     /*
236       Now we are in a statement and will stay in a statement until we
237       see a STMT_END_F.
238 
239       We set this flag here, before actually applying any rows, in
240       case the SQL thread is stopped and we need to detect that we're
241       inside a statement and halting abruptly might cause problems
242       when restarting.
243      */
244     const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
245 
246     error= do_before_row_operations(table);
247     while (error == 0 && row_start < ev->m_rows_end)
248     {
249       uchar const *row_end= NULL;
250       if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
251         break; // We should perform the after-row operation even in
252                // the case of error
253 
254       DBUG_ASSERT(row_end != NULL); // cannot happen
255       DBUG_ASSERT(row_end <= ev->m_rows_end);
256 
257       /* in_use may have been set to NULL in close_tables_for_reopen */
258       THD* old_thd= table->in_use;
259       if (!table->in_use)
260         table->in_use= ev_thd;
261       error= do_exec_row(table);
262       table->in_use = old_thd;
263       switch (error)
264       {
265         /* Some recoverable errors */
266       case HA_ERR_RECORD_CHANGED:
267       case HA_ERR_KEY_NOT_FOUND:  /* Idempotency support: OK if
268                                            tuple does not exist */
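        /* Clear the error and fall through to the success case. */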
269         error= 0;
270       case 0:
271         break;
272 
273       default:
274         rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
275                     "Error in %s event: row application failed. %s",
276                     ev->get_type_str(),
277                     ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
278         thd->is_slave_error= 1;
279         break;
280       }
281 
282       row_start= row_end;
283     }
284     DBUG_EXECUTE_IF("stop_slave_middle_group",
285                     const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
286     error= do_after_row_operations(table, error);
287   }
288 
289   if (error)
290   {                     /* error has occurred during the transaction */
291     rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
292                 "Error in %s event: error during transaction execution "
293                 "on table %s.%s. %s",
294                 ev->get_type_str(), table->s->db.str,
295                 table->s->table_name.str,
296                 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
297 
298     /*
299       If one day we honour --slave-skip-errors in row-based replication, and
300       the error should be skipped, then we would clear mappings, rollback,
301       close tables, but the slave SQL thread would not stop and then may
302       assume the mapping is still available, the tables are still open...
303       So then we should clear mappings/rollback/close here only if this is a
304       STMT_END_F.
305       For now we code knowing that the error is not skippable, so the slave SQL
306       thread is certainly going to stop; the rollback is done at the caller
307       along with SBR.
308     */
309     ev_thd->reset_current_stmt_binlog_format_row();
310     const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
311     ev_thd->is_slave_error= 1;
312     DBUG_RETURN(error);
313   }
314 
315   DBUG_RETURN(0);
316 }
317 #endif
318 
319 
320 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
321 
322 /*
323   Check if there are more UNIQUE keys after the given key.
324 */
325 static int
326 last_uniq_key(TABLE *table, uint keyno)
327 {
328   while (++keyno < table->s->keys)
329     if (table->key_info[keyno].flags & HA_NOSAME)
330       return 0;
331   return 1;
332 }
333 
334 
335 /*
336   Compares table->record[0] and table->record[1]
337 
338   Returns TRUE if different.
339 */
340 static bool record_compare(TABLE *table)
341 {
342   /*
343     Need to set the X bit and the filler bits in both records since
344     there are engines that do not set them correctly.
345 
346     In addition, since MyISAM checks that one hasn't tampered with the
347     record, it is necessary to restore the old bytes into the record
348     after doing the comparison.
349 
350     TODO[record format ndb]: Remove it once NDB returns correct
351     records. Check that the other engines also return correct records.
352    */
353 
354   bool result= FALSE;
355   uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
356 
357   if (table->s->null_bytes > 0)
358   {
359     for (int i = 0 ; i < 2 ; ++i)
360     {
361       /*
362         If we have an X bit then we need to take care of it.
363       */
364       if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
365       {
366         saved_x[i]= table->record[i][0];
367         table->record[i][0]|= 1U;
368       }
369 
370       /*
371          If (last_null_bit_pos == 0 && null_bytes > 1), then:
372 
373          X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
374 
375          I.e., the entire byte is used.
376       */
377       if (table->s->last_null_bit_pos > 0)
378       {
379         saved_filler[i]= table->record[i][table->s->null_bytes - 1];
380         table->record[i][table->s->null_bytes - 1]|=
381           256U - (1U << table->s->last_null_bit_pos);
382       }
383     }
384   }
385 
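  /*
    With no BLOB or VARCHAR columns the fixed-size records can be
    compared directly.
  */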
386   if (table->s->blob_fields + table->s->varchar_fields == 0)
387   {
388     result= cmp_record(table,record[1]);
389     goto record_compare_exit;
390   }
391 
392   /* Compare null bits */
393   if (memcmp(table->null_flags,
394        table->null_flags+table->s->rec_buff_length,
395        table->s->null_bytes))
396   {
397     result= TRUE;       // Diff in NULL value
398     goto record_compare_exit;
399   }
400 
401   /* Compare updated fields */
402   for (Field **ptr=table->field ; *ptr ; ptr++)
403   {
404     if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
405     {
406       result= TRUE;
407       goto record_compare_exit;
408     }
409   }
410 
411 record_compare_exit:
412   /*
413     Restore the saved bytes.
414 
415     TODO[record format ndb]: Remove this code once NDB returns the
416     correct record format.
417   */
418   if (table->s->null_bytes > 0)
419   {
420     for (int i = 0 ; i < 2 ; ++i)
421     {
422       if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
423         table->record[i][0]= saved_x[i];
424 
425       if (table->s->last_null_bit_pos > 0)
426         table->record[i][table->s->null_bytes - 1]= saved_filler[i];
427     }
428   }
429 
430   return result;
431 }
432 
433 
434 /*
435   Copy "extra" columns from record[1] to record[0].
436 
437   Copy the extra fields that are not present on the master but are
438   present on the slave from record[1] to record[0].  This is used
439   after fetching a record that is to be updated, either inside
440   replace_record() or as part of executing an update_row().
441  */
442 static int
443 copy_extra_record_fields(TABLE *table,
444                          size_t master_reclength,
445                          my_ptrdiff_t master_fields)
446 {
447   DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
448   DBUG_PRINT("info", ("Copying to 0x%lx "
449                       "from field %lu at offset %lu "
450                       "to field %d at offset %lu",
451                       (long) table->record[0],
452                       (ulong) master_fields, (ulong) master_reclength,
453                       table->s->fields, table->s->reclength));
454   /*
455     Copy the extra fields of the slave that do not exist on the
456     master into record[0] (they basically contain the default values).
457   */
458 
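  /*
    If the slave table has fewer columns than the master, there are no
    extra slave fields to copy.
  */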
459   if (table->s->fields < (uint) master_fields)
460     DBUG_RETURN(0);
461 
462   DBUG_ASSERT(master_reclength <= table->s->reclength);
463   if (master_reclength < table->s->reclength)
464     memcpy(table->record[0] + master_reclength,
465                 table->record[1] + master_reclength,
466                 table->s->reclength - master_reclength);
467 
468   /*
469     Bit columns are special.  We iterate over all the remaining
470     columns and copy the "extra" bits to the new record.  This is
471     not a very good solution: it should be refactored when the
472     opportunity arises.
473 
474     REFACTORING SUGGESTION (Matz).  Introduce a member function
475     similar to move_field_offset() called copy_field_offset() to
476     copy field values and implement it for all Field subclasses. Use
477     this function to copy data from the found record to the record
478     that is going to be inserted.
479 
480     The copy_field_offset() function needs to be a virtual function,
481     which in this case will prevent copying an entire range of
482     fields efficiently.
483   */
484   {
485     Field **field_ptr= table->field + master_fields;
486     for ( ; *field_ptr ; ++field_ptr)
487     {
488       /*
489         Set the null bit according to the values in record[1]
490        */
491       if ((*field_ptr)->maybe_null() &&
492           (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
493         (*field_ptr)->set_null();
494       else
495         (*field_ptr)->set_notnull();
496 
497       /*
498         Do the extra work for special columns.
499        */
500       switch ((*field_ptr)->real_type())
501       {
502       default:
503         /* Nothing to do */
504         break;
505 
506       case MYSQL_TYPE_BIT:
507         Field_bit *f= static_cast<Field_bit*>(*field_ptr);
508         if (f->bit_len > 0)
509         {
510           my_ptrdiff_t const offset= table->record[1] - table->record[0];
511           uchar const bits=
512             get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
513           set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
514         }
515         break;
516       }
517     }
518   }
519   DBUG_RETURN(0);                                     // All OK
520 }
521 
522 
523 /*
524   Replace the provided record in the database.
525 
526   SYNOPSIS
527       replace_record()
528       thd    Thread context for writing the record.
529       table  Table to which record should be written.
530       master_reclength
531              Offset to first column that is not present on the master,
532              alternatively the length of the record on the master
533              side.
534 
535   RETURN VALUE
536       Error code on failure, 0 on success.
537 
538   DESCRIPTION
539       Similar to how it is done in mysql_insert(), we first try to do
540       a ha_write_row() and if that fails due to duplicated keys (or
541       indices), we do an ha_update_row() or a ha_delete_row() instead.
542  */
543 static int
544 replace_record(THD *thd, TABLE *table,
545                ulong const master_reclength,
546                uint const master_fields)
547 {
548   DBUG_ENTER("replace_record");
549   DBUG_ASSERT(table != NULL && thd != NULL);
550 
551   int error;
552   int keynum;
553   auto_afree_ptr<char> key(NULL);
554 
555 #ifndef DBUG_OFF
556   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
557   DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
558   DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
559 #endif
560 
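  /*
    Keep attempting the insert; each duplicate-key conflict is resolved
    by updating or deleting the existing row before retrying the write.
  */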
561   while ((error= table->file->ha_write_row(table->record[0])))
562   {
563     if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
564     {
565       table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
566       DBUG_RETURN(error);
567     }
568     if ((keynum= table->file->get_dup_key(error)) < 0)
569     {
570       table->file->print_error(error, MYF(0));
571       /*
572         We failed to retrieve the duplicate key
573         - either because the error was not "duplicate key" error
574         - or because the information about which key was duplicated is not available
575       */
576       DBUG_RETURN(error);
577     }
578 
579     /*
580        We need to retrieve the old row into record[1] to be able to
581        either update or delete the offending record.  We either:
582 
583        - use ha_rnd_pos() with a row id (available in table->file->dup_ref) to
584          fetch the offending row, if that is possible (MyISAM and Blackhole), or else
585 
586        - use ha_index_read_idx_map() with the key that is duplicated, to
587          retrieve the offending row.
588      */
589     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
590     {
591       error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
592       if (error)
593       {
594         DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
595         if (error == HA_ERR_RECORD_DELETED)
596           error= HA_ERR_KEY_NOT_FOUND;
597         table->file->print_error(error, MYF(0));
598         DBUG_RETURN(error);
599       }
600     }
601     else
602     {
603       if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
604       {
605         DBUG_RETURN(my_errno);
606       }
607 
608       if (key.get() == NULL)
609       {
610         key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
611         if (key.get() == NULL)
612           DBUG_RETURN(ENOMEM);
613       }
614 
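      /*
        Extract the duplicated key from record[0] and use it to fetch the
        conflicting row into record[1].
      */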
615       key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
616                0);
617       error= table->file->ha_index_read_idx_map(table->record[1], keynum,
618                                                 (const uchar*)key.get(),
619                                                 HA_WHOLE_KEY,
620                                                 HA_READ_KEY_EXACT);
621       if (error)
622       {
623         DBUG_PRINT("info", ("ha_index_read_idx_map() returns error %d", error));
624         if (error == HA_ERR_RECORD_DELETED)
625           error= HA_ERR_KEY_NOT_FOUND;
626         table->file->print_error(error, MYF(0));
627         DBUG_RETURN(error);
628       }
629     }
630 
631     /*
632        Now, table->record[1] should contain the offending row.  That
633        will enable us to update it or, alternatively, delete it (so
634        that we can insert the new row afterwards).
635 
636        First we copy the columns into table->record[0] that are not
637        present on the master from table->record[1], if there are any.
638     */
639     copy_extra_record_fields(table, master_reclength, master_fields);
640 
641     /*
642        REPLACE is defined as either INSERT or DELETE + INSERT.  If
643        possible, we can replace it with an UPDATE, but that will not
644        work on InnoDB if FOREIGN KEY checks are necessary.
645 
646        I (Matz) am not sure of the reason for the last_uniq_key()
647        check, but I'm guessing that it's something along the
648        following lines.
649 
650        Suppose that we got the duplicate key to be a key that is not
651        the last unique key for the table and we perform an update:
652        then there might be another key for which the unique check will
653        fail, so we're better off just deleting the row and inserting
654        the correct row.
655      */
656     if (last_uniq_key(table, keynum) &&
657         !table->file->referenced_by_foreign_key())
658     {
659       error=table->file->ha_update_row(table->record[1],
660                                        table->record[0]);
661       if (error && error != HA_ERR_RECORD_IS_THE_SAME)
662         table->file->print_error(error, MYF(0));
663       else
664         error= 0;
665       DBUG_RETURN(error);
666     }
667     else
668     {
669       if ((error= table->file->ha_delete_row(table->record[1])))
670       {
671         table->file->print_error(error, MYF(0));
672         DBUG_RETURN(error);
673       }
674       /* Will retry ha_write_row() with the offending row removed. */
675     }
676   }
677 
678   DBUG_RETURN(error);
679 }
680 
681 
682 /**
683   Find the row given by 'key', if the table has keys, or else use a table scan
684   to find (and fetch) the row.
685 
686   If the engine allows random access of the records, a combination of
687   position() and rnd_pos() will be used.
688 
689   @param table Pointer to table to search
690   @param key   Pointer to key to use for search, if table has key
691 
692   @pre <code>table->record[0]</code> shall contain the row to locate
693   and <code>key</code> shall contain a key to use for searching, if
694   the engine has a key.
695 
696   @post If the return value is zero, <code>table->record[1]</code>
697   will contain the fetched row and the internal "cursor" will refer to
698   the row. If the return value is non-zero,
699   <code>table->record[1]</code> is undefined.  In either case,
700   <code>table->record[0]</code> is undefined.
701 
702   @return Zero if the row was successfully fetched into
703   <code>table->record[1]</code>, error code otherwise.
704  */
705 
706 static int find_and_fetch_row(TABLE *table, uchar *key)
707 {
708   DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
709   DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
710            (long) table, (long) key, (long) table->record[1]));
711 
712   DBUG_ASSERT(table->in_use != NULL);
713 
714   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
715 
716   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
717       table->s->primary_key < MAX_KEY)
718   {
719     /*
720       Use a more efficient method to fetch the record given by
721       table->record[0] if the engine allows it.  We first compute a
722       row reference using the position() member function (it will be
723       stored in table->file->ref) and then use rnd_pos() to position
724       the "cursor" (i.e., record[0] in this case) at the correct row.
725 
726       TODO: Add a check that the correct record has been fetched by
727       comparing with the original record. Take into account that the
728       record on the master and slave can be of different
729       length. Something along these lines should work:
730 
731       ADD>>>  store_record(table,record[1]);
732               int error= table->file->rnd_pos(table->record[0], table->file->ref);
733       ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
734                                  table->s->reclength) == 0);
735 
736     */
737     table->file->position(table->record[0]);
738     int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
739     /*
740       ha_rnd_pos() returns the record in table->record[0], so we have to
741       move it to table->record[1].
742      */
743     memcpy(table->record[1], table->record[0], table->s->reclength);
744     DBUG_RETURN(error);
745   }
746 
747   /* We need to retrieve all fields */
748   /* TODO: Move this out from this function to main loop */
749   table->use_all_columns();
750 
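  /*
    Prefer an index lookup on key 0 when the table has keys; otherwise
    fall back to the full table scan below.
  */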
751   if (table->s->keys > 0)
752   {
753     int error;
754     /* We have a key: search the table using the index */
755     if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
756     {
757       table->file->print_error(error, MYF(0));
758       DBUG_RETURN(error);
759     }
760 
761   /*
762     Don't print debug messages when running valgrind since they can
763     trigger false warnings.
764    */
765 #ifndef HAVE_purify
766     DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
767     DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
768 #endif
769 
770     /*
771       We need to set the null bytes to ensure that the filler bits are
772       all set when returning.  There are storage engines that just set
773       the necessary bits on the bytes and don't set the filler bits
774       correctly.
775     */
776     my_ptrdiff_t const pos=
777       table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
778     table->record[1][pos]= 0xFF;
779     if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
780                                                HA_READ_KEY_EXACT)))
781     {
782       table->file->print_error(error, MYF(0));
783       table->file->ha_index_end();
784       DBUG_RETURN(error);
785     }
786 
787   /*
788     Don't print debug messages when running valgrind since they can
789     trigger false warnings.
790    */
791 #ifndef HAVE_purify
792     DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
793     DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
794 #endif
795     /*
796       Below is a minor "optimization".  If the key (i.e., key number
797       0) has the HA_NOSAME flag set, we know that we have found the
798       correct record (since there can be no duplicates); otherwise, we
799       have to compare the record with the one found to see if it is
800       the correct one.
801 
802       CAVEAT! This behaviour is essential for the replication of,
803       e.g., the mysql.proc table since the correct record *shall* be
804       found using the primary key *only*.  There shall be no
805       comparison of non-PK columns to decide if the correct record is
806       found.  I can see no scenario where it would be incorrect to
807       choose the row to change using only a PK or a unique non-null index (UNNI).
808     */
809     if (table->key_info->flags & HA_NOSAME)
810     {
811       table->file->ha_index_end();
812       DBUG_RETURN(0);
813     }
814 
815     while (record_compare(table))
816     {
817       int error;
818 
819       /*
820         We need to set the null bytes to ensure that the filler bits
821         are all set when returning.  There are storage engines that
822         just set the necessary bits on the bytes and don't set the
823         filler bits correctly.
824 
825         TODO[record format ndb]: Remove this code once NDB returns the
826         correct record format.
827       */
828       if (table->s->null_bytes > 0)
829       {
830         table->record[1][table->s->null_bytes - 1]|=
831           256U - (1U << table->s->last_null_bit_pos);
832       }
833 
834       while ((error= table->file->ha_index_next(table->record[1])))
835       {
836         /* We just skip records that have already been deleted */
837         if (error == HA_ERR_RECORD_DELETED)
838           continue;
839         table->file->print_error(error, MYF(0));
840         table->file->ha_index_end();
841         DBUG_RETURN(error);
842       }
843     }
844 
845     /*
846       Have to restart the scan to be able to fetch the next row.
847     */
848     table->file->ha_index_end();
849   }
850   else
851   {
852     int restart_count= 0; // Number of times scanning has restarted from top
853     int error;
854 
855     /* We don't have a key: search the table using ha_rnd_next() */
856     if ((error= table->file->ha_rnd_init(1)))
857     {
858       table->file->print_error(error, MYF(0));
859       DBUG_RETURN(error);
860     }
861 
862     /* Continue until we find the right record or have made a full loop */
863     do
864     {
865   restart_ha_rnd_next:
866       error= table->file->ha_rnd_next(table->record[1]);
867 
868       DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
869       DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
870 
871       switch (error) {
872       case 0:
873         break;
874 
875       /*
876         If the record was deleted, we pick the next one without doing
877         any comparisons.
878       */
879       case HA_ERR_RECORD_DELETED:
880         goto restart_ha_rnd_next;
881 
882       case HA_ERR_END_OF_FILE:
883         if (++restart_count < 2)
884         {
885           if ((error= table->file->ha_rnd_init(1)))
886           {
887             table->file->print_error(error, MYF(0));
888             DBUG_RETURN(error);
889           }
890         }
891        break;
892 
893       default:
894         table->file->print_error(error, MYF(0));
895         DBUG_PRINT("info", ("Record not found"));
896         (void) table->file->ha_rnd_end();
897         DBUG_RETURN(error);
898       }
899     }
900     while (restart_count < 2 && record_compare(table));
901 
902     /*
903       Have to restart the scan to be able to fetch the next row.
904     */
905     DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
906     table->file->ha_rnd_end();
907 
908     DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
909     DBUG_RETURN(error);
910   }
911 
912   DBUG_RETURN(0);
913 }
914 
915 
916 /**********************************************************
917   Row handling primitives for Write_rows_log_event_old
918  **********************************************************/
919 
920 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
921 {
922   int error= 0;
923 
924   /*
925     We are using REPLACE semantics and not INSERT IGNORE semantics
926     when writing rows, that is: new rows replace old rows.  We need to
927     inform the storage engine that it should use this behaviour.
928   */
929 
930   /* Tell the storage engine that we are using REPLACE semantics. */
931   thd->lex->duplicates= DUP_REPLACE;
932 
933   /*
934     Pretend we're executing a REPLACE command: this is needed for
935     InnoDB and NDB Cluster since they are not (properly) checking the
936     lex->duplicates flag.
937   */
938   thd->lex->sql_command= SQLCOM_REPLACE;
939   /*
940      Do not raise the error flag in case of hitting a duplicate key on a unique attribute
941   */
942   table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
943   /*
944      NDB specific: an update from an NDB master is wrapped as Write_rows,
945      so the event should be applied to replace the slave's row.
946   */
949   table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
950   /*
951      NDB specific: if update from ndb master wrapped as Write_rows
952      does not find the row it's assumed idempotent binlog applying
953      is taking place; don't raise the error.
954   */
955   table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
956   /*
957     TODO: the cluster team (Tomas?) says that it's better if the engine knows
958     how many rows are going to be inserted, then it can allocate needed memory
959     from the start.
960   */
961   table->file->ha_start_bulk_insert(0);
962   return error;
963 }
964 
965 
966 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
967 {
968   int local_error= 0;
969   table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
970   table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
971   /*
972     resetting the extra with
973     table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
974     fires bug#27077
975     todo: explain or fix
976   */
977   if ((local_error= table->file->ha_end_bulk_insert()))
978   {
979     table->file->print_error(local_error, MYF(0));
980   }
981   return error? error : local_error;
982 }
983 
984 
985 int
986 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
987                                          Relay_log_info const *rli,
988                                          TABLE *table,
989                                          uchar const *row_start,
990                                          uchar const **row_end)
991 {
992   DBUG_ASSERT(table != NULL);
993   DBUG_ASSERT(row_start && row_end);
994 
995   int error;
996   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
997                         table, m_width, table->record[0],
998                         row_start, &m_cols, row_end, &m_master_reclength,
999                         table->write_set, PRE_GA_WRITE_ROWS_EVENT);
1000   bitmap_copy(table->read_set, table->write_set);
1001   return error;
1002 }
1003 
1004 
1005 int Write_rows_log_event_old::do_exec_row(TABLE *table)
1006 {
1007   DBUG_ASSERT(table != NULL);
1008   int error= replace_record(thd, table, m_master_reclength, m_width);
1009   return error;
1010 }
1011 
1012 
1013 /**********************************************************
1014   Row handling primitives for Delete_rows_log_event_old
1015  **********************************************************/
1016 
1017 int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
1018 {
1019   DBUG_ASSERT(m_memory == NULL);
1020 
1021   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
1022       table->s->primary_key < MAX_KEY)
1023   {
1024     /*
1025       We don't need to allocate any memory for m_after_image and
1026       m_key since they are not used.
1027     */
1028     return 0;
1029   }
1030 
1031   int error= 0;
1032 
1033   if (table->s->keys > 0)
1034   {
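    /* Allocate the after image and the search key in a single block. */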
1035     m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1036                                        &m_after_image,
1037                                        (uint) table->s->reclength,
1038                                        &m_key,
1039                                        (uint) table->key_info->key_length,
1040                                        NullS);
1041   }
1042   else
1043   {
1044     m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1045     m_memory= (uchar*)m_after_image;
1046     m_key= NULL;
1047   }
1048   if (!m_memory)
1049     return HA_ERR_OUT_OF_MEM;
1050 
1051   return error;
1052 }
1053 
1054 
1055 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1056 {
1057   /* error= TODO: find out what this should really be; this triggers close_scan in ndb, returning error? */
1058   table->file->ha_index_or_rnd_end();
1059   my_free(m_memory); // Free for multi_malloc
1060   m_memory= NULL;
1061   m_after_image= NULL;
1062   m_key= NULL;
1063 
1064   return error;
1065 }
1066 
1067 
1068 int
1069 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
1070                                           Relay_log_info const *rli,
1071                                           TABLE *table,
1072                                           uchar const *row_start,
1073                                           uchar const **row_end)
1074 {
1075   int error;
1076   DBUG_ASSERT(row_start && row_end);
1077   /*
1078     This assertion actually checks that there are at least as many
1079     columns on the slave as on the master.
1080   */
1081   DBUG_ASSERT(table->s->fields >= m_width);
1082 
1083   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1084                         table, m_width, table->record[0],
1085                         row_start, &m_cols, row_end, &m_master_reclength,
1086                         table->read_set, PRE_GA_DELETE_ROWS_EVENT);
1087   /*
1088     If we will access rows using the random access method, m_key will
1089     be set to NULL, so we do not need to make a key copy in that case.
1090    */
1091   if (m_key)
1092   {
1093     KEY *const key_info= table->key_info;
1094 
1095     key_copy(m_key, table->record[0], key_info, 0);
1096   }
1097 
1098   return error;
1099 }
1100 
1101 
1102 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
1103 {
1104   int error;
1105   DBUG_ASSERT(table != NULL);
1106 
1107   if (!(error= ::find_and_fetch_row(table, m_key)))
1108   {
1109     /*
1110       Now we should have the right row to delete.  We are using
1111       record[0] since it is guaranteed to point to a record with the
1112       correct value.
1113     */
1114     error= table->file->ha_delete_row(table->record[0]);
1115   }
1116   return error;
1117 }
1118 
1119 
1120 /**********************************************************
1121   Row handling primitives for Update_rows_log_event_old
1122  **********************************************************/
1123 
1124 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
1125 {
1126   DBUG_ASSERT(m_memory == NULL);
1127 
1128   int error= 0;
1129 
1130   if (table->s->keys > 0)
1131   {
1132     m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1133                                        &m_after_image,
1134                                        (uint) table->s->reclength,
1135                                        &m_key,
1136                                        (uint) table->key_info->key_length,
1137                                        NullS);
1138   }
1139   else
1140   {
1141     m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1142     m_memory= m_after_image;
1143     m_key= NULL;
1144   }
1145   if (!m_memory)
1146     return HA_ERR_OUT_OF_MEM;
1147 
1148   return error;
1149 }
1150 
1151 
1152 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1153 {
1154   /* error= TODO: find out what this should really be; this triggers close_scan in ndb, returning error? */
1155   table->file->ha_index_or_rnd_end();
1156   my_free(m_memory);
1157   m_memory= NULL;
1158   m_after_image= NULL;
1159   m_key= NULL;
1160 
1161   return error;
1162 }
1163 
1164 
1165 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1166                                               Relay_log_info const *rli,
1167                                               TABLE *table,
1168                                               uchar const *row_start,
1169                                               uchar const **row_end)
1170 {
1171   int error;
1172   DBUG_ASSERT(row_start && row_end);
1173   /*
1174     This assertion actually checks that there are at least as many
1175     columns on the slave as on the master.
1176   */
1177   DBUG_ASSERT(table->s->fields >= m_width);
1178 
1179   /* record[0] is the before image for the update */
1180   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1181                         table, m_width, table->record[0],
1182                         row_start, &m_cols, row_end, &m_master_reclength,
1183                         table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1184   row_start = *row_end;
1185   /* m_after_image is the after image for the update */
1186   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1187                         table, m_width, m_after_image,
1188                         row_start, &m_cols, row_end, &m_master_reclength,
1189                         table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1190 
1191   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1192   DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1193 
1194   /*
1195     If we will access rows using the random access method, m_key will
1196     be set to NULL, so we do not need to make a key copy in that case.
1197    */
1198   if (m_key)
1199   {
1200     KEY *const key_info= table->key_info;
1201 
1202     key_copy(m_key, table->record[0], key_info, 0);
1203   }
1204 
1205   return error;
1206 }
1207 
1208 
1209 int Update_rows_log_event_old::do_exec_row(TABLE *table)
1210 {
1211   DBUG_ASSERT(table != NULL);
1212 
1213   int error= ::find_and_fetch_row(table, m_key);
1214   if (error)
1215     return error;
1216 
1217   /*
1218     We have to ensure that the new record (i.e., the after image) is
1219     in record[0] and the old record (i.e., the before image) is in
1220     record[1].  This is because some storage engines require it (for
1221     example, the partition engine).
1222 
1223     Since find_and_fetch_row() puts the fetched record (i.e., the old
1224     record) in record[1], we can keep it there. We put the new record
1225     (i.e., the after image) into record[0], and copy the fields that
1226     are on the slave (i.e., in record[1]) into record[0], effectively
1227     overwriting the default values that were put there by the
1228     unpack_row() function.
1229   */
1230   memcpy(table->record[0], m_after_image, table->s->reclength);
1231   copy_extra_record_fields(table, m_master_reclength, m_width);
1232 
1233   /*
1234     Now we have the right row to update.  The old row (the one we're
1235     looking for) is in record[1] and the new row is in record[0].
1236     We also have copied the original values already in the slave's
1237     database into the after image delivered from the master.
1238   */
1239   error= table->file->ha_update_row(table->record[1], table->record[0]);
1240   if (error == HA_ERR_RECORD_IS_THE_SAME)
1241     error= 0;
1242 
1243   return error;
1244 }
1245 
1246 #endif
1247 
1248 
1249 /**************************************************************************
1250 	Old_rows_log_event member functions
1251 **************************************************************************/
1252 
1253 #ifndef MYSQL_CLIENT
1254 Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1255                                        MY_BITMAP const *cols,
1256                                        bool using_trans)
1257   : Log_event(thd_arg, 0,
1258               using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
1259                             Log_event::EVENT_STMT_CACHE,
1260               Log_event::EVENT_NORMAL_LOGGING),
1261     m_row_count(0),
1262     m_table(tbl_arg),
1263     m_table_id(tid),
1264     m_width(tbl_arg ? tbl_arg->s->fields : 1),
1265     m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1266 #ifdef HAVE_REPLICATION
1267     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1268 #endif
1269 {
1270 
1271   // This constructor should not be reached.
1272   assert(0);
1273 
1274   /*
1275     We allow a special form of dummy event when the table and cols
1276     are null and the table id is ~0UL.  This is a temporary
1277     solution, to be able to terminate a started statement in the
1278     binary log: the extraneous events will be removed in the future.
1279    */
1280   DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1281               (!tbl_arg && !cols && tid == ~0UL));
1282 
1283   if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1284       set_flags(NO_FOREIGN_KEY_CHECKS_F);
1285   if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1286       set_flags(RELAXED_UNIQUE_CHECKS_F);
1287   /* if bitmap_init fails, caught in is_valid() */
1288   if (likely(!bitmap_init(&m_cols,
1289                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1290                           m_width,
1291                           false)))
1292   {
1293     /* Cols can be zero if this is a dummy binrows event */
1294     if (likely(cols != NULL))
1295     {
1296       memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1297       create_last_word_mask(&m_cols);
1298     }
1299   }
1300   else
1301   {
1302     // Needed because bitmap_init() does not set it to null on failure
1303     m_cols.bitmap= 0;
1304   }
1305 }
1306 #endif
1307 
1308 
1309 Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
1310                                        Log_event_type event_type,
1311                                        const Format_description_log_event
1312                                        *description_event)
1313   : Log_event(buf, description_event),
1314     m_row_count(0),
1315 #ifndef MYSQL_CLIENT
1316     m_table(NULL),
1317 #endif
1318     m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1319 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1320     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1321 #endif
1322 {
1323   DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1324   uint8 const common_header_len= description_event->common_header_len;
1325   uint8 const post_header_len= description_event->post_header_len[event_type-1];
1326 
1327   DBUG_PRINT("enter",("event_len: %u  common_header_len: %d  "
1328 		      "post_header_len: %d",
1329 		      event_len, common_header_len,
1330 		      post_header_len));
1331 
1332   const char *post_start= buf + common_header_len;
1333   DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1334   post_start+= RW_MAPID_OFFSET;
1335   if (post_header_len == 6)
1336   {
1337     /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1338     m_table_id= uint4korr(post_start);
1339     post_start+= 4;
1340   }
1341   else
1342   {
1343     m_table_id= (ulong) uint6korr(post_start);
1344     post_start+= RW_FLAGS_OFFSET;
1345   }
1346 
1347   m_flags= uint2korr(post_start);
1348 
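  /*
    The variable-length part starts with the packed column count,
    followed by the column bitmap.
  */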
1349   uchar const *const var_start=
1350     (const uchar *)buf + common_header_len + post_header_len;
1351   uchar const *const ptr_width= var_start;
1352   uchar *ptr_after_width= (uchar*) ptr_width;
1353   DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1354   m_width = net_field_length(&ptr_after_width);
1355   DBUG_PRINT("debug", ("m_width=%lu", m_width));
1356   /* Avoid reading out of buffer */
1357   if (static_cast<unsigned int>(m_width +
1358                                 (ptr_after_width -
1359                                 (const uchar *)buf)) > event_len)
1360   {
1361     m_cols.bitmap= NULL;
1362     DBUG_VOID_RETURN;
1363   }
1364 
1365   /* if bitmap_init fails, caught in is_valid() */
1366   if (likely(!bitmap_init(&m_cols,
1367                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1368                           m_width,
1369                           false)))
1370   {
1371     DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1372     memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1373     create_last_word_mask(&m_cols);
1374     ptr_after_width+= (m_width + 7) / 8;
1375     DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1376   }
1377   else
1378   {
1379     // Needed because bitmap_init() does not set it to null on failure
1380     m_cols.bitmap= NULL;
1381     DBUG_VOID_RETURN;
1382   }
1383 
1384   const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1385   size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1386   DBUG_PRINT("info",("m_table_id: %llu  m_flags: %d  m_width: %lu  data_size: %lu",
1387                      m_table_id.id(), m_flags, m_width, (ulong) data_size));
1388   DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1389 
1390   m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1391   if (likely((bool)m_rows_buf))
1392   {
1393 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1394     m_curr_row= m_rows_buf;
1395 #endif
1396     m_rows_end= m_rows_buf + data_size;
1397     m_rows_cur= m_rows_end;
1398     memcpy(m_rows_buf, ptr_rows_data, data_size);
1399   }
1400   else
1401     m_cols.bitmap= 0; // to not free it
1402 
1403   DBUG_VOID_RETURN;
1404 }
1405 
1406 
1407 Old_rows_log_event::~Old_rows_log_event()
1408 {
1409   if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1410     m_cols.bitmap= 0; // so no my_free in bitmap_free
1411   bitmap_free(&m_cols); // To pair with bitmap_init().
1412   my_free(m_rows_buf);
1413 }
1414 
1415 
1416 int Old_rows_log_event::get_data_size()
1417 {
1418   uchar buf[sizeof(m_width)+1];
1419   uchar *end= net_store_length(buf, (m_width + 7) / 8);
1420 
1421   DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1422                   return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1423                   (m_rows_cur - m_rows_buf););
1424   int data_size= ROWS_HEADER_LEN;
1425   data_size+= no_bytes_in_map(&m_cols);
1426   data_size+= (uint) (end - buf);
1427 
1428   data_size+= (uint) (m_rows_cur - m_rows_buf);
1429   return data_size;
1430 }
1431 
1432 
1433 #ifndef MYSQL_CLIENT
1434 int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1435 {
1436   /*
1437     When the table has a primary key, we would probably want, by default, to
1438     log only the primary key value instead of the entire "before image". This
1439     would save binlog space. TODO
1440   */
1441   DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1442   DBUG_PRINT("enter", ("row_data: 0x%lx  length: %lu", (ulong) row_data,
1443                        (ulong) length));
1444   /*
1445     Don't print debug messages when running valgrind since they can
1446     trigger false warnings.
1447    */
1448 #ifndef HAVE_purify
1449   DBUG_DUMP("row_data", row_data, min<size_t>(length, 32));
1450 #endif
1451 
1452   DBUG_ASSERT(m_rows_buf <= m_rows_cur);
1453   DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1454   DBUG_ASSERT(m_rows_cur <= m_rows_end);
1455 
1456   /* The cast will always work since m_rows_cur <= m_rows_end */
1457   if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1458   {
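    /*
      Grow the buffer to the next 1 KB multiple that can hold the
      existing data plus the new row.
    */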
1459     size_t const block_size= 1024;
1460     my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1461     my_ptrdiff_t const new_alloc=
1462         block_size * ((cur_size + length + block_size - 1) / block_size);
1463 
1464     uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
1465                                            MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1466     if (unlikely(!new_buf))
1467       DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1468 
1469     /* If the memory moved, we need to move the pointers */
1470     if (new_buf != m_rows_buf)
1471     {
1472       m_rows_buf= new_buf;
1473       m_rows_cur= m_rows_buf + cur_size;
1474     }
1475 
1476     /*
1477        The end pointer should always be changed to point to the end of
1478        the allocated memory.
1479     */
1480     m_rows_end= m_rows_buf + new_alloc;
1481   }
1482 
1483   DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
1484   memcpy(m_rows_cur, row_data, length);
1485   m_rows_cur+= length;
1486   m_row_count++;
1487   DBUG_RETURN(0);
1488 }
1489 #endif
1490 
1491 
1492 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1493 int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
1494 {
1495   DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1496   int error= 0;
1497 
1498   /*
1499     If m_table_id is ~0U or the maximum 6-byte integer, then we have a dummy
1500     event that does not contain any data.  In that case, we just remove all
1501     tables in the tables_to_lock list, close the thread tables, and return
1502     with success.
1503    */
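  /* Note: (~0ULL >> 16) == 0x0000FFFFFFFFFFFF, the largest value that fits in 6 bytes. */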
1504   if ((m_table_id.id() == ~0U || m_table_id.id() == (~0ULL >> 16)) &&
1505       m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
1506   {
1507     /*
1508        This one is supposed to be set: just an extra check so that
1509        nothing strange has happened.
1510      */
1511     DBUG_ASSERT(get_flags(STMT_END_F));
1512 
1513     const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1514     thd->clear_error();
1515     DBUG_RETURN(0);
1516   }
1517 
1518   /*
1519     'thd' has been set by exec_relay_log_event(), just before calling
1520     do_apply_event(). We still check here to prevent future coding
1521     errors.
1522   */
1523   DBUG_ASSERT(rli->info_thd == thd);
1524 
1525   /*
1526     If no locks have been taken, this is the first binrow event seen
1527     after the table map events.  We should then lock all the tables
1528     used in the transaction and proceed with execution of the actual
1529     event.
1530   */
1531   if (!thd->lock)
1532   {
1533     /*
1534       lock_tables() reads the contents of thd->lex, so they must be
1535       initialized.  Unlike Table_map_log_event::do_apply_event(), we
1536       don't call mysql_init_query() because that may reset the binlog
1537       format.
1538     */
1539     lex_start(thd);
1540 
1541     if ((error= lock_tables(thd, rli->tables_to_lock,
1542                                rli->tables_to_lock_count, 0)))
1543     {
1544       if (thd->is_slave_error || thd->is_fatal_error)
1545       {
1546         /*
1547           Error reporting borrowed from Query_log_event with many excessive
1548           simplifications (we don't honour --slave-skip-errors)
1549         */
1550         uint actual_error= thd->net.last_errno;
1551         rli->report(ERROR_LEVEL, actual_error,
1552                     "Error '%s' in %s event: when locking tables",
1553                     (actual_error ? thd->net.last_error :
1554                      "unexpected success or fatal error"),
1555                     get_type_str());
1556         thd->is_fatal_error= 1;
1557       }
1558       else
1559       {
1560         rli->report(ERROR_LEVEL, error,
1561                     "Error in %s event: when locking tables",
1562                     get_type_str());
1563       }
1564       const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1565       DBUG_RETURN(error);
1566     }
1567 
1568     /*
1569       When the open and locking succeeded, we check all tables to
1570       ensure that they still have the correct type.
1571     */
1572 
1573     {
1574       TABLE_LIST *table_list_ptr= rli->tables_to_lock;
1575       for (uint i= 0; table_list_ptr && (i < rli->tables_to_lock_count);
1576            table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1577       {
1578         /*
1579           See the comment in Rows_log_event::do_apply_event() in
1580           log_event.cc for an explanation of the if condition below.
1581         */
1582         if (table_list_ptr->parent_l)
1583           continue;
1584         /*
1585           We can use a down cast here since we know that every table added
1586           to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1587           skipped above).
1588         */
1589         RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1590         TABLE *conv_table;
1591         if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
1592                                             ptr->table, &conv_table))
1593         {
1594           thd->is_slave_error= 1;
1595           const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1596           DBUG_RETURN(ERR_BAD_TABLE_DEF);
1597         }
1598         ptr->m_conv_table= conv_table;
1599       }
1600     }
1601 
1602     /*
1603       ... and then we add all the tables to the table map but keep
1604       them in the tables to lock list.
1605 
1606 
1607       We also invalidate the query cache for all the tables, since
1608       they will now be changed.
1609 
1610       TODO [/Matz]: Maybe the query cache should not be invalidated
1611       here? It might be that a table is not changed, even though it
1612       was locked for the statement.  We do know that each
1613       Old_rows_log_event contains at least one row, so after processing one
1614       Old_rows_log_event, we can invalidate the query cache for the
1615       associated table.
1616      */
1617     for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
1618     {
1619       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
1620     }
1621 #ifdef HAVE_QUERY_CACHE
1622     query_cache.invalidate_locked_for_write(rli->tables_to_lock);
1623 #endif
1624   }
1625 
1626   TABLE*
1627     table=
1628     m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
1629 
1630   if (table)
1631   {
1632     /*
1633       table == NULL means that this table should not be replicated
1634       (this was set up by Table_map_log_event::do_apply_event()
1635       which tested replicate-* rules).
1636     */
1637 
1638     /*
1639       Calling set_time() is not strictly needed, but
1640       1) it preserves the property that "Time" in SHOW PROCESSLIST shows
1641       how far the slave is behind, and
1642       2) it will be needed when we allow replication from a table with no
1643       TIMESTAMP column to a table with one.
1644       So we call set_time(), like in SBR.  Presently it changes nothing.
1645     */
1646     thd->set_time(&when);
1647     /*
1648       There are a few flags that are replicated with each row event.
1649       Make sure to set/clear them before executing the main body of
1650       the event.
1651     */
1652     if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1653         thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1654     else
1655         thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1656 
1657     if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1658         thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1659     else
1660         thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1661     /* A small test to verify that objects have consistent types */
1662     DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1663 
1664     /*
1665       Now we are in a statement and will stay in a statement until we
1666       see a STMT_END_F.
1667 
1668       We set this flag here, before actually applying any rows, in
1669       case the SQL thread is stopped and we need to detect that we're
1670       inside a statement and halting abruptly might cause problems
1671       when restarting.
1672      */
1673     const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
1674 
1675     if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1676       set_flags(COMPLETE_ROWS_F);
1677 
1678     /*
1679       Set tables write and read sets.
1680 
1681       Read_set contains all slave columns (in case we are going to fetch
1682       a complete record from slave)
1683 
1684       Write_set equals the m_cols bitmap sent from master but it can be
1685       longer if slave has extra columns.
1686      */
1687 
1688     DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1689 
1690     bitmap_set_all(table->read_set);
1691     bitmap_set_all(table->write_set);
1692     if (!get_flags(COMPLETE_ROWS_F))
1693       bitmap_intersect(table->write_set,&m_cols);
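    /*
      For example: if m_cols has bits 0 and 2 set for a 4-column table and
      the row image is partial, the intersection leaves only bits 0 and 2
      set in write_set, so only those columns are written; read_set keeps
      all columns so a complete slave record can still be fetched.
    */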
1694 
1695     // Do event specific preparations
1696 
1697     error= do_before_row_operations(rli);
1698 
1699     // row processing loop
1700 
1701     while (error == 0 && m_curr_row < m_rows_end)
1702     {
1703       /* in_use may have been set to NULL in close_tables_for_reopen */
1704       THD* old_thd= table->in_use;
1705       if (!table->in_use)
1706         table->in_use= thd;
1707 
1708       error= do_exec_row(rli);
1709 
1710       DBUG_PRINT("info", ("error: %d", error));
1711       DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1712 
1713       table->in_use = old_thd;
1714       switch (error)
1715       {
1716       case 0:
1717 	break;
1718 
1719       /* Some recoverable errors */
1720       case HA_ERR_RECORD_CHANGED:
1721       case HA_ERR_KEY_NOT_FOUND:	/* Idempotency support: OK if
1722                                            tuple does not exist */
1723         error= 0;
1724         break;
1725 
1726       default:
1727 	rli->report(ERROR_LEVEL, thd->net.last_errno,
1728                     "Error in %s event: row application failed. %s",
1729                     get_type_str(),
1730                     thd->net.last_error);
1731         thd->is_slave_error= 1;
1732 	break;
1733       }
1734 
1735       /*
1736        If m_curr_row_end was not set during event execution (e.g., because
1737        of errors) we can't proceed to the next row. If the error is transient
1738        (i.e., error == 0 at this point) we must call unpack_current_row() to
1739        set m_curr_row_end.
1740       */
1741 
1742       DBUG_PRINT("info", ("error: %d", error));
1743       DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
1744                           (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
1745 
1746       if (!m_curr_row_end && !error)
1747         unpack_current_row(rli);
1748 
1749       // at this moment m_curr_row_end should be set
1750       DBUG_ASSERT(error || m_curr_row_end != NULL);
1751       DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1752       DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1753 
1754       m_curr_row= m_curr_row_end;
1755 
1756     } // row processing loop
1757 
1758     DBUG_EXECUTE_IF("stop_slave_middle_group",
1759                     const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1760     error= do_after_row_operations(rli, error);
1761   } // if (table)
1762 
1763   if (error)
1764   {                     /* an error has occurred during the transaction */
1765     rli->report(ERROR_LEVEL, thd->net.last_errno,
1766                 "Error in %s event: error during transaction execution "
1767                 "on table %s.%s. %s",
1768                 get_type_str(), table->s->db.str,
1769                 table->s->table_name.str,
1770                 thd->net.last_error);
1771 
1772     /*
1773       If one day we honour --skip-slave-errors in row-based replication, and
1774       the error should be skipped, then we would clear mappings, rollback,
1775       close tables, but the slave SQL thread would not stop and then may
1776       assume the mapping is still available, the tables are still open...
1777       So then we should clear mappings/rollback/close here only if this is a
1778       STMT_END_F.
1779       For now we code knowing that the error is not skippable, so the slave
1780       SQL thread is certainly going to stop; the rollback happens at the
1781       caller, along with SBR.
1782     */
1783     thd->reset_current_stmt_binlog_format_row();
1784     const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
1785     thd->is_slave_error= 1;
1786     DBUG_RETURN(error);
1787   }
1788 
1789   /*
1790     This code would ideally be placed in do_update_pos() instead, but
1791     since we have no access to table there, we do the setting of
1792     last_event_start_time here instead.
1793   */
1794   if (table && (table->s->primary_key == MAX_KEY) &&
1795       !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1796   {
1797     /*
1798       ------------ Temporary fix until WL#2975 is implemented ---------
1799 
1800       This event is not the last one (no STMT_END_F). If we stop now
1801       (in case of terminate_slave_thread()), how will we restart? We
1802       have to restart from Table_map_log_event, but as this table is
1803       not transactional, the rows already inserted will still be
1804       present, and idempotency is not guaranteed (no PK) so we risk
1805       that repeating leads to double insert. So we desperately try to
1806       continue, hope we'll eventually leave this buggy situation (by
1807       executing the final Old_rows_log_event). If we are in a hopeless
1808       wait (reached end of last relay log and nothing gets appended
1809       there), we timeout after one minute, and notify DBA about the
1810       problem.  When WL#2975 is implemented, just remove the member
1811       Relay_log_info::last_event_start_time and all its occurrences.
1812     */
1813     const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
1814   }
1815 
1816   if (get_flags(STMT_END_F))
1817   {
1818     /*
1819       This is the end of a statement or transaction, so close (and
1820       unlock) the tables we opened when processing the
1821       Table_map_log_event starting the statement.
1822 
1823       OBSERVER.  This will clear *all* mappings, not only those that
1824       are open for the table.  There is no good handle for on-close
1825       actions on tables.
1826 
1827       NOTE. Even if we have no table ('table' == 0) we still need to be
1828       here, so that we increase the group relay log position. If we didn't, we
1829       could have a group relay log position which lags behind "forever"
1830       (assume the last master's transaction is ignored by the slave because of
1831       replicate-ignore rules).
1832     */
1833     int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1834 
1835     /*
1836       If this event is not in a transaction, the call below will, if some
1837       transactional storage engines are involved, commit the statement into
1838       them and flush the pending event to binlog.
1839       If this event is in a transaction, the call will do nothing, but a
1840       Xid_log_event will come next which will, if some transactional engines
1841       are involved, commit the transaction and flush the pending event to the
1842       binlog.
1843       If there was a deadlock the transaction should have been rolled back
1844       already. So there should be no need to rollback the transaction.
1845     */
1846     DBUG_ASSERT(! thd->transaction_rollback_request);
1847     if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
1848       rli->report(ERROR_LEVEL, error,
1849                   "Error in %s event: commit of row events failed, "
1850                   "table `%s`.`%s`",
1851                   get_type_str(), m_table->s->db.str,
1852                   m_table->s->table_name.str);
1853     error|= binlog_error;
1854 
1855     /*
1856       Now, what if this is not a transactional engine?  We still need to
1857       flush the pending event to the binlog; we did it with
1858       thd->binlog_flush_pending_rows_event().  Note that we imitate
1859       what is done for real queries: a call to
1860       ha_autocommit_or_rollback() (sometimes only if it involves a
1861       transactional engine), and a call to make sure the pending
1862       event has been flushed.
1863     */
1864 
1865     thd->reset_current_stmt_binlog_format_row();
1866     const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
1867   }
1868 
1869   DBUG_RETURN(error);
1870 }
1871 
1872 
1873 Log_event::enum_skip_reason
1874 Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
1875 {
1876   /*
1877     If the slave skip counter is 1 and this event does not end a
1878     statement, then we should not start executing on the next event.
1879     Otherwise, we defer the decision to the normal skipping logic.
1880   */
1881   if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1882     return Log_event::EVENT_SKIP_IGNORE;
1883   else
1884     return Log_event::do_shall_skip(rli);
1885 }
1886 
1887 int
1888 Old_rows_log_event::do_update_pos(Relay_log_info *rli)
1889 {
1890   DBUG_ENTER("Old_rows_log_event::do_update_pos");
1891   int error= 0;
1892 
1893   DBUG_PRINT("info", ("flags: %s",
1894                       get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1895 
1896   if (get_flags(STMT_END_F))
1897   {
1898     /*
1899       Indicate that a statement is finished.
1900       Step the group log position if we are not in a transaction,
1901       otherwise increase the event log position.
1902      */
1903     rli->stmt_done(log_pos);
1904     /*
1905       Clear any errors in thd->net.last_err*. It is not known whether this
1906       is strictly needed, but any errors that may remain in
1907       thd->net.last_err* are believed to be harmless; an example is "key not
1908       found", produced in the test case rpl_row_conflicts.test
1909     */
1910     thd->clear_error();
1911   }
1912   else
1913   {
1914     rli->inc_event_relay_log_pos();
1915   }
1916 
1917   DBUG_RETURN(error);
1918 }
1919 
1920 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1921 
1922 
1923 #ifndef MYSQL_CLIENT
1924 bool Old_rows_log_event::write_data_header(IO_CACHE *file)
1925 {
1926   // This method should not be reached.
1927   assert(0);
1928   return TRUE;
1929 }
1930 
1931 
1932 bool Old_rows_log_event::write_data_body(IO_CACHE *file)
1933 {
1934   /*
1935      Note that this should be the number of *bits*, not the number of
1936      bytes.
1937   */
1938   uchar sbuf[sizeof(m_width)];
1939   my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1940 
1941   // This method should not be reached.
1942   assert(0);
1943 
1944   bool res= false;
1945   uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1946   DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1947 
1948   DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1949   res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
1950 
1951   DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1952   res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1953                               no_bytes_in_map(&m_cols));
1954   DBUG_DUMP("rows", m_rows_buf, data_size);
1955   res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
1956 
1957   return res;
1958 
1959 }
1960 #endif
1961 
1962 
1963 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1964 int Old_rows_log_event::pack_info(Protocol *protocol)
1965 {
1966   char buf[256];
1967   char const *const flagstr=
1968     get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1969   size_t bytes= my_snprintf(buf, sizeof(buf),
1970                             "table_id: %llu%s", m_table_id.id(), flagstr);
1971   protocol->store(buf, bytes, &my_charset_bin);
1972   return 0;
1973 }
1974 #endif
1975 
1976 
1977 #ifdef MYSQL_CLIENT
1978 void Old_rows_log_event::print_helper(FILE *file,
1979                                       PRINT_EVENT_INFO *print_event_info,
1980                                       char const *const name)
1981 {
1982   IO_CACHE *const head= &print_event_info->head_cache;
1983   IO_CACHE *const body= &print_event_info->body_cache;
1984   if (!print_event_info->short_form)
1985   {
1986     bool const last_stmt_event= get_flags(STMT_END_F);
1987     print_header(head, print_event_info, !last_stmt_event);
1988     my_b_printf(head, "\t%s: table id %llu%s\n",
1989                 name, m_table_id.id(),
1990                 last_stmt_event ? " flags: STMT_END_F" : "");
1991     print_base64(body, print_event_info, !last_stmt_event);
1992   }
1993 }
1994 #endif
1995 
1996 
1997 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1998 /**
1999   Write the current row into event's table.
2000 
2001   The row is located in the row buffer, pointed by @c m_curr_row member.
2002   Number of columns of the row is stored in @c m_width member (it can be
2003   different from the number of columns in the table to which we insert).
2004   Bitmap @c m_cols indicates which columns are present in the row. It is assumed
2005   that the event's table is already open and pointed to by @c m_table.
2006 
2007   If the same record already exists in the table it can be either overwritten
2008   or an error is reported depending on the value of @c overwrite flag
2009   (error reporting not yet implemented). Note that the matching record can be
2010   different from the row we insert if we use primary keys to identify records in
2011   the table.
2012 
2013   The row to be inserted can contain values only for selected columns. The
2014   missing columns are filled with default values using @c prepare_record()
2015   function. If a matching record is found in the table and @c overwrite is
2016   true, the missing columns are taken from it.
2017 
2018   @param  rli   Relay log info (needed for row unpacking).
2019   @param  overwrite
2020                 Shall we overwrite if the row already exists or signal
2021                 error (currently ignored).
2022 
2023   @returns Error code on failure, 0 on success.
2024 
2025   This method, if successful, sets @c m_curr_row_end pointer to point at the
2026   next row in the rows buffer. This is done when unpacking the row to be
2027   inserted.
2028 
2029   @note If a matching record is found, it is either updated using
2030   @c ha_update_row() or first deleted and then new record written.
2031 */
2032 
2033 int
2034 Old_rows_log_event::write_row(const Relay_log_info *const rli,
2035                               const bool overwrite)
2036 {
2037   DBUG_ENTER("write_row");
2038   DBUG_ASSERT(m_table != NULL && thd != NULL);
2039 
2040   TABLE *table= m_table;  // pointer to event's table
2041   int error;
2042   int keynum;
2043   auto_afree_ptr<char> key(NULL);
2044 
2045   /* fill table->record[0] with default values */
2046 
2047   if ((error= prepare_record(table, table->write_set,
2048                              TRUE /* check if columns have def. values */)))
2049     DBUG_RETURN(error);
2050 
2051   /* unpack row into table->record[0] */
2052   error= unpack_current_row(rli); // TODO: how to handle errors?
2053 
2054 #ifndef DBUG_OFF
2055   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2056   DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
2057   DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
2058 #endif
2059 
2060   /*
2061     Try to write record. If a corresponding record already exists in the table,
2062     we try to change it using ha_update_row() if possible. Otherwise we delete
2063     it and repeat the whole process again.
2064 
2065     TODO: Add safety measures against infinite looping.
2066    */
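  /*
    The loop below is, roughly:

      ha_write_row() fails with a duplicate-key error
        -> locate the offending row into record[1]
             (ha_rnd_pos() if the engine supports HA_DUPLICATE_POS,
              otherwise ha_index_read_idx_map() on the duplicated key)
        -> if the duplicated key is the last unique key and no foreign
           keys reference the table: ha_update_row() and return
        -> otherwise: ha_delete_row() and retry ha_write_row()
  */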
2067 
2068   while ((error= table->file->ha_write_row(table->record[0])))
2069   {
2070     if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
2071     {
2072       table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
2073       DBUG_RETURN(error);
2074     }
2075     if ((keynum= table->file->get_dup_key(error)) < 0)
2076     {
2077       DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
2078       table->file->print_error(error, MYF(0));
2079       /*
2080         We failed to retrieve the duplicate key
2081         - either because the error was not "duplicate key" error
2082         - or because the information which key is not available
2083       */
2084       DBUG_RETURN(error);
2085     }
2086 
2087     /*
2088        We need to retrieve the old row into record[1] to be able to
2089        either update or delete the offending record.  We either:
2090 
2091        - use ha_rnd_pos() with a row-id (available as dupp_row) to the
2092          offending row, if that is possible (MyISAM and Blackhole), or else
2093 
2094        - use ha_index_read_idx_map() with the key that is duplicated, to
2095          retrieve the offending row.
2096      */
2097     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
2098     {
2099       DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
2100       error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
2101       if (error)
2102       {
2103         DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
2104         if (error == HA_ERR_RECORD_DELETED)
2105           error= HA_ERR_KEY_NOT_FOUND;
2106         table->file->print_error(error, MYF(0));
2107         DBUG_RETURN(error);
2108       }
2109     }
2110     else
2111     {
2112       DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
2113 
2114       if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2115       {
2116         DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
2117         DBUG_RETURN(my_errno);
2118       }
2119 
2120       if (key.get() == NULL)
2121       {
2122         key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
2123         if (key.get() == NULL)
2124         {
2125           DBUG_PRINT("info",("Can't allocate key buffer"));
2126           DBUG_RETURN(ENOMEM);
2127         }
2128       }
2129 
2130       key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
2131                0);
2132       error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2133                                                 (const uchar*)key.get(),
2134                                                 HA_WHOLE_KEY,
2135                                                 HA_READ_KEY_EXACT);
2136       if (error)
2137       {
2138         DBUG_PRINT("info",("ha_index_read_idx_map() returns error %d", error));
2139         if (error == HA_ERR_RECORD_DELETED)
2140           error= HA_ERR_KEY_NOT_FOUND;
2141         table->file->print_error(error, MYF(0));
2142         DBUG_RETURN(error);
2143       }
2144     }
2145 
2146     /*
2147        Now, record[1] should contain the offending row.  That
2148        will enable us to update it or, alternatively, delete it (so
2149        that we can insert the new row afterwards).
2150      */
2151 
2152     /*
2153       If row is incomplete we will use the record found to fill
2154       missing columns.
2155     */
2156     if (!get_flags(COMPLETE_ROWS_F))
2157     {
2158       restore_record(table,record[1]);
2159       error= unpack_current_row(rli);
2160     }
2161 
2162 #ifndef DBUG_OFF
2163     DBUG_PRINT("debug",("preparing for update: before and after image"));
2164     DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2165     DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2166 #endif
2167 
2168     /*
2169        REPLACE is defined as either INSERT or DELETE + INSERT.  If
2170        possible, we can replace it with an UPDATE, but that will not
2171        work on InnoDB if FOREIGN KEY checks are necessary.
2172 
2173        I (Matz) am not sure of the reason for the last_uniq_key()
2174        check, but I'm guessing that it's something along the
2175        following lines.
2176 
2177        Suppose that we got the duplicate key to be a key that is not
2178        the last unique key for the table and we perform an update:
2179        then there might be another key for which the unique check will
2180        fail, so we're better off just deleting the row and inserting
2181        the correct row.
2182      */
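    /*
      For example: suppose the table has unique keys k0 and k1 (k1 being the
      last one) and the insert collides on k0.  Updating the row found via k0
      could still violate k1, so it is safer to delete that row and retry the
      insert.  Only when the collision is on the last unique key (and no
      foreign keys reference the table) is a direct ha_update_row() used.
    */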
2183     if (last_uniq_key(table, keynum) &&
2184         !table->file->referenced_by_foreign_key())
2185     {
2186       DBUG_PRINT("info",("Updating row using ha_update_row()"));
2187       error=table->file->ha_update_row(table->record[1],
2188                                        table->record[0]);
2189       switch (error) {
2190 
2191       case HA_ERR_RECORD_IS_THE_SAME:
2192         DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2193                            " ha_update_row()"));
2194         error= 0;
2195 
2196       case 0:
2197         break;
2198 
2199       default:
2200         DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2201         table->file->print_error(error, MYF(0));
2202       }
2203 
2204       DBUG_RETURN(error);
2205     }
2206     else
2207     {
2208       DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2209       if ((error= table->file->ha_delete_row(table->record[1])))
2210       {
2211         DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2212         table->file->print_error(error, MYF(0));
2213         DBUG_RETURN(error);
2214       }
2215       /* Will retry ha_write_row() with the offending row removed. */
2216     }
2217   }
2218 
2219   DBUG_RETURN(error);
2220 }
2221 
2222 
2223 /**
2224   Locate the current row in event's table.
2225 
2226   The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2227   columns are there in the row (this can be different from the number of columns
2228   in the table). It is assumed that the event's table is already open and
2229   pointed to by @c m_table.
2230 
2231   If a corresponding record is found in the table it is stored in
2232   @c m_table->record[0]. Note that when record is located based on a primary
2233   key, it is possible that the record found differs from the row being located.
2234 
2235   If no key is specified or table does not have keys, a table scan is used to
2236   find the row. In that case the row should be complete and contain values for
2237   all columns. However, it can still be shorter than the table, i.e. the table
2238   can contain extra columns not present in the row. It is also possible that
2239   the table has fewer columns than the row being located.
2240 
2241   @returns Error code on failure, 0 on success.
2242 
2243   @post In case of success @c m_table->record[0] contains the record found.
2244   Also, the internal "cursor" of the table is positioned at the record found.
2245 
2246   @note If the engine allows random access of the records, a combination of
2247   @c position() and @c rnd_pos() will be used.
2248  */
2249 
2250 int Old_rows_log_event::find_row(const Relay_log_info *rli)
2251 {
2252   DBUG_ENTER("find_row");
2253 
2254   DBUG_ASSERT(m_table && m_table->in_use != NULL);
2255 
2256   TABLE *table= m_table;
2257   int error;
2258 
2259   /* unpack row - missing fields get default values */
2260 
2261   // TODO: shall we check and report errors here?
2262   prepare_record(table, table->read_set, FALSE /* don't check errors */);
2263   error= unpack_current_row(rli);
2264 
2265 #ifndef DBUG_OFF
2266   DBUG_PRINT("info",("looking for the following record"));
2267   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2268 #endif
2269 
2270   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2271       table->s->primary_key < MAX_KEY)
2272   {
2273     /*
2274       Use a more efficient method to fetch the record given by
2275       table->record[0] if the engine allows it.  We first compute a
2276       row reference using the position() member function (it will be
2277       stored in table->file->ref) and then use rnd_pos() to position
2278       the "cursor" (i.e., record[0] in this case) at the correct row.
2279 
2280       TODO: Add a check that the correct record has been fetched by
2281       comparing with the original record. Take into account that the
2282       record on the master and slave can be of different
2283       length. Something along these lines should work:
2284 
2285       ADD>>>  store_record(table,record[1]);
2286               int error= table->file->rnd_pos(table->record[0], table->file->ref);
2287       ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
2288                                  table->s->reclength) == 0);
2289 
2290     */
2291     DBUG_PRINT("info",("locating record using primary key (position)"));
2292     int error= table->file->rnd_pos_by_record(table->record[0]);
2293     if (error)
2294     {
2295       DBUG_PRINT("info",("rnd_pos returns error %d",error));
2296       if (error == HA_ERR_RECORD_DELETED)
2297         error= HA_ERR_KEY_NOT_FOUND;
2298       table->file->print_error(error, MYF(0));
2299     }
2300     DBUG_RETURN(error);
2301   }
2302 
2303   // We can't use position() - try other methods.
2304 
2305   /*
2306     We need to retrieve all fields
2307     TODO: Move this out from this function to main loop
2308    */
2309   table->use_all_columns();
2310 
2311   /*
2312     Save copy of the record in table->record[1]. It might be needed
2313     later if linear search is used to find exact match.
2314    */
2315   store_record(table,record[1]);
2316 
2317   if (table->s->keys > 0)
2318   {
2319     DBUG_PRINT("info",("locating record using primary key (index_read)"));
2320 
2321     /* We have a key: search the table using the index */
2322     if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
2323     {
2324       DBUG_PRINT("info",("ha_index_init returns error %d",error));
2325       table->file->print_error(error, MYF(0));
2326       DBUG_RETURN(error);
2327     }
2328 
2329     /* Fill key data for the row */
2330 
2331     DBUG_ASSERT(m_key);
2332     key_copy(m_key, table->record[0], table->key_info, 0);
2333 
2334     /*
2335       Don't print debug messages when running valgrind since they can
2336       trigger false warnings.
2337      */
2338 #ifndef HAVE_purify
2339     DBUG_DUMP("key data", m_key, table->key_info->key_length);
2340 #endif
2341 
2342     /*
2343       We need to set the null bytes to ensure that the filler bits are
2344       all set when returning.  There are storage engines that just set
2345       the necessary bits on the bytes and don't set the filler bits
2346       correctly.
2347     */
2348     my_ptrdiff_t const pos=
2349       table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2350     table->record[0][pos]= 0xFF;
2351 
2352     if ((error= table->file->ha_index_read_map(table->record[0], m_key,
2353                                                HA_WHOLE_KEY,
2354                                                HA_READ_KEY_EXACT)))
2355     {
2356       DBUG_PRINT("info",("no record matching the key found in the table"));
2357       if (error == HA_ERR_RECORD_DELETED)
2358         error= HA_ERR_KEY_NOT_FOUND;
2359       table->file->print_error(error, MYF(0));
2360       table->file->ha_index_end();
2361       DBUG_RETURN(error);
2362     }
2363 
2364   /*
2365     Don't print debug messages when running valgrind since they can
2366     trigger false warnings.
2367    */
2368 #ifndef HAVE_purify
2369     DBUG_PRINT("info",("found first matching record"));
2370     DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2371 #endif
2372     /*
2373       Below is a minor "optimization".  If the key (i.e., key number
2374       0) has the HA_NOSAME flag set, we know that we have found the
2375       correct record (since there can be no duplicates); otherwise, we
2376       have to compare the record with the one found to see if it is
2377       the correct one.
2378 
2379       CAVEAT! This behaviour is essential for the replication of,
2380       e.g., the mysql.proc table since the correct record *shall* be
2381       found using the primary key *only*.  There shall be no
2382       comparison of non-PK columns to decide if the correct record is
2383       found.  I can see no scenario where it would be incorrect to
2384       choose the row to change only using a PK or an UNNI.
2385     */
2386     if (table->key_info->flags & HA_NOSAME)
2387     {
2388       /* The unique key has no nullable part */
2389       if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2390       {
2391         table->file->ha_index_end();
2392         DBUG_RETURN(0);
2393       }
2394       else
2395       {
2396         KEY *keyinfo= table->key_info;
2397         /*
2398           The unique key has a nullable part. We need to check whether any
2399           field in the BI image that is part of the UNNI is NULL.
2400         */
2401         bool null_found= FALSE;
2402         for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2403         {
2404           uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2405           Field **f= table->field+fieldnr;
2406           null_found= (*f)->is_null();
2407         }
2408 
2409         if (!null_found)
2410         {
2411           table->file->ha_index_end();
2412           DBUG_RETURN(0);
2413         }
2414 
2415         /* else fall through to index scan */
2416       }
2417     }
2418 
2419     /*
2420       In case key is not unique, we still have to iterate over records found
2421       and find the one which is identical to the row given. A copy of the
2422       record we are looking for is stored in record[1].
2423      */
2424     DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2425 
2426     while (record_compare(table))
2427     {
2428       /*
2429         We need to set the null bytes to ensure that the filler bit
2430         are all set when returning.  There are storage engines that
2431         just set the necessary bits on the bytes and don't set the
2432         filler bits correctly.
2433 
2434         TODO[record format ndb]: Remove this code once NDB returns the
2435         correct record format.
2436       */
2437       if (table->s->null_bytes > 0)
2438       {
2439         table->record[0][table->s->null_bytes - 1]|=
2440           256U - (1U << table->s->last_null_bit_pos);
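        /*
          256U - (1U << last_null_bit_pos) has all bits from
          last_null_bit_pos up to bit 7 set; e.g. with last_null_bit_pos == 3
          this is 256 - 8 == 248 == 0xF8, so bits 3..7 of the last null byte
          become filler bits set to 1.
        */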
2441       }
2442 
2443       while ((error= table->file->ha_index_next(table->record[0])))
2444       {
2445         /* We just skip records that have already been deleted */
2446         if (error == HA_ERR_RECORD_DELETED)
2447           continue;
2448         DBUG_PRINT("info",("no record matching the given row found"));
2449         table->file->print_error(error, MYF(0));
2450         (void) table->file->ha_index_end();
2451         DBUG_RETURN(error);
2452       }
2453     }
2454 
2455     /*
2456       Have to restart the scan to be able to fetch the next row.
2457     */
2458     table->file->ha_index_end();
2459   }
2460   else
2461   {
2462     DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
2463 
2464     int restart_count= 0; // Number of times scanning has restarted from top
2465 
2466     /* We don't have a key: search the table using ha_rnd_next() */
2467     if ((error= table->file->ha_rnd_init(1)))
2468     {
2469       DBUG_PRINT("info",("error initializing table scan"
2470                          " (ha_rnd_init returns %d)",error));
2471       table->file->print_error(error, MYF(0));
2472       DBUG_RETURN(error);
2473     }
2474 
2475     /* Continue until we find the right record or have made a full loop */
2476     do
2477     {
2478   restart_ha_rnd_next:
2479       error= table->file->ha_rnd_next(table->record[0]);
2480 
2481       switch (error) {
2482 
2483       case 0:
2484         break;
2485 
2486       case HA_ERR_RECORD_DELETED:
2487         goto restart_ha_rnd_next;
2488 
2489       case HA_ERR_END_OF_FILE:
2490         if (++restart_count < 2)
2491         {
2492           if ((error= table->file->ha_rnd_init(1)))
2493           {
2494             table->file->print_error(error, MYF(0));
2495             DBUG_RETURN(error);
2496           }
2497           goto restart_ha_rnd_next;
2498         }
2499         break;
2500 
2501       default:
2502         DBUG_PRINT("info", ("Failed to get next record"
2503                             " (ha_rnd_next returns %d)",error));
2504         table->file->print_error(error, MYF(0));
2505         table->file->ha_rnd_end();
2506         DBUG_RETURN(error);
2507       }
2508     }
2509     while (restart_count < 2 && record_compare(table));
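    /*
      The scan wraps around at most once: on the first HA_ERR_END_OF_FILE the
      scan is re-initialized from the top (restart_count becomes 1); a second
      end-of-file (restart_count == 2) means the full table has been scanned
      without record_compare() finding a match.
    */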
2510 
2511     /*
2512       Note: the record_compare() above takes all record fields into account,
2513       which might be incorrect in case a partial row was given in the event
2514      */
2515 
2516     /*
2517       Have to restart the scan to be able to fetch the next row.
2518     */
2519     if (restart_count == 2)
2520       DBUG_PRINT("info", ("Record not found"));
2521     else
2522       DBUG_DUMP("record found", table->record[0], table->s->reclength);
2523     table->file->ha_rnd_end();
2524 
2525     DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
2526     DBUG_RETURN(error);
2527   }
2528 
2529   DBUG_RETURN(0);
2530 }
2531 
2532 #endif
2533 
2534 
2535 /**************************************************************************
2536 	Write_rows_log_event member functions
2537 **************************************************************************/
2538 
2539 /*
2540   Constructor used to build an event for writing to the binary log.
2541  */
2542 #if !defined(MYSQL_CLIENT)
2543 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2544                                                    TABLE *tbl_arg,
2545                                                    ulong tid_arg,
2546                                                    MY_BITMAP const *cols,
2547                                                    bool is_transactional)
2548   : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2549 {
2550 
2551   // This constructor should not be reached.
2552   assert(0);
2553 
2554 }
2555 #endif
2556 
2557 
2558 /*
2559   Constructor used by slave to read the event from the binary log.
2560  */
2561 #ifdef HAVE_REPLICATION
2562 Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
2563                                                    uint event_len,
2564                                                    const Format_description_log_event
2565                                                    *description_event)
2566 : Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
2567                      description_event)
2568 {
2569 }
2570 #endif
2571 
2572 
2573 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2574 int
2575 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2576 {
2577   int error= 0;
2578 
2579   /*
2580     We are using REPLACE semantics and not INSERT IGNORE semantics
2581     when writing rows, that is: new rows replace old rows.  We need to
2582     inform the storage engine that it should use this behaviour.
2583   */
2584 
2585   /* Tell the storage engine that we are using REPLACE semantics. */
2586   thd->lex->duplicates= DUP_REPLACE;
2587 
2588   /*
2589     Pretend we're executing a REPLACE command: this is needed for
2590     InnoDB and NDB Cluster since they are not (properly) checking the
2591     lex->duplicates flag.
2592   */
2593   thd->lex->sql_command= SQLCOM_REPLACE;
2594   /*
2595      Do not raise the error flag in case of hitting a duplicate on a unique attribute
2596   */
2597   m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2598   /*
2599      NDB specific: an update from an ndb master is wrapped as Write_rows,
2600      so the event should be applied to replace the slave's row.
2601   */
2604   m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2605   /*
2606      NDB specific: if an update from an ndb master wrapped as Write_rows
2607      does not find the row, it is assumed that idempotent binlog applying
2608      is taking place; don't raise the error.
2609   */
2610   m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2611   /*
2612     TODO: the cluster team (Tomas?) says that it's better if the engine knows
2613     how many rows are going to be inserted, then it can allocate needed memory
2614     from the start.
2615   */
2616   m_table->file->ha_start_bulk_insert(0);
2617   return error;
2618 }
2619 
2620 
2621 int
2622 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2623                                                   int error)
2624 {
2625   int local_error= 0;
2626   m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2627   m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2628   /*
2629     Resetting the extra with
2630     table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2631     triggers Bug#27077.
2632     TODO: explain or fix.
2633   */
2634   if ((local_error= m_table->file->ha_end_bulk_insert()))
2635   {
2636     m_table->file->print_error(local_error, MYF(0));
2637   }
2638   return error? error : local_error;
2639 }
2640 
2641 
2642 int
2643 Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2644 {
2645   DBUG_ASSERT(m_table != NULL);
2646   int error= write_row(rli, TRUE /* overwrite */);
2647 
2648   if (error && !thd->net.last_errno)
2649     thd->net.last_errno= error;
2650 
2651   return error;
2652 }
2653 
2654 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2655 
2656 
2657 #ifdef MYSQL_CLIENT
2658 void Write_rows_log_event_old::print(FILE *file,
2659                                      PRINT_EVENT_INFO* print_event_info)
2660 {
2661   Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old");
2662 }
2663 #endif
2664 
2665 
2666 /**************************************************************************
2667 	Delete_rows_log_event member functions
2668 **************************************************************************/
2669 
2670 /*
2671   Constructor used to build an event for writing to the binary log.
2672  */
2673 
2674 #ifndef MYSQL_CLIENT
2675 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2676                                                      TABLE *tbl_arg,
2677                                                      ulong tid,
2678                                                      MY_BITMAP const *cols,
2679                                                      bool is_transactional)
2680   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2681     m_after_image(NULL), m_memory(NULL)
2682 {
2683 
2684   // This constructor should not be reached.
2685   assert(0);
2686 
2687 }
2688 #endif /* #if !defined(MYSQL_CLIENT) */
2689 
2690 
2691 /*
2692   Constructor used by slave to read the event from the binary log.
2693  */
2694 #ifdef HAVE_REPLICATION
2695 Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2696                                                      uint event_len,
2697                                                      const Format_description_log_event
2698                                                      *description_event)
2699   : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2700                        description_event),
2701     m_after_image(NULL), m_memory(NULL)
2702 {
2703 }
2704 #endif
2705 
2706 
2707 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2708 
2709 int
2710 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2711 {
2712   if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2713       m_table->s->primary_key < MAX_KEY)
2714   {
2715     /*
2716       We don't need to allocate any memory for m_key since it is not used.
2717     */
2718     return 0;
2719   }
2720 
2721   if (m_table->s->keys > 0)
2722   {
2723     // Allocate buffer for key searches
2724     m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2725     if (!m_key)
2726       return HA_ERR_OUT_OF_MEM;
2727   }
2728   return 0;
2729 }
2730 
2731 
2732 int
2733 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2734                                                    int error)
2735 {
2736   /* error= TODO: find out what this should really be; this triggers close_scan in ndb, returning error? */
2737   m_table->file->ha_index_or_rnd_end();
2738   my_free(m_key);
2739   m_key= NULL;
2740 
2741   return error;
2742 }
2743 
2744 
2745 int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2746 {
2747   int error;
2748   DBUG_ASSERT(m_table != NULL);
2749 
2750   if (!(error= find_row(rli)))
2751   {
2752     /*
2753       Delete the record found, located in record[0]
2754     */
2755     error= m_table->file->ha_delete_row(m_table->record[0]);
2756   }
2757   return error;
2758 }
2759 
2760 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2761 
2762 
2763 #ifdef MYSQL_CLIENT
2764 void Delete_rows_log_event_old::print(FILE *file,
2765                                       PRINT_EVENT_INFO* print_event_info)
2766 {
2767   Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old");
2768 }
2769 #endif
2770 
2771 
2772 /**************************************************************************
2773 	Update_rows_log_event member functions
2774 **************************************************************************/
2775 
2776 /*
2777   Constructor used to build an event for writing to the binary log.
2778  */
2779 #if !defined(MYSQL_CLIENT)
2780 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2781                                                      TABLE *tbl_arg,
2782                                                      ulong tid,
2783                                                      MY_BITMAP const *cols,
2784                                                      bool is_transactional)
2785   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2786     m_after_image(NULL), m_memory(NULL)
2787 {
2788 
2789   // This constructor should not be reached.
2790   assert(0);
2791 }
2792 #endif /* !defined(MYSQL_CLIENT) */
2793 
2794 
2795 /*
2796   Constructor used by slave to read the event from the binary log.
2797  */
2798 #ifdef HAVE_REPLICATION
2799 Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2800                                                      uint event_len,
2801                                                      const
2802                                                      Format_description_log_event
2803                                                      *description_event)
2804   : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2805                        description_event),
2806     m_after_image(NULL), m_memory(NULL)
2807 {
2808 }
2809 #endif
2810 
2811 
2812 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2813 
2814 int
2815 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2816 {
2817   if (m_table->s->keys > 0)
2818   {
2819     // Allocate buffer for key searches
2820     m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2821     if (!m_key)
2822       return HA_ERR_OUT_OF_MEM;
2823   }
2824 
2825   return 0;
2826 }
2827 
2828 
2829 int
2830 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2831                                                    int error)
2832 {
2833   /* error= TODO: find out what this should really be; this triggers close_scan in ndb, returning error? */
2834   m_table->file->ha_index_or_rnd_end();
2835   my_free(m_key); // Free for multi_malloc
2836   m_key= NULL;
2837 
2838   return error;
2839 }
2840 
2841 
2842 int
2843 Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2844 {
2845   DBUG_ASSERT(m_table != NULL);
2846 
2847   int error= find_row(rli);
2848   if (error)
2849   {
2850     /*
2851       We need to read the second image in the event of error to be
2852       able to skip to the next pair of updates
2853     */
2854     m_curr_row= m_curr_row_end;
2855     unpack_current_row(rli);
2856     return error;
2857   }
2858 
2859   /*
2860     This is the situation after locating BI:
2861 
2862     ===|=== before image ====|=== after image ===|===
2863        ^                     ^
2864        m_curr_row            m_curr_row_end
2865 
2866     BI found in the table is stored in record[0]. We copy it to record[1]
2867     and unpack AI to record[0].
2868    */
2869 
2870   store_record(m_table,record[1]);
2871 
2872   m_curr_row= m_curr_row_end;
2873   error= unpack_current_row(rli); // this also updates m_curr_row_end
2874 
2875   /*
2876     Now we have the right row to update.  The old row (the one we're
2877     looking for) is in record[1] and the new row is in record[0].
2878   */
2879 #ifndef HAVE_purify
2880   /*
2881     Don't print debug messages when running valgrind since they can
2882     trigger false warnings.
2883    */
2884   DBUG_PRINT("info",("Updating row in table"));
2885   DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2886   DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2887 #endif
2888 
2889   error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2890   if (error == HA_ERR_RECORD_IS_THE_SAME)
2891     error= 0;
2892 
2893   return error;
2894 }
2895 
2896 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2897 
2898 
2899 #ifdef MYSQL_CLIENT
2900 void Update_rows_log_event_old::print(FILE *file,
2901                                       PRINT_EVENT_INFO* print_event_info)
2902 {
2903   Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old");
2904 }
2905 #endif
2906