1 /* Copyright (c) 2007, 2019, Oracle and/or its affiliates.
2    Copyright (c) 2009, 2019, MariaDB
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
16 
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #ifndef MYSQL_CLIENT
20 #include "unireg.h"
21 #endif
22 #include "log_event.h"
23 #ifndef MYSQL_CLIENT
24 #include "sql_cache.h"                       // QUERY_CACHE_FLAGS_SIZE
25 #include "sql_base.h"                       // close_tables_for_reopen
26 #include "key.h"                            // key_copy
27 #include "lock.h"                           // mysql_unlock_tables
28 #include "rpl_rli.h"
29 #include "rpl_utility.h"
30 #endif
31 #include "log_event_old.h"
32 #include "rpl_record_old.h"
33 #include "transaction.h"
34 
35 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
36 
37 // Old implementation of do_apply_event()
/*
  Old (pre-5.1-GA row format) implementation of do_apply_event().

  Applies the row operations packed in 'ev' on the slave, using the
  replication group state in 'rgi':

    - a dummy event (m_table_id == ~0UL) only closes the thread tables
      and succeeds;
    - the first row event after the table-map events opens and locks
      every table of the transaction, verifies master/slave table
      compatibility and fills rgi->m_table_map;
    - each row in [m_rows_buf, m_rows_end) is then unpacked via
      do_prepare_row() and applied via do_exec_row().

  @param ev   the event to apply (also the object the virtual row
              primitives are called on)
  @param rgi  replication group info (relay log state, tables to lock,
              table map)

  @return 0 on success, non-zero error code on failure; on failure the
          statement context is cleaned up here before returning.
*/
int
Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
{
  DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
  int error= 0;
  THD *ev_thd= ev->thd;
  uchar const *row_start= ev->m_rows_buf;   // cursor into the packed row data
  const Relay_log_info *rli= rgi->rli;

  /*
    If m_table_id == ~0UL, then we have a dummy event that does not
    contain any data.  In that case, we just remove all tables in the
    tables_to_lock list, close the thread tables, and return with
    success.
   */
  if (ev->m_table_id == ~0UL)
  {
    /*
       This one is supposed to be set: just an extra check so that
       nothing strange has happened.
     */
    DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));

    rgi->slave_close_thread_tables(ev_thd);
    ev_thd->clear_error();
    DBUG_RETURN(0);
  }

  /*
    'ev_thd' has been set by exec_relay_log_event(), just before calling
    do_apply_event(). We still check here to prevent future coding
    errors.
  */
  DBUG_ASSERT(rgi->thd == ev_thd);

  /*
    If there is no locks taken, this is the first binrow event seen
    after the table map events.  We should then lock all the tables
    used in the transaction and proceed with execution of the actual
    event.
  */
  if (!ev_thd->lock)
  {
    /*
      Lock_tables() reads the contents of ev_thd->lex, so they must be
      initialized.

      We also call the THD::reset_for_next_command(), since this
      is the logical start of the next "statement". Note that this
      call might reset the value of current_stmt_binlog_format, so
      we need to do any changes to that value after this function.
    */
    /*
      NOTE(review): 'thd' here is the Log_event member, while the rest of
      this function uses 'ev_thd' (== ev->thd).  Given the assert above
      (rgi->thd == ev_thd) these presumably refer to the same THD —
      confirm, and consider using ev_thd consistently.
    */
    delete_explain_query(thd->lex);
    lex_start(ev_thd);
    ev_thd->reset_for_next_command();

    /*
      This is a row injection, so we flag the "statement" as
      such. Note that this code is called both when the slave does row
      injections and when the BINLOG statement is used to do row
      injections.
    */
    ev_thd->lex->set_stmt_row_injection();

    if (unlikely(open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)))
    {
      if (ev_thd->is_error())
      {
        /*
          Error reporting borrowed from Query_log_event with many excessive
          simplifications.
          We should not honour --slave-skip-errors at this point as we are
          having severe errors which should not be skipped.
        */
        rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
                    "Error '%s' on opening tables",
                    ev_thd->get_stmt_da()->message());
        ev_thd->is_slave_error= 1;
      }
      DBUG_RETURN(1);
    }

    /*
      When the open and locking succeeded, we check all tables to
      ensure that they still have the correct type.
    */

    {
      TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
      for (uint i=0 ; table_list_ptr&& (i< rgi->tables_to_lock_count);
           table_list_ptr= table_list_ptr->next_global, i++)
      {
        /*
          Please see comment in log_event.cc-Rows_log_event::do_apply_event()
          function for the explanation of the below if condition
        */
        if (table_list_ptr->parent_l)
          continue;
        /*
          We can use a down cast here since we know that every table added
          to the tables_to_lock is a RPL_TABLE_LIST(or child table which is
          skipped above).
        */
        RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
        DBUG_ASSERT(ptr->m_tabledef_valid);
        TABLE *conv_table;
        if (!ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
        {
          /* Incompatible table definition: hard error, close and bail out. */
          ev_thd->is_slave_error= 1;
          rgi->slave_close_thread_tables(ev_thd);
          DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
        }
        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
                             " - conv_table: %p",
                             ptr->table->s->db.str,
                             ptr->table->s->table_name.str, conv_table));
        ptr->m_conv_table= conv_table;
      }
    }

    /*
      ... and then we add all the tables to the table map and remove
      them from tables to lock.

      We also invalidate the query cache for all the tables, since
      they will now be changed.

      TODO [/Matz]: Maybe the query cache should not be invalidated
      here? It might be that a table is not changed, even though it
      was locked for the statement.  We do know that each
      Old_rows_log_event contain at least one row, so after processing one
      Old_rows_log_event, we can invalidate the query cache for the
      associated table.
     */
    TABLE_LIST *ptr= rgi->tables_to_lock;
    for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
    {
      /*
        Please see comment in log_event.cc-Rows_log_event::do_apply_event()
        function for the explanation of the below if condition
       */
      if (ptr->parent_l)
        continue;
      rgi->m_table_map.set_table(ptr->table_id, ptr->table);
    }
#ifdef HAVE_QUERY_CACHE
    query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
  }

  TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);

  if (table)
  {
    /*
      table == NULL means that this table should not be replicated
      (this was set up by Table_map_log_event::do_apply_event()
      which tested replicate-* rules).
    */

    /*
      It's not needed to set_time() but
      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
      much slave is behind
      2) it will be needed when we allow replication from a table with no
      TIMESTAMP column to a table with one.
      So we call set_time(), like in SBR. Presently it changes nothing.
    */
    ev_thd->set_time(ev->when, ev->when_sec_part);
    /*
      There are a few flags that are replicated with each row event.
      Make sure to set/clear them before executing the main body of
      the event.
    */
    if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
        ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
    else
        ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;

    if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
        ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
    else
        ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
    /* A small test to verify that objects have consistent types */
    DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));

    table->rpl_write_set= table->write_set;

    /* Apply every row in the event buffer until an unrecoverable error. */
    error= do_before_row_operations(table);
    while (error == 0 && row_start < ev->m_rows_end)
    {
      uchar const *row_end= NULL;
      if (unlikely((error= do_prepare_row(ev_thd, rgi, table, row_start,
                                          &row_end))))
        break; // We should perform the after-row operation even in
               // the case of error

      DBUG_ASSERT(row_end != NULL); // cannot happen
      DBUG_ASSERT(row_end <= ev->m_rows_end);

      /* in_use can have been set to NULL in close_tables_for_reopen */
      THD* old_thd= table->in_use;
      if (!table->in_use)
        table->in_use= ev_thd;
      error= do_exec_row(table);
      table->in_use = old_thd;
      switch (error)
      {
        /* Some recoverable errors */
      case HA_ERR_RECORD_CHANGED:
      case HA_ERR_KEY_NOT_FOUND:  /* Idempotency support: OK if
                                           tuple does not exist */
        error= 0;
        /* fall through: treated the same as success */
      case 0:
        break;

      default:
        rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
                    "Error in %s event: row application failed. %s",
                    ev->get_type_str(),
                    ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
        /* NOTE(review): uses 'thd' member, not 'ev_thd' — see note above. */
        thd->is_slave_error= 1;
        break;
      }

      row_start= row_end;   // advance to the next packed row
    }
    DBUG_EXECUTE_IF("stop_slave_middle_group",
                    const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
    error= do_after_row_operations(table, error);
  }

  if (unlikely(error))
  {                     /* error has occurred during the transaction */
    rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
                "Error in %s event: error during transaction execution "
                "on table %s.%s. %s",
                ev->get_type_str(), table->s->db.str,
                table->s->table_name.str,
                ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");

    /*
      If one day we honour --skip-slave-errors in row-based replication, and
      the error should be skipped, then we would clear mappings, rollback,
      close tables, but the slave SQL thread would not stop and then may
      assume the mapping is still available, the tables are still open...
      So then we should clear mappings/rollback/close here only if this is a
      STMT_END_F.
      For now we code, knowing that error is not skippable and so slave SQL
      thread is certainly going to stop.
      rollback at the caller along with sbr.
    */
    ev_thd->reset_current_stmt_binlog_format_row();
    rgi->cleanup_context(ev_thd, error);
    ev_thd->is_slave_error= 1;
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
298 #endif
299 
300 
301 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
302 
303 /*
304   Check if there are more UNIQUE keys after the given key.
305 */
306 static int
last_uniq_key(TABLE * table,uint keyno)307 last_uniq_key(TABLE *table, uint keyno)
308 {
309   while (++keyno < table->s->keys)
310     if (table->key_info[keyno].flags & HA_NOSAME)
311       return 0;
312   return 1;
313 }
314 
315 
316 /*
317   Compares table->record[0] and table->record[1]
318 
319   Returns TRUE if different.
320 */
record_compare(TABLE * table)321 static bool record_compare(TABLE *table)
322 {
323   bool result= FALSE;
324   if (table->s->blob_fields + table->s->varchar_fields == 0)
325   {
326     result= cmp_record(table,record[1]);
327     goto record_compare_exit;
328   }
329 
330   /* Compare null bits */
331   if (memcmp(table->null_flags,
332        table->null_flags+table->s->rec_buff_length,
333        table->s->null_bytes))
334   {
335     result= TRUE;       // Diff in NULL value
336     goto record_compare_exit;
337   }
338 
339   /* Compare updated fields */
340   for (Field **ptr=table->field ; *ptr ; ptr++)
341   {
342     if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
343     {
344       result= TRUE;
345       goto record_compare_exit;
346     }
347   }
348 
349 record_compare_exit:
350   return result;
351 }
352 
353 
/*
  Copy "extra" columns from record[1] to record[0].

  Copy the extra fields that are not present on the master but are
  present on the slave from record[1] to record[0].  This is used
  after fetching a record that is to be updated, either inside
  replace_record() or as part of executing an update_row().

  @param table             Table whose record buffers are used.
  @param master_reclength  Length of the record image on the master
                           side, i.e. the offset of the first
                           slave-only column.
  @param master_fields     Number of fields on the master side.

  @return 0 (always succeeds).
 */
static int
copy_extra_record_fields(TABLE *table,
                         size_t master_reclength,
                         my_ptrdiff_t master_fields)
{
  DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
  DBUG_PRINT("info", ("Copying to %p "
                      "from field %lu at offset %lu "
                      "to field %d at offset %lu",
                      table->record[0],
                      (ulong) master_fields, (ulong) master_reclength,
                      table->s->fields, table->s->reclength));
  /*
    Copying the extra fields of the slave that does not exist on
    master into record[0] (which are basically the default values).
  */

  /*
    NOTE(review): with '<', a slave with fewer columns than the master
    returns early, while an equal column count falls through to the
    copy below (where the memcpy is then a no-op) — confirm intended.
  */
  if (table->s->fields < (uint) master_fields)
    DBUG_RETURN(0);

  DBUG_ASSERT(master_reclength <= table->s->reclength);
  /* Bulk-copy the tail of the record that only exists on the slave. */
  if (master_reclength < table->s->reclength)
    memcpy(table->record[0] + master_reclength,
                table->record[1] + master_reclength,
                table->s->reclength - master_reclength);

  /*
    Bit columns are special.  We iterate over all the remaining
    columns and copy the "extra" bits to the new record.  This is
    not a very good solution: it should be refactored on
    opportunity.

    REFACTORING SUGGESTION (Matz).  Introduce a member function
    similar to move_field_offset() called copy_field_offset() to
    copy field values and implement it for all Field subclasses. Use
    this function to copy data from the found record to the record
    that are going to be inserted.

    The copy_field_offset() function need to be a virtual function,
    which in this case will prevent copying an entire range of
    fields efficiently.
  */
  {
    /* Start at the first slave-only field. */
    Field **field_ptr= table->field + master_fields;
    for ( ; *field_ptr ; ++field_ptr)
    {
      /*
        Set the null bit according to the values in record[1]
       */
      if ((*field_ptr)->maybe_null() &&
          (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
        (*field_ptr)->set_null();
      else
        (*field_ptr)->set_notnull();

      /*
        Do the extra work for special columns.
       */
      switch ((*field_ptr)->real_type())
      {
      default:
        /* Nothing to do */
        break;

      case MYSQL_TYPE_BIT:
        /*
          BIT columns store their leftover bits inside the NULL bytes,
          which the memcpy above did not cover: copy those bits from
          record[1] (at 'offset') into record[0] explicitly.
        */
        Field_bit *f= static_cast<Field_bit*>(*field_ptr);
        if (f->bit_len > 0)
        {
          my_ptrdiff_t const offset= table->record[1] - table->record[0];
          uchar const bits=
            get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
          set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
        }
        break;
      }
    }
  }
  DBUG_RETURN(0);                                     // All OK
}
441 
442 
/*
  Replace the provided record in the database.

  SYNOPSIS
      replace_record()
      thd    Thread context for writing the record.
      table  Table to which record should be written.
      master_reclength
             Offset to first column that is not present on the master,
             alternatively the length of the record on the master
             side.
      master_fields
             Number of fields on the master side.

  RETURN VALUE
      Error code on failure, 0 on success.

  DESCRIPTION
      Similar to how it is done in mysql_insert(), we first try to do
      a ha_write_row() and if that fails due to duplicated keys (or
      indices), we do an ha_update_row() or a ha_delete_row() instead.
 */
static int
replace_record(THD *thd, TABLE *table,
               ulong const master_reclength,
               uint const master_fields)
{
  DBUG_ENTER("replace_record");
  DBUG_ASSERT(table != NULL && thd != NULL);

  int error;
  int keynum;
  /* Key buffer, lazily allocated below; freed automatically at scope exit. */
  auto_afree_ptr<char> key(NULL);

#ifndef DBUG_OFF
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif

  /*
    Retry the write until it succeeds; on each duplicate-key failure the
    offending row is either updated in place or deleted (and the write
    retried).
  */
  while (unlikely(error= table->file->ha_write_row(table->record[0])))
  {
    /* Lock conflicts are not recoverable here: report and bail out. */
    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
    {
      table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
      DBUG_RETURN(error);
    }
    if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
    {
      table->file->print_error(error, MYF(0));
      /*
        We failed to retrieve the duplicate key
        - either because the error was not "duplicate key" error
        - or because the information which key is not available
      */
      DBUG_RETURN(error);
    }

    /*
       We need to retrieve the old row into record[1] to be able to
       either update or delete the offending record.  We either:

       - use rnd_pos() with a row-id (available as dupp_row) to the
         offending row, if that is possible (MyISAM and Blackhole), or else

       - use index_read_idx() with the key that is duplicated, to
         retrieve the offending row.
     */
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      /* Engine saved the duplicate row's position in dup_ref: fetch directly. */
      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
      if (unlikely(error))
      {
        DBUG_PRINT("info",("rnd_pos() returns error %d",error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }
    else
    {
      if (unlikely(table->file->extra(HA_EXTRA_FLUSH_CACHE)))
      {
        DBUG_RETURN(my_errno);
      }

      /* Allocate the key buffer once, sized for the longest unique key. */
      if (key.get() == NULL)
      {
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
        if (unlikely(key.get() == NULL))
          DBUG_RETURN(ENOMEM);
      }

      /* Build the duplicated key value from record[0] and look it up. */
      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
               0);
      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
                                                (const uchar*)key.get(),
                                                HA_WHOLE_KEY,
                                                HA_READ_KEY_EXACT);
      if (unlikely(error))
      {
        DBUG_PRINT("info", ("index_read_idx() returns error %d", error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }

    /*
       Now, table->record[1] should contain the offending row.  That
       will enable us to update it or, alternatively, delete it (so
       that we can insert the new row afterwards).

       First we copy the columns into table->record[0] that are not
       present on the master from table->record[1], if there are any.
    */
    copy_extra_record_fields(table, master_reclength, master_fields);

    /*
       REPLACE is defined as either INSERT or DELETE + INSERT.  If
       possible, we can replace it with an UPDATE, but that will not
       work on InnoDB if FOREIGN KEY checks are necessary.

       I (Matz) am not sure of the reason for the last_uniq_key()
       check as, but I'm guessing that it's something along the
       following lines.

       Suppose that we got the duplicate key to be a key that is not
       the last unique key for the table and we perform an update:
       then there might be another key for which the unique check will
       fail, so we're better off just deleting the row and inserting
       the correct row.
     */
    if (last_uniq_key(table, keynum) &&
        !table->file->referenced_by_foreign_key())
    {
      error=table->file->ha_update_row(table->record[1],
                                       table->record[0]);
      /* "Row is the same" counts as success for REPLACE semantics. */
      if (unlikely(error) && error != HA_ERR_RECORD_IS_THE_SAME)
        table->file->print_error(error, MYF(0));
      else
        error= 0;
      DBUG_RETURN(error);
    }
    else
    {
      if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
      {
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
      /* Will retry ha_write_row() with the offending row removed. */
    }
  }

  DBUG_RETURN(error);
}
596 
597 
/**
  Find the row given by 'key', if the table has keys, or else use a table scan
  to find (and fetch) the row.

  If the engine allows random access of the records, a combination of
  position() and rnd_pos() will be used.

  @param table Pointer to table to search
  @param key   Pointer to key to use for search, if table has key

  @pre <code>table->record[0]</code> shall contain the row to locate
  and <code>key</code> shall contain a key to use for searching, if
  the engine has a key.

  @post If the return value is zero, <code>table->record[1]</code>
  will contain the fetched row and the internal "cursor" will refer to
  the row. If the return value is non-zero,
  <code>table->record[1]</code> is undefined.  In either case,
  <code>table->record[0]</code> is undefined.

  @return Zero if the row was successfully fetched into
  <code>table->record[1]</code>, error code otherwise.
 */

static int find_and_fetch_row(TABLE *table, uchar *key)
{
  DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
  DBUG_PRINT("enter", ("table: %p, key: %p  record: %p",
           table, key, table->record[1]));

  DBUG_ASSERT(table->in_use != NULL);

  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);

  /* Strategy 1: fetch by row position (engines with position-based access). */
  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      Use a more efficient method to fetch the record given by
      table->record[0] if the engine allows it.  We first compute a
      row reference using the position() member function (it will be
      stored in table->file->ref) and the use rnd_pos() to position
      the "cursor" (i.e., record[0] in this case) at the correct row.

      TODO: Add a check that the correct record has been fetched by
      comparing with the original record. Take into account that the
      record on the master and slave can be of different
      length. Something along these lines should work:

      ADD>>>  store_record(table,record[1]);
              int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
                                 table->s->reclength) == 0);

    */
    table->file->position(table->record[0]);
    int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
    /*
      rnd_pos() returns the record in table->record[0], so we have to
      move it to table->record[1].
     */
    memcpy(table->record[1], table->record[0], table->s->reclength);
    DBUG_RETURN(error);
  }

  /* We need to retrieve all fields */
  /* TODO: Move this out from this function to main loop */
  table->use_all_columns();

  /* Strategy 2: search via the first index, if the table has any keys. */
  if (table->s->keys > 0)
  {
    int error;
    /* We have a key: search the table using the index */
    if (!table->file->inited &&
        unlikely(error= table->file->ha_index_init(0, FALSE)))
    {
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_valgrind
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif

    /*
      We need to set the null bytes to ensure that the filler bit are
      all set when returning.  There are storage engines that just set
      the necessary bits on the bytes and don't set the filler bits
      correctly.
    */
    my_ptrdiff_t const pos=
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
    table->record[1][pos]= 0xFF;
    if (unlikely((error= table->file->ha_index_read_map(table->record[1], key,
                                                        HA_WHOLE_KEY,
                                                        HA_READ_KEY_EXACT))))
    {
      table->file->print_error(error, MYF(0));
      table->file->ha_index_end();
      DBUG_RETURN(error);
    }

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_valgrind
    DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
    DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
#endif
    /*
      Below is a minor "optimization".  If the key (i.e., key number
      0) has the HA_NOSAME flag set, we know that we have found the
      correct record (since there can be no duplicates); otherwise, we
      have to compare the record with the one found to see if it is
      the correct one.

      CAVEAT! This behaviour is essential for the replication of,
      e.g., the mysql.proc table since the correct record *shall* be
      found using the primary key *only*.  There shall be no
      comparison of non-PK columns to decide if the correct record is
      found.  I can see no scenario where it would be incorrect to
      chose the row to change only using a PK or an UNNI.
    */
    if (table->key_info->flags & HA_NOSAME)
    {
      table->file->ha_index_end();
      DBUG_RETURN(0);
    }

    /* Non-unique key: step through duplicates until the full row matches. */
    while (record_compare(table))
    {
      int error;

      /*
        NOTE(review): any non-zero return from ha_index_next() —
        including HA_ERR_END_OF_FILE when no duplicate matches — is
        printed and returned as an error here; confirm that running
        out of candidates is meant to be a hard error.
      */
      while ((error= table->file->ha_index_next(table->record[1])))
      {
        table->file->print_error(error, MYF(0));
        table->file->ha_index_end();
        DBUG_RETURN(error);
      }
    }

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    table->file->ha_index_end();
  }
  else
  {
    /* Strategy 3: no keys at all — full table scan with one wrap-around. */
    int restart_count= 0; // Number of times scanning has restarted from top
    int error;

    /* We don't have a key: search the table using rnd_next() */
    if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
      return error;

    /* Continue until we find the right record or have made a full loop */
    do
    {
      error= table->file->ha_rnd_next(table->record[1]);

      DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
      DBUG_DUMP("record[1]", table->record[1], table->s->reclength);

      switch (error) {
      case 0:
        break;

      case HA_ERR_END_OF_FILE:
        /* Hit the end: restart the scan from the top, at most once. */
        if (++restart_count < 2)
        {
          int error2;
          if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
            DBUG_RETURN(error2);
        }
        break;

      default:
        table->file->print_error(error, MYF(0));
        DBUG_PRINT("info", ("Record not found"));
        (void) table->file->ha_rnd_end();
        DBUG_RETURN(error);
      }
    }
    while (restart_count < 2 && record_compare(table));

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
    table->file->ha_rnd_end();

    /* Either found (0) or scanned the whole table twice (EOF). */
    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
801 
802 
803 /**********************************************************
804   Row handling primitives for Write_rows_log_event_old
805  **********************************************************/
806 
do_before_row_operations(TABLE * table)807 int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
808 {
809   int error= 0;
810 
811   /*
812     We are using REPLACE semantics and not INSERT IGNORE semantics
813     when writing rows, that is: new rows replace old rows.  We need to
814     inform the storage engine that it should use this behaviour.
815   */
816 
817   /* Tell the storage engine that we are using REPLACE semantics. */
818   thd->lex->duplicates= DUP_REPLACE;
819 
820   thd->lex->sql_command= SQLCOM_REPLACE;
821   /*
822      Do not raise the error flag in case of hitting to an unique attribute
823   */
824   table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
825   table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
826   table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
827   table->file->ha_start_bulk_insert(0);
828   return error;
829 }
830 
831 
/**
  Undo the handler setup done in do_before_row_operations() and finish
  the bulk insert.

  Note: HA_EXTRA_IGNORE_NO_KEY is deliberately NOT reset here (see the
  comment below about bug#27077).

  @param table  table the rows were written into
  @param error  error status from the row-processing loop (0 if none)
  @return @c error if it was already set, otherwise the status of
          ha_end_bulk_insert()
*/
int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
{
  int local_error= 0;
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
  /*
    resetting the extra with
    table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
    fires bug#27077
    todo: explain or fix
  */
  if (unlikely((local_error= table->file->ha_end_bulk_insert())))
  {
    table->file->print_error(local_error, MYF(0));
  }
  /* The earlier error takes precedence over a bulk-insert flush failure. */
  return error? error : local_error;
}
849 
850 
/**
  Unpack the next packed row of a pre-GA write-rows event into
  table->record[0].

  @param thd_arg    slave THD (unused here; part of the common hook signature)
  @param rgi        replication group info, forwarded to unpack_row_old()
  @param table      target table; its write_set selects columns to unpack
  @param row_start  first byte of the packed row inside m_rows_buf
  @param row_end    [out] set to the first byte following the unpacked row
  @return 0 on success, else the error from unpack_row_old()
*/
int
Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
                                         rpl_group_info *rgi,
                                         TABLE *table,
                                         uchar const *row_start,
                                         uchar const **row_end)
{
  DBUG_ASSERT(table != NULL);
  DBUG_ASSERT(row_start && row_end);

  int error;
  error= unpack_row_old(rgi,
                        table, m_width, table->record[0],
                        row_start, m_rows_end,
                        &m_cols, row_end, &m_master_reclength,
                        table->write_set, PRE_GA_WRITE_ROWS_EVENT);
  /*
    Mirror the written columns into read_set as well — presumably needed
    because replace_record() reads the record to resolve duplicate-key
    conflicts.  NOTE(review): inferred from usage; confirm against
    replace_record().
  */
  bitmap_copy(table->read_set, table->write_set);
  return error;
}
870 
871 
do_exec_row(TABLE * table)872 int Write_rows_log_event_old::do_exec_row(TABLE *table)
873 {
874   DBUG_ASSERT(table != NULL);
875   int error= replace_record(thd, table, m_master_reclength, m_width);
876   return error;
877 }
878 
879 
880 /**********************************************************
881   Row handling primitives for Delete_rows_log_event_old
882  **********************************************************/
883 
/**
  Allocate the scratch buffers used to locate rows for a pre-GA
  delete-rows event.

  Depending on how ::find_and_fetch_row() will locate the row, we need:
  - nothing, when the engine requires a primary key for position() and
    the table has one (the row is then fetched by position — presumably
    via rnd_pos(); m_after_image/m_key stay unused);
  - a record buffer plus a key buffer, when the table has keys (index
    lookup on the first key);
  - only a record buffer, when the table has no keys (full table scan,
    m_key stays NULL).

  @param table  target table
  @return 0 on success, HA_ERR_OUT_OF_MEM on allocation failure
*/
int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
{
  DBUG_ASSERT(m_memory == NULL);

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      We don't need to allocate any memory for m_after_image and
      m_key since they are not used.
    */
    return 0;
  }

  int error= 0;

  if (table->s->keys > 0)
  {
    /* One allocation for both buffers; freed with a single my_free(). */
    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
                                       &m_after_image,
                                       (uint) table->s->reclength,
                                       &m_key,
                                       (uint) table->key_info->key_length,
                                       NullS);
  }
  else
  {
    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
    m_memory= (uchar*)m_after_image;
    m_key= NULL;                          /* NULL key => table scan */
  }
  if (!m_memory)
    return HA_ERR_OUT_OF_MEM;

  return error;
}
920 
921 
/**
  Close any index/scan the row lookups left open and release the scratch
  buffers allocated by do_before_row_operations().

  @param table  target table
  @param error  error status from the row-processing loop, passed through
  @return @c error unchanged
*/
int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
{
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
  table->file->ha_index_or_rnd_end();
  my_free(m_memory); // Free for multi_malloc
  /* m_after_image and m_key point into m_memory; just clear the aliases. */
  m_memory= NULL;
  m_after_image= NULL;
  m_key= NULL;

  return error;
}
933 
934 
/**
  Unpack the before image of a pre-GA delete-rows event into
  table->record[0] and, when index lookup will be used, extract the
  search key from it.

  @param thd_arg    slave THD (unused; part of the common hook signature)
  @param rgi        replication group info, forwarded to unpack_row_old()
  @param table      target table; its read_set selects columns to unpack
  @param row_start  first byte of the packed row inside m_rows_buf
  @param row_end    [out] set to the first byte following the unpacked row
  @return 0 on success, else the error from unpack_row_old()
*/
int
Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
                                          rpl_group_info *rgi,
                                          TABLE *table,
                                          uchar const *row_start,
                                          uchar const **row_end)
{
  int error;
  DBUG_ASSERT(row_start && row_end);
  /*
    This assertion actually checks that there is at least as many
    columns on the slave as on the master.
  */
  DBUG_ASSERT(table->s->fields >= m_width);

  error= unpack_row_old(rgi,
                        table, m_width, table->record[0],
                        row_start, m_rows_end,
                        &m_cols, row_end, &m_master_reclength,
                        table->read_set, PRE_GA_DELETE_ROWS_EVENT);
  /*
    If we will access rows using the random access method, m_key will
    be set to NULL, so we do not need to make a key copy in that case.
   */
  if (m_key)
  {
    KEY *const key_info= table->key_info;

    /* Build the lookup key (first key of the table) from the unpacked row */
    key_copy(m_key, table->record[0], key_info, 0);
  }

  return error;
}
968 
969 
do_exec_row(TABLE * table)970 int Delete_rows_log_event_old::do_exec_row(TABLE *table)
971 {
972   int error;
973   DBUG_ASSERT(table != NULL);
974 
975   if (likely(!(error= ::find_and_fetch_row(table, m_key))))
976   {
977     /*
978       Now we should have the right row to delete.  We are using
979       record[0] since it is guaranteed to point to a record with the
980       correct value.
981     */
982     error= table->file->ha_delete_row(table->record[0]);
983   }
984   return error;
985 }
986 
987 
988 /**********************************************************
989   Row handling primitives for Update_rows_log_event_old
990  **********************************************************/
991 
/**
  Allocate the scratch buffers used by a pre-GA update-rows event: a
  record buffer for the after image and, when index lookup will be used,
  a key buffer for locating the before image.

  Unlike the delete-event counterpart, there is no fast path for
  position-based engines: the after image buffer is always needed.

  @param table  target table
  @return 0 on success, HA_ERR_OUT_OF_MEM on allocation failure
*/
int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
{
  DBUG_ASSERT(m_memory == NULL);

  int error= 0;

  if (table->s->keys > 0)
  {
    /* One allocation for both buffers; freed with a single my_free(). */
    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
                                       &m_after_image,
                                       (uint) table->s->reclength,
                                       &m_key,
                                       (uint) table->key_info->key_length,
                                       NullS);
  }
  else
  {
    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
    m_memory= m_after_image;
    m_key= NULL;                          /* NULL key => table scan */
  }
  if (!m_memory)
    return HA_ERR_OUT_OF_MEM;

  return error;
}
1018 
1019 
/**
  Close any index/scan the row lookups left open and release the scratch
  buffers allocated by do_before_row_operations().

  @param table  target table
  @param error  error status from the row-processing loop, passed through
  @return @c error unchanged
*/
int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
{
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
  table->file->ha_index_or_rnd_end();
  my_free(m_memory);
  /* m_after_image and m_key point into m_memory; just clear the aliases. */
  m_memory= NULL;
  m_after_image= NULL;
  m_key= NULL;

  return error;
}
1031 
1032 
do_prepare_row(THD * thd_arg,rpl_group_info * rgi,TABLE * table,uchar const * row_start,uchar const ** row_end)1033 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1034                                               rpl_group_info *rgi,
1035                                               TABLE *table,
1036                                               uchar const *row_start,
1037                                               uchar const **row_end)
1038 {
1039   int error;
1040   DBUG_ASSERT(row_start && row_end);
1041   /*
1042     This assertion actually checks that there is at least as many
1043     columns on the slave as on the master.
1044   */
1045   DBUG_ASSERT(table->s->fields >= m_width);
1046 
1047   /* record[0] is the before image for the update */
1048   error= unpack_row_old(rgi,
1049                         table, m_width, table->record[0],
1050                         row_start, m_rows_end,
1051                         &m_cols, row_end, &m_master_reclength,
1052                         table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1053   row_start = *row_end;
1054   /* m_after_image is the after image for the update */
1055   error= unpack_row_old(rgi,
1056                         table, m_width, m_after_image,
1057                         row_start, m_rows_end,
1058                         &m_cols, row_end, &m_master_reclength,
1059                         table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1060 
1061   DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1062   DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1063 
1064   /*
1065     If we will access rows using the random access method, m_key will
1066     be set to NULL, so we do not need to make a key copy in that case.
1067    */
1068   if (m_key)
1069   {
1070     KEY *const key_info= table->key_info;
1071 
1072     key_copy(m_key, table->record[0], key_info, 0);
1073   }
1074 
1075   return error;
1076 }
1077 
1078 
/**
  Locate the row matching the before image and update it to the after
  image.

  On return from ::find_and_fetch_row() the fetched (old) row is in
  record[1]; the after image is then copied into record[0] so the pair
  (old, new) is laid out the way ha_update_row() and some engines
  (e.g. partitioning) expect.

  @param table  target table (must be non-NULL)
  @return 0 on success, else the error from the lookup or the update
          (HA_ERR_RECORD_IS_THE_SAME is treated as success)
*/
int Update_rows_log_event_old::do_exec_row(TABLE *table)
{
  DBUG_ASSERT(table != NULL);

  int error= ::find_and_fetch_row(table, m_key);
  if (unlikely(error))
    return error;

  /*
    We have to ensure that the new record (i.e., the after image) is
    in record[0] and the old record (i.e., the before image) is in
    record[1].  This since some storage engines require this (for
    example, the partition engine).

    Since find_and_fetch_row() puts the fetched record (i.e., the old
    record) in record[1], we can keep it there. We put the new record
    (i.e., the after image) into record[0], and copy the fields that
    are on the slave (i.e., in record[1]) into record[0], effectively
    overwriting the default values that where put there by the
    unpack_row() function.
  */
  memcpy(table->record[0], m_after_image, table->s->reclength);
  copy_extra_record_fields(table, m_master_reclength, m_width);

  /*
    Now we have the right row to update.  The old row (the one we're
    looking for) is in record[1] and the new row has is in record[0].
    We also have copied the original values already in the slave's
    database into the after image delivered from the master.
  */
  error= table->file->ha_update_row(table->record[1], table->record[0]);
  if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
    error= 0;           /* no-op update is not an error for replication */

  return error;
}
1115 
1116 #endif
1117 
1118 
1119 /**************************************************************************
1120 	Rows_log_event member functions
1121 **************************************************************************/
1122 
1123 #ifndef MYSQL_CLIENT
/**
  Master-side constructor for an old-format rows event.

  Writing events in the pre-GA format is no longer supported, so this
  constructor must never be reached at runtime (see the assert below);
  it is kept only so the class remains complete.  The body still mirrors
  the historical initialization for reference.

  @param thd_arg           THD generating the event
  @param tbl_arg           table the rows belong to (NULL for dummy event)
  @param tid               table map id (~0UL for dummy event)
  @param cols              bitmap of columns present in each row
                           (NULL for dummy event)
  @param is_transactional  whether the event goes to the trans cache
*/
Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
                                       MY_BITMAP const *cols,
                                       bool is_transactional)
  : Log_event(thd_arg, 0, is_transactional),
    m_row_count(0),
    m_table(tbl_arg),
    m_table_id(tid),
    m_width(tbl_arg ? tbl_arg->s->fields : 1),
    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
#ifdef HAVE_REPLICATION
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
#endif
{

  // This constructor should not be reached.
  assert(0);

  /*
    We allow a special form of dummy event when the table, and cols
    are null and the table id is ~0UL.  This is a temporary
    solution, to be able to terminate a started statement in the
    binary log: the extraneous events will be removed in the future.
   */
  DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
              (!tbl_arg && !cols && tid == ~0UL));

  if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
      set_flags(NO_FOREIGN_KEY_CHECKS_F);
  if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
      set_flags(RELAXED_UNIQUE_CHECKS_F);
  /* if my_bitmap_init fails, caught in is_valid() */
  /* Small widths reuse the in-object m_bitbuf instead of allocating. */
  if (likely(!my_bitmap_init(&m_cols,
                          m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
                          m_width,
                          false)))
  {
    /* Cols can be zero if this is a dummy binrows event */
    if (likely(cols != NULL))
    {
      memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
      create_last_word_mask(&m_cols);
    }
  }
  else
  {
    // Needed because my_bitmap_init() does not set it to null on failure
    m_cols.bitmap= 0;
  }
}
1173 #endif
1174 
1175 
/**
  Slave/mysqlbinlog-side constructor: parse an old-format rows event
  from its binlog image.

  Wire layout after the common header:
  - post header: table id (4 bytes for pre-5.1.4 masters whose
    post_header_len is 6, otherwise 6 bytes) followed by 2 flag bytes;
  - variable part: packed column count (net_field_length), the column
    bitmap, then the packed row data.

  On any failure m_cols.bitmap is left NULL so that is_valid() reports
  the event as bad.

  @param buf                the raw event, starting at the common header
  @param event_len          total length of @c buf
  @param event_type         one of the PRE_GA_*_ROWS_EVENT types
  @param description_event  format descriptor of the originating master
*/
Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
                                       Log_event_type event_type,
                                       const Format_description_log_event
                                       *description_event)
  : Log_event(buf, description_event),
    m_row_count(0),
#ifndef MYSQL_CLIENT
    m_table(NULL),
#endif
    m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
#endif
{
  DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
  uint8 const common_header_len= description_event->common_header_len;
  uint8 const post_header_len= description_event->post_header_len[event_type-1];

  DBUG_PRINT("enter",("event_len: %u  common_header_len: %d  "
		      "post_header_len: %d",
		      event_len, common_header_len,
		      post_header_len));

  const char *post_start= buf + common_header_len;
  DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
  post_start+= RW_MAPID_OFFSET;
  if (post_header_len == 6)
  {
    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
    m_table_id= uint4korr(post_start);
    post_start+= 4;
  }
  else
  {
    m_table_id= (ulong) uint6korr(post_start);
    post_start+= RW_FLAGS_OFFSET;
  }

  m_flags= uint2korr(post_start);

  uchar const *const var_start=
    (const uchar *)buf + common_header_len + post_header_len;
  uchar const *const ptr_width= var_start;
  uchar *ptr_after_width= (uchar*) ptr_width;
  DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
  m_width = net_field_length(&ptr_after_width);
  DBUG_PRINT("debug", ("m_width=%lu", m_width));
  /*
    Avoid reading out of buffer.  The check uses m_width bytes although
    only (m_width + 7) / 8 bitmap bytes are read below, so it is
    conservative (stricter than strictly necessary) but safe.
  */
  if (ptr_after_width + m_width > (uchar *)buf + event_len)
  {
    m_cols.bitmap= NULL;
    DBUG_VOID_RETURN;
  }

  /* if my_bitmap_init fails, caught in is_valid() */
  /* Small widths reuse the in-object m_bitbuf instead of allocating. */
  if (likely(!my_bitmap_init(&m_cols,
                          m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
                          m_width,
                          false)))
  {
    DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
    memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
    create_last_word_mask(&m_cols);
    ptr_after_width+= (m_width + 7) / 8;
    DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
  }
  else
  {
    // Needed because my_bitmap_init() does not set it to null on failure
    m_cols.bitmap= NULL;
    DBUG_VOID_RETURN;
  }

  const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
  size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
  DBUG_PRINT("info",("m_table_id: %lu  m_flags: %d  m_width: %lu  data_size: %zu",
                     m_table_id, m_flags, m_width, data_size));
  DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);

  /* Copy the packed rows into an owned buffer; freed in the destructor. */
  m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
  if (likely((bool)m_rows_buf))
  {
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
    m_curr_row= m_rows_buf;
#endif
    m_rows_end= m_rows_buf + data_size;
    m_rows_cur= m_rows_end;
    memcpy(m_rows_buf, ptr_rows_data, data_size);
  }
  else
    m_cols.bitmap= 0; // to not free it
  /* NOTE(review): on malloc failure the bitmap allocated above leaks if
     it was heap-based; kept as-is since is_valid() relies on bitmap==0. */

  DBUG_VOID_RETURN;
}
1270 
1271 
/**
  Release the column bitmap and the packed-rows buffer.

  When m_width was small enough, my_bitmap_init() used the in-object
  m_bitbuf instead of allocating; the pointer is reset first so that
  my_bitmap_free() does not my_free() memory this object does not own.
*/
Old_rows_log_event::~Old_rows_log_event()
{
  if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
    m_cols.bitmap= 0; // so no my_free in my_bitmap_free
  my_bitmap_free(&m_cols); // To pair with my_bitmap_init().
  my_free(m_rows_buf);
}
1279 
1280 
/**
  Compute the size of the event body as it would be written to the
  binlog: post header + packed column-count + column bitmap + row data.

  The DBUG hook emulates a pre-5.1.4 master whose post header carries a
  4-byte table id (6 bytes total) instead of ROWS_HEADER_LEN.

  @return body size in bytes
*/
int Old_rows_log_event::get_data_size()
{
  /* Encode the bitmap byte-count only to learn its packed length. */
  uchar buf[MAX_INT_WIDTH];
  uchar *end= net_store_length(buf, (m_width + 7) / 8);

  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
                  return (int)(6 + no_bytes_in_map(&m_cols) + (end - buf) +
                  m_rows_cur - m_rows_buf););
  int data_size= ROWS_HEADER_LEN;
  data_size+= no_bytes_in_map(&m_cols);
  data_size+= (uint) (end - buf);

  data_size+= (uint) (m_rows_cur - m_rows_buf);
  return data_size;
}
1296 
1297 
1298 #ifndef MYSQL_CLIENT
/**
  Append one packed row to the event's row buffer, growing the buffer
  in 1024-byte blocks as needed.

  @param row_data  the packed row bytes to append
  @param length    number of bytes in @c row_data
  @return 0 on success, HA_ERR_OUT_OF_MEM if the buffer cannot grow
*/
int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
{
  /*
    When the table has a primary key, we would probably want, by default, to
    log only the primary key value instead of the entire "before image". This
    would save binlog space. TODO
  */
  DBUG_ENTER("Old_rows_log_event::do_add_row_data");
  DBUG_PRINT("enter", ("row_data: %p  length: %zu",row_data,
                       length));
  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_valgrind
  DBUG_DUMP("row_data", row_data, MY_MIN(length, 32));
#endif

  DBUG_ASSERT(m_rows_buf <= m_rows_cur);
  DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
  DBUG_ASSERT(m_rows_cur <= m_rows_end);

  /* The cast will always work since m_rows_cur <= m_rows_end */
  /* Note: <= (not <) means we also grow when the fit would be exact. */
  if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
  {
    size_t const block_size= 1024;
    my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
    /* Round the required size up to the next multiple of block_size. */
    my_ptrdiff_t const new_alloc=
        block_size * ((cur_size + length + block_size - 1) / block_size);

    /* MY_ALLOW_ZERO_PTR: acts as malloc on the first call (m_rows_buf==0) */
    uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
                                           MYF(MY_ALLOW_ZERO_PTR|MY_WME));
    if (unlikely(!new_buf))
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);

    /* If the memory moved, we need to move the pointers */
    if (new_buf != m_rows_buf)
    {
      m_rows_buf= new_buf;
      m_rows_cur= m_rows_buf + cur_size;
    }

    /*
       The end pointer should always be changed to point to the end of
       the allocated memory.
    */
    m_rows_end= m_rows_buf + new_alloc;
  }

  DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
  memcpy(m_rows_cur, row_data, length);
  m_rows_cur+= length;
  m_row_count++;
  DBUG_RETURN(0);
}
1354 #endif
1355 
1356 
1357 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
do_apply_event(rpl_group_info * rgi)1358 int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
1359 {
1360   DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1361   int error= 0;
1362   Relay_log_info const *rli= rgi->rli;
1363 
1364   /*
1365     If m_table_id == ~0UL, then we have a dummy event that does not
1366     contain any data.  In that case, we just remove all tables in the
1367     tables_to_lock list, close the thread tables, and return with
1368     success.
1369    */
1370   if (m_table_id == ~0UL)
1371   {
1372     /*
1373        This one is supposed to be set: just an extra check so that
1374        nothing strange has happened.
1375      */
1376     DBUG_ASSERT(get_flags(STMT_END_F));
1377 
1378     rgi->slave_close_thread_tables(thd);
1379     thd->clear_error();
1380     DBUG_RETURN(0);
1381   }
1382 
1383   /*
1384     'thd' has been set by exec_relay_log_event(), just before calling
1385     do_apply_event(). We still check here to prevent future coding
1386     errors.
1387   */
1388   DBUG_ASSERT(rgi->thd == thd);
1389 
1390   /*
1391     If there is no locks taken, this is the first binrow event seen
1392     after the table map events.  We should then lock all the tables
1393     used in the transaction and proceed with execution of the actual
1394     event.
1395   */
1396   if (!thd->lock)
1397   {
1398     /*
1399       lock_tables() reads the contents of thd->lex, so they must be
1400       initialized. Contrary to in
1401       Table_map_log_event::do_apply_event() we don't call
1402       mysql_init_query() as that may reset the binlog format.
1403     */
1404     lex_start(thd);
1405 
1406     if (unlikely((error= lock_tables(thd, rgi->tables_to_lock,
1407                                      rgi->tables_to_lock_count, 0))))
1408     {
1409       if (thd->is_slave_error || thd->is_fatal_error)
1410       {
1411         /*
1412           Error reporting borrowed from Query_log_event with many excessive
1413           simplifications (we don't honour --slave-skip-errors)
1414         */
1415         uint actual_error= thd->net.last_errno;
1416         rli->report(ERROR_LEVEL, actual_error, NULL,
1417                     "Error '%s' in %s event: when locking tables",
1418                     (actual_error ? thd->net.last_error :
1419                      "unexpected success or fatal error"),
1420                     get_type_str());
1421         thd->is_fatal_error= 1;
1422       }
1423       else
1424       {
1425         rli->report(ERROR_LEVEL, error, NULL,
1426                     "Error in %s event: when locking tables",
1427                     get_type_str());
1428       }
1429       rgi->slave_close_thread_tables(thd);
1430       DBUG_RETURN(error);
1431     }
1432 
1433     /*
1434       When the open and locking succeeded, we check all tables to
1435       ensure that they still have the correct type.
1436     */
1437 
1438     {
1439       TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
1440       for (uint i=0; table_list_ptr&& (i< rgi->tables_to_lock_count);
1441            table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1442       {
1443         /*
1444           Please see comment in log_event.cc-Rows_log_event::do_apply_event()
1445           function for the explanation of the below if condition
1446         */
1447         if (table_list_ptr->parent_l)
1448           continue;
1449         /*
1450           We can use a down cast here since we know that every table added
1451           to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1452           skipped above).
1453         */
1454         RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1455         TABLE *conv_table;
1456         if (ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
1457         {
1458           thd->is_slave_error= 1;
1459           rgi->slave_close_thread_tables(thd);
1460           DBUG_RETURN(ERR_BAD_TABLE_DEF);
1461         }
1462         ptr->m_conv_table= conv_table;
1463       }
1464     }
1465 
1466     /*
1467       ... and then we add all the tables to the table map but keep
1468       them in the tables to lock list.
1469 
1470 
1471       We also invalidate the query cache for all the tables, since
1472       they will now be changed.
1473 
1474       TODO [/Matz]: Maybe the query cache should not be invalidated
1475       here? It might be that a table is not changed, even though it
1476       was locked for the statement.  We do know that each
1477       Old_rows_log_event contain at least one row, so after processing one
1478       Old_rows_log_event, we can invalidate the query cache for the
1479       associated table.
1480      */
1481     for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
1482     {
1483       rgi->m_table_map.set_table(ptr->table_id, ptr->table);
1484     }
1485 #ifdef HAVE_QUERY_CACHE
1486     query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
1487 #endif
1488   }
1489 
1490   TABLE*
1491     table=
1492     m_table= rgi->m_table_map.get_table(m_table_id);
1493 
1494   if (table)
1495   {
1496     /*
1497       table == NULL means that this table should not be replicated
1498       (this was set up by Table_map_log_event::do_apply_event()
1499       which tested replicate-* rules).
1500     */
1501 
1502     /*
1503       It's not needed to set_time() but
1504       1) it continues the property that "Time" in SHOW PROCESSLIST shows how
1505       much slave is behind
1506       2) it will be needed when we allow replication from a table with no
1507       TIMESTAMP column to a table with one.
1508       So we call set_time(), like in SBR. Presently it changes nothing.
1509     */
1510     thd->set_time(when, when_sec_part);
1511     /*
1512       There are a few flags that are replicated with each row event.
1513       Make sure to set/clear them before executing the main body of
1514       the event.
1515     */
1516     if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1517         thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1518     else
1519         thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1520 
1521     if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1522         thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1523     else
1524         thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1525     /* A small test to verify that objects have consistent types */
1526     DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1527 
1528      if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1529       set_flags(COMPLETE_ROWS_F);
1530 
1531     /*
1532       Set tables write and read sets.
1533 
1534       Read_set contains all slave columns (in case we are going to fetch
1535       a complete record from slave)
1536 
1537       Write_set equals the m_cols bitmap sent from master but it can be
1538       longer if slave has extra columns.
1539      */
1540 
1541     DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1542 
1543     bitmap_set_all(table->read_set);
1544     bitmap_set_all(table->write_set);
1545     if (!get_flags(COMPLETE_ROWS_F))
1546       bitmap_intersect(table->write_set,&m_cols);
1547     table->rpl_write_set= table->write_set;
1548 
1549     // Do event specific preparations
1550 
1551     error= do_before_row_operations(rli);
1552 
1553     // row processing loop
1554 
1555     while (error == 0 && m_curr_row < m_rows_end)
1556     {
1557       /* in_use can have been set to NULL in close_tables_for_reopen */
1558       THD* old_thd= table->in_use;
1559       if (!table->in_use)
1560         table->in_use= thd;
1561 
1562       error= do_exec_row(rgi);
1563 
1564       DBUG_PRINT("info", ("error: %d", error));
1565       DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1566 
1567       table->in_use = old_thd;
1568       switch (error)
1569       {
1570       case 0:
1571 	break;
1572 
1573       /* Some recoverable errors */
1574       case HA_ERR_RECORD_CHANGED:
1575       case HA_ERR_KEY_NOT_FOUND:	/* Idempotency support: OK if
1576                                            tuple does not exist */
1577         error= 0;
1578         break;
1579 
1580       default:
1581         rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1582                     "Error in %s event: row application failed. %s",
1583                     get_type_str(), thd->net.last_error);
1584         thd->is_slave_error= 1;
1585 	break;
1586       }
1587 
1588       /*
1589        If m_curr_row_end  was not set during event execution (e.g., because
1590        of errors) we can't proceed to the next row. If the error is transient
1591        (i.e., error==0 at this point) we must call unpack_current_row() to set
1592        m_curr_row_end.
1593       */
1594 
1595       DBUG_PRINT("info", ("error: %d", error));
1596       DBUG_PRINT("info", ("curr_row: %p; curr_row_end:%p; rows_end: %p",
1597                           m_curr_row, m_curr_row_end, m_rows_end));
1598 
1599       if (!m_curr_row_end && likely(!error))
1600         unpack_current_row(rgi);
1601 
1602       // at this moment m_curr_row_end should be set
1603       DBUG_ASSERT(error || m_curr_row_end != NULL);
1604       DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1605       DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1606 
1607       m_curr_row= m_curr_row_end;
1608 
1609     } // row processing loop
1610 
1611     DBUG_EXECUTE_IF("stop_slave_middle_group",
1612                     const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1613     error= do_after_row_operations(rli, error);
1614   } // if (table)
1615 
1616   if (unlikely(error))
1617   {                     /* error has occurred during the transaction */
1618     rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1619                 "Error in %s event: error during transaction execution "
1620                 "on table %s.%s. %s",
1621                 get_type_str(), table->s->db.str,
1622                 table->s->table_name.str,
1623                 thd->net.last_error);
1624 
1625     /*
1626       If one day we honour --skip-slave-errors in row-based replication, and
1627       the error should be skipped, then we would clear mappings, rollback,
1628       close tables, but the slave SQL thread would not stop and then may
1629       assume the mapping is still available, the tables are still open...
1630       So then we should clear mappings/rollback/close here only if this is a
1631       STMT_END_F.
1632       For now we code, knowing that error is not skippable and so slave SQL
1633       thread is certainly going to stop.
1634       rollback at the caller along with sbr.
1635     */
1636     thd->reset_current_stmt_binlog_format_row();
1637     rgi->cleanup_context(thd, error);
1638     thd->is_slave_error= 1;
1639     DBUG_RETURN(error);
1640   }
1641 
1642   /*
1643     This code would ideally be placed in do_update_pos() instead, but
1644     since we have no access to table there, we do the setting of
1645     last_event_start_time here instead.
1646   */
1647   if (table && (table->s->primary_key == MAX_KEY) &&
1648       !use_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1649   {
1650     /*
1651       ------------ Temporary fix until WL#2975 is implemented ---------
1652 
1653       This event is not the last one (no STMT_END_F). If we stop now
1654       (in case of terminate_slave_thread()), how will we restart? We
1655       have to restart from Table_map_log_event, but as this table is
1656       not transactional, the rows already inserted will still be
1657       present, and idempotency is not guaranteed (no PK) so we risk
1658       that repeating leads to double insert. So we desperately try to
1659       continue, hope we'll eventually leave this buggy situation (by
1660       executing the final Old_rows_log_event). If we are in a hopeless
1661       wait (reached end of last relay log and nothing gets appended
1662       there), we timeout after one minute, and notify DBA about the
1663       problem.  When WL#2975 is implemented, just remove the member
1664       Relay_log_info::last_event_start_time and all its occurrences.
1665     */
1666     rgi->last_event_start_time= my_time(0);
1667   }
1668 
1669   if (get_flags(STMT_END_F))
1670   {
1671     /*
1672       This is the end of a statement or transaction, so close (and
1673       unlock) the tables we opened when processing the
1674       Table_map_log_event starting the statement.
1675 
1676       OBSERVER.  This will clear *all* mappings, not only those that
      are open for the table. There is no good hook for on-close
1678       actions for tables.
1679 
1680       NOTE. Even if we have no table ('table' == 0) we still need to be
1681       here, so that we increase the group relay log position. If we didn't, we
1682       could have a group relay log position which lags behind "forever"
1683       (assume the last master's transaction is ignored by the slave because of
1684       replicate-ignore rules).
1685     */
1686     int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1687 
1688     /*
1689       If this event is not in a transaction, the call below will, if some
1690       transactional storage engines are involved, commit the statement into
1691       them and flush the pending event to binlog.
1692       If this event is in a transaction, the call will do nothing, but a
1693       Xid_log_event will come next which will, if some transactional engines
1694       are involved, commit the transaction and flush the pending event to the
1695       binlog.
1696       If there was a deadlock the transaction should have been rolled back
1697       already. So there should be no need to rollback the transaction.
1698     */
1699     DBUG_ASSERT(! thd->transaction_rollback_request);
1700     if (unlikely((error= (binlog_error ?
1701                           trans_rollback_stmt(thd) :
1702                           trans_commit_stmt(thd)))))
1703       rli->report(ERROR_LEVEL, error, NULL,
1704                   "Error in %s event: commit of row events failed, "
1705                   "table `%s`.`%s`",
1706                   get_type_str(), m_table->s->db.str,
1707                   m_table->s->table_name.str);
1708     error|= binlog_error;
1709 
1710     /*
1711       Now what if this is not a transactional engine? we still need to
1712       flush the pending event to the binlog; we did it with
1713       thd->binlog_flush_pending_rows_event(). Note that we imitate
1714       what is done for real queries: a call to
1715       ha_autocommit_or_rollback() (sometimes only if involves a
1716       transactional engine), and a call to be sure to have the pending
1717       event flushed.
1718     */
1719 
1720     thd->reset_current_stmt_binlog_format_row();
1721     rgi->cleanup_context(thd, 0);
1722   }
1723 
1724   DBUG_RETURN(error);
1725 }
1726 
1727 
1728 Log_event::enum_skip_reason
do_shall_skip(rpl_group_info * rgi)1729 Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
1730 {
1731   /*
1732     If the slave skip counter is 1 and this event does not end a
1733     statement, then we should not start executing on the next event.
1734     Otherwise, we defer the decision to the normal skipping logic.
1735   */
1736   if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1737     return Log_event::EVENT_SKIP_IGNORE;
1738   else
1739     return Log_event::do_shall_skip(rgi);
1740 }
1741 
1742 int
do_update_pos(rpl_group_info * rgi)1743 Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
1744 {
1745   Relay_log_info *rli= rgi->rli;
1746   int error= 0;
1747   DBUG_ENTER("Old_rows_log_event::do_update_pos");
1748 
1749   DBUG_PRINT("info", ("flags: %s",
1750                       get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1751 
1752   if (get_flags(STMT_END_F))
1753   {
1754     /*
1755       Indicate that a statement is finished.
1756       Step the group log position if we are not in a transaction,
1757       otherwise increase the event log position.
1758      */
1759     error= rli->stmt_done(log_pos, thd, rgi);
1760     /*
1761       Clear any errors in thd->net.last_err*. It is not known if this is
1762       needed or not. It is believed that any errors that may exist in
1763       thd->net.last_err* are allowed. Examples of errors are "key not
1764       found", which is produced in the test case rpl_row_conflicts.test
1765     */
1766     thd->clear_error();
1767   }
1768   else
1769   {
1770     rgi->inc_event_relay_log_pos();
1771   }
1772 
1773   DBUG_RETURN(error);
1774 }
1775 
1776 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1777 
1778 
1779 #ifndef MYSQL_CLIENT
/*
  Write the fixed-size header (table id + flags) of an old-format row
  event.  This write path must never run — old-format events are not
  generated any more (see the assert below); the code after the assert
  is kept only as a reference for the on-disk layout.
*/
bool Old_rows_log_event::write_data_header()
{
  uchar buf[ROWS_HEADER_LEN];	// No need to init the buffer

  // This method should not be reached.
  assert(0);

  // A valid event must carry a real table id (~0UL marks a dummy event).
  DBUG_ASSERT(m_table_id != ~0UL);
  // Debug hook: emulate an old master that wrote 4-byte table ids.
  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
                  {
                    int4store(buf + 0, m_table_id);
                    int2store(buf + 4, m_flags);
                    return write_data(buf, 6);
                  });
  // Normal layout: 6-byte table id followed by 2-byte flags.
  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
  int2store(buf + RW_FLAGS_OFFSET, m_flags);
  return write_data(buf, ROWS_HEADER_LEN);
}
1798 
1799 
/*
  Write the variable-size body of an old-format row event: the column
  count (m_width), the column bitmap (m_cols) and the packed row data.
  Like write_data_header(), this write path must never run (see the
  assert below); the code is kept as a layout reference only.
*/
bool Old_rows_log_event::write_data_body()
{
  /*
     Note that this should be the number of *bits*, not the number of
     bytes.
  */
  uchar sbuf[MAX_INT_WIDTH];
  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;

  // This method should not be reached.
  assert(0);

  bool res= false;
  // m_width is stored in the packed length format used by the protocol.
  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));

  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
  res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));

  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
  res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
  DBUG_DUMP("rows", m_rows_buf, data_size);
  res= res || write_data(m_rows_buf, (size_t) data_size);

  return res;

}
1827 #endif
1828 
1829 
1830 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
pack_info(Protocol * protocol)1831 void Old_rows_log_event::pack_info(Protocol *protocol)
1832 {
1833   char buf[256];
1834   char const *const flagstr=
1835     get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1836   size_t bytes= my_snprintf(buf, sizeof(buf),
1837                                "table_id: %lu%s", m_table_id, flagstr);
1838   protocol->store(buf, bytes, &my_charset_bin);
1839 }
1840 #endif
1841 
1842 
1843 #ifdef MYSQL_CLIENT
1844 /* Method duplicates Rows_log_event's one */
print_helper(FILE * file,PRINT_EVENT_INFO * print_event_info,char const * const name)1845 bool Old_rows_log_event::print_helper(FILE *file,
1846                                       PRINT_EVENT_INFO *print_event_info,
1847                                       char const *const name)
1848 {
1849   IO_CACHE *const head= &print_event_info->head_cache;
1850   IO_CACHE *const body= &print_event_info->body_cache;
1851   IO_CACHE *const tail= &print_event_info->tail_cache;
1852   bool do_print_encoded=
1853     print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
1854     print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
1855     !print_event_info->short_form;
1856 
1857   if (!print_event_info->short_form)
1858   {
1859     if (print_header(head, print_event_info, !do_print_encoded) ||
1860         my_b_printf(head, "\t%s: table id %lu%s\n",
1861                     name, m_table_id,
1862                     do_print_encoded ? " flags: STMT_END_F" : "") ||
1863         print_base64(body, print_event_info, do_print_encoded))
1864       goto err;
1865   }
1866 
1867   if (get_flags(STMT_END_F))
1868   {
1869     if (copy_event_cache_to_file_and_reinit(head, file) ||
1870         copy_cache_to_file_wrapped(body, file, do_print_encoded,
1871                                    print_event_info->delimiter,
1872                                    print_event_info->verbose) ||
1873         copy_event_cache_to_file_and_reinit(tail, file))
1874       goto err;
1875   }
1876   return 0;
1877 err:
1878   return 1;
1879 }
1880 #endif
1881 
1882 
1883 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1884 /**
1885   Write the current row into event's table.
1886 
1887   The row is located in the row buffer, pointed by @c m_curr_row member.
1888   Number of columns of the row is stored in @c m_width member (it can be
1889   different from the number of columns in the table to which we insert).
1890   Bitmap @c m_cols indicates which columns are present in the row. It is assumed
1891   that event's table is already open and pointed by @c m_table.
1892 
1893   If the same record already exists in the table it can be either overwritten
1894   or an error is reported depending on the value of @c overwrite flag
1895   (error reporting not yet implemented). Note that the matching record can be
1896   different from the row we insert if we use primary keys to identify records in
1897   the table.
1898 
1899   The row to be inserted can contain values only for selected columns. The
1900   missing columns are filled with default values using @c prepare_record()
  function. If a matching record is found in the table and @c overwrite is
1902   true, the missing columns are taken from it.
1903 
1904   @param  rli   Relay log info (needed for row unpacking).
1905   @param  overwrite
1906                 Shall we overwrite if the row already exists or signal
1907                 error (currently ignored).
1908 
1909   @returns Error code on failure, 0 on success.
1910 
1911   This method, if successful, sets @c m_curr_row_end pointer to point at the
1912   next row in the rows buffer. This is done when unpacking the row to be
1913   inserted.
1914 
1915   @note If a matching record is found, it is either updated using
1916   @c ha_update_row() or first deleted and then new record written.
1917 */
1918 
int
Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
{
  DBUG_ENTER("write_row");
  DBUG_ASSERT(m_table != NULL && thd != NULL);

  TABLE *table= m_table;  // pointer to event's table
  int error;
  int keynum;
  // Lazily-allocated key buffer, freed automatically on return.
  auto_afree_ptr<char> key(NULL);

  // NOTE(review): 'overwrite' is not read in this body; the function
  // always uses replace semantics (see doc comment above).

  /* fill table->record[0] with default values */

  if (unlikely((error=
                prepare_record(table, m_width,
                               TRUE /* check if columns have def. values */))))
    DBUG_RETURN(error);

  /* unpack row into table->record[0] */
  if ((error= unpack_current_row(rgi)))
    DBUG_RETURN(error);

#ifndef DBUG_OFF
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif

  /*
    Try to write record. If a corresponding record already exists in the table,
    we try to change it using ha_update_row() if possible. Otherwise we delete
    it and repeat the whole process again.

    TODO: Add safety measures against infinite looping.
   */

  while (unlikely(error= table->file->ha_write_row(table->record[0])))
  {
    /* Lock conflicts are not retryable here: report and bail out. */
    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
    {
      table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
      DBUG_RETURN(error);
    }
    if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
    {
      DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
      table->file->print_error(error, MYF(0));
      /*
        We failed to retrieve the duplicate key:
        - either because the error was not a "duplicate key" error,
        - or because the information about which key it was is not
          available.
      */
      DBUG_RETURN(error);
    }

    /*
       We need to retrieve the old row into record[1] to be able to
       either update or delete the offending record.  We either:

       - use rnd_pos() with a row-id (available as dupp_row) to the
         offending row, if that is possible (MyISAM and Blackhole), or else

       - use index_read_idx() with the key that is duplicated, to
         retrieve the offending row.
     */
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
      if (unlikely(error))
      {
        DBUG_PRINT("info",("rnd_pos() returns error %d",error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }
    else
    {
      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));

      // Make sure any cached/buffered rows are written before we read back.
      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
      {
        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
        DBUG_RETURN(my_errno);
      }

      // Allocate the key buffer once; reused across loop iterations.
      if (key.get() == NULL)
      {
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
        if (unlikely(key.get() == NULL))
        {
          DBUG_PRINT("info",("Can't allocate key buffer"));
          DBUG_RETURN(ENOMEM);
        }
      }

      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
               0);
      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
                                                (const uchar*)key.get(),
                                                HA_WHOLE_KEY,
                                                HA_READ_KEY_EXACT);
      if (unlikely(error))
      {
        DBUG_PRINT("info",("index_read_idx() returns error %d", error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
    }

    /*
       Now, record[1] should contain the offending row.  That
       will enable us to update it or, alternatively, delete it (so
       that we can insert the new row afterwards).
     */

    /*
      If row is incomplete we will use the record found to fill
      missing columns.
    */
    if (!get_flags(COMPLETE_ROWS_F))
    {
      restore_record(table,record[1]);
      error= unpack_current_row(rgi);
    }

#ifndef DBUG_OFF
    DBUG_PRINT("debug",("preparing for update: before and after image"));
    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
#endif

    /*
       REPLACE is defined as either INSERT or DELETE + INSERT.  If
       possible, we can replace it with an UPDATE, but that will not
       work on InnoDB if FOREIGN KEY checks are necessary.

       I (Matz) am not sure of the reason for the last_uniq_key()
       check as, but I'm guessing that it's something along the
       following lines.

       Suppose that we got the duplicate key to be a key that is not
       the last unique key for the table and we perform an update:
       then there might be another key for which the unique check will
       fail, so we're better off just deleting the row and inserting
       the correct row.
     */
    if (last_uniq_key(table, keynum) &&
        !table->file->referenced_by_foreign_key())
    {
      DBUG_PRINT("info",("Updating row using ha_update_row()"));
      error=table->file->ha_update_row(table->record[1],
                                       table->record[0]);
      switch (error) {

      case HA_ERR_RECORD_IS_THE_SAME:
        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
                           " ha_update_row()"));
        error= 0;
        /* Fall through: treated the same as a successful update. */

      case 0:
        break;

      default:
        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
        table->file->print_error(error, MYF(0));
      }

      DBUG_RETURN(error);
    }
    else
    {
      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
      if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
      {
        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
        table->file->print_error(error, MYF(0));
        DBUG_RETURN(error);
      }
      /* Will retry ha_write_row() with the offending row removed. */
    }
  }

  DBUG_RETURN(error);
}
2104 
2105 
2106 /**
2107   Locate the current row in event's table.
2108 
2109   The current row is pointed by @c m_curr_row. Member @c m_width tells how many
  columns are there in the row (this can be different from the number of columns
2111   in the table). It is assumed that event's table is already open and pointed
2112   by @c m_table.
2113 
2114   If a corresponding record is found in the table it is stored in
2115   @c m_table->record[0]. Note that when record is located based on a primary
2116   key, it is possible that the record found differs from the row being located.
2117 
2118   If no key is specified or table does not have keys, a table scan is used to
2119   find the row. In that case the row should be complete and contain values for
2120   all columns. However, it can still be shorter than the table, i.e. the table
2121   can contain extra columns not present in the row. It is also possible that
2122   the table has fewer columns than the row being located.
2123 
2124   @returns Error code on failure, 0 on success.
2125 
2126   @post In case of success @c m_table->record[0] contains the record found.
2127   Also, the internal "cursor" of the table is positioned at the record found.
2128 
2129   @note If the engine allows random access of the records, a combination of
2130   @c position() and @c rnd_pos() will be used.
2131 
2132   Note that one MUST call ha_index_or_rnd_end() after this function if
2133   it returns 0 as we must leave the row position in the handler intact
2134   for any following update/delete command.
2135 */
2136 
int Old_rows_log_event::find_row(rpl_group_info *rgi)
{
  DBUG_ENTER("find_row");

  DBUG_ASSERT(m_table && m_table->in_use != NULL);

  TABLE *table= m_table;
  int error;

  /* unpack row - missing fields get default values */

  // TODO: shall we check and report errors here?
  prepare_record(table, m_width, FALSE /* don't check errors */);
  // NOTE(review): the error from unpack_current_row() is stored but
  // never checked before 'error' is reassigned below — confirm intended.
  error= unpack_current_row(rgi);

#ifndef DBUG_OFF
  DBUG_PRINT("info",("looking for the following record"));
  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif

  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
      table->s->primary_key < MAX_KEY)
  {
    /*
      Use a more efficient method to fetch the record given by
      table->record[0] if the engine allows it.  We first compute a
      row reference using the position() member function (it will be
      stored in table->file->ref) and then use rnd_pos() to position
      the "cursor" (i.e., record[0] in this case) at the correct row.

      TODO: Add a check that the correct record has been fetched by
      comparing with the original record. Take into account that the
      record on the master and slave can be of different
      length. Something along these lines should work:

      ADD>>>  store_record(table,record[1]);
              int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
                                 table->s->reclength) == 0);

    */
    DBUG_PRINT("info",("locating record using primary key (position)"));
    // Note: this 'error' intentionally shadows the outer one.
    int error= table->file->ha_rnd_pos_by_record(table->record[0]);
    if (unlikely(error))
    {
      DBUG_PRINT("info",("rnd_pos returns error %d",error));
      table->file->print_error(error, MYF(0));
    }
    DBUG_RETURN(error);
  }

  // We can't use position() - try other methods.

  /*
    We need to retrieve all fields
    TODO: Move this out from this function to main loop
   */
  table->use_all_columns();

  /*
    Save copy of the record in table->record[1]. It might be needed
    later if linear search is used to find exact match.
   */
  store_record(table,record[1]);

  if (table->s->keys > 0)
  {
    DBUG_PRINT("info",("locating record using primary key (index_read)"));

    /* We have a key: search the table using the index */
    if (!table->file->inited &&
        unlikely(error= table->file->ha_index_init(0, FALSE)))
    {
      DBUG_PRINT("info",("ha_index_init returns error %d",error));
      table->file->print_error(error, MYF(0));
      DBUG_RETURN(error);
    }

    /* Fill key data for the row */

    DBUG_ASSERT(m_key);
    key_copy(m_key, table->record[0], table->key_info, 0);

    /*
      Don't print debug messages when running valgrind since they can
      trigger false warnings.
     */
#ifndef HAVE_valgrind
    DBUG_DUMP("key data", m_key, table->key_info->key_length);
#endif

    /*
      We need to set the null bytes to ensure that the filler bits are
      all set when returning.  There are storage engines that just set
      the necessary bits on the bytes and don't set the filler bits
      correctly.
    */
    my_ptrdiff_t const pos=
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
    table->record[0][pos]= 0xFF;

    if (unlikely((error= table->file->ha_index_read_map(table->record[0],
                                                        m_key,
                                                        HA_WHOLE_KEY,
                                                        HA_READ_KEY_EXACT))))
    {
      DBUG_PRINT("info",("no record matching the key found in the table"));
      table->file->print_error(error, MYF(0));
      table->file->ha_index_end();
      DBUG_RETURN(error);
    }

  /*
    Don't print debug messages when running valgrind since they can
    trigger false warnings.
   */
#ifndef HAVE_valgrind
    DBUG_PRINT("info",("found first matching record"));
    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif
    /*
      Below is a minor "optimization".  If the key (i.e., key number
      0) has the HA_NOSAME flag set, we know that we have found the
      correct record (since there can be no duplicates); otherwise, we
      have to compare the record with the one found to see if it is
      the correct one.

      CAVEAT! This behaviour is essential for the replication of,
      e.g., the mysql.proc table since the correct record *shall* be
      found using the primary key *only*.  There shall be no
      comparison of non-PK columns to decide if the correct record is
      found.  I can see no scenario where it would be incorrect to
      choose the row to change only using a PK or a unique non-null
      index (UNNI).
    */
    if (table->key_info->flags & HA_NOSAME)
    {
      /* Unique does not have non nullable part */
      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
      {
        /* Row found; caller must call ha_index_or_rnd_end() (see doc). */
        DBUG_RETURN(0);
      }
      else
      {
        KEY *keyinfo= table->key_info;
        /*
          Unique has nullable part. We need to check if there is any
          field in the BI image that is null and part of UNNI.
        */
        bool null_found= FALSE;
        for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
        {
          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
          Field **f= table->field+fieldnr;
          null_found= (*f)->is_null();
        }

        if (!null_found)
        {
          DBUG_RETURN(0);
        }

        /* else fall through to index scan */
      }
    }

    /*
      In case key is not unique, we still have to iterate over records found
      and find the one which is identical to the row given. A copy of the
      record we are looking for is stored in record[1].
     */
    DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));

    while (record_compare(table))
    {
      while (unlikely(error= table->file->ha_index_next(table->record[0])))
      {
        DBUG_PRINT("info",("no record matching the given row found"));
        table->file->print_error(error, MYF(0));
        (void) table->file->ha_index_end();
        DBUG_RETURN(error);
      }
    }
  }
  else
  {
    DBUG_PRINT("info",("locating record using table scan (rnd_next)"));

    int restart_count= 0; // Number of times scanning has restarted from top

    /* We don't have a key: search the table using rnd_next() */
    if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
    {
      DBUG_PRINT("info",("error initializing table scan"
                         " (ha_rnd_init returns %d)",error));
      DBUG_RETURN(error);
    }

    /* Continue until we find the right record or have made a full loop */
    do
    {
  restart_rnd_next:
      error= table->file->ha_rnd_next(table->record[0]);

      switch (error) {

      case 0:
        break;

      case HA_ERR_END_OF_FILE:
        /* Wrap around once: restart the scan from the top of the table. */
        if (++restart_count < 2)
        {
          int error2;
          table->file->ha_rnd_end();
          if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
            DBUG_RETURN(error2);
          goto restart_rnd_next;
        }
        break;

      default:
        DBUG_PRINT("info", ("Failed to get next record"
                            " (rnd_next returns %d)",error));
        table->file->print_error(error, MYF(0));
        table->file->ha_rnd_end();
        DBUG_RETURN(error);
      }
    }
    while (restart_count < 2 && record_compare(table));

    /*
      Note: above record_compare will take into account all record fields
      which might be incorrect in case a partial row was given in the event
     */

    /*
      Have to restart the scan to be able to fetch the next row.
    */
    if (restart_count == 2)
      DBUG_PRINT("info", ("Record not found"));
    else
      DBUG_DUMP("record found", table->record[0], table->s->reclength);
    if (error)
      table->file->ha_rnd_end();

    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
    DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}
2387 
2388 #endif
2389 
2390 
2391 /**************************************************************************
2392 	Write_rows_log_event member functions
2393 **************************************************************************/
2394 
2395 /*
2396   Constructor used to build an event for writing to the binary log.
2397  */
2398 #if !defined(MYSQL_CLIENT)
/*
  Write-side constructor.  Old-format row events are never generated
  any more, so building one for binlogging is a coding error — hence
  the unconditional assert.
*/
Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
                                                   TABLE *tbl_arg,
                                                   ulong tid_arg,
                                                   MY_BITMAP const *cols,
                                                   bool is_transactional)
  : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
{

  // This constructor should not be reached.
  assert(0);

}
2411 #endif
2412 
2413 
2414 /*
2415   Constructor used by slave to read the event from the binary log.
2416  */
2417 #ifdef HAVE_REPLICATION
/*
  Read-side constructor: decode a PRE_GA_WRITE_ROWS_EVENT from the
  binlog buffer 'buf'; all decoding is done by the base class.
*/
Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
                                                   uint event_len,
                                                   const Format_description_log_event
                                                   *description_event)
: Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
                     description_event)
{
}
2426 #endif
2427 
2428 
2429 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
{
  int error= 0;

  /*
    We are using REPLACE semantics and not INSERT IGNORE semantics
    when writing rows, that is: new rows replace old rows.  We need to
    inform the storage engine that it should use this behaviour.
  */

  /* Tell the storage engine that we are using REPLACE semantics. */
  thd->lex->duplicates= DUP_REPLACE;

  /* Make the statement look like a REPLACE to the rest of the server. */
  thd->lex->sql_command= SQLCOM_REPLACE;
  /*
     Do not raise the error flag in case of hitting to an unique attribute
  */
  m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
  m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
  // presumably 0 means "unknown number of rows" — verify against
  // handler::ha_start_bulk_insert() documentation.
  m_table->file->ha_start_bulk_insert(0);
  return error;
}
2454 
2455 
/*
  Undo the REPLACE-mode handler hints set in do_before_row_operations()
  and finish the bulk insert.  Returns the first error seen: the one
  passed in by the caller, or otherwise the one from ending the bulk
  insert.
*/
int
Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
                                                  int error)
{
  int local_error= 0;
  m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
  m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
  /*
    resetting the extra with
    table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
    fires bug#27077
    todo: explain or fix
  */
  if (unlikely((local_error= m_table->file->ha_end_bulk_insert())))
  {
    m_table->file->print_error(local_error, MYF(0));
  }
  return error? error : local_error;
}
2475 
2476 
2477 int
do_exec_row(rpl_group_info * rgi)2478 Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2479 {
2480   DBUG_ASSERT(m_table != NULL);
2481   int error= write_row(rgi, TRUE /* overwrite */);
2482 
2483   if (unlikely(error) && !thd->net.last_errno)
2484     thd->net.last_errno= error;
2485 
2486   return error;
2487 }
2488 
2489 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2490 
2491 
2492 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2493 bool Write_rows_log_event_old::print(FILE *file,
2494                                      PRINT_EVENT_INFO* print_event_info)
2495 {
2496   return Old_rows_log_event::print_helper(file, print_event_info,
2497                                           "Write_rows_old");
2498 }
2499 #endif
2500 
2501 
2502 /**************************************************************************
2503 	Delete_rows_log_event member functions
2504 **************************************************************************/
2505 
2506 /*
2507   Constructor used to build an event for writing to the binary log.
2508  */
2509 
2510 #ifndef MYSQL_CLIENT
Delete_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2511 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2512                                                      TABLE *tbl_arg,
2513                                                      ulong tid,
2514                                                      MY_BITMAP const *cols,
2515                                                      bool is_transactional)
2516   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2517     m_after_image(NULL), m_memory(NULL)
2518 {
2519 
2520   // This constructor should not be reached.
2521   assert(0);
2522 
2523 }
2524 #endif /* #if !defined(MYSQL_CLIENT) */
2525 
2526 
2527 /*
2528   Constructor used by slave to read the event from the binary log.
2529  */
2530 #ifdef HAVE_REPLICATION
Delete_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2531 Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2532                                                      uint event_len,
2533                                                      const Format_description_log_event
2534                                                      *description_event)
2535   : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2536                        description_event),
2537     m_after_image(NULL), m_memory(NULL)
2538 {
2539 }
2540 #endif
2541 
2542 
2543 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2544 
2545 int
do_before_row_operations(const Slave_reporting_capability * const)2546 Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2547 {
2548   if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2549       m_table->s->primary_key < MAX_KEY)
2550   {
2551     /*
2552       We don't need to allocate any memory for m_key since it is not used.
2553     */
2554     return 0;
2555   }
2556 
2557   if (m_table->s->keys > 0)
2558   {
2559     // Allocate buffer for key searches
2560     m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2561     if (!m_key)
2562       return HA_ERR_OUT_OF_MEM;
2563   }
2564   return 0;
2565 }
2566 
2567 
2568 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2569 Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2570                                                    int error)
2571 {
2572   /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2573   m_table->file->ha_index_or_rnd_end();
2574   my_free(m_key);
2575   m_key= NULL;
2576 
2577   return error;
2578 }
2579 
2580 
do_exec_row(rpl_group_info * rgi)2581 int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2582 {
2583   int error;
2584   DBUG_ASSERT(m_table != NULL);
2585 
2586   if (likely(!(error= find_row(rgi))) )
2587   {
2588     /*
2589       Delete the record found, located in record[0]
2590     */
2591     error= m_table->file->ha_delete_row(m_table->record[0]);
2592     m_table->file->ha_index_or_rnd_end();
2593   }
2594   return error;
2595 }
2596 
2597 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2598 
2599 
2600 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2601 bool Delete_rows_log_event_old::print(FILE *file,
2602                                       PRINT_EVENT_INFO* print_event_info)
2603 {
2604   return Old_rows_log_event::print_helper(file, print_event_info,
2605                                           "Delete_rows_old");
2606 }
2607 #endif
2608 
2609 
2610 /**************************************************************************
2611 	Update_rows_log_event member functions
2612 **************************************************************************/
2613 
2614 /*
2615   Constructor used to build an event for writing to the binary log.
2616  */
2617 #if !defined(MYSQL_CLIENT)
Update_rows_log_event_old(THD * thd_arg,TABLE * tbl_arg,ulong tid,MY_BITMAP const * cols,bool is_transactional)2618 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2619                                                      TABLE *tbl_arg,
2620                                                      ulong tid,
2621                                                      MY_BITMAP const *cols,
2622                                                      bool is_transactional)
2623   : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2624     m_after_image(NULL), m_memory(NULL)
2625 {
2626 
2627   // This constructor should not be reached.
2628   assert(0);
2629 }
2630 #endif /* !defined(MYSQL_CLIENT) */
2631 
2632 
2633 /*
2634   Constructor used by slave to read the event from the binary log.
2635  */
2636 #ifdef HAVE_REPLICATION
Update_rows_log_event_old(const char * buf,uint event_len,const Format_description_log_event * description_event)2637 Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2638                                                      uint event_len,
2639                                                      const
2640                                                      Format_description_log_event
2641                                                      *description_event)
2642   : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2643                        description_event),
2644     m_after_image(NULL), m_memory(NULL)
2645 {
2646 }
2647 #endif
2648 
2649 
2650 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2651 
2652 int
do_before_row_operations(const Slave_reporting_capability * const)2653 Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2654 {
2655   if (m_table->s->keys > 0)
2656   {
2657     // Allocate buffer for key searches
2658     m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2659     if (!m_key)
2660       return HA_ERR_OUT_OF_MEM;
2661   }
2662 
2663   return 0;
2664 }
2665 
2666 
2667 int
do_after_row_operations(const Slave_reporting_capability * const,int error)2668 Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2669                                                    int error)
2670 {
2671   /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2672   m_table->file->ha_index_or_rnd_end();
2673   my_free(m_key); // Free for multi_malloc
2674   m_key= NULL;
2675 
2676   return error;
2677 }
2678 
2679 
2680 int
do_exec_row(rpl_group_info * rgi)2681 Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2682 {
2683   DBUG_ASSERT(m_table != NULL);
2684 
2685   int error= find_row(rgi);
2686   if (unlikely(error))
2687   {
2688     /*
2689       We need to read the second image in the event of error to be
2690       able to skip to the next pair of updates
2691     */
2692     m_curr_row= m_curr_row_end;
2693     unpack_current_row(rgi);
2694     return error;
2695   }
2696 
2697   /*
2698     This is the situation after locating BI:
2699 
2700     ===|=== before image ====|=== after image ===|===
2701        ^                     ^
2702        m_curr_row            m_curr_row_end
2703 
2704     BI found in the table is stored in record[0]. We copy it to record[1]
2705     and unpack AI to record[0].
2706    */
2707 
2708   store_record(m_table,record[1]);
2709 
2710   m_curr_row= m_curr_row_end;
2711   error= unpack_current_row(rgi); // this also updates m_curr_row_end
2712 
2713   /*
2714     Now we have the right row to update.  The old row (the one we're
2715     looking for) is in record[1] and the new row is in record[0].
2716   */
2717 #ifndef HAVE_valgrind
2718   /*
2719     Don't print debug messages when running valgrind since they can
2720     trigger false warnings.
2721    */
2722   DBUG_PRINT("info",("Updating row in table"));
2723   DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2724   DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2725 #endif
2726 
2727   error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2728   m_table->file->ha_index_or_rnd_end();
2729 
2730   if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
2731     error= 0;
2732 
2733   return error;
2734 }
2735 
2736 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2737 
2738 
2739 #ifdef MYSQL_CLIENT
print(FILE * file,PRINT_EVENT_INFO * print_event_info)2740 bool Update_rows_log_event_old::print(FILE *file,
2741                                       PRINT_EVENT_INFO* print_event_info)
2742 {
2743   return Old_rows_log_event::print_helper(file, print_event_info,
2744                                           "Update_rows_old");
2745 }
2746 #endif
2747