1 /* Copyright (c) 2007, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
24 #include "log_event.h"
25 #ifndef MYSQL_CLIENT
26 #include "sql_cache.h"                       // QUERY_CACHE_FLAGS_SIZE
27 #include "sql_base.h"                       // close_tables_for_reopen
28 #include "key.h"                            // key_copy
29 #include "lock.h"                           // mysql_unlock_tables
30 #include "sql_parse.h"             // mysql_reset_thd_for_next_command
31 #include "rpl_rli.h"
32 #include "rpl_utility.h"
33 #endif
34 #include "log_event_old.h"
35 #include "rpl_record_old.h"
36 #include "transaction.h"
37 
38 #include <algorithm>
39 
40 using std::min;
41 using std::max;
42 
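/*
  Performance Schema instrumentation key under which the memory allocated
  for old-format row events (the my_malloc()/my_multi_malloc() calls below)
  is accounted.
*/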
43 PSI_memory_key key_memory_log_event_old;
44 
45 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
46 
47 // Old implementation of do_apply_event()
48 int
49 Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
50 {
51   DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
52   int error= 0;
53   THD *ev_thd= ev->thd;
54   uchar const *row_start= ev->m_rows_buf;
55 
56   /*
57     If m_table_id == ~0U or the maximum 6-byte integer (~0ULL >> 16, i.e.
58     0xFFFFFFFFFFFF), then we have a dummy event that does not contain any
59     data.  In that case, we just remove all tables in the tables_to_lock
60     list, close the thread tables, and return with success.
61    */
62   if ((ev->m_table_id.id() == ~0U || ev->m_table_id.id() == (~0ULL >> 16)) &&
63       ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
64 
65   {
66     /*
67        This flag is supposed to be set: this is just an extra check to
68        ensure that nothing strange has happened.
69      */
70     assert(ev->get_flags(Old_rows_log_event::STMT_END_F));
71 
72     const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
73     ev_thd->clear_error();
74     DBUG_RETURN(0);
75   }
76 
77   /*
78     'ev_thd' has been set by exec_relay_log_event(), just before calling
79     do_apply_event(). We still check here to prevent future coding
80     errors.
81   */
82   assert(rli->info_thd == ev_thd);
83 
84   /*
85     If no locks are taken, this is the first binrow event seen
86     after the table map events.  We should then lock all the tables
87     used in the transaction and proceed with execution of the actual
88     event.
89   */
90   if (!ev_thd->lock)
91   {
92     /*
93       open_and_lock_tables() reads the contents of ev_thd->lex, so they must be
94       initialized.
95 
96       We also call mysql_reset_thd_for_next_command(), since this
97       is the logical start of the next "statement". Note that this
98       call might reset the value of current_stmt_binlog_format, so
99       we need to do any changes to that value after this function.
100     */
101     lex_start(ev_thd);
102     mysql_reset_thd_for_next_command(ev_thd);
103 
104     /*
105       This is a row injection, so we flag the "statement" as
106       such. Note that this code is called both when the slave does row
107       injections and when the BINLOG statement is used to do row
108       injections.
109     */
110     ev_thd->lex->set_stmt_row_injection();
111 
112     if (open_and_lock_tables(ev_thd, rli->tables_to_lock, 0))
113     {
114       if (ev_thd->is_error())
115       {
116         /*
117           Error reporting borrowed from Query_log_event with many excessive
118           simplifications (we don't honour --slave-skip-errors)
119         */
120         rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
121                     "Error '%s' on opening tables",
122                     ev_thd->get_stmt_da()->message_text());
123         ev_thd->is_slave_error= 1;
124       }
125       DBUG_RETURN(1);
126     }
127 
128     /*
129       When opening and locking have succeeded, we check all tables to
130       ensure that they still have the correct type.
131     */
132 
133     {
134       TABLE_LIST *table_list_ptr= rli->tables_to_lock;
135       for (uint i=0 ; table_list_ptr&& (i< rli->tables_to_lock_count);
136            table_list_ptr= table_list_ptr->next_global, i++)
137       {
138         /*
139           Please see the comment in Rows_log_event::do_apply_event() in
140           log_event.cc for an explanation of the if condition below.
141         */
142         if (table_list_ptr->parent_l)
143           continue;
144         /*
145           We can use a downcast here since we know that every table added
146           to tables_to_lock is an RPL_TABLE_LIST (or a child table, which is
147           skipped above).
148         */
149         RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
150         assert(ptr->m_tabledef_valid);
151         TABLE *conv_table;
152         if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
153                                              ptr->table, &conv_table))
154         {
155           ev_thd->is_slave_error= 1;
156           const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
157           DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
158         }
159         DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
160                              " - conv_table: %p",
161                              ptr->table->s->db.str,
162                              ptr->table->s->table_name.str, conv_table));
163         ptr->m_conv_table= conv_table;
164       }
165     }
166 
167     /*
168       ... and then we add all the tables to the table map and remove
169       them from tables to lock.
170 
171       We also invalidate the query cache for all the tables, since
172       they will now be changed.
173 
174       TODO [/Matz]: Maybe the query cache should not be invalidated
175       here? It might be that a table is not changed, even though it
176       was locked for the statement.  We do know that each
177       Old_rows_log_event contains at least one row, so after processing one
178       Old_rows_log_event, we can invalidate the query cache for the
179       associated table.
180      */
181     TABLE_LIST *ptr= rli->tables_to_lock;
182     for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
183     {
184       /*
185         Please see the comment in Rows_log_event::do_apply_event() in
186         log_event.cc for an explanation of the if condition below.
187        */
188       if (ptr->parent_l)
189         continue;
190       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
191     }
192     query_cache.invalidate_locked_for_write(rli->tables_to_lock);
193   }
194 
195   TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
196 
197   if (table)
198   {
199     /*
200       table == NULL means that this table should not be replicated
201       (this was set up by Table_map_log_event::do_apply_event()
202       which tested replicate-* rules).
203     */
204 
205     /*
206       Calling set_time() is not strictly needed, but
207       1) it preserves the property that "Time" in SHOW PROCESSLIST shows how
208       far the slave is behind, and
209       2) it will be needed when we allow replication from a table with no
210       TIMESTAMP column to a table with one.
211       So we call set_time(), as in SBR. Presently it changes nothing.
212     */
213     ev_thd->set_time(&ev->common_header->when);
214     /*
215       There are a few flags that are replicated with each row event.
216       Make sure to set/clear them before executing the main body of
217       the event.
218     */
219     if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
220         ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
221     else
222         ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
223 
224     if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
225         ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
226     else
227         ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
228     /* A small test to verify that objects have consistent types */
229     assert(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
230 
231     /*
232       Now we are in a statement and will stay in a statement until we
233       see a STMT_END_F.
234 
235       We set this flag here, before actually applying any rows, in
236       case the SQL thread is stopped and we need to detect that we're
237       inside a statement and halting abruptly might cause problems
238       when restarting.
239      */
240     const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
241 
242     error= do_before_row_operations(table);
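    /*
      Apply the rows one at a time: do_prepare_row() unpacks the next row
      image starting at row_start and returns its end in row_end,
      do_exec_row() applies it to the table, and row_start is then advanced
      to row_end for the next iteration.
    */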
243     while (error == 0 && row_start < ev->m_rows_end)
244     {
245       uchar const *row_end= NULL;
246       if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
247         break; // We should perform the after-row operation even in
248                // the case of error
249 
250       assert(row_end != NULL); // cannot happen
251       assert(row_end <= ev->m_rows_end);
252 
253       /* in_use can have been set to NULL in close_tables_for_reopen */
254       THD* old_thd= table->in_use;
255       if (!table->in_use)
256         table->in_use= ev_thd;
257       error= do_exec_row(table);
258       table->in_use = old_thd;
259       switch (error)
260       {
261         /* Some recoverable errors */
262       case HA_ERR_RECORD_CHANGED:
263       case HA_ERR_KEY_NOT_FOUND:  /* Idempotency support: OK if
264                                            tuple does not exist */
265         error= 0;               /* Fall through */
266       case 0:
267         break;
268 
269       default:
270         rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
271                     "Error in %s event: row application failed. %s",
272                     ev->get_type_str(),
273                     (ev_thd->is_error() ?
274                      ev_thd->get_stmt_da()->message_text() :
275                      ""));
276         ev_thd->is_slave_error= 1;
277         break;
278       }
279 
280       row_start= row_end;
281     }
282     DBUG_EXECUTE_IF("stop_slave_middle_group",
283                     const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
284     error= do_after_row_operations(table, error);
285   }
286 
287   if (error)
288   {                     /* an error has occurred during the transaction */
289     rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->mysql_errno(),
290                 "Error in %s event: error during transaction execution "
291                 "on table %s.%s. %s",
292                 ev->get_type_str(), table->s->db.str,
293                 table->s->table_name.str,
294                 ev_thd->is_error() ? ev_thd->get_stmt_da()->message_text() : "");
295 
296     /*
297       If one day we honour --slave-skip-errors in row-based replication, and
298       the error should be skipped, then we would clear mappings, rollback,
299       close tables, but the slave SQL thread would not stop and then may
300       assume the mapping is still available and the tables are still open...
301       So we should clear mappings/rollback/close here only if this is a
302       STMT_END_F.
303       For now we code knowing that the error is not skippable, so the slave
304       SQL thread is certainly going to stop; the rollback is done at the
305       caller, as for SBR.
306     */
307     ev_thd->reset_current_stmt_binlog_format_row();
308     const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
309     ev_thd->is_slave_error= 1;
310     DBUG_RETURN(error);
311   }
312 
313   DBUG_RETURN(0);
314 }
315 #endif
316 
317 
318 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
319 
320 /*
321   Check whether the given key is the last UNIQUE key of the table: returns 1 if no UNIQUE key follows it, 0 otherwise.
322 */
323 static int
324 last_uniq_key(TABLE *table, uint keyno)
325 {
326   while (++keyno < table->s->keys)
327     if (table->key_info[keyno].flags & HA_NOSAME)
328       return 0;
329   return 1;
330 }
331 
332 
333 /*
334   Compares table->record[0] and table->record[1]
335 
336   Returns TRUE if different.
337 */
338 static bool record_compare(TABLE *table)
339 {
340   /*
341     Need to set the X bit and the filler bits in both records since
342     there are engines that do not set them correctly.
343 
344     In addition, since MyISAM checks that one hasn't tampered with the
345     record, it is necessary to restore the old bytes into the record
346     after doing the comparison.
347 
348     TODO[record format ndb]: Remove it once NDB returns correct
349     records. Check that the other engines also return correct records.
350    */
351 
352   bool result= FALSE;
353   uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
354 
355   if (table->s->null_bytes > 0)
356   {
357     for (int i = 0 ; i < 2 ; ++i)
358     {
359       /*
360         If we have an X bit then we need to take care of it.
361       */
362       if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
363       {
364         saved_x[i]= table->record[i][0];
365         table->record[i][0]|= 1U;
366       }
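      /*
        Forcing bit 0 of the first null-flags byte (the "X bit") to 1 in
        both records means an engine that leaves it undefined cannot cause
        a spurious mismatch; the original byte is restored at
        record_compare_exit below.
      */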
367 
368       /*
369          If (last_null_bit_pos == 0 && null_bytes > 1), then:
370 
371          X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
372 
373          I.e., the entire byte is used.
374       */
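      /*
        Illustration (not from the original source): with
        last_null_bit_pos == 3, the mask below is 256U - (1U << 3) == 248 ==
        0b11111000, which forces the unused filler bits 3..7 of the last
        null byte to 1 while leaving the significant low bits untouched.
      */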
375       if (table->s->last_null_bit_pos > 0)
376       {
377         saved_filler[i]= table->record[i][table->s->null_bytes - 1];
378         table->record[i][table->s->null_bytes - 1]|=
379           256U - (1U << table->s->last_null_bit_pos);
380       }
381     }
382   }
383 
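  /*
    Fast path: with no BLOB or VARCHAR columns both row images are plain
    fixed-size buffers, so a byte-wise cmp_record() against record[1] is
    enough.  Otherwise we compare the null bits first and then each field
    individually; rec_buff_length is the distance between record[0] and
    record[1], so cmp_binary_offset() compares the field's value in the
    two records.
  */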
384   if (table->s->blob_fields + table->s->varchar_fields == 0)
385   {
386     result= cmp_record(table,record[1]);
387     goto record_compare_exit;
388   }
389 
390   /* Compare null bits */
391   if (memcmp(table->null_flags,
392        table->null_flags+table->s->rec_buff_length,
393        table->s->null_bytes))
394   {
395     result= TRUE;       // Diff in NULL value
396     goto record_compare_exit;
397   }
398 
399   /* Compare updated fields */
400   for (Field **ptr=table->field ; *ptr ; ptr++)
401   {
402     if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
403     {
404       result= TRUE;
405       goto record_compare_exit;
406     }
407   }
408 
409 record_compare_exit:
410   /*
411     Restore the saved bytes.
412 
413     TODO[record format ndb]: Remove this code once NDB returns the
414     correct record format.
415   */
416   if (table->s->null_bytes > 0)
417   {
418     for (int i = 0 ; i < 2 ; ++i)
419     {
420       if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
421         table->record[i][0]= saved_x[i];
422 
423       if (table->s->last_null_bit_pos > 0)
424         table->record[i][table->s->null_bytes - 1]= saved_filler[i];
425     }
426   }
427 
428   return result;
429 }
430 
431 
432 /*
433   Copy "extra" columns from record[1] to record[0].
434 
435   Copy the extra fields that are not present on the master but are
436   present on the slave from record[1] to record[0].  This is used
437   after fetching a record that is to be updated, either inside
438   replace_record() or as part of executing an update_row().
439  */
440 static int
441 copy_extra_record_fields(TABLE *table,
442                          size_t master_reclength,
443                          my_ptrdiff_t master_fields)
444 {
445   DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
446   DBUG_PRINT("info", ("Copying to 0x%lx "
447                       "from field %lu at offset %lu "
448                       "to field %d at offset %lu",
449                       (long) table->record[0],
450                       (ulong) master_fields, (ulong) master_reclength,
451                       table->s->fields, table->s->reclength));
452   /*
453     Copy the extra fields of the slave that do not exist on the
454     master into record[0] (they basically contain the default values).
455   */
456 
457   if (table->s->fields < (uint) master_fields)
458     DBUG_RETURN(0);
459 
460   assert(master_reclength <= table->s->reclength);
461   if (master_reclength < table->s->reclength)
462     memcpy(table->record[0] + master_reclength,
463                 table->record[1] + master_reclength,
464                 table->s->reclength - master_reclength);
465 
466   /*
467     Bit columns are special.  We iterate over all the remaining
468     columns and copy the "extra" bits to the new record.  This is
469     not a very good solution: it should be refactored when the
470     opportunity arises.
471 
472     REFACTORING SUGGESTION (Matz).  Introduce a member function
473     similar to move_field_offset() called copy_field_offset() to
474     copy field values and implement it for all Field subclasses. Use
475     this function to copy data from the found record to the record
476     that are going to be inserted.
477 
478     The copy_field_offset() function needs to be a virtual function,
479     which in this case will prevent copying an entire range of
480     fields efficiently.
481   */
482   {
483     Field **field_ptr= table->field + master_fields;
484     for ( ; *field_ptr ; ++field_ptr)
485     {
486       /*
487         Set the null bit according to the values in record[1]
488        */
489       if ((*field_ptr)->maybe_null() &&
490           (*field_ptr)->is_null_in_record(table->record[1]))
491         (*field_ptr)->set_null();
492       else
493         (*field_ptr)->set_notnull();
494 
495       /*
496         Do the extra work for special columns.
497        */
498       switch ((*field_ptr)->real_type())
499       {
500       default:
501         /* Nothing to do */
502         break;
503 
504       case MYSQL_TYPE_BIT:
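        /*
          Note (added, assuming the usual Field_bit layout): the bits that
          do not fill a whole byte are kept in the null-bytes area via
          bit_ptr/bit_ofs, so they are not covered by the tail memcpy above
          and must be copied from record[1] explicitly (hence the offset).
        */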
505         Field_bit *f= static_cast<Field_bit*>(*field_ptr);
506         if (f->bit_len > 0)
507         {
508           my_ptrdiff_t const offset= table->record[1] - table->record[0];
509           uchar const bits=
510             get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
511           set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
512         }
513         break;
514       }
515     }
516   }
517   DBUG_RETURN(0);                                     // All OK
518 }
519 
520 
521 /*
522   Replace the provided record in the database.
523 
524   SYNOPSIS
525       replace_record()
526       thd    Thread context for writing the record.
527       table  Table to which record should be written.
528       master_reclength
529              Offset to the first column that is not present on the master;
530              alternatively, the length of the record on the master side.
531       master_fields
532              Number of columns in the row as sent by the master.
533   RETURN VALUE
534       Error code on failure, 0 on success.
535 
536   DESCRIPTION
537       Similar to how it is done in mysql_insert(), we first try to do
538       an ha_write_row() and if that fails due to duplicate keys (or
539       indices), we do an ha_update_row() or a ha_delete_row() instead.
540  */
541 static int
542 replace_record(THD *thd, TABLE *table,
543                ulong const master_reclength,
544                uint const master_fields)
545 {
546   DBUG_ENTER("replace_record");
547   assert(table != NULL && thd != NULL);
548 
549   int error;
550   int keynum;
551   char *key= NULL;
552 
553 #ifndef NDEBUG
554   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
555   DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
556   DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
557 #endif
558 
559   while ((error= table->file->ha_write_row(table->record[0])))
560   {
561     if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
562     {
563       table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
564       DBUG_RETURN(error);
565     }
566     if ((keynum= table->file->get_dup_key(error)) < 0)
567     {
568       table->file->print_error(error, MYF(0));
569       /*
570         We failed to retrieve the duplicate key
571         - either because the error was not a "duplicate key" error,
572         - or because the information about which key it is was not available.
573       */
574       DBUG_RETURN(error);
575     }
576 
577     /*
578       The key index value is either valid in the range [0, MAX_KEY) or
579       has the value MAX_KEY as a marker for the case when no information
580       about the key can be found. In the latter case we require that the
581       storage engine has the flag HA_DUPLICATE_POS turned on.
582       If this invariant does not hold, the assert will crash a server
583       built in debug mode. For a server built without debugging, the code
584       below additionally checks the key index value in order to report
585       the error in any case.
586     */
587     assert(keynum != MAX_KEY ||
588            (keynum == MAX_KEY &&
589             (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
590 
591     /*
592        We need to retrieve the old row into record[1] to be able to
593        either update or delete the offending record.  We either:
594 
595        - use ha_rnd_pos() with a row-id (available as dup_ref) to fetch the
596          offending row, if that is possible (MyISAM and Blackhole), or else
597 
598        - use ha_index_read_idx_map() with the key that is duplicated, to
599          retrieve the offending row.
600      */
601     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
602     {
603       error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
604       if (error)
605       {
606         DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
607         if (error == HA_ERR_RECORD_DELETED)
608           error= HA_ERR_KEY_NOT_FOUND;
609         table->file->print_error(error, MYF(0));
610         DBUG_RETURN(error);
611       }
612     }
613     else
614     {
615       if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
616       {
617         DBUG_RETURN(my_errno());
618       }
619 
620       if (key == NULL)
621       {
622         key= static_cast<char*>(my_alloca(table->s->max_unique_length));
623         if (key == NULL)
624           DBUG_RETURN(ENOMEM);
625       }
626 
627       if ((uint)keynum < MAX_KEY)
628       {
629         key_copy((uchar*)key, table->record[0], table->key_info + keynum,
630                  0);
631         error= table->file->ha_index_read_idx_map(table->record[1], keynum,
632                                                   (const uchar*)key,
633                                                   HA_WHOLE_KEY,
634                                                   HA_READ_KEY_EXACT);
635       }
636       else
637         /*
638           For a server built in non-debug mode, return an error if
639           handler::get_dup_key() returned MAX_KEY as the key index value.
640         */
641         error= HA_ERR_FOUND_DUPP_KEY;
642 
643       if (error)
644       {
645         DBUG_PRINT("info", ("ha_index_read_idx_map() returns error %d", error));
646         if (error == HA_ERR_RECORD_DELETED)
647           error= HA_ERR_KEY_NOT_FOUND;
648         table->file->print_error(error, MYF(0));
649         DBUG_RETURN(error);
650       }
651     }
652 
653     /*
654        Now, table->record[1] should contain the offending row.  That
655        will enable us to update it or, alternatively, delete it (so
656        that we can insert the new row afterwards).
657 
658        First we copy the columns into table->record[0] that are not
659        present on the master from table->record[1], if there are any.
660     */
661     copy_extra_record_fields(table, master_reclength, master_fields);
662 
663     /*
664        REPLACE is defined as either INSERT or DELETE + INSERT.  If
665        possible, we can replace it with an UPDATE, but that will not
666        work on InnoDB if FOREIGN KEY checks are necessary.
667 
668        I (Matz) am not sure of the reason for the last_uniq_key()
669        check, but I'm guessing that it's something along the
670        following lines.
671 
672        Suppose that we got the duplicate key to be a key that is not
673        the last unique key for the table and we perform an update:
674        then there might be another key for which the unique check will
675        fail, so we're better off just deleting the row and inserting
676        the correct row.
677      */
678     if (last_uniq_key(table, keynum) &&
679         !table->file->referenced_by_foreign_key())
680     {
681       error=table->file->ha_update_row(table->record[1],
682                                        table->record[0]);
683       if (error && error != HA_ERR_RECORD_IS_THE_SAME)
684         table->file->print_error(error, MYF(0));
685       else
686         error= 0;
687       DBUG_RETURN(error);
688     }
689     else
690     {
691       if ((error= table->file->ha_delete_row(table->record[1])))
692       {
693         table->file->print_error(error, MYF(0));
694         DBUG_RETURN(error);
695       }
696       /* Will retry ha_write_row() with the offending row removed. */
697     }
698   }
699 
700   DBUG_RETURN(error);
701 }
702 
703 
704 /**
705   Find the row given by 'key', if the table has keys, or else use a table scan
706   to find (and fetch) the row.
707 
708   If the engine allows random access of the records, a combination of
709   position() and rnd_pos() will be used.
710 
711   @param table Pointer to table to search
712   @param key   Pointer to key to use for search, if table has key
713 
714   @pre <code>table->record[0]</code> shall contain the row to locate
715   and <code>key</code> shall contain a key to use for searching, if
716   the engine has a key.
717 
718   @post If the return value is zero, <code>table->record[1]</code>
719   will contain the fetched row and the internal "cursor" will refer to
720   the row. If the return value is non-zero,
721   <code>table->record[1]</code> is undefined.  In either case,
722   <code>table->record[0]</code> is undefined.
723 
724   @return Zero if the row was successfully fetched into
725   <code>table->record[1]</code>, error code otherwise.
726  */
727 
728 static int find_and_fetch_row(TABLE *table, uchar *key)
729 {
730   DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
731   DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
732            (long) table, (long) key, (long) table->record[1]));
733 
734   assert(table->in_use != NULL);
735 
736   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
737 
738   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
739       table->s->primary_key < MAX_KEY)
740   {
741     /*
742       Use a more efficient method to fetch the record given by
743       table->record[0] if the engine allows it.  We first compute a
744       row reference using the position() member function (it will be
745       stored in table->file->ref) and the use rnd_pos() to position
746       the "cursor" (i.e., record[0] in this case) at the correct row.
747 
748       TODO: Add a check that the correct record has been fetched by
749       comparing with the original record. Take into account that the
750       record on the master and slave can be of different
751       length. Something along these lines should work:
752 
753       ADD>>>  store_record(table,record[1]);
754               int error= table->file->rnd_pos(table->record[0], table->file->ref);
755               ADD>>>  assert(memcmp(table->record[1], table->record[0],
756               table->s->reclength) == 0);
757 
758     */
759     table->file->position(table->record[0]);
760     int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
761     /*
762       ha_rnd_pos() returns the record in table->record[0], so we have to
763       move it to table->record[1].
764      */
765     memcpy(table->record[1], table->record[0], table->s->reclength);
766     DBUG_RETURN(error);
767   }
768 
769   /* We need to retrieve all fields */
770   /* TODO: Move this out from this function to main loop */
771   table->use_all_columns();
772 
773   if (table->s->keys > 0)
774   {
775     int error;
776     /* We have a key: search the table using the index */
777     if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
778     {
779       table->file->print_error(error, MYF(0));
780       DBUG_RETURN(error);
781     }
782 
783     DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
784     DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
785 
786     /*
787       We need to set the null bytes to ensure that the filler bits are
788       all set when returning.  There are storage engines that just set
789       the necessary bits on the bytes and don't set the filler bits
790       correctly.
791     */
792     my_ptrdiff_t const pos=
793       table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
794     table->record[1][pos]= 0xFF;
795     if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
796                                                HA_READ_KEY_EXACT)))
797     {
798       table->file->print_error(error, MYF(0));
799       table->file->ha_index_end();
800       DBUG_RETURN(error);
801     }
802 
803     DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
804     DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
805     /*
806       Below is a minor "optimization".  If the key (i.e., key number
807       0) has the HA_NOSAME flag set, we know that we have found the
808       correct record (since there can be no duplicates); otherwise, we
809       have to compare the record with the one found to see if it is
810       the correct one.
811 
812       CAVEAT! This behaviour is essential for the replication of,
813       e.g., the mysql.proc table since the correct record *shall* be
814       found using the primary key *only*.  There shall be no
815       comparison of non-PK columns to decide if the correct record is
816       found.  I can see no scenario where it would be incorrect to
817       choose the row to change using only a PK or an UNNI.
818     */
819     if (table->key_info->flags & HA_NOSAME)
820     {
821       table->file->ha_index_end();
822       DBUG_RETURN(0);
823     }
824 
825     while (record_compare(table))
826     {
827       int error;
828 
829       /*
830         We need to set the null bytes to ensure that the filler bits
831         are all set when returning.  There are storage engines that
832         just set the necessary bits on the bytes and don't set the
833         filler bits correctly.
834 
835         TODO[record format ndb]: Remove this code once NDB returns the
836         correct record format.
837       */
838       if (table->s->null_bytes > 0)
839       {
840         table->record[1][table->s->null_bytes - 1]|=
841           256U - (1U << table->s->last_null_bit_pos);
842       }
843 
844       while ((error= table->file->ha_index_next(table->record[1])))
845       {
846         /* We just skip records that have already been deleted */
847         if (error == HA_ERR_RECORD_DELETED)
848           continue;
849         table->file->print_error(error, MYF(0));
850         table->file->ha_index_end();
851         DBUG_RETURN(error);
852       }
853     }
854 
855     /*
856       Have to restart the scan to be able to fetch the next row.
857     */
858     table->file->ha_index_end();
859   }
860   else
861   {
862     int restart_count= 0; // Number of times scanning has restarted from top
863     int error;
864 
865     /* We don't have a key: search the table using ha_rnd_next() */
866     if ((error= table->file->ha_rnd_init(1)))
867     {
868       table->file->print_error(error, MYF(0));
869       DBUG_RETURN(error);
870     }
871 
872     /* Continue until we find the right record or have made a full loop */
873     do
874     {
875   restart_ha_rnd_next:
876       error= table->file->ha_rnd_next(table->record[1]);
877 
878       DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
879       DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
880 
881       switch (error) {
882       case 0:
883         break;
884 
885       /*
886         If the record was deleted, we pick the next one without doing
887         any comparisons.
888       */
889       case HA_ERR_RECORD_DELETED:
890         goto restart_ha_rnd_next;
891 
892       case HA_ERR_END_OF_FILE:
893         if (++restart_count < 2)
894         {
895           if ((error= table->file->ha_rnd_init(1)))
896           {
897             table->file->print_error(error, MYF(0));
898             DBUG_RETURN(error);
899           }
900         }
901        break;
902 
903       default:
904         table->file->print_error(error, MYF(0));
905         DBUG_PRINT("info", ("Record not found"));
906         (void) table->file->ha_rnd_end();
907         DBUG_RETURN(error);
908       }
909     }
910     while (restart_count < 2 && record_compare(table));
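    /*
      Reaching this point with restart_count == 2 means the table has been
      scanned through without finding a matching row; in that case 'error'
      is HA_ERR_END_OF_FILE (see the assert below) and is returned to the
      caller.
    */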
911 
912     /*
913       Have to restart the scan to be able to fetch the next row.
914     */
915     DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
916     table->file->ha_rnd_end();
917 
918     assert(error == HA_ERR_END_OF_FILE || error == 0);
919     DBUG_RETURN(error);
920   }
921 
922   DBUG_RETURN(0);
923 }
924 
925 
926 /**********************************************************
927   Row handling primitives for Write_rows_log_event_old
928  **********************************************************/
929 
930 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
931 {
932   int error= 0;
933 
934   /*
935     We are using REPLACE semantics and not INSERT IGNORE semantics
936     when writing rows, that is: new rows replace old rows.  We need to
937     inform the storage engine that it should use this behaviour.
938   */
939 
940   /* Tell the storage engine that we are using REPLACE semantics. */
941   thd->lex->duplicates= DUP_REPLACE;
942 
943   /*
944     Pretend we're executing a REPLACE command: this is needed for
945     InnoDB and NDB Cluster since they are not (properly) checking the
946     lex->duplicates flag.
947   */
948   thd->lex->sql_command= SQLCOM_REPLACE;
949   /*
950      Do not raise the error flag in case of hitting a duplicate on a unique attribute
951   */
952   table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
953   /*
954      NDB specific: an update from an NDB master is wrapped as Write_rows,
955      so the event should be applied in a way that replaces the slave's
956      existing row.
957   */
958 
959   table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
960   /*
961      NDB specific: if an update from an NDB master wrapped as Write_rows
962      does not find the row, it is assumed that idempotent binlog applying
963      is taking place; don't raise the error.
964   */
965   table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
966   /*
967     TODO: the cluster team (Tomas?) says that it's better if the engine knows
968     how many rows are going to be inserted; then it can allocate the needed
969     memory from the start.
970   */
971   table->file->ha_start_bulk_insert(0);
972   return error;
973 }
974 
975 
976 int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
977 {
978   int local_error= 0;
979   table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
980   table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
981   /*
982     Resetting the extra flag with
983     table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
984     triggers Bug#27077.
985     TODO: explain or fix.
986   */
987   if ((local_error= table->file->ha_end_bulk_insert()))
988   {
989     table->file->print_error(local_error, MYF(0));
990   }
991   return error? error : local_error;
992 }
993 
994 
995 int
996 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
997                                          Relay_log_info const *rli,
998                                          TABLE *table,
999                                          uchar const *row_start,
1000                                          uchar const **row_end)
1001 {
1002   assert(table != NULL);
1003   assert(row_start && row_end);
1004 
1005   int error;
1006   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1007                         table, m_width, table->record[0],
1008                         row_start, &m_cols, row_end, &m_master_reclength,
1009                         table->write_set, binary_log::PRE_GA_WRITE_ROWS_EVENT);
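  /*
    Note: the row was unpacked into the columns of the write set; mirror it
    into the read set so that the same columns can also be read back (e.g.
    by replace_record() when it fetches an offending duplicate row).
  */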
1010   bitmap_copy(table->read_set, table->write_set);
1011   return error;
1012 }
1013 
1014 
1015 int Write_rows_log_event_old::do_exec_row(TABLE *table)
1016 {
1017   assert(table != NULL);
1018   int error= replace_record(thd, table, m_master_reclength, m_width);
1019   return error;
1020 }
1021 
1022 
1023 /**********************************************************
1024   Row handling primitives for Delete_rows_log_event_old
1025  **********************************************************/
1026 
1027 int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
1028 {
1029   assert(m_memory == NULL);
1030 
1031   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
1032       table->s->primary_key < MAX_KEY)
1033   {
1034     /*
1035       We don't need to allocate any memory for m_after_image and
1036       m_key since they are not used.
1037     */
1038     return 0;
1039   }
1040 
1041   int error= 0;
1042 
1043   if (table->s->keys > 0)
1044   {
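    /*
      A single my_multi_malloc() call provides both the m_after_image
      buffer and the m_key buffer; the whole block is released with one
      my_free(m_memory) in do_after_row_operations().
    */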
1045     m_memory= (uchar*) my_multi_malloc(key_memory_log_event_old,
1046                                        MYF(MY_WME),
1047                                        &m_after_image,
1048                                        (uint) table->s->reclength,
1049                                        &m_key,
1050                                        (uint) table->key_info->key_length,
1051                                        NullS);
1052   }
1053   else
1054   {
1055     m_after_image= (uchar*) my_malloc(key_memory_log_event_old,
1056                                       table->s->reclength, MYF(MY_WME));
1057     m_memory= m_after_image;
1058     m_key= NULL;
1059   }
1060   if (!m_memory)
1061     return HA_ERR_OUT_OF_MEM;
1062 
1063   return error;
1064 }
1065 
1066 
1067 int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1068 {
1069   /* error= TODO: find out what this should really be; this triggers close_scan in NDB, returning an error? */
1070   table->file->ha_index_or_rnd_end();
1071   my_free(m_memory); // Free for multi_malloc
1072   m_memory= NULL;
1073   m_after_image= NULL;
1074   m_key= NULL;
1075 
1076   return error;
1077 }
1078 
1079 
1080 int
1081 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
1082                                           Relay_log_info const *rli,
1083                                           TABLE *table,
1084                                           uchar const *row_start,
1085                                           uchar const **row_end)
1086 {
1087   int error;
1088   assert(row_start && row_end);
1089   /*
1090     This assertion actually checks that there are at least as many
1091     columns on the slave as on the master.
1092   */
1093   assert(table->s->fields >= m_width);
1094 
1095   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1096                         table, m_width, table->record[0],
1097                         row_start, &m_cols, row_end, &m_master_reclength,
1098                         table->read_set, binary_log::PRE_GA_DELETE_ROWS_EVENT);
1099   /*
1100     If we access rows using the random access method, m_key will
1101     be set to NULL, so we do not need to make a key copy in that case.
1102    */
1103   if (m_key)
1104   {
1105     KEY *const key_info= table->key_info;
1106 
1107     key_copy(m_key, table->record[0], key_info, 0);
1108   }
1109 
1110   return error;
1111 }
1112 
1113 
1114 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
1115 {
1116   int error;
1117   assert(table != NULL);
1118 
1119   if (!(error= ::find_and_fetch_row(table, m_key)))
1120   {
1121     /*
1122       Now we should have the right row to delete.  We are using
1123       record[0] since it is guaranteed to point to a record with the
1124       correct value.
1125     */
1126     error= table->file->ha_delete_row(table->record[0]);
1127   }
1128   return error;
1129 }
1130 
1131 
1132 /**********************************************************
1133   Row handling primitives for Update_rows_log_event_old
1134  **********************************************************/
1135 
1136 int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
1137 {
1138   assert(m_memory == NULL);
1139 
1140   int error= 0;
1141 
1142   if (table->s->keys > 0)
1143   {
1144     m_memory= (uchar*) my_multi_malloc(key_memory_log_event_old,
1145                                        MYF(MY_WME),
1146                                        &m_after_image,
1147                                        (uint) table->s->reclength,
1148                                        &m_key,
1149                                        (uint) table->key_info->key_length,
1150                                        NullS);
1151   }
1152   else
1153   {
1154     m_after_image= (uchar*) my_malloc(key_memory_log_event_old,
1155                                       table->s->reclength, MYF(MY_WME));
1156     m_memory= m_after_image;
1157     m_key= NULL;
1158   }
1159   if (!m_memory)
1160     return HA_ERR_OUT_OF_MEM;
1161 
1162   return error;
1163 }
1164 
1165 
1166 int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1167 {
1168   /* error= TODO: find out what this should really be; this triggers close_scan in NDB, returning an error? */
1169   table->file->ha_index_or_rnd_end();
1170   my_free(m_memory);
1171   m_memory= NULL;
1172   m_after_image= NULL;
1173   m_key= NULL;
1174 
1175   return error;
1176 }
1177 
1178 
1179 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1180                                               Relay_log_info const *rli,
1181                                               TABLE *table,
1182                                               uchar const *row_start,
1183                                               uchar const **row_end)
1184 {
1185   int error;
1186   assert(row_start && row_end);
1187   /*
1188     This assertion actually checks that there are at least as many
1189     columns on the slave as on the master.
1190   */
1191   assert(table->s->fields >= m_width);
1192 
1193   /* record[0] is the before image for the update */
1194   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1195                         table, m_width, table->record[0],
1196                         row_start, &m_cols, row_end, &m_master_reclength,
1197                         table->read_set, binary_log::PRE_GA_UPDATE_ROWS_EVENT);
1198   row_start = *row_end;
1199   /* m_after_image is the after image for the update */
1200   error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1201                         table, m_width, m_after_image,
1202                         row_start, &m_cols, row_end, &m_master_reclength,
1203                         table->write_set, binary_log::PRE_GA_UPDATE_ROWS_EVENT);
1204 
1205   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1206   DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1207 
1208   /*
1209     If we access rows using the random access method, m_key will
1210     be set to NULL, so we do not need to make a key copy in that case.
1211    */
1212   if (m_key)
1213   {
1214     KEY *const key_info= table->key_info;
1215 
1216     key_copy(m_key, table->record[0], key_info, 0);
1217   }
1218 
1219   return error;
1220 }
1221 
1222 
1223 int Update_rows_log_event_old::do_exec_row(TABLE *table)
1224 {
1225   assert(table != NULL);
1226 
1227   int error= ::find_and_fetch_row(table, m_key);
1228   if (error)
1229     return error;
1230 
1231   /*
1232     We have to ensure that the new record (i.e., the after image) is
1233     in record[0] and the old record (i.e., the before image) is in
1234     record[1].  This is because some storage engines require it (for
1235     example, the partition engine).
1236 
1237     Since find_and_fetch_row() puts the fetched record (i.e., the old
1238     record) in record[1], we can keep it there. We put the new record
1239     (i.e., the after image) into record[0], and copy the fields that
1240     are on the slave (i.e., in record[1]) into record[0], effectively
1241     overwriting the default values that were put there by the
1242     unpack_row() function.
1243   */
1244   memcpy(table->record[0], m_after_image, table->s->reclength);
1245   copy_extra_record_fields(table, m_master_reclength, m_width);
1246 
1247   /*
1248     Now we have the right row to update.  The old row (the one we're
1249     looking for) is in record[1] and the new row is in record[0].
1250     We also have copied the original values already in the slave's
1251     database into the after image delivered from the master.
1252   */
1253   error= table->file->ha_update_row(table->record[1], table->record[0]);
1254   if (error == HA_ERR_RECORD_IS_THE_SAME)
1255     error= 0;
1256 
1257   return error;
1258 }
1259 
1260 #endif
1261 
1262 
1263 /**************************************************************************
1264 	Rows_log_event member functions
1265 **************************************************************************/
1266 
1267 #ifndef MYSQL_CLIENT
1268 /**
1269   The event header and footer are constructed before constructing an object of
1270   a specific event type. The header and footer are constructed in the base
1271   class, Binary_log_event. They are accessed using the const methods header()
1272   and footer() defined in Binary_log_event.
1273 
1274   This constructor calls the header() and footer() methods to initialize
1275   Log_event::common_header and Log_event::common_footer respectively.
1276 
1277   We don't know the exact type of the event when we call
1278   this constructor, so we pass ENUM_END_EVENT as the type here.
1279 */
1280 Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1281                                        MY_BITMAP const *cols,
1282                                        bool using_trans)
1283   : Binary_log_event(binary_log::ENUM_END_EVENT),
1284     Log_event(thd_arg, 0,
1285               using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
1286                             Log_event::EVENT_STMT_CACHE,
1287               Log_event::EVENT_NORMAL_LOGGING, header(),
1288               footer()),
1289     m_row_count(0),
1290     m_table(tbl_arg),
1291     m_table_id(tid),
1292     m_width(tbl_arg ? tbl_arg->s->fields : 1),
1293     m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1294 #ifdef HAVE_REPLICATION
1295     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1296 #endif
1297 {
1298 
1299   // This constructor should not be reached.
1300   assert(0);
1301 
1302   /*
1303     We allow a special form of dummy event when the table and cols
1304     are null and the table id is ~0UL.  This is a temporary
1305     solution, to be able to terminate a started statement in the
1306     binary log: the extraneous events will be removed in the future.
1307    */
1308   assert((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1309          (!tbl_arg && !cols && tid == ~0UL));
1310 
1311   if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1312       set_flags(NO_FOREIGN_KEY_CHECKS_F);
1313   if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1314       set_flags(RELAXED_UNIQUE_CHECKS_F);
1315   /* if bitmap_init fails, caught in is_valid() */
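  /*
    When the column bitmap fits into the fixed-size m_bitbuf member, use it
    as the backing store; otherwise bitmap_init() allocates the bitmap on
    the heap (and the destructor releases it through bitmap_free()).
  */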
1316   if (likely(!bitmap_init(&m_cols,
1317                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1318                           m_width,
1319                           false)))
1320   {
1321     /* Cols can be zero if this is a dummy binrows event */
1322     if (likely(cols != NULL))
1323     {
1324       memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1325       create_last_word_mask(&m_cols);
1326     }
1327   }
1328   else
1329   {
1330     // Needed because bitmap_init() does not set it to null on failure
1331     m_cols.bitmap= 0;
1332   }
1333 }
1334 #endif
1335 
1336 Old_rows_log_event::
1337 Old_rows_log_event(const char *buf, uint event_len, Log_event_type event_type,
1338                    const Format_description_event *description_event)
1339  : Binary_log_event(&buf, description_event->binlog_version,
1340                      description_event->server_version),
1341    Log_event(header(), footer()),
1342     m_row_count(0),
1343 #ifndef MYSQL_CLIENT
1344     m_table(NULL),
1345 #endif
1346     m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1347 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1348     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1349 #endif
1350 {
1351   DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1352   uint8 const common_header_len= description_event->common_header_len;
1353   uint8 const post_header_len= description_event->post_header_len[event_type-1];
1354 
1355   DBUG_PRINT("enter",("event_len: %u  common_header_len: %d  "
1356 		      "post_header_len: %d",
1357 		      event_len, common_header_len,
1358 		      post_header_len));
1359 
1360   const char *post_start= buf + common_header_len;
1361   DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1362   post_start+= ROWS_MAPID_OFFSET;
1363   if (post_header_len == 6)
1364   {
1365     /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1366     m_table_id= uint4korr(post_start);
1367     post_start+= 4;
1368   }
1369   else
1370   {
1371     m_table_id= (ulong) uint6korr(post_start);
1372     post_start+= ROWS_FLAGS_OFFSET;
1373   }
1374 
1375   m_flags= uint2korr(post_start);
1376 
1377   uchar const *const var_start=
1378     (const uchar *)buf + common_header_len + post_header_len;
1379   uchar const *const ptr_width= var_start;
1380   uchar *ptr_after_width= (uchar*) ptr_width;
1381   DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1382   m_width = net_field_length(&ptr_after_width);
1383   DBUG_PRINT("debug", ("m_width=%lu", m_width));
1384   /* Avoid reading out of buffer */
1385   if (m_width + (ptr_after_width - (const uchar *)buf) > event_len)
1386   {
1387     m_cols.bitmap= NULL;
1388     DBUG_VOID_RETURN;
1389   }
1390 
1391   /* if bitmap_init fails, caught in is_valid() */
1392   if (likely(!bitmap_init(&m_cols,
1393                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1394                           m_width,
1395                           false)))
1396   {
1397     DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1398     memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1399     create_last_word_mask(&m_cols);
1400     ptr_after_width+= (m_width + 7) / 8;
1401     DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1402   }
1403   else
1404   {
1405     // Needed because bitmap_init() does not set it to null on failure
1406     m_cols.bitmap= NULL;
1407     DBUG_VOID_RETURN;
1408   }
1409 
1410   const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1411   size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1412   DBUG_PRINT("info",("m_table_id: %llu  m_flags: %d  m_width: %lu  data_size: %lu",
1413                      m_table_id.id(), m_flags, m_width, (ulong) data_size));
1414   DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1415 
1416   m_rows_buf= (uchar*) my_malloc(key_memory_log_event_old,
1417                                  data_size, MYF(MY_WME));
1418   if (likely((bool)m_rows_buf))
1419   {
1420 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1421     m_curr_row= m_rows_buf;
1422 #endif
1423     m_rows_end= m_rows_buf + data_size;
1424     m_rows_cur= m_rows_end;
1425     memcpy(m_rows_buf, ptr_rows_data, data_size);
1426   }
1427   else
1428     m_cols.bitmap= 0; // to not free it
1429 
1430   DBUG_VOID_RETURN;
1431 }
1432 
1433 
1434 Old_rows_log_event::~Old_rows_log_event()
1435 {
1436   if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1437     m_cols.bitmap= 0; // so no my_free in bitmap_free
1438   bitmap_free(&m_cols); // To pair with bitmap_init().
1439   my_free(m_rows_buf);
1440 }
1441 
1442 
1443 size_t Old_rows_log_event::get_data_size()
1444 {
1445   uchar buf[sizeof(m_width)+1];
1446   uchar *end= net_store_length(buf, (m_width + 7) / 8);
1447 
1448   DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1449                   return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1450                   (m_rows_cur - m_rows_buf););
1451   int data_size= Binary_log_event::ROWS_HEADER_LEN;
1452   data_size+= no_bytes_in_map(&m_cols);
1453   data_size+= (uint) (end - buf);
1454 
1455   data_size+= (uint) (m_rows_cur - m_rows_buf);
1456   return data_size;
1457 }
1458 
1459 
1460 #ifndef MYSQL_CLIENT
1461 int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1462 {
1463   /*
1464     When the table has a primary key, we would probably want, by default, to
1465     log only the primary key value instead of the entire "before image". This
1466     would save binlog space. TODO
1467   */
1468   DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1469   DBUG_PRINT("enter", ("row_data: 0x%lx  length: %lu", (ulong) row_data,
1470                        (ulong) length));
1471   DBUG_DUMP("row_data", row_data, min<size_t>(length, 32));
1472 
1473   assert(m_rows_buf <= m_rows_cur);
1474   assert(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1475   assert(m_rows_cur <= m_rows_end);
1476 
1477   /* The cast will always work since m_rows_cur <= m_rows_end */
1478   if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1479   {
1480     size_t const block_size= 1024;
1481     my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1482     my_ptrdiff_t const new_alloc=
1483         block_size * ((cur_size + length + block_size - 1) / block_size);
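    /*
      The expression above rounds the required size up to the next multiple
      of block_size. For example, with cur_size = 1500 and length = 600,
      (1500 + 600 + 1023) / 1024 == 3, so new_alloc == 3072.
    */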
1484 
1485     uchar* const new_buf= (uchar*)my_realloc(key_memory_log_event_old,
1486                                              m_rows_buf, (uint) new_alloc,
1487                                            MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1488     if (unlikely(!new_buf))
1489       DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1490 
1491     /* If the memory moved, we need to move the pointers */
1492     if (new_buf != m_rows_buf)
1493     {
1494       m_rows_buf= new_buf;
1495       m_rows_cur= m_rows_buf + cur_size;
1496     }
1497 
1498     /*
1499        The end pointer should always be changed to point to the end of
1500        the allocated memory.
1501     */
1502     m_rows_end= m_rows_buf + new_alloc;
1503   }
1504 
1505   assert(m_rows_cur + length <= m_rows_end);
1506   memcpy(m_rows_cur, row_data, length);
1507   m_rows_cur+= length;
1508   m_row_count++;
1509   DBUG_RETURN(0);
1510 }
1511 #endif
1512 
1513 
1514 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1515 int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
1516 {
1517   DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1518   int error= 0;
1519 
1520   /*
1521     If m_table_id == ~0U or the maximum 6-byte integer, then this is a dummy
1522     event that does not contain any data.  In that case, we just remove all
1523     tables in the tables_to_lock list, close the thread tables, and return
1524     with success.
1525    */
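  /*
    In other words, a dummy event carries a reserved all-ones table id
    (either ~0U or the 6-byte maximum 0xFFFFFFFFFFFF == ~0ULL >> 16) together
    with a one-bit, all-zero column bitmap, which is exactly what the
    condition below tests.
  */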
1526   if ((m_table_id.id() == ~0U || m_table_id.id() == (~0ULL >> 16)) &&
1527       m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
1528   {
1529     /*
1530        This one is supposed to be set: just an extra check so that
1531        nothing strange has happened.
1532      */
1533     assert(get_flags(STMT_END_F));
1534 
1535     const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1536     thd->clear_error();
1537     DBUG_RETURN(0);
1538   }
1539 
1540   /*
1541     'thd' has been set by exec_relay_log_event(), just before calling
1542     do_apply_event(). We still check here to prevent future coding
1543     errors.
1544   */
1545   assert(rli->info_thd == thd);
1546 
1547   /*
1548     If no locks have been taken, this is the first binrow event seen
1549     after the table map events.  We should then lock all the tables
1550     used in the transaction and proceed with execution of the actual
1551     event.
1552   */
1553   if (!thd->lock)
1554   {
1555     /*
1556       lock_tables() reads the contents of thd->lex, so they must be
1557       initialized. Contrary to
1558       Table_map_log_event::do_apply_event(), we don't call
1559       mysql_init_query() as that may reset the binlog format.
1560     */
1561     lex_start(thd);
1562 
1563     if ((error= lock_tables(thd, rli->tables_to_lock,
1564                                rli->tables_to_lock_count, 0)))
1565     {
1566       if (thd->is_slave_error || thd->is_fatal_error)
1567       {
1568         /*
1569           Error reporting borrowed from Query_log_event with many excessive
1570           simplifications (we don't honour --slave-skip-errors)
1571         */
1572         uint actual_error= thd->get_protocol_classic()->get_last_errno();
1573         rli->report(ERROR_LEVEL, actual_error,
1574                     "Error '%s' in %s event: when locking tables",
1575                     (actual_error ?
1576                      thd->get_protocol_classic()->get_last_error() :
1577                      "unexpected success or fatal error"),
1578                     get_type_str());
1579         thd->is_fatal_error= 1;
1580       }
1581       else
1582       {
1583         rli->report(ERROR_LEVEL, error,
1584                     "Error in %s event: when locking tables",
1585                     get_type_str());
1586       }
1587       const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1588       DBUG_RETURN(error);
1589     }
1590 
1591     /*
1592       When the open and locking succeeded, we check all tables to
1593       ensure that they still have the correct type.
1594     */
1595 
1596     {
1597       TABLE_LIST *table_list_ptr= rli->tables_to_lock;
1598       for (uint i= 0; table_list_ptr && (i < rli->tables_to_lock_count);
1599            table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1600       {
1601         /*
1602           Please see the comment in Rows_log_event::do_apply_event() in
1603           log_event.cc for an explanation of the if condition below.
1604         */
1605         if (table_list_ptr->parent_l)
1606           continue;
1607         /*
1608           We can use a down cast here since we know that every table added
1609           to tables_to_lock is an RPL_TABLE_LIST (or a child table, which is
1610           skipped above).
1611         */
1612         RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1613         TABLE *conv_table;
1614         if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
1615                                             ptr->table, &conv_table))
1616         {
1617           thd->is_slave_error= 1;
1618           const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
1619           DBUG_RETURN(ERR_BAD_TABLE_DEF);
1620         }
1621         ptr->m_conv_table= conv_table;
1622       }
1623     }
1624 
1625     /*
1626       ... and then we add all the tables to the table map but keep
1627       them in the tables to lock list.
1628 
1629 
1630       We also invalidate the query cache for all the tables, since
1631       they will now be changed.
1632 
1633       TODO [/Matz]: Maybe the query cache should not be invalidated
1634       here? It might be that a table is not changed, even though it
1635       was locked for the statement.  We do know that each
1636       Old_rows_log_event contains at least one row, so after processing one
1637       Old_rows_log_event, we can invalidate the query cache for the
1638       associated table.
1639      */
1640     for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
1641     {
1642       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
1643     }
1644     query_cache.invalidate_locked_for_write(rli->tables_to_lock);
1645   }
1646 
1647   TABLE*
1648     table=
1649     m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
1650 
1651   if (table)
1652   {
1653     /*
1654       table == NULL means that this table should not be replicated
1655       (this was set up by Table_map_log_event::do_apply_event()
1656       which tested replicate-* rules).
1657     */
1658 
1659     /*
1660       Calling set_time() is not strictly needed, but
1661       1) it preserves the property that "Time" in SHOW PROCESSLIST shows how
1662       far the slave is behind, and
1663       2) it will be needed when we allow replication from a table with no
1664       TIMESTAMP column to a table with one.
1665       So we call set_time(), as in SBR. Presently it changes nothing.
1666     */
1667     thd->set_time(&(common_header->when));
1668     /*
1669       There are a few flags that are replicated with each row event.
1670       Make sure to set/clear them before executing the main body of
1671       the event.
1672     */
1673     if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1674         thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1675     else
1676         thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1677 
1678     if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1679         thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1680     else
1681         thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1682     /* A small test to verify that objects have consistent types */
1683     assert(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1684 
1685     /*
1686       Now we are in a statement and will stay in a statement until we
1687       see a STMT_END_F.
1688 
1689       We set this flag here, before actually applying any rows, in
1690       case the SQL thread is stopped and we need to detect that we're
1691       inside a statement and halting abruptly might cause problems
1692       when restarting.
1693      */
1694     const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
1695 
1696     if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1697       set_flags(COMPLETE_ROWS_F);
1698 
1699     /*
1700       Set the table's write and read sets.
1701 
1702       The read_set contains all slave columns (in case we are going to fetch
1703       a complete record from the slave).
1704 
1705       The write_set equals the m_cols bitmap sent from the master, but it can
1706       be longer if the slave has extra columns.
1707      */
1708 
1709     DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1710 
1711     bitmap_set_all(table->read_set);
1712     bitmap_set_all(table->write_set);
1713     if (!get_flags(COMPLETE_ROWS_F))
1714       bitmap_intersect(table->write_set,&m_cols);
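    /*
      For example, if the master sent only columns 0 and 2 of a four-column
      table (m_cols == 0b0101) and the row is not complete, the intersection
      above leaves exactly those two bits set in the slave's write_set.
    */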
1715 
1716     // Do event specific preparations
1717 
1718     error= do_before_row_operations(rli);
1719 
1720     // row processing loop
1721 
1722     while (error == 0 && m_curr_row < m_rows_end)
1723     {
1724       /* in_use may have been set to NULL in close_tables_for_reopen */
1725       THD* old_thd= table->in_use;
1726       if (!table->in_use)
1727         table->in_use= thd;
1728 
1729       error= do_exec_row(rli);
1730 
1731       DBUG_PRINT("info", ("error: %d", error));
1732       assert(error != HA_ERR_RECORD_DELETED);
1733 
1734       table->in_use = old_thd;
1735       switch (error)
1736       {
1737       case 0:
1738 	break;
1739 
1740       /* Some recoverable errors */
1741       case HA_ERR_RECORD_CHANGED:
1742       case HA_ERR_KEY_NOT_FOUND:	/* Idempotency support: OK if
1743                                            tuple does not exist */
1744         error= 0;
1745         break;
1746 
1747       default:
1748         rli->report(ERROR_LEVEL,
1749           thd->get_protocol_classic()->get_last_errno(),
1750           "Error in %s event: row application failed. %s",
1751           get_type_str(), thd->get_protocol_classic()->get_last_error());
1752         thd->is_slave_error= 1;
1753         break;
1754       }
1755 
1756       /*
1757        If m_curr_row_end was not set during event execution (e.g., because
1758        of errors), we can't proceed to the next row. If the error is transient
1759        (i.e., error == 0 at this point), we must call unpack_current_row() to
1760        set m_curr_row_end.
1761       */
1762 
1763       DBUG_PRINT("info", ("error: %d", error));
1764       DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
1765                           (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
1766 
1767       if (!m_curr_row_end && !error)
1768         unpack_current_row(rli);
1769 
1770       // at this moment m_curr_row_end should be set
1771       assert(error || m_curr_row_end != NULL);
1772       assert(error || m_curr_row < m_curr_row_end);
1773       assert(error || m_curr_row_end <= m_rows_end);
1774 
1775       m_curr_row= m_curr_row_end;
1776 
1777     } // row processing loop
1778 
1779     DBUG_EXECUTE_IF("stop_slave_middle_group",
1780                     const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1781     error= do_after_row_operations(rli, error);
1782   } // if (table)
1783 
1784   if (error)
1785   {                     /* an error has occurred during the transaction */
1786     rli->report(ERROR_LEVEL,
1787                 thd->get_protocol_classic()->get_last_errno(),
1788                 "Error in %s event: error during transaction execution "
1789                 "on table %s.%s. %s",
1790                 get_type_str(), table->s->db.str, table->s->table_name.str,
1791                 thd->get_protocol_classic()->get_last_error());
1792 
1793     /*
1794       If one day we honour --slave-skip-errors in row-based replication, and
1795       the error should be skipped, then we would clear mappings, roll back and
1796       close tables, but the slave SQL thread would not stop and might then
1797       assume the mapping is still available and the tables are still open...
1798       So we should clear mappings/rollback/close here only if this is a
1799       STMT_END_F.
1800       For now we code knowing that the error is not skippable, so the slave
1801       SQL thread is certainly going to stop.
1802       The rollback is done at the caller, as with SBR.
1803     */
1804     thd->reset_current_stmt_binlog_format_row();
1805     const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
1806     thd->is_slave_error= 1;
1807     DBUG_RETURN(error);
1808   }
1809 
1810   /*
1811     This code would ideally be placed in do_update_pos(), but
1812     since we have no access to the table there, we set
1813     last_event_start_time here instead.
1814   */
1815   if (table && (table->s->primary_key == MAX_KEY) &&
1816       !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1817   {
1818     /*
1819       ------------ Temporary fix until WL#2975 is implemented ---------
1820 
1821       This event is not the last one (no STMT_END_F). If we stop now
1822       (in case of terminate_slave_thread()), how will we restart? We
1823       have to restart from Table_map_log_event, but as this table is
1824       not transactional, the rows already inserted will still be
1825       present, and idempotency is not guaranteed (no PK) so we risk
1826       that repeating leads to double insert. So we desperately try to
1827       continue, hope we'll eventually leave this buggy situation (by
1828       executing the final Old_rows_log_event). If we are in a hopeless
1829       wait (reached end of last relay log and nothing gets appended
1830       there), we timeout after one minute, and notify DBA about the
1831       problem.  When WL#2975 is implemented, just remove the member
1832       Relay_log_info::last_event_start_time and all its occurrences.
1833     */
1834     const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
1835   }
1836 
1837   if (get_flags(STMT_END_F))
1838   {
1839     /*
1840       This is the end of a statement or transaction, so close (and
1841       unlock) the tables we opened when processing the
1842       Table_map_log_event starting the statement.
1843 
1844       OBSERVER.  This will clear *all* mappings, not only those that
1845       are open for the table. There is no good handle for on-close
1846       actions for tables.
1847 
1848       NOTE. Even if we have no table ('table' == 0) we still need to be
1849       here, so that we increase the group relay log position. If we didn't, we
1850       could have a group relay log position which lags behind "forever"
1851       (assume the last master's transaction is ignored by the slave because of
1852       replicate-ignore rules).
1853     */
1854     int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1855 
1856     /*
1857       If this event is not in a transaction, the call below will, if some
1858       transactional storage engines are involved, commit the statement into
1859       them and flush the pending event to binlog.
1860       If this event is in a transaction, the call will do nothing, but a
1861       Xid_log_event will come next which will, if some transactional engines
1862       are involved, commit the transaction and flush the pending event to the
1863       binlog.
1864       If there was a deadlock the transaction should have been rolled back
1865       already. So there should be no need to rollback the transaction.
1866     */
1867     assert(! thd->transaction_rollback_request);
1868     if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
1869       rli->report(ERROR_LEVEL, error,
1870                   "Error in %s event: commit of row events failed, "
1871                   "table `%s`.`%s`",
1872                   get_type_str(), m_table->s->db.str,
1873                   m_table->s->table_name.str);
1874     error|= binlog_error;
1875 
1876     /*
1877       Now, what if this is not a transactional engine? We still need to
1878       flush the pending event to the binlog; we did that with
1879       thd->binlog_flush_pending_rows_event(). Note that we imitate
1880       what is done for real queries: a call to
1881       ha_autocommit_or_rollback() (sometimes only if a transactional
1882       engine is involved), and a call to make sure the pending
1883       event is flushed.
1884     */
1885 
1886     thd->reset_current_stmt_binlog_format_row();
1887     const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
1888   }
1889 
1890   DBUG_RETURN(error);
1891 }
1892 
1893 
1894 Log_event::enum_skip_reason
1895 Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
1896 {
1897   /*
1898     If the slave skip counter is 1 and this event does not end a
1899     statement, then we should not start executing on the next event.
1900     Otherwise, we defer the decision to the normal skipping logic.
1901   */
1902   if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1903     return Log_event::EVENT_SKIP_IGNORE;
1904   else
1905     return Log_event::do_shall_skip(rli);
1906 }
1907 
1908 int
1909 Old_rows_log_event::do_update_pos(Relay_log_info *rli)
1910 {
1911   DBUG_ENTER("Old_rows_log_event::do_update_pos");
1912   int error= 0;
1913 
1914   DBUG_PRINT("info", ("flags: %s",
1915                       get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1916 
1917   if (get_flags(STMT_END_F))
1918   {
1919     /*
1920       Indicate that a statement is finished.
1921       Step the group log position if we are not in a transaction,
1922       otherwise increase the event log position.
1923      */
1924     rli->stmt_done(common_header->log_pos);
1925     /*
1926       Clear any errors in thd->net.last_err*. It is not known if this is
1927       needed or not. It is believed that any errors that may exist in
1928       thd->net.last_err* are allowed. Examples of errors are "key not
1929       found", which is produced in the test case rpl_row_conflicts.test
1930     */
1931     thd->clear_error();
1932   }
1933   else
1934   {
1935     rli->inc_event_relay_log_pos();
1936   }
1937 
1938   DBUG_RETURN(error);
1939 }
1940 
1941 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1942 
1943 
1944 #ifndef MYSQL_CLIENT
1945 bool Old_rows_log_event::write_data_header(IO_CACHE *file)
1946 {
1947   // This method should not be reached.
1948   assert(0);
1949   return TRUE;
1950 }
1951 
1952 
1953 bool Old_rows_log_event::write_data_body(IO_CACHE *file)
1954 {
1955   /*
1956      Note that this should be the number of *bits*, not the number of
1957      bytes.
1958   */
1959   uchar sbuf[sizeof(m_width)];
1960   my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1961 
1962   // This method should not be reached.
1963   assert(0);
1964 
1965   bool res= false;
1966   uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1967   assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1968 
1969   DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1970   res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
1971 
1972   DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1973   res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1974                               no_bytes_in_map(&m_cols));
1975   DBUG_DUMP("rows", m_rows_buf, data_size);
1976   res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
1977 
1978   return res;
1979 
1980 }
1981 #endif
1982 
1983 
1984 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1985 int Old_rows_log_event::pack_info(Protocol *protocol)
1986 {
1987   char buf[256];
1988   char const *const flagstr=
1989     get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1990   size_t bytes= my_snprintf(buf, sizeof(buf),
1991                             "table_id: %llu%s", m_table_id.id(), flagstr);
1992   protocol->store(buf, bytes, &my_charset_bin);
1993   return 0;
1994 }
1995 #endif
1996 
1997 
1998 #ifdef MYSQL_CLIENT
1999 void Old_rows_log_event::print_helper(FILE *file,
2000                                       PRINT_EVENT_INFO *print_event_info,
2001                                       char const *const name)
2002 {
2003   IO_CACHE *const head= &print_event_info->head_cache;
2004   IO_CACHE *const body= &print_event_info->body_cache;
2005   if (!print_event_info->short_form)
2006   {
2007     bool const last_stmt_event= get_flags(STMT_END_F);
2008     print_header(head, print_event_info, !last_stmt_event);
2009     my_b_printf(head, "\t%s: table id %llu%s\n",
2010                 name, m_table_id.id(),
2011                 last_stmt_event ? " flags: STMT_END_F" : "");
2012     print_base64(body, print_event_info, !last_stmt_event);
2013   }
2014 }
2015 #endif
2016 
2017 
2018 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2019 /**
2020   Write the current row into event's table.
2021 
2022   The row is located in the row buffer, pointed by @c m_curr_row member.
2023   Number of columns of the row is stored in @c m_width member (it can be
2024   different from the number of columns in the table to which we insert).
2025   Bitmap @c m_cols indicates which columns are present in the row. It is assumed
2026   that event's table is already open and pointed by @c m_table.
2027 
2028   If the same record already exists in the table it can be either overwritten
2029   or an error is reported depending on the value of @c overwrite flag
2030   (error reporting not yet implemented). Note that the matching record can be
2031   different from the row we insert if we use primary keys to identify records in
2032   the table.
2033 
2034   The row to be inserted can contain values only for selected columns. The
2035   missing columns are filled with default values using @c prepare_record()
2036   function. If a matching record is found in the table and @c overwrite is
2037   true, the missing columns are taken from it.
2038 
2039   @param  rli   Relay log info (needed for row unpacking).
2040   @param  overwrite
2041                 Shall we overwrite if the row already exists or signal
2042                 error (currently ignored).
2043 
2044   @returns Error code on failure, 0 on success.
2045 
2046   This method, if successful, sets @c m_curr_row_end pointer to point at the
2047   next row in the rows buffer. This is done when unpacking the row to be
2048   inserted.
2049 
2050   @note If a matching record is found, it is either updated using
2051   @c ha_update_row() or first deleted and then new record written.
2052 */
2053 
2054 int
2055 Old_rows_log_event::write_row(const Relay_log_info *const rli,
2056                               const bool overwrite)
2057 {
2058   DBUG_ENTER("write_row");
2059   assert(m_table != NULL && thd != NULL);
2060 
2061   TABLE *table= m_table;  // pointer to event's table
2062   int error;
2063   int keynum;
2064   char *key= NULL;
2065 
2066   /* fill table->record[0] with default values */
2067 
2068   if ((error= prepare_record(table, table->write_set,
2069                              TRUE /* check if columns have def. values */)))
2070     DBUG_RETURN(error);
2071 
2072   /* unpack row into table->record[0] */
2073   error= unpack_current_row(rli); // TODO: how to handle errors?
2074 
2075 #ifndef NDEBUG
2076   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2077   DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
2078   DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
2079 #endif
2080 
2081   /*
2082     Try to write record. If a corresponding record already exists in the table,
2083     we try to change it using ha_update_row() if possible. Otherwise we delete
2084     it and repeat the whole process again.
2085 
2086     TODO: Add safety measures against infinite looping.
2087    */
2088 
2089   while ((error= table->file->ha_write_row(table->record[0])))
2090   {
2091     if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
2092     {
2093       table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
2094       DBUG_RETURN(error);
2095     }
2096     if ((keynum= table->file->get_dup_key(error)) < 0)
2097     {
2098       DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
2099       table->file->print_error(error, MYF(0));
2100       /*
2101         We failed to retrieve the duplicate key
2102         - either because the error was not "duplicate key" error
2103         - or because the information about which key it was is not available
2104       */
2105       DBUG_RETURN(error);
2106     }
2107     /*
2108       The key index value is either valid in the range [0, MAX_KEY) or
2109       has the value MAX_KEY as a marker for the case when no information
2110       about the key can be found. In the latter case we require that the
2111       storage engine has the HA_DUPLICATE_POS flag turned on.
2112       If this invariant does not hold, the assert will crash a
2113       server built in debug mode. For a server built without DEBUG,
2114       the code below performs an additional check on the key index value
2115       so that the error is reported in any case.
2116     */
2117     assert(keynum != MAX_KEY ||
2118            (keynum == MAX_KEY &&
2119             (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
2120     /*
2121        We need to retrieve the old row into record[1] to be able to
2122        either update or delete the offending record.  We either:
2123 
2124        - use ha_rnd_pos() with a row-id (available as table->file->dup_ref)
2125          to locate the offending row, if possible (MyISAM and Blackhole), or else
2126 
2127        - use ha_index_read_idx_map() with the key that is duplicated, to
2128          retrieve the offending row.
2129      */
2130     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
2131     {
2132       DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
2133       error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
2134       if (error)
2135       {
2136         DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
2137         if (error == HA_ERR_RECORD_DELETED)
2138           error= HA_ERR_KEY_NOT_FOUND;
2139         table->file->print_error(error, MYF(0));
2140         DBUG_RETURN(error);
2141       }
2142     }
2143     else
2144     {
2145       DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
2146 
2147       if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2148       {
2149         DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
2150         DBUG_RETURN(my_errno());
2151       }
2152 
2153       if (key == NULL)
2154       {
2155         key= static_cast<char*>(my_alloca(table->s->max_unique_length));
2156         if (key == NULL)
2157         {
2158           DBUG_PRINT("info",("Can't allocate key buffer"));
2159           DBUG_RETURN(ENOMEM);
2160         }
2161       }
2162 
2163       if ((uint)keynum < MAX_KEY)
2164       {
2165         key_copy((uchar*)key, table->record[0], table->key_info + keynum,
2166                  0);
2167         error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2168                                                   (const uchar*)key,
2169                                                   HA_WHOLE_KEY,
2170                                                   HA_READ_KEY_EXACT);
2171       }
2172       else
2173         /*
2174           For a server built in non-debug mode, return an error if
2175           handler::get_dup_key() returned MAX_KEY as the key index value.
2176         */
2177         error= HA_ERR_FOUND_DUPP_KEY;
2178 
2179       if (error)
2180       {
2181         DBUG_PRINT("info",("ha_index_read_idx_map() returns error %d", error));
2182         if (error == HA_ERR_RECORD_DELETED)
2183           error= HA_ERR_KEY_NOT_FOUND;
2184         table->file->print_error(error, MYF(0));
2185         DBUG_RETURN(error);
2186       }
2187     }
2188 
2189     /*
2190        Now, record[1] should contain the offending row.  That
2191        will enable us to update it or, alternatively, delete it (so
2192        that we can insert the new row afterwards).
2193      */
2194 
2195     /*
2196       If row is incomplete we will use the record found to fill
2197       missing columns.
2198     */
2199     if (!get_flags(COMPLETE_ROWS_F))
2200     {
2201       restore_record(table,record[1]);
2202       error= unpack_current_row(rli);
2203     }
2204 
2205 #ifndef NDEBUG
2206     DBUG_PRINT("debug",("preparing for update: before and after image"));
2207     DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2208     DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2209 #endif
2210 
2211     /*
2212        REPLACE is defined as either INSERT or DELETE + INSERT.  If
2213        possible, we can replace it with an UPDATE, but that will not
2214        work on InnoDB if FOREIGN KEY checks are necessary.
2215 
2216        I (Matz) am not sure of the reason for the last_uniq_key()
2217        check, but I'm guessing that it's something along the
2218        following lines.
2219 
2220        Suppose that we got the duplicate key to be a key that is not
2221        the last unique key for the table and we perform an update:
2222        then there might be another key for which the unique check will
2223        fail, so we're better off just deleting the row and inserting
2224        the correct row.
2225      */
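    /*
      Example: with UNIQUE KEY k1(a) and UNIQUE KEY k2(b), a duplicate hit
      on k1 followed by an in-place update could still violate k2, so the
      safe fallback is to delete the offending row and retry the insert.
    */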
2226     if (last_uniq_key(table, keynum) &&
2227         !table->file->referenced_by_foreign_key())
2228     {
2229       DBUG_PRINT("info",("Updating row using ha_update_row()"));
2230       error=table->file->ha_update_row(table->record[1],
2231                                        table->record[0]);
2232       switch (error) {
2233 
2234       case HA_ERR_RECORD_IS_THE_SAME:
2235         DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2236                            " ha_update_row()"));
2237         error= 0;
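        /* Fall through: HA_ERR_RECORD_IS_THE_SAME is treated as success. */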
2238 
2239       case 0:
2240         break;
2241 
2242       default:
2243         DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2244         table->file->print_error(error, MYF(0));
2245       }
2246 
2247       DBUG_RETURN(error);
2248     }
2249     else
2250     {
2251       DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2252       if ((error= table->file->ha_delete_row(table->record[1])))
2253       {
2254         DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2255         table->file->print_error(error, MYF(0));
2256         DBUG_RETURN(error);
2257       }
2258       /* Will retry ha_write_row() with the offending row removed. */
2259     }
2260   }
2261 
2262   DBUG_RETURN(error);
2263 }
2264 
2265 
2266 /**
2267   Locate the current row in event's table.
2268 
2269   The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2270   columns are there in the row (this can be different from the number of columns
2271   in the table). It is assumed that event's table is already open and pointed
2272   by @c m_table.
2273 
2274   If a corresponding record is found in the table it is stored in
2275   @c m_table->record[0]. Note that when record is located based on a primary
2276   key, it is possible that the record found differs from the row being located.
2277 
2278   If no key is specified or table does not have keys, a table scan is used to
2279   find the row. In that case the row should be complete and contain values for
2280   all columns. However, it can still be shorter than the table, i.e. the table
2281   can contain extra columns not present in the row. It is also possible that
2282   the table has fewer columns than the row being located.
2283 
2284   @returns Error code on failure, 0 on success.
2285 
2286   @post In case of success @c m_table->record[0] contains the record found.
2287   Also, the internal "cursor" of the table is positioned at the record found.
2288 
2289   @note If the engine allows random access of the records, a combination of
2290   @c position() and @c rnd_pos() will be used.
2291  */
2292 
2293 int Old_rows_log_event::find_row(const Relay_log_info *rli)
2294 {
2295   DBUG_ENTER("find_row");
2296 
2297   assert(m_table && m_table->in_use != NULL);
2298 
2299   TABLE *table= m_table;
2300   int error;
2301 
2302   /* unpack row - missing fields get default values */
2303 
2304   // TODO: shall we check and report errors here?
2305   prepare_record(table, table->read_set, FALSE /* don't check errors */);
2306   error= unpack_current_row(rli);
2307 
2308 #ifndef NDEBUG
2309   DBUG_PRINT("info",("looking for the following record"));
2310   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2311 #endif
2312 
2313   if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2314       table->s->primary_key < MAX_KEY)
2315   {
2316     /*
2317       Use a more efficient method to fetch the record given by
2318       table->record[0] if the engine allows it.  We first compute a
2319       row reference using the position() member function (it will be
2320       stored in table->file->ref) and then use rnd_pos() to position
2321       the "cursor" (i.e., record[0] in this case) at the correct row.
2322 
2323       TODO: Add a check that the correct record has been fetched by
2324       comparing with the original record. Take into account that the
2325       record on the master and slave can be of different
2326       length. Something along these lines should work:
2327 
2328       ADD>>>  store_record(table,record[1]);
2329               int error= table->file->rnd_pos(table->record[0], table->file->ref);
2330               ADD>>>  assert(memcmp(table->record[1], table->record[0],
2331               table->s->reclength) == 0);
2332 
2333     */
2334     DBUG_PRINT("info",("locating record using primary key (position)"));
2335     int error= table->file->rnd_pos_by_record(table->record[0]);
2336     if (error)
2337     {
2338       DBUG_PRINT("info",("rnd_pos returns error %d",error));
2339       if (error == HA_ERR_RECORD_DELETED)
2340         error= HA_ERR_KEY_NOT_FOUND;
2341       table->file->print_error(error, MYF(0));
2342     }
2343     DBUG_RETURN(error);
2344   }
2345 
2346   // We can't use position() - try other methods.
2347 
2348   /*
2349     We need to retrieve all fields
2350     TODO: Move this out from this function to main loop
2351    */
2352   table->use_all_columns();
2353 
2354   /*
2355     Save copy of the record in table->record[1]. It might be needed
2356     later if linear search is used to find exact match.
2357    */
2358   store_record(table,record[1]);
2359 
2360   if (table->s->keys > 0)
2361   {
2362     DBUG_PRINT("info",("locating record using primary key (index_read)"));
2363 
2364     /* We have a key: search the table using the index */
2365     if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
2366     {
2367       DBUG_PRINT("info",("ha_index_init returns error %d",error));
2368       table->file->print_error(error, MYF(0));
2369       DBUG_RETURN(error);
2370     }
2371 
2372     /* Fill key data for the row */
2373 
2374     assert(m_key);
2375     key_copy(m_key, table->record[0], table->key_info, 0);
2376 
2377     DBUG_DUMP("key data", m_key, table->key_info->key_length);
2378 
2379     /*
2380       We need to set the null bytes to ensure that the filler bits are
2381       all set when returning.  There are storage engines that just set
2382       the necessary bits on the bytes and don't set the filler bits
2383       correctly.
2384     */
2385     my_ptrdiff_t const pos=
2386       table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2387     table->record[0][pos]= 0xFF;
2388 
2389     if ((error= table->file->ha_index_read_map(table->record[0], m_key,
2390                                                HA_WHOLE_KEY,
2391                                                HA_READ_KEY_EXACT)))
2392     {
2393       DBUG_PRINT("info",("no record matching the key found in the table"));
2394       if (error == HA_ERR_RECORD_DELETED)
2395         error= HA_ERR_KEY_NOT_FOUND;
2396       table->file->print_error(error, MYF(0));
2397       table->file->ha_index_end();
2398       DBUG_RETURN(error);
2399     }
2400 
2401     DBUG_PRINT("info",("found first matching record"));
2402     DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2403     /*
2404       Below is a minor "optimization".  If the key (i.e., key number
2405       0) has the HA_NOSAME flag set, we know that we have found the
2406       correct record (since there can be no duplicates); otherwise, we
2407       have to compare the record with the one found to see if it is
2408       the correct one.
2409 
2410       CAVEAT! This behaviour is essential for the replication of,
2411       e.g., the mysql.proc table since the correct record *shall* be
2412       found using the primary key *only*.  There shall be no
2413       comparison of non-PK columns to decide if the correct record is
2414       found.  I can see no scenario where it would be incorrect to
2415       choose the row to change using only a PK or an UNNI.
2416     */
2417     if (table->key_info->flags & HA_NOSAME)
2418     {
2419       /* The unique key does not have a nullable part */
2420       if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2421       {
2422         table->file->ha_index_end();
2423         DBUG_RETURN(0);
2424       }
2425       else
2426       {
2427         KEY *keyinfo= table->key_info;
2428         /*
2429           The unique key has a nullable part. We need to check if any field in
2430           the BI image that is part of the UNNI is null.
2431         */
2432         bool null_found= FALSE;
2433         for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2434         {
2435           uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2436           Field **f= table->field+fieldnr;
2437           null_found= (*f)->is_null();
2438         }
2439 
2440         if (!null_found)
2441         {
2442           table->file->ha_index_end();
2443           DBUG_RETURN(0);
2444         }
2445 
2446         /* else fall through to index scan */
2447       }
2448     }
2449 
2450     /*
2451       In case key is not unique, we still have to iterate over records found
2452       and find the one which is identical to the row given. A copy of the
2453       record we are looking for is stored in record[1].
2454      */
2455     DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2456 
2457     while (record_compare(table))
2458     {
2459       /*
2460         We need to set the null bytes to ensure that the filler bits
2461         are all set when returning.  There are storage engines that
2462         just set the necessary bits on the bytes and don't set the
2463         filler bits correctly.
2464 
2465         TODO[record format ndb]: Remove this code once NDB returns the
2466         correct record format.
2467       */
2468       if (table->s->null_bytes > 0)
2469       {
2470         table->record[0][table->s->null_bytes - 1]|=
2471           256U - (1U << table->s->last_null_bit_pos);
2472       }
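      /*
        For example, with last_null_bit_pos == 3 the expression above is
        256U - (1U << 3) == 0xF8, which sets bits 3..7, i.e. the unused
        filler bits of the last null-bits byte.
      */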
2473 
2474       while ((error= table->file->ha_index_next(table->record[0])))
2475       {
2476         /* We just skip records that have already been deleted */
2477         if (error == HA_ERR_RECORD_DELETED)
2478           continue;
2479         DBUG_PRINT("info",("no record matching the given row found"));
2480         table->file->print_error(error, MYF(0));
2481         (void) table->file->ha_index_end();
2482         DBUG_RETURN(error);
2483       }
2484     }
2485 
2486     /*
2487       Have to restart the scan to be able to fetch the next row.
2488     */
2489     table->file->ha_index_end();
2490   }
2491   else
2492   {
2493     DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
2494 
2495     int restart_count= 0; // Number of times scanning has restarted from top
2496 
2497     /* We don't have a key: search the table using ha_rnd_next() */
2498     if ((error= table->file->ha_rnd_init(1)))
2499     {
2500       DBUG_PRINT("info",("error initializing table scan"
2501                          " (ha_rnd_init returns %d)",error));
2502       table->file->print_error(error, MYF(0));
2503       DBUG_RETURN(error);
2504     }
2505 
2506     /* Continue until we find the right record or have made a full loop */
2507     do
2508     {
2509   restart_ha_rnd_next:
2510       error= table->file->ha_rnd_next(table->record[0]);
2511 
2512       switch (error) {
2513 
2514       case 0:
2515         break;
2516 
2517       case HA_ERR_RECORD_DELETED:
2518         goto restart_ha_rnd_next;
2519 
2520       case HA_ERR_END_OF_FILE:
2521         if (++restart_count < 2)
2522         {
2523           if ((error= table->file->ha_rnd_init(1)))
2524           {
2525             table->file->print_error(error, MYF(0));
2526             DBUG_RETURN(error);
2527           }
2528           goto restart_ha_rnd_next;
2529         }
2530         break;
2531 
2532       default:
2533         DBUG_PRINT("info", ("Failed to get next record"
2534                             " (ha_rnd_next returns %d)",error));
2535         table->file->print_error(error, MYF(0));
2536         table->file->ha_rnd_end();
2537         DBUG_RETURN(error);
2538       }
2539     }
2540     while (restart_count < 2 && record_compare(table));
2541 
2542     /*
2543       Note: the record_compare above takes into account all record fields,
2544       which might be incorrect in case a partial row was given in the event
2545      */
2546 
2547     /*
2548       Have to restart the scan to be able to fetch the next row.
2549     */
2550     if (restart_count == 2)
2551       DBUG_PRINT("info", ("Record not found"));
2552     else
2553       DBUG_DUMP("record found", table->record[0], table->s->reclength);
2554     table->file->ha_rnd_end();
2555 
2556     assert(error == HA_ERR_END_OF_FILE || error == 0);
2557     DBUG_RETURN(error);
2558   }
2559 
2560   DBUG_RETURN(0);
2561 }
2562 
2563 #endif
2564 
2565 
2566 /**************************************************************************
2567 	Write_rows_log_event member functions
2568 **************************************************************************/
2569 
2570 /*
2571   Constructor used to build an event for writing to the binary log.
2572  */
2573 #if !defined(MYSQL_CLIENT)
2574 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2575                                                    TABLE *tbl_arg,
2576                                                    ulong tid_arg,
2577                                                    MY_BITMAP const *cols,
2578                                                    bool is_transactional)
2579   :  Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2580 {
2581 
2582   // This constructor should not be reached.
2583   assert(0);
2584 
2585 }
2586 #endif
2587 
2588 
2589 /*
2590   Constructor used by slave to read the event from the binary log.
2591  */
2592 #ifdef HAVE_REPLICATION
2593 Write_rows_log_event_old::
2594 Write_rows_log_event_old(const char *buf, uint event_len,
2595                          const Format_description_event *description_event)
2596  : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_WRITE_ROWS_EVENT,
2597    description_event)
2598 {
2599 }
2600 #endif
2601 
2602 
2603 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2604 int
2605 Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2606 {
2607   int error= 0;
2608 
2609   /*
2610     We are using REPLACE semantics and not INSERT IGNORE semantics
2611     when writing rows, that is: new rows replace old rows.  We need to
2612     inform the storage engine that it should use this behaviour.
2613   */
2614 
2615   /* Tell the storage engine that we are using REPLACE semantics. */
2616   thd->lex->duplicates= DUP_REPLACE;
2617 
2618   /*
2619     Pretend we're executing a REPLACE command: this is needed for
2620     InnoDB and NDB Cluster since they are not (properly) checking the
2621     lex->duplicates flag.
2622   */
2623   thd->lex->sql_command= SQLCOM_REPLACE;
2624   /*
2625      Do not raise the error flag when a duplicate on a unique attribute is hit
2626   */
2627   m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2628   /*
2629      NDB specific: an update from an NDB master is wrapped as Write_rows,
2630      so that the event should be applied to replace the slave's row.
2631   */
2634   m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2635   /*
2636      NDB specific: if an update from an NDB master wrapped as Write_rows
2637      does not find the row, it is assumed that idempotent binlog applying
2638      is taking place; don't raise the error.
2639   */
2640   m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2641   /*
2642     TODO: the cluster team (Tomas?) says that it's better if the engine knows
2643     how many rows are going to be inserted, then it can allocate needed memory
2644     from the start.
2645   */
2646   m_table->file->ha_start_bulk_insert(0);
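  /*
    The argument 0 passed above tells the handler that the number of rows
    to be inserted is not known in advance, so the engine cannot size its
    buffers from an exact row count.
  */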
2647   return error;
2648 }
2649 
2650 
2651 int
2652 Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2653                                                   int error)
2654 {
2655   int local_error= 0;
2656   m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2657   m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2658   /*
2659     resetting the extra with
2660     table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2661     fires bug#27077
2662     todo: explain or fix
2663   */
2664   if ((local_error= m_table->file->ha_end_bulk_insert()))
2665   {
2666     m_table->file->print_error(local_error, MYF(0));
2667   }
2668   return error? error : local_error;
2669 }
2670 
2671 
2672 int
2673 Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2674 {
2675   assert(m_table != NULL);
2676   int error= write_row(rli, TRUE /* overwrite */);
2677 
2678   if (error && !thd->get_protocol_classic()->get_last_errno())
2679     thd->get_protocol_classic()->set_last_errno(error);
2680 
2681   return error;
2682 }
2683 
2684 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2685 
2686 
2687 #ifdef MYSQL_CLIENT
2688 void Write_rows_log_event_old::print(FILE *file,
2689                                      PRINT_EVENT_INFO* print_event_info)
2690 {
2691   Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old");
2692 }
2693 #endif
2694 
2695 
2696 /**************************************************************************
2697 	Delete_rows_log_event member functions
2698 **************************************************************************/
2699 
2700 /*
2701   Constructor used to build an event for writing to the binary log.
2702  */
2703 
2704 #ifndef MYSQL_CLIENT
2705 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2706                                                      TABLE *tbl_arg,
2707                                                      ulong tid,
2708                                                      MY_BITMAP const *cols,
2709                                                      bool is_transactional)
2710   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2711     m_after_image(NULL), m_memory(NULL)
2712 {
2713 
2714   // This constructor should not be reached.
2715   assert(0);
2716 
2717 }
2718 #endif /* #if !defined(MYSQL_CLIENT) */
2719 
2720 
2721 /*
2722   Constructor used by slave to read the event from the binary log.
2723  */
2724 #ifdef HAVE_REPLICATION
2725 Delete_rows_log_event_old::
2726 Delete_rows_log_event_old(const char *buf, uint event_len,
2727                           const Format_description_event *description_event)
2728   : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_DELETE_ROWS_EVENT,
2729                        description_event),
2730     m_after_image(NULL), m_memory(NULL)
2731 {
2732 }
2733 #endif
2734 
2735 
2736 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2737 
2738 int
2739 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2740 {
2741   if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2742       m_table->s->primary_key < MAX_KEY)
2743   {
2744     /*
2745       We don't need to allocate any memory for m_key since it is not used.
2746     */
2747     return 0;
2748   }
2749 
2750   if (m_table->s->keys > 0)
2751   {
2752     // Allocate buffer for key searches
2753     m_key= (uchar*)my_malloc(key_memory_log_event_old,
2754                              m_table->key_info->key_length, MYF(MY_WME));
2755     if (!m_key)
2756       return HA_ERR_OUT_OF_MEM;
2757   }
2758   return 0;
2759 }
2760 
2761 
2762 int
2763 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2764                                                    int error)
2765 {
2766   /* error= TODO: find out what this should really be; this triggers close_scan in NDB, returning an error? */
2767   m_table->file->ha_index_or_rnd_end();
2768   my_free(m_key);
2769   m_key= NULL;
2770 
2771   return error;
2772 }
2773 
2774 
2775 int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2776 {
2777   int error;
2778   assert(m_table != NULL);
2779 
2780   if (!(error= find_row(rli)))
2781   {
2782     /*
2783       Delete the record found, located in record[0]
2784     */
2785     error= m_table->file->ha_delete_row(m_table->record[0]);
2786   }
2787   return error;
2788 }
2789 
2790 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2791 
2792 
2793 #ifdef MYSQL_CLIENT
2794 void Delete_rows_log_event_old::print(FILE *file,
2795                                       PRINT_EVENT_INFO* print_event_info)
2796 {
2797   Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old");
2798 }
2799 #endif
2800 
2801 
2802 /**************************************************************************
2803 	Update_rows_log_event member functions
2804 **************************************************************************/
2805 
2806 /*
2807   Constructor used to build an event for writing to the binary log.
2808  */
2809 #if !defined(MYSQL_CLIENT)
2810 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2811                                                      TABLE *tbl_arg,
2812                                                      ulong tid,
2813                                                      MY_BITMAP const *cols,
2814                                                      bool is_transactional)
2815   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2816     m_after_image(NULL), m_memory(NULL)
2817 {
2818 
2819   // This constructor should not be reached.
2820   assert(0);
2821 }
2822 #endif /* !defined(MYSQL_CLIENT) */
2823 
2824 
2825 /*
2826   Constructor used by slave to read the event from the binary log.
2827  */
2828 #ifdef HAVE_REPLICATION
2829 Update_rows_log_event_old::
2830 Update_rows_log_event_old(const char *buf, uint event_len,
2831                           const Format_description_event
2832                           *description_event)
2833   : Old_rows_log_event(buf, event_len, binary_log::PRE_GA_UPDATE_ROWS_EVENT,
2834                        description_event),
2835     m_after_image(NULL), m_memory(NULL)
2836 {
2837 }
2838 #endif
2839 
2840 
2841 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2842 
2843 int
2844 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2845 {
2846   if (m_table->s->keys > 0)
2847   {
2848     // Allocate buffer for key searches
2849     m_key= (uchar*)my_malloc(key_memory_log_event_old,
2850                              m_table->key_info->key_length, MYF(MY_WME));
2851     if (!m_key)
2852       return HA_ERR_OUT_OF_MEM;
2853   }
2854 
2855   return 0;
2856 }
2857 
2858 
2859 int
2860 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2861                                                    int error)
2862 {
2863   /* error= TODO: find out what this should really be; this triggers close_scan in NDB, returning an error? */
2864   m_table->file->ha_index_or_rnd_end();
2865   my_free(m_key); // Free the key buffer allocated in do_before_row_operations()
2866   m_key= NULL;
2867 
2868   return error;
2869 }
2870 
2871 
2872 int
2873 Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
2874 {
2875   assert(m_table != NULL);
2876 
2877   int error= find_row(rli);
2878   if (error)
2879   {
2880     /*
2881       We need to read the second image in the event of error to be
2882       able to skip to the next pair of updates
2883     */
2884     m_curr_row= m_curr_row_end;
2885     unpack_current_row(rli);
2886     return error;
2887   }
2888 
2889   /*
2890     This is the situation after locating BI:
2891 
2892     ===|=== before image ====|=== after image ===|===
2893        ^                     ^
2894        m_curr_row            m_curr_row_end
2895 
2896     BI found in the table is stored in record[0]. We copy it to record[1]
2897     and unpack AI to record[0].
2898    */
2899 
2900   store_record(m_table,record[1]);
2901 
2902   m_curr_row= m_curr_row_end;
2903   error= unpack_current_row(rli); // this also updates m_curr_row_end
2904 
2905   /*
2906     Now we have the right row to update.  The old row (the one we're
2907     looking for) is in record[1] and the new row is in record[0].
2908   */
2909   DBUG_PRINT("info",("Updating row in table"));
2910   DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2911   DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2912 
2913   error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2914   if (error == HA_ERR_RECORD_IS_THE_SAME)
2915     error= 0;
2916 
2917   return error;
2918 }
2919 
2920 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2921 
2922 
2923 #ifdef MYSQL_CLIENT
2924 void Update_rows_log_event_old::print(FILE *file,
2925                                       PRINT_EVENT_INFO* print_event_info)
2926 {
2927   Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old");
2928 }
2929 #endif
2930