1 /* Copyright (C) 2006, 2007 MySQL AB
2    Copyright (C) 2010, 2013, Monty Program Ab.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 /*
18   WL#3072 Maria recovery
19   First version written by Guilhem Bichot on 2006-04-27.
20 */
21 
22 /* Here is the implementation of this module */
23 
24 #include "maria_def.h"
25 #include "ma_recovery.h"
26 #include "ma_blockrec.h"
27 #include "ma_checkpoint.h"
28 #include "trnman.h"
29 #include "ma_key_recover.h"
30 #include "ma_recovery_util.h"
31 #include "hash.h"
32 #include <my_check_opt.h>
33 
34 struct st_trn_for_recovery /* used only in the REDO phase */
35 {
36   LSN group_start_lsn, undo_lsn, first_undo_lsn;
37   TrID long_trid;
38 };
39 struct st_table_for_recovery /* used in the REDO and UNDO phase */
40 {
41   MARIA_HA *info;
42 };
43 /* Variables used by all functions of this module. Ok as single-threaded */
44 static struct st_trn_for_recovery *all_active_trans;
45 static struct st_table_for_recovery *all_tables;
46 static struct st_dirty_page *dirty_pages_pool;
47 static LSN current_group_end_lsn;
48 #ifndef DBUG_OFF
49 /** Current group of REDOs is about this table and only this one */
50 static MARIA_HA *current_group_table;
51 #endif
52 static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
53 /** @brief to avoid writing a checkpoint if recovery did nothing. */
54 static my_bool checkpoint_useful;
55 static my_bool in_redo_phase;
56 static my_bool trns_created;
57 static ulong skipped_undo_phase;
58 static ulonglong now; /**< for tracking execution time of phases */
59 static void (*save_error_handler_hook)(uint, const char *,myf);
60 static ulong recovery_warnings; /**< count of warnings */
61 HASH tables_to_redo;                          /* For maria_read_log */
62 ulong maria_recovery_force_crash_counter;
63 TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
64 
65 #define prototype_redo_exec_hook(R)                                          \
66   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
67 
68 #define prototype_redo_exec_hook_dummy(R)                                    \
69   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
70                                __attribute__ ((unused)))
71 
72 #define prototype_undo_exec_hook(R)                                          \
73   static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
74 
/*
  Forward declarations of the hooks executed for each log record type.
  prototype_redo_exec_hook*() declare the function run in the REDO phase,
  prototype_undo_exec_hook() the one run in the UNDO phase.
  Each hook is declared exactly once (a repeated declaration of
  REDO_INSERT_ROW_HEAD was removed).
*/
prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_redo_exec_hook(REDO_RENAME_TABLE);
prototype_redo_exec_hook(REDO_REPAIR_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(UNDO_BULK_INSERT);
prototype_redo_exec_hook(IMPORTED_TABLE);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
prototype_redo_exec_hook(REDO_FREE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(REDO_INDEX);
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_KEY_INSERT);
prototype_redo_exec_hook(UNDO_KEY_DELETE);
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
prototype_redo_exec_hook(COMMIT);
prototype_redo_exec_hook(CLR_END);
prototype_redo_exec_hook(DEBUG_INFO);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
prototype_undo_exec_hook(UNDO_ROW_DELETE);
prototype_undo_exec_hook(UNDO_ROW_UPDATE);
prototype_undo_exec_hook(UNDO_KEY_INSERT);
prototype_undo_exec_hook(UNDO_KEY_DELETE);
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
prototype_undo_exec_hook(UNDO_BULK_INSERT);
114 
115 static int run_redo_phase(LSN lsn, LSN end_lsn,
116                           enum maria_apply_log_way apply);
117 static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
118 static int run_undo_phase(uint uncommitted);
119 static void display_record_position(const LOG_DESC *log_desc,
120                                     const TRANSLOG_HEADER_BUFFER *rec,
121                                     uint number);
122 static int display_and_apply_record(const LOG_DESC *log_desc,
123                                     const TRANSLOG_HEADER_BUFFER *rec);
124 static MARIA_HA *get_MARIA_HA_from_REDO_record(const
125                                                TRANSLOG_HEADER_BUFFER *rec);
126 static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
127                                                TRANSLOG_HEADER_BUFFER *rec);
128 static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
129 static LSN parse_checkpoint_record(LSN lsn);
130 static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
131                             LSN first_undo_lsn);
132 static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
133 static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
134                     struct st_dirty_page *dirty_page);
135 static int close_all_tables(void);
136 static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
137 static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
138 static void delete_all_transactions();
139 
140 /** @brief global [out] buffer for translog_read_record(); never shrinks */
141 static struct
142 {
143   /*
144     uchar* is more adapted (less casts) than char*, thus we don't use
145     LEX_STRING.
146   */
147   uchar *str;
148   size_t length;
149 } log_record_buffer;
enlarge_buffer(const TRANSLOG_HEADER_BUFFER * rec)150 static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec)
151 {
152   if (log_record_buffer.length < rec->record_length)
153   {
154     log_record_buffer.length= rec->record_length;
155     log_record_buffer.str= my_realloc(log_record_buffer.str,
156                                       rec->record_length,
157                                       MYF(MY_WME | MY_ALLOW_ZERO_PTR));
158   }
159 }
160 /** @brief Tells what kind of progress message was printed to the error log */
161 static enum recovery_message_type
162 {
163   REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH
164 } recovery_message_printed;
165 
166 
167 /* Hook to ensure we get nicer output if we get an error */
168 
maria_recover_error_handler_hook(uint error,const char * str,myf flags)169 void maria_recover_error_handler_hook(uint error, const char *str,
170                                      myf flags)
171 {
172   if (procent_printed)
173   {
174     procent_printed= 0;
175     fputc('\n', stderr);
176     fflush(stderr);
177   }
178   (*save_error_handler_hook)(error, str, flags);
179 }
180 
181 /* Define this if you want gdb to break in some interesting situations */
182 #define ALERT_USER()
183 
print_preamble()184 static void print_preamble()
185 {
186   ma_message_no_user(ME_NOTE, "starting recovery");
187 }
188 
189 
table_is_part_of_recovery_set(LEX_STRING * file_name)190 static my_bool table_is_part_of_recovery_set(LEX_STRING *file_name)
191 {
192   uint offset =0;
193   if (!tables_to_redo.records)
194     return 1;                                   /* Default, recover table */
195 
196   /* Skip base directory */
197   if (file_name->str[0] == '.' &&
198       (file_name->str[1] == '/' || file_name->str[1] == '\\'))
199     offset= 2;
200   /* Only recover if table is in hash */
201   return my_hash_search(&tables_to_redo, (uchar*) file_name->str + offset,
202                         file_name->length - offset) != 0;
203 }
204 
205 /**
206    @brief Recovers from the last checkpoint.
207 
208    Runs the REDO phase using special structures, then sets up the playground
209    of runtime: recreates transactions inside trnman, open tables with their
210    two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
211    tables.
212 
213    @return Operation status
214      @retval 0      OK
215      @retval !=0    Error
216 */
217 
maria_recovery_from_log(void)218 int maria_recovery_from_log(void)
219 {
220   int res= 1;
221   FILE *trace_file;
222   uint warnings_count;
223 #ifdef EXTRA_DEBUG
224   char name_buff[FN_REFLEN];
225 #endif
226   DBUG_ENTER("maria_recovery_from_log");
227 
228   DBUG_ASSERT(!maria_in_recovery);
229   maria_in_recovery= TRUE;
230 
231 #ifdef EXTRA_DEBUG
232   fn_format(name_buff, "aria_recovery.trace", maria_data_root, "", MYF(0));
233   trace_file= my_fopen(name_buff, O_WRONLY|O_APPEND|O_CREAT, MYF(MY_WME));
234 #else
235   trace_file= NULL; /* no trace file for being fast */
236 #endif
237   tprint(trace_file, "TRACE of the last Aria recovery from mysqld\n");
238   DBUG_ASSERT(maria_pagecache->inited);
239   res= maria_apply_log(LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, MARIA_LOG_APPLY,
240                        trace_file, TRUE, TRUE, TRUE, &warnings_count);
241   if (!res)
242   {
243     if (warnings_count == 0 && recovery_found_crashed_tables == 0)
244       tprint(trace_file, "SUCCESS\n");
245     else
246       tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
247              warnings_count);
248   }
249   if (trace_file)
250     my_fclose(trace_file, MYF(0));
251   maria_in_recovery= FALSE;
252   DBUG_RETURN(res);
253 }
254 
255 
/**
   @brief Displays and/or applies the log

   @param  from_lsn        LSN from which log reading/applying should start;
                           LSN_IMPOSSIBLE means "use last checkpoint" (or
                           the first LSN of the log if there is no
                           checkpoint)
   @param  end_lsn         Apply until this. LSN_IMPOSSIBLE means until end.
                           If it is not LSN_IMPOSSIBLE, the function stops
                           after the REDO phase and returns an error.
   @param  apply           how log records should be applied or not
   @param  trace_file      trace file where progress/debug messages will go
   @param  should_run_undo_phase Should the UNDO phase be run after the REDO
                           phase or not
   @param  skip_DDLs_arg   Should DDL records (CREATE/RENAME/DROP/REPAIR)
                           be skipped by the REDO phase or not
   @param  take_checkpoints Should we take checkpoints or not.
   @param[out] warnings_count Count of warnings will be put there

   @todo This trace_file thing is primitive; soon we will make it similar to
   ma_check_print_warning() etc, and a successful recovery does not need to
   create a trace file. But for debugging now it is useful.

   @return Operation status
     @retval 0      OK
     @retval !=0    Error
*/
277 
int maria_apply_log(LSN from_lsn, LSN end_lsn,
                    enum maria_apply_log_way apply,
                    FILE *trace_file,
                    my_bool should_run_undo_phase, my_bool skip_DDLs_arg,
                    my_bool take_checkpoints, uint *warnings_count)
{
  int error= 0;
  uint uncommitted_trans;
  ulonglong old_now;
  my_bool abort_message_printed= 0;
  DBUG_ENTER("maria_apply_log");

  DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase);
  DBUG_ASSERT(!maria_multi_threaded);
  recovery_warnings= recovery_found_crashed_tables= 0;
  skipped_lsn_err_count= 0;
  maria_recovery_changed_data= 0;
  /* checkpoints can happen only if TRNs have been built */
  DBUG_ASSERT(should_run_undo_phase || !take_checkpoints);
  DBUG_ASSERT(end_lsn == LSN_IMPOSSIBLE || should_run_undo_phase == 0);

  /*
    Reset the per-run state before anything below can fail: the error and
    cleanup paths ('err', 'err2', 'end') read tracef, trns_created and
    recovery_message_printed, so they must never see values left over from
    a previous call.
  */
  recovery_message_printed= REC_MSG_NONE;
  checkpoint_useful= trns_created= FALSE;
  tracef= trace_file;
#ifdef INSTANT_FLUSH_OF_MESSAGES
  /* enable this for instant flush of messages to trace file */
  setbuf(tracef, NULL);
#endif
  skip_DDLs= skip_DDLs_arg;
  skipped_undo_phase= 0;

  all_active_trans= (struct st_trn_for_recovery *)
    my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
              MYF(MY_ZEROFILL));
  all_tables= (struct st_table_for_recovery *)
    my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
              MYF(MY_ZEROFILL));

  /* Our own error hook is active until the 'end' label restores the old one */
  save_error_handler_hook= error_handler_hook;
  error_handler_hook= maria_recover_error_handler_hook;

  if (!all_active_trans || !all_tables)
    goto err;

  if (take_checkpoints && ma_checkpoint_init(0))
    goto err;

  trnman_init(max_trid_in_control_file);

  /* Find where the REDO phase should start, if the caller did not tell */
  if (from_lsn == LSN_IMPOSSIBLE)
  {
    if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
    {
      from_lsn= translog_first_lsn_in_log();
      if (unlikely(from_lsn == LSN_ERROR))
      {
        trnman_destroy();
        goto err;
      }
    }
    else
    {
      from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
      if (from_lsn == LSN_ERROR)
      {
        trnman_destroy();
        goto err;
      }
    }
  }

  now= microsecond_interval_timer();
  in_redo_phase= TRUE;
  if (run_redo_phase(from_lsn, end_lsn, apply))
  {
    ma_message_no_user(0, "Redo phase failed");
    trnman_destroy();
    goto err;
  }
  trnman_destroy();

  if (end_lsn != LSN_IMPOSSIBLE)
  {
    /*
      Applying up to a given LSN stops here and is reported as an error;
      'err2' skips the generic "FAILED" trace line.
    */
    abort_message_printed= 1;
    if (!trace_file)
      fputc('\n', stderr);
    my_message(HA_ERR_INITIALIZATION,
               "Maria recovery aborted as end_lsn/end of file was reached",
               MYF(0));
    goto err2;
  }

  if ((uncommitted_trans=
       end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
  {
    ma_message_no_user(0, "End of redo phase failed");
    goto err;
  }
  in_redo_phase= FALSE;

  old_now= now;
  now= microsecond_interval_timer();
  if (recovery_message_printed == REC_MSG_REDO)
  {
    double phase_took= (now - old_now)/1000000.0;
    /*
      Detailed progress info goes to stderr, because ma_message_no_user()
      cannot put several messages on one line.
    */
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
    fflush(stderr);
  }

  /**
     REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
     wrong: if a future recovery used it, the REDO phase would always
     start from the checkpoint and never from before, wrongly skipping REDOs
     (tested). Another problem is that the REDO phase uses
     PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.

     @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
     could make a function which goes through pages at end of REDO phase and
     changes their type.
  */
#ifdef FIX_AND_ENABLE_LATER
  if (take_checkpoints && checkpoint_useful)
  {
    /*
      We take a checkpoint as it can save future recovery work if we crash
      during the UNDO phase. But we don't flush pages, as UNDOs will change
      them again probably.
      If we wanted to take checkpoints in the middle of the REDO phase, at a
      moment when we haven't reached the end of log so don't have exact data
      about transactions, we could write a special checkpoint: containing only
      the list of dirty pages, otherwise to be treated as if it was at the
      same LSN as the last checkpoint.
    */
    if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
      goto err;
  }
#endif

  if (should_run_undo_phase)
  {
    if (run_undo_phase(uncommitted_trans))
    {
      ma_message_no_user(0, "Undo phase failed");
      goto err;
    }
  }
  else if (uncommitted_trans > 0)
  {
    eprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
           " be left inconsistent!***", uncommitted_trans);
    recovery_warnings++;
  }

  if (skipped_undo_phase)
  {
    /*
      We could want to print a list of tables for which UNDOs were skipped,
      but not one line per skipped UNDO.
    */
    eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
           " tables may be left inconsistent!***", skipped_undo_phase);
    recovery_warnings++;
  }

  old_now= now;
  now= microsecond_interval_timer();
  if (recovery_message_printed == REC_MSG_UNDO)
  {
    double phase_took= (now - old_now)/1000000.0;
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
    fflush(stderr);
  }

  /*
    we don't use maria_panic() because it would maria_end(), and Recovery does
    not want that (we want to keep some modules initialized for runtime).
  */
  if (close_all_tables())
  {
    ma_message_no_user(0, "closing of tables failed");
    goto err;
  }

  old_now= now;
  now= microsecond_interval_timer();
  if (recovery_message_printed == REC_MSG_FLUSH)
  {
    double phase_took= (now - old_now)/1000000.0;
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
    fflush(stderr);
  }

  /* Make the control file remember the biggest long trid which was seen */
  if (max_long_trid > max_trid_in_control_file)
  {
    if (ma_control_file_write_and_force(last_checkpoint_lsn, last_logno,
                                        max_long_trid, recovery_failures))
      goto err;
  }

  if (take_checkpoints && checkpoint_useful)
  {
    /* No dirty pages, all tables are closed, no active transactions, save: */
    if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
      goto err;
  }

  goto end;
err:
  tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
err2:
  if (trns_created)
    delete_all_transactions();
  error= 1;
  if (close_all_tables())
  {
    ma_message_no_user(0, "closing of tables failed");
  }
end:
  /* Common cleanup, run on success as well as on failure */
  error_handler_hook= save_error_handler_hook;
  my_hash_free(&all_dirty_pages);
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool);
  dirty_pages_pool= NULL;
  my_free(all_tables);
  all_tables= NULL;
  my_free(all_active_trans);
  all_active_trans= NULL;
  my_free(log_record_buffer.str);
  log_record_buffer.str= NULL;
  log_record_buffer.length= 0;
  ma_checkpoint_end();
  *warnings_count= recovery_warnings + recovery_found_crashed_tables;
  if (recovery_message_printed != REC_MSG_NONE)
  {
    if (procent_printed)
    {
      procent_printed= 0;
      fprintf(stderr, "\n");
      fflush(stderr);
    }
    if (!error)
    {
      ma_message_no_user(ME_NOTE, "recovery done");
      maria_recovery_changed_data= 1;
    }
  }
  else if (!error && max_trid_in_control_file != max_long_trid)
  {
    /*
      maria_end() will set max trid in log file so that one can run
      maria_chk on the tables
    */
    maria_recovery_changed_data= 1;
  }

  if (error && !abort_message_printed)
  {
    my_message(HA_ERR_INITIALIZATION,
               "Aria recovery failed. Please run aria_chk -r on all Aria "
               "tables and delete all aria_log.######## files", MYF(0));
  }
  procent_printed= 0;
  /*
    We don't cleanly close tables if we hit some error (may corrupt them by
    flushing some wrong blocks made from wrong REDOs). It also leaves their
    open_count>0, which ensures that --aria-recover, if used, will try to
    repair them.
  */
  DBUG_RETURN(error);
}
554 
555 
556 /* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec,
                                    uint number)
{
  /*
    if number==0, we're going over records which we had already seen and which
    form a group, so we indent below the group's end record
  */
  const char *indent= (number != 0) ? "" : "   ";

  tprint(tracef,
         "%sRec#%u LSN " LSN_FMT " short_trid %u %s(num_type:%u) len %lu\n",
         indent, number, LSN_IN_PARTS(rec->lsn),
         rec->short_trid, log_desc->name, rec->type,
         (ulong)rec->record_length);

  if (rec->type != LOGREC_DEBUG_INFO)
    return;

  /* For a debug-info record, its REDO hook is run here to print more */
  (*log_desc->record_execute_in_redo_phase)(rec);
}
576 
577 
display_and_apply_record(const LOG_DESC * log_desc,const TRANSLOG_HEADER_BUFFER * rec)578 static int display_and_apply_record(const LOG_DESC *log_desc,
579                                     const TRANSLOG_HEADER_BUFFER *rec)
580 {
581   int error;
582   if (log_desc->record_execute_in_redo_phase == NULL)
583   {
584     /* die on all not-yet-handled records :) */
585     DBUG_ASSERT("one more hook to write" == 0);
586     return 1;
587   }
588   if (rec->type == LOGREC_DEBUG_INFO)
589   {
590     /* Query already printed by display_record_position() */
591     return 0;
592   }
593   if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
594     eprint(tracef, "Got error %d when executing record %s",
595            my_errno, log_desc->name);
596   return error;
597 }
598 
599 
prototype_redo_exec_hook(LONG_TRANSACTION_ID)600 prototype_redo_exec_hook(LONG_TRANSACTION_ID)
601 {
602   uint16 sid= rec->short_trid;
603   TrID long_trid= all_active_trans[sid].long_trid;
604   /*
605     Any incomplete group should be of an old crash which already had a
606     recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
607   */
608   DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
609   if (long_trid != 0)
610   {
611     LSN ulsn= all_active_trans[sid].undo_lsn;
612     /*
613       If the first record of that transaction is after 'rec', it's probably
614       because that transaction was found in the checkpoint record, and then
615       it's ok, we can forget about that transaction (we'll meet it later
616       again in the REDO phase) and replace it with the one in 'rec'.
617     */
618     if ((ulsn != LSN_IMPOSSIBLE) &&
619         (cmp_translog_addr(ulsn, rec->lsn) < 0))
620     {
621       char llbuf[22];
622       llstr(long_trid, llbuf);
623       eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
624              " with same short id as this new transaction, and has neither"
625              " committed nor rollback (undo_lsn: " LSN_FMT ")",
626              llbuf, sid, LSN_IN_PARTS(ulsn));
627       goto err;
628     }
629   }
630   long_trid= uint6korr(rec->header);
631   new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
632   goto end;
633 err:
634   ALERT_USER();
635   return 1;
636 end:
637   return 0;
638 }
639 
640 
/**
   @brief Records a transaction in all_active_trans and traces its start

   Also raises max_long_trid if 'long_id' is bigger.

   @param  sid             short id; index of the slot to fill
   @param  long_id         long id of the transaction
   @param  undo_lsn        value stored as the slot's undo_lsn
   @param  first_undo_lsn  value stored as the slot's first_undo_lsn
*/

static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn)
{
  struct st_trn_for_recovery *slot= all_active_trans + sid;
  char long_id_str[22];

  llstr(long_id, long_id_str);
  tprint(tracef, "Transaction long_trid %s short_trid %u starts,"
         " undo_lsn " LSN_FMT " first_undo_lsn " LSN_FMT "\n",
         long_id_str, sid, LSN_IN_PARTS(undo_lsn),
         LSN_IN_PARTS(first_undo_lsn));

  slot->long_trid= long_id;
  slot->undo_lsn= undo_lsn;
  slot->first_undo_lsn= first_undo_lsn;
  set_if_bigger(max_long_trid, long_id);
}
654 
655 
prototype_redo_exec_hook_dummy(CHECKPOINT)
{
  /*
    REDO hook for a checkpoint record: it is only traced, nothing is
    applied. The only checkpoint we care about was found via control file,
    so this one is ignored. Always returns 0.
  */
  tprint(tracef, "CHECKPOINT found\n");
  return 0;
}
662 
663 
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
  /*
    REDO hook for an INCOMPLETE_GROUP record: nothing to do, as the abortion
    of the group was already made. Always returns 0.
  */
  return 0;
}
669 
670 
prototype_redo_exec_hook(INCOMPLETE_LOG)671 prototype_redo_exec_hook(INCOMPLETE_LOG)
672 {
673   MARIA_HA *info;
674 
675   /* We try to get table first, so that we get the table in in the trace log */
676   info= get_MARIA_HA_from_REDO_record(rec);
677 
678   if (skip_DDLs)
679   {
680     tprint(tracef, "we skip DDLs\n");
681     return 0;
682   }
683 
684   if (!info)
685   {
686     /* no such table, don't need to warn */
687     return 0;
688   }
689 
690   if (maria_is_crashed(info))
691     return 0;
692 
693   if (info->s->state.is_of_horizon > rec->lsn)
694   {
695     /*
696       This table was repaired at a time after this log entry.
697       We can assume that all rows was inserted sucessfully and we don't
698       have to warn about that the inserted data was not logged
699     */
700     return 0;
701   }
702 
703   /*
704     Example of what can go wrong when replaying DDLs:
705     CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
706     ALTER TABLE t ... which does
707     CREATE a temporary table #sql... (logged)
708     INSERT data from t into #sql... (not logged)
709     RENAME #sql TO t (logged)
710     Removing tables by hand and replaying the log will leave in the
711     end an empty table "t": missing records. If after the RENAME an INSERT
712     into t was done, that row had number 1 in its page, executing the
713     REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
714     failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
715     created whereas rownr is not 0).
716     So when the server disables logging for ALTER TABLE or CREATE SELECT, it
717     logs LOGREC_INCOMPLETE_LOG to warn aria_read_log and then the user.
718 
719     Another issue is that replaying of DDLs is not correct enough to work if
720     there was a crash during a DDL (see comment in execution of
721     REDO_RENAME_TABLE ).
722   */
723 
724   eprint(tracef, "***WARNING: Aria engine currently logs no records "
725           "about insertion of data by ALTER TABLE and CREATE SELECT, "
726           "as they are not necessary for recovery; "
727           "present applying of log records to table '%s' may well not work."
728           "***", info->s->index_file_name.str);
729 
730   /* Prevent using the table for anything else than undo repair */
731   _ma_mark_file_crashed(info->s);
732   recovery_warnings++;
733   return 0;
734 }
735 
736 
create_database_if_not_exists(const char * name)737 static my_bool create_database_if_not_exists(const char *name)
738 {
739   char dirname[FN_REFLEN];
740   size_t length;
741   MY_STAT stat_info;
742   DBUG_ENTER("create_database_if_not_exists");
743 
744   dirname_part(dirname, name, &length);
745   if (!length)
746   {
747     /* Skip files without directores */
748     DBUG_RETURN(0);
749   }
750   /*
751     Safety;  Don't create files with hard path;
752     Should never happen with MariaDB
753     If hard path, then error will be detected when trying to create index file
754   */
755   if (test_if_hard_path(dirname))
756     DBUG_RETURN(0);
757 
758   if (my_stat(dirname,&stat_info,MYF(0)))
759     DBUG_RETURN(0);
760 
761 
762   tprint(tracef, "Creating not existing database '%s'\n", dirname);
763   if (my_mkdir(dirname, 0777, MYF(MY_WME)))
764   {
765     eprint(tracef, "***WARNING: Can't create not existing database '%s'",
766            dirname);
767     DBUG_RETURN(1);
768   }
769   DBUG_RETURN(0);
770 }
771 
772 
773 
774 
775 
prototype_redo_exec_hook(REDO_CREATE_TABLE)776 prototype_redo_exec_hook(REDO_CREATE_TABLE)
777 {
778   File dfile= -1, kfile= -1;
779   char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
780     *data_file_name, *index_file_name;
781   uchar *kfile_header;
782   myf create_flag;
783   uint flags;
784   int error= 1, create_mode= O_RDWR | O_TRUNC, i;
785   MARIA_HA *info= NULL;
786   uint kfile_size_before_extension, keystart;
787   DBUG_ENTER("exec_REDO_LOGREC_REDO_CREATE_TABLE");
788 
789   if (skip_DDLs)
790   {
791     tprint(tracef, "we skip DDLs\n");
792     DBUG_RETURN(0);
793   }
794   enlarge_buffer(rec);
795   if (log_record_buffer.str == NULL ||
796       translog_read_record(rec->lsn, 0, rec->record_length,
797                            log_record_buffer.str, NULL) !=
798       rec->record_length)
799   {
800     eprint(tracef, "Failed to read record");
801     goto end;
802   }
803   name= (char *)log_record_buffer.str;
804   /*
805     TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
806     find a REDO_CREATE_TABLE for a table which we have open, that's why we
807     need to look for any open instances and close them first.
808   */
809   if (close_one_table(name, rec->lsn))
810   {
811     eprint(tracef, "Table '%s' got error %d on close", name, my_errno);
812     ALERT_USER();
813     goto end;
814   }
815   /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
816   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
817   if (info)
818   {
819     MARIA_SHARE *share= info->s;
820     /* check that we're not already using it */
821     if (share->reopen != 1)
822     {
823       eprint(tracef, "Table '%s is already open (reopen=%u)",
824              name, share->reopen);
825       ALERT_USER();
826       goto end;
827     }
828     DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
829     if (!share->base.born_transactional)
830     {
831       /*
832         could be that transactional table was later dropped, and a non-trans
833         one was renamed to its name, thus create_rename_lsn is 0 and should
834         not be trusted.
835       */
836       tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
837              name);
838       ALERT_USER();
839       error= 0;
840       goto end;
841     }
842     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
843     {
844       tprint(tracef, "Table '%s' has create_rename_lsn " LSN_FMT " more "
845              "recent than record, ignoring creation",
846              name, LSN_IN_PARTS(share->state.create_rename_lsn));
847       error= 0;
848       goto end;
849     }
850     if (maria_is_crashed(info))
851     {
852       eprint(tracef, "Table '%s' is crashed, can't recreate it", name);
853       ALERT_USER();
854       goto end;
855     }
856     maria_close(info);
857     info= NULL;
858   }
859   else
860   {
861     /* one or two files absent, or header corrupted... */
862     tprint(tracef, "Table '%s' can't be opened (Error: %d)\n",
863            name, my_errno);
864   }
865   /* if does not exist, or is older, overwrite it */
866   ptr= name + strlen(name) + 1;
867   if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
868     tprint(tracef, ", we will only touch index file");
869   ptr++;
870   kfile_size_before_extension= uint2korr(ptr);
871   ptr+= 2;
872   keystart= uint2korr(ptr);
873   ptr+= 2;
874   kfile_header= (uchar *)ptr;
875   ptr+= kfile_size_before_extension;
876   /* set header lsns */
877   ptr2= (char *) kfile_header + sizeof(info->s->state.header) +
878     MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
879   for (i= 0; i<3; i++)
880   {
881     lsn_store(ptr2, rec->lsn);
882     ptr2+= LSN_STORE_SIZE;
883   }
884   data_file_name= ptr;
885   ptr+= strlen(data_file_name) + 1;
886   index_file_name= ptr;
887   ptr+= strlen(index_file_name) + 1;
888   /** @todo handle symlinks */
889   if (data_file_name[0] || index_file_name[0])
890   {
891     eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled",
892            name);
893     goto end;
894   }
895   if (create_database_if_not_exists(name))
896     goto end;
897   fn_format(filename, name, "", MARIA_NAME_IEXT,
898             MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH | MY_APPEND_EXT);
899   linkname_ptr= NULL;
900   create_flag= MY_DELETE_OLD;
901   tprint(tracef, "Table '%s' creating as '%s'\n", name, filename);
902   if ((kfile= mysql_file_create_with_symlink(key_file_kfile, linkname_ptr,
903                                              filename, 0, create_mode,
904                                              MYF(MY_WME|create_flag))) < 0)
905   {
906     eprint(tracef, "Failed to create index file");
907     goto end;
908   }
909   if (my_pwrite(kfile, kfile_header,
910                 kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
911       mysql_file_chsize(kfile, keystart, 0, MYF(MY_WME)))
912   {
913     eprint(tracef, "Failed to write to index file");
914     goto end;
915   }
916   if (!(flags & HA_DONT_TOUCH_DATA))
917   {
918     fn_format(filename,name,"", MARIA_NAME_DEXT,
919               MY_UNPACK_FILENAME | MY_APPEND_EXT);
920     linkname_ptr= NULL;
921     create_flag=MY_DELETE_OLD;
922     if (((dfile=
923           mysql_file_create_with_symlink(key_file_dfile, linkname_ptr,
924                                          filename, 0, create_mode,
925                                          MYF(MY_WME | create_flag))) < 0) ||
926         mysql_file_close(dfile, MYF(MY_WME)))
927     {
928       eprint(tracef, "Failed to create data file");
929       goto end;
930     }
931     /*
932       we now have an empty data file. To be able to
933       _ma_initialize_data_file() we need some pieces of the share to be
934       correctly filled. So we just open the table (fortunately, an empty
935       data file does not preclude this).
936     */
937     if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
938         _ma_initialize_data_file(info->s, info->dfile.file))
939     {
940       eprint(tracef, "Failed to open new table or write to data file");
941       goto end;
942     }
943   }
944   error= 0;
945 end:
946   if (kfile >= 0)
947     error|= mysql_file_close(kfile, MYF(MY_WME));
948   if (info != NULL)
949     error|= maria_close(info);
950   DBUG_RETURN(error);
951 }
952 
953 
prototype_redo_exec_hook(REDO_RENAME_TABLE)954 prototype_redo_exec_hook(REDO_RENAME_TABLE)
955 {
956   char *old_name, *new_name;
957   int error= 1;
958   MARIA_HA *info= NULL;
959   my_bool from_table_is_crashed= 0;
960   DBUG_ENTER("exec_REDO_LOGREC_REDO_RENAME_TABLE");
961 
962   if (skip_DDLs)
963   {
964     tprint(tracef, "we skip DDLs\n");
965     DBUG_RETURN(0);
966   }
967   enlarge_buffer(rec);
968   if (log_record_buffer.str == NULL ||
969       translog_read_record(rec->lsn, 0, rec->record_length,
970                            log_record_buffer.str, NULL) !=
971       rec->record_length)
972   {
973     eprint(tracef, "Failed to read record");
974     goto end;
975   }
976   old_name= (char *)log_record_buffer.str;
977   new_name= old_name + strlen(old_name) + 1;
978   tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name,
979          new_name);
980   /*
981     Here is why we skip CREATE/DROP/RENAME when doing a recovery from
982     ha_maria (whereas we do when called from aria_read_log). Consider:
983     CREATE TABLE t;
984     RENAME TABLE t to u;
985     DROP TABLE u;
986     RENAME TABLE v to u; # crash between index rename and data rename.
987     And do a Recovery (not removing tables beforehand).
988     Recovery replays CREATE, then RENAME: the maria_open("t") works,
989     maria_open("u") does not (no data file) so table "u" is considered
990     inexistent and so maria_rename() is done which overwrites u's index file,
991     which is lost. Ok, the data file (v.MAD) is still available, but only a
992     REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
993     So it is preferrable to not execute RENAME, and leave the "mess" of files,
994     rather than possibly destroy a file. DBA will manually rename files.
995     A safe recovery method would probably require checking the existence of
996     the index file and of the data file separately (not via maria_open()), and
997     maybe also to store a create_rename_lsn in the data file too
998     For now, all we risk is to leave the mess (half-renamed files) left by the
999     crash. We however sync files and directories at each file rename. The SQL
1000     layer is anyway not crash-safe for DDLs (except the repartioning-related
1001     ones).
1002     We replay DDLs in aria_read_log to be able to recreate tables from
1003     scratch. It means that "aria_read_log -a" should not be used on a
1004     database which just crashed during a DDL. And also ALTER TABLE does not
1005     log insertions of records into the temporary table, so replaying may
1006     fail (grep for INCOMPLETE_LOG in files).
1007   */
1008   info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1009   if (info)
1010   {
1011     MARIA_SHARE *share= info->s;
1012     if (!share->base.born_transactional)
1013     {
1014       tprint(tracef, ", is not transactional, ignoring renaming\n");
1015       ALERT_USER();
1016       error= 0;
1017       goto end;
1018     }
1019     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1020     {
1021       tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1022              " record, ignoring renaming",
1023              LSN_IN_PARTS(share->state.create_rename_lsn));
1024       error= 0;
1025       goto end;
1026     }
1027     if (maria_is_crashed(info))
1028     {
1029       tprint(tracef, "is crashed, can't be used for rename ; new-name table ");
1030       from_table_is_crashed= 1;
1031     }
1032     if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1033         maria_close(info))
1034       goto end;
1035     info= NULL;
1036     if (!from_table_is_crashed)
1037       tprint(tracef, "is ok for renaming; new-name table ");
1038   }
1039   else /* one or two files absent, or header corrupted... */
1040   {
1041     tprint(tracef, ", can't be opened, probably does not exist");
1042     error= 0;
1043     goto end;
1044   }
1045   /*
1046     We must also check the create_rename_lsn of the 'new_name' table if it
1047     exists: otherwise we may, with our rename which overwrites, destroy
1048     another table. For example:
1049     CREATE TABLE t;
1050     RENAME t to u;
1051     DROP TABLE u;
1052     RENAME v to u; # v is an old table, its creation/insertions not in log
1053     And start executing the log (without removing tables beforehand): creates
1054     t, renames it to u (if not testing create_rename_lsn) thus overwriting
1055     old-named v, drops u, and we are stuck, we have lost data.
1056   */
1057   info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1058   if (info)
1059   {
1060     MARIA_SHARE *share= info->s;
1061     /* We should not have open instances on this table. */
1062     if (share->reopen != 1)
1063     {
1064       tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
1065       ALERT_USER();
1066       goto end;
1067     }
1068     if (!share->base.born_transactional)
1069     {
1070       tprint(tracef, ", is not transactional, ignoring renaming\n");
1071       ALERT_USER();
1072       goto drop;
1073     }
1074     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1075     {
1076       tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1077              " record, ignoring renaming",
1078              LSN_IN_PARTS(share->state.create_rename_lsn));
1079       /*
1080         We have to drop the old_name table. Consider:
1081         CREATE TABLE t;
1082         CREATE TABLE v;
1083         RENAME TABLE t to u;
1084         DROP TABLE u;
1085         RENAME TABLE v to u;
1086         and apply the log without removing tables beforehand. t will be
1087         created, v too; in REDO_RENAME u will be more recent, but we still
1088         have to drop t otherwise it stays.
1089       */
1090       goto drop;
1091     }
1092     if (maria_is_crashed(info))
1093     {
1094       tprint(tracef, ", is crashed, can't rename it");
1095       ALERT_USER();
1096       goto end;
1097     }
1098     if (maria_close(info))
1099       goto end;
1100     info= NULL;
1101     /* abnormal situation */
1102     tprint(tracef, "exists but is older than record, can't rename it");
1103     goto end;
1104   }
1105   else /* one or two files absent, or header corrupted... */
1106     tprint(tracef, "can't be opened, probably does not exist");
1107 
1108   if (from_table_is_crashed)
1109   {
1110     eprint(tracef, "Aborting rename as old table was crashed");
1111     ALERT_USER();
1112     goto end;
1113   }
1114 
1115   tprint(tracef, ", renaming '%s'", old_name);
1116   if (maria_rename(old_name, new_name))
1117   {
1118     eprint(tracef, "Failed to rename table");
1119     goto end;
1120   }
1121   info= maria_open(new_name, O_RDONLY, 0);
1122   if (info == NULL)
1123   {
1124     eprint(tracef, "Failed to open renamed table");
1125     goto end;
1126   }
1127   if (_ma_update_state_lsns(info->s, rec->lsn, info->s->state.create_trid,
1128                             TRUE, TRUE))
1129     goto end;
1130   if (maria_close(info))
1131     goto end;
1132   info= NULL;
1133   error= 0;
1134   goto end;
1135 drop:
1136   tprint(tracef, ", only dropping '%s'", old_name);
1137   if (maria_delete_table(old_name))
1138   {
1139     eprint(tracef, "Failed to drop table");
1140     goto end;
1141   }
1142   error= 0;
1143   goto end;
1144 end:
1145   tprint(tracef, "\n");
1146   if (info != NULL)
1147     error|= maria_close(info);
1148   DBUG_RETURN(error);
1149 }
1150 
1151 
1152 /*
1153   The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
1154 */
prototype_redo_exec_hook(REDO_REPAIR_TABLE)1155 prototype_redo_exec_hook(REDO_REPAIR_TABLE)
1156 {
1157   int error= 1;
1158   MARIA_HA *info;
1159   HA_CHECK param;
1160   char *name;
1161   my_bool quick_repair;
1162   DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");
1163 
1164   /* We try to get table first, so that we get the table in in the trace log */
1165   info= get_MARIA_HA_from_REDO_record(rec);
1166 
1167   if (skip_DDLs)
1168   {
1169     /*
1170       REPAIR is not exactly a DDL, but it manipulates files without logging
1171       insertions into them.
1172     */
1173     tprint(tracef, "we skip DDLs\n");
1174     DBUG_RETURN(0);
1175   }
1176 
1177   if (!info)
1178   {
1179     /* no such table, don't need to warn */
1180     return 0;
1181   }
1182 
1183   if (maria_is_crashed(info))
1184   {
1185     tprint(tracef, "we skip repairing crashed table\n");
1186     DBUG_RETURN(0);
1187   }
1188   /*
1189     Otherwise, the mapping is newer than the table, and our record is newer
1190     than the mapping, so we can repair.
1191   */
1192   tprint(tracef, "   repairing...\n");
1193 
1194   maria_chk_init(&param);
1195   param.isam_file_name= name= info->s->open_file_name.str;
1196   param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
1197   param.tmpdir= maria_tmpdir;
1198   param.max_trid= max_long_trid;
1199   DBUG_ASSERT(maria_tmpdir);
1200 
1201   info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
1202   quick_repair= MY_TEST(param.testflag & T_QUICK);
1203 
1204   if (param.testflag & T_REP_PARALLEL)
1205   {
1206     if (maria_repair_parallel(&param, info, name, quick_repair))
1207       goto end;
1208   }
1209   else if (param.testflag & T_REP_BY_SORT)
1210   {
1211     if (maria_repair_by_sort(&param, info, name, quick_repair))
1212       goto end;
1213   }
1214   else if (maria_repair(&param, info, name, quick_repair))
1215     goto end;
1216 
1217   if (_ma_update_state_lsns(info->s, rec->lsn, trnman_get_min_safe_trid(),
1218                             TRUE, !(param.testflag & T_NO_CREATE_RENAME_LSN)))
1219     goto end;
1220   error= 0;
1221 
1222 end:
1223   DBUG_RETURN(error);
1224 }
1225 
1226 
prototype_redo_exec_hook(REDO_DROP_TABLE)1227 prototype_redo_exec_hook(REDO_DROP_TABLE)
1228 {
1229   char *name;
1230   int error= 1;
1231   MARIA_HA *info;
1232   if (skip_DDLs)
1233   {
1234     tprint(tracef, "we skip DDLs\n");
1235     return 0;
1236   }
1237   enlarge_buffer(rec);
1238   if (log_record_buffer.str == NULL ||
1239       translog_read_record(rec->lsn, 0, rec->record_length,
1240                            log_record_buffer.str, NULL) !=
1241       rec->record_length)
1242   {
1243     eprint(tracef, "Failed to read record");
1244     return 1;
1245   }
1246   name= (char *)log_record_buffer.str;
1247   tprint(tracef, "Table '%s'", name);
1248   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1249   if (info)
1250   {
1251     MARIA_SHARE *share= info->s;
1252     if (!share->base.born_transactional)
1253     {
1254       tprint(tracef, ", is not transactional, ignoring removal\n");
1255       ALERT_USER();
1256       error= 0;
1257       goto end;
1258     }
1259     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1260     {
1261       tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1262              " record, ignoring removal",
1263              LSN_IN_PARTS(share->state.create_rename_lsn));
1264       error= 0;
1265       goto end;
1266     }
1267     if (maria_is_crashed(info))
1268     {
1269       tprint(tracef, ", is crashed, can't drop it");
1270       ALERT_USER();
1271       goto end;
1272     }
1273     if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1274         maria_close(info))
1275       goto end;
1276     info= NULL;
1277     /* if it is older, or its header is corrupted, drop it */
1278     tprint(tracef, ", dropping '%s'", name);
1279     if (maria_delete_table(name))
1280     {
1281       eprint(tracef, "Failed to drop table");
1282       goto end;
1283     }
1284   }
1285   else /* one or two files absent, or header corrupted... */
1286     tprint(tracef,", can't be opened, probably does not exist");
1287   error= 0;
1288 end:
1289   tprint(tracef, "\n");
1290   if (info != NULL)
1291     error|= maria_close(info);
1292   return error;
1293 }
1294 
1295 
prototype_redo_exec_hook(FILE_ID)1296 prototype_redo_exec_hook(FILE_ID)
1297 {
1298   uint16 sid;
1299   int error= 1;
1300   const char *name;
1301   MARIA_HA *info;
1302   DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");
1303 
1304   if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
1305   {
1306     /*
1307       If that mapping was still true at checkpoint time, it was found in
1308       checkpoint record, no need to recreate it. If that mapping had ended at
1309       checkpoint time (table was closed or repaired), a flush and force
1310       happened and so mapping is not needed.
1311     */
1312     tprint(tracef, "ignoring because before checkpoint\n");
1313     DBUG_RETURN(0);
1314   }
1315 
1316   enlarge_buffer(rec);
1317   if (log_record_buffer.str == NULL ||
1318       translog_read_record(rec->lsn, 0, rec->record_length,
1319                            log_record_buffer.str, NULL) !=
1320        rec->record_length)
1321   {
1322     eprint(tracef, "Failed to read record");
1323     goto end;
1324   }
1325   sid= fileid_korr(log_record_buffer.str);
1326   info= all_tables[sid].info;
1327   if (info != NULL)
1328   {
1329     tprint(tracef, "   Closing table '%s'\n", info->s->open_file_name.str);
1330     prepare_table_for_close(info, rec->lsn);
1331 
1332     /*
1333       Ensure that open count is 1 on close.  This is needed as the
1334       table may initially had an open_count > 0 when we initially
1335       opened it as the server may have crashed without closing it
1336       properly.  As we now have applied all redo's for the table up to
1337       now, we know the table is ok, so it's safe to reset the open
1338       count to 0.
1339     */
1340     if (info->s->state.open_count != 0 && info->s->reopen == 1)
1341     {
1342       /* let ma_close() mark the table properly closed */
1343       info->s->state.open_count= 1;
1344       info->s->global_changed= 1;
1345       info->s->changed= 1;
1346     }
1347     if (maria_close(info))
1348     {
1349       eprint(tracef, "Failed to close table");
1350       goto end;
1351     }
1352     all_tables[sid].info= NULL;
1353   }
1354   name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
1355   if (new_table(sid, name, rec->lsn))
1356     goto end;
1357   error= 0;
1358 end:
1359   DBUG_RETURN(error);
1360 }
1361 
1362 
/**
  @brief Opens the table named in a FILE_ID mapping and registers it under
  its short id.

  On success the handle is stored in all_tables[sid] and lsn_of_file_id is
  recorded in the share. A table that cannot be opened, is not
  transactional, has a create_rename_lsn not older than lsn_of_file_id, or
  is marked crashed is skipped: nothing is registered and 0 is returned.

  @param  sid             short id of the table
  @param  name            table name taken from the log record
  @param  lsn_of_file_id  LSN of the record which carries the mapping

  @return 0 if the table was opened or skipped, 1 on error (empty/NULL name,
          failure to close an already open instance, or file length that
          cannot be read)
*/
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
{
  /*
    Meaning of 'error' when reaching the 'end' label:
    -1 (skip table): close table and return 0;
    1 (error): close table and return 1;
    0 (success): leave table open and return 0.
  */
  int error= 1;
  MARIA_HA *info;
  MARIA_SHARE *share;
  my_off_t dfile_len, kfile_len;
  DBUG_ENTER("new_table");

  checkpoint_useful= TRUE;
  if ((name == NULL) || (name[0] == 0))
  {
    /*
      we didn't use DBUG_ASSERT() because such record corruption could
      silently pass in the "info == NULL" test below.
    */
    tprint(tracef, ", record is corrupted");
    eprint(tracef, "\n***WARNING: %s may be corrupted", name ? name : "NULL");
    /* no handle to close at 'end'; 'error' is still 1 */
    info= NULL;
    recovery_warnings++;
    goto end;
  }
  tprint(tracef, "Table '%s', id %u", name, sid);
  info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
  if (info == NULL)
  {
    tprint(tracef, ", is absent (must have been dropped later?)"
           " or its header is so corrupted that we cannot open it;"
           " we skip it");
    /* a missing file is expected; any other errno is counted as a crashed table */
    if (my_errno != ENOENT)
    {
      recovery_found_crashed_tables++;
      eprint(tracef, "\n***WARNING: %s could not be opened: Error: %d",
             name ? name : "NULL", (int) my_errno);
    }
    error= 0;
    goto end;
  }
  share= info->s;
  /* check that we're not already using it */
  if (share->reopen != 1)
  {
    tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
    /*
      It could be that we have in the log
      FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
    */
    if (close_one_table(share->open_file_name.str, lsn_of_file_id))
      goto end;
    /*
      We should not try to get length of data/index files as the files
      are not on disk yet.
    */
    _ma_tmp_disable_logging_for_table(info, FALSE);
    /* skips the remaining checks and the file length fix-up below */
    goto set_lsn_of_file_id;
  }
  if (!share->base.born_transactional)
  {
    /*
      This can happen if one converts a transactional table to a
      not transactional table
    */
    tprint(tracef, ", is not transactional.  Ignoring open request");
    eprint(tracef, "\n***WARNING: '%s' may be crashed", name);
    error= -1;
    recovery_warnings++;
    goto end;
  }
  if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
  {
    /*
      This can happen if the table was dropped and re-created since this
      redo entry or if the table had a bulk insert directly after create,
      in which case the create_rename_lsn changed.
    */
    tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
           " LOGREC_FILE_ID's LSN " LSN_FMT ", ignoring open request",
           LSN_IN_PARTS(share->state.create_rename_lsn),
           LSN_IN_PARTS(lsn_of_file_id));
    recovery_warnings++;
    error= -1;
    goto end;
    /*
      Note that we tested that before testing corruption; a recent corrupted
      table is not a blocker for the present log record.
    */
  }
  if (maria_is_crashed(info))
  {
    tprint(tracef, "\n");
    eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
           " aria_chk -r", share->open_file_name.str);
    recovery_found_crashed_tables++;
    error= -1; /* not fatal, try with other tables */
    goto end;
    /*
      Note that if a first recovery fails to apply a REDO, it marks the table
      corrupted and stops the entire recovery. A second recovery will find the
      table is marked corrupted and skip it (and thus possibly handle other
      tables).
    */
  }
  /* don't log any records for this work */
  _ma_tmp_disable_logging_for_table(info, FALSE);
  /* execution of some REDO records relies on data_file_length */
  dfile_len= mysql_file_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
  kfile_len= mysql_file_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
  if ((dfile_len == MY_FILEPOS_ERROR) ||
      (kfile_len == MY_FILEPOS_ERROR))
  {
    tprint(tracef, ", length unknown\n");
    eprint(tracef, "\n***WARNING: Can't read length of file '%s'",
           share->open_file_name.str);
    recovery_warnings++;
    /* 'error' is still 1 here: the table is closed and 1 is returned */
    goto end;
  }
  /* make the lengths stored in the state match the actual file sizes */
  if (share->state.state.data_file_length != dfile_len)
  {
    tprint(tracef, ", has wrong state.data_file_length "
           "(fixing it from %llu to %llu)",
           (ulonglong) share->state.state.data_file_length, (ulonglong) dfile_len);
    share->state.state.data_file_length= dfile_len;
  }
  if (share->state.state.key_file_length != kfile_len)
  {
    tprint(tracef, ", has wrong state.key_file_length "
           "(fixing it from %llu to %llu)",
           (ulonglong) share->state.state.key_file_length, (ulonglong) kfile_len);
    share->state.state.key_file_length= kfile_len;
  }
  if ((dfile_len % share->block_size) || (kfile_len % share->block_size))
  {
    tprint(tracef, ", has too short last page");
    /* Recovery will fix this, no error */
    ALERT_USER();
  }

set_lsn_of_file_id:
  /*
    This LSN serves in this situation; assume log is:
    FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
    then crash, checkpoint record is parsed and opens "t1" with id 6; assume
    REDO phase starts from the REDO_INSERT above: it will wrongly try to
    update a page of "t1". With this LSN below, REDO_INSERT can realize the
    mapping is newer than itself, and not execute.
    Same example is possible with UNDO_INSERT (update of the state).
  */
  info->s->lsn_of_file_id= lsn_of_file_id;
  all_tables[sid].info= info;
  /*
    We don't set info->s->id, it would be useless (no logging in REDO phase);
    if you change that, know that some records in REDO phase call
    _ma_update_state_lsns() which resets info->s->id.
  */
  tprint(tracef, ", opened");
  error= 0;
end:
  tprint(tracef, "\n");
  if (error)
  {
    /* both the skip (-1) and the error (1) outcomes close the handle */
    if (info != NULL)
    {
      /* let maria_close() mark the table properly closed */
      info->s->state.open_count= 1;
      info->s->global_changed= 1;
      info->s->changed= 1;
      maria_close(info);
    }
    /* a skipped table is not an error for the caller */
    if (error == -1)
      error= 0;
  }
  DBUG_RETURN(error);
}
1540 
/*
  NOTE
  This is called for REDO_INSERT_ROW_HEAD and REDO_NEW_ROW_HEAD
*/
1545 
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)1546 prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
1547 {
1548   int error= 1;
1549   uchar *buff= NULL;
1550   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1551   if (info == NULL || maria_is_crashed(info))
1552 
1553   {
1554     /*
1555       Table was skipped at open time (because later dropped/renamed, not
1556       transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
1557       record was skipped due to skip_redo_lsn; it is not an error.
1558     */
1559     return 0;
1560   }
1561   /*
1562     Note that REDO is per page, we still consider it if its transaction
1563     committed long ago and is unknown.
1564   */
1565   /*
1566     If REDO's LSN is > page's LSN (read from disk), we are going to modify the
1567     page and change its LSN. The normal runtime code stores the UNDO's LSN
1568     into the page. Here storing the REDO's LSN (rec->lsn) would work
1569     (we are not writing to the log here, so don't have to "flush up to UNDO's
1570     LSN"). But in a test scenario where we do updates at runtime, then remove
1571     tables, apply the log and check that this results in the same table as at
1572     runtime, putting the same LSN as runtime had done will decrease
1573     differences. So we use the UNDO's LSN which is current_group_end_lsn.
1574   */
1575   enlarge_buffer(rec);
1576   if (log_record_buffer.str == NULL)
1577   {
1578     eprint(tracef, "Failed to read allocate buffer for record");
1579     goto end;
1580   }
1581   if (translog_read_record(rec->lsn, 0, rec->record_length,
1582                            log_record_buffer.str, NULL) !=
1583       rec->record_length)
1584   {
1585     eprint(tracef, "Failed to read record");
1586     goto end;
1587   }
1588   buff= log_record_buffer.str;
1589   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1590                                              HEAD_PAGE,
1591                                              (rec->type ==
1592                                               LOGREC_REDO_NEW_ROW_HEAD),
1593                                              buff + FILEID_STORE_SIZE,
1594                                              buff +
1595                                              FILEID_STORE_SIZE +
1596                                              PAGE_STORE_SIZE +
1597                                              DIRPOS_STORE_SIZE,
1598                                              rec->record_length -
1599                                              (FILEID_STORE_SIZE +
1600                                               PAGE_STORE_SIZE +
1601                                               DIRPOS_STORE_SIZE)))
1602     goto end;
1603   error= 0;
1604 end:
1605   return error;
1606 }
1607 
/*
  NOTE
  This is called for REDO_INSERT_ROW_TAIL and REDO_NEW_ROW_TAIL
*/
1612 
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)1613 prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
1614 {
1615   int error= 1;
1616   uchar *buff;
1617   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1618   if (info == NULL || maria_is_crashed(info))
1619     return 0;
1620   enlarge_buffer(rec);
1621   if (log_record_buffer.str == NULL ||
1622       translog_read_record(rec->lsn, 0, rec->record_length,
1623                            log_record_buffer.str, NULL) !=
1624        rec->record_length)
1625   {
1626     eprint(tracef, "Failed to read record");
1627     goto end;
1628   }
1629   buff= log_record_buffer.str;
1630   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1631                                              TAIL_PAGE,
1632                                              (rec->type ==
1633                                               LOGREC_REDO_NEW_ROW_TAIL),
1634                                              buff + FILEID_STORE_SIZE,
1635                                              buff +
1636                                              FILEID_STORE_SIZE +
1637                                              PAGE_STORE_SIZE +
1638                                              DIRPOS_STORE_SIZE,
1639                                              rec->record_length -
1640                                              (FILEID_STORE_SIZE +
1641                                               PAGE_STORE_SIZE +
1642                                               DIRPOS_STORE_SIZE)))
1643     goto end;
1644   error= 0;
1645 
1646 end:
1647   return error;
1648 }
1649 
1650 
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)1651 prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
1652 {
1653   int error= 1;
1654   uchar *buff;
1655   uint number_of_blobs, number_of_ranges;
1656   pgcache_page_no_t first_page, last_page;
1657   char llbuf1[22], llbuf2[22];
1658   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1659   if (info == NULL  || maria_is_crashed(info))
1660     return 0;
1661   enlarge_buffer(rec);
1662   if (log_record_buffer.str == NULL ||
1663       translog_read_record(rec->lsn, 0, rec->record_length,
1664                            log_record_buffer.str, NULL) !=
1665        rec->record_length)
1666   {
1667     eprint(tracef, "Failed to read record");
1668     goto end;
1669   }
1670   buff= log_record_buffer.str;
1671   if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
1672                                       buff, rec->lsn, &number_of_blobs,
1673                                       &number_of_ranges,
1674                                       &first_page, &last_page))
1675     goto end;
1676   llstr(first_page, llbuf1);
1677   llstr(last_page, llbuf2);
1678   tprint(tracef, " %u blobs %u ranges, first page %s last %s",
1679          number_of_blobs, number_of_ranges, llbuf1, llbuf2);
1680 
1681   error= 0;
1682 
1683 end:
1684   tprint(tracef, " \n");
1685   return error;
1686 }
1687 
1688 
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)1689 prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
1690 {
1691   int error= 1;
1692   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1693   if (info == NULL || maria_is_crashed(info))
1694     return 0;
1695   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1696                                             HEAD_PAGE,
1697                                             rec->header + FILEID_STORE_SIZE))
1698     goto end;
1699   error= 0;
1700 end:
1701   return error;
1702 }
1703 
1704 
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)1705 prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
1706 {
1707   int error= 1;
1708   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1709   if (info == NULL || maria_is_crashed(info))
1710     return 0;
1711   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1712                                             TAIL_PAGE,
1713                                             rec->header + FILEID_STORE_SIZE))
1714     goto end;
1715   error= 0;
1716 end:
1717   return error;
1718 }
1719 
1720 
prototype_redo_exec_hook(REDO_FREE_BLOCKS)1721 prototype_redo_exec_hook(REDO_FREE_BLOCKS)
1722 {
1723   int error= 1;
1724   uchar *buff;
1725   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1726   if (info == NULL || maria_is_crashed(info))
1727     return 0;
1728   enlarge_buffer(rec);
1729 
1730   if (log_record_buffer.str == NULL ||
1731       translog_read_record(rec->lsn, 0, rec->record_length,
1732                            log_record_buffer.str, NULL) !=
1733        rec->record_length)
1734   {
1735     eprint(tracef, "Failed to read record");
1736     goto end;
1737   }
1738 
1739   buff= log_record_buffer.str;
1740   if (_ma_apply_redo_free_blocks(info, current_group_end_lsn, rec->lsn,
1741                                  buff))
1742     goto end;
1743   error= 0;
1744 end:
1745   return error;
1746 }
1747 
1748 
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)1749 prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
1750 {
1751   int error= 1;
1752   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1753   if (info == NULL || maria_is_crashed(info))
1754     return 0;
1755 
1756   if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
1757                                        rec->header + FILEID_STORE_SIZE))
1758     goto end;
1759   error= 0;
1760 end:
1761   return error;
1762 }
1763 
1764 
prototype_redo_exec_hook(REDO_DELETE_ALL)1765 prototype_redo_exec_hook(REDO_DELETE_ALL)
1766 {
1767   int error= 1;
1768   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1769   if (info == NULL)
1770     return 0;
1771   tprint(tracef, "   deleting all %lu rows\n",
1772          (ulong)info->s->state.state.records);
1773   if (maria_delete_all_rows(info))
1774     goto end;
1775   error= 0;
1776 end:
1777   return error;
1778 }
1779 
1780 
prototype_redo_exec_hook(REDO_INDEX)1781 prototype_redo_exec_hook(REDO_INDEX)
1782 {
1783   int error= 1;
1784   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1785   if (info == NULL || maria_is_crashed(info))
1786     return 0;
1787   enlarge_buffer(rec);
1788 
1789   if (log_record_buffer.str == NULL ||
1790       translog_read_record(rec->lsn, 0, rec->record_length,
1791                            log_record_buffer.str, NULL) !=
1792        rec->record_length)
1793   {
1794     eprint(tracef, "Failed to read record");
1795     goto end;
1796   }
1797 
1798   if (_ma_apply_redo_index(info, current_group_end_lsn,
1799                            log_record_buffer.str + FILEID_STORE_SIZE,
1800                            rec->record_length - FILEID_STORE_SIZE))
1801     goto end;
1802   error= 0;
1803 end:
1804   return error;
1805 }
1806 
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)1807 prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
1808 {
1809   int error= 1;
1810   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1811   if (info == NULL || maria_is_crashed(info))
1812     return 0;
1813   enlarge_buffer(rec);
1814 
1815   if (log_record_buffer.str == NULL ||
1816       translog_read_record(rec->lsn, 0, rec->record_length,
1817                            log_record_buffer.str, NULL) !=
1818        rec->record_length)
1819   {
1820     eprint(tracef, "Failed to read record");
1821     goto end;
1822   }
1823 
1824   if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
1825                                     log_record_buffer.str + FILEID_STORE_SIZE,
1826                                     rec->record_length - FILEID_STORE_SIZE))
1827     goto end;
1828   error= 0;
1829 end:
1830   return error;
1831 }
1832 
1833 
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)1834 prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
1835 {
1836   int error= 1;
1837   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1838   if (info == NULL || maria_is_crashed(info))
1839     return 0;
1840 
1841   if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
1842                                      rec->header + FILEID_STORE_SIZE))
1843     goto end;
1844   error= 0;
1845 end:
1846   return error;
1847 }
1848 
1849 
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)1850 prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
1851 {
1852   int error= 1;
1853   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1854   if (info == NULL || maria_is_crashed(info))
1855     return 0;
1856   enlarge_buffer(rec);
1857 
1858   if (log_record_buffer.str == NULL ||
1859       translog_read_record(rec->lsn, 0, rec->record_length,
1860                            log_record_buffer.str, NULL) !=
1861        rec->record_length)
1862   {
1863     eprint(tracef, "Failed to read record");
1864     goto end;
1865   }
1866 
1867   if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
1868   {
1869     /*
1870       Record is potentially after the bitmap flush made by Checkpoint, so has
1871       to be replayed. It may overwrite a more recent state but that will be
1872       corrected by all upcoming REDOs for data pages.
1873       If the condition is false, we must not apply the record: it is unneeded
1874       and nocive (may not be corrected as REDOs can be skipped due to
1875       dirty-pages list).
1876     */
1877     if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
1878                                        log_record_buffer.str +
1879                                        FILEID_STORE_SIZE))
1880       goto end;
1881   }
1882   error= 0;
1883 end:
1884   return error;
1885 }
1886 
1887 
/**
   Records the latest UNDO LSN of a transaction tracked in all_active_trans.

   @param short_trid  index of the transaction in all_active_trans
   @param lsn         value to store as the transaction's undo_lsn; it also
                      becomes first_undo_lsn if that is still LSN_IMPOSSIBLE
*/

static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn)
{
  struct st_trn_for_recovery *trn_entry= &all_active_trans[short_trid];

  /*
    A zero long_trid means the transaction is unknown, so it has committed
    or fully rolled back long ago: nothing to record.
  */
  if (trn_entry->long_trid != 0)
  {
    trn_entry->undo_lsn= lsn;
    if (trn_entry->first_undo_lsn == LSN_IMPOSSIBLE)
      trn_entry->first_undo_lsn= lsn;
  }
}
1899 
1900 
prototype_redo_exec_hook(UNDO_ROW_INSERT)1901 prototype_redo_exec_hook(UNDO_ROW_INSERT)
1902 {
1903   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1904   MARIA_SHARE *share;
1905 
1906   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1907   if (info == NULL)
1908   {
1909     /*
1910       Note that we set undo_lsn anyway. So that if the transaction is later
1911       rolled back, this UNDO is tried for execution and we get a warning (as
1912       it would then be abnormal that info==NULL).
1913     */
1914     return 0;
1915   }
1916   share= info->s;
1917   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1918   {
1919     tprint(tracef, "   state has LSN " LSN_FMT " older than record, updating"
1920            " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon));
1921     share->state.state.records++;
1922     if (share->calc_checksum)
1923     {
1924       uchar buff[HA_CHECKSUM_STORE_SIZE];
1925       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1926                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
1927                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1928           HA_CHECKSUM_STORE_SIZE)
1929       {
1930         eprint(tracef, "Failed to read record");
1931         return 1;
1932       }
1933       share->state.state.checksum+= ha_checksum_korr(buff);
1934     }
1935     info->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1936                               STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
1937   }
1938   tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
1939   /* Unpin all pages, stamp them with UNDO's LSN */
1940   _ma_unpin_all_pages(info, rec->lsn);
1941   return 0;
1942 }
1943 
1944 
prototype_redo_exec_hook(UNDO_ROW_DELETE)1945 prototype_redo_exec_hook(UNDO_ROW_DELETE)
1946 {
1947   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1948   MARIA_SHARE *share;
1949 
1950   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1951   if (info == NULL)
1952     return 0;
1953   share= info->s;
1954   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1955   {
1956     tprint(tracef, "   state older than record\n");
1957     share->state.state.records--;
1958     if (share->calc_checksum)
1959     {
1960       uchar buff[HA_CHECKSUM_STORE_SIZE];
1961       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1962                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
1963                                PAGERANGE_STORE_SIZE,
1964                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1965           HA_CHECKSUM_STORE_SIZE)
1966       {
1967         eprint(tracef, "Failed to read record");
1968         return 1;
1969       }
1970       share->state.state.checksum+= ha_checksum_korr(buff);
1971     }
1972     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1973                             STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
1974                             STATE_NOT_MOVABLE);
1975   }
1976   tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
1977   _ma_unpin_all_pages(info, rec->lsn);
1978   return 0;
1979 }
1980 
1981 
prototype_redo_exec_hook(UNDO_ROW_UPDATE)1982 prototype_redo_exec_hook(UNDO_ROW_UPDATE)
1983 {
1984   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1985   MARIA_SHARE *share;
1986 
1987   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1988   if (info == NULL)
1989     return 0;
1990   share= info->s;
1991   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1992   {
1993     if (share->calc_checksum)
1994     {
1995       uchar buff[HA_CHECKSUM_STORE_SIZE];
1996       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1997                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
1998                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1999           HA_CHECKSUM_STORE_SIZE)
2000       {
2001         eprint(tracef, "Failed to read record");
2002         return 1;
2003       }
2004       share->state.state.checksum+= ha_checksum_korr(buff);
2005     }
2006     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2007                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2008   }
2009   _ma_unpin_all_pages(info, rec->lsn);
2010   return 0;
2011 }
2012 
2013 
prototype_redo_exec_hook(UNDO_KEY_INSERT)2014 prototype_redo_exec_hook(UNDO_KEY_INSERT)
2015 {
2016   MARIA_HA *info;
2017   MARIA_SHARE *share;
2018 
2019   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2020   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
2021     return 0;
2022   share= info->s;
2023   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2024   {
2025     const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
2026     uint keynr= key_nr_korr(ptr);
2027     if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
2028     {
2029       const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
2030       ulonglong value;
2031       char llbuf[22];
2032       uchar reversed[MARIA_MAX_KEY_BUFF], *to;
2033       tprint(tracef, "   state older than record\n");
2034       /* we read the record to find the auto_increment value */
2035       enlarge_buffer(rec);
2036       if (log_record_buffer.str == NULL ||
2037           translog_read_record(rec->lsn, 0, rec->record_length,
2038                                log_record_buffer.str, NULL) !=
2039           rec->record_length)
2040       {
2041         eprint(tracef, "Failed to read record");
2042         return 1;
2043       }
2044       to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2045         KEY_NR_STORE_SIZE;
2046       if (keyseg->flag & HA_SWAP_KEY)
2047       {
2048         /* We put key from log record to "data record" packing format... */
2049         uchar *key_ptr= to;
2050         uchar *key_end= key_ptr + keyseg->length;
2051         to= reversed + keyseg->length;
2052         do
2053         {
2054           *--to= *key_ptr++;
2055         } while (key_ptr != key_end);
2056         /* ... so that we can read it with: */
2057       }
2058       value= ma_retrieve_auto_increment(to, keyseg->type);
2059       set_if_bigger(share->state.auto_increment, value);
2060       llstr(share->state.auto_increment, llbuf);
2061       tprint(tracef, "   auto-inc %s\n", llbuf);
2062     }
2063   }
2064   _ma_unpin_all_pages(info, rec->lsn);
2065   return 0;
2066 }
2067 
2068 
prototype_redo_exec_hook(UNDO_KEY_DELETE)2069 prototype_redo_exec_hook(UNDO_KEY_DELETE)
2070 {
2071   MARIA_HA *info;
2072 
2073   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2074   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
2075     return 0;
2076   _ma_unpin_all_pages(info, rec->lsn);
2077   return 0;
2078 }
2079 
2080 
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)2081 prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2082 {
2083   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2084   MARIA_SHARE *share;
2085 
2086   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2087   if (info == NULL)
2088     return 0;
2089   share= info->s;
2090   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2091   {
2092     uint key_nr;
2093     my_off_t page;
2094     key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2095     page=  page_korr(rec->header +  LSN_STORE_SIZE + FILEID_STORE_SIZE +
2096                      KEY_NR_STORE_SIZE);
2097     share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2098                                     HA_OFFSET_ERROR :
2099                                     page * share->block_size);
2100   }
2101   _ma_unpin_all_pages(info, rec->lsn);
2102   return 0;
2103 }
2104 
2105 
prototype_redo_exec_hook(UNDO_BULK_INSERT)2106 prototype_redo_exec_hook(UNDO_BULK_INSERT)
2107 {
2108   /*
2109     If the repair finished it wrote and sync the state. If it didn't finish,
2110     we are going to empty the table and that will fix the state.
2111   */
2112   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2113   return 0;
2114 }
2115 
2116 
prototype_redo_exec_hook(IMPORTED_TABLE)2117 prototype_redo_exec_hook(IMPORTED_TABLE)
2118 {
2119   char *name;
2120   enlarge_buffer(rec);
2121   if (log_record_buffer.str == NULL ||
2122       translog_read_record(rec->lsn, 0, rec->record_length,
2123                            log_record_buffer.str, NULL) !=
2124       rec->record_length)
2125   {
2126     eprint(tracef, "Failed to read record");
2127     return 1;
2128   }
2129   name= (char *)log_record_buffer.str;
2130   tprint(tracef, "Table '%s' was imported (auto-zerofilled) in this Aria instance\n", name);
2131   return 0;
2132 }
2133 
2134 
prototype_redo_exec_hook(COMMIT)2135 prototype_redo_exec_hook(COMMIT)
2136 {
2137   uint16 sid= rec->short_trid;
2138   TrID long_trid= all_active_trans[sid].long_trid;
2139   char llbuf[22];
2140   if (long_trid == 0)
2141   {
2142     tprint(tracef, "We don't know about transaction with short_trid %u;"
2143            "it probably committed long ago, forget it\n", sid);
2144     bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2145     return 0;
2146   }
2147   llstr(long_trid, llbuf);
2148   tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
2149          llbuf, sid);
2150   bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2151 #ifdef MARIA_VERSIONING
2152   /*
2153     if real recovery:
2154     transaction was committed, move it to some separate list for later
2155     purging (but don't purge now! purging may have been started before, we
2156     may find REDO_PURGE records soon).
2157   */
2158 #endif
2159   return 0;
2160 }
2161 
prototype_redo_exec_hook(CLR_END)2162 prototype_redo_exec_hook(CLR_END)
2163 {
2164   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2165   MARIA_SHARE *share;
2166   LSN previous_undo_lsn;
2167   enum translog_record_type undone_record_type;
2168   const LOG_DESC *log_desc;
2169   my_bool row_entry= 0;
2170   uchar *logpos;
2171   DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
2172 
2173   previous_undo_lsn= lsn_korr(rec->header);
2174   undone_record_type=
2175     clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2176   log_desc= &log_record_type_descriptor[undone_record_type];
2177 
2178   set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
2179   if (info == NULL)
2180     DBUG_RETURN(0);
2181   share= info->s;
2182   tprint(tracef, "   CLR_END was about %s, undo_lsn now LSN " LSN_FMT "\n",
2183          log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
2184 
2185   enlarge_buffer(rec);
2186   if (log_record_buffer.str == NULL ||
2187       translog_read_record(rec->lsn, 0, rec->record_length,
2188                            log_record_buffer.str, NULL) !=
2189       rec->record_length)
2190   {
2191     eprint(tracef, "Failed to read record");
2192     return 1;
2193   }
2194   logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2195            CLR_TYPE_STORE_SIZE);
2196 
2197   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2198   {
2199     tprint(tracef, "   state older than record\n");
2200     switch (undone_record_type) {
2201     case LOGREC_UNDO_ROW_DELETE:
2202       row_entry= 1;
2203       share->state.state.records++;
2204       break;
2205     case LOGREC_UNDO_ROW_INSERT:
2206       share->state.state.records--;
2207       share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
2208       row_entry= 1;
2209       break;
2210     case LOGREC_UNDO_ROW_UPDATE:
2211       row_entry= 1;
2212       break;
2213     case LOGREC_UNDO_KEY_INSERT:
2214     case LOGREC_UNDO_KEY_DELETE:
2215       break;
2216     case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
2217     case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
2218     {
2219       uint key_nr;
2220       my_off_t page;
2221       key_nr= key_nr_korr(logpos);
2222       page=  page_korr(logpos + KEY_NR_STORE_SIZE);
2223       share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2224                                       HA_OFFSET_ERROR :
2225                                       page * share->block_size);
2226       break;
2227     }
2228     case LOGREC_UNDO_BULK_INSERT:
2229       break;
2230     default:
2231       DBUG_ASSERT(0);
2232     }
2233     if (row_entry && share->calc_checksum)
2234       share->state.state.checksum+= ha_checksum_korr(logpos);
2235     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2236                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2237   }
2238   if (row_entry)
2239     tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
2240   _ma_unpin_all_pages(info, rec->lsn);
2241   DBUG_RETURN(0);
2242 }
2243 
2244 
2245 /**
   Hook to print debug information (like MySQL query)
2247 */
2248 
prototype_redo_exec_hook(DEBUG_INFO)2249 prototype_redo_exec_hook(DEBUG_INFO)
2250 {
2251   uchar *data;
2252   enum translog_debug_info_type debug_info;
2253 
2254   enlarge_buffer(rec);
2255   if (log_record_buffer.str == NULL ||
2256       translog_read_record(rec->lsn, 0, rec->record_length,
2257                            log_record_buffer.str, NULL) !=
2258       rec->record_length)
2259   {
2260     eprint(tracef, "Failed to read record debug record");
2261     return 1;
2262   }
2263   debug_info= (enum translog_debug_info_type) log_record_buffer.str[0];
2264   data= log_record_buffer.str + 1;
2265   switch (debug_info) {
2266   case LOGREC_DEBUG_INFO_QUERY:
2267     tprint(tracef, "Query: %.*s\n", rec->record_length - 1,
2268            (char*) data);
2269     break;
2270   default:
2271     DBUG_ASSERT(0);
2272   }
2273   return 0;
2274 }
2275 
2276 
2277 /**
2278   In some cases we have to skip execution of an UNDO record during the UNDO
2279   phase.
2280 */
2281 
static void skip_undo_record(LSN previous_undo_lsn, TRN *trn)
{
  const my_bool fully_rolled_back= (previous_undo_lsn == LSN_IMPOSSIBLE);

  /* Count the skipped record */
  skipped_undo_phase++;

  /* Continue the transaction's undo chain at the previous UNDO */
  trn->undo_lsn= previous_undo_lsn;
  if (fully_rolled_back)
  {
    /* has fully rolled back: keep only the flags of first_undo_lsn */
    trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
  }
}
2289 
2290 
prototype_undo_exec_hook(UNDO_ROW_INSERT)2291 prototype_undo_exec_hook(UNDO_ROW_INSERT)
2292 {
2293   my_bool error;
2294   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2295   LSN previous_undo_lsn= lsn_korr(rec->header);
2296   MARIA_SHARE *share;
2297   const uchar *record_ptr;
2298 
2299   if (info == NULL || maria_is_crashed(info))
2300   {
2301     /*
2302       Unlike for REDOs, if the table was skipped it is abnormal; we have a
2303       transaction to rollback which used this table, as it is not rolled back
2304       it was supposed to hold this table and so the table should still be
2305       there. Skip it (user may have repaired the table with maria_chk because
2306       it was so badly corrupted that a previous recovery failed) but warn.
2307     */
2308     skip_undo_record(previous_undo_lsn, trn);
2309     return 0;
2310   }
2311   share= info->s;
2312   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2313                           STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
2314                           STATE_NOT_MOVABLE);
2315   record_ptr= rec->header;
2316   if (share->calc_checksum)
2317   {
2318     /*
2319       We need to read more of the record to put the checksum into the record
2320       buffer used by _ma_apply_undo_row_insert().
2321       If the table has no live checksum, rec->header will be enough.
2322     */
2323     enlarge_buffer(rec);
2324     if (log_record_buffer.str == NULL ||
2325         translog_read_record(rec->lsn, 0, rec->record_length,
2326                              log_record_buffer.str, NULL) !=
2327         rec->record_length)
2328     {
2329       eprint(tracef, "Failed to read record");
2330       return 1;
2331     }
2332     record_ptr= log_record_buffer.str;
2333   }
2334 
2335   info->trn= trn;
2336   error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
2337                                    record_ptr + LSN_STORE_SIZE +
2338                                    FILEID_STORE_SIZE);
2339   info->trn= 0;
2340   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2341   tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
2342   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2343          LSN_IN_PARTS(trn->undo_lsn));
2344   return error;
2345 }
2346 
2347 
prototype_undo_exec_hook(UNDO_ROW_DELETE)2348 prototype_undo_exec_hook(UNDO_ROW_DELETE)
2349 {
2350   my_bool error;
2351   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2352   LSN previous_undo_lsn= lsn_korr(rec->header);
2353   MARIA_SHARE *share;
2354 
2355   if (info == NULL || maria_is_crashed(info))
2356   {
2357     skip_undo_record(previous_undo_lsn, trn);
2358     return 0;
2359   }
2360 
2361   share= info->s;
2362   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2363                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2364   enlarge_buffer(rec);
2365   if (log_record_buffer.str == NULL ||
2366       translog_read_record(rec->lsn, 0, rec->record_length,
2367                            log_record_buffer.str, NULL) !=
2368        rec->record_length)
2369   {
2370     eprint(tracef, "Failed to read record");
2371     return 1;
2372   }
2373 
2374   info->trn= trn;
2375   error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
2376                                    log_record_buffer.str + LSN_STORE_SIZE +
2377                                    FILEID_STORE_SIZE,
2378                                    rec->record_length -
2379                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2380   info->trn= 0;
2381   tprint(tracef, "   rows' count %lu\n   undo_lsn now LSN " LSN_FMT "\n",
2382          (ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
2383   return error;
2384 }
2385 
2386 
prototype_undo_exec_hook(UNDO_ROW_UPDATE)2387 prototype_undo_exec_hook(UNDO_ROW_UPDATE)
2388 {
2389   my_bool error;
2390   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2391   LSN previous_undo_lsn= lsn_korr(rec->header);
2392   MARIA_SHARE *share;
2393 
2394   if (info == NULL || maria_is_crashed(info))
2395   {
2396     skip_undo_record(previous_undo_lsn, trn);
2397     return 0;
2398   }
2399 
2400   share= info->s;
2401   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2402                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2403   enlarge_buffer(rec);
2404   if (log_record_buffer.str == NULL ||
2405       translog_read_record(rec->lsn, 0, rec->record_length,
2406                            log_record_buffer.str, NULL) !=
2407        rec->record_length)
2408   {
2409     eprint(tracef, "Failed to read record");
2410     return 1;
2411   }
2412 
2413   info->trn= trn;
2414   error= _ma_apply_undo_row_update(info, previous_undo_lsn,
2415                                    log_record_buffer.str + LSN_STORE_SIZE +
2416                                    FILEID_STORE_SIZE,
2417                                    rec->record_length -
2418                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2419   info->trn= 0;
2420   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2421          LSN_IN_PARTS(trn->undo_lsn));
2422   return error;
2423 }
2424 
2425 
prototype_undo_exec_hook(UNDO_KEY_INSERT)2426 prototype_undo_exec_hook(UNDO_KEY_INSERT)
2427 {
2428   my_bool error;
2429   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2430   LSN previous_undo_lsn= lsn_korr(rec->header);
2431   MARIA_SHARE *share;
2432 
2433   if (info == NULL || maria_is_crashed(info))
2434   {
2435     skip_undo_record(previous_undo_lsn, trn);
2436     return 0;
2437   }
2438 
2439   share= info->s;
2440   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2441                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2442 
2443   enlarge_buffer(rec);
2444   if (log_record_buffer.str == NULL ||
2445       translog_read_record(rec->lsn, 0, rec->record_length,
2446                            log_record_buffer.str, NULL) !=
2447         rec->record_length)
2448   {
2449     eprint(tracef, "Failed to read record");
2450     return 1;
2451   }
2452 
2453   info->trn= trn;
2454   error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
2455                                    log_record_buffer.str + LSN_STORE_SIZE +
2456                                    FILEID_STORE_SIZE,
2457                                    rec->record_length - LSN_STORE_SIZE -
2458                                    FILEID_STORE_SIZE);
2459   info->trn= 0;
2460   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2461   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2462          LSN_IN_PARTS(trn->undo_lsn));
2463   return error;
2464 }
2465 
2466 
prototype_undo_exec_hook(UNDO_KEY_DELETE)2467 prototype_undo_exec_hook(UNDO_KEY_DELETE)
2468 {
2469   my_bool error;
2470   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2471   LSN previous_undo_lsn= lsn_korr(rec->header);
2472   MARIA_SHARE *share;
2473 
2474   if (info == NULL || maria_is_crashed(info))
2475   {
2476     skip_undo_record(previous_undo_lsn, trn);
2477     return 0;
2478   }
2479 
2480   share= info->s;
2481   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2482                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2483 
2484   enlarge_buffer(rec);
2485   if (log_record_buffer.str == NULL ||
2486       translog_read_record(rec->lsn, 0, rec->record_length,
2487                            log_record_buffer.str, NULL) !=
2488         rec->record_length)
2489   {
2490     eprint(tracef, "Failed to read record");
2491     return 1;
2492   }
2493 
2494   info->trn= trn;
2495   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2496                                    log_record_buffer.str + LSN_STORE_SIZE +
2497                                    FILEID_STORE_SIZE,
2498                                    rec->record_length - LSN_STORE_SIZE -
2499                                    FILEID_STORE_SIZE, FALSE);
2500   info->trn= 0;
2501   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2502   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2503          LSN_IN_PARTS(trn->undo_lsn));
2504   return error;
2505 }
2506 
2507 
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)2508 prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2509 {
2510   my_bool error;
2511   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2512   LSN previous_undo_lsn= lsn_korr(rec->header);
2513   MARIA_SHARE *share;
2514 
2515   if (info == NULL || maria_is_crashed(info))
2516   {
2517     skip_undo_record(previous_undo_lsn, trn);
2518     return 0;
2519   }
2520 
2521   share= info->s;
2522   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2523                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2524 
2525   enlarge_buffer(rec);
2526   if (log_record_buffer.str == NULL ||
2527       translog_read_record(rec->lsn, 0, rec->record_length,
2528                            log_record_buffer.str, NULL) !=
2529         rec->record_length)
2530   {
2531     eprint(tracef, "Failed to read record");
2532     return 1;
2533   }
2534 
2535   info->trn= trn;
2536   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2537                                    log_record_buffer.str + LSN_STORE_SIZE +
2538                                    FILEID_STORE_SIZE,
2539                                    rec->record_length - LSN_STORE_SIZE -
2540                                    FILEID_STORE_SIZE, TRUE);
2541   info->trn= 0;
2542   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2543   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2544          LSN_IN_PARTS(trn->undo_lsn));
2545   return error;
2546 }
2547 
2548 
prototype_undo_exec_hook(UNDO_BULK_INSERT)2549 prototype_undo_exec_hook(UNDO_BULK_INSERT)
2550 {
2551   my_bool error;
2552   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2553   LSN previous_undo_lsn= lsn_korr(rec->header);
2554   MARIA_SHARE *share;
2555 
2556   /* Here we don't check for crashed as we can undo the bulk insert */
2557   if (info == NULL)
2558   {
2559     skip_undo_record(previous_undo_lsn, trn);
2560     return 0;
2561   }
2562 
2563   share= info->s;
2564   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2565                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2566 
2567   info->trn= trn;
2568   error= _ma_apply_undo_bulk_insert(info, previous_undo_lsn);
2569   info->trn= 0;
2570   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2571   tprint(tracef, "   undo_lsn now LSN " LSN_FMT "\n",
2572          LSN_IN_PARTS(trn->undo_lsn));
2573   return error;
2574 }
2575 
2576 
/**
   @brief Runs the REDO phase: scans the log forward and executes the
   complete groups of records found in it.

   Records are read one after the other with a first scanner. A record which
   does not end a group only marks, in all_active_trans[], where the group of
   its transaction started. When a record which ends a group is met, the
   records of that group are re-read with a second scanner and executed, then
   the ending record itself is executed. A group which never gets its ending
   record is never executed.

   @param  lsn      LSN of the first record to read
   @param  lsn_end  if not LSN_IMPOSSIBLE: the function stops (successfully)
                    when it is about to execute a multi-record group whose
                    first record is at or after this LSN
   @param  apply    MARIA_LOG_APPLY: records are executed;
                    MARIA_LOG_CHECK: only the bodies of the records which are
                    inside a group are read

   @return Operation status
     @retval 0      ok
     @retval 1      error
*/
static int run_redo_phase(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply)
{
  TRANSLOG_HEADER_BUFFER rec;
  struct st_translog_scanner_data scanner;
  int len;
  uint i;
  DBUG_ENTER("run_redo_phase");

  /* install hooks for execution */
#define install_redo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_REDO_LOGREC_ ## R;
#define install_redo_exec_hook_shared(R,S)                               \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_REDO_LOGREC_ ## S;
#define install_undo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
    exec_UNDO_LOGREC_ ## R;
  install_redo_exec_hook(LONG_TRANSACTION_ID);
  install_redo_exec_hook(CHECKPOINT);
  install_redo_exec_hook(REDO_CREATE_TABLE);
  install_redo_exec_hook(REDO_RENAME_TABLE);
  install_redo_exec_hook(REDO_REPAIR_TABLE);
  install_redo_exec_hook(REDO_DROP_TABLE);
  install_redo_exec_hook(FILE_ID);
  install_redo_exec_hook(INCOMPLETE_LOG);
  install_redo_exec_hook(INCOMPLETE_GROUP);
  install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
  install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
  install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
  install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
  install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
  install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
  install_redo_exec_hook(REDO_FREE_BLOCKS);
  install_redo_exec_hook(REDO_DELETE_ALL);
  install_redo_exec_hook(REDO_INDEX);
  install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
  install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
  install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
  install_redo_exec_hook(UNDO_ROW_INSERT);
  install_redo_exec_hook(UNDO_ROW_DELETE);
  install_redo_exec_hook(UNDO_ROW_UPDATE);
  install_redo_exec_hook(UNDO_KEY_INSERT);
  install_redo_exec_hook(UNDO_KEY_DELETE);
  install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
  install_redo_exec_hook(COMMIT);
  install_redo_exec_hook(CLR_END);
  install_undo_exec_hook(UNDO_ROW_INSERT);
  install_undo_exec_hook(UNDO_ROW_DELETE);
  install_undo_exec_hook(UNDO_ROW_UPDATE);
  install_undo_exec_hook(UNDO_KEY_INSERT);
  install_undo_exec_hook(UNDO_KEY_DELETE);
  install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
  /* REDO_NEW_ROW_HEAD shares entry with REDO_INSERT_ROW_HEAD */
  install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
  /* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
  install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
  install_redo_exec_hook(UNDO_BULK_INSERT);
  install_undo_exec_hook(UNDO_BULK_INSERT);
  install_redo_exec_hook(IMPORTED_TABLE);
  install_redo_exec_hook(DEBUG_INFO);

  current_group_end_lsn= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
  current_group_table= NULL;
#endif

  if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
  {
    tprint(tracef, "checkpoint address refers to the log end log or "
           "log is empty, nothing to do.\n");
    DBUG_RETURN(0);
  }

  len= translog_read_record_header(lsn, &rec);

  if (len == RECHEADER_READ_ERROR)
  {
    eprint(tracef, "Failed to read header of the first record.");
    DBUG_RETURN(1);
  }
  if (translog_scanner_init(lsn, 1, &scanner, 1))
  {
    tprint(tracef, "Scanner init failed\n");
    /* the header was read successfully above: release it before leaving */
    translog_free_record_header(&rec);
    DBUG_RETURN(1);
  }
  for (i= 1;;i++)
  {
    uint16 sid= rec.short_trid;
    const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
    display_record_position(log_desc, &rec, i);
    /*
      A complete group is a set of log records with an "end mark" record
      (e.g. a set of REDOs for an operation, terminated by an UNDO for this
      operation); if there is no "end mark" record the group is incomplete and
      won't be executed.
    */
    if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
        (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
    {
      if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
      {
        if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
        {
          /*
            Can happen if the transaction got a table write error, then
            unlocked tables thus wrote a COMMIT record. Or can be an
            INCOMPLETE_GROUP record written by a previous recovery.
          */
          tprint(tracef, "\nDiscarding incomplete group before this record\n");
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
        }
        else
        {
          struct st_translog_scanner_data scanner2;
          TRANSLOG_HEADER_BUFFER rec2;
          /*
            There is a complete group for this transaction, containing more
            than this event.
          */
          tprint(tracef, "   ends a group:\n");
          len=
            translog_read_record_header(all_active_trans[sid].group_start_lsn,
                                        &rec2);
          if (len < 0) /* EOF or error */
          {
            tprint(tracef, "Cannot find record where it should be\n");
            goto err;
          }
          if (lsn_end != LSN_IMPOSSIBLE && rec2.lsn >= lsn_end)
          {
            tprint(tracef,
                   "lsn_end reached at " LSN_FMT ". "
                   "Skipping rest of redo entries\n",
                   LSN_IN_PARTS(rec2.lsn));
            /* rec2 was read successfully: do not leave it behind */
            translog_free_record_header(&rec2);
            translog_destroy_scanner(&scanner);
            translog_free_record_header(&rec);
            DBUG_RETURN(0);
          }

          if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
          {
            tprint(tracef, "Scanner2 init failed\n");
            /* scanner2 does not exist, but rec2 does */
            translog_free_record_header(&rec2);
            goto err;
          }
          current_group_end_lsn= rec.lsn;
          do
          {
            if (rec2.short_trid == sid) /* it's in our group */
            {
              const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
              display_record_position(log_desc2, &rec2, 0);
              if (apply == MARIA_LOG_CHECK)
              {
                translog_size_t read_len;
                enlarge_buffer(&rec2);
                read_len=
                  translog_read_record(rec2.lsn, 0, rec2.record_length,
                                       log_record_buffer.str, NULL);
                if (read_len != rec2.record_length)
                {
                  tprint(tracef, "Cannot read record's body: read %u of"
                         " %u bytes\n", read_len, rec2.record_length);
                  translog_destroy_scanner(&scanner2);
                  translog_free_record_header(&rec2);
                  goto err;
                }
              }
              if (apply == MARIA_LOG_APPLY &&
                  display_and_apply_record(log_desc2, &rec2))
              {
                translog_destroy_scanner(&scanner2);
                translog_free_record_header(&rec2);
                goto err;
              }
            }
            translog_free_record_header(&rec2);
            len= translog_read_next_record_header(&scanner2, &rec2);
            if (len < 0) /* EOF or error */
            {
              tprint(tracef, "Cannot find record where it should be\n");
              translog_destroy_scanner(&scanner2);
              translog_free_record_header(&rec2);
              goto err;
            }
          }
          while (rec2.lsn < rec.lsn);
          /* group finished */
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
          current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
          display_record_position(log_desc, &rec, 0);
          translog_destroy_scanner(&scanner2);
          translog_free_record_header(&rec2);
        }
      }
      /* now the record which ended the group (or is a group by itself) */
      if (apply == MARIA_LOG_APPLY &&
          display_and_apply_record(log_desc, &rec))
        goto err;
#ifndef DBUG_OFF
      current_group_table= NULL;
#endif
    }
    else /* record does not end group */
    {
      /* just record the fact, can't know if can execute yet */
      if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
      {
        /* group not yet started */
        all_active_trans[sid].group_start_lsn= rec.lsn;
      }
    }
    translog_free_record_header(&rec);
    len= translog_read_next_record_header(&scanner, &rec);
    if (len < 0)
    {
      switch (len)
      {
      case RECHEADER_READ_EOF:
        tprint(tracef, "EOF on the log\n");
        break;
      case RECHEADER_READ_ERROR:
        tprint(tracef, "Error reading log\n");
        goto err;
      }
      break;
    }
  }
  translog_destroy_scanner(&scanner);
  translog_free_record_header(&rec);
  if (recovery_message_printed == REC_MSG_REDO)
  {
    fprintf(stderr, " 100%%");
    fflush(stderr);
    procent_printed= 1;                         /* Will be followed by time */
  }
  DBUG_RETURN(0);

err:
  translog_destroy_scanner(&scanner);
  translog_free_record_header(&rec);
  DBUG_RETURN(1);
}
2819 
2820 
2821 /**
2822    @brief Informs about any aborted groups or uncommitted transactions,
2823    prepares for the UNDO phase if needed.
2824 
2825    @note Observe that it may init trnman.
2826 */
end_of_redo_phase(my_bool prepare_for_undo_phase)2827 static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
2828 {
2829   uint sid, uncommitted= 0;
2830   char llbuf[22];
2831   LSN addr;
2832 
2833   my_hash_free(&all_dirty_pages);
2834   /*
2835     hash_free() can be called multiple times probably, but be safe if that
2836     changes
2837   */
2838   bzero(&all_dirty_pages, sizeof(all_dirty_pages));
2839   my_free(dirty_pages_pool);
2840   dirty_pages_pool= NULL;
2841 
2842   llstr(max_long_trid, llbuf);
2843   tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
2844   llstr(max_trid_in_control_file, llbuf);
2845   tprint(tracef, "Maximum transaction long id seen in control file: %s\n",
2846          llbuf);
2847   /*
2848     If logs were deleted, or lost, trid in control file is needed to set
2849     trnman's generator:
2850   */
2851   set_if_bigger(max_long_trid, max_trid_in_control_file);
2852   if (prepare_for_undo_phase && trnman_init(max_long_trid))
2853     return -1;
2854 
2855   trns_created= TRUE;
2856 
2857   for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
2858   {
2859     TrID long_trid= all_active_trans[sid].long_trid;
2860     LSN gslsn= all_active_trans[sid].group_start_lsn;
2861     TRN *trn;
2862     if (gslsn != LSN_IMPOSSIBLE)
2863     {
2864       tprint(tracef, "Group at LSN " LSN_FMT " short_trid %u incomplete\n",
2865              LSN_IN_PARTS(gslsn), sid);
2866       all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
2867     }
2868     if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
2869     {
2870       llstr(long_trid, llbuf);
2871       tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
2872              llbuf, sid);
2873       /*
2874         dummy_transaction_object serves only for DDLs, where there is never a
2875         rollback or incomplete group. And unknown transactions (which have
2876         long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
2877       */
2878       if (long_trid ==0)
2879       {
2880         eprint(tracef, "Transaction with long_trid 0 should not roll back");
2881         ALERT_USER();
2882         return -1;
2883       }
2884       if (prepare_for_undo_phase)
2885       {
2886         if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
2887           return -1;
2888         trn->undo_lsn= all_active_trans[sid].undo_lsn;
2889         trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
2890           TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
2891         if (gslsn != LSN_IMPOSSIBLE)
2892         {
2893           /*
2894             UNDO phase will log some records. So, a future recovery may see:
2895             REDO(from incomplete group) - REDO(from rollback) - CLR_END
2896             and thus execute the first REDO (finding it in "a complete
2897             group"). To prevent that:
2898           */
2899           LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS];
2900           LSN lsn;
2901           if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
2902                                     trn, NULL, 0,
2903                                     TRANSLOG_INTERNAL_PARTS, log_array,
2904                                     NULL, NULL))
2905             return -1;
2906         }
2907       }
2908       uncommitted++;
2909     }
2910 #ifdef MARIA_VERSIONING
2911     /*
2912       If real recovery: if transaction was committed, move it to some separate
2913       list for soon purging.
2914     */
2915 #endif
2916   }
2917 
2918   my_free(all_active_trans);
2919   all_active_trans= NULL;
2920 
2921   /*
2922     The UNDO phase uses some normal run-time code of ROLLBACK: generates log
2923     records, etc; prepare tables for that
2924   */
2925   addr= translog_get_horizon();
2926   for (sid= 0; sid <= SHARE_ID_MAX; sid++)
2927   {
2928     MARIA_HA *info= all_tables[sid].info;
2929     if (info != NULL)
2930     {
2931       prepare_table_for_close(info, addr);
2932       /*
2933         But we don't close it; we leave it available for the UNDO phase;
2934         it's likely that the UNDO phase will need it.
2935       */
2936       if (prepare_for_undo_phase)
2937         translog_assign_id_to_share_from_recovery(info->s, sid);
2938     }
2939   }
2940   return uncommitted;
2941 }
2942 
2943 
/**
   @brief Runs the UNDO phase: rolls back, one after the other, the
   transactions which the REDO phase found uncommitted.

   For each transaction obtained from trnman_get_any_trn(), the chain of undo
   records is followed from trn->undo_lsn until it reaches 0, executing the
   undo hook of each record; then the transaction is rolled back in trnman.

   @param  uncommitted  number of transactions to roll back

   @return Operation status
     @retval 0      ok
     @retval 1      error
*/
static int run_undo_phase(uint uncommitted)
{
  LSN last_undo __attribute__((unused));
  DBUG_ENTER("run_undo_phase");

  if (uncommitted == 0)
    DBUG_RETURN(0);                             /* nothing to roll back */

  checkpoint_useful= TRUE;
  if (tracef != stdout)
  {
    if (recovery_message_printed == REC_MSG_NONE)
      print_preamble();
    fprintf(stderr, "transactions to roll back:");
    recovery_message_printed= REC_MSG_UNDO;
  }
  tprint(tracef, "%u transactions will be rolled back\n", uncommitted);

  for (;;)
  {
    char trid_buf[22];
    TRN *trn;

    /* Countdown for the user; it is printed for the final 0 too */
    if (recovery_message_printed == REC_MSG_UNDO)
    {
      fprintf(stderr, " %u", uncommitted);
      fflush(stderr);
    }
    if (uncommitted-- == 0)
      break;

    trn= trnman_get_any_trn();
    DBUG_ASSERT(trn != NULL);
    llstr(trn->trid, trid_buf);
    tprint(tracef, "Rolling back transaction of long id %s\n", trid_buf);
    last_undo= trn->undo_lsn + 1;

    /* Walk the whole chain of undo records of this transaction */
    while (trn->undo_lsn)
    {
      TRANSLOG_HEADER_BUFFER rec;
      LOG_DESC *log_desc;
      int failed;

      /* the chain must go strictly backwards in the log */
      DBUG_ASSERT(trn->undo_lsn < last_undo);
      last_undo= trn->undo_lsn;

      if (translog_read_record_header(trn->undo_lsn, &rec) ==
          RECHEADER_READ_ERROR)
        DBUG_RETURN(1);
      log_desc= &log_record_type_descriptor[rec.type];
      display_record_position(log_desc, &rec, 0);
      failed= log_desc->record_execute_in_undo_phase(&rec, trn);
      if (failed)
        eprint(tracef, "Got error %d when executing undo %s", my_errno,
               log_desc->name);
      translog_free_record_header(&rec);
      if (failed)
        DBUG_RETURN(1);
    }

    /* Force a crash to test recovery of recovery */
    if (maria_recovery_force_crash_counter)
    {
      DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
    }

    if (trnman_rollback_trn(trn))
      DBUG_RETURN(1);
    /* We could want to span a few threads (4?) instead of 1 */
    /* In the future, we want to have this phase *online* */
  }
  DBUG_RETURN(0);
}
3014 
3015 
3016 /**
3017   In case of error in recovery, deletes all transactions from the transaction
3018   manager so that this module does not assert.
3019 
3020   @note no checkpoint should be taken as those transactions matter for the
3021   next recovery (they still haven't been properly dealt with).
3022 */
3023 
delete_all_transactions()3024 static void delete_all_transactions()
3025 {
3026   for( ; ; )
3027   {
3028     TRN *trn= trnman_get_any_trn();
3029     if (trn == NULL)
3030       break;
3031     trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
3032     trnman_rollback_trn(trn); /* ignore error */
3033   }
3034 }
3035 
3036 
3037 /**
3038    @brief re-enables transactionality, updates is_of_horizon
3039 
3040    @param  info                table
3041    @param  horizon             address to set is_of_horizon
3042 */
3043 
prepare_table_for_close(MARIA_HA * info,TRANSLOG_ADDRESS horizon)3044 static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
3045 {
3046   MARIA_SHARE *share= info->s;
3047   /*
3048     In a fully-forward REDO phase (no checkpoint record),
3049     state is now at least as new as the LSN of the current record. It may be
3050     newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
3051     table, but that table was later modified further in the log.
3052     But if we parsed a checkpoint record, it may be this way in the log:
3053     FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
3054     Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to
3055     make t1 close; the first condition below is however false (when checkpoint
3056     was taken it increased is_of_horizon) and so it works. For safety we
3057     add the second condition.
3058   */
3059   if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 &&
3060       cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
3061   {
3062     share->state.is_of_horizon= horizon;
3063     _ma_state_info_write_sub(share->kfile.file, &share->state,
3064                              MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET);
3065   }
3066 
3067   /*
3068    Ensure that info->state is up to date as
3069    _ma_renable_logging_for_table() is depending on this
3070   */
3071   *info->state= info->s->state.state;
3072 
3073   /*
3074     This leaves PAGECACHE_PLAIN_PAGE pages into the cache, while the table is
3075     going to switch back to transactional. So the table will be a mix of
3076     pages, which is ok as long as we don't take any checkpoints until all
3077     tables get closed at the end of the UNDO phase.
3078   */
3079   _ma_reenable_logging_for_table(info, FALSE);
3080   info->trn= NULL; /* safety */
3081 }
3082 
3083 
get_MARIA_HA_from_REDO_record(const TRANSLOG_HEADER_BUFFER * rec)3084 static MARIA_HA *get_MARIA_HA_from_REDO_record(const
3085                                                TRANSLOG_HEADER_BUFFER *rec)
3086 {
3087   uint16 sid;
3088   pgcache_page_no_t UNINIT_VAR(page);
3089   MARIA_HA *info;
3090   MARIA_SHARE *share;
3091   char llbuf[22];
3092   my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
3093 
3094   print_redo_phase_progress(rec->lsn);
3095   sid= fileid_korr(rec->header);
3096   switch (rec->type) {
3097     /* not all REDO records have a page: */
3098   case LOGREC_REDO_INDEX_NEW_PAGE:
3099   case LOGREC_REDO_INDEX:
3100   case LOGREC_REDO_INDEX_FREE_PAGE:
3101     index_page_redo_entry= 1;
3102     /* fall through*/
3103   case LOGREC_REDO_INSERT_ROW_HEAD:
3104   case LOGREC_REDO_INSERT_ROW_TAIL:
3105   case LOGREC_REDO_PURGE_ROW_HEAD:
3106   case LOGREC_REDO_PURGE_ROW_TAIL:
3107   case LOGREC_REDO_NEW_ROW_HEAD:
3108   case LOGREC_REDO_NEW_ROW_TAIL:
3109   case LOGREC_REDO_FREE_HEAD_OR_TAIL:
3110     page_redo_entry= TRUE;
3111     page= page_korr(rec->header + FILEID_STORE_SIZE);
3112     llstr(page, llbuf);
3113     break;
3114   case LOGREC_REDO_FREE_BLOCKS:
3115     /*
3116       We are checking against the dirty pages in _ma_apply_redo_free_blocks()
3117     */
3118     break;
3119   default:
3120     break;
3121   }
3122   tprint(tracef, "   For table of short id %u", sid);
3123   info= all_tables[sid].info;
3124 #ifndef DBUG_OFF
3125   DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
3126   current_group_table= info;
3127 #endif
3128   if (info == NULL)
3129   {
3130     tprint(tracef, ", table skipped, so skipping record\n");
3131     return NULL;
3132   }
3133   share= info->s;
3134   tprint(tracef, ", '%s'", share->open_file_name.str);
3135   DBUG_ASSERT(in_redo_phase);
3136   if (!table_is_part_of_recovery_set(&share->open_file_name))
3137   {
3138     tprint(tracef, ", skipped by user\n");
3139     return NULL;
3140   }
3141 
3142   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3143   {
3144     /*
3145       This can happen only if processing a record before the checkpoint
3146       record.
3147       id->name mapping is newer than REDO record: for sure the table subject
3148       of the REDO has been flushed and forced (id re-assignment implies this);
3149       REDO can be ignored (and must be, as we don't know what this subject
3150       table was).
3151     */
3152     DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
3153     tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3154            " than record, skipping record",
3155            LSN_IN_PARTS(share->lsn_of_file_id));
3156     return NULL;
3157   }
3158   if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3159   {
3160     /* probably a bulk insert repair */
3161     tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3162            " record, skipping record\n",
3163            LSN_IN_PARTS(share->state.skip_redo_lsn));
3164     return NULL;
3165   }
3166   /* detect if an open instance of a dropped table (internal bug) */
3167   DBUG_ASSERT(share->last_version != 0);
3168   if (page_redo_entry)
3169   {
3170     /*
3171       Consult dirty pages list.
3172       REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
3173       pages.
3174     */
3175     if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
3176                                      index_page_redo_entry))
3177       return NULL;
3178   }
3179   /*
3180     So we are going to read the page, and if its LSN is older than the
3181     record's we will modify the page
3182   */
3183   tprint(tracef, ", applying record\n");
3184   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3185   return info;
3186 }
3187 
3188 
get_MARIA_HA_from_UNDO_record(const TRANSLOG_HEADER_BUFFER * rec)3189 static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
3190                                                TRANSLOG_HEADER_BUFFER *rec)
3191 {
3192   uint16 sid;
3193   MARIA_HA *info;
3194   MARIA_SHARE *share;
3195 
3196   sid= fileid_korr(rec->header + LSN_STORE_SIZE);
3197   tprint(tracef, "   For table of short id %u", sid);
3198   info= all_tables[sid].info;
3199 #ifndef DBUG_OFF
3200   DBUG_ASSERT(!in_redo_phase ||
3201               current_group_table == NULL || current_group_table == info);
3202   current_group_table= info;
3203 #endif
3204   if (info == NULL)
3205   {
3206     tprint(tracef, ", table skipped, so skipping record\n");
3207     return NULL;
3208   }
3209   share= info->s;
3210   tprint(tracef, ", '%s'", share->open_file_name.str);
3211 
3212   if (!table_is_part_of_recovery_set(&share->open_file_name))
3213   {
3214     tprint(tracef, ", skipped by user\n");
3215     return NULL;
3216   }
3217 
3218   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3219   {
3220     tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3221            " than record, skipping record",
3222            LSN_IN_PARTS(share->lsn_of_file_id));
3223     return NULL;
3224   }
3225   if (in_redo_phase &&
3226       cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3227   {
3228     /* probably a bulk insert repair */
3229     tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3230            " record, skipping record\n",
3231            LSN_IN_PARTS(share->state.skip_redo_lsn));
3232     return NULL;
3233   }
3234   DBUG_ASSERT(share->last_version != 0);
3235   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3236   tprint(tracef, ", applying record\n");
3237   return info;
3238 }
3239 
3240 
3241 /**
3242    @brief Parses checkpoint record.
3243 
3244    Builds from it the dirty_pages list (a hash), opens tables and maps them to
3245    their 2-byte IDs, recreates transactions (not real TRNs though).
3246 
3247    @return LSN from where in the log the REDO phase should start
3248      @retval LSN_ERROR error
3249      @retval other     ok
3250 */
3251 
parse_checkpoint_record(LSN lsn)3252 static LSN parse_checkpoint_record(LSN lsn)
3253 {
3254   ulong i;
3255   ulonglong nb_dirty_pages;
3256   TRANSLOG_HEADER_BUFFER rec;
3257   TRANSLOG_ADDRESS start_address;
3258   int len;
3259   uint nb_active_transactions, nb_committed_transactions, nb_tables;
3260   uchar *ptr;
3261   LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
3262   struct st_dirty_page *next_dirty_page_in_pool;
3263 
3264   tprint(tracef, "Loading data from checkpoint record at LSN " LSN_FMT "\n",
3265          LSN_IN_PARTS(lsn));
3266   if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR ||
3267       rec.type != LOGREC_CHECKPOINT)
3268   {
3269     eprint(tracef, "Cannot find checkpoint record at LSN " LSN_FMT,
3270            LSN_IN_PARTS(lsn));
3271     return LSN_ERROR;
3272   }
3273 
3274   enlarge_buffer(&rec);
3275   if (log_record_buffer.str == NULL ||
3276       translog_read_record(rec.lsn, 0, rec.record_length,
3277                            log_record_buffer.str, NULL) !=
3278       rec.record_length)
3279   {
3280     eprint(tracef, "Failed to read record");
3281     return LSN_ERROR;
3282   }
3283 
3284   ptr= log_record_buffer.str;
3285   start_address= lsn_korr(ptr);
3286   ptr+= LSN_STORE_SIZE;
3287   tprint(tracef, "Checkpoint record has start_horizon at " LSN_FMT "\n",
3288          LSN_IN_PARTS(start_address));
3289 
3290   /* transactions */
3291   nb_active_transactions= uint2korr(ptr);
3292   ptr+= 2;
3293   tprint(tracef, "%u active transactions\n", nb_active_transactions);
3294   minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
3295   ptr+= LSN_STORE_SIZE;
3296   max_long_trid= transid_korr(ptr);
3297   ptr+= TRANSID_SIZE;
3298 
3299   /*
3300     how much brain juice and discussions there was to come to writing this
3301     line. It may make start_address slightly decrease (only by the time it
3302     takes to write one or a few rows, roughly).
3303   */
3304   tprint(tracef, "Checkpoint record has min_rec_lsn of active transactions"
3305          " at " LSN_FMT "\n",
3306          LSN_IN_PARTS(minimum_rec_lsn_of_active_transactions));
3307   set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);
3308 
3309   for (i= 0; i < nb_active_transactions; i++)
3310   {
3311     uint16 sid= uint2korr(ptr);
3312     TrID long_id;
3313     LSN undo_lsn, first_undo_lsn;
3314     ptr+= 2;
3315     long_id= uint6korr(ptr);
3316     ptr+= 6;
3317     DBUG_ASSERT(sid > 0 && long_id > 0);
3318     undo_lsn= lsn_korr(ptr);
3319     ptr+= LSN_STORE_SIZE;
3320     first_undo_lsn= lsn_korr(ptr);
3321     ptr+= LSN_STORE_SIZE;
3322     new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
3323   }
3324   nb_committed_transactions= uint4korr(ptr);
3325   ptr+= 4;
3326   tprint(tracef, "%lu committed transactions\n",
3327          (ulong)nb_committed_transactions);
3328   /* no purging => committed transactions are not important */
3329   ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
3330 
3331   /* tables  */
3332   nb_tables= uint4korr(ptr);
3333   ptr+= 4;
3334   tprint(tracef, "%u open tables\n", nb_tables);
3335   for (i= 0; i< nb_tables; i++)
3336   {
3337     char name[FN_REFLEN];
3338     LSN first_log_write_lsn;
3339     size_t name_len;
3340     uint16 sid= uint2korr(ptr);
3341     ptr+= 2;
3342     DBUG_ASSERT(sid > 0);
3343     first_log_write_lsn= lsn_korr(ptr);
3344     ptr+= LSN_STORE_SIZE;
3345     name_len= strlen((char *)ptr) + 1;
3346     strmake_buf(name, (char *)ptr);
3347     ptr+= name_len;
3348     if (new_table(sid, name, first_log_write_lsn))
3349       return LSN_ERROR;
3350   }
3351 
3352   /* dirty pages */
3353   nb_dirty_pages= uint8korr(ptr);
3354 
3355   /* Ensure casts later will not loose significant bits. */
3356   DBUG_ASSERT((nb_dirty_pages <= SIZE_T_MAX/sizeof(struct st_dirty_page)) &&
3357               (nb_dirty_pages <= ULONG_MAX));
3358 
3359   ptr+= 8;
3360   tprint(tracef, "%lu dirty pages\n", (ulong) nb_dirty_pages);
3361   if (my_hash_init(&all_dirty_pages, &my_charset_bin, (ulong)nb_dirty_pages,
3362                    offsetof(struct st_dirty_page, file_and_page_id),
3363                    sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
3364                    NULL, NULL, 0))
3365     return LSN_ERROR;
3366   dirty_pages_pool=
3367     (struct st_dirty_page *)my_malloc((size_t)nb_dirty_pages *
3368                                       sizeof(struct st_dirty_page),
3369                                       MYF(MY_WME));
3370   if (unlikely(dirty_pages_pool == NULL))
3371     return LSN_ERROR;
3372   next_dirty_page_in_pool= dirty_pages_pool;
3373   minimum_rec_lsn_of_dirty_pages= LSN_MAX;
3374   if (maria_recovery_verbose)
3375     tprint(tracef, "Table_id  Is_index       Page_id    Rec_lsn\n");
3376   for (i= 0; i < nb_dirty_pages ; i++)
3377   {
3378     pgcache_page_no_t page_id;
3379     LSN rec_lsn;
3380     uint32 is_index;
3381     uint16 table_id= uint2korr(ptr);
3382     ptr+= 2;
3383     is_index= ptr[0];
3384     ptr++;
3385     page_id= page_korr(ptr);
3386     ptr+= PAGE_STORE_SIZE;
3387     rec_lsn= lsn_korr(ptr);
3388     ptr+= LSN_STORE_SIZE;
3389     if (new_page((is_index << 16) | table_id,
3390                  page_id, rec_lsn, next_dirty_page_in_pool++))
3391       return LSN_ERROR;
3392     if (maria_recovery_verbose)
3393       tprint(tracef, "%8u  %8u  %12lu    " LSN_FMT "\n", (uint) table_id,
3394              (uint) is_index, (ulong) page_id, LSN_IN_PARTS(rec_lsn));
3395     set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
3396   }
3397   /* after that, there will be no insert/delete into the hash */
3398   /*
3399     sanity check on record (did we screw up with all those "ptr+=", did the
3400     checkpoint write code and checkpoint read code go out of sync?).
3401   */
3402   if (ptr != (log_record_buffer.str + log_record_buffer.length))
3403   {
3404     eprint(tracef, "checkpoint record corrupted\n");
3405     return LSN_ERROR;
3406   }
3407 
3408   /*
3409     start_address is now from where the dirty pages list can be ignored.
3410     Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
3411     translog_read_record() functions.
3412   */
3413   start_address= checkpoint_start=
3414     translog_next_LSN(start_address, LSN_IMPOSSIBLE);
3415   tprint(tracef, "Checkpoint record start_horizon now adjusted to"
3416          " LSN " LSN_FMT "\n", LSN_IN_PARTS(start_address));
3417   if (checkpoint_start == LSN_IMPOSSIBLE)
3418   {
3419     /*
3420       There must be a problem, as our checkpoint record exists and is >= the
3421       address which is stored in its first bytes, which is >= start_address.
3422     */
3423     return LSN_ERROR;
3424   }
3425   /* now, where the REDO phase should start reading log: */
3426   tprint(tracef, "Checkpoint has min_rec_lsn of dirty pages at"
3427          " LSN " LSN_FMT "\n", LSN_IN_PARTS(minimum_rec_lsn_of_dirty_pages));
3428   set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
3429   DBUG_PRINT("info",
3430              ("checkpoint_start: " LSN_FMT " start_address: " LSN_FMT,
3431               LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
3432   return start_address;
3433 }
3434 
3435 
/**
   @brief Fills one dirty-page entry and inserts it into all_dirty_pages.

   @param  fileid      file identifier; becomes the high part of the hash key
   @param  pageid      page number; becomes the low part of the hash key
   @param  rec_lsn     LSN to remember for this page
   @param  dirty_page  entry to fill; this same object is what gets hashed

   @return whatever my_hash_insert() returns
*/
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page)
{
  /* hash key: file id shifted above the page number */
  const uint64 hash_key= (((uint64) fileid) << 40) | pageid;

  dirty_page->rec_lsn= rec_lsn;
  dirty_page->file_and_page_id= hash_key;
  return my_hash_insert(&all_dirty_pages, (uchar *) dirty_page);
}
3444 
3445 
close_all_tables(void)3446 static int close_all_tables(void)
3447 {
3448   int error= 0;
3449   uint count= 0;
3450   LIST *list_element, *next_open;
3451   MARIA_HA *info;
3452   TRANSLOG_ADDRESS addr;
3453   DBUG_ENTER("close_all_tables");
3454 
3455   mysql_mutex_lock(&THR_LOCK_maria);
3456   if (maria_open_list == NULL)
3457     goto end;
3458   tprint(tracef, "Closing all tables\n");
3459   if (tracef != stdout)
3460   {
3461     if (recovery_message_printed == REC_MSG_NONE)
3462       print_preamble();
3463     for (count= 0, list_element= maria_open_list ;
3464          list_element ; count++, (list_element= list_element->next))
3465       ;
3466     fprintf(stderr, "tables to flush:");
3467     recovery_message_printed= REC_MSG_FLUSH;
3468   }
3469   /*
3470     Since the end of end_of_redo_phase(), we may have written new records
3471     (if UNDO phase ran)  and thus the state is newer than at
3472     end_of_redo_phase(), we need to bump is_of_horizon again.
3473   */
3474   addr= translog_get_horizon();
3475   for (list_element= maria_open_list ; ; list_element= next_open)
3476   {
3477     if (recovery_message_printed == REC_MSG_FLUSH)
3478     {
3479       fprintf(stderr, " %u", count--);
3480       fflush(stderr);
3481     }
3482     if (list_element == NULL)
3483       break;
3484     next_open= list_element->next;
3485     info= (MARIA_HA*)list_element->data;
3486     mysql_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
3487     /*
3488       Tables which we see here are exactly those which were open at time of
3489       crash. They might have open_count>0 as Checkpoint maybe flushed their
3490       state while they were used. As Recovery corrected them, don't alarm the
3491       user, don't ask for a table check:
3492     */
3493     if (info->s->state.open_count != 0)
3494     {
3495       /* let maria_close() mark the table properly closed */
3496       info->s->state.open_count= 1;
3497       info->s->global_changed= 1;
3498       info->s->changed= 1;
3499     }
3500     prepare_table_for_close(info, addr);
3501     error|= maria_close(info);
3502     mysql_mutex_lock(&THR_LOCK_maria);
3503 
3504     /* Force a crash to test recovery of recovery */
3505     if (maria_recovery_force_crash_counter)
3506     {
3507       DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
3508     }
3509   }
3510 end:
3511   if (recovery_message_printed == REC_MSG_FLUSH)
3512   {
3513     fputc('\n', stderr);
3514     fflush(stderr);
3515   }
3516   mysql_mutex_unlock(&THR_LOCK_maria);
3517   DBUG_RETURN(error);
3518 }
3519 
3520 
3521 /**
3522    @brief Close all table instances with a certain name which are present in
3523    all_tables.
3524 
3525    @param  name                Name of table
3526    @param  addr                Log address passed to prepare_table_for_close()
3527 */
3528 
close_one_table(const char * name,TRANSLOG_ADDRESS addr)3529 static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
3530 {
3531   my_bool res= 0;
3532   /* There are no other threads using the tables, so we don't need any locks */
3533   struct st_table_for_recovery *internal_table, *end;
3534   for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
3535        internal_table < end ;
3536        internal_table++)
3537   {
3538     MARIA_HA *info= internal_table->info;
3539     if ((info != NULL) && !strcmp(info->s->open_file_name.str, name))
3540     {
3541       prepare_table_for_close(info, addr);
3542       if (maria_close(info))
3543         res= 1;
3544       internal_table->info= NULL;
3545     }
3546   }
3547   return res;
3548 }
3549 
3550 
3551 /**
3552    Temporarily disables logging for this table.
3553 
3554    If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
3555    to warn log readers.
3556 
3557    @param  info            table
3558    @param  log_incomplete  if that disabling makes the log incomplete
3559 
3560    @note for example in the REDO phase we disable logging but that does not
3561    make the log incomplete.
3562 */
3563 
_ma_tmp_disable_logging_for_table(MARIA_HA * info,my_bool log_incomplete)3564 void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
3565                                        my_bool log_incomplete)
3566 {
3567   MARIA_SHARE *share= info->s;
3568   DBUG_ENTER("_ma_tmp_disable_logging_for_table");
3569 
3570   /*
3571     We have to ensure that bitmap is flushed, as it's checking
3572     that share->now_transactional is set
3573   */
3574   if (share->now_transactional && share->data_file_type == BLOCK_RECORD)
3575     _ma_bitmap_flush_all(share);
3576 
3577   if (log_incomplete)
3578   {
3579     uchar log_data[FILEID_STORE_SIZE];
3580     LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
3581     LSN lsn;
3582     log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
3583     log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
3584     translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
3585                           &dummy_transaction_object, info,
3586                           (translog_size_t) sizeof(log_data),
3587                           TRANSLOG_INTERNAL_PARTS + 1, log_array,
3588                           log_data, NULL);
3589   }
3590 
3591   /* if we disabled before writing the record, record wouldn't reach log */
3592   share->now_transactional= FALSE;
3593 
3594   /*
3595     Reset state pointers. This is needed as in ALTER table we may do
3596     commit followed by _ma_renable_logging_for_table and then
3597     info->state may point to a state that was deleted by
3598     _ma_trnman_end_trans_hook()
3599    */
3600   share->state.no_logging= *info->state;
3601   info->state= &share->state.no_logging;
3602   info->switched_transactional= TRUE;
3603 
3604   /*
3605     Some code in ma_blockrec.c assumes a trn even if !now_transactional but in
3606     this case it only reads trn->rec_lsn, which has to be LSN_IMPOSSIBLE and
3607     should be now. info->trn may be NULL in maria_chk.
3608   */
3609   if (info->trn == NULL)
3610   {
3611     info->trn= &dummy_transaction_object;
3612     info->trn_next= 0;
3613     info->trn_prev= 0;
3614   }
3615 
3616   DBUG_ASSERT(info->trn->rec_lsn == LSN_IMPOSSIBLE);
3617   share->page_type= PAGECACHE_PLAIN_PAGE;
3618   /* Functions below will pick up now_transactional and change callbacks */
3619   _ma_set_data_pagecache_callbacks(&info->dfile, share);
3620   _ma_set_index_pagecache_callbacks(&share->kfile, share);
3621   _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3622   DBUG_VOID_RETURN;
3623 }
3624 
3625 
3626 /**
3627    Re-enables logging for a table which had it temporarily disabled.
3628 
3629    Only the thread which disabled logging is allowed to reenable it. Indeed,
3630    re-enabling logging affects all open instances, one must have exclusive
3631    access to the table to do that. In practice, the one which disables has
3632    such access.
3633 
3634    @param  info            table
3635    @param  flush_pages     if function needs to flush pages first
3636 */
3637 
_ma_reenable_logging_for_table(MARIA_HA * info,my_bool flush_pages)3638 my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages)
3639 {
3640   MARIA_SHARE *share= info->s;
3641   DBUG_ENTER("_ma_reenable_logging_for_table");
3642 
3643   if (share->now_transactional == share->base.born_transactional ||
3644       !info->switched_transactional)
3645   {
3646     info->switched_transactional= FALSE;
3647     DBUG_RETURN(0);
3648   }
3649   info->switched_transactional= FALSE;
3650 
3651   if ((share->now_transactional= share->base.born_transactional))
3652   {
3653     share->page_type= PAGECACHE_LSN_PAGE;
3654 
3655     /*
3656       Copy state information that where updated while the table was used
3657       in not transactional mode
3658     */
3659     _ma_copy_nontrans_state_information(info);
3660     _ma_reset_history(info->s);
3661 
3662     /* Reset state to point to state.common, as on open() */
3663     info->state=  &share->state.common;
3664     *info->state=  share->state.state;
3665 
3666     if (flush_pages)
3667     {
3668       /* Ensure that recover is not executing any redo before this */
3669       if (!maria_in_recovery)
3670       {
3671         if (share->id != 0)
3672         {
3673           mysql_mutex_lock(&share->intern_lock);
3674           translog_deassign_id_from_share(share);
3675           mysql_mutex_unlock(&share->intern_lock);
3676         }
3677         share->state.is_of_horizon= share->state.create_rename_lsn=
3678           share->state.skip_redo_lsn= translog_get_horizon();
3679       }
3680       /*
3681         We are going to change callbacks; if a page is flushed at this moment
3682         this can cause race conditions, that's one reason to flush pages
3683         now. Other reasons: a checkpoint could be running and miss pages; the
3684         pages have type PAGECACHE_PLAIN_PAGE which should not remain. As
3685         there are no REDOs for pages, them, bitmaps and the state also have to
3686         be flushed and synced.
3687       */
3688       if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
3689                                 FLUSH_RELEASE, FLUSH_RELEASE) ||
3690           _ma_state_info_write(share,
3691                                MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
3692                                MA_STATE_INFO_WRITE_LOCK) ||
3693           _ma_sync_table_files(info))
3694         DBUG_RETURN(1);
3695     }
3696     else if (!maria_in_recovery)
3697     {
3698       /*
3699         Except in Recovery, we mustn't leave dirty pages (see comments above).
3700         Note that this does not verify that the state was flushed, but hey.
3701       */
3702       pagecache_file_no_dirty_page(share->pagecache, &info->dfile);
3703       pagecache_file_no_dirty_page(share->pagecache, &share->kfile);
3704     }
3705     _ma_set_data_pagecache_callbacks(&info->dfile, share);
3706     _ma_set_index_pagecache_callbacks(&share->kfile, share);
3707     _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3708     /*
3709       info->trn was not changed in the disable/enable combo, so that it's
3710       still usable in this kind of combination:
3711       external_lock;
3712       start_bulk_insert; # table is empty, disables logging
3713       end_bulk_insert;   # enables logging
3714       start_bulk_insert; # table is not empty, logging stays
3715                          # so rows insertion needs the real trn.
3716       as happens during row-based replication on the slave.
3717     */
3718   }
3719   DBUG_RETURN(0);
3720 }
3721 
3722 
/**
   @brief Prints the progress of the REDO phase to stderr, as a percentage.

   Does nothing when the trace file is stdout. On the first call which gets
   past that test, the current log horizon is remembered as the end position
   and the distance from addr to it becomes the 100% reference. A new
   percentage is printed each time progress has advanced by at least 10
   points since the last one printed.

   The end position, the reference distance and the last printed percentage
   are kept in static variables, i.e. they persist across calls.

   @param  addr  log address reached so far
*/
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
  static uint end_logno= FILENO_IMPOSSIBLE, percentage_printed= 0;
  static ulong end_offset;
  static ulonglong initial_remainder= ~(ulonglong) 0;

  uint cur_logno;
  ulong cur_offset;
  ulonglong local_remainder;
  uint percentage_done;

  if (tracef == stdout)
    return;
  if (recovery_message_printed == REC_MSG_NONE)
  {
    print_preamble();
    fprintf(stderr, "recovered pages: 0%%");
    fflush(stderr);
    procent_printed= 1;
    recovery_message_printed= REC_MSG_REDO;
  }
  if (end_logno == FILENO_IMPOSSIBLE)
  {
    /* first call: remember where the log ends now */
    LSN end_addr= translog_get_horizon();
    end_logno= LSN_FILE_NO(end_addr);
    end_offset= LSN_OFFSET(end_addr);
  }
  cur_logno= LSN_FILE_NO(addr);
  cur_offset= LSN_OFFSET(addr);
  /*
    All operands below are unsigned, so a subtraction must never be allowed
    to go "negative": it would wrap around to a huge value (and clamping
    such a difference with MY_MAX(..., 0) has no effect). A position at or
    past the remembered end therefore simply means nothing remains.
  */
  if (cur_logno > end_logno ||
      (cur_logno == end_logno && cur_offset >= end_offset))
    local_remainder= 0;
  else if (cur_logno == end_logno)
    local_remainder= end_offset - cur_offset;
  else
    local_remainder= (((longlong)log_file_size) - cur_offset +
                      (end_logno - cur_logno - 1) * ((longlong)log_file_size) +
                      end_offset);
  /* all-bits-set means "reference distance not recorded yet" */
  if (initial_remainder == (ulonglong)(-1))
    initial_remainder= local_remainder;
  if (initial_remainder == 0)
    percentage_done= 100;           /* nothing to do at all; avoid dividing by 0 */
  else if (local_remainder >= initial_remainder)
    percentage_done= 0;             /* no progress relative to the reference */
  else
    percentage_done= (uint) ((initial_remainder - local_remainder) * 100ULL /
                             initial_remainder);
  /* written as an addition so that the unsigned comparison cannot wrap */
  if (percentage_done >= percentage_printed + 10)
  {
    percentage_printed= percentage_done;
    fprintf(stderr, " %u%%", percentage_done);
    fflush(stderr);
    procent_printed= 1;
  }
}
3768 
3769 
3770 #ifdef MARIA_EXTERNAL_LOCKING
3771 #error Marias Checkpoint and Recovery are really not ready for it
3772 #endif
3773 
3774 /*
3775 Recovery of the state :  how it works
3776 =====================================
3777 
3778 Here we ignore Checkpoints for a start.
3779 
3780 The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
3781 memory frequently (at least at every row write/update/delete) but goes
3782 to disk at few moments: maria_close() when closing the last open
3783 instance, and a few rare places like CHECK/REPAIR/ALTER
3784 (non-transactional tables also do it at maria_lock_database() but we
3785 needn't cover them here).
3786 
3787 In case of crash, state on disk is likely to be older than what it was
3788 in memory, the REDO phase needs to recreate the state as it was in
3789 memory at the time of crash. When we say Recovery here we will always
3790 mean "REDO phase".
3791 
3792 For example MARIA_STATUS_INFO::records (count of records). It is updated at
3793 the end of every row write/update/delete/delete_all. When Recovery sees the
3794 sign of such row operation (UNDO or REDO), it may need to update the records'
3795 count if that count does not reflect that operation (is older). How to know
3796 the age of the state compared to the log record: every time the state
3797 goes to disk at runtime, its member "is_of_horizon" is updated to the
3798 current end-of-log horizon. So Recovery just needs to compare is_of_horizon
3799 and the record's LSN to know if it should modify "records".
3800 
3801 Other operations like ALTER TABLE DISABLE KEYS update the state but
3802 don't write log records, thus the REDO phase cannot repeat their
3803 effect on the state in case of crash. But we make them sync the state
3804 as soon as they have finished. This reduces the window for a problem.
3805 
3806 It looks like only one thread at a time updates the state in memory or
3807 on disk. We assume that the upper level (normally MySQL) has protection
3808 against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
3809 are not issued while there are any running transactions on the given table.
3810 If this is not done, we may write a corrupted state to disk.
3811 
3812 With checkpoints
3813 ================
3814 
3815 Checkpoint module needs to read the state in memory and write it to
3816 disk. This may happen while some other thread is modifying the state
3817 in memory or on disk. Checkpoint thus may be reading changing data, it
3818 needs a mutex to not have it corrupted, and concurrent modifiers of
3819 the state need that mutex too for the same reason.
3820 "records" is modified for every row write/update/delete, we don't want
3821 to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
3822 which is already present in these moments, namely the log's mutex which is
3823 taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
3824 under-log-mutex hooks when writing these records (thus "records" is
3825 not updated at the end of maria_write/update/delete() anymore).
3826 Thus Checkpoint takes the log's lock and can read "records" from
memory and write it to disk and release log's lock.
3828 We however want to avoid having the disk write under the log's
3829 lock. So it has to be under another mutex, natural choice is
3830 intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
3831 and as maria_close() takes it too). All state writes to disk are
3832 changed to be protected with intern_lock.
3833 So Checkpoint takes intern_lock, log's lock, reads "records" from
3834 memory, releases log's lock, updates is_of_horizon and writes "records" to
3835 disk, release intern_lock.
3836 In practice, not only "records" needs to be written but the full
3837 state. So, Checkpoint reads the full state from memory. Some other
3838 thread may at this moment be modifying in memory some pieces of the
state which are not protected by the log's lock (see ma_extra.c
3840 HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
3841 from memory; to guard against that we extend the intern_lock-zone to
3842 changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
3843 also any change made in memory to create_rename_lsn/state_is_of_horizon.
3844 Last, we don't want in Checkpoint to do
3845  log lock; read state from memory; release log lock;
3846 for each table, it may hold the log's lock too much in total.
3847 So, we instead do
3848  log lock; read N states from memory; release log lock;
3849 Thus, the sequence above happens outside of any intern_lock.
3850 But this re-introduces the problem that some other thread may be changing the
3851 state in memory and on disk under intern_lock, without log's lock, like
3852 HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
3853 comes to handling the table under intern_lock, which is serialized with
HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher than when the state
3855 was read from memory under log's lock, and thus can decide to not flush the
3856 obsolete state it has, knowing that the other thread flushed a more recent
3857 state already. If on the other hand is_of_horizon is not higher, the read
3858 state is current and can be flushed. So we have a per-table sequence:
3859  lock intern_lock; test if is_of_horizon is higher than when we read the state
3860  under log's lock; if no then flush the read state to disk.
3861 */
3862 
3863 /* some comments and pseudo-code which we keep for later */
3864 #if 0
3865   /*
3866     MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
3867     after a certain amount of log records have been executed. This helps
3868     against repeated crashes. Those checkpoints could not be user-requested
3869     (as engine is not communicating during the REDO phase), so they would be
3870     automatic: this changes the original assumption that we don't write to the
3871     log while in the REDO phase, but why not. How often should we checkpoint?
3872   */
3873 
3874   /*
3875     We want to have two steps:
3876     engine->recover_with_max_memory();
3877     next_engine->recover_with_max_memory();
3878     engine->init_with_normal_memory();
3879     next_engine->init_with_normal_memory();
3880     So: in recover_with_max_memory() allocate a giant page cache, do REDO
3881     phase, then all page cache is flushed and emptied and freed (only retain
3882     small structures like TM): take full checkpoint, which is useful if
3883     next engine crashes in its recovery the next second.
3884     Destroy all shares (maria_close()), then at init_with_normal_memory() we
3885     do this:
3886   */
3887 
3888   /**** UNDO PHASE *****/
3889 
3890   /*
3891     Launch one or more threads to do the background rollback. Don't wait for
3892     them to complete their rollback (background rollback; for debugging, we
3893     can have an option which waits). Set a counter (total_of_rollback_threads)
    to the number of threads to launch.
3895 
3896     Note that InnoDB's rollback-in-background works as long as InnoDB is the
3897     last engine to recover, otherwise MySQL will refuse new connections until
3898     the last engine has recovered so it's not "background" from the user's
3899     point of view. InnoDB is near top of sys_table_types so all others
3900     (e.g. BDB) recover after it... So it's really "online rollback" only if
3901     InnoDB is the only engine.
3902   */
3903 
3904   /* wake up delete/update handler */
3905   /* tell the TM that it can now accept new transactions */
3906 
3907   /*
3908     mark that checkpoint requests are now allowed.
3909   */
3910 #endif
3911