1 /* Copyright (C) 2006, 2007 MySQL AB
2    Copyright (C) 2010, 2013, Monty Program Ab.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 /*
18   WL#3072 Maria recovery
19   First version written by Guilhem Bichot on 2006-04-27.
20 */
21 
22 /* Here is the implementation of this module */
23 
24 #include "maria_def.h"
25 #include "ma_recovery.h"
26 #include "ma_blockrec.h"
27 #include "ma_checkpoint.h"
28 #include "trnman.h"
29 #include "ma_key_recover.h"
30 #include "ma_recovery_util.h"
31 #include "hash.h"
32 #include <my_check_opt.h>
33 
34 struct st_trn_for_recovery /* used only in the REDO phase */
35 {
36   LSN group_start_lsn, undo_lsn, first_undo_lsn;
37   TrID long_trid;
38 };
39 struct st_table_for_recovery /* used in the REDO and UNDO phase */
40 {
41   MARIA_HA *info;
42 };
43 /* Variables used by all functions of this module. Ok as single-threaded */
44 static struct st_trn_for_recovery *all_active_trans;
45 static struct st_table_for_recovery *all_tables;
46 static struct st_dirty_page *dirty_pages_pool;
47 static LSN current_group_end_lsn;
48 #ifndef DBUG_OFF
49 /** Current group of REDOs is about this table and only this one */
50 static MARIA_HA *current_group_table;
51 #endif
52 static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
53 /** @brief to avoid writing a checkpoint if recovery did nothing. */
54 static my_bool checkpoint_useful;
55 static my_bool in_redo_phase;
56 static my_bool trns_created;
57 static int aria_undo_aborted= 0;
58 static ulong skipped_undo_phase;
59 static ulonglong now; /**< for tracking execution time of phases */
60 static void (*save_error_handler_hook)(uint, const char *,myf);
61 static ulong recovery_warnings; /**< count of warnings */
62 HASH tables_to_redo;                          /* For maria_read_log */
63 ulong maria_recovery_force_crash_counter;
64 TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
65 
66 #define prototype_redo_exec_hook(R)                                          \
67   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
68 
69 #define prototype_redo_exec_hook_dummy(R)                                    \
70   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
71                                __attribute__ ((unused)))
72 
73 #define prototype_undo_exec_hook(R)                                          \
74   static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
75 
76 prototype_redo_exec_hook(LONG_TRANSACTION_ID);
77 prototype_redo_exec_hook_dummy(CHECKPOINT);
78 prototype_redo_exec_hook(REDO_CREATE_TABLE);
79 prototype_redo_exec_hook(REDO_RENAME_TABLE);
80 prototype_redo_exec_hook(REDO_REPAIR_TABLE);
81 prototype_redo_exec_hook(REDO_DROP_TABLE);
82 prototype_redo_exec_hook(FILE_ID);
83 prototype_redo_exec_hook(INCOMPLETE_LOG);
84 prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
85 prototype_redo_exec_hook(UNDO_BULK_INSERT);
86 prototype_redo_exec_hook(IMPORTED_TABLE);
87 prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
88 prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
89 prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
90 prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
91 prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
92 prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
93 prototype_redo_exec_hook(REDO_FREE_BLOCKS);
94 prototype_redo_exec_hook(REDO_DELETE_ALL);
95 prototype_redo_exec_hook(REDO_INDEX);
96 prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
97 prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
98 prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
99 prototype_redo_exec_hook(UNDO_ROW_INSERT);
100 prototype_redo_exec_hook(UNDO_ROW_DELETE);
101 prototype_redo_exec_hook(UNDO_ROW_UPDATE);
102 prototype_redo_exec_hook(UNDO_KEY_INSERT);
103 prototype_redo_exec_hook(UNDO_KEY_DELETE);
104 prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
105 prototype_redo_exec_hook(COMMIT);
106 prototype_redo_exec_hook(CLR_END);
107 prototype_redo_exec_hook(DEBUG_INFO);
108 prototype_undo_exec_hook(UNDO_ROW_INSERT);
109 prototype_undo_exec_hook(UNDO_ROW_DELETE);
110 prototype_undo_exec_hook(UNDO_ROW_UPDATE);
111 prototype_undo_exec_hook(UNDO_KEY_INSERT);
112 prototype_undo_exec_hook(UNDO_KEY_DELETE);
113 prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
114 prototype_undo_exec_hook(UNDO_BULK_INSERT);
115 
116 static int run_redo_phase(LSN lsn, LSN end_lsn,
117                           enum maria_apply_log_way apply);
118 static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
119 static int run_undo_phase(LSN end_undo_lsn, uint uncommitted);
120 static void display_record_position(const LOG_DESC *log_desc,
121                                     const TRANSLOG_HEADER_BUFFER *rec,
122                                     uint number);
123 static int display_and_apply_record(const LOG_DESC *log_desc,
124                                     const TRANSLOG_HEADER_BUFFER *rec);
125 static MARIA_HA *get_MARIA_HA_from_REDO_record(const
126                                                TRANSLOG_HEADER_BUFFER *rec);
127 static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
128                                                TRANSLOG_HEADER_BUFFER *rec);
129 static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
130 static LSN parse_checkpoint_record(LSN lsn);
131 static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
132                             LSN first_undo_lsn);
133 static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
134 static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
135                     struct st_dirty_page *dirty_page);
136 static int close_all_tables(void);
137 static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
138 static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
139 static void delete_all_transactions();
140 
141 /** @brief global [out] buffer for translog_read_record(); never shrinks */
142 static struct
143 {
144   /*
145     uchar* is more adapted (less casts) than char*, thus we don't use
146     LEX_STRING.
147   */
148   uchar *str;
149   size_t length;
150 } log_record_buffer;
enlarge_buffer(const TRANSLOG_HEADER_BUFFER * rec)151 static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec)
152 {
153   if (log_record_buffer.length < rec->record_length)
154   {
155     log_record_buffer.length= rec->record_length;
156     log_record_buffer.str= my_realloc(PSI_INSTRUMENT_ME, log_record_buffer.str,
157                                       rec->record_length,
158                                       MYF(MY_WME | MY_ALLOW_ZERO_PTR));
159   }
160 }
/** @brief Tells what kind of progress message was printed to the error log */
static enum recovery_message_type
{
  REC_MSG_NONE= 0,
  REC_MSG_REDO,
  REC_MSG_UNDO,
  REC_MSG_FLUSH
} recovery_message_printed;
166 
167 
168 /* Hook to ensure we get nicer output if we get an error */
169 
maria_recover_error_handler_hook(uint error,const char * str,myf flags)170 void maria_recover_error_handler_hook(uint error, const char *str,
171                                      myf flags)
172 {
173   if (procent_printed)
174   {
175     procent_printed= 0;
176     fputc('\n', stderr);
177     fflush(stderr);
178   }
179   (*save_error_handler_hook)(error, str, flags);
180 }
181 
182 /* Define this if you want gdb to break in some interesting situations */
183 #define ALERT_USER()
184 
print_preamble()185 static void print_preamble()
186 {
187   ma_message_no_user(ME_NOTE, "starting recovery");
188 }
189 
190 
table_is_part_of_recovery_set(LEX_STRING * file_name)191 static my_bool table_is_part_of_recovery_set(LEX_STRING *file_name)
192 {
193   uint offset =0;
194   if (!tables_to_redo.records)
195     return 1;                                   /* Default, recover table */
196 
197   /* Skip base directory */
198   if (file_name->str[0] == '.' &&
199       (file_name->str[1] == '/' || file_name->str[1] == '\\'))
200     offset= 2;
201   /* Only recover if table is in hash */
202   return my_hash_search(&tables_to_redo, (uchar*) file_name->str + offset,
203                         file_name->length - offset) != 0;
204 }
205 
206 /**
207    @brief Recovers from the last checkpoint.
208 
209    Runs the REDO phase using special structures, then sets up the playground
210    of runtime: recreates transactions inside trnman, open tables with their
211    two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
212    tables.
213 
214    @return Operation status
215      @retval 0      OK
216      @retval !=0    Error
217 */
218 
maria_recovery_from_log(void)219 int maria_recovery_from_log(void)
220 {
221   int res= 1;
222   FILE *trace_file;
223   uint warnings_count;
224 #ifdef EXTRA_DEBUG
225   char name_buff[FN_REFLEN];
226 #endif
227   DBUG_ENTER("maria_recovery_from_log");
228 
229   DBUG_ASSERT(!maria_in_recovery);
230   maria_in_recovery= TRUE;
231 
232 #ifdef EXTRA_DEBUG
233   fn_format(name_buff, "aria_recovery.trace", maria_data_root, "", MYF(0));
234   trace_file= my_fopen(name_buff, O_WRONLY|O_APPEND|O_CREAT, MYF(MY_WME));
235 #else
236   trace_file= NULL; /* no trace file for being fast */
237 #endif
238   tprint(trace_file, "TRACE of the last Aria recovery from mysqld\n");
239   DBUG_ASSERT(maria_pagecache->inited);
240   res= maria_apply_log(LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, 0, MARIA_LOG_APPLY,
241                        trace_file, TRUE, TRUE, &warnings_count);
242   if (!res)
243   {
244     if (warnings_count == 0 && recovery_found_crashed_tables == 0)
245       tprint(trace_file, "SUCCESS\n");
246     else
247       tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
248              warnings_count);
249   }
250   if (trace_file)
251     my_fclose(trace_file, MYF(0));
252   maria_in_recovery= FALSE;
253   DBUG_RETURN(res);
254 }
255 
256 
257 /**
258    @brief Displays and/or applies the log
259 
260    @param  from_lsn        LSN from which log reading/applying should start;
261                            LSN_IMPOSSIBLE means "use last checkpoint"
262    @param  end_redo_lsn    Apply until this. LSN_IMPOSSIBLE means until end.
263    @param  end_und_lsn     Apply all undo >= end_undo_lsn. Set to LSN_MAX if
264                            no undo's should be applied.
265    @param  apply           how log records should be applied or not
266    @param  trace_file      trace file where progress/debug messages will go
267    @param  skip_DDLs_arg   Should DDL records (CREATE/RENAME/DROP/REPAIR)
268                            be skipped by the REDO phase or not
269    @param  take_checkpoints Should we take checkpoints or not.
270    @param[out] warnings_count Count of warnings will be put there
271 
272    @todo This trace_file thing is primitive; soon we will make it similar to
273    ma_check_print_warning() etc, and a successful recovery does not need to
274    create a trace file. But for debugging now it is useful.
275 
276    @return Operation status
277      @retval 0      OK
278      @retval !=0    Error
279 */
280 
maria_apply_log(LSN from_lsn,LSN end_redo_lsn,LSN end_undo_lsn,enum maria_apply_log_way apply,FILE * trace_file,my_bool skip_DDLs_arg,my_bool take_checkpoints,uint * warnings_count)281 int maria_apply_log(LSN from_lsn, LSN end_redo_lsn, LSN end_undo_lsn,
282                     enum maria_apply_log_way apply,
283                     FILE *trace_file,
284                     my_bool skip_DDLs_arg,
285                     my_bool take_checkpoints, uint *warnings_count)
286 {
287   int error= 0;
288   uint uncommitted_trans;
289   ulonglong old_now;
290   my_bool abort_message_printed= 0;
291   DBUG_ENTER("maria_apply_log");
292 
293   DBUG_ASSERT(apply == MARIA_LOG_APPLY || end_undo_lsn == LSN_MAX);
294   DBUG_ASSERT(!maria_multi_threaded);
295   recovery_warnings= recovery_found_crashed_tables= 0;
296   skipped_lsn_err_count= 0;
297   maria_recovery_changed_data= 0;
298   /* checkpoints can happen only if TRNs have been built */
299   DBUG_ASSERT(end_undo_lsn != LSN_MAX || !take_checkpoints);
300   all_active_trans= (struct st_trn_for_recovery *)
301     my_malloc(PSI_INSTRUMENT_ME, (SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
302               MYF(MY_ZEROFILL));
303   all_tables= (struct st_table_for_recovery *)
304     my_malloc(PSI_INSTRUMENT_ME, (SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
305               MYF(MY_ZEROFILL));
306 
307   save_error_handler_hook= error_handler_hook;
308   error_handler_hook= maria_recover_error_handler_hook;
309 
310   if (!all_active_trans || !all_tables)
311     goto err;
312 
313   if (take_checkpoints && ma_checkpoint_init(0))
314     goto err;
315 
316   recovery_message_printed= REC_MSG_NONE;
317   checkpoint_useful= trns_created= FALSE;
318   aria_undo_aborted= 0;
319   tracef= trace_file;
320 #ifdef INSTANT_FLUSH_OF_MESSAGES
321   /* enable this for instant flush of messages to trace file */
322   setbuf(tracef, NULL);
323 #endif
324   skip_DDLs= skip_DDLs_arg;
325   skipped_undo_phase= 0;
326 
327   trnman_init(max_trid_in_control_file);
328 
329   if (from_lsn == LSN_IMPOSSIBLE)
330   {
331     if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
332     {
333       from_lsn= translog_first_lsn_in_log();
334       if (unlikely(from_lsn == LSN_ERROR))
335       {
336         trnman_destroy();
337         goto err;
338       }
339     }
340     else
341     {
342       from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
343       if (from_lsn == LSN_ERROR)
344       {
345         trnman_destroy();
346         goto err;
347       }
348     }
349   }
350 
351   now= microsecond_interval_timer();
352   in_redo_phase= TRUE;
353   if (run_redo_phase(from_lsn, end_redo_lsn, apply))
354   {
355     ma_message_no_user(0, "Redo phase failed");
356     trnman_destroy();
357     goto err;
358   }
359   trnman_destroy();
360 
361   if (end_redo_lsn != LSN_IMPOSSIBLE &&
362       (end_undo_lsn == LSN_MAX || end_undo_lsn == LSN_IMPOSSIBLE))
363   {
364     abort_message_printed= 1;
365     if (!trace_file)
366       fputc('\n', stderr);
367     my_message(HA_ERR_INITIALIZATION,
368                "Maria recovery aborted as end_lsn/end of file was reached",
369                MYF(0));
370     goto err2;
371   }
372 
373   if ((uncommitted_trans=
374        end_of_redo_phase(end_undo_lsn != LSN_MAX)) == (uint)-1)
375   {
376     ma_message_no_user(0, "End of redo phase failed");
377     goto err;
378   }
379   in_redo_phase= FALSE;
380 
381   old_now= now;
382   now= microsecond_interval_timer();
383   if (recovery_message_printed == REC_MSG_REDO)
384   {
385     double phase_took= (now - old_now)/1000000.0;
386     /*
387       Detailed progress info goes to stderr, because ma_message_no_user()
388       cannot put several messages on one line.
389     */
390     procent_printed= 1;
391     fprintf(stderr, " (%.1f seconds); ", phase_took);
392     fflush(stderr);
393   }
394 
395   /**
396      REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
397      wrong: if a future recovery used it, the REDO phase would always
398      start from the checkpoint and never from before, wrongly skipping REDOs
399      (tested). Another problem is that the REDO phase uses
400      PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.
401 
402      @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
403      could make a function which goes through pages at end of REDO phase and
404      changes their type.
405   */
406 #ifdef FIX_AND_ENABLE_LATER
407   if (take_checkpoints && checkpoint_useful)
408   {
409     /*
410       We take a checkpoint as it can save future recovery work if we crash
411       during the UNDO phase. But we don't flush pages, as UNDOs will change
412       them again probably.
413       If we wanted to take checkpoints in the middle of the REDO phase, at a
414       moment when we haven't reached the end of log so don't have exact data
415       about transactions, we could write a special checkpoint: containing only
416       the list of dirty pages, otherwise to be treated as if it was at the
417       same LSN as the last checkpoint.
418     */
419     if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
420       goto err;
421   }
422 #endif
423 
424   if (end_undo_lsn != LSN_MAX)
425   {
426     if (run_undo_phase(end_undo_lsn, uncommitted_trans))
427     {
428       ma_message_no_user(0, "Undo phase failed");
429       goto err;
430     }
431     if (aria_undo_aborted)
432       ma_message_no_user(0, "Undo phase aborted in the middle on user request");
433     else if (end_redo_lsn != LSN_IMPOSSIBLE)
434       my_message(HA_ERR_INITIALIZATION,
435                  "Maria recovery aborted as end_lsn followed by end_undo was "
436                  "reached", MYF(0));
437   }
438   else if (uncommitted_trans > 0)
439   {
440     eprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
441            " be left inconsistent!***", uncommitted_trans);
442     recovery_warnings++;
443   }
444 
445   if (skipped_undo_phase)
446   {
447     /*
448       We could want to print a list of tables for which UNDOs were skipped,
449       but not one line per skipped UNDO.
450     */
451     eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
452            " tables may be left inconsistent!***", skipped_undo_phase);
453     recovery_warnings++;
454   }
455 
456   old_now= now;
457   now= microsecond_interval_timer();
458   if (recovery_message_printed == REC_MSG_UNDO)
459   {
460     double phase_took= (now - old_now)/1000000.0;
461     procent_printed= 1;
462     fprintf(stderr, " (%.1f seconds); ", phase_took);
463     fflush(stderr);
464   }
465 
466   /*
467     we don't use maria_panic() because it would maria_end(), and Recovery does
468     not want that (we want to keep some modules initialized for runtime).
469   */
470   if (close_all_tables())
471   {
472     ma_message_no_user(0, "closing of tables failed");
473     goto err;
474   }
475 
476   old_now= now;
477   now= microsecond_interval_timer();
478   if (recovery_message_printed == REC_MSG_FLUSH)
479   {
480     double phase_took= (now - old_now)/1000000.0;
481     procent_printed= 1;
482     fprintf(stderr, " (%.1f seconds); ", phase_took);
483     fflush(stderr);
484   }
485 
486   if (max_long_trid > max_trid_in_control_file)
487   {
488     if (ma_control_file_write_and_force(last_checkpoint_lsn, last_logno,
489                                         max_long_trid, recovery_failures))
490       goto err;
491   }
492 
493   if (take_checkpoints && checkpoint_useful)
494   {
495     /* No dirty pages, all tables are closed, no active transactions, save: */
496     if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
497       goto err;
498   }
499 
500   goto end;
501 err:
502   tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
503 err2:
504   if (trns_created)
505     delete_all_transactions();
506   if (!abort_message_printed)
507     error= 1;
508   if (close_all_tables())
509   {
510     ma_message_no_user(0, "closing of tables failed");
511   }
512 end:
513   error_handler_hook= save_error_handler_hook;
514   my_hash_free(&all_dirty_pages);
515   bzero(&all_dirty_pages, sizeof(all_dirty_pages));
516   my_free(dirty_pages_pool);
517   dirty_pages_pool= NULL;
518   my_free(all_tables);
519   all_tables= NULL;
520   my_free(all_active_trans);
521   all_active_trans= NULL;
522   my_free(log_record_buffer.str);
523   log_record_buffer.str= NULL;
524   log_record_buffer.length= 0;
525   ma_checkpoint_end();
526   *warnings_count= recovery_warnings + recovery_found_crashed_tables;
527   if (recovery_message_printed != REC_MSG_NONE)
528   {
529     if (procent_printed)
530     {
531       procent_printed= 0;
532       fprintf(stderr, "\n");
533       fflush(stderr);
534     }
535     if (!error && !abort_message_printed)
536     {
537       ma_message_no_user(ME_NOTE, "recovery done");
538       maria_recovery_changed_data= 1;
539     }
540   }
541   else if (!error && max_trid_in_control_file != max_long_trid)
542   {
543     /*
544       maria_end() will set max trid in log file so that one can run
545       maria_chk on the tables
546     */
547     maria_recovery_changed_data= 1;
548   }
549 
550   if (error && !abort_message_printed)
551   {
552     my_message(HA_ERR_INITIALIZATION,
553                "Aria recovery failed. Please run aria_chk -r on all Aria "
554                "tables (*.MAI) and delete all aria_log.######## files", MYF(0));
555   }
556   procent_printed= 0;
557   /*
558     We don't cleanly close tables if we hit some error (may corrupt them by
559     flushing some wrong blocks made from wrong REDOs). It also leaves their
560     open_count>0, which ensures that --aria-recover, if used, will try to
561     repair them.
562   */
563   DBUG_RETURN(error);
564 }
565 
566 
567 /* very basic info about the record's header */
display_record_position(const LOG_DESC * log_desc,const TRANSLOG_HEADER_BUFFER * rec,uint number)568 static void display_record_position(const LOG_DESC *log_desc,
569                                     const TRANSLOG_HEADER_BUFFER *rec,
570                                     uint number)
571 {
572   /*
573     if number==0, we're going over records which we had already seen and which
574     form a group, so we indent below the group's end record
575   */
576   tprint(tracef,
577          "%sRec#%u LSN " LSN_FMT " short_trid %u %s(num_type:%u) len %lu\n",
578          number ? "" : "   ", number, LSN_IN_PARTS(rec->lsn),
579          rec->short_trid, log_desc->name, rec->type,
580          (ulong)rec->record_length);
581   if (rec->type == LOGREC_DEBUG_INFO)
582   {
583     /* Print some extra information */
584     (*log_desc->record_execute_in_redo_phase)(rec);
585   }
586 }
587 
588 
display_and_apply_record(const LOG_DESC * log_desc,const TRANSLOG_HEADER_BUFFER * rec)589 static int display_and_apply_record(const LOG_DESC *log_desc,
590                                     const TRANSLOG_HEADER_BUFFER *rec)
591 {
592   int error;
593   if (log_desc->record_execute_in_redo_phase == NULL)
594   {
595     /* die on all not-yet-handled records :) */
596     DBUG_ASSERT("one more hook to write" == 0);
597     return 1;
598   }
599   if (rec->type == LOGREC_DEBUG_INFO)
600   {
601     /* Query already printed by display_record_position() */
602     return 0;
603   }
604   if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
605     eprint(tracef, "Got error %d when executing record %s",
606            my_errno, log_desc->name);
607   return error;
608 }
609 
610 
prototype_redo_exec_hook(LONG_TRANSACTION_ID)611 prototype_redo_exec_hook(LONG_TRANSACTION_ID)
612 {
613   uint16 sid= rec->short_trid;
614   TrID long_trid= all_active_trans[sid].long_trid;
615   /*
616     Any incomplete group should be of an old crash which already had a
617     recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
618   */
619   DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
620   if (long_trid != 0)
621   {
622     LSN ulsn= all_active_trans[sid].undo_lsn;
623     /*
624       If the first record of that transaction is after 'rec', it's probably
625       because that transaction was found in the checkpoint record, and then
626       it's ok, we can forget about that transaction (we'll meet it later
627       again in the REDO phase) and replace it with the one in 'rec'.
628     */
629     if ((ulsn != LSN_IMPOSSIBLE) &&
630         (cmp_translog_addr(ulsn, rec->lsn) < 0))
631     {
632       char llbuf[22];
633       llstr(long_trid, llbuf);
634       eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
635              " with same short id as this new transaction, and has neither"
636              " committed nor rollback (undo_lsn: " LSN_FMT ")",
637              llbuf, sid, LSN_IN_PARTS(ulsn));
638       goto err;
639     }
640   }
641   long_trid= uint6korr(rec->header);
642   new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
643   goto end;
644 err:
645   ALERT_USER();
646   return 1;
647 end:
648   return 0;
649 }
650 
651 
/*
  Records a transaction in slot 'sid' of all_active_trans: stores its long
  id and the two undo LSNs (group_start_lsn is left untouched), traces the
  event and raises max_long_trid if 'long_id' is bigger.
*/
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn)
{
  struct st_trn_for_recovery *slot= &all_active_trans[sid];
  char trid_text[22];

  slot->long_trid= long_id;
  slot->undo_lsn= undo_lsn;
  slot->first_undo_lsn= first_undo_lsn;

  llstr(long_id, trid_text);
  tprint(tracef, "Transaction long_trid %s short_trid %u starts,"
         " undo_lsn " LSN_FMT " first_undo_lsn " LSN_FMT "\n",
         trid_text, sid, LSN_IN_PARTS(undo_lsn), LSN_IN_PARTS(first_undo_lsn));

  set_if_bigger(max_long_trid, long_id);
}
665 
666 
prototype_redo_exec_hook_dummy(CHECKPOINT)667 prototype_redo_exec_hook_dummy(CHECKPOINT)
668 {
669   /* the only checkpoint we care about was found via control file, ignore */
670   tprint(tracef, "CHECKPOINT found\n");
671   return 0;
672 }
673 
674 
/*
  REDO hook for INCOMPLETE_GROUP records: nothing to do here, the group was
  already aborted. Always returns 0.
*/
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
  return 0;
}
680 
681 
prototype_redo_exec_hook(INCOMPLETE_LOG)682 prototype_redo_exec_hook(INCOMPLETE_LOG)
683 {
684   MARIA_HA *info;
685 
686   /* We try to get table first, so that we get the table in in the trace log */
687   info= get_MARIA_HA_from_REDO_record(rec);
688 
689   if (skip_DDLs)
690   {
691     tprint(tracef, "we skip DDLs\n");
692     return 0;
693   }
694 
695   if (!info)
696   {
697     /* no such table, don't need to warn */
698     return 0;
699   }
700 
701   if (maria_is_crashed(info))
702     return 0;
703 
704   if (info->s->state.is_of_horizon > rec->lsn)
705   {
706     /*
707       This table was repaired at a time after this log entry.
708       We can assume that all rows was inserted sucessfully and we don't
709       have to warn about that the inserted data was not logged
710     */
711     return 0;
712   }
713 
714   /*
715     Example of what can go wrong when replaying DDLs:
716     CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
717     ALTER TABLE t ... which does
718     CREATE a temporary table #sql... (logged)
719     INSERT data from t into #sql... (not logged)
720     RENAME #sql TO t (logged)
721     Removing tables by hand and replaying the log will leave in the
722     end an empty table "t": missing records. If after the RENAME an INSERT
723     into t was done, that row had number 1 in its page, executing the
724     REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
725     failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
726     created whereas rownr is not 0).
727     So when the server disables logging for ALTER TABLE or CREATE SELECT, it
728     logs LOGREC_INCOMPLETE_LOG to warn aria_read_log and then the user.
729 
730     Another issue is that replaying of DDLs is not correct enough to work if
731     there was a crash during a DDL (see comment in execution of
732     REDO_RENAME_TABLE ).
733   */
734 
735   eprint(tracef, "***WARNING: Aria engine currently logs no records "
736           "about insertion of data by ALTER TABLE and CREATE SELECT, "
737           "as they are not necessary for recovery; "
738           "present applying of log records to table '%s' may well not work."
739           "***", info->s->index_file_name.str);
740 
741   /* Prevent using the table for anything else than undo repair */
742   _ma_mark_file_crashed(info->s);
743   recovery_warnings++;
744   return 0;
745 }
746 
747 
create_database_if_not_exists(const char * name)748 static my_bool create_database_if_not_exists(const char *name)
749 {
750   char dirname[FN_REFLEN];
751   size_t length;
752   MY_STAT stat_info;
753   DBUG_ENTER("create_database_if_not_exists");
754 
755   dirname_part(dirname, name, &length);
756   if (!length)
757   {
758     /* Skip files without directores */
759     DBUG_RETURN(0);
760   }
761   /*
762     Safety;  Don't create files with hard path;
763     Should never happen with MariaDB
764     If hard path, then error will be detected when trying to create index file
765   */
766   if (test_if_hard_path(dirname))
767     DBUG_RETURN(0);
768 
769   if (my_stat(dirname,&stat_info,MYF(0)))
770     DBUG_RETURN(0);
771 
772 
773   tprint(tracef, "Creating not existing database '%s'\n", dirname);
774   if (my_mkdir(dirname, 0777, MYF(MY_WME)))
775   {
776     eprint(tracef, "***WARNING: Can't create not existing database '%s'",
777            dirname);
778     DBUG_RETURN(1);
779   }
780   DBUG_RETURN(0);
781 }
782 
783 
784 
785 
786 
prototype_redo_exec_hook(REDO_CREATE_TABLE)787 prototype_redo_exec_hook(REDO_CREATE_TABLE)
788 {
789   File dfile= -1, kfile= -1;
790   char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
791     *data_file_name, *index_file_name;
792   uchar *kfile_header;
793   myf create_flag;
794   uint flags;
795   int error= 1, create_mode= O_RDWR | O_TRUNC, i;
796   MARIA_HA *info= NULL;
797   uint kfile_size_before_extension, keystart;
798   DBUG_ENTER("exec_REDO_LOGREC_REDO_CREATE_TABLE");
799 
800   if (skip_DDLs)
801   {
802     tprint(tracef, "we skip DDLs\n");
803     DBUG_RETURN(0);
804   }
805   enlarge_buffer(rec);
806   if (log_record_buffer.str == NULL ||
807       translog_read_record(rec->lsn, 0, rec->record_length,
808                            log_record_buffer.str, NULL) !=
809       rec->record_length)
810   {
811     eprint(tracef, "Failed to read record");
812     goto end;
813   }
814   name= (char *)log_record_buffer.str;
815   /*
816     TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
817     find a REDO_CREATE_TABLE for a table which we have open, that's why we
818     need to look for any open instances and close them first.
819   */
820   if (close_one_table(name, rec->lsn))
821   {
822     eprint(tracef, "Table '%s' got error %d on close", name, my_errno);
823     ALERT_USER();
824     goto end;
825   }
826   /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
827   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0);
828   if (info)
829   {
830     MARIA_SHARE *share= info->s;
831     /* check that we're not already using it */
832     if (share->reopen != 1)
833     {
834       eprint(tracef, "Table '%s is already open (reopen=%u)",
835              name, share->reopen);
836       ALERT_USER();
837       goto end;
838     }
839     DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
840     if (!share->base.born_transactional)
841     {
842       /*
843         could be that transactional table was later dropped, and a non-trans
844         one was renamed to its name, thus create_rename_lsn is 0 and should
845         not be trusted.
846       */
847       tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
848              name);
849       ALERT_USER();
850       error= 0;
851       goto end;
852     }
853     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
854     {
855       tprint(tracef, "Table '%s' has create_rename_lsn " LSN_FMT " more "
856              "recent than record, ignoring creation\n",
857              name, LSN_IN_PARTS(share->state.create_rename_lsn));
858       error= 0;
859       goto end;
860     }
861     if (maria_is_crashed(info))
862     {
863       eprint(tracef, "Table '%s' is crashed, can't recreate it", name);
864       ALERT_USER();
865       goto end;
866     }
867     maria_close(info);
868     info= NULL;
869   }
870   else
871   {
872     /* one or two files absent, or header corrupted... */
873     tprint(tracef, "Table '%s' can't be opened (Error: %d)\n",
874            name, my_errno);
875   }
876   /* if does not exist, or is older, overwrite it */
877   ptr= name + strlen(name) + 1;
878   if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
879     tprint(tracef, ", we will only touch index file");
880   ptr++;
881   kfile_size_before_extension= uint2korr(ptr);
882   ptr+= 2;
883   keystart= uint2korr(ptr);
884   ptr+= 2;
885   kfile_header= (uchar *)ptr;
886   ptr+= kfile_size_before_extension;
887   /* set header lsns */
888   ptr2= (char *) kfile_header + sizeof(info->s->state.header) +
889     MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
890   for (i= 0; i<3; i++)
891   {
892     lsn_store(ptr2, rec->lsn);
893     ptr2+= LSN_STORE_SIZE;
894   }
895   data_file_name= ptr;
896   ptr+= strlen(data_file_name) + 1;
897   index_file_name= ptr;
898   ptr+= strlen(index_file_name) + 1;
899   /** @todo handle symlinks */
900   if (data_file_name[0] || index_file_name[0])
901   {
902     eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled",
903            name);
904     goto end;
905   }
906   if (create_database_if_not_exists(name))
907     goto end;
908   fn_format(filename, name, "", MARIA_NAME_IEXT,
909             MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH | MY_APPEND_EXT);
910   linkname_ptr= NULL;
911   create_flag= MY_DELETE_OLD;
912   tprint(tracef, "Table '%s' creating as '%s'\n", name, filename);
913   if ((kfile= mysql_file_create_with_symlink(key_file_kfile, linkname_ptr,
914                                              filename, 0, create_mode,
915                                              MYF(MY_WME|create_flag))) < 0)
916   {
917     eprint(tracef, "Failed to create index file");
918     goto end;
919   }
920   if (my_pwrite(kfile, kfile_header,
921                 kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
922       mysql_file_chsize(kfile, keystart, 0, MYF(MY_WME)))
923   {
924     eprint(tracef, "Failed to write to index file");
925     goto end;
926   }
927   if (!(flags & HA_DONT_TOUCH_DATA))
928   {
929     fn_format(filename,name,"", MARIA_NAME_DEXT,
930               MY_UNPACK_FILENAME | MY_APPEND_EXT);
931     linkname_ptr= NULL;
932     create_flag=MY_DELETE_OLD;
933     if (((dfile=
934           mysql_file_create_with_symlink(key_file_dfile, linkname_ptr,
935                                          filename, 0, create_mode,
936                                          MYF(MY_WME | create_flag))) < 0) ||
937         mysql_file_close(dfile, MYF(MY_WME)))
938     {
939       eprint(tracef, "Failed to create data file");
940       goto end;
941     }
942     /*
943       we now have an empty data file. To be able to
944       _ma_initialize_data_file() we need some pieces of the share to be
945       correctly filled. So we just open the table (fortunately, an empty
946       data file does not preclude this).
947     */
948     if (((info= maria_open(name, O_RDONLY, 0, 0)) == NULL) ||
949         _ma_initialize_data_file(info->s, info->dfile.file))
950     {
951       eprint(tracef, "Failed to open new table or write to data file");
952       goto end;
953     }
954   }
955   error= 0;
956 end:
957   if (kfile >= 0)
958     error|= mysql_file_close(kfile, MYF(MY_WME));
959   if (info != NULL)
960     error|= maria_close(info);
961   DBUG_RETURN(error);
962 }
963 
964 
prototype_redo_exec_hook(REDO_RENAME_TABLE)965 prototype_redo_exec_hook(REDO_RENAME_TABLE)
966 {
967   char *old_name, *new_name;
968   int error= 1;
969   MARIA_HA *info= NULL;
970   my_bool from_table_is_crashed= 0;
971   DBUG_ENTER("exec_REDO_LOGREC_REDO_RENAME_TABLE");
972 
973   if (skip_DDLs)
974   {
975     tprint(tracef, "we skip DDLs\n");
976     DBUG_RETURN(0);
977   }
978   enlarge_buffer(rec);
979   if (log_record_buffer.str == NULL ||
980       translog_read_record(rec->lsn, 0, rec->record_length,
981                            log_record_buffer.str, NULL) !=
982       rec->record_length)
983   {
984     eprint(tracef, "Failed to read record");
985     goto end;
986   }
987   old_name= (char *)log_record_buffer.str;
988   new_name= old_name + strlen(old_name) + 1;
989   tprint(tracef, "Table '%s' to be renamed to '%s'; old-name table ", old_name,
990          new_name);
991   /*
992     Here is why we skip CREATE/DROP/RENAME when doing a recovery from
993     ha_maria (whereas we do when called from aria_read_log). Consider:
994     CREATE TABLE t;
995     RENAME TABLE t to u;
996     DROP TABLE u;
997     RENAME TABLE v to u; # crash between index rename and data rename.
998     And do a Recovery (not removing tables beforehand).
999     Recovery replays CREATE, then RENAME: the maria_open("t") works,
1000     maria_open("u") does not (no data file) so table "u" is considered
1001     inexistent and so maria_rename() is done which overwrites u's index file,
1002     which is lost. Ok, the data file (v.MAD) is still available, but only a
1003     REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
1004     So it is preferrable to not execute RENAME, and leave the "mess" of files,
1005     rather than possibly destroy a file. DBA will manually rename files.
1006     A safe recovery method would probably require checking the existence of
1007     the index file and of the data file separately (not via maria_open()), and
1008     maybe also to store a create_rename_lsn in the data file too
1009     For now, all we risk is to leave the mess (half-renamed files) left by the
1010     crash. We however sync files and directories at each file rename. The SQL
1011     layer is anyway not crash-safe for DDLs (except the repartioning-related
1012     ones).
1013     We replay DDLs in aria_read_log to be able to recreate tables from
1014     scratch. It means that "aria_read_log -a" should not be used on a
1015     database which just crashed during a DDL. And also ALTER TABLE does not
1016     log insertions of records into the temporary table, so replaying may
1017     fail (grep for INCOMPLETE_LOG in files).
1018   */
1019   info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0);
1020   if (info)
1021   {
1022     MARIA_SHARE *share= info->s;
1023     if (!share->base.born_transactional)
1024     {
1025       tprint(tracef, "is not transactional, ignoring renaming");
1026       ALERT_USER();
1027       error= 0;
1028       goto end;
1029     }
1030     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1031     {
1032       tprint(tracef, "has create_rename_lsn " LSN_FMT " more recent than"
1033              " record, ignoring renaming",
1034              LSN_IN_PARTS(share->state.create_rename_lsn));
1035       error= 0;
1036       goto end;
1037     }
1038     if (maria_is_crashed(info))
1039     {
1040       tprint(tracef, "is crashed, can't be used for rename ; new-name table ");
1041       from_table_is_crashed= 1;
1042     }
1043     if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1044         maria_close(info))
1045       goto end;
1046     info= NULL;
1047     if (!from_table_is_crashed)
1048       tprint(tracef, "is ok for renaming; new-name table ");
1049   }
1050   else /* one or two files absent, or header corrupted... */
1051   {
1052     tprint(tracef, ", can't be opened, probably does not exist");
1053     error= 0;
1054     goto end;
1055   }
1056   /*
1057     We must also check the create_rename_lsn of the 'new_name' table if it
1058     exists: otherwise we may, with our rename which overwrites, destroy
1059     another table. For example:
1060     CREATE TABLE t;
1061     RENAME t to u;
1062     DROP TABLE u;
1063     RENAME v to u; # v is an old table, its creation/insertions not in log
1064     And start executing the log (without removing tables beforehand): creates
1065     t, renames it to u (if not testing create_rename_lsn) thus overwriting
1066     old-named v, drops u, and we are stuck, we have lost data.
1067   */
1068   info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0);
1069   if (info)
1070   {
1071     MARIA_SHARE *share= info->s;
1072     /* We should not have open instances on this table. */
1073     if (share->reopen != 1)
1074     {
1075       tprint(tracef, "is already open (reopen=%u)", share->reopen);
1076       ALERT_USER();
1077       goto end;
1078     }
1079     if (!share->base.born_transactional)
1080     {
1081       tprint(tracef, "is not transactional, ignoring renaming");
1082       ALERT_USER();
1083       goto drop;
1084     }
1085     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1086     {
1087       tprint(tracef, "has create_rename_lsn " LSN_FMT " more recent than"
1088              " record, ignoring renaming",
1089              LSN_IN_PARTS(share->state.create_rename_lsn));
1090       /*
1091         We have to drop the old_name table. Consider:
1092         CREATE TABLE t;
1093         CREATE TABLE v;
1094         RENAME TABLE t to u;
1095         DROP TABLE u;
1096         RENAME TABLE v to u;
1097         and apply the log without removing tables beforehand. t will be
1098         created, v too; in REDO_RENAME u will be more recent, but we still
1099         have to drop t otherwise it stays.
1100       */
1101       goto drop;
1102     }
1103     if (maria_is_crashed(info))
1104     {
1105       tprint(tracef, "is crashed, can't rename it");
1106       ALERT_USER();
1107       goto end;
1108     }
1109     if (maria_close(info))
1110       goto end;
1111     info= NULL;
1112     /* abnormal situation */
1113     tprint(tracef, "exists but is older than record, can't rename it");
1114     goto end;
1115   }
1116   else /* one or two files absent, or header corrupted... */
1117     tprint(tracef, "can't be opened, probably does not exist");
1118 
1119   if (from_table_is_crashed)
1120   {
1121     eprint(tracef, "Aborting rename as old table was crashed");
1122     ALERT_USER();
1123     goto end;
1124   }
1125 
1126   tprint(tracef, ", renaming '%s'", old_name);
1127   if (maria_rename(old_name, new_name))
1128   {
1129     eprint(tracef, "Failed to rename table");
1130     goto end;
1131   }
1132   info= maria_open(new_name, O_RDONLY, 0, 0);
1133   if (info == NULL)
1134   {
1135     eprint(tracef, "Failed to open renamed table");
1136     goto end;
1137   }
1138   if (_ma_update_state_lsns(info->s, rec->lsn, info->s->state.create_trid,
1139                             TRUE, TRUE))
1140     goto end;
1141   if (maria_close(info))
1142     goto end;
1143   info= NULL;
1144   error= 0;
1145   goto end;
1146 drop:
1147   tprint(tracef, ", only dropping '%s'", old_name);
1148   if (maria_delete_table(old_name))
1149   {
1150     eprint(tracef, "Failed to drop table");
1151     goto end;
1152   }
1153   error= 0;
1154   goto end;
1155 end:
1156   tprint(tracef, "\n");
1157   if (info != NULL)
1158     error|= maria_close(info);
1159   DBUG_RETURN(error);
1160 }
1161 
1162 
1163 /*
1164   The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
1165 */
prototype_redo_exec_hook(REDO_REPAIR_TABLE)1166 prototype_redo_exec_hook(REDO_REPAIR_TABLE)
1167 {
1168   int error= 1;
1169   MARIA_HA *info;
1170   HA_CHECK param;
1171   char *name;
1172   my_bool quick_repair;
1173   DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");
1174 
1175   /* We try to get table first, so that we get the table in in the trace log */
1176   info= get_MARIA_HA_from_REDO_record(rec);
1177 
1178   if (skip_DDLs)
1179   {
1180     /*
1181       REPAIR is not exactly a DDL, but it manipulates files without logging
1182       insertions into them.
1183     */
1184     tprint(tracef, "we skip DDLs\n");
1185     DBUG_RETURN(0);
1186   }
1187 
1188   if (!info)
1189   {
1190     /* no such table, don't need to warn */
1191     DBUG_RETURN(0);
1192   }
1193 
1194   if (maria_is_crashed(info))
1195   {
1196     tprint(tracef, "we skip repairing crashed table\n");
1197     DBUG_RETURN(0);
1198   }
1199   /*
1200     Otherwise, the mapping is newer than the table, and our record is newer
1201     than the mapping, so we can repair.
1202   */
1203   tprint(tracef, "   repairing...\n");
1204 
1205   maria_chk_init(&param);
1206   param.isam_file_name= name= info->s->open_file_name.str;
1207   param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
1208   param.tmpdir= maria_tmpdir;
1209   param.max_trid= max_long_trid;
1210   DBUG_ASSERT(maria_tmpdir);
1211 
1212   info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
1213   quick_repair= MY_TEST(param.testflag & T_QUICK);
1214 
1215   if (param.testflag & T_REP_PARALLEL)
1216   {
1217     if (maria_repair_parallel(&param, info, name, quick_repair))
1218       goto end;
1219   }
1220   else if (param.testflag & T_REP_BY_SORT)
1221   {
1222     if (maria_repair_by_sort(&param, info, name, quick_repair))
1223       goto end;
1224   }
1225   else if (maria_repair(&param, info, name, quick_repair))
1226     goto end;
1227 
1228   if (_ma_update_state_lsns(info->s, rec->lsn, trnman_get_min_safe_trid(),
1229                             TRUE, !(param.testflag & T_NO_CREATE_RENAME_LSN)))
1230     goto end;
1231   error= 0;
1232 
1233 end:
1234   DBUG_RETURN(error);
1235 }
1236 
1237 
prototype_redo_exec_hook(REDO_DROP_TABLE)1238 prototype_redo_exec_hook(REDO_DROP_TABLE)
1239 {
1240   char *name;
1241   int error= 1;
1242   MARIA_HA *info;
1243   if (skip_DDLs)
1244   {
1245     tprint(tracef, "we skip DDLs\n");
1246     return 0;
1247   }
1248   enlarge_buffer(rec);
1249   if (log_record_buffer.str == NULL ||
1250       translog_read_record(rec->lsn, 0, rec->record_length,
1251                            log_record_buffer.str, NULL) !=
1252       rec->record_length)
1253   {
1254     eprint(tracef, "Failed to read record");
1255     return 1;
1256   }
1257   name= (char *)log_record_buffer.str;
1258   tprint(tracef, "Table '%s'", name);
1259   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR, 0);
1260   if (info)
1261   {
1262     MARIA_SHARE *share= info->s;
1263     if (!share->base.born_transactional)
1264     {
1265       tprint(tracef, ", is not transactional, ignoring removal\n");
1266       ALERT_USER();
1267       error= 0;
1268       goto end;
1269     }
1270     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1271     {
1272       tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1273              " record, ignoring removal",
1274              LSN_IN_PARTS(share->state.create_rename_lsn));
1275       error= 0;
1276       goto end;
1277     }
1278     if (maria_is_crashed(info))
1279     {
1280       tprint(tracef, ", is crashed, can't drop it");
1281       ALERT_USER();
1282       goto end;
1283     }
1284     if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1285         maria_close(info))
1286       goto end;
1287     info= NULL;
1288     /* if it is older, or its header is corrupted, drop it */
1289     tprint(tracef, ", dropping '%s'", name);
1290     if (maria_delete_table(name))
1291     {
1292       eprint(tracef, "Failed to drop table");
1293       goto end;
1294     }
1295   }
1296   else /* one or two files absent, or header corrupted... */
1297     tprint(tracef,", can't be opened, probably does not exist");
1298   error= 0;
1299 end:
1300   tprint(tracef, "\n");
1301   if (info != NULL)
1302     error|= maria_close(info);
1303   return error;
1304 }
1305 
1306 
prototype_redo_exec_hook(FILE_ID)1307 prototype_redo_exec_hook(FILE_ID)
1308 {
1309   uint16 sid;
1310   int error= 1;
1311   const char *name;
1312   MARIA_HA *info;
1313   DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");
1314 
1315   if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
1316   {
1317     /*
1318       If that mapping was still true at checkpoint time, it was found in
1319       checkpoint record, no need to recreate it. If that mapping had ended at
1320       checkpoint time (table was closed or repaired), a flush and force
1321       happened and so mapping is not needed.
1322     */
1323     tprint(tracef, "ignoring because before checkpoint\n");
1324     DBUG_RETURN(0);
1325   }
1326 
1327   enlarge_buffer(rec);
1328   if (log_record_buffer.str == NULL ||
1329       translog_read_record(rec->lsn, 0, rec->record_length,
1330                            log_record_buffer.str, NULL) !=
1331        rec->record_length)
1332   {
1333     eprint(tracef, "Failed to read record");
1334     goto end;
1335   }
1336   sid= fileid_korr(log_record_buffer.str);
1337   info= all_tables[sid].info;
1338   if (info != NULL)
1339   {
1340     tprint(tracef, "   Closing table '%s'\n", info->s->open_file_name.str);
1341     prepare_table_for_close(info, rec->lsn);
1342 
1343     /*
1344       Ensure that open count is 1 on close.  This is needed as the
1345       table may initially had an open_count > 0 when we initially
1346       opened it as the server may have crashed without closing it
1347       properly.  As we now have applied all redo's for the table up to
1348       now, we know the table is ok, so it's safe to reset the open
1349       count to 0.
1350     */
1351     if (info->s->state.open_count != 0 && info->s->reopen == 1)
1352     {
1353       /* let ma_close() mark the table properly closed */
1354       info->s->state.open_count= 1;
1355       info->s->global_changed= 1;
1356       info->s->changed= 1;
1357     }
1358     if (maria_close(info))
1359     {
1360       eprint(tracef, "Failed to close table");
1361       goto end;
1362     }
1363     all_tables[sid].info= NULL;
1364   }
1365   name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
1366   if (new_table(sid, name, rec->lsn))
1367     goto end;
1368   error= 0;
1369 end:
1370   DBUG_RETURN(error);
1371 }
1372 
1373 
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
{
  /*
    Meaning of 'error' when control reaches the 'end' label:
       0  table is open and registered; it stays open, 0 is returned
       1  hard failure; the table (if open) is closed, 1 is returned
      -1  table is skipped; it is closed, 0 is returned
  */
  int error= 1;
  MARIA_HA *info;
  MARIA_SHARE *share;
  my_off_t data_len, index_len;
  DBUG_ENTER("new_table");

  checkpoint_useful= TRUE;
  if (!name || !name[0])
  {
    /*
      Not a DBUG_ASSERT(): a corrupted record like this could otherwise
      slip silently through the "info == NULL" test further down.
    */
    tprint(tracef, ", record is corrupted");
    eprint(tracef, "\n***WARNING: %s may be corrupted", name ? name : "NULL");
    recovery_warnings++;
    info= NULL;
    goto end;
  }

  tprint(tracef, "Table '%s', id %u", name, sid);
  info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR, 0);
  if (!info)
  {
    tprint(tracef, ", is absent (must have been dropped later?)"
           " or its header is so corrupted that we cannot open it;"
           " we skip it");
    /* A missing file is expected; any other failure is reported */
    if (my_errno != ENOENT)
    {
      recovery_found_crashed_tables++;
      eprint(tracef, "\n***WARNING: %s could not be opened: Error: %d",
             name, (int) my_errno);
    }
    error= 0;
    goto end;
  }

  share= info->s;

  /* Somebody else (an earlier mapping) still has the table open */
  if (share->reopen != 1)
  {
    tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
    /*
      The log may well contain
      FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
    */
    if (close_one_table(share->open_file_name.str, lsn_of_file_id))
      goto end;
    /*
      The data/index files are not on disk yet, so their lengths must not
      be looked up: jump straight to the registration.
    */
    _ma_tmp_disable_logging_for_table(info, FALSE);
    goto set_lsn_of_file_id;
  }

  if (!share->base.born_transactional)
  {
    /*
      Seen when a transactional table was converted to a non-transactional
      one.
    */
    tprint(tracef, ", is not transactional.  Ignoring open request");
    eprint(tracef, "\n***WARNING: '%s' may be crashed", name);
    recovery_warnings++;
    error= -1;
    goto end;
  }

  /*
    This test comes before the crashed-table test on purpose: a corrupted
    table which is more recent than the record does not block the record.
  */
  if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
  {
    /*
      Either the table was dropped and re-created after this redo entry, or
      a bulk insert right after create changed its create_rename_lsn.
    */
    tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
           " LOGREC_FILE_ID's LSN " LSN_FMT ", ignoring open request",
           LSN_IN_PARTS(share->state.create_rename_lsn),
           LSN_IN_PARTS(lsn_of_file_id));
    error= -1;
    recovery_warnings++;
    goto end;
  }

  if (maria_is_crashed(info))
  {
    /*
      When a first recovery cannot apply a REDO it marks the table corrupted
      and aborts. The next recovery then lands here and skips the table, so
      that other tables may still be handled.
    */
    tprint(tracef, "\n");
    eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
           " aria_chk -r", share->open_file_name.str);
    recovery_found_crashed_tables++;
    error= -1; /* not fatal, try with other tables */
    goto end;
  }

  /* Nothing done on behalf of recovery may be logged */
  _ma_tmp_disable_logging_for_table(info, FALSE);

  /* Some REDO records rely on the file lengths stored in the state */
  data_len= mysql_file_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
  index_len= mysql_file_seek(share->kfile.file, 0, SEEK_END, MYF(MY_WME));
  if (data_len == MY_FILEPOS_ERROR || index_len == MY_FILEPOS_ERROR)
  {
    tprint(tracef, ", length unknown\n");
    eprint(tracef, "\n***WARNING: Can't read length of file '%s'",
           share->open_file_name.str);
    recovery_warnings++;
    goto end;
  }
  if (data_len != share->state.state.data_file_length)
  {
    tprint(tracef, ", has wrong state.data_file_length "
           "(fixing it from %llu to %llu)",
           (ulonglong) share->state.state.data_file_length,
           (ulonglong) data_len);
    share->state.state.data_file_length= data_len;
  }
  if (index_len != share->state.state.key_file_length)
  {
    tprint(tracef, ", has wrong state.key_file_length "
           "(fixing it from %llu to %llu)",
           (ulonglong) share->state.state.key_file_length,
           (ulonglong) index_len);
    share->state.state.key_file_length= index_len;
  }
  if ((data_len % share->block_size) != 0 ||
      (index_len % share->block_size) != 0)
  {
    /* Not an error: recovery itself repairs a truncated last page */
    tprint(tracef, ", has too short last page");
    ALERT_USER();
  }

set_lsn_of_file_id:
  /*
    Why the LSN of the mapping is remembered; assume the log is:
    FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
    followed by a crash. Parsing the checkpoint record opens "t1" with id 6;
    if the REDO phase starts at the REDO_INSERT above it would wrongly try to
    update a page of "t1". Thanks to this LSN, REDO_INSERT can notice that
    the mapping is newer than itself and not execute.
    The same goes for UNDO_INSERT (update of the state).
  */
  share->lsn_of_file_id= lsn_of_file_id;
  all_tables[sid].info= info;
  /*
    share->id is left alone, it would be useless (no logging in REDO phase);
    should that change, note that some REDO-phase records call
    _ma_update_state_lsns() which resets the id.
  */
  tprint(tracef, ", opened");
  error= 0;

end:
  tprint(tracef, "\n");
  if (error == 0)
    DBUG_RETURN(0);

  if (info != NULL)
  {
    /* make maria_close() mark the table as properly closed */
    info->s->state.open_count= 1;
    info->s->global_changed= 1;
    info->s->changed= 1;
    maria_close(info);
  }
  /* a skipped table (-1) is not an error for the caller */
  DBUG_RETURN(error == -1 ? 0 : error);
}
1551 
1552 /*
1553   NOTE
  This is called for REDO_INSERT_ROW_HEAD and REDO_NEW_ROW_HEAD
1555 */
1556 
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)1557 prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
1558 {
1559   int error= 1;
1560   uchar *buff= NULL;
1561   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1562   if (info == NULL || maria_is_crashed(info))
1563 
1564   {
1565     /*
1566       Table was skipped at open time (because later dropped/renamed, not
1567       transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
1568       record was skipped due to skip_redo_lsn; it is not an error.
1569     */
1570     return 0;
1571   }
1572   /*
1573     Note that REDO is per page, we still consider it if its transaction
1574     committed long ago and is unknown.
1575   */
1576   /*
1577     If REDO's LSN is > page's LSN (read from disk), we are going to modify the
1578     page and change its LSN. The normal runtime code stores the UNDO's LSN
1579     into the page. Here storing the REDO's LSN (rec->lsn) would work
1580     (we are not writing to the log here, so don't have to "flush up to UNDO's
1581     LSN"). But in a test scenario where we do updates at runtime, then remove
1582     tables, apply the log and check that this results in the same table as at
1583     runtime, putting the same LSN as runtime had done will decrease
1584     differences. So we use the UNDO's LSN which is current_group_end_lsn.
1585   */
1586   enlarge_buffer(rec);
1587   if (log_record_buffer.str == NULL)
1588   {
1589     eprint(tracef, "Failed to read allocate buffer for record");
1590     goto end;
1591   }
1592   if (translog_read_record(rec->lsn, 0, rec->record_length,
1593                            log_record_buffer.str, NULL) !=
1594       rec->record_length)
1595   {
1596     eprint(tracef, "Failed to read record");
1597     goto end;
1598   }
1599   buff= log_record_buffer.str;
1600   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1601                                              HEAD_PAGE,
1602                                              (rec->type ==
1603                                               LOGREC_REDO_NEW_ROW_HEAD),
1604                                              buff + FILEID_STORE_SIZE,
1605                                              buff +
1606                                              FILEID_STORE_SIZE +
1607                                              PAGE_STORE_SIZE +
1608                                              DIRPOS_STORE_SIZE,
1609                                              rec->record_length -
1610                                              (FILEID_STORE_SIZE +
1611                                               PAGE_STORE_SIZE +
1612                                               DIRPOS_STORE_SIZE)))
1613     goto end;
1614   error= 0;
1615 end:
1616   return error;
1617 }
1618 
1619 /*
1620   NOTE
  This is called for REDO_INSERT_ROW_TAIL and REDO_NEW_ROW_TAIL
1622 */
1623 
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)1624 prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
1625 {
1626   int error= 1;
1627   uchar *buff;
1628   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1629   if (info == NULL || maria_is_crashed(info))
1630     return 0;
1631   enlarge_buffer(rec);
1632   if (log_record_buffer.str == NULL ||
1633       translog_read_record(rec->lsn, 0, rec->record_length,
1634                            log_record_buffer.str, NULL) !=
1635        rec->record_length)
1636   {
1637     eprint(tracef, "Failed to read record");
1638     goto end;
1639   }
1640   buff= log_record_buffer.str;
1641   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1642                                              TAIL_PAGE,
1643                                              (rec->type ==
1644                                               LOGREC_REDO_NEW_ROW_TAIL),
1645                                              buff + FILEID_STORE_SIZE,
1646                                              buff +
1647                                              FILEID_STORE_SIZE +
1648                                              PAGE_STORE_SIZE +
1649                                              DIRPOS_STORE_SIZE,
1650                                              rec->record_length -
1651                                              (FILEID_STORE_SIZE +
1652                                               PAGE_STORE_SIZE +
1653                                               DIRPOS_STORE_SIZE)))
1654     goto end;
1655   error= 0;
1656 
1657 end:
1658   return error;
1659 }
1660 
1661 
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)1662 prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
1663 {
1664   int error= 1;
1665   uchar *buff;
1666   uint number_of_blobs, number_of_ranges;
1667   pgcache_page_no_t first_page, last_page;
1668   char llbuf1[22], llbuf2[22];
1669   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1670   if (info == NULL  || maria_is_crashed(info))
1671     return 0;
1672   enlarge_buffer(rec);
1673   if (log_record_buffer.str == NULL ||
1674       translog_read_record(rec->lsn, 0, rec->record_length,
1675                            log_record_buffer.str, NULL) !=
1676        rec->record_length)
1677   {
1678     eprint(tracef, "Failed to read record");
1679     goto end;
1680   }
1681   buff= log_record_buffer.str;
1682   if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
1683                                       buff, rec->lsn, &number_of_blobs,
1684                                       &number_of_ranges,
1685                                       &first_page, &last_page))
1686     goto end;
1687   llstr(first_page, llbuf1);
1688   llstr(last_page, llbuf2);
1689   tprint(tracef, " %u blobs %u ranges, first page %s last %s",
1690          number_of_blobs, number_of_ranges, llbuf1, llbuf2);
1691 
1692   error= 0;
1693 
1694 end:
1695   tprint(tracef, " \n");
1696   return error;
1697 }
1698 
1699 
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)1700 prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
1701 {
1702   int error= 1;
1703   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1704   if (info == NULL || maria_is_crashed(info))
1705     return 0;
1706   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1707                                             HEAD_PAGE,
1708                                             rec->header + FILEID_STORE_SIZE))
1709     goto end;
1710   error= 0;
1711 end:
1712   return error;
1713 }
1714 
1715 
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)1716 prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
1717 {
1718   int error= 1;
1719   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1720   if (info == NULL || maria_is_crashed(info))
1721     return 0;
1722   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1723                                             TAIL_PAGE,
1724                                             rec->header + FILEID_STORE_SIZE))
1725     goto end;
1726   error= 0;
1727 end:
1728   return error;
1729 }
1730 
1731 
prototype_redo_exec_hook(REDO_FREE_BLOCKS)1732 prototype_redo_exec_hook(REDO_FREE_BLOCKS)
1733 {
1734   int error= 1;
1735   uchar *buff;
1736   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1737   if (info == NULL || maria_is_crashed(info))
1738     return 0;
1739   enlarge_buffer(rec);
1740 
1741   if (log_record_buffer.str == NULL ||
1742       translog_read_record(rec->lsn, 0, rec->record_length,
1743                            log_record_buffer.str, NULL) !=
1744        rec->record_length)
1745   {
1746     eprint(tracef, "Failed to read record");
1747     goto end;
1748   }
1749 
1750   buff= log_record_buffer.str;
1751   if (_ma_apply_redo_free_blocks(info, current_group_end_lsn, rec->lsn,
1752                                  buff))
1753     goto end;
1754   error= 0;
1755 end:
1756   return error;
1757 }
1758 
1759 
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)1760 prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
1761 {
1762   int error= 1;
1763   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1764   if (info == NULL || maria_is_crashed(info))
1765     return 0;
1766 
1767   if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
1768                                        rec->header + FILEID_STORE_SIZE))
1769     goto end;
1770   error= 0;
1771 end:
1772   return error;
1773 }
1774 
1775 
prototype_redo_exec_hook(REDO_DELETE_ALL)1776 prototype_redo_exec_hook(REDO_DELETE_ALL)
1777 {
1778   int error= 1;
1779   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1780   if (info == NULL)
1781     return 0;
1782   tprint(tracef, "   deleting all %lu rows\n",
1783          (ulong)info->s->state.state.records);
1784   if (maria_delete_all_rows(info))
1785     goto end;
1786   error= 0;
1787 end:
1788   return error;
1789 }
1790 
1791 
prototype_redo_exec_hook(REDO_INDEX)1792 prototype_redo_exec_hook(REDO_INDEX)
1793 {
1794   int error= 1;
1795   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1796   if (info == NULL || maria_is_crashed(info))
1797     return 0;
1798   enlarge_buffer(rec);
1799 
1800   if (log_record_buffer.str == NULL ||
1801       translog_read_record(rec->lsn, 0, rec->record_length,
1802                            log_record_buffer.str, NULL) !=
1803        rec->record_length)
1804   {
1805     eprint(tracef, "Failed to read record");
1806     goto end;
1807   }
1808 
1809   if (_ma_apply_redo_index(info, current_group_end_lsn,
1810                            log_record_buffer.str + FILEID_STORE_SIZE,
1811                            rec->record_length - FILEID_STORE_SIZE))
1812     goto end;
1813   error= 0;
1814 end:
1815   return error;
1816 }
1817 
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)1818 prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
1819 {
1820   int error= 1;
1821   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1822   if (info == NULL || maria_is_crashed(info))
1823     return 0;
1824   enlarge_buffer(rec);
1825 
1826   if (log_record_buffer.str == NULL ||
1827       translog_read_record(rec->lsn, 0, rec->record_length,
1828                            log_record_buffer.str, NULL) !=
1829        rec->record_length)
1830   {
1831     eprint(tracef, "Failed to read record");
1832     goto end;
1833   }
1834 
1835   if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
1836                                     log_record_buffer.str + FILEID_STORE_SIZE,
1837                                     rec->record_length - FILEID_STORE_SIZE))
1838     goto end;
1839   error= 0;
1840 end:
1841   return error;
1842 }
1843 
1844 
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)1845 prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
1846 {
1847   int error= 1;
1848   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1849   if (info == NULL || maria_is_crashed(info))
1850     return 0;
1851 
1852   if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
1853                                      rec->header + FILEID_STORE_SIZE))
1854     goto end;
1855   error= 0;
1856 end:
1857   return error;
1858 }
1859 
1860 
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)1861 prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
1862 {
1863   int error= 1;
1864   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1865   if (info == NULL || maria_is_crashed(info))
1866     return 0;
1867   enlarge_buffer(rec);
1868 
1869   if (log_record_buffer.str == NULL ||
1870       translog_read_record(rec->lsn, 0, rec->record_length,
1871                            log_record_buffer.str, NULL) !=
1872        rec->record_length)
1873   {
1874     eprint(tracef, "Failed to read record");
1875     goto end;
1876   }
1877 
1878   if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
1879   {
1880     /*
1881       Record is potentially after the bitmap flush made by Checkpoint, so has
1882       to be replayed. It may overwrite a more recent state but that will be
1883       corrected by all upcoming REDOs for data pages.
1884       If the condition is false, we must not apply the record: it is unneeded
1885       and nocive (may not be corrected as REDOs can be skipped due to
1886       dirty-pages list).
1887     */
1888     if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
1889                                        log_record_buffer.str +
1890                                        FILEID_STORE_SIZE))
1891       goto end;
1892   }
1893   error= 0;
1894 end:
1895   return error;
1896 }
1897 
1898 
/**
   Records 'lsn' as the latest undo LSN of the transaction stored in
   all_active_trans[short_trid]; also records it as the first undo LSN if
   none was recorded yet (first_undo_lsn is still LSN_IMPOSSIBLE).

   Does nothing if the slot has no long transaction id.

   @param  short_trid  index into all_active_trans
   @param  lsn         LSN to store
*/

static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn)
{
  struct st_trn_for_recovery *slot= &all_active_trans[short_trid];

  /*
    long_trid == 0 means the transaction is unknown, so it has committed or
    fully rolled back long ago: leave the slot untouched.
  */
  if (slot->long_trid != 0)
  {
    slot->undo_lsn= lsn;
    if (slot->first_undo_lsn == LSN_IMPOSSIBLE)
      slot->first_undo_lsn= lsn;
  }
}
1910 
1911 
prototype_redo_exec_hook(UNDO_ROW_INSERT)1912 prototype_redo_exec_hook(UNDO_ROW_INSERT)
1913 {
1914   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1915   MARIA_SHARE *share;
1916 
1917   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1918   if (info == NULL)
1919   {
1920     /*
1921       Note that we set undo_lsn anyway. So that if the transaction is later
1922       rolled back, this UNDO is tried for execution and we get a warning (as
1923       it would then be abnormal that info==NULL).
1924     */
1925     return 0;
1926   }
1927   share= info->s;
1928   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1929   {
1930     tprint(tracef, "   state has LSN " LSN_FMT " older than record, updating"
1931            " row count\n", LSN_IN_PARTS(share->state.is_of_horizon));
1932     share->state.state.records++;
1933     if (share->calc_checksum)
1934     {
1935       uchar buff[HA_CHECKSUM_STORE_SIZE];
1936       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1937                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
1938                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1939           HA_CHECKSUM_STORE_SIZE)
1940       {
1941         eprint(tracef, "Failed to read record");
1942         return 1;
1943       }
1944       share->state.state.checksum+= ha_checksum_korr(buff);
1945     }
1946     info->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1947                               STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
1948   }
1949   tprint(tracef, "   row count: %lu\n", (ulong)info->s->state.state.records);
1950   /* Unpin all pages, stamp them with UNDO's LSN */
1951   _ma_unpin_all_pages(info, rec->lsn);
1952   return 0;
1953 }
1954 
1955 
prototype_redo_exec_hook(UNDO_ROW_DELETE)1956 prototype_redo_exec_hook(UNDO_ROW_DELETE)
1957 {
1958   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1959   MARIA_SHARE *share;
1960 
1961   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1962   if (info == NULL)
1963     return 0;
1964   share= info->s;
1965   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1966   {
1967     tprint(tracef, "   state older than record\n");
1968     share->state.state.records--;
1969     if (share->calc_checksum)
1970     {
1971       uchar buff[HA_CHECKSUM_STORE_SIZE];
1972       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1973                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
1974                                PAGERANGE_STORE_SIZE,
1975                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1976           HA_CHECKSUM_STORE_SIZE)
1977       {
1978         eprint(tracef, "Failed to read record");
1979         return 1;
1980       }
1981       share->state.state.checksum+= ha_checksum_korr(buff);
1982     }
1983     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1984                             STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
1985                             STATE_NOT_MOVABLE);
1986   }
1987   tprint(tracef, "   row count: %lu\n", (ulong)share->state.state.records);
1988   _ma_unpin_all_pages(info, rec->lsn);
1989   return 0;
1990 }
1991 
1992 
prototype_redo_exec_hook(UNDO_ROW_UPDATE)1993 prototype_redo_exec_hook(UNDO_ROW_UPDATE)
1994 {
1995   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1996   MARIA_SHARE *share;
1997 
1998   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1999   if (info == NULL)
2000     return 0;
2001   share= info->s;
2002   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2003   {
2004     if (share->calc_checksum)
2005     {
2006       uchar buff[HA_CHECKSUM_STORE_SIZE];
2007       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
2008                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
2009                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
2010           HA_CHECKSUM_STORE_SIZE)
2011       {
2012         eprint(tracef, "Failed to read record");
2013         return 1;
2014       }
2015       share->state.state.checksum+= ha_checksum_korr(buff);
2016     }
2017     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2018                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2019   }
2020   _ma_unpin_all_pages(info, rec->lsn);
2021   return 0;
2022 }
2023 
2024 
prototype_redo_exec_hook(UNDO_KEY_INSERT)2025 prototype_redo_exec_hook(UNDO_KEY_INSERT)
2026 {
2027   MARIA_HA *info;
2028   MARIA_SHARE *share;
2029 
2030   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2031   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
2032     return 0;
2033   share= info->s;
2034   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2035   {
2036     const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
2037     uint keynr= key_nr_korr(ptr);
2038     if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
2039     {
2040       const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
2041       ulonglong value;
2042       char llbuf[22];
2043       uchar reversed[MARIA_MAX_KEY_BUFF], *to;
2044       tprint(tracef, "   state older than record\n");
2045       /* we read the record to find the auto_increment value */
2046       enlarge_buffer(rec);
2047       if (log_record_buffer.str == NULL ||
2048           translog_read_record(rec->lsn, 0, rec->record_length,
2049                                log_record_buffer.str, NULL) !=
2050           rec->record_length)
2051       {
2052         eprint(tracef, "Failed to read record");
2053         return 1;
2054       }
2055       to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2056         KEY_NR_STORE_SIZE;
2057       if (keyseg->flag & HA_SWAP_KEY)
2058       {
2059         /* We put key from log record to "data record" packing format... */
2060         uchar *key_ptr= to;
2061         uchar *key_end= key_ptr + keyseg->length;
2062         to= reversed + keyseg->length;
2063         do
2064         {
2065           *--to= *key_ptr++;
2066         } while (key_ptr != key_end);
2067         /* ... so that we can read it with: */
2068       }
2069       value= ma_retrieve_auto_increment(to, keyseg->type);
2070       set_if_bigger(share->state.auto_increment, value);
2071       llstr(share->state.auto_increment, llbuf);
2072       tprint(tracef, "   auto-inc %s\n", llbuf);
2073     }
2074   }
2075   _ma_unpin_all_pages(info, rec->lsn);
2076   return 0;
2077 }
2078 
2079 
prototype_redo_exec_hook(UNDO_KEY_DELETE)2080 prototype_redo_exec_hook(UNDO_KEY_DELETE)
2081 {
2082   MARIA_HA *info;
2083 
2084   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2085   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
2086     return 0;
2087   _ma_unpin_all_pages(info, rec->lsn);
2088   return 0;
2089 }
2090 
2091 
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)2092 prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2093 {
2094   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2095   MARIA_SHARE *share;
2096 
2097   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2098   if (info == NULL)
2099     return 0;
2100   share= info->s;
2101   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2102   {
2103     uint key_nr;
2104     my_off_t page;
2105     key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2106     page=  page_korr(rec->header +  LSN_STORE_SIZE + FILEID_STORE_SIZE +
2107                      KEY_NR_STORE_SIZE);
2108     share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2109                                     HA_OFFSET_ERROR :
2110                                     page * share->block_size);
2111   }
2112   _ma_unpin_all_pages(info, rec->lsn);
2113   return 0;
2114 }
2115 
2116 
prototype_redo_exec_hook(UNDO_BULK_INSERT)2117 prototype_redo_exec_hook(UNDO_BULK_INSERT)
2118 {
2119   /*
2120     If the repair finished it wrote and sync the state. If it didn't finish,
2121     we are going to empty the table and that will fix the state.
2122   */
2123   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2124   return 0;
2125 }
2126 
2127 
prototype_redo_exec_hook(IMPORTED_TABLE)2128 prototype_redo_exec_hook(IMPORTED_TABLE)
2129 {
2130   char *name;
2131   enlarge_buffer(rec);
2132   if (log_record_buffer.str == NULL ||
2133       translog_read_record(rec->lsn, 0, rec->record_length,
2134                            log_record_buffer.str, NULL) !=
2135       rec->record_length)
2136   {
2137     eprint(tracef, "Failed to read record");
2138     return 1;
2139   }
2140   name= (char *)log_record_buffer.str;
2141   tprint(tracef, "Table '%s' was imported (auto-zerofilled) in this Aria instance\n", name);
2142   return 0;
2143 }
2144 
2145 
prototype_redo_exec_hook(COMMIT)2146 prototype_redo_exec_hook(COMMIT)
2147 {
2148   uint16 sid= rec->short_trid;
2149   TrID long_trid= all_active_trans[sid].long_trid;
2150   char llbuf[22];
2151   if (long_trid == 0)
2152   {
2153     tprint(tracef, "We don't know about transaction with short_trid %u;"
2154            "it probably committed long ago, forget it\n", sid);
2155     bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2156     return 0;
2157   }
2158   llstr(long_trid, llbuf);
2159   tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
2160          llbuf, sid);
2161   bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2162 #ifdef MARIA_VERSIONING
2163   /*
2164     if real recovery:
2165     transaction was committed, move it to some separate list for later
2166     purging (but don't purge now! purging may have been started before, we
2167     may find REDO_PURGE records soon).
2168   */
2169 #endif
2170   return 0;
2171 }
2172 
prototype_redo_exec_hook(CLR_END)2173 prototype_redo_exec_hook(CLR_END)
2174 {
2175   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2176   MARIA_SHARE *share;
2177   LSN previous_undo_lsn;
2178   enum translog_record_type undone_record_type;
2179   const LOG_DESC *log_desc;
2180   my_bool row_entry= 0;
2181   uchar *logpos;
2182   DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
2183 
2184   previous_undo_lsn= lsn_korr(rec->header);
2185   undone_record_type=
2186     clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2187   log_desc= &log_record_type_descriptor[undone_record_type];
2188 
2189   set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
2190   if (info == NULL)
2191     DBUG_RETURN(0);
2192   share= info->s;
2193   tprint(tracef, "   CLR_END was about %s, undo_lsn " LSN_FMT "\n",
2194          log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
2195 
2196   enlarge_buffer(rec);
2197   if (log_record_buffer.str == NULL ||
2198       translog_read_record(rec->lsn, 0, rec->record_length,
2199                            log_record_buffer.str, NULL) !=
2200       rec->record_length)
2201   {
2202     eprint(tracef, "Failed to read record");
2203     return 1;
2204   }
2205   logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2206            CLR_TYPE_STORE_SIZE);
2207 
2208   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2209   {
2210     tprint(tracef, "   state older than record\n");
2211     switch (undone_record_type) {
2212     case LOGREC_UNDO_ROW_DELETE:
2213       row_entry= 1;
2214       share->state.state.records++;
2215       break;
2216     case LOGREC_UNDO_ROW_INSERT:
2217       share->state.state.records--;
2218       share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
2219       row_entry= 1;
2220       break;
2221     case LOGREC_UNDO_ROW_UPDATE:
2222       row_entry= 1;
2223       break;
2224     case LOGREC_UNDO_KEY_INSERT:
2225     case LOGREC_UNDO_KEY_DELETE:
2226       break;
2227     case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
2228     case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
2229     {
2230       uint key_nr;
2231       my_off_t page;
2232       key_nr= key_nr_korr(logpos);
2233       page=  page_korr(logpos + KEY_NR_STORE_SIZE);
2234       share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2235                                       HA_OFFSET_ERROR :
2236                                       page * share->block_size);
2237       break;
2238     }
2239     case LOGREC_UNDO_BULK_INSERT:
2240       break;
2241     default:
2242       DBUG_ASSERT(0);
2243     }
2244     if (row_entry && share->calc_checksum)
2245       share->state.state.checksum+= ha_checksum_korr(logpos);
2246     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2247                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2248   }
2249   if (row_entry)
2250     tprint(tracef, "   row count: %lu\n", (ulong)share->state.state.records);
2251   _ma_unpin_all_pages(info, rec->lsn);
2252   DBUG_RETURN(0);
2253 }
2254 
2255 
2256 /**
   Hook to print debug information (like MySQL query)
2258 */
2259 
prototype_redo_exec_hook(DEBUG_INFO)2260 prototype_redo_exec_hook(DEBUG_INFO)
2261 {
2262   char *data;
2263   enum translog_debug_info_type debug_info;
2264 
2265   enlarge_buffer(rec);
2266   if (log_record_buffer.str == NULL ||
2267       translog_read_record(rec->lsn, 0, rec->record_length,
2268                            log_record_buffer.str, NULL) !=
2269       rec->record_length)
2270   {
2271     eprint(tracef, "Failed to read record debug record");
2272     return 1;
2273   }
2274   debug_info= (enum translog_debug_info_type) log_record_buffer.str[0];
2275   data= (char*) log_record_buffer.str + 1;
2276   switch (debug_info) {
2277   case LOGREC_DEBUG_INFO_QUERY:
2278     tprint(tracef, "Query: %.*s\n", (int) rec->record_length - 1, data);
2279     break;
2280   default:
2281     DBUG_ASSERT(0);
2282   }
2283   return 0;
2284 }
2285 
2286 
2287 /**
2288   In some cases we have to skip execution of an UNDO record during the UNDO
2289   phase.
2290 */
2291 
static void skip_undo_record(LSN previous_undo_lsn, TRN *trn)
{
  /* Count the skip, then step the transaction to its previous undo record */
  skipped_undo_phase++;
  trn->undo_lsn= previous_undo_lsn;

  if (previous_undo_lsn != LSN_IMPOSSIBLE)
    return;

  /* No previous undo record: the transaction has fully rolled back */
  trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
}
2299 
2300 
prototype_undo_exec_hook(UNDO_ROW_INSERT)2301 prototype_undo_exec_hook(UNDO_ROW_INSERT)
2302 {
2303   my_bool error;
2304   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2305   LSN previous_undo_lsn= lsn_korr(rec->header);
2306   MARIA_SHARE *share;
2307   const uchar *record_ptr;
2308 
2309   if (info == NULL || maria_is_crashed(info))
2310   {
2311     /*
2312       Unlike for REDOs, if the table was skipped it is abnormal; we have a
2313       transaction to rollback which used this table, as it is not rolled back
2314       it was supposed to hold this table and so the table should still be
2315       there. Skip it (user may have repaired the table with maria_chk because
2316       it was so badly corrupted that a previous recovery failed) but warn.
2317     */
2318     skip_undo_record(previous_undo_lsn, trn);
2319     return 0;
2320   }
2321   share= info->s;
2322   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2323                           STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
2324                           STATE_NOT_MOVABLE);
2325   record_ptr= rec->header;
2326   if (share->calc_checksum)
2327   {
2328     /*
2329       We need to read more of the record to put the checksum into the record
2330       buffer used by _ma_apply_undo_row_insert().
2331       If the table has no live checksum, rec->header will be enough.
2332     */
2333     enlarge_buffer(rec);
2334     if (log_record_buffer.str == NULL ||
2335         translog_read_record(rec->lsn, 0, rec->record_length,
2336                              log_record_buffer.str, NULL) !=
2337         rec->record_length)
2338     {
2339       eprint(tracef, "Failed to read record");
2340       return 1;
2341     }
2342     record_ptr= log_record_buffer.str;
2343   }
2344 
2345   info->trn= trn;
2346   error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
2347                                    record_ptr + LSN_STORE_SIZE +
2348                                    FILEID_STORE_SIZE);
2349   info->trn= 0;
2350   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2351   tprint(tracef, "   row count: %lu\n", (ulong)info->s->state.state.records);
2352   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2353          LSN_IN_PARTS(trn->undo_lsn));
2354   return error;
2355 }
2356 
2357 
prototype_undo_exec_hook(UNDO_ROW_DELETE)2358 prototype_undo_exec_hook(UNDO_ROW_DELETE)
2359 {
2360   my_bool error;
2361   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2362   LSN previous_undo_lsn= lsn_korr(rec->header);
2363   MARIA_SHARE *share;
2364 
2365   if (info == NULL || maria_is_crashed(info))
2366   {
2367     skip_undo_record(previous_undo_lsn, trn);
2368     return 0;
2369   }
2370 
2371   share= info->s;
2372   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2373                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2374   enlarge_buffer(rec);
2375   if (log_record_buffer.str == NULL ||
2376       translog_read_record(rec->lsn, 0, rec->record_length,
2377                            log_record_buffer.str, NULL) !=
2378        rec->record_length)
2379   {
2380     eprint(tracef, "Failed to read record");
2381     return 1;
2382   }
2383 
2384   info->trn= trn;
2385   error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
2386                                    log_record_buffer.str + LSN_STORE_SIZE +
2387                                    FILEID_STORE_SIZE,
2388                                    rec->record_length -
2389                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2390   info->trn= 0;
2391   tprint(tracef, "   row count: %lu\n   undo_lsn now LSN " LSN_FMT "\n",
2392          (ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
2393   return error;
2394 }
2395 
2396 
prototype_undo_exec_hook(UNDO_ROW_UPDATE)2397 prototype_undo_exec_hook(UNDO_ROW_UPDATE)
2398 {
2399   my_bool error;
2400   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2401   LSN previous_undo_lsn= lsn_korr(rec->header);
2402   MARIA_SHARE *share;
2403 
2404   if (info == NULL || maria_is_crashed(info))
2405   {
2406     skip_undo_record(previous_undo_lsn, trn);
2407     return 0;
2408   }
2409 
2410   share= info->s;
2411   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2412                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2413   enlarge_buffer(rec);
2414   if (log_record_buffer.str == NULL ||
2415       translog_read_record(rec->lsn, 0, rec->record_length,
2416                            log_record_buffer.str, NULL) !=
2417        rec->record_length)
2418   {
2419     eprint(tracef, "Failed to read record");
2420     return 1;
2421   }
2422 
2423   info->trn= trn;
2424   error= _ma_apply_undo_row_update(info, previous_undo_lsn,
2425                                    log_record_buffer.str + LSN_STORE_SIZE +
2426                                    FILEID_STORE_SIZE,
2427                                    rec->record_length -
2428                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2429   info->trn= 0;
2430   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2431          LSN_IN_PARTS(trn->undo_lsn));
2432   return error;
2433 }
2434 
2435 
prototype_undo_exec_hook(UNDO_KEY_INSERT)2436 prototype_undo_exec_hook(UNDO_KEY_INSERT)
2437 {
2438   my_bool error;
2439   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2440   LSN previous_undo_lsn= lsn_korr(rec->header);
2441   MARIA_SHARE *share;
2442 
2443   if (info == NULL || maria_is_crashed(info))
2444   {
2445     skip_undo_record(previous_undo_lsn, trn);
2446     return 0;
2447   }
2448 
2449   share= info->s;
2450   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2451                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2452 
2453   enlarge_buffer(rec);
2454   if (log_record_buffer.str == NULL ||
2455       translog_read_record(rec->lsn, 0, rec->record_length,
2456                            log_record_buffer.str, NULL) !=
2457         rec->record_length)
2458   {
2459     eprint(tracef, "Failed to read record");
2460     return 1;
2461   }
2462 
2463   info->trn= trn;
2464   error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
2465                                    log_record_buffer.str + LSN_STORE_SIZE +
2466                                    FILEID_STORE_SIZE,
2467                                    rec->record_length - LSN_STORE_SIZE -
2468                                    FILEID_STORE_SIZE);
2469   info->trn= 0;
2470   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2471   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2472          LSN_IN_PARTS(trn->undo_lsn));
2473   return error;
2474 }
2475 
2476 
prototype_undo_exec_hook(UNDO_KEY_DELETE)2477 prototype_undo_exec_hook(UNDO_KEY_DELETE)
2478 {
2479   my_bool error;
2480   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2481   LSN previous_undo_lsn= lsn_korr(rec->header);
2482   MARIA_SHARE *share;
2483 
2484   if (info == NULL || maria_is_crashed(info))
2485   {
2486     skip_undo_record(previous_undo_lsn, trn);
2487     return 0;
2488   }
2489 
2490   share= info->s;
2491   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2492                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2493 
2494   enlarge_buffer(rec);
2495   if (log_record_buffer.str == NULL ||
2496       translog_read_record(rec->lsn, 0, rec->record_length,
2497                            log_record_buffer.str, NULL) !=
2498         rec->record_length)
2499   {
2500     eprint(tracef, "Failed to read record");
2501     return 1;
2502   }
2503 
2504   info->trn= trn;
2505   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2506                                    log_record_buffer.str + LSN_STORE_SIZE +
2507                                    FILEID_STORE_SIZE,
2508                                    rec->record_length - LSN_STORE_SIZE -
2509                                    FILEID_STORE_SIZE, FALSE);
2510   info->trn= 0;
2511   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2512   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2513          LSN_IN_PARTS(trn->undo_lsn));
2514   return error;
2515 }
2516 
2517 
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)2518 prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2519 {
2520   my_bool error;
2521   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2522   LSN previous_undo_lsn= lsn_korr(rec->header);
2523   MARIA_SHARE *share;
2524 
2525   if (info == NULL || maria_is_crashed(info))
2526   {
2527     skip_undo_record(previous_undo_lsn, trn);
2528     return 0;
2529   }
2530 
2531   share= info->s;
2532   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2533                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2534 
2535   enlarge_buffer(rec);
2536   if (log_record_buffer.str == NULL ||
2537       translog_read_record(rec->lsn, 0, rec->record_length,
2538                            log_record_buffer.str, NULL) !=
2539         rec->record_length)
2540   {
2541     eprint(tracef, "Failed to read record");
2542     return 1;
2543   }
2544 
2545   info->trn= trn;
2546   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2547                                    log_record_buffer.str + LSN_STORE_SIZE +
2548                                    FILEID_STORE_SIZE,
2549                                    rec->record_length - LSN_STORE_SIZE -
2550                                    FILEID_STORE_SIZE, TRUE);
2551   info->trn= 0;
2552   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2553   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2554          LSN_IN_PARTS(trn->undo_lsn));
2555   return error;
2556 }
2557 
2558 
prototype_undo_exec_hook(UNDO_BULK_INSERT)2559 prototype_undo_exec_hook(UNDO_BULK_INSERT)
2560 {
2561   my_bool error;
2562   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2563   LSN previous_undo_lsn= lsn_korr(rec->header);
2564   MARIA_SHARE *share;
2565 
2566   /* Here we don't check for crashed as we can undo the bulk insert */
2567   if (info == NULL)
2568   {
2569     skip_undo_record(previous_undo_lsn, trn);
2570     return 0;
2571   }
2572 
2573   share= info->s;
2574   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2575                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2576 
2577   info->trn= trn;
2578   error= _ma_apply_undo_bulk_insert(info, previous_undo_lsn);
2579   info->trn= 0;
2580   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2581   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2582          LSN_IN_PARTS(trn->undo_lsn));
2583   return error;
2584 }
2585 
2586 
/**
   @brief Scans the log forward from 'lsn' and runs the REDO phase on it.

   Installs the per-record-type execution hooks, then reads record headers
   one by one. A record which does not end a group only marks the start of
   its transaction's group. A record which ends a group (or is a group by
   itself) triggers a second scan over the group's records, after which the
   ending record itself is handled.

   @param  lsn      LSN to start reading from; if LSN_IMPOSSIBLE or equal to
                    the log's horizon there is nothing to do
   @param  lsn_end  if not LSN_IMPOSSIBLE, the phase stops (successfully) as
                    soon as a complete group starting at or after this LSN is
                    found
   @param  apply    MARIA_LOG_APPLY: records are applied;
                    MARIA_LOG_CHECK: only the bodies of the records inside
                    groups are read

   @return Operation status
     @retval 0      ok
     @retval 1      error
*/
static int run_redo_phase(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply)
{
  TRANSLOG_HEADER_BUFFER rec;
  struct st_translog_scanner_data scanner;
  int len;
  uint i;
  DBUG_ENTER("run_redo_phase");

  /* install hooks for execution */
#define install_redo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_REDO_LOGREC_ ## R;
#define install_redo_exec_hook_shared(R,S)                               \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_REDO_LOGREC_ ## S;
#define install_undo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
    exec_UNDO_LOGREC_ ## R;
  install_redo_exec_hook(LONG_TRANSACTION_ID);
  install_redo_exec_hook(CHECKPOINT);
  install_redo_exec_hook(REDO_CREATE_TABLE);
  install_redo_exec_hook(REDO_RENAME_TABLE);
  install_redo_exec_hook(REDO_REPAIR_TABLE);
  install_redo_exec_hook(REDO_DROP_TABLE);
  install_redo_exec_hook(FILE_ID);
  install_redo_exec_hook(INCOMPLETE_LOG);
  install_redo_exec_hook(INCOMPLETE_GROUP);
  install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
  install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
  install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
  install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
  install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
  install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
  install_redo_exec_hook(REDO_FREE_BLOCKS);
  install_redo_exec_hook(REDO_DELETE_ALL);
  install_redo_exec_hook(REDO_INDEX);
  install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
  install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
  install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
  install_redo_exec_hook(UNDO_ROW_INSERT);
  install_redo_exec_hook(UNDO_ROW_DELETE);
  install_redo_exec_hook(UNDO_ROW_UPDATE);
  install_redo_exec_hook(UNDO_KEY_INSERT);
  install_redo_exec_hook(UNDO_KEY_DELETE);
  install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
  install_redo_exec_hook(COMMIT);
  install_redo_exec_hook(CLR_END);
  install_undo_exec_hook(UNDO_ROW_INSERT);
  install_undo_exec_hook(UNDO_ROW_DELETE);
  install_undo_exec_hook(UNDO_ROW_UPDATE);
  install_undo_exec_hook(UNDO_KEY_INSERT);
  install_undo_exec_hook(UNDO_KEY_DELETE);
  install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
  /* REDO_NEW_ROW_HEAD shares entry with REDO_INSERT_ROW_HEAD */
  install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
  /* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
  install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
  install_redo_exec_hook(UNDO_BULK_INSERT);
  install_undo_exec_hook(UNDO_BULK_INSERT);
  install_redo_exec_hook(IMPORTED_TABLE);
  install_redo_exec_hook(DEBUG_INFO);

  current_group_end_lsn= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
  current_group_table= NULL;
#endif

  if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
  {
    tprint(tracef, "checkpoint address refers to the log end log or "
           "log is empty, nothing to do.\n");
    DBUG_RETURN(0);
  }

  len= translog_read_record_header(lsn, &rec);

  if (len == RECHEADER_READ_ERROR)
  {
    eprint(tracef, "Failed to read header of the first record.");
    DBUG_RETURN(1);
  }
  if (translog_scanner_init(lsn, 1, &scanner, 1))
  {
    tprint(tracef, "Scanner init failed\n");
    DBUG_RETURN(1);
  }
  for (i= 1;;i++)
  {
    uint16 sid= rec.short_trid;
    const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
    display_record_position(log_desc, &rec, i);
    /*
      A complete group is a set of log records with an "end mark" record
      (e.g. a set of REDOs for an operation, terminated by an UNDO for this
      operation); if there is no "end mark" record the group is incomplete and
      won't be executed.
    */
    if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
        (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
    {
      if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
      {
        if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
        {
          /*
            Can happen if the transaction got a table write error, then
            unlocked tables thus wrote a COMMIT record. Or can be an
            INCOMPLETE_GROUP record written by a previous recovery.
          */
          tprint(tracef, "\nDiscarding incomplete group before this record\n");
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
        }
        else
        {
          struct st_translog_scanner_data scanner2;
          TRANSLOG_HEADER_BUFFER rec2;
          /*
            There is a complete group for this transaction, containing more
            than this event.
          */
          tprint(tracef, "   ends a group:\n");
          len=
            translog_read_record_header(all_active_trans[sid].group_start_lsn,
                                        &rec2);
          if (len < 0) /* EOF or error */
          {
            tprint(tracef, "Cannot find record where it should be\n");
            goto err;
          }
          if (lsn_end != LSN_IMPOSSIBLE && rec2.lsn >= lsn_end)
          {
            tprint(tracef,
                   "lsn_redo_end reached at " LSN_FMT ". "
                   "Skipping rest of redo entries\n",
                   LSN_IN_PARTS(rec2.lsn));
            /* rec2 was read successfully above: release it too */
            translog_free_record_header(&rec2);
            translog_destroy_scanner(&scanner);
            translog_free_record_header(&rec);
            DBUG_RETURN(0);
          }

          if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
          {
            tprint(tracef, "Scanner2 init failed\n");
            /* scanner2 was not set up, but rec2's header must be released */
            translog_free_record_header(&rec2);
            goto err;
          }
          current_group_end_lsn= rec.lsn;
          do
          {
            if (rec2.short_trid == sid) /* it's in our group */
            {
              const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
              display_record_position(log_desc2, &rec2, 0);
              if (apply == MARIA_LOG_CHECK)
              {
                translog_size_t read_len;
                enlarge_buffer(&rec2);
                read_len=
                  translog_read_record(rec2.lsn, 0, rec2.record_length,
                                       log_record_buffer.str, NULL);
                if (read_len != rec2.record_length)
                {
                  tprint(tracef, "Cannot read record's body: read %u of"
                         " %u bytes\n", read_len, rec2.record_length);
                  translog_destroy_scanner(&scanner2);
                  translog_free_record_header(&rec2);
                  goto err;
                }
              }
              if (apply == MARIA_LOG_APPLY &&
                  display_and_apply_record(log_desc2, &rec2))
              {
                translog_destroy_scanner(&scanner2);
                translog_free_record_header(&rec2);
                goto err;
              }
            }
            translog_free_record_header(&rec2);
            len= translog_read_next_record_header(&scanner2, &rec2);
            if (len < 0) /* EOF or error */
            {
              tprint(tracef, "Cannot find record where it should be\n");
              translog_destroy_scanner(&scanner2);
              translog_free_record_header(&rec2);
              goto err;
            }
          }
          while (rec2.lsn < rec.lsn);
          /* group finished */
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
          current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
          display_record_position(log_desc, &rec, 0);
          translog_destroy_scanner(&scanner2);
          translog_free_record_header(&rec2);
        }
      }
      /* Now handle the record which ended the group (or is one by itself) */
      if (apply == MARIA_LOG_APPLY &&
          display_and_apply_record(log_desc, &rec))
        goto err;
#ifndef DBUG_OFF
      current_group_table= NULL;
#endif
    }
    else /* record does not end group */
    {
      /* just record the fact, can't know if can execute yet */
      if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
      {
        /* group not yet started */
        all_active_trans[sid].group_start_lsn= rec.lsn;
      }
    }
    translog_free_record_header(&rec);
    len= translog_read_next_record_header(&scanner, &rec);
    if (len < 0)
    {
      switch (len)
      {
      case RECHEADER_READ_EOF:
        tprint(tracef, "*** End of log ***\n");
        break;
      case RECHEADER_READ_ERROR:
        tprint(tracef, "Error reading log\n");
        goto err;
      }
      break;
    }
  }
  translog_destroy_scanner(&scanner);
  translog_free_record_header(&rec);
  if (recovery_message_printed == REC_MSG_REDO)
  {
    fprintf(stderr, " 100%%");
    fflush(stderr);
    procent_printed= 1;                         /* Will be followed by time */
  }
  DBUG_RETURN(0);

err:
  translog_destroy_scanner(&scanner);
  translog_free_record_header(&rec);
  DBUG_RETURN(1);
}
2829 
2830 
2831 /**
2832    @brief Informs about any aborted groups or uncommitted transactions,
2833    prepares for the UNDO phase if needed.
2834 
2835    @note Observe that it may init trnman.
2836 */
end_of_redo_phase(my_bool prepare_for_undo_phase)2837 static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
2838 {
2839   uint sid, uncommitted= 0;
2840   char llbuf[22];
2841   LSN addr;
2842 
2843   my_hash_free(&all_dirty_pages);
2844   /*
2845     hash_free() can be called multiple times probably, but be safe if that
2846     changes
2847   */
2848   bzero(&all_dirty_pages, sizeof(all_dirty_pages));
2849   my_free(dirty_pages_pool);
2850   dirty_pages_pool= NULL;
2851 
2852   llstr(max_long_trid, llbuf);
2853   tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
2854   llstr(max_trid_in_control_file, llbuf);
2855   tprint(tracef, "Maximum transaction long id seen in control file: %s\n",
2856          llbuf);
2857   /*
2858     If logs were deleted, or lost, trid in control file is needed to set
2859     trnman's generator:
2860   */
2861   set_if_bigger(max_long_trid, max_trid_in_control_file);
2862   if (prepare_for_undo_phase && trnman_init(max_long_trid))
2863     return -1;
2864 
2865   trns_created= TRUE;
2866 
2867   for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
2868   {
2869     TrID long_trid= all_active_trans[sid].long_trid;
2870     LSN gslsn= all_active_trans[sid].group_start_lsn;
2871     TRN *trn;
2872     if (gslsn != LSN_IMPOSSIBLE)
2873     {
2874       tprint(tracef, "Group at LSN " LSN_FMT " short_trid %u incomplete\n",
2875              LSN_IN_PARTS(gslsn), sid);
2876       all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
2877     }
2878     if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
2879     {
2880       llstr(long_trid, llbuf);
2881       tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
2882              llbuf, sid);
2883       /*
2884         dummy_transaction_object serves only for DDLs, where there is never a
2885         rollback or incomplete group. And unknown transactions (which have
2886         long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
2887       */
2888       if (long_trid ==0)
2889       {
2890         eprint(tracef, "Transaction with long_trid 0 should not roll back");
2891         ALERT_USER();
2892         return -1;
2893       }
2894       if (prepare_for_undo_phase)
2895       {
2896         if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
2897           return -1;
2898         trn->undo_lsn= all_active_trans[sid].undo_lsn;
2899         trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
2900           TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
2901         if (gslsn != LSN_IMPOSSIBLE)
2902         {
2903           /*
2904             UNDO phase will log some records. So, a future recovery may see:
2905             REDO(from incomplete group) - REDO(from rollback) - CLR_END
2906             and thus execute the first REDO (finding it in "a complete
2907             group"). To prevent that:
2908           */
2909           LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS];
2910           LSN lsn;
2911           if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
2912                                     trn, NULL, 0,
2913                                     TRANSLOG_INTERNAL_PARTS, log_array,
2914                                     NULL, NULL))
2915             return -1;
2916         }
2917       }
2918       uncommitted++;
2919     }
2920 #ifdef MARIA_VERSIONING
2921     /*
2922       If real recovery: if transaction was committed, move it to some separate
2923       list for soon purging.
2924     */
2925 #endif
2926   }
2927 
2928   my_free(all_active_trans);
2929   all_active_trans= NULL;
2930 
2931   /*
2932     The UNDO phase uses some normal run-time code of ROLLBACK: generates log
2933     records, etc; prepare tables for that
2934   */
2935   addr= translog_get_horizon();
2936   for (sid= 0; sid <= SHARE_ID_MAX; sid++)
2937   {
2938     MARIA_HA *info= all_tables[sid].info;
2939     if (info != NULL)
2940     {
2941       prepare_table_for_close(info, addr);
2942       /*
2943         But we don't close it; we leave it available for the UNDO phase;
2944         it's likely that the UNDO phase will need it.
2945       */
2946       if (prepare_for_undo_phase)
2947         translog_assign_id_to_share_from_recovery(info->s, sid);
2948     }
2949   }
2950   return uncommitted;
2951 }
2952 
2953 
/**
   @brief Rolls back the transactions left uncommitted by the REDO phase.

   Transactions are fetched one at a time with trnman_get_any_trn(); for
   each, the undo records are executed starting at trn->undo_lsn through the
   record type's record_execute_in_undo_phase hook, until undo_lsn becomes 0
   or the record at 'end_undo_lsn' has been executed.

   @param  end_undo_lsn  LSN of the last undo record to execute; reaching it
                         sets aria_undo_aborted and stops the phase
   @param  uncommitted   number of transactions to roll back

   @return Operation status
     @retval 0      ok
     @retval 1      error
*/
static int run_undo_phase(LSN end_undo_lsn, uint uncommitted)
{
  LSN prev_undo_lsn __attribute__((unused));
  DBUG_ENTER("run_undo_phase");

  if (uncommitted == 0)
    DBUG_RETURN(0);

  checkpoint_useful= TRUE;
  if (tracef != stdout)
  {
    if (recovery_message_printed == REC_MSG_NONE)
      print_preamble();
    fprintf(stderr, "transactions to roll back:");
    recovery_message_printed= REC_MSG_UNDO;
  }
  tprint(tracef, "%u transactions will be rolled back\n", uncommitted);

  for (;;)
  {
    char trid_buf[22];
    TRN *trn;

    /* Progress: number of transactions still to handle */
    if (recovery_message_printed == REC_MSG_UNDO)
    {
      fprintf(stderr, " %u", uncommitted);
      fflush(stderr);
    }
    /* The counter is decremented on every iteration, whatever the outcome */
    if ((uncommitted--) == 0 && aria_undo_aborted <= 0)
    {
      aria_undo_aborted= 0;
      break;
    }
    if (aria_undo_aborted)
    {
      tprint(tracef,
             "lsn_undo_end found. Skipping rest of undo entries\n");
      break;
    }

    trn= trnman_get_any_trn();
    DBUG_ASSERT(trn != NULL);
    llstr(trn->trid, trid_buf);
    tprint(tracef, "Rolling back transaction of long id %s\n", trid_buf);
    prev_undo_lsn= trn->undo_lsn + 1;

    /* Execute all undo entries */
    while (trn->undo_lsn != 0)
    {
      TRANSLOG_HEADER_BUFFER rec;
      LOG_DESC *log_desc;
      /* undo_lsn must strictly decrease from one record to the next */
      DBUG_ASSERT(trn->undo_lsn < prev_undo_lsn);
      prev_undo_lsn= trn->undo_lsn;

      if (translog_read_record_header(trn->undo_lsn, &rec) ==
          RECHEADER_READ_ERROR)
        DBUG_RETURN(1);
      log_desc= &log_record_type_descriptor[rec.type];
      display_record_position(log_desc, &rec, 0);
      if (log_desc->record_execute_in_undo_phase(&rec, trn))
      {
        eprint(tracef, "Got error %d when executing undo %s", my_errno,
               log_desc->name);
        translog_free_record_header(&rec);
        DBUG_RETURN(1);
      }
      translog_free_record_header(&rec);

      if (prev_undo_lsn == end_undo_lsn)
      {
        /* 1: undo entries remain for this transaction; -1: none remain */
        if (trn->undo_lsn)
          aria_undo_aborted= 1;
        else
          aria_undo_aborted= -1;
        break;
      }
    }

    /*
      Force a crash to test recovery of recovery.
      NOTE(review): the decrement sits inside DBUG_ASSERT(), so presumably it
      only takes effect in builds where assertions are compiled in - confirm.
    */
    if (maria_recovery_force_crash_counter)
    {
      DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
    }

    trn->undo_lsn= 0;            /* Avoid abort in trnman_rollback_trn */
    if (trnman_rollback_trn(trn))
      DBUG_RETURN(1);
    /* We could want to span a few threads (4?) instead of 1 */
    /* In the future, we want to have this phase *online* */
  }
  DBUG_RETURN(0);
}
3044 
3045 
3046 /**
3047   In case of error in recovery, deletes all transactions from the transaction
3048   manager so that this module does not assert.
3049 
3050   @note no checkpoint should be taken as those transactions matter for the
3051   next recovery (they still haven't been properly dealt with).
3052 */
3053 
delete_all_transactions()3054 static void delete_all_transactions()
3055 {
3056   for( ; ; )
3057   {
3058     TRN *trn= trnman_get_any_trn();
3059     if (trn == NULL)
3060       break;
3061     trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
3062     trnman_rollback_trn(trn); /* ignore error */
3063   }
3064 }
3065 
3066 
3067 /**
3068    @brief re-enables transactionality, updates is_of_horizon
3069 
3070    @param  info                table
3071    @param  horizon             address to set is_of_horizon
3072 */
3073 
static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
{
  MARIA_SHARE *const share= info->s;

  /*
    In a fully-forward REDO phase (no checkpoint record),
    state is now at least as new as the LSN of the current record. It may be
    newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
    table, but that table was later modified further in the log.
    But if we parsed a checkpoint record, it may be this way in the log:
    FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
    Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to
    make t1 close; the is_of_horizon test below is however false (when
    checkpoint was taken it increased is_of_horizon) and so it works. For
    safety we add the lsn_of_file_id test.
  */
  if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0)
  {
    if (cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
    {
      /* Both are older than horizon: move is_of_horizon and save the state */
      share->state.is_of_horizon= horizon;
      _ma_state_info_write_sub(share->kfile.file, &share->state,
                               MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET);
    }
  }

  /*
    Ensure that info->state is up to date as
    _ma_reenable_logging_for_table() is depending on this
  */
  *info->state= share->state.state;

  /*
    This leaves PAGECACHE_PLAIN_PAGE pages into the cache, while the table is
    going to switch back to transactional. So the table will be a mix of
    pages, which is ok as long as we don't take any checkpoints until all
    tables get closed at the end of the UNDO phase.
  */
  _ma_reenable_logging_for_table(info, FALSE);
  info->trn= NULL; /* safety */
}
3112 
3113 
get_MARIA_HA_from_REDO_record(const TRANSLOG_HEADER_BUFFER * rec)3114 static MARIA_HA *get_MARIA_HA_from_REDO_record(const
3115                                                TRANSLOG_HEADER_BUFFER *rec)
3116 {
3117   uint16 sid;
3118   pgcache_page_no_t UNINIT_VAR(page);
3119   MARIA_HA *info;
3120   MARIA_SHARE *share;
3121   char llbuf[22];
3122   my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
3123 
3124   print_redo_phase_progress(rec->lsn);
3125   sid= fileid_korr(rec->header);
3126   switch (rec->type) {
3127     /* not all REDO records have a page: */
3128   case LOGREC_REDO_INDEX_NEW_PAGE:
3129   case LOGREC_REDO_INDEX:
3130   case LOGREC_REDO_INDEX_FREE_PAGE:
3131     index_page_redo_entry= 1;
3132     /* fall through*/
3133   case LOGREC_REDO_INSERT_ROW_HEAD:
3134   case LOGREC_REDO_INSERT_ROW_TAIL:
3135   case LOGREC_REDO_PURGE_ROW_HEAD:
3136   case LOGREC_REDO_PURGE_ROW_TAIL:
3137   case LOGREC_REDO_NEW_ROW_HEAD:
3138   case LOGREC_REDO_NEW_ROW_TAIL:
3139   case LOGREC_REDO_FREE_HEAD_OR_TAIL:
3140     page_redo_entry= TRUE;
3141     page= page_korr(rec->header + FILEID_STORE_SIZE);
3142     llstr(page, llbuf);
3143     break;
3144   case LOGREC_REDO_FREE_BLOCKS:
3145     /*
3146       We are checking against the dirty pages in _ma_apply_redo_free_blocks()
3147     */
3148     break;
3149   default:
3150     break;
3151   }
3152   tprint(tracef, "   For table of short id %u", sid);
3153   info= all_tables[sid].info;
3154 #ifndef DBUG_OFF
3155   DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
3156   current_group_table= info;
3157 #endif
3158   if (info == NULL)
3159   {
3160     tprint(tracef, ", table skipped, so skipping record\n");
3161     return NULL;
3162   }
3163   share= info->s;
3164   tprint(tracef, ", '%s'", share->open_file_name.str);
3165   DBUG_ASSERT(in_redo_phase);
3166   if (!table_is_part_of_recovery_set(&share->open_file_name))
3167   {
3168     tprint(tracef, ", skipped by user\n");
3169     return NULL;
3170   }
3171 
3172   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3173   {
3174     /*
3175       This can happen only if processing a record before the checkpoint
3176       record.
3177       id->name mapping is newer than REDO record: for sure the table subject
3178       of the REDO has been flushed and forced (id re-assignment implies this);
3179       REDO can be ignored (and must be, as we don't know what this subject
3180       table was).
3181     */
3182     DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
3183     tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3184            " than record, skipping record",
3185            LSN_IN_PARTS(share->lsn_of_file_id));
3186     return NULL;
3187   }
3188   if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3189   {
3190     /* probably a bulk insert repair */
3191     tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3192            " record, skipping record\n",
3193            LSN_IN_PARTS(share->state.skip_redo_lsn));
3194     return NULL;
3195   }
3196   /* detect if an open instance of a dropped table (internal bug) */
3197   DBUG_ASSERT(share->last_version != 0);
3198   if (page_redo_entry)
3199   {
3200     /*
3201       Consult dirty pages list.
3202       REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
3203       pages.
3204     */
3205     if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
3206                                      index_page_redo_entry))
3207       return NULL;
3208   }
3209   /*
3210     So we are going to read the page, and if its LSN is older than the
3211     record's we will modify the page
3212   */
3213   tprint(tracef, ", applying record\n");
3214   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3215   return info;
3216 }
3217 
3218 
get_MARIA_HA_from_UNDO_record(const TRANSLOG_HEADER_BUFFER * rec)3219 static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
3220                                                TRANSLOG_HEADER_BUFFER *rec)
3221 {
3222   uint16 sid;
3223   MARIA_HA *info;
3224   MARIA_SHARE *share;
3225 
3226   sid= fileid_korr(rec->header + LSN_STORE_SIZE);
3227   tprint(tracef, "   For table of short id %u", sid);
3228   info= all_tables[sid].info;
3229 #ifndef DBUG_OFF
3230   DBUG_ASSERT(!in_redo_phase ||
3231               current_group_table == NULL || current_group_table == info);
3232   current_group_table= info;
3233 #endif
3234   if (info == NULL)
3235   {
3236     tprint(tracef, ", table skipped, so skipping record\n");
3237     return NULL;
3238   }
3239   share= info->s;
3240   tprint(tracef, ", '%s'", share->open_file_name.str);
3241 
3242   if (!table_is_part_of_recovery_set(&share->open_file_name))
3243   {
3244     tprint(tracef, ", skipped by user\n");
3245     return NULL;
3246   }
3247 
3248   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3249   {
3250     tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3251            " than record, skipping record",
3252            LSN_IN_PARTS(share->lsn_of_file_id));
3253     return NULL;
3254   }
3255   if (in_redo_phase &&
3256       cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3257   {
3258     /* probably a bulk insert repair */
3259     tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3260            " record, skipping record\n",
3261            LSN_IN_PARTS(share->state.skip_redo_lsn));
3262     return NULL;
3263   }
3264   DBUG_ASSERT(share->last_version != 0);
3265   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3266   if (in_redo_phase)
3267     tprint(tracef, ", remembering undo\n");
3268   else
3269     tprint(tracef, ", applying record\n");
3270   return info;
3271 }
3272 
3273 
3274 /**
3275    @brief Parses checkpoint record.
3276 
3277    Builds from it the dirty_pages list (a hash), opens tables and maps them to
3278    their 2-byte IDs, recreates transactions (not real TRNs though).
3279 
3280    @return LSN from where in the log the REDO phase should start
3281      @retval LSN_ERROR error
3282      @retval other     ok
3283 */
3284 
static LSN parse_checkpoint_record(LSN lsn)
{
  /*
    The record's body is read into log_record_buffer and then walked with
    'ptr', in this order: start horizon, active transactions, committed
    transactions (skipped), open tables, dirty pages. At the end 'ptr' must
    be exactly at the end of the buffer, or the record is declared corrupted.
    The header buffer 'rec' is only needed until the body has been read, so
    it is released right after that.
  */
  ulong i;
  ulonglong nb_dirty_pages;
  TRANSLOG_HEADER_BUFFER rec;
  TRANSLOG_ADDRESS start_address;
  my_bool header_read;
  uint nb_active_transactions, nb_committed_transactions, nb_tables;
  uchar *ptr;
  LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
  struct st_dirty_page *next_dirty_page_in_pool;

  tprint(tracef, "Loading data from checkpoint record at LSN " LSN_FMT "\n",
         LSN_IN_PARTS(lsn));
  header_read= (translog_read_record_header(lsn, &rec) !=
                RECHEADER_READ_ERROR);
  if (!header_read || rec.type != LOGREC_CHECKPOINT)
  {
    /* A header which was read, but is of the wrong type, must be released */
    if (header_read)
      translog_free_record_header(&rec);
    eprint(tracef, "Cannot find checkpoint record at LSN " LSN_FMT,
           LSN_IN_PARTS(lsn));
    return LSN_ERROR;
  }

  enlarge_buffer(&rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec.lsn, 0, rec.record_length,
                           log_record_buffer.str, NULL) !=
      rec.record_length)
  {
    translog_free_record_header(&rec);
    eprint(tracef, "Failed to read record");
    return LSN_ERROR;
  }
  /* From now on only log_record_buffer is used */
  translog_free_record_header(&rec);

  ptr= log_record_buffer.str;
  start_address= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;
  tprint(tracef, "Checkpoint record has start_horizon at " LSN_FMT "\n",
         LSN_IN_PARTS(start_address));

  /* transactions */
  nb_active_transactions= uint2korr(ptr);
  ptr+= 2;
  tprint(tracef, "%u active transactions\n", nb_active_transactions);
  minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;
  max_long_trid= transid_korr(ptr);
  ptr+= TRANSID_SIZE;

  /*
    how much brain juice and discussions there was to come to writing this
    line. It may make start_address slightly decrease (only by the time it
    takes to write one or a few rows, roughly).
  */
  tprint(tracef, "Checkpoint record has min_rec_lsn of active transactions"
         " at " LSN_FMT "\n",
         LSN_IN_PARTS(minimum_rec_lsn_of_active_transactions));
  set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);

  for (i= 0; i < nb_active_transactions; i++)
  {
    uint16 sid= uint2korr(ptr);
    TrID long_id;
    LSN undo_lsn, first_undo_lsn;
    ptr+= 2;
    long_id= uint6korr(ptr);
    ptr+= 6;
    DBUG_ASSERT(sid > 0 && long_id > 0);
    undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    first_undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
  }
  nb_committed_transactions= uint4korr(ptr);
  ptr+= 4;
  tprint(tracef, "%lu committed transactions\n",
         (ulong)nb_committed_transactions);
  /* no purging => committed transactions are not important */
  ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;

  /* tables  */
  nb_tables= uint4korr(ptr);
  ptr+= 4;
  tprint(tracef, "%u open tables\n", nb_tables);
  for (i= 0; i< nb_tables; i++)
  {
    char name[FN_REFLEN];
    LSN first_log_write_lsn;
    size_t name_len;
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    DBUG_ASSERT(sid > 0);
    first_log_write_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    /* The name is stored with its terminating zero byte */
    name_len= strlen((char *)ptr) + 1;
    strmake_buf(name, (char *)ptr);
    ptr+= name_len;
    if (new_table(sid, name, first_log_write_lsn))
      return LSN_ERROR;
  }

  /* dirty pages */
  nb_dirty_pages= uint8korr(ptr);

  /*
    Ensure casts later will not lose significant bits, and that the size of
    the pool computed below cannot wrap around. This is checked at run time
    (not only asserted) as the count comes from the log: with a wrapped size
    the loop filling the pool would write past the allocation.
  */
  if ((nb_dirty_pages > SIZE_T_MAX/sizeof(struct st_dirty_page)) ||
      (nb_dirty_pages > ULONG_MAX))
  {
    eprint(tracef, "Checkpoint record has an impossible number of dirty"
           " pages");
    return LSN_ERROR;
  }

  ptr+= 8;
  tprint(tracef, "%lu dirty pages\n", (ulong) nb_dirty_pages);
  if (my_hash_init(PSI_INSTRUMENT_ME, &all_dirty_pages, &my_charset_bin,
                   (ulong)nb_dirty_pages, offsetof(struct st_dirty_page, file_and_page_id),
                   sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
                   NULL, NULL, 0))
    return LSN_ERROR;
  dirty_pages_pool=
    (struct st_dirty_page *)my_malloc(PSI_INSTRUMENT_ME, (size_t)nb_dirty_pages *
                                      sizeof(struct st_dirty_page),
                                      MYF(MY_WME));
  if (unlikely(dirty_pages_pool == NULL))
    return LSN_ERROR;
  next_dirty_page_in_pool= dirty_pages_pool;
  minimum_rec_lsn_of_dirty_pages= LSN_MAX;
  if (maria_recovery_verbose)
    tprint(tracef, "Table_id  Is_index       Page_id    Rec_lsn\n");
  for (i= 0; i < nb_dirty_pages ; i++)
  {
    pgcache_page_no_t page_id;
    LSN rec_lsn;
    uint32 is_index;
    uint16 table_id= uint2korr(ptr);
    ptr+= 2;
    is_index= ptr[0];
    ptr++;
    page_id= page_korr(ptr);
    ptr+= PAGE_STORE_SIZE;
    rec_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    /* The index flag is put above the 16 bits of the table's id */
    if (new_page((is_index << 16) | table_id,
                 page_id, rec_lsn, next_dirty_page_in_pool++))
      return LSN_ERROR;
    if (maria_recovery_verbose)
      tprint(tracef, "%8u  %8u  %12lu    " LSN_FMT "\n", (uint) table_id,
             (uint) is_index, (ulong) page_id, LSN_IN_PARTS(rec_lsn));
    set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
  }
  /* after that, there will be no insert/delete into the hash */
  /*
    sanity check on record (did we screw up with all those "ptr+=", did the
    checkpoint write code and checkpoint read code go out of sync?).
  */
  if (ptr != (log_record_buffer.str + log_record_buffer.length))
  {
    eprint(tracef, "checkpoint record corrupted\n");
    return LSN_ERROR;
  }

  /*
    start_address is now from where the dirty pages list can be ignored.
    Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
    translog_read_record() functions.
  */
  start_address= checkpoint_start=
    translog_next_LSN(start_address, LSN_IMPOSSIBLE);
  tprint(tracef, "Checkpoint record start_horizon now adjusted to"
         " LSN " LSN_FMT "\n", LSN_IN_PARTS(start_address));
  if (checkpoint_start == LSN_IMPOSSIBLE)
  {
    /*
      There must be a problem, as our checkpoint record exists and is >= the
      address which is stored in its first bytes, which is >= start_address.
    */
    return LSN_ERROR;
  }
  /* now, where the REDO phase should start reading log: */
  tprint(tracef, "Checkpoint has min_rec_lsn of dirty pages at"
         " LSN " LSN_FMT "\n", LSN_IN_PARTS(minimum_rec_lsn_of_dirty_pages));
  set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
  DBUG_PRINT("info",
             ("checkpoint_start: " LSN_FMT " start_address: " LSN_FMT,
              LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
  return start_address;
}
3467 
3468 
/**
   @brief Initializes one dirty-page entry and inserts it into the
   all_dirty_pages hash.

   The hash key stored in the entry is the file identifier shifted left by
   40 bits, OR-ed with the page number.

   @param  fileid      identifier of the file the page belongs to
   @param  pageid      page number inside that file
   @param  rec_lsn     rec_lsn to remember for the page
   @param  dirty_page  caller-provided entry to fill in and insert

   @return the return value of my_hash_insert()
*/
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page)
{
  const uint64 key_high_bits= ((uint64) fileid) << 40;

  dirty_page->rec_lsn= rec_lsn;
  /* this combined value is what the hash uses as key */
  dirty_page->file_and_page_id= key_high_bits | pageid;

  return my_hash_insert(&all_dirty_pages, (uchar *) dirty_page);
}
3477 
3478 
close_all_tables(void)3479 static int close_all_tables(void)
3480 {
3481   int error= 0;
3482   uint count= 0;
3483   LIST *list_element, *next_open;
3484   MARIA_HA *info;
3485   TRANSLOG_ADDRESS addr;
3486   DBUG_ENTER("close_all_tables");
3487 
3488   mysql_mutex_lock(&THR_LOCK_maria);
3489   if (maria_open_list == NULL)
3490     goto end;
3491   tprint(tracef, "Closing all tables\n");
3492   if (tracef != stdout)
3493   {
3494     if (recovery_message_printed == REC_MSG_NONE)
3495       print_preamble();
3496     for (count= 0, list_element= maria_open_list ;
3497          list_element ; count++, (list_element= list_element->next))
3498       ;
3499     fprintf(stderr, "tables to flush:");
3500     recovery_message_printed= REC_MSG_FLUSH;
3501   }
3502   /*
3503     Since the end of end_of_redo_phase(), we may have written new records
3504     (if UNDO phase ran)  and thus the state is newer than at
3505     end_of_redo_phase(), we need to bump is_of_horizon again.
3506   */
3507   addr= translog_get_horizon();
3508   for (list_element= maria_open_list ; ; list_element= next_open)
3509   {
3510     if (recovery_message_printed == REC_MSG_FLUSH)
3511     {
3512       fprintf(stderr, " %u", count--);
3513       fflush(stderr);
3514     }
3515     if (list_element == NULL)
3516       break;
3517     next_open= list_element->next;
3518     info= (MARIA_HA*)list_element->data;
3519     mysql_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
3520     /*
3521       Tables which we see here are exactly those which were open at time of
3522       crash. They might have open_count>0 as Checkpoint maybe flushed their
3523       state while they were used. As Recovery corrected them, don't alarm the
3524       user, don't ask for a table check:
3525     */
3526     if (info->s->state.open_count != 0)
3527     {
3528       /* let maria_close() mark the table properly closed */
3529       info->s->state.open_count= 1;
3530       info->s->global_changed= 1;
3531       info->s->changed= 1;
3532     }
3533     prepare_table_for_close(info, addr);
3534     error|= maria_close(info);
3535     mysql_mutex_lock(&THR_LOCK_maria);
3536 
3537     /* Force a crash to test recovery of recovery */
3538     if (maria_recovery_force_crash_counter)
3539     {
3540       DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
3541     }
3542   }
3543 end:
3544   if (recovery_message_printed == REC_MSG_FLUSH)
3545   {
3546     fputc('\n', stderr);
3547     fflush(stderr);
3548   }
3549   mysql_mutex_unlock(&THR_LOCK_maria);
3550   DBUG_RETURN(error);
3551 }
3552 
3553 
3554 /**
3555    @brief Close all table instances with a certain name which are present in
3556    all_tables.
3557 
3558    @param  name                Name of table
3559    @param  addr                Log address passed to prepare_table_for_close()
3560 */
3561 
close_one_table(const char * name,TRANSLOG_ADDRESS addr)3562 static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
3563 {
3564   my_bool res= 0;
3565   /* There are no other threads using the tables, so we don't need any locks */
3566   struct st_table_for_recovery *internal_table, *end;
3567   for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
3568        internal_table < end ;
3569        internal_table++)
3570   {
3571     MARIA_HA *info= internal_table->info;
3572     if ((info != NULL) && !strcmp(info->s->open_file_name.str, name))
3573     {
3574       prepare_table_for_close(info, addr);
3575       if (maria_close(info))
3576         res= 1;
3577       internal_table->info= NULL;
3578     }
3579   }
3580   return res;
3581 }
3582 
3583 
3584 /**
3585    Temporarily disables logging for this table.
3586 
3587    If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
3588    to warn log readers.
3589 
3590    @param  info            table
3591    @param  log_incomplete  if that disabling makes the log incomplete
3592 
3593    @note for example in the REDO phase we disable logging but that does not
3594    make the log incomplete.
3595 */
3596 
_ma_tmp_disable_logging_for_table(MARIA_HA * info,my_bool log_incomplete)3597 void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
3598                                        my_bool log_incomplete)
3599 {
3600   MARIA_SHARE *share= info->s;
3601   DBUG_ENTER("_ma_tmp_disable_logging_for_table");
3602 
3603   /*
3604     We have to ensure that bitmap is flushed, as it's checking
3605     that share->now_transactional is set
3606   */
3607   if (share->now_transactional && share->data_file_type == BLOCK_RECORD)
3608     _ma_bitmap_flush_all(share);
3609 
3610   if (log_incomplete)
3611   {
3612     uchar log_data[FILEID_STORE_SIZE];
3613     LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
3614     LSN lsn;
3615     log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
3616     log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
3617     translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
3618                           &dummy_transaction_object, info,
3619                           (translog_size_t) sizeof(log_data),
3620                           TRANSLOG_INTERNAL_PARTS + 1, log_array,
3621                           log_data, NULL);
3622   }
3623 
3624   /* if we disabled before writing the record, record wouldn't reach log */
3625   share->now_transactional= FALSE;
3626 
3627   /*
3628     Reset state pointers. This is needed as in ALTER table we may do
3629     commit followed by _ma_renable_logging_for_table and then
3630     info->state may point to a state that was deleted by
3631     _ma_trnman_end_trans_hook()
3632    */
3633   share->state.no_logging= *info->state;
3634   info->state= &share->state.no_logging;
3635   info->switched_transactional= TRUE;
3636 
3637   /*
3638     Some code in ma_blockrec.c assumes a trn even if !now_transactional but in
3639     this case it only reads trn->rec_lsn, which has to be LSN_IMPOSSIBLE and
3640     should be now. info->trn may be NULL in maria_chk.
3641   */
3642   if (info->trn == NULL)
3643   {
3644     info->trn= &dummy_transaction_object;
3645     info->trn_next= 0;
3646     info->trn_prev= 0;
3647   }
3648 
3649   DBUG_ASSERT(info->trn->rec_lsn == LSN_IMPOSSIBLE);
3650   share->page_type= PAGECACHE_PLAIN_PAGE;
3651   /* Functions below will pick up now_transactional and change callbacks */
3652   _ma_set_data_pagecache_callbacks(&info->dfile, share);
3653   _ma_set_index_pagecache_callbacks(&share->kfile, share);
3654   _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3655   DBUG_VOID_RETURN;
3656 }
3657 
3658 
3659 /**
3660    Re-enables logging for a table which had it temporarily disabled.
3661 
3662    Only the thread which disabled logging is allowed to reenable it. Indeed,
3663    re-enabling logging affects all open instances, one must have exclusive
3664    access to the table to do that. In practice, the one which disables has
3665    such access.
3666 
3667    @param  info            table
3668    @param  flush_pages     if function needs to flush pages first
3669 */
3670 
_ma_reenable_logging_for_table(MARIA_HA * info,my_bool flush_pages)3671 my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages)
3672 {
3673   MARIA_SHARE *share= info->s;
3674   DBUG_ENTER("_ma_reenable_logging_for_table");
3675 
3676   if (share->now_transactional == share->base.born_transactional ||
3677       !info->switched_transactional)
3678   {
3679     info->switched_transactional= FALSE;
3680     DBUG_RETURN(0);
3681   }
3682   info->switched_transactional= FALSE;
3683 
3684   if ((share->now_transactional= share->base.born_transactional))
3685   {
3686     share->page_type= PAGECACHE_LSN_PAGE;
3687 
3688     /*
3689       Copy state information that where updated while the table was used
3690       in not transactional mode
3691     */
3692     _ma_copy_nontrans_state_information(info);
3693     _ma_reset_history(info->s);
3694 
3695     /* Reset state to point to state.common, as on open() */
3696     info->state=  &share->state.common;
3697     *info->state=  share->state.state;
3698 
3699     if (flush_pages)
3700     {
3701       /* Ensure that recover is not executing any redo before this */
3702       if (!maria_in_recovery)
3703       {
3704         if (share->id != 0)
3705         {
3706           mysql_mutex_lock(&share->intern_lock);
3707           translog_deassign_id_from_share(share);
3708           mysql_mutex_unlock(&share->intern_lock);
3709         }
3710         share->state.is_of_horizon= share->state.create_rename_lsn=
3711           share->state.skip_redo_lsn= translog_get_horizon();
3712       }
3713       /*
3714         We are going to change callbacks; if a page is flushed at this moment
3715         this can cause race conditions, that's one reason to flush pages
3716         now. Other reasons: a checkpoint could be running and miss pages; the
3717         pages have type PAGECACHE_PLAIN_PAGE which should not remain. As
3718         there are no REDOs for pages, them, bitmaps and the state also have to
3719         be flushed and synced.
3720       */
3721       if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
3722                                 FLUSH_RELEASE, FLUSH_RELEASE) ||
3723           _ma_state_info_write(share,
3724                                MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
3725                                MA_STATE_INFO_WRITE_LOCK) ||
3726           _ma_sync_table_files(info))
3727         DBUG_RETURN(1);
3728     }
3729     else if (!maria_in_recovery)
3730     {
3731       /*
3732         Except in Recovery, we mustn't leave dirty pages (see comments above).
3733         Note that this does not verify that the state was flushed, but hey.
3734       */
3735       pagecache_file_no_dirty_page(share->pagecache, &info->dfile);
3736       pagecache_file_no_dirty_page(share->pagecache, &share->kfile);
3737     }
3738     _ma_set_data_pagecache_callbacks(&info->dfile, share);
3739     _ma_set_index_pagecache_callbacks(&share->kfile, share);
3740     _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3741     /*
3742       info->trn was not changed in the disable/enable combo, so that it's
3743       still usable in this kind of combination:
3744       external_lock;
3745       start_bulk_insert; # table is empty, disables logging
3746       end_bulk_insert;   # enables logging
3747       start_bulk_insert; # table is not empty, logging stays
3748                          # so rows insertion needs the real trn.
3749       as happens during row-based replication on the slave.
3750     */
3751   }
3752   DBUG_RETURN(0);
3753 }
3754 
3755 
/**
   @brief Prints to stderr how far the REDO phase has progressed.

   Does nothing when the trace goes to stdout. On the first call that gets
   past that test, the current log horizon is remembered as the end point
   and the distance from addr to it becomes the reference (100%) amount of
   work. A new percentage is printed each time it is at least 10 points
   above the previously printed one.

   If that reference distance is zero (addr is already at the remembered
   horizon), there is nothing to report and no percentage is computed; this
   avoids a division by zero.

   @param  addr            log address which the REDO phase has reached
*/

static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
  static uint end_logno= FILENO_IMPOSSIBLE, percentage_printed= 0;
  static ulong end_offset;
  static ulonglong initial_remainder= ~(ulonglong) 0;

  uint cur_logno;
  ulong cur_offset;
  ulonglong local_remainder;
  uint percentage_done;

  if (tracef == stdout)
    return;
  if (recovery_message_printed == REC_MSG_NONE)
  {
    print_preamble();
    fprintf(stderr, "recovered pages: 0%%");
    fflush(stderr);
    procent_printed= 1;
    recovery_message_printed= REC_MSG_REDO;
  }
  if (end_logno == FILENO_IMPOSSIBLE)
  {
    /* first call: remember where the log ends */
    LSN end_addr= translog_get_horizon();
    end_logno= LSN_FILE_NO(end_addr);
    end_offset= LSN_OFFSET(end_addr);
  }
  cur_logno= LSN_FILE_NO(addr);
  cur_offset= LSN_OFFSET(addr);
  if (cur_logno == end_logno)
    local_remainder= end_offset - cur_offset;
  else
  {
    /*
      Number of whole log files strictly between the current and the last
      one. The difference is unsigned, so it has to be clamped with an
      explicit comparison: MY_MAX(difference, 0) could never do it.
    */
    uint full_files_between= (end_logno > cur_logno) ?
      (end_logno - cur_logno - 1) : 0;
    /* rest of current file + whole files in between + part of last file */
    local_remainder= ((longlong)log_file_size) - cur_offset +
      full_files_between * ((longlong)log_file_size) +
      end_offset;
  }
  if (initial_remainder == (ulonglong)(-1))
    initial_remainder= local_remainder;
  /* zero total work: no progress to show, and we must not divide by it */
  if (initial_remainder == 0)
    return;
  percentage_done= (uint) ((initial_remainder - local_remainder) * 100ULL /
                           initial_remainder);
  if ((percentage_done - percentage_printed) >= 10)
  {
    percentage_printed= percentage_done;
    fprintf(stderr, " %u%%", percentage_done);
    fflush(stderr);
    procent_printed= 1;
  }
}
3801 
3802 
3803 #ifdef MARIA_EXTERNAL_LOCKING
3804 #error Marias Checkpoint and Recovery are really not ready for it
3805 #endif
3806 
3807 /*
3808 Recovery of the state :  how it works
3809 =====================================
3810 
3811 Here we ignore Checkpoints for a start.
3812 
3813 The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
3814 memory frequently (at least at every row write/update/delete) but goes
3815 to disk at few moments: maria_close() when closing the last open
3816 instance, and a few rare places like CHECK/REPAIR/ALTER
3817 (non-transactional tables also do it at maria_lock_database() but we
3818 needn't cover them here).
3819 
3820 In case of crash, state on disk is likely to be older than what it was
3821 in memory, the REDO phase needs to recreate the state as it was in
3822 memory at the time of crash. When we say Recovery here we will always
3823 mean "REDO phase".
3824 
3825 For example MARIA_STATUS_INFO::records (count of records). It is updated at
3826 the end of every row write/update/delete/delete_all. When Recovery sees the
3827 sign of such row operation (UNDO or REDO), it may need to update the records'
3828 count if that count does not reflect that operation (is older). How to know
3829 the age of the state compared to the log record: every time the state
3830 goes to disk at runtime, its member "is_of_horizon" is updated to the
3831 current end-of-log horizon. So Recovery just needs to compare is_of_horizon
3832 and the record's LSN to know if it should modify "records".
3833 
3834 Other operations like ALTER TABLE DISABLE KEYS update the state but
3835 don't write log records, thus the REDO phase cannot repeat their
3836 effect on the state in case of crash. But we make them sync the state
3837 as soon as they have finished. This reduces the window for a problem.
3838 
3839 It looks like only one thread at a time updates the state in memory or
3840 on disk. We assume that the upper level (normally MySQL) has protection
3841 against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
3842 are not issued while there are any running transactions on the given table.
3843 If this is not done, we may write a corrupted state to disk.
3844 
3845 With checkpoints
3846 ================
3847 
3848 Checkpoint module needs to read the state in memory and write it to
3849 disk. This may happen while some other thread is modifying the state
3850 in memory or on disk. Checkpoint thus may be reading changing data, it
3851 needs a mutex to not have it corrupted, and concurrent modifiers of
3852 the state need that mutex too for the same reason.
3853 "records" is modified for every row write/update/delete, we don't want
3854 to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
3855 which is already present in these moments, namely the log's mutex which is
3856 taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
3857 under-log-mutex hooks when writing these records (thus "records" is
3858 not updated at the end of maria_write/update/delete() anymore).
3859 Thus Checkpoint takes the log's lock and can read "records" from
memory and write it to disk and release log's lock.
3861 We however want to avoid having the disk write under the log's
3862 lock. So it has to be under another mutex, natural choice is
3863 intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
3864 and as maria_close() takes it too). All state writes to disk are
3865 changed to be protected with intern_lock.
3866 So Checkpoint takes intern_lock, log's lock, reads "records" from
3867 memory, releases log's lock, updates is_of_horizon and writes "records" to
3868 disk, release intern_lock.
3869 In practice, not only "records" needs to be written but the full
3870 state. So, Checkpoint reads the full state from memory. Some other
3871 thread may at this moment be modifying in memory some pieces of the
state which are not protected by the log's lock (see ma_extra.c
3873 HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
3874 from memory; to guard against that we extend the intern_lock-zone to
3875 changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
3876 also any change made in memory to create_rename_lsn/state_is_of_horizon.
3877 Last, we don't want in Checkpoint to do
3878  log lock; read state from memory; release log lock;
3879 for each table, it may hold the log's lock too much in total.
3880 So, we instead do
3881  log lock; read N states from memory; release log lock;
3882 Thus, the sequence above happens outside of any intern_lock.
3883 But this re-introduces the problem that some other thread may be changing the
3884 state in memory and on disk under intern_lock, without log's lock, like
3885 HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
3886 comes to handling the table under intern_lock, which is serialized with
HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher than when the state
3888 was read from memory under log's lock, and thus can decide to not flush the
3889 obsolete state it has, knowing that the other thread flushed a more recent
3890 state already. If on the other hand is_of_horizon is not higher, the read
3891 state is current and can be flushed. So we have a per-table sequence:
3892  lock intern_lock; test if is_of_horizon is higher than when we read the state
3893  under log's lock; if no then flush the read state to disk.
3894 */
3895 
3896 /* some comments and pseudo-code which we keep for later */
3897 #if 0
3898   /*
3899     MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
3900     after a certain amount of log records have been executed. This helps
3901     against repeated crashes. Those checkpoints could not be user-requested
3902     (as engine is not communicating during the REDO phase), so they would be
3903     automatic: this changes the original assumption that we don't write to the
3904     log while in the REDO phase, but why not. How often should we checkpoint?
3905   */
3906 
3907   /*
3908     We want to have two steps:
3909     engine->recover_with_max_memory();
3910     next_engine->recover_with_max_memory();
3911     engine->init_with_normal_memory();
3912     next_engine->init_with_normal_memory();
3913     So: in recover_with_max_memory() allocate a giant page cache, do REDO
3914     phase, then all page cache is flushed and emptied and freed (only retain
3915     small structures like TM): take full checkpoint, which is useful if
3916     next engine crashes in its recovery the next second.
3917     Destroy all shares (maria_close()), then at init_with_normal_memory() we
3918     do this:
3919   */
3920 
3921   /**** UNDO PHASE *****/
3922 
3923   /*
3924     Launch one or more threads to do the background rollback. Don't wait for
3925     them to complete their rollback (background rollback; for debugging, we
3926     can have an option which waits). Set a counter (total_of_rollback_threads)
    to the number of threads to launch.
3928 
3929     Note that InnoDB's rollback-in-background works as long as InnoDB is the
3930     last engine to recover, otherwise MySQL will refuse new connections until
3931     the last engine has recovered so it's not "background" from the user's
3932     point of view. InnoDB is near top of sys_table_types so all others
3933     (e.g. BDB) recover after it... So it's really "online rollback" only if
3934     InnoDB is the only engine.
3935   */
3936 
3937   /* wake up delete/update handler */
3938   /* tell the TM that it can now accept new transactions */
3939 
3940   /*
3941     mark that checkpoint requests are now allowed.
3942   */
3943 #endif
3944