1 /*****************************************************************************
2
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2013, 2020, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /**************************************************//**
22 @file log/log0recv.cc
23 Recovery
24
25 Created 9/20/1997 Heikki Tuuri
26 *******************************************************/
27
28 #include "univ.i"
29
30 #include <map>
31 #include <string>
32 #include <my_service_manager.h>
33
34 #include "log0recv.h"
35
36 #ifdef HAVE_MY_AES_H
37 #include <my_aes.h>
38 #endif
39
40 #include "log0crypt.h"
41 #include "mem0mem.h"
42 #include "buf0buf.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0cur.h"
47 #include "page0zip.h"
48 #include "btr0btr.h"
49 #include "btr0cur.h"
50 #include "ibuf0ibuf.h"
51 #include "trx0undo.h"
52 #include "trx0rec.h"
53 #include "fil0fil.h"
54 #include "row0trunc.h"
55 #include "buf0rea.h"
56 #include "srv0srv.h"
57 #include "srv0start.h"
58 #include "trx0roll.h"
59 #include "row0merge.h"
60 #include "fil0pagecompress.h"
61
62 /** Log records are stored in the hash table in chunks at most of this size;
63 this must be less than srv_page_size as it is stored in the buffer pool */
64 #define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t) - REDZONE_SIZE)
65
66 /** Read-ahead area in applying log records to file pages */
67 #define RECV_READ_AHEAD_AREA 32
68
69 /** The recovery system */
70 recv_sys_t* recv_sys;
71 /** TRUE when applying redo log records during crash recovery; FALSE
72 otherwise. Note that this is FALSE while a background thread is
73 rolling back incomplete transactions. */
74 volatile bool recv_recovery_on;
75
76 /** TRUE when recv_init_crash_recovery() has been called. */
77 bool recv_needed_recovery;
78 #ifdef UNIV_DEBUG
79 /** TRUE if writing to the redo log (mtr_commit) is forbidden.
80 Protected by log_sys.mutex. */
81 bool recv_no_log_write = false;
82 #endif /* UNIV_DEBUG */
83
84 /** TRUE if buf_page_is_corrupted() should check if the log sequence
85 number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
86 recv_recovery_from_checkpoint_start(). */
87 bool recv_lsn_checks_on;
88
89 /** If the following is TRUE, the buffer pool file pages must be invalidated
90 after recovery and no ibuf operations are allowed; this becomes TRUE if
91 the log record hash table becomes too full, and log records must be merged
92 to file pages already before the recovery is finished: in this case no
93 ibuf operations are allowed, as they could modify the pages read in the
94 buffer pool before the pages have been recovered to the up-to-date state.
95
96 TRUE means that recovery is running and no operations on the log files
97 are allowed yet: the variable name is misleading. */
98 bool recv_no_ibuf_operations;
99
100 /** The type of the previous parsed redo log record */
101 static mlog_id_t recv_previous_parsed_rec_type;
102 /** The offset of the previous parsed redo log record */
103 static ulint recv_previous_parsed_rec_offset;
104 /** The 'multi' flag of the previous parsed redo log record */
105 static ulint recv_previous_parsed_rec_is_multi;
106
107 /** The maximum lsn we see for a page during the recovery process. If this
108 is bigger than the lsn we are able to scan up to, that is an indication that
109 the recovery failed and the database may be corrupt. */
110 static lsn_t recv_max_page_lsn;
111
112 #ifdef UNIV_PFS_THREAD
113 mysql_pfs_key_t trx_rollback_clean_thread_key;
114 mysql_pfs_key_t recv_writer_thread_key;
115 #endif /* UNIV_PFS_THREAD */
116
117 /** Is recv_writer_thread active? */
118 bool recv_writer_thread_active;
119
120 #ifndef DBUG_OFF
121 /** Return string name of the redo log record type.
122 @param[in] type record log record enum
123 @return string name of record log record */
124 static const char* get_mlog_string(mlog_id_t type);
125 #endif /* !DBUG_OFF */
126
127 /** Tablespace item during recovery */
128 struct file_name_t {
129 /** Tablespace file name (MLOG_FILE_NAME) */
130 std::string name;
131 /** Tablespace object (NULL if not valid or not found) */
132 fil_space_t* space;
133
134 /** Tablespace status. */
135 enum fil_status {
136 /** Normal tablespace */
137 NORMAL,
138 /** Deleted tablespace */
139 DELETED,
140 /** Missing tablespace */
141 MISSING
142 };
143
144 /** Status of the tablespace */
145 fil_status status;
146
147 /** FSP_SIZE of tablespace */
148 ulint size;
149
150 /** the log sequence number of the last observed MLOG_INDEX_LOAD
151 record for the tablespace */
152 lsn_t enable_lsn;
153
154 /** Constructor */
file_name_tfile_name_t155 file_name_t(std::string name_, bool deleted) :
156 name(name_), space(NULL), status(deleted ? DELETED: NORMAL),
157 size(0), enable_lsn(0) {}
158
159 /** Report a MLOG_INDEX_LOAD operation, meaning that
160 mlog_init for any earlier LSN must be skipped.
161 @param lsn log sequence number of the MLOG_INDEX_LOAD */
mlog_index_loadfile_name_t162 void mlog_index_load(lsn_t lsn)
163 {
164 if (enable_lsn < lsn) enable_lsn = lsn;
165 }
166 };
167
168 /** Map of dirty tablespaces during recovery */
169 typedef std::map<
170 ulint,
171 file_name_t,
172 std::less<ulint>,
173 ut_allocator<std::pair<const ulint, file_name_t> > > recv_spaces_t;
174
175 static recv_spaces_t recv_spaces;
176
/** Recovery states that a recv_addr_t can be in */
enum recv_addr_state {
	/** the page has not been processed yet */
	RECV_NOT_PROCESSED,
	/** the page has not been processed, and it will be reinitialized */
	RECV_WILL_NOT_READ,
	/** the page is being read */
	RECV_BEING_READ,
	/** log records are currently being applied to the page */
	RECV_BEING_PROCESSED,
	/** all log records have been applied to the page */
	RECV_PROCESSED,
	/** the log records were discarded, because the tablespace
	does not exist */
	RECV_DISCARDED
};
193
194 /** Hashed page file address struct */
195 struct recv_addr_t{
196 /** recovery state of the page */
197 recv_addr_state state;
198 /** tablespace identifier */
199 unsigned space:32;
200 /** page number */
201 unsigned page_no:32;
202 /** list of log records for this page */
203 UT_LIST_BASE_NODE_T(recv_t) rec_list;
204 /** hash node in the hash bucket chain */
205 hash_node_t addr_hash;
206 };
207
208 /** Report optimized DDL operation (without redo log),
209 corresponding to MLOG_INDEX_LOAD.
210 @param[in] space_id tablespace identifier
211 */
212 void (*log_optimized_ddl_op)(ulint space_id);
213
214 /** Report backup-unfriendly TRUNCATE operation (with separate log file),
215 corresponding to MLOG_TRUNCATE. */
216 void (*log_truncate)();
217
218 /** Report an operation to create, delete, or rename a file during backup.
219 @param[in] space_id tablespace identifier
220 @param[in] flags tablespace flags (NULL if not create)
221 @param[in] name file name (not NUL-terminated)
222 @param[in] len length of name, in bytes
223 @param[in] new_name new file name (NULL if not rename)
224 @param[in] new_len length of new_name, in bytes (0 if NULL) */
225 void (*log_file_op)(ulint space_id, const byte* flags,
226 const byte* name, ulint len,
227 const byte* new_name, ulint new_len);
228
229 /** Information about initializing page contents during redo log processing */
230 class mlog_init_t
231 {
232 public:
233 /** A page initialization operation that was parsed from
234 the redo log */
235 struct init {
236 /** log sequence number of the page initialization */
237 lsn_t lsn;
238 /** Whether btr_page_create() avoided a read of the page.
239
240 At the end of the last recovery batch, ibuf_merge()
241 will invoke change buffer merge for pages that reside
242 in the buffer pool. (In the last batch, loading pages
243 would trigger change buffer merge.) */
244 bool created;
245 };
246
247 private:
248 typedef std::map<const page_id_t, init,
249 std::less<const page_id_t>,
250 ut_allocator<std::pair<const page_id_t, init> > >
251 map;
252 /** Map of page initialization operations.
253 FIXME: Merge this to recv_sys->addr_hash! */
254 map inits;
255 public:
256 /** Record that a page will be initialized by the redo log.
257 @param[in] space tablespace identifier
258 @param[in] page_no page number
259 @param[in] lsn log sequence number */
add(ulint space,ulint page_no,lsn_t lsn)260 void add(ulint space, ulint page_no, lsn_t lsn)
261 {
262 ut_ad(mutex_own(&recv_sys->mutex));
263 const init init = { lsn, false };
264 std::pair<map::iterator, bool> p = inits.insert(
265 map::value_type(page_id_t(space, page_no), init));
266 ut_ad(!p.first->second.created);
267 if (!p.second && p.first->second.lsn < init.lsn) {
268 p.first->second = init;
269 }
270 }
271
272 /** Get the last stored lsn of the page id and its respective
273 init/load operation.
274 @param[in] page_id page id
275 @param[in,out] init initialize log or load log
276 @return the latest page initialization;
277 not valid after releasing recv_sys->mutex. */
last(page_id_t page_id)278 init& last(page_id_t page_id)
279 {
280 ut_ad(mutex_own(&recv_sys->mutex));
281 return inits.find(page_id)->second;
282 }
283
284 /** At the end of each recovery batch, reset the 'created' flags. */
reset()285 void reset()
286 {
287 ut_ad(mutex_own(&recv_sys->mutex));
288 ut_ad(recv_no_ibuf_operations);
289 for (map::iterator i= inits.begin(); i != inits.end(); i++) {
290 i->second.created = false;
291 }
292 }
293
294 /** On the last recovery batch, merge buffered changes to those
295 pages that were initialized by buf_page_create() and still reside
296 in the buffer pool. Stale pages are not allowed in the buffer pool.
297
298 Note: When MDEV-14481 implements redo log apply in the
299 background, we will have to ensure that buf_page_get_gen()
300 will not deliver stale pages to users (pages on which the
301 change buffer was not merged yet). Normally, the change
302 buffer merge is performed on I/O completion. Maybe, add a
303 flag to buf_page_t and perform the change buffer merge on
304 the first actual access?
305 @param[in,out] mtr dummy mini-transaction */
ibuf_merge(mtr_t & mtr)306 void ibuf_merge(mtr_t& mtr)
307 {
308 ut_ad(mutex_own(&recv_sys->mutex));
309 ut_ad(!recv_no_ibuf_operations);
310 mtr.start();
311
312 for (map::const_iterator i= inits.begin(); i != inits.end();
313 i++) {
314 if (!i->second.created) {
315 continue;
316 }
317 if (buf_block_t* block = buf_page_get_low(
318 i->first, univ_page_size, RW_X_LATCH, NULL,
319 BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
320 &mtr, NULL)) {
321 mutex_exit(&recv_sys->mutex);
322 ibuf_merge_or_delete_for_page(
323 block, i->first, block->page.size);
324 mtr.commit();
325 mtr.start();
326 mutex_enter(&recv_sys->mutex);
327 }
328 }
329
330 mtr.commit();
331 }
332
333 /** Clear the data structure */
clear()334 void clear() { inits.clear(); }
335 };
336
337 static mlog_init_t mlog_init;
338
339 /** Process a MLOG_CREATE2 record that indicates that a tablespace
340 is being shrunk in size.
341 @param[in] space_id tablespace identifier
342 @param[in] pages trimmed size of the file, in pages
343 @param[in] lsn log sequence number of the operation */
recv_addr_trim(ulint space_id,unsigned pages,lsn_t lsn)344 static void recv_addr_trim(ulint space_id, unsigned pages, lsn_t lsn)
345 {
346 DBUG_ENTER("recv_addr_trim");
347 DBUG_LOG("ib_log",
348 "discarding log beyond end of tablespace "
349 << page_id_t(space_id, pages) << " before LSN " << lsn);
350 ut_ad(mutex_own(&recv_sys->mutex));
351 for (ulint i = recv_sys->addr_hash->n_cells; i--; ) {
352 hash_cell_t* const cell = hash_get_nth_cell(
353 recv_sys->addr_hash, i);
354 for (recv_addr_t* addr = static_cast<recv_addr_t*>(cell->node),
355 *next;
356 addr; addr = next) {
357 next = static_cast<recv_addr_t*>(addr->addr_hash);
358
359 if (addr->space != space_id || addr->page_no < pages) {
360 continue;
361 }
362
363 for (recv_t* recv = UT_LIST_GET_FIRST(addr->rec_list);
364 recv; ) {
365 recv_t* n = UT_LIST_GET_NEXT(rec_list, recv);
366 if (recv->start_lsn < lsn) {
367 DBUG_PRINT("ib_log",
368 ("Discarding %s for"
369 " page %u:%u at " LSN_PF,
370 get_mlog_string(
371 recv->type),
372 addr->space, addr->page_no,
373 recv->start_lsn));
374 UT_LIST_REMOVE(addr->rec_list, recv);
375 }
376 recv = n;
377 }
378 }
379 }
380 if (fil_space_t* space = fil_space_get(space_id)) {
381 ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
382 fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
383 ut_ad(file->is_open());
384 os_file_truncate(file->name, file->handle,
385 os_offset_t(pages) << srv_page_size_shift,
386 true);
387 }
388 DBUG_VOID_RETURN;
389 }
390
391 /** Process a file name from a MLOG_FILE_* record.
392 @param[in,out] name file name
393 @param[in] len length of the file name
394 @param[in] space_id the tablespace ID
395 @param[in] deleted whether this is a MLOG_FILE_DELETE record */
396 static
397 void
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)398 fil_name_process(
399 char* name,
400 ulint len,
401 ulint space_id,
402 bool deleted)
403 {
404 if (srv_operation == SRV_OPERATION_BACKUP) {
405 return;
406 }
407
408 ut_ad(srv_operation == SRV_OPERATION_NORMAL
409 || is_mariabackup_restore_or_export());
410
411 /* We will also insert space=NULL into the map, so that
412 further checks can ensure that a MLOG_FILE_NAME record was
413 scanned before applying any page records for the space_id. */
414
415 os_normalize_path(name);
416 file_name_t fname(std::string(name, len - 1), deleted);
417 std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
418 std::make_pair(space_id, fname));
419 ut_ad(p.first->first == space_id);
420
421 file_name_t& f = p.first->second;
422
423 if (deleted) {
424 /* Got MLOG_FILE_DELETE */
425
426 if (!p.second && f.status != file_name_t::DELETED) {
427 f.status = file_name_t::DELETED;
428 if (f.space != NULL) {
429 fil_space_free(space_id, false);
430 f.space = NULL;
431 }
432 }
433
434 ut_ad(f.space == NULL);
435 } else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
436 || f.name != fname.name) {
437 fil_space_t* space;
438
439 /* Check if the tablespace file exists and contains
440 the space_id. If not, ignore the file after displaying
441 a note. Abort if there are multiple files with the
442 same space_id. */
443 switch (fil_ibd_load(space_id, name, space)) {
444 case FIL_LOAD_OK:
445 ut_ad(space != NULL);
446
447 if (f.space == NULL || f.space == space) {
448
449 if (f.size && f.space == NULL) {
450 fil_space_set_recv_size(space->id, f.size);
451 }
452
453 f.name = fname.name;
454 f.space = space;
455 f.status = file_name_t::NORMAL;
456 } else {
457 ib::error() << "Tablespace " << space_id
458 << " has been found in two places: '"
459 << f.name << "' and '" << name << "'."
460 " You must delete one of them.";
461 recv_sys->found_corrupt_fs = true;
462 }
463 break;
464
465 case FIL_LOAD_ID_CHANGED:
466 ut_ad(space == NULL);
467 break;
468
469 case FIL_LOAD_NOT_FOUND:
470 /* No matching tablespace was found; maybe it
471 was renamed, and we will find a subsequent
472 MLOG_FILE_* record. */
473 ut_ad(space == NULL);
474
475 if (srv_force_recovery) {
476 /* Without innodb_force_recovery,
477 missing tablespaces will only be
478 reported in
479 recv_init_crash_recovery_spaces().
480 Enable some more diagnostics when
481 forcing recovery. */
482
483 ib::info()
484 << "At LSN: " << recv_sys->recovered_lsn
485 << ": unable to open file " << name
486 << " for tablespace " << space_id;
487 }
488 break;
489
490 case FIL_LOAD_INVALID:
491 ut_ad(space == NULL);
492 if (srv_force_recovery == 0) {
493 ib::warn() << "We do not continue the crash"
494 " recovery, because the table may"
495 " become corrupt if we cannot apply"
496 " the log records in the InnoDB log to"
497 " it. To fix the problem and start"
498 " mysqld:";
499 ib::info() << "1) If there is a permission"
500 " problem in the file and mysqld"
501 " cannot open the file, you should"
502 " modify the permissions.";
503 ib::info() << "2) If the tablespace is not"
504 " needed, or you can restore an older"
505 " version from a backup, then you can"
506 " remove the .ibd file, and use"
507 " --innodb_force_recovery=1 to force"
508 " startup without this file.";
509 ib::info() << "3) If the file system or the"
510 " disk is broken, and you cannot"
511 " remove the .ibd file, you can set"
512 " --innodb_force_recovery.";
513 recv_sys->found_corrupt_fs = true;
514 break;
515 }
516
517 ib::info() << "innodb_force_recovery was set to "
518 << srv_force_recovery << ". Continuing crash"
519 " recovery even though we cannot access the"
520 " files for tablespace " << space_id << ".";
521 break;
522 }
523 }
524 }
525
526 /** Parse or process a MLOG_FILE_* record.
527 @param[in] ptr redo log record
528 @param[in] end end of the redo log buffer
529 @param[in] space_id the tablespace ID
530 @param[in] first_page_no first page number in the file
531 @param[in] type MLOG_FILE_NAME or MLOG_FILE_DELETE
532 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
533 @param[in] apply whether to apply the record
534 @return pointer to next redo log record
535 @retval NULL if this log record was truncated */
536 static
537 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type,bool apply)538 fil_name_parse(
539 byte* ptr,
540 const byte* end,
541 ulint space_id,
542 ulint first_page_no,
543 mlog_id_t type,
544 bool apply)
545 {
546 if (type == MLOG_FILE_CREATE2) {
547 if (end < ptr + 4) {
548 return(NULL);
549 }
550 ptr += 4;
551 }
552
553 if (end < ptr + 2) {
554 return(NULL);
555 }
556
557 ulint len = mach_read_from_2(ptr);
558 ptr += 2;
559 if (end < ptr + len) {
560 return(NULL);
561 }
562
563 /* MLOG_FILE_* records should only be written for
564 user-created tablespaces. The name must be long enough
565 and end in .ibd. */
566 bool corrupt = is_predefined_tablespace(space_id)
567 || len < sizeof "/a.ibd\0"
568 || (!first_page_no != !memcmp(ptr + len - 5, DOT_IBD, 5));
569
570 if (!corrupt && !memchr(ptr, OS_PATH_SEPARATOR, len)) {
571 if (byte* c = static_cast<byte*>
572 (memchr(ptr, OS_PATH_SEPARATOR_ALT, len))) {
573 ut_ad(c >= ptr);
574 ut_ad(c < ptr + len);
575 do {
576 *c = OS_PATH_SEPARATOR;
577 } while ((c = static_cast<byte*>
578 (memchr(ptr, OS_PATH_SEPARATOR_ALT,
579 len - ulint(c - ptr)))) != NULL);
580 } else {
581 corrupt = true;
582 }
583 }
584
585 byte* end_ptr = ptr + len;
586
587 switch (type) {
588 default:
589 ut_ad(0); // the caller checked this
590 /* fall through */
591 case MLOG_FILE_NAME:
592 if (UNIV_UNLIKELY(corrupt)) {
593 ib::error() << "MLOG_FILE_NAME incorrect:" << ptr;
594 recv_sys->found_corrupt_log = true;
595 break;
596 }
597
598 fil_name_process(
599 reinterpret_cast<char*>(ptr), len, space_id, false);
600 break;
601 case MLOG_FILE_DELETE:
602 if (UNIV_UNLIKELY(corrupt)) {
603 ib::error() << "MLOG_FILE_DELETE incorrect:" << ptr;
604 recv_sys->found_corrupt_log = true;
605 break;
606 }
607
608 fil_name_process(
609 reinterpret_cast<char*>(ptr), len, space_id, true);
610 /* fall through */
611 case MLOG_FILE_CREATE2:
612 if (first_page_no) {
613 ut_ad(first_page_no
614 == SRV_UNDO_TABLESPACE_SIZE_IN_PAGES);
615 ut_a(srv_is_undo_tablespace(space_id));
616 compile_time_assert(
617 UT_ARR_SIZE(recv_sys->truncated_undo_spaces)
618 == TRX_SYS_MAX_UNDO_SPACES);
619 recv_sys_t::trunc& t = recv_sys->truncated_undo_spaces[
620 space_id - srv_undo_space_id_start];
621 t.lsn = recv_sys->recovered_lsn;
622 t.pages = uint32_t(first_page_no);
623 } else if (log_file_op) {
624 log_file_op(space_id,
625 type == MLOG_FILE_CREATE2 ? ptr - 4 : NULL,
626 ptr, len, NULL, 0);
627 }
628 break;
629 case MLOG_FILE_RENAME2:
630 if (UNIV_UNLIKELY(corrupt)) {
631 ib::error() << "MLOG_FILE_RENAME2 incorrect:" << ptr;
632 recv_sys->found_corrupt_log = true;
633 }
634
635 /* The new name follows the old name. */
636 byte* new_name = end_ptr + 2;
637 if (end < new_name) {
638 return(NULL);
639 }
640
641 ulint new_len = mach_read_from_2(end_ptr);
642
643 if (end < end_ptr + 2 + new_len) {
644 return(NULL);
645 }
646
647 end_ptr += 2 + new_len;
648
649 corrupt = corrupt
650 || new_len < sizeof "/a.ibd\0"
651 || memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0;
652
653 if (!corrupt && !memchr(new_name, OS_PATH_SEPARATOR, new_len)) {
654 if (byte* c = static_cast<byte*>
655 (memchr(new_name, OS_PATH_SEPARATOR_ALT,
656 new_len))) {
657 ut_ad(c >= new_name);
658 ut_ad(c < new_name + new_len);
659 do {
660 *c = OS_PATH_SEPARATOR;
661 } while ((c = static_cast<byte*>
662 (memchr(ptr, OS_PATH_SEPARATOR_ALT,
663 new_len
664 - ulint(c - new_name))))
665 != NULL);
666 } else {
667 corrupt = true;
668 }
669 }
670
671 if (UNIV_UNLIKELY(corrupt)) {
672 ib::error() << "MLOG_FILE_RENAME2 new_name incorrect:" << ptr
673 << " new_name: " << new_name;
674 recv_sys->found_corrupt_log = true;
675 break;
676 }
677
678 fil_name_process(
679 reinterpret_cast<char*>(ptr), len,
680 space_id, false);
681 fil_name_process(
682 reinterpret_cast<char*>(new_name), new_len,
683 space_id, false);
684
685 if (log_file_op) {
686 log_file_op(space_id, NULL,
687 ptr, len, new_name, new_len);
688 }
689
690 if (!apply) {
691 break;
692 }
693 if (!fil_op_replay_rename(
694 space_id, first_page_no,
695 reinterpret_cast<const char*>(ptr),
696 reinterpret_cast<const char*>(new_name))) {
697 recv_sys->found_corrupt_fs = true;
698 }
699 }
700
701 return(end_ptr);
702 }
703
704 /** Clean up after recv_sys_init() */
705 void
recv_sys_close()706 recv_sys_close()
707 {
708 if (recv_sys != NULL) {
709 recv_sys->dblwr.pages.clear();
710
711 if (recv_sys->addr_hash != NULL) {
712 hash_table_free(recv_sys->addr_hash);
713 }
714
715 if (recv_sys->heap != NULL) {
716 mem_heap_free(recv_sys->heap);
717 }
718
719 if (recv_sys->flush_start != NULL) {
720 os_event_destroy(recv_sys->flush_start);
721 }
722
723 if (recv_sys->flush_end != NULL) {
724 os_event_destroy(recv_sys->flush_end);
725 }
726
727 if (recv_sys->buf != NULL) {
728 ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
729 }
730
731 ut_ad(!recv_writer_thread_active);
732 mutex_free(&recv_sys->writer_mutex);
733
734 mutex_free(&recv_sys->mutex);
735
736 ut_free(recv_sys);
737 recv_sys = NULL;
738 }
739
740 recv_spaces.clear();
741 mlog_init.clear();
742 }
743
744 /************************************************************
745 Reset the state of the recovery system variables. */
746 void
recv_sys_var_init(void)747 recv_sys_var_init(void)
748 /*===================*/
749 {
750 recv_recovery_on = false;
751 recv_needed_recovery = false;
752 recv_lsn_checks_on = false;
753 recv_no_ibuf_operations = false;
754 recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
755 recv_previous_parsed_rec_offset = 0;
756 recv_previous_parsed_rec_is_multi = 0;
757 recv_max_page_lsn = 0;
758 }
759
760 /******************************************************************//**
761 recv_writer thread tasked with flushing dirty pages from the buffer
762 pools.
763 @return a dummy parameter */
764 extern "C"
765 os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)766 DECLARE_THREAD(recv_writer_thread)(
767 /*===============================*/
768 void* arg MY_ATTRIBUTE((unused)))
769 /*!< in: a dummy parameter required by
770 os_thread_create */
771 {
772 my_thread_init();
773 ut_ad(!srv_read_only_mode);
774
775 #ifdef UNIV_PFS_THREAD
776 pfs_register_thread(recv_writer_thread_key);
777 #endif /* UNIV_PFS_THREAD */
778
779 #ifdef UNIV_DEBUG_THREAD_CREATION
780 ib::info() << "recv_writer thread running, id "
781 << os_thread_pf(os_thread_get_curr_id());
782 #endif /* UNIV_DEBUG_THREAD_CREATION */
783
784 while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
785
786 /* Wait till we get a signal to clean the LRU list.
787 Bounded by max wait time of 100ms. */
788 int64_t sig_count = os_event_reset(buf_flush_event);
789 os_event_wait_time_low(buf_flush_event, 100000, sig_count);
790
791 mutex_enter(&recv_sys->writer_mutex);
792
793 if (!recv_recovery_is_on()) {
794 mutex_exit(&recv_sys->writer_mutex);
795 break;
796 }
797
798 /* Flush pages from end of LRU if required */
799 os_event_reset(recv_sys->flush_end);
800 recv_sys->flush_type = BUF_FLUSH_LRU;
801 os_event_set(recv_sys->flush_start);
802 os_event_wait(recv_sys->flush_end);
803
804 mutex_exit(&recv_sys->writer_mutex);
805 }
806
807 recv_writer_thread_active = false;
808
809 my_thread_end();
810 /* We count the number of threads in os_thread_exit().
811 A created thread should always use that to exit and not
812 use return() to exit. */
813 os_thread_exit();
814
815 OS_THREAD_DUMMY_RETURN;
816 }
817
818 /** Initialize the redo log recovery subsystem. */
819 void
recv_sys_init()820 recv_sys_init()
821 {
822 ut_ad(recv_sys == NULL);
823
824 recv_sys = static_cast<recv_sys_t*>(ut_zalloc_nokey(sizeof(*recv_sys)));
825
826 mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
827 mutex_create(LATCH_ID_RECV_WRITER, &recv_sys->writer_mutex);
828
829 recv_sys->heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
830
831 if (!srv_read_only_mode) {
832 recv_sys->flush_start = os_event_create(0);
833 recv_sys->flush_end = os_event_create(0);
834 }
835
836 recv_sys->buf = static_cast<byte*>(
837 ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
838 recv_sys->buf_size = RECV_PARSING_BUF_SIZE;
839
840 recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
841 recv_sys->progress_time = time(NULL);
842 recv_max_page_lsn = 0;
843
844 /* Call the constructor for recv_sys_t::dblwr member */
845 new (&recv_sys->dblwr) recv_dblwr_t();
846 }
847
848 /** Empty a fully processed hash table. */
849 static
850 void
recv_sys_empty_hash()851 recv_sys_empty_hash()
852 {
853 ut_ad(mutex_own(&(recv_sys->mutex)));
854 ut_a(recv_sys->n_addrs == 0);
855
856 hash_table_free(recv_sys->addr_hash);
857 mem_heap_empty(recv_sys->heap);
858
859 recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
860 }
861
862 /********************************************************//**
863 Frees the recovery system. */
864 void
recv_sys_debug_free(void)865 recv_sys_debug_free(void)
866 /*=====================*/
867 {
868 mutex_enter(&(recv_sys->mutex));
869
870 hash_table_free(recv_sys->addr_hash);
871 mem_heap_free(recv_sys->heap);
872 ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
873
874 recv_sys->buf_size = 0;
875 recv_sys->buf = NULL;
876 recv_sys->heap = NULL;
877 recv_sys->addr_hash = NULL;
878
879 /* wake page cleaner up to progress */
880 if (!srv_read_only_mode) {
881 ut_ad(!recv_recovery_is_on());
882 ut_ad(!recv_writer_thread_active);
883 os_event_reset(buf_flush_event);
884 os_event_set(recv_sys->flush_start);
885 }
886
887 mutex_exit(&(recv_sys->mutex));
888 }
889
890 /** Read a log segment to log_sys.buf.
891 @param[in,out] start_lsn in: read area start,
892 out: the last read valid lsn
893 @param[in] end_lsn read area end
894 @return whether no invalid blocks (e.g checksum mismatch) were found */
read_log_seg(lsn_t * start_lsn,lsn_t end_lsn)895 bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
896 {
897 ulint len;
898 bool success = true;
899 ut_ad(log_sys.mutex.is_owned());
900 ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
901 ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
902 byte* buf = log_sys.buf;
903 loop:
904 lsn_t source_offset = calc_lsn_offset(*start_lsn);
905
906 ut_a(end_lsn - *start_lsn <= ULINT_MAX);
907 len = (ulint) (end_lsn - *start_lsn);
908
909 ut_ad(len != 0);
910
911 const bool at_eof = (source_offset % file_size) + len > file_size;
912 if (at_eof) {
913 /* If the above condition is true then len (which is ulint)
914 is > the expression below, so the typecast is ok */
915 len = ulint(file_size - (source_offset % file_size));
916 }
917
918 log_sys.n_log_ios++;
919
920 MONITOR_INC(MONITOR_LOG_IO);
921
922 ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);
923
924 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
925
926 fil_io(IORequestLogRead, true,
927 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
928 univ_page_size,
929 ulint(source_offset & (srv_page_size - 1)),
930 len, buf, NULL);
931
932 for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
933 buf += OS_FILE_LOG_BLOCK_SIZE,
934 (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
935 const ulint block_number = log_block_get_hdr_no(buf);
936
937 if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
938 /* Garbage or an incompletely written log block.
939 We will not report any error, because this can
940 happen when InnoDB was killed while it was
941 writing redo log. We simply treat this as an
942 abrupt end of the redo log. */
943 fail:
944 end_lsn = *start_lsn;
945 success = false;
946 break;
947 }
948
949 if (innodb_log_checksums || is_encrypted()) {
950 ulint crc = log_block_calc_checksum_crc32(buf);
951 ulint cksum = log_block_get_checksum(buf);
952
953 DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
954 static int block_counter;
955 if (block_counter++ == 0) {
956 cksum = crc + 1;
957 }
958 });
959
960 DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });
961
962 if (crc != cksum) {
963 ib::error_or_warn(srv_operation != SRV_OPERATION_BACKUP)
964 << "Invalid log block checksum."
965 << " block: " << block_number
966 << " checkpoint no: "
967 << log_block_get_checkpoint_no(buf)
968 << " expected: " << crc
969 << " found: " << cksum;
970 goto fail;
971 }
972
973 if (is_encrypted()) {
974 log_crypt(buf, *start_lsn,
975 OS_FILE_LOG_BLOCK_SIZE, true);
976 }
977 }
978
979 ulint dl = log_block_get_data_len(buf);
980 if (dl < LOG_BLOCK_HDR_SIZE
981 || (dl > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE
982 && dl != OS_FILE_LOG_BLOCK_SIZE)) {
983 recv_sys->found_corrupt_log = true;
984 goto fail;
985 }
986 }
987
988 if (recv_sys->report(time(NULL))) {
989 ib::info() << "Read redo log up to LSN=" << *start_lsn;
990 service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
991 "Read redo log up to LSN=" LSN_PF,
992 *start_lsn);
993 }
994
995 if (*start_lsn != end_lsn) {
996 goto loop;
997 }
998
999 return(success);
1000 }
1001
1002
1003
1004 /********************************************************//**
1005 Copies a log segment from the most up-to-date log group to the other log
1006 groups, so that they all contain the latest log data. Also writes the info
1007 about the latest checkpoint to the groups, and inits the fields in the group
1008 memory structs to up-to-date values. */
1009 static
1010 void
recv_synchronize_groups()1011 recv_synchronize_groups()
1012 {
1013 const lsn_t recovered_lsn = recv_sys->recovered_lsn;
1014
1015 /* Read the last recovered log block to the recovery system buffer:
1016 the block is always incomplete */
1017
1018 lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
1019 OS_FILE_LOG_BLOCK_SIZE);
1020 log_sys.log.read_log_seg(&start_lsn,
1021 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
1022 log_sys.log.set_fields(recovered_lsn);
1023
1024 /* Copy the checkpoint info to the log; remember that we have
1025 incremented checkpoint_no by one, and the info will not be written
1026 over the max checkpoint info, thus making the preservation of max
1027 checkpoint info on disk certain */
1028
1029 if (!srv_read_only_mode) {
1030 log_write_checkpoint_info(true, 0);
1031 log_mutex_enter();
1032 }
1033 }
1034
1035 /** Check the consistency of a log header block.
1036 @param[in] log header block
1037 @return true if ok */
1038 static
1039 bool
recv_check_log_header_checksum(const byte * buf)1040 recv_check_log_header_checksum(
1041 const byte* buf)
1042 {
1043 return(log_block_get_checksum(buf)
1044 == log_block_calc_checksum_crc32(buf));
1045 }
1046
1047 /** Find the latest checkpoint in the format-0 log header.
1048 @param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1049 @return error code or DB_SUCCESS */
1050 static MY_ATTRIBUTE((warn_unused_result))
1051 dberr_t
recv_find_max_checkpoint_0(ulint * max_field)1052 recv_find_max_checkpoint_0(ulint* max_field)
1053 {
1054 ib_uint64_t max_no = 0;
1055 ib_uint64_t checkpoint_no;
1056 byte* buf = log_sys.checkpoint_buf;
1057
1058 ut_ad(log_sys.log.format == 0);
1059
1060 /** Offset of the first checkpoint checksum */
1061 static const uint CHECKSUM_1 = 288;
1062 /** Offset of the second checkpoint checksum */
1063 static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1064 /** Most significant bits of the checkpoint offset */
1065 static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1066 /** Least significant bits of the checkpoint offset */
1067 static const uint OFFSET_LOW32 = 16;
1068
1069 bool found = false;
1070
1071 for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1072 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1073 log_header_read(field);
1074
1075 if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1076 != mach_read_from_4(buf + CHECKSUM_1)
1077 || static_cast<uint32_t>(
1078 ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1079 CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1080 != mach_read_from_4(buf + CHECKSUM_2)) {
1081 DBUG_LOG("ib_log",
1082 "invalid pre-10.2.2 checkpoint " << field);
1083 continue;
1084 }
1085
1086 checkpoint_no = mach_read_from_8(
1087 buf + LOG_CHECKPOINT_NO);
1088
1089 if (!log_crypt_101_read_checkpoint(buf)) {
1090 ib::error() << "Decrypting checkpoint failed";
1091 continue;
1092 }
1093
1094 DBUG_PRINT("ib_log",
1095 ("checkpoint " UINT64PF " at " LSN_PF " found",
1096 checkpoint_no,
1097 mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
1098
1099 if (checkpoint_no >= max_no) {
1100 found = true;
1101 *max_field = field;
1102 max_no = checkpoint_no;
1103
1104 log_sys.log.set_lsn(mach_read_from_8(
1105 buf + LOG_CHECKPOINT_LSN));
1106 log_sys.log.set_lsn_offset(
1107 lsn_t(mach_read_from_4(buf + OFFSET_HIGH32))
1108 << 32
1109 | mach_read_from_4(buf + OFFSET_LOW32));
1110 }
1111 }
1112
1113 if (found) {
1114 return(DB_SUCCESS);
1115 }
1116
1117 ib::error() << "Upgrade after a crash is not supported."
1118 " This redo log was created before MariaDB 10.2.2,"
1119 " and we did not find a valid checkpoint."
1120 " Please follow the instructions at"
1121 " https://mariadb.com/kb/en/library/upgrading/";
1122 return(DB_ERROR);
1123 }
1124
1125 /** Determine if a pre-MySQL 5.7.9/MariaDB 10.2.2 redo log is clean.
1126 @param[in] lsn checkpoint LSN
1127 @param[in] crypt whether the log might be encrypted
1128 @return error code
1129 @retval DB_SUCCESS if the redo log is clean
1130 @retval DB_ERROR if the redo log is corrupted or dirty */
recv_log_format_0_recover(lsn_t lsn,bool crypt)1131 static dberr_t recv_log_format_0_recover(lsn_t lsn, bool crypt)
1132 {
1133 log_mutex_enter();
1134 const lsn_t source_offset = log_sys.log.calc_lsn_offset(lsn);
1135 log_mutex_exit();
1136 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
1137 byte* buf = log_sys.buf;
1138
1139 static const char* NO_UPGRADE_RECOVERY_MSG =
1140 "Upgrade after a crash is not supported."
1141 " This redo log was created before MariaDB 10.2.2";
1142
1143 fil_io(IORequestLogRead, true,
1144 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1145 univ_page_size,
1146 ulint((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1147 & (srv_page_size - 1)),
1148 OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1149
1150 if (log_block_calc_checksum_format_0(buf)
1151 != log_block_get_checksum(buf)
1152 && !log_crypt_101_read_block(buf)) {
1153 ib::error() << NO_UPGRADE_RECOVERY_MSG
1154 << ", and it appears corrupted.";
1155 return(DB_CORRUPTION);
1156 }
1157
1158 if (log_block_get_data_len(buf)
1159 == (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1160 } else if (crypt) {
1161 ib::error() << "Cannot decrypt log for upgrading."
1162 " The encrypted log was created"
1163 " before MariaDB 10.2.2.";
1164 return DB_ERROR;
1165 } else {
1166 ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";
1167 return(DB_ERROR);
1168 }
1169
1170 /* Mark the redo log for upgrading. */
1171 srv_log_file_size = 0;
1172 recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1173 = recv_sys->scanned_lsn
1174 = recv_sys->mlog_checkpoint_lsn = lsn;
1175 log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1176 = log_sys.lsn = log_sys.write_lsn
1177 = log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1178 = lsn;
1179 log_sys.next_checkpoint_no = 0;
1180 return(DB_SUCCESS);
1181 }
1182
1183 /** Determine if a redo log from MariaDB 10.4 is clean.
1184 @return error code
1185 @retval DB_SUCCESS if the redo log is clean
1186 @retval DB_CORRUPTION if the redo log is corrupted
1187 @retval DB_ERROR if the redo log is not empty */
recv_log_recover_10_4()1188 static dberr_t recv_log_recover_10_4()
1189 {
1190 ut_ad(!log_sys.is_encrypted());
1191 const lsn_t lsn = log_sys.log.get_lsn();
1192 const lsn_t source_offset = log_sys.log.calc_lsn_offset(lsn);
1193 const ulint page_no
1194 = (ulint) (source_offset / univ_page_size.physical());
1195 byte* buf = log_sys.buf;
1196
1197 fil_io(IORequestLogRead, true,
1198 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1199 univ_page_size,
1200 (ulint) ((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1201 % univ_page_size.physical()),
1202 OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1203
1204 const ulint cksum = log_block_get_checksum(buf);
1205
1206 if (cksum != LOG_NO_CHECKSUM_MAGIC
1207 && cksum != log_block_calc_checksum_crc32(buf)) {
1208 return DB_CORRUPTION;
1209 }
1210
1211 /* On a clean shutdown, the redo log will be logically empty
1212 after the checkpoint lsn. */
1213
1214 if (log_block_get_data_len(buf)
1215 != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1216 return DB_ERROR;
1217 }
1218
1219 /* Mark the redo log for downgrading. */
1220 srv_log_file_size = 0;
1221 recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1222 = recv_sys->scanned_lsn
1223 = recv_sys->mlog_checkpoint_lsn = lsn;
1224 log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1225 = log_sys.lsn = log_sys.write_lsn
1226 = log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1227 = lsn;
1228 log_sys.next_checkpoint_no = 0;
1229 return DB_SUCCESS;
1230 }
1231
1232 /** Find the latest checkpoint in the log header.
1233 @param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1234 @return error code or DB_SUCCESS */
1235 dberr_t
recv_find_max_checkpoint(ulint * max_field)1236 recv_find_max_checkpoint(ulint* max_field)
1237 {
1238 ib_uint64_t max_no;
1239 ib_uint64_t checkpoint_no;
1240 ulint field;
1241 byte* buf;
1242
1243 max_no = 0;
1244 *max_field = 0;
1245
1246 buf = log_sys.checkpoint_buf;
1247
1248 log_header_read(0);
1249 /* Check the header page checksum. There was no
1250 checksum in the first redo log format (version 0). */
1251 log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1252 log_sys.log.subformat = log_sys.log.format != LOG_HEADER_FORMAT_3_23
1253 ? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
1254 : 0;
1255 if (log_sys.log.format != LOG_HEADER_FORMAT_3_23
1256 && !recv_check_log_header_checksum(buf)) {
1257 ib::error() << "Invalid redo log header checksum.";
1258 return(DB_CORRUPTION);
1259 }
1260
1261 char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
1262
1263 memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
1264 /* Ensure that the string is NUL-terminated. */
1265 creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;
1266
1267 switch (log_sys.log.format) {
1268 case LOG_HEADER_FORMAT_3_23:
1269 return(recv_find_max_checkpoint_0(max_field));
1270 case LOG_HEADER_FORMAT_10_2:
1271 case LOG_HEADER_FORMAT_10_2 | LOG_HEADER_FORMAT_ENCRYPTED:
1272 case LOG_HEADER_FORMAT_CURRENT:
1273 case LOG_HEADER_FORMAT_CURRENT | LOG_HEADER_FORMAT_ENCRYPTED:
1274 case LOG_HEADER_FORMAT_10_4:
1275 /* We can only parse the unencrypted LOG_HEADER_FORMAT_10_4.
1276 The encrypted format uses a larger redo log block trailer. */
1277 break;
1278 default:
1279 ib::error() << "Unsupported redo log format."
1280 " The redo log was created with " << creator << ".";
1281 return(DB_ERROR);
1282 }
1283
1284 for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1285 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1286
1287 log_header_read(field);
1288
1289 const ulint crc32 = log_block_calc_checksum_crc32(buf);
1290 const ulint cksum = log_block_get_checksum(buf);
1291
1292 if (crc32 != cksum) {
1293 DBUG_PRINT("ib_log",
1294 ("invalid checkpoint,"
1295 " at " ULINTPF
1296 ", checksum " ULINTPFx
1297 " expected " ULINTPFx,
1298 field, cksum, crc32));
1299 continue;
1300 }
1301
1302 if (log_sys.is_encrypted()
1303 && !log_crypt_read_checkpoint_buf(buf)) {
1304 ib::error() << "Reading checkpoint"
1305 " encryption info failed.";
1306 continue;
1307 }
1308
1309 checkpoint_no = mach_read_from_8(
1310 buf + LOG_CHECKPOINT_NO);
1311
1312 DBUG_PRINT("ib_log",
1313 ("checkpoint " UINT64PF " at " LSN_PF " found",
1314 checkpoint_no, mach_read_from_8(
1315 buf + LOG_CHECKPOINT_LSN)));
1316
1317 if (checkpoint_no >= max_no) {
1318 *max_field = field;
1319 max_no = checkpoint_no;
1320 log_sys.log.set_lsn(mach_read_from_8(
1321 buf + LOG_CHECKPOINT_LSN));
1322 log_sys.log.set_lsn_offset(mach_read_from_8(
1323 buf + LOG_CHECKPOINT_OFFSET));
1324 log_sys.next_checkpoint_no = checkpoint_no;
1325 }
1326 }
1327
1328 if (*max_field == 0) {
1329 /* Before 10.2.2, we could get here during database
1330 initialization if we created an ib_logfile0 file that
1331 was filled with zeroes, and were killed. After
1332 10.2.2, we would reject such a file already earlier,
1333 when checking the file header. */
1334 ib::error() << "No valid checkpoint found"
1335 " (corrupted redo log)."
1336 " You can try --innodb-force-recovery=6"
1337 " as a last resort.";
1338 return(DB_ERROR);
1339 }
1340
1341 if (log_sys.log.format == LOG_HEADER_FORMAT_10_4) {
1342 dberr_t err = recv_log_recover_10_4();
1343 if (err != DB_SUCCESS) {
1344 ib::error()
1345 << "Downgrade after a crash is not supported."
1346 " The redo log was created with " << creator
1347 << (err == DB_ERROR
1348 ? "." : ", and it appears corrupted.");
1349 }
1350 return err;
1351 }
1352
1353 return DB_SUCCESS;
1354 }
1355
1356 /** Try to parse a single log record body and also applies it if
1357 specified.
1358 @param[in] type redo log entry type
1359 @param[in] ptr redo log record body
1360 @param[in] end_ptr end of buffer
1361 @param[in] space_id tablespace identifier
1362 @param[in] page_no page number
1363 @param[in] apply whether to apply the record
1364 @param[in,out] block buffer block, or NULL if
1365 a page log record should not be applied
1366 or if it is a MLOG_FILE_ operation
1367 @param[in,out] mtr mini-transaction, or NULL if
1368 a page log record should not be applied
1369 @return log record end, NULL if not a complete record */
1370 static
1371 byte*
recv_parse_or_apply_log_rec_body(mlog_id_t type,byte * ptr,byte * end_ptr,ulint space_id,ulint page_no,bool apply,buf_block_t * block,mtr_t * mtr)1372 recv_parse_or_apply_log_rec_body(
1373 mlog_id_t type,
1374 byte* ptr,
1375 byte* end_ptr,
1376 ulint space_id,
1377 ulint page_no,
1378 bool apply,
1379 buf_block_t* block,
1380 mtr_t* mtr)
1381 {
1382 ut_ad(!block == !mtr);
1383 ut_ad(!apply || recv_sys->mlog_checkpoint_lsn != 0);
1384
1385 switch (type) {
1386 case MLOG_FILE_NAME:
1387 case MLOG_FILE_DELETE:
1388 case MLOG_FILE_CREATE2:
1389 case MLOG_FILE_RENAME2:
1390 ut_ad(block == NULL);
1391 /* Collect the file names when parsing the log,
1392 before applying any log records. */
1393 return(fil_name_parse(ptr, end_ptr, space_id, page_no, type,
1394 apply));
1395 case MLOG_INDEX_LOAD:
1396 if (end_ptr < ptr + 8) {
1397 return(NULL);
1398 }
1399 return(ptr + 8);
1400 case MLOG_TRUNCATE:
1401 if (log_truncate) {
1402 ut_ad(srv_operation != SRV_OPERATION_NORMAL);
1403 log_truncate();
1404 recv_sys->found_corrupt_fs = true;
1405 return NULL;
1406 }
1407 return(truncate_t::parse_redo_entry(ptr, end_ptr, space_id));
1408
1409 default:
1410 break;
1411 }
1412
1413 dict_index_t* index = NULL;
1414 page_t* page;
1415 page_zip_des_t* page_zip;
1416 #ifdef UNIV_DEBUG
1417 ulint page_type;
1418 #endif /* UNIV_DEBUG */
1419
1420 if (block) {
1421 /* Applying a page log record. */
1422 ut_ad(apply);
1423 page = block->frame;
1424 page_zip = buf_block_get_page_zip(block);
1425 ut_d(page_type = fil_page_get_type(page));
1426 } else if (apply
1427 && !is_predefined_tablespace(space_id)
1428 && recv_spaces.find(space_id) == recv_spaces.end()) {
1429 if (recv_sys->recovered_lsn < recv_sys->mlog_checkpoint_lsn) {
1430 /* We have not seen all records between the
1431 checkpoint and MLOG_CHECKPOINT. There should be
1432 a MLOG_FILE_DELETE for this tablespace later. */
1433 recv_spaces.insert(
1434 std::make_pair(space_id,
1435 file_name_t("", false)));
1436 goto parse_log;
1437 }
1438
1439 ib::error() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE"
1440 " for redo log record " << type << " (page "
1441 << space_id << ":" << page_no << ") at "
1442 << recv_sys->recovered_lsn << ".";
1443 recv_sys->found_corrupt_log = true;
1444 return(NULL);
1445 } else {
1446 parse_log:
1447 /* Parsing a page log record. */
1448 page = NULL;
1449 page_zip = NULL;
1450 ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
1451 }
1452
1453 const byte* old_ptr = ptr;
1454
1455 switch (type) {
1456 #ifdef UNIV_LOG_LSN_DEBUG
1457 case MLOG_LSN:
1458 /* The LSN is checked in recv_parse_log_rec(). */
1459 break;
1460 #endif /* UNIV_LOG_LSN_DEBUG */
1461 case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
1462 #ifdef UNIV_DEBUG
1463 if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
1464 && end_ptr >= ptr + 2) {
1465 /* It is OK to set FIL_PAGE_TYPE and certain
1466 list node fields on an empty page. Any other
1467 write is not OK. */
1468
1469 /* NOTE: There may be bogus assertion failures for
1470 dict_hdr_create(), trx_rseg_header_create(),
1471 trx_sys_create_doublewrite_buf(), and
1472 trx_sysf_create().
1473 These are only called during database creation. */
1474 ulint offs = mach_read_from_2(ptr);
1475
1476 switch (type) {
1477 default:
1478 ut_error;
1479 case MLOG_2BYTES:
1480 /* Note that this can fail when the
1481 redo log been written with something
1482 older than InnoDB Plugin 1.0.4. */
1483 ut_ad(offs == FIL_PAGE_TYPE
1484 || srv_is_undo_tablespace(space_id)
1485 || offs == IBUF_TREE_SEG_HEADER
1486 + IBUF_HEADER + FSEG_HDR_OFFSET
1487 || offs == PAGE_BTR_IBUF_FREE_LIST
1488 + PAGE_HEADER + FIL_ADDR_BYTE
1489 || offs == PAGE_BTR_IBUF_FREE_LIST
1490 + PAGE_HEADER + FIL_ADDR_BYTE
1491 + FIL_ADDR_SIZE
1492 || offs == PAGE_BTR_SEG_LEAF
1493 + PAGE_HEADER + FSEG_HDR_OFFSET
1494 || offs == PAGE_BTR_SEG_TOP
1495 + PAGE_HEADER + FSEG_HDR_OFFSET
1496 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1497 + PAGE_HEADER + FIL_ADDR_BYTE
1498 + 0 /*FLST_PREV*/
1499 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1500 + PAGE_HEADER + FIL_ADDR_BYTE
1501 + FIL_ADDR_SIZE /*FLST_NEXT*/);
1502 break;
1503 case MLOG_4BYTES:
1504 /* Note that this can fail when the
1505 redo log been written with something
1506 older than InnoDB Plugin 1.0.4. */
1507 ut_ad(0
1508 /* fil_crypt_rotate_page() writes this */
1509 || offs == FIL_PAGE_SPACE_ID
1510 || srv_is_undo_tablespace(space_id)
1511 || offs == IBUF_TREE_SEG_HEADER
1512 + IBUF_HEADER + FSEG_HDR_SPACE
1513 || offs == IBUF_TREE_SEG_HEADER
1514 + IBUF_HEADER + FSEG_HDR_PAGE_NO
1515 || offs == PAGE_BTR_IBUF_FREE_LIST
1516 + PAGE_HEADER/* flst_init */
1517 || offs == PAGE_BTR_IBUF_FREE_LIST
1518 + PAGE_HEADER + FIL_ADDR_PAGE
1519 || offs == PAGE_BTR_IBUF_FREE_LIST
1520 + PAGE_HEADER + FIL_ADDR_PAGE
1521 + FIL_ADDR_SIZE
1522 || offs == PAGE_BTR_SEG_LEAF
1523 + PAGE_HEADER + FSEG_HDR_PAGE_NO
1524 || offs == PAGE_BTR_SEG_LEAF
1525 + PAGE_HEADER + FSEG_HDR_SPACE
1526 || offs == PAGE_BTR_SEG_TOP
1527 + PAGE_HEADER + FSEG_HDR_PAGE_NO
1528 || offs == PAGE_BTR_SEG_TOP
1529 + PAGE_HEADER + FSEG_HDR_SPACE
1530 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1531 + PAGE_HEADER + FIL_ADDR_PAGE
1532 + 0 /*FLST_PREV*/
1533 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1534 + PAGE_HEADER + FIL_ADDR_PAGE
1535 + FIL_ADDR_SIZE /*FLST_NEXT*/);
1536 break;
1537 }
1538 }
1539 #endif /* UNIV_DEBUG */
1540 ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
1541 if (ptr != NULL && page != NULL
1542 && page_no == 0 && type == MLOG_4BYTES) {
1543 ulint offs = mach_read_from_2(old_ptr);
1544 switch (offs) {
1545 fil_space_t* space;
1546 ulint val;
1547 default:
1548 break;
1549 case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1550 case FSP_HEADER_OFFSET + FSP_SIZE:
1551 case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1552 case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1553 space = fil_space_get(space_id);
1554 ut_a(space != NULL);
1555 val = mach_read_from_4(page + offs);
1556
1557 switch (offs) {
1558 case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1559 space->flags = val;
1560 break;
1561 case FSP_HEADER_OFFSET + FSP_SIZE:
1562 space->size_in_header = val;
1563 break;
1564 case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1565 space->free_limit = val;
1566 break;
1567 case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1568 space->free_len = val;
1569 ut_ad(val == flst_get_len(
1570 page + offs));
1571 break;
1572 }
1573 }
1574 }
1575 break;
1576 case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
1577 ut_ad(!page || fil_page_type_is_index(page_type));
1578
1579 if (NULL != (ptr = mlog_parse_index(
1580 ptr, end_ptr,
1581 type == MLOG_COMP_REC_INSERT,
1582 &index))) {
1583 ut_a(!page
1584 || (ibool)!!page_is_comp(page)
1585 == dict_table_is_comp(index->table));
1586 ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
1587 block, index, mtr);
1588 }
1589 break;
1590 case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
1591 ut_ad(!page || fil_page_type_is_index(page_type));
1592
1593 if (NULL != (ptr = mlog_parse_index(
1594 ptr, end_ptr,
1595 type == MLOG_COMP_REC_CLUST_DELETE_MARK,
1596 &index))) {
1597 ut_a(!page
1598 || (ibool)!!page_is_comp(page)
1599 == dict_table_is_comp(index->table));
1600 ptr = btr_cur_parse_del_mark_set_clust_rec(
1601 ptr, end_ptr, page, page_zip, index);
1602 }
1603 break;
1604 case MLOG_REC_SEC_DELETE_MARK:
1605 ut_ad(!page || fil_page_type_is_index(page_type));
1606 ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
1607 page, page_zip);
1608 break;
1609 case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
1610 ut_ad(!page || fil_page_type_is_index(page_type));
1611
1612 if (NULL != (ptr = mlog_parse_index(
1613 ptr, end_ptr,
1614 type == MLOG_COMP_REC_UPDATE_IN_PLACE,
1615 &index))) {
1616 ut_a(!page
1617 || (ibool)!!page_is_comp(page)
1618 == dict_table_is_comp(index->table));
1619 ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
1620 page_zip, index);
1621 }
1622 break;
1623 case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
1624 case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
1625 ut_ad(!page || fil_page_type_is_index(page_type));
1626
1627 if (NULL != (ptr = mlog_parse_index(
1628 ptr, end_ptr,
1629 type == MLOG_COMP_LIST_END_DELETE
1630 || type == MLOG_COMP_LIST_START_DELETE,
1631 &index))) {
1632 ut_a(!page
1633 || (ibool)!!page_is_comp(page)
1634 == dict_table_is_comp(index->table));
1635 ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
1636 block, index, mtr);
1637 }
1638 break;
1639 case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
1640 ut_ad(!page || fil_page_type_is_index(page_type));
1641
1642 if (NULL != (ptr = mlog_parse_index(
1643 ptr, end_ptr,
1644 type == MLOG_COMP_LIST_END_COPY_CREATED,
1645 &index))) {
1646 ut_a(!page
1647 || (ibool)!!page_is_comp(page)
1648 == dict_table_is_comp(index->table));
1649 ptr = page_parse_copy_rec_list_to_created_page(
1650 ptr, end_ptr, block, index, mtr);
1651 }
1652 break;
1653 case MLOG_PAGE_REORGANIZE:
1654 case MLOG_COMP_PAGE_REORGANIZE:
1655 case MLOG_ZIP_PAGE_REORGANIZE:
1656 ut_ad(!page || fil_page_type_is_index(page_type));
1657
1658 if (NULL != (ptr = mlog_parse_index(
1659 ptr, end_ptr,
1660 type != MLOG_PAGE_REORGANIZE,
1661 &index))) {
1662 ut_a(!page
1663 || (ibool)!!page_is_comp(page)
1664 == dict_table_is_comp(index->table));
1665 ptr = btr_parse_page_reorganize(
1666 ptr, end_ptr, index,
1667 type == MLOG_ZIP_PAGE_REORGANIZE,
1668 block, mtr);
1669 }
1670 break;
1671 case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
1672 /* Allow anything in page_type when creating a page. */
1673 ut_a(!page_zip);
1674 page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
1675 break;
1676 case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
1677 page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
1678 true);
1679 break;
1680 case MLOG_UNDO_INSERT:
1681 ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
1682 ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
1683 break;
1684 case MLOG_UNDO_ERASE_END:
1685 if (page) {
1686 ut_ad(page_type == FIL_PAGE_UNDO_LOG);
1687 trx_undo_erase_page_end(page);
1688 }
1689 break;
1690 case MLOG_UNDO_INIT:
1691 /* Allow anything in page_type when creating a page. */
1692 ptr = trx_undo_parse_page_init(ptr, end_ptr, page);
1693 break;
1694 case MLOG_UNDO_HDR_REUSE:
1695 ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
1696 ptr = trx_undo_parse_page_header_reuse(ptr, end_ptr, page);
1697 break;
1698 case MLOG_UNDO_HDR_CREATE:
1699 ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
1700 ptr = trx_undo_parse_page_header(ptr, end_ptr, page, mtr);
1701 break;
1702 case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
1703 ut_ad(!page || fil_page_type_is_index(page_type));
1704 /* On a compressed page, MLOG_COMP_REC_MIN_MARK
1705 will be followed by MLOG_COMP_REC_DELETE
1706 or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
1707 in the same mini-transaction. */
1708 ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
1709 ptr = btr_parse_set_min_rec_mark(
1710 ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
1711 page, mtr);
1712 break;
1713 case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
1714 ut_ad(!page || fil_page_type_is_index(page_type));
1715
1716 if (NULL != (ptr = mlog_parse_index(
1717 ptr, end_ptr,
1718 type == MLOG_COMP_REC_DELETE,
1719 &index))) {
1720 ut_a(!page
1721 || (ibool)!!page_is_comp(page)
1722 == dict_table_is_comp(index->table));
1723 ptr = page_cur_parse_delete_rec(ptr, end_ptr,
1724 block, index, mtr);
1725 }
1726 break;
1727 case MLOG_IBUF_BITMAP_INIT:
1728 /* Allow anything in page_type when creating a page. */
1729 ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
1730 break;
1731 case MLOG_INIT_FILE_PAGE2:
1732 /* Allow anything in page_type when creating a page. */
1733 if (block) fsp_apply_init_file_page(block);
1734 break;
1735 case MLOG_WRITE_STRING:
1736 ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
1737 break;
1738 case MLOG_ZIP_WRITE_NODE_PTR:
1739 ut_ad(!page || fil_page_type_is_index(page_type));
1740 ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
1741 page, page_zip);
1742 break;
1743 case MLOG_ZIP_WRITE_BLOB_PTR:
1744 ut_ad(!page || fil_page_type_is_index(page_type));
1745 ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
1746 page, page_zip);
1747 break;
1748 case MLOG_ZIP_WRITE_HEADER:
1749 ut_ad(!page || fil_page_type_is_index(page_type));
1750 ptr = page_zip_parse_write_header(ptr, end_ptr,
1751 page, page_zip);
1752 break;
1753 case MLOG_ZIP_PAGE_COMPRESS:
1754 /* Allow anything in page_type when creating a page. */
1755 ptr = page_zip_parse_compress(ptr, end_ptr, block);
1756 break;
1757 case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
1758 if (NULL != (ptr = mlog_parse_index(
1759 ptr, end_ptr, TRUE, &index))) {
1760
1761 ut_a(!page || ((ibool)!!page_is_comp(page)
1762 == dict_table_is_comp(index->table)));
1763 ptr = page_zip_parse_compress_no_data(
1764 ptr, end_ptr, page, page_zip, index);
1765 }
1766 break;
1767 case MLOG_ZIP_WRITE_TRX_ID:
1768 /* This must be a clustered index leaf page. */
1769 ut_ad(!page || page_type == FIL_PAGE_INDEX);
1770 ptr = page_zip_parse_write_trx_id(ptr, end_ptr,
1771 page, page_zip);
1772 break;
1773 case MLOG_FILE_WRITE_CRYPT_DATA:
1774 dberr_t err;
1775 ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, &err));
1776
1777 if (err != DB_SUCCESS) {
1778 recv_sys->found_corrupt_log = TRUE;
1779 }
1780 break;
1781 default:
1782 ptr = NULL;
1783 ib::error() << "Incorrect log record type "
1784 << ib::hex(unsigned(type));
1785
1786 recv_sys->found_corrupt_log = true;
1787 }
1788
1789 if (index) {
1790 dict_table_t* table = index->table;
1791
1792 dict_mem_index_free(index);
1793 dict_mem_table_free(table);
1794 }
1795
1796 return(ptr);
1797 }
1798
1799 /*********************************************************************//**
1800 Calculates the fold value of a page file address: used in inserting or
1801 searching for a log record in the hash table.
1802 @return folded value */
1803 UNIV_INLINE
1804 ulint
recv_fold(ulint space,ulint page_no)1805 recv_fold(
1806 /*======*/
1807 ulint space, /*!< in: space */
1808 ulint page_no)/*!< in: page number */
1809 {
1810 return(ut_fold_ulint_pair(space, page_no));
1811 }
1812
1813 /*********************************************************************//**
1814 Calculates the hash value of a page file address: used in inserting or
1815 searching for a log record in the hash table.
1816 @return folded value */
1817 UNIV_INLINE
1818 ulint
recv_hash(ulint space,ulint page_no)1819 recv_hash(
1820 /*======*/
1821 ulint space, /*!< in: space */
1822 ulint page_no)/*!< in: page number */
1823 {
1824 return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
1825 }
1826
1827 /*********************************************************************//**
1828 Gets the hashed file address struct for a page.
1829 @return file address struct, NULL if not found from the hash table */
1830 static
1831 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)1832 recv_get_fil_addr_struct(
1833 /*=====================*/
1834 ulint space, /*!< in: space id */
1835 ulint page_no)/*!< in: page number */
1836 {
1837 ut_ad(mutex_own(&recv_sys->mutex));
1838
1839 recv_addr_t* recv_addr;
1840
1841 for (recv_addr = static_cast<recv_addr_t*>(
1842 HASH_GET_FIRST(recv_sys->addr_hash,
1843 recv_hash(space, page_no)));
1844 recv_addr != 0;
1845 recv_addr = static_cast<recv_addr_t*>(
1846 HASH_GET_NEXT(addr_hash, recv_addr))) {
1847
1848 if (recv_addr->space == space
1849 && recv_addr->page_no == page_no) {
1850
1851 return(recv_addr);
1852 }
1853 }
1854
1855 return(NULL);
1856 }
1857
1858 /*******************************************************************//**
1859 Adds a new log record to the hash table of log records. */
1860 static
1861 void
recv_add_to_hash_table(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t start_lsn,lsn_t end_lsn)1862 recv_add_to_hash_table(
1863 /*===================*/
1864 mlog_id_t type, /*!< in: log record type */
1865 ulint space, /*!< in: space id */
1866 ulint page_no, /*!< in: page number */
1867 byte* body, /*!< in: log record body */
1868 byte* rec_end, /*!< in: log record end */
1869 lsn_t start_lsn, /*!< in: start lsn of the mtr */
1870 lsn_t end_lsn) /*!< in: end lsn of the mtr */
1871 {
1872 recv_t* recv;
1873 ulint len;
1874 recv_data_t* recv_data;
1875 recv_data_t** prev_field;
1876 recv_addr_t* recv_addr;
1877
1878 ut_ad(type != MLOG_FILE_DELETE);
1879 ut_ad(type != MLOG_FILE_CREATE2);
1880 ut_ad(type != MLOG_FILE_RENAME2);
1881 ut_ad(type != MLOG_FILE_NAME);
1882 ut_ad(type != MLOG_DUMMY_RECORD);
1883 ut_ad(type != MLOG_CHECKPOINT);
1884 ut_ad(type != MLOG_INDEX_LOAD);
1885 ut_ad(type != MLOG_TRUNCATE);
1886
1887 len = ulint(rec_end - body);
1888
1889 recv = static_cast<recv_t*>(
1890 mem_heap_alloc(recv_sys->heap, sizeof(recv_t)));
1891
1892 recv->type = type;
1893 recv->len = ulint(rec_end - body);
1894 recv->start_lsn = start_lsn;
1895 recv->end_lsn = end_lsn;
1896
1897 recv_addr = recv_get_fil_addr_struct(space, page_no);
1898
1899 if (recv_addr == NULL) {
1900 recv_addr = static_cast<recv_addr_t*>(
1901 mem_heap_alloc(recv_sys->heap, sizeof(recv_addr_t)));
1902
1903 recv_addr->space = space;
1904 recv_addr->page_no = page_no;
1905 recv_addr->state = RECV_NOT_PROCESSED;
1906
1907 UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
1908
1909 HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
1910 recv_fold(space, page_no), recv_addr);
1911 recv_sys->n_addrs++;
1912 }
1913
1914 switch (type) {
1915 case MLOG_INIT_FILE_PAGE2:
1916 case MLOG_ZIP_PAGE_COMPRESS:
1917 /* Ignore any earlier redo log records for this page. */
1918 ut_ad(recv_addr->state == RECV_NOT_PROCESSED
1919 || recv_addr->state == RECV_WILL_NOT_READ);
1920 recv_addr->state = RECV_WILL_NOT_READ;
1921 mlog_init.add(space, page_no, start_lsn);
1922 default:
1923 break;
1924 }
1925
1926 UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
1927
1928 prev_field = &(recv->data);
1929
1930 /* Store the log record body in chunks of less than srv_page_size:
1931 recv_sys->heap grows into the buffer pool, and bigger chunks could not
1932 be allocated */
1933
1934 while (rec_end > body) {
1935
1936 len = ulint(rec_end - body);
1937
1938 if (len > RECV_DATA_BLOCK_SIZE) {
1939 len = RECV_DATA_BLOCK_SIZE;
1940 }
1941
1942 recv_data = static_cast<recv_data_t*>(
1943 mem_heap_alloc(recv_sys->heap,
1944 sizeof(recv_data_t) + len));
1945
1946 *prev_field = recv_data;
1947
1948 memcpy(recv_data + 1, body, len);
1949
1950 prev_field = &(recv_data->next);
1951
1952 body += len;
1953 }
1954
1955 *prev_field = NULL;
1956 }
1957
1958 /*********************************************************************//**
1959 Copies the log record body from recv to buf. */
1960 static
1961 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)1962 recv_data_copy_to_buf(
1963 /*==================*/
1964 byte* buf, /*!< in: buffer of length at least recv->len */
1965 recv_t* recv) /*!< in: log record */
1966 {
1967 recv_data_t* recv_data;
1968 ulint part_len;
1969 ulint len;
1970
1971 len = recv->len;
1972 recv_data = recv->data;
1973
1974 while (len > 0) {
1975 if (len > RECV_DATA_BLOCK_SIZE) {
1976 part_len = RECV_DATA_BLOCK_SIZE;
1977 } else {
1978 part_len = len;
1979 }
1980
1981 ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
1982 part_len);
1983 buf += part_len;
1984 len -= part_len;
1985
1986 recv_data = recv_data->next;
1987 }
1988 }
1989
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.

The caller must hold recv_sys->mutex. The mutex is released while the
records are being applied and is held again when the function returns.
The mini-transaction is committed (after discarding its modifications)
before returning.
@param[in,out]	block		buffer pool page
@param[in,out]	mtr		mini-transaction
@param[in,out]	recv_addr	recovery address
@param[in]	init_lsn	the initial LSN where to start recovery */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
			      recv_addr_t* recv_addr, lsn_t init_lsn = 0)
{
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(mutex_own(&recv_sys->mutex));
	ut_ad(recv_sys->apply_log_recs);
	ut_ad(recv_needed_recovery);
	ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
	ut_ad(recv_addr->state != RECV_PROCESSED);

	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
		fprintf(stderr, "Applying log to page %u:%u\n",
			recv_addr->space, recv_addr->page_no);
	}

	DBUG_LOG("ib_log", "Applying log to page " << block->page.id);

	/* Publish the state change while still holding the mutex, then
	release the mutex for the duration of the actual log application. */
	recv_addr->state = RECV_BEING_PROCESSED;
	mutex_exit(&recv_sys->mutex);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

	/* The page may have been modified in the buffer pool.
	FIL_PAGE_LSN would only be updated right before flushing. */
	lsn_t page_lsn = buf_page_get_newest_modification(&block->page);
	if (!page_lsn) {
		page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
	}

	/* start_lsn stays 0 unless at least one record is applied below;
	end_lsn follows the records as they are visited. */
	lsn_t start_lsn = 0, end_lsn = 0;
	fil_space_t* space;

	if (srv_is_tablespace_truncated(recv_addr->space)) {
		/* The table will be truncated after applying
		normal redo log records. */
		goto skip_log;
	}

	space = fil_space_acquire(recv_addr->space);
	if (!space) {
		/* The tablespace could not be acquired:
		there is nothing to apply the records to. */
		goto skip_log;
	}

	for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
	     recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
		ut_ad(recv->start_lsn);
		end_lsn = recv->end_lsn;
		ut_ad(end_lsn <= log_sys.log.scanned_lsn);

		if (recv->start_lsn < page_lsn) {
			/* Ignore this record, because there are later changes
			for this page. */
			DBUG_LOG("ib_log", "apply skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << page_lsn);
		} else if (recv->start_lsn < init_lsn) {
			/* The record is older than the LSN from which the
			caller asked recovery of this page to start. */
			DBUG_LOG("ib_log", "init skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << init_lsn);
		} else if (srv_was_tablespace_truncated(space)
			   && recv->start_lsn
			   < truncate_t::get_truncated_tablespace_init_lsn(
				   recv_addr->space)) {
			/* If per-table tablespace was truncated and
			there exist REDO records before truncate that
			are to be applied as part of recovery
			(checkpoint didn't happen since truncate was
			done) skip such records using lsn check as
			they may not stand valid post truncate. */
		} else {
			/* Remember the LSN of the first record that is
			actually applied to the page. */
			if (!start_lsn) {
				start_lsn = recv->start_lsn;
			}

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr, "apply " LSN_PF ":"
					" %d len " ULINTPF " page %u:%u\n",
					recv->start_lsn, recv->type, recv->len,
					recv_addr->space, recv_addr->page_no);
			}

			DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
				 << get_mlog_string(recv->type)
				 << " len " << recv->len
				 << " page " << block->page.id);

			byte*	buf;

			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				/* We have to copy the record body to
				a separate buffer */
				buf = static_cast<byte*>
					(ut_malloc_nokey(recv->len));
				recv_data_copy_to_buf(buf, recv);
			} else {
				/* The body fits in a single chunk: use it
				in place, right after the chunk header. */
				buf = reinterpret_cast<byte*>(recv->data)
					+ sizeof *recv->data;
			}

			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				block->page.id.space(),
				block->page.id.page_no(), true, block, &mtr);

			/* Stamp the LSN into the page header field, the
			field at the end of the page, and the compressed
			page copy if there is one. */
			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
						end_lsn);
			}

			/* Free the temporary buffer; the condition mirrors
			the one used for the allocation above. */
			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				ut_free(buf);
			}
		}
	}

	space->release();

skip_log:
	/* Also reached directly, without having applied anything, when the
	tablespace is being truncated or could not be acquired. */
#ifdef UNIV_ZIP_DEBUG
	ut_ad(!fil_page_index_page_check(page)
	      || !page_zip
	      || page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */

	/* Only note a modification if at least one record was applied. */
	if (start_lsn) {
		log_flush_order_mutex_enter();
		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
		log_flush_order_mutex_exit();
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();
	mtr.commit();

	/* Read the clock before reacquiring recv_sys->mutex. */
	time_t now = time(NULL);

	mutex_enter(&recv_sys->mutex);

	/* page_lsn is the LSN the page had before any record was applied. */
	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
	recv_addr->state = RECV_PROCESSED;

	/* One page fewer to recover; report progress if any pages remain
	and recv_sys->report() says a report is due. */
	ut_a(recv_sys->n_addrs > 0);
	if (ulint n = --recv_sys->n_addrs) {
		if (recv_sys->report(now)) {
			ib::info() << "To recover: " << n << " pages from log";
			service_manager_extend_timeout(
				INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
		}
	}
}
2163
2164 /** Reduces recv_sys->n_addrs for the corrupted page.
2165 This function should called when srv_force_recovery > 0.
2166 @param[in] page_id page id of the corrupted page */
recv_recover_corrupt_page(page_id_t page_id)2167 void recv_recover_corrupt_page(page_id_t page_id)
2168 {
2169 ut_ad(srv_force_recovery);
2170 mutex_enter(&recv_sys->mutex);
2171
2172 if (!recv_sys->apply_log_recs) {
2173 } else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2174 page_id.space(), page_id.page_no())) {
2175 switch (recv_addr->state) {
2176 case RECV_WILL_NOT_READ:
2177 ut_ad(!"wrong state");
2178 break;
2179 case RECV_BEING_PROCESSED:
2180 case RECV_PROCESSED:
2181 break;
2182 default:
2183 recv_addr->state = RECV_PROCESSED;
2184 ut_ad(recv_sys->n_addrs);
2185 recv_sys->n_addrs--;
2186 }
2187 }
2188
2189 mutex_exit(&recv_sys->mutex);
2190 }
2191
2192 /** Apply any buffered redo log to a page that was just read from a data file.
2193 @param[in,out] bpage buffer pool page */
recv_recover_page(buf_page_t * bpage)2194 void recv_recover_page(buf_page_t* bpage)
2195 {
2196 mtr_t mtr;
2197 mtr.start();
2198 mtr.set_log_mode(MTR_LOG_NONE);
2199
2200 ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
2201 buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
2202
2203 /* Move the ownership of the x-latch on the page to
2204 this OS thread, so that we can acquire a second
2205 x-latch on it. This is needed for the operations to
2206 the page to pass the debug checks. */
2207 rw_lock_x_lock_move_ownership(&block->lock);
2208 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2209 ibool success = buf_page_get_known_nowait(
2210 RW_X_LATCH, block, BUF_KEEP_OLD,
2211 __FILE__, __LINE__, &mtr);
2212 ut_a(success);
2213
2214 mutex_enter(&recv_sys->mutex);
2215 if (!recv_sys->apply_log_recs) {
2216 } else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2217 bpage->id.space(), bpage->id.page_no())) {
2218 switch (recv_addr->state) {
2219 case RECV_BEING_PROCESSED:
2220 case RECV_PROCESSED:
2221 break;
2222 default:
2223 recv_recover_page(block, mtr, recv_addr);
2224 goto func_exit;
2225 }
2226 }
2227
2228 mtr.commit();
2229 func_exit:
2230 mutex_exit(&recv_sys->mutex);
2231 ut_ad(mtr.has_committed());
2232 }
2233
2234 /** Reads in pages which have hashed log records, from an area around a given
2235 page number.
2236 @param[in] page_id page id */
recv_read_in_area(const page_id_t page_id)2237 static void recv_read_in_area(const page_id_t page_id)
2238 {
2239 ulint page_nos[RECV_READ_AHEAD_AREA];
2240 ulint page_no = page_id.page_no()
2241 - (page_id.page_no() % RECV_READ_AHEAD_AREA);
2242 ulint* p = page_nos;
2243
2244 for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
2245 page_no < up_limit; page_no++) {
2246 recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2247 page_id.space(), page_no);
2248 if (recv_addr
2249 && recv_addr->state == RECV_NOT_PROCESSED
2250 && !buf_page_peek(page_id_t(page_id.space(), page_no))) {
2251 recv_addr->state = RECV_BEING_READ;
2252 *p++ = page_no;
2253 }
2254 }
2255
2256 mutex_exit(&recv_sys->mutex);
2257 buf_read_recv_pages(FALSE, page_id.space(), page_nos,
2258 ulint(p - page_nos));
2259 mutex_enter(&recv_sys->mutex);
2260 }
2261
2262 /** This is another low level function for the recovery system
2263 to create a page which has buffered page intialization redo log records.
2264 @param[in] page_id page to be created using redo logs
2265 @param[in,out] recv_addr Hashed redo logs for the given page id
2266 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id,recv_addr_t * recv_addr)2267 static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id,
2268 recv_addr_t* recv_addr)
2269 {
2270 mtr_t mtr;
2271 mlog_init_t::init &i= mlog_init.last(page_id);
2272 const lsn_t end_lsn= UT_LIST_GET_LAST(recv_addr->rec_list)->end_lsn;
2273
2274 if (end_lsn < i.lsn)
2275 {
2276 DBUG_LOG("ib_log", "skip log for page "
2277 << page_id
2278 << " LSN " << end_lsn
2279 << " < " << i.lsn);
2280 recv_addr->state= RECV_PROCESSED;
2281 ignore:
2282 ut_a(recv_sys->n_addrs);
2283 recv_sys->n_addrs--;
2284 return NULL;
2285 }
2286
2287 fil_space_t *space= fil_space_acquire(recv_addr->space);
2288 if (!space)
2289 {
2290 recv_addr->state= RECV_PROCESSED;
2291 goto ignore;
2292 }
2293
2294 if (space->enable_lsn)
2295 {
2296 init_fail:
2297 space->release();
2298 recv_addr->state= RECV_NOT_PROCESSED;
2299 return NULL;
2300 }
2301
2302 /* Determine if a tablespace could be for an internal table
2303 for FULLTEXT INDEX. For those tables, no MLOG_INDEX_LOAD record
2304 used to be written when redo logging was disabled. Hence, we
2305 cannot optimize away page reads, because all the redo
2306 log records for initializing and modifying the page in the
2307 past could be older than the page in the data file.
2308
2309 The check is too broad, causing all
2310 tables whose names start with FTS_ to skip the optimization. */
2311
2312 if (strstr(space->name, "/FTS_"))
2313 goto init_fail;
2314
2315 mtr.start();
2316 mtr.set_log_mode(MTR_LOG_NONE);
2317 buf_block_t *block= buf_page_create(page_id, page_size_t(space->flags),
2318 &mtr);
2319 if (recv_addr->state == RECV_PROCESSED)
2320 /* The page happened to exist in the buffer pool, or it was
2321 just being read in. Before buf_page_get_with_no_latch() returned,
2322 all changes must have been applied to the page already. */
2323 mtr.commit();
2324 else
2325 {
2326 i.created= true;
2327 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2328 recv_recover_page(block, mtr, recv_addr, i.lsn);
2329 ut_ad(mtr.has_committed());
2330 }
2331
2332 space->release();
2333 return block;
2334 }
2335
2336 /** This is a low level function for the recovery system
2337 to create a page which has buffered intialized redo log records.
2338 @param[in] page_id page to be created using redo logs
2339 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id)2340 buf_block_t* recv_recovery_create_page_low(const page_id_t page_id)
2341 {
2342 buf_block_t* block= NULL;
2343 mutex_enter(&recv_sys->mutex);
2344 recv_addr_t* recv_addr= recv_get_fil_addr_struct(page_id.space(),
2345 page_id.page_no());
2346 if (recv_addr && recv_addr->state == RECV_WILL_NOT_READ)
2347 {
2348 block= recv_recovery_create_page_low(page_id, recv_addr);
2349 }
2350 mutex_exit(&recv_sys->mutex);
2351 return block;
2352 }
2353
/** Apply the hash table of stored log records to persistent data pages.

Waits for a concurrently running batch to finish, then visits every
entry of recv_sys->addr_hash, applying the records to pages that are in
the buffer pool, requesting reads for those that are not, and creating
pages in the RECV_WILL_NOT_READ state directly from the log where
possible. It then waits until all pages have been processed.

If log or file system corruption is flagged while waiting, the function
returns early with recv_sys->mutex released, without clearing
recv_sys->apply_log_recs / recv_sys->apply_batch_on and without emptying
the hash table.
@param[in]	last_batch	whether the change buffer merge will be
performed as part of the operation */
void recv_apply_hashed_log_recs(bool last_batch)
{
	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || is_mariabackup_restore_or_export());

	mutex_enter(&recv_sys->mutex);

	/* Wait for any batch that is already in progress, sleeping
	between checks with the mutex released. */
	while (recv_sys->apply_batch_on) {
		bool abort = recv_sys->found_corrupt_log;
		mutex_exit(&recv_sys->mutex);

		if (abort) {
			return;
		}

		os_thread_sleep(500000);
		mutex_enter(&recv_sys->mutex);
	}

	/* The log mutex is expected to be held by the caller exactly
	when this is not the last batch. */
	ut_ad(!last_batch == log_mutex_own());

	recv_no_ibuf_operations
		= !last_batch || is_mariabackup_restore_or_export();

	if (ulint n = recv_sys->n_addrs) {
		const char* msg = last_batch
			? "Starting final batch to recover "
			: "Starting a batch to recover ";
		ib::info() << msg << n << " pages from redo log.";
		sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
			   msg, n);
	}
	recv_sys->apply_log_recs = TRUE;
	recv_sys->apply_batch_on = TRUE;

	/* Trim the buffered records of undo tablespaces for which a
	truncation LSN has been recorded. */
	for (ulint id = srv_undo_tablespaces_open; id--; ) {
		recv_sys_t::trunc& t = recv_sys->truncated_undo_spaces[id];
		if (t.lsn) {
			recv_addr_trim(id + srv_undo_space_id_start, t.pages,
				       t.lsn);
		}
	}

	mtr_t mtr;

	for (ulint i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {
		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
			     HASH_GET_FIRST(recv_sys->addr_hash, i));
		     recv_addr;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {
			if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
ignore:
				/* Nothing will be applied for this page:
				just stop counting it. */
				ut_a(recv_sys->n_addrs);
				recv_sys->n_addrs--;
				continue;
			}

			switch (recv_addr->state) {
			case RECV_BEING_READ:
			case RECV_BEING_PROCESSED:
			case RECV_PROCESSED:
				/* Already handled, or being handled
				elsewhere. */
				continue;
			case RECV_DISCARDED:
				goto ignore;
			case RECV_NOT_PROCESSED:
			case RECV_WILL_NOT_READ:
				break;
			}

			if (srv_is_tablespace_truncated(recv_addr->space)) {
				/* Avoid applying REDO log for the tablespace
				that is schedule for TRUNCATE. */
				recv_addr->state = RECV_DISCARDED;
				goto ignore;
			}

			const page_id_t page_id(recv_addr->space,
						recv_addr->page_no);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
apply:
				/* Apply the records right away if the page
				is in the buffer pool; otherwise request a
				read of the surrounding area. */
				mtr.start();
				mtr.set_log_mode(MTR_LOG_NONE);
				if (buf_block_t* block = buf_page_get_low(
					    page_id, univ_page_size,
					    RW_X_LATCH, NULL,
					    BUF_GET_IF_IN_POOL,
					    __FILE__, __LINE__, &mtr, NULL)) {
					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);
					recv_recover_page(block, mtr,
							  recv_addr);
					ut_ad(mtr.has_committed());
				} else {
					mtr.commit();
					recv_read_in_area(page_id);
				}
			} else if (!recv_recovery_create_page_low(
					   page_id, recv_addr)) {
				/* RECV_WILL_NOT_READ: the page could not be
				created from the log alone.
				NOTE(review): the helper returns NULL not only
				after resetting the state to
				RECV_NOT_PROCESSED, but also after setting it
				to RECV_PROCESSED and decrementing
				recv_sys->n_addrs. In the latter case, if the
				page were found in the buffer pool,
				recv_recover_page() would decrement n_addrs a
				second time. Confirm that retrying via
				"apply" is intended for that case. */
				goto apply;
			}
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys->n_addrs || buf_get_n_pending_read_ios()) {
		const bool abort = recv_sys->found_corrupt_log
			|| recv_sys->found_corrupt_fs;

		if (recv_sys->found_corrupt_fs && !srv_force_recovery) {
			ib::info() << "Set innodb_force_recovery=1"
				" to ignore corrupted pages.";
		}

		mutex_exit(&(recv_sys->mutex));

		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys->mutex));
	}

	if (!last_batch) {
		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		mutex_exit(&(recv_sys->mutex));
		log_mutex_exit();

		/* Stop the recv_writer thread from issuing any LRU
		flush batches. */
		mutex_enter(&recv_sys->writer_mutex);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		/* Request a BUF_FLUSH_LIST flush through the flush_start
		event and wait for flush_end to be signalled. */
		os_event_reset(recv_sys->flush_end);
		recv_sys->flush_type = BUF_FLUSH_LIST;
		os_event_set(recv_sys->flush_start);
		os_event_wait(recv_sys->flush_end);

		buf_pool_invalidate();

		/* Allow batches from recv_writer thread. */
		mutex_exit(&recv_sys->writer_mutex);

		/* Reacquire the mutexes in the order log mutex,
		then recv_sys->mutex. */
		log_mutex_enter();
		mutex_enter(&(recv_sys->mutex));
		mlog_init.reset();
	} else if (!recv_no_ibuf_operations) {
		/* We skipped this in buf_page_create(). */
		mlog_init.ibuf_merge(mtr);
	}

	recv_sys->apply_log_recs = FALSE;
	recv_sys->apply_batch_on = FALSE;

	recv_sys_empty_hash();

	mutex_exit(&recv_sys->mutex);
}
2523
/** Tries to parse a single log record.
@param[out]	type		log record type
@param[in]	ptr		pointer to a buffer
@param[in]	end_ptr		end of the buffer
@param[out]	space		tablespace identifier
@param[out]	page_no		page number
@param[in]	apply		whether to apply MLOG_FILE_* records
@param[out]	body		start of log record body
@return length of the record, or 0 if the record was not complete
or an invalid record type was found (in which case
recv_sys->found_corrupt_log is set) */
static
ulint
recv_parse_log_rec(
	mlog_id_t*	type,
	byte*		ptr,
	byte*		end_ptr,
	ulint*		space,
	ulint*		page_no,
	bool		apply,
	byte**		body)
{
	byte*	new_ptr;

	*body = NULL;

	/* NOTE(review): MEM_UNDEFINED is presumably an annotation for
	memory-checking tools marking the output parameters as not yet
	set; note that it is also applied to *body right after the
	assignment above. */
	MEM_UNDEFINED(type, sizeof *type);
	MEM_UNDEFINED(space, sizeof *space);
	MEM_UNDEFINED(page_no, sizeof *page_no);
	MEM_UNDEFINED(body, sizeof *body);

	if (ptr == end_ptr) {

		return(0);
	}

	/* Record types that are handled without parsing a record body. */
	switch (*ptr) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
	case MLOG_LSN:
		new_ptr = mlog_parse_initial_log_record(
			ptr, end_ptr, type, space, page_no);
		if (new_ptr != NULL) {
			/* The space id and page number fields carry
			the two halves of an LSN. */
			const lsn_t	lsn = static_cast<lsn_t>(
				*space) << 32 | *page_no;
			ut_a(lsn == recv_sys->recovered_lsn);
		}

		*type = MLOG_LSN;
		return(new_ptr - ptr);
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_MULTI_REC_END:
	case MLOG_DUMMY_RECORD:
		/* These records consist of the type byte only. */
		*type = static_cast<mlog_id_t>(*ptr);
		return(1);
	case MLOG_CHECKPOINT:
		if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
			/* The record is not completely in the buffer. */
			return(0);
		}
		*type = static_cast<mlog_id_t>(*ptr);
		return(SIZE_OF_MLOG_CHECKPOINT);
	case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
	case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
	case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
		/* These types must not be combined with
		MLOG_SINGLE_REC_FLAG. */
		ib::error() << "Incorrect log record type "
			<< ib::hex(unsigned(*ptr));
		recv_sys->found_corrupt_log = true;
		return(0);
	}

	new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
						page_no);
	*body = new_ptr;

	if (UNIV_UNLIKELY(!new_ptr)) {

		return(0);
	}

	/* Remember where the body starts; it is inspected again below. */
	const byte*	old_ptr = new_ptr;
	new_ptr = recv_parse_or_apply_log_rec_body(
		*type, new_ptr, end_ptr, *space, *page_no, apply, NULL, NULL);

	if (UNIV_UNLIKELY(new_ptr == NULL)) {
		return(0);
	}

	/* A MLOG_4BYTES record for page 0 whose 2-byte offset field equals
	FSP_HEADER_OFFSET + FSP_SIZE: pick up the value that follows the
	offset and record it as the size of the tablespace. */
	if (*page_no == 0 && *type == MLOG_4BYTES
	    && apply
	    && mach_read_from_2(old_ptr) == FSP_HEADER_OFFSET + FSP_SIZE) {
		old_ptr += 2;

		ulint size = mach_parse_compressed(&old_ptr, end_ptr);

		recv_spaces_t::iterator it = recv_spaces.find(*space);

		ut_ad(!recv_sys->mlog_checkpoint_lsn
		      || *space == TRX_SYS_SPACE
		      || srv_is_undo_tablespace(*space)
		      || it != recv_spaces.end());

		if (it != recv_spaces.end() && !it->second.space) {
			/* No tablespace object is attached to the entry
			yet: remember the size in the entry. */
			it->second.size = size;
		}

		fil_space_set_recv_size(*space, size);
	}

	return ulint(new_ptr - ptr);
}
2632
2633 /*******************************************************//**
2634 Calculates the new value for lsn when more data is added to the log. */
2635 static
2636 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)2637 recv_calc_lsn_on_data_add(
2638 /*======================*/
2639 lsn_t lsn, /*!< in: old lsn */
2640 ib_uint64_t len) /*!< in: this many bytes of data is
2641 added, log block headers not included */
2642 {
2643 ulint frag_len;
2644 ib_uint64_t lsn_len;
2645
2646 frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
2647 ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2648 - LOG_BLOCK_TRL_SIZE);
2649 lsn_len = len;
2650 lsn_len += (lsn_len + frag_len)
2651 / (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2652 - LOG_BLOCK_TRL_SIZE)
2653 * (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
2654
2655 return(lsn + lsn_len);
2656 }
2657
2658 /** Prints diagnostic info of corrupt log.
2659 @param[in] ptr pointer to corrupt log record
2660 @param[in] type type of the log record (could be garbage)
2661 @param[in] space tablespace ID (could be garbage)
2662 @param[in] page_no page number (could be garbage)
2663 @return whether processing should continue */
2664 ATTRIBUTE_COLD
2665 static
2666 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)2667 recv_report_corrupt_log(
2668 const byte* ptr,
2669 int type,
2670 ulint space,
2671 ulint page_no)
2672 {
2673 ib::error() <<
2674 "############### CORRUPT LOG RECORD FOUND ##################";
2675
2676 const ulint ptr_offset = ulint(ptr - recv_sys->buf);
2677
2678 ib::info() << "Log record type " << type << ", page " << space << ":"
2679 << page_no << ". Log parsing proceeded successfully up to "
2680 << recv_sys->recovered_lsn << ". Previous log record type "
2681 << recv_previous_parsed_rec_type
2682 << ", is multi "
2683 << recv_previous_parsed_rec_is_multi << " Recv offset "
2684 << ptr_offset << ", prev "
2685 << recv_previous_parsed_rec_offset;
2686
2687 ut_ad(ptr <= recv_sys->buf + recv_sys->len);
2688
2689 const ulint limit = 100;
2690 const ulint prev_offset = std::min(recv_previous_parsed_rec_offset,
2691 ptr_offset);
2692 const ulint before = std::min(prev_offset, limit);
2693 const ulint after = std::min(recv_sys->len - ptr_offset, limit);
2694
2695 ib::info() << "Hex dump starting " << before << " bytes before and"
2696 " ending " << after << " bytes after the corrupted record:";
2697
2698 const byte* start = recv_sys->buf + prev_offset - before;
2699
2700 ut_print_buf(stderr, start, ulint(ptr - start) + after);
2701 putc('\n', stderr);
2702
2703 if (!srv_force_recovery) {
2704 ib::info() << "Set innodb_force_recovery to ignore this error.";
2705 return(false);
2706 }
2707
2708 ib::warn() << "The log file may have been corrupt and it is possible"
2709 " that the log scan did not proceed far enough in recovery!"
2710 " Please run CHECK TABLE on your InnoDB tables to check"
2711 " that they are ok! If mysqld crashes after this recovery; "
2712 << FORCE_RECOVERY_MSG;
2713 return(true);
2714 }
2715
2716 /** Report a MLOG_INDEX_LOAD operation.
2717 @param[in] space_id tablespace id
2718 @param[in] page_no page number
2719 @param[in] lsn log sequence number */
2720 ATTRIBUTE_COLD static void
recv_mlog_index_load(ulint space_id,ulint page_no,lsn_t lsn)2721 recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
2722 {
2723 recv_spaces_t::iterator it = recv_spaces.find(space_id);
2724 if (it != recv_spaces.end()) {
2725 it->second.mlog_index_load(lsn);
2726 }
2727
2728 if (log_optimized_ddl_op) {
2729 log_optimized_ddl_op(space_id);
2730 }
2731 }
2732
2733 /** Check whether read redo log memory exceeds the available memory
2734 of buffer pool. Store last_stored_lsn if it is not in last phase
2735 @param[in] store whether to store page operations
2736 @param[in] available_mem Available memory in buffer pool to
2737 read redo logs. */
recv_sys_heap_check(store_t * store,ulint available_mem)2738 static bool recv_sys_heap_check(store_t* store, ulint available_mem)
2739 {
2740 if (*store != STORE_NO
2741 && mem_heap_get_size(recv_sys->heap) >= available_mem)
2742 {
2743 if (*store == STORE_YES)
2744 recv_sys->last_stored_lsn= recv_sys->recovered_lsn;
2745
2746 *store= STORE_NO;
2747 DBUG_PRINT("ib_log",("Ran out of memory and last "
2748 "stored lsn " LSN_PF " last stored offset "
2749 ULINTPF "\n",recv_sys->recovered_lsn,
2750 recv_sys->recovered_offset));
2751 return true;
2752 }
2753
2754 return false;
2755 }
2756
/** Parse log records from a buffer and optionally store them to a
hash table to wait merging to file pages.

The caller must hold both the log mutex and recv_sys->mutex. Parsing
starts at recv_sys->recovered_offset in recv_sys->buf and advances
recv_sys->recovered_offset and recv_sys->recovered_lsn past every
completely parsed record or mini-transaction.
@param[in]	checkpoint_lsn	the LSN of the latest checkpoint
@param[in,out]	store		whether to store page operations;
				set to STORE_NO if the memory limit
				is reached
@param[in]	available_mem	memory to read the redo logs
@param[in]	apply		whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed */
bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
			 ulint available_mem, bool apply)
{
	byte*		ptr;
	byte*		end_ptr;
	bool		single_rec;
	ulint		len;
	lsn_t		new_recovered_lsn;
	lsn_t		old_lsn;
	mlog_id_t	type;
	ulint		space;
	ulint		page_no;
	byte*		body;
	/* Sampled once on entry: *store may be changed to STORE_NO
	by recv_sys_heap_check() below. */
	const bool	last_phase = (*store == STORE_IF_EXISTS);

	ut_ad(log_mutex_own());
	ut_ad(mutex_own(&recv_sys->mutex));
	ut_ad(recv_sys->parse_start_lsn != 0);
loop:
	/* Each pass handles one single record or one complete
	multi-record mini-transaction. */
	ptr = recv_sys->buf + recv_sys->recovered_offset;

	end_ptr = recv_sys->buf + recv_sys->len;

	if (ptr == end_ptr) {

		return(false);
	}

	/* Check for memory overflow and ignore the parsing of remaining
	redo log records if InnoDB ran out of memory */
	if (recv_sys_heap_check(store, available_mem) && last_phase) {
		return false;
	}

	/* Determine from the first byte whether this is a single record
	or the start of a multi-record mini-transaction. */
	switch (*ptr) {
	case MLOG_CHECKPOINT:
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_DUMMY_RECORD:
		single_rec = true;
		break;
	default:
		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
	}

	if (single_rec) {
		/* The mtr did not modify multiple pages */

		old_lsn = recv_sys->recovered_lsn;

		/* Try to parse a log record, fetching its type, space id,
		page no, and a pointer to the body of the log record */

		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
					 &page_no, apply, &body);

		if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)) {
			recv_report_corrupt_log(ptr, type, space, page_no);
			return(true);
		}

		if (UNIV_UNLIKELY(recv_sys->found_corrupt_fs)) {
			return(true);
		}

		if (len == 0) {
			/* The record is not completely in the buffer. */
			return(false);
		}

		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);

		if (new_recovered_lsn > recv_sys->scanned_lsn) {
			/* The log record filled a log block, and we require
			that also the next log block should have been scanned
			in */

			return(false);
		}

		/* Remember this record for the diagnostics printed by
		recv_report_corrupt_log(). */
		recv_previous_parsed_rec_type = type;
		recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
		recv_previous_parsed_rec_is_multi = 0;

		recv_sys->recovered_offset += len;
		recv_sys->recovered_lsn = new_recovered_lsn;

		switch (type) {
			lsn_t	lsn;
		case MLOG_DUMMY_RECORD:
			/* Do nothing */
			break;
		case MLOG_CHECKPOINT:
			/* The record is the type byte followed by
			an 8-byte LSN. */
			compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
			lsn = mach_read_from_8(ptr + 1);

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr,
					"MLOG_CHECKPOINT(" LSN_PF ") %s at "
					LSN_PF "\n", lsn,
					lsn != checkpoint_lsn ? "ignored"
					: recv_sys->mlog_checkpoint_lsn
					? "reread" : "read",
					recv_sys->recovered_lsn);
			}

			DBUG_PRINT("ib_log",
				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
				    LSN_PF,
				    lsn,
				    lsn != checkpoint_lsn ? "ignored"
				    : recv_sys->mlog_checkpoint_lsn
				    ? "reread" : "read",
				    recv_sys->recovered_lsn));

			if (lsn == checkpoint_lsn) {
				if (recv_sys->mlog_checkpoint_lsn) {
					/* There can be multiple
					MLOG_CHECKPOINT lsn for the
					same checkpoint. */
					break;
				}
				/* First MLOG_CHECKPOINT for the latest
				checkpoint: record where it was found and
				tell the caller. */
				recv_sys->mlog_checkpoint_lsn
					= recv_sys->recovered_lsn;
				return(true);
			}
			break;
#ifdef UNIV_LOG_LSN_DEBUG
		case MLOG_LSN:
			/* Do not add these records to the hash table.
			The page number and space id fields are misused
			for something else. */
			break;
#endif /* UNIV_LOG_LSN_DEBUG */
		default:
			switch (*store) {
			case STORE_NO:
				break;
			case STORE_IF_EXISTS:
				if (fil_space_get_flags(space)
				    == ULINT_UNDEFINED) {
					/* No flags are available for the
					tablespace: do not store. */
					break;
				}
				/* fall through */
			case STORE_YES:
				recv_add_to_hash_table(
					type, space, page_no, body,
					ptr + len, old_lsn,
					recv_sys->recovered_lsn);
			}
			/* fall through */
		case MLOG_INDEX_LOAD:
			if (type == MLOG_INDEX_LOAD) {
				recv_mlog_index_load(space, page_no, old_lsn);
			}
			/* fall through */
		case MLOG_FILE_NAME:
		case MLOG_FILE_DELETE:
		case MLOG_FILE_CREATE2:
		case MLOG_FILE_RENAME2:
		case MLOG_TRUNCATE:
			/* These were already handled by
			recv_parse_log_rec() and
			recv_parse_or_apply_log_rec_body(). */
			DBUG_PRINT("ib_log",
				("scan " LSN_PF ": log rec %s"
				" len " ULINTPF
				" page " ULINTPF ":" ULINTPF,
				old_lsn, get_mlog_string(type),
				len, space, page_no));
		}
	} else {
		/* Check that all the records associated with the single mtr
		are included within the buffer */

		ulint	total_len	= 0;
		ulint	n_recs		= 0;
		/* Whether only MLOG_FILE_NAME records have been seen
		so far in this mini-transaction */
		bool	only_mlog_file	= true;
		/* Total length of those leading MLOG_FILE_NAME records */
		ulint	mlog_rec_len	= 0;

		/* First pass: parse without applying (apply=false), until
		MLOG_MULTI_REC_END is found or the buffer runs out. */
		for (;;) {
			len = recv_parse_log_rec(
				&type, ptr, end_ptr, &space, &page_no,
				false, &body);

			if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)) {
corrupted_log:
				recv_report_corrupt_log(
					ptr, type, space, page_no);
				return(true);
			}

			if (ptr == end_ptr) {
			} else if (type == MLOG_CHECKPOINT
				   || (*ptr & MLOG_SINGLE_REC_FLAG)) {
				/* Neither MLOG_CHECKPOINT nor a record with
				MLOG_SINGLE_REC_FLAG may occur inside a
				multi-record mini-transaction. */
				recv_sys->found_corrupt_log = true;
				goto corrupted_log;
			}

			if (recv_sys->found_corrupt_fs) {
				return(true);
			}

			if (len == 0) {
				/* The mini-transaction is not completely
				in the buffer. */
				return(false);
			}

			recv_previous_parsed_rec_type = type;
			recv_previous_parsed_rec_offset
				= recv_sys->recovered_offset + total_len;
			recv_previous_parsed_rec_is_multi = 1;

			/* MLOG_FILE_NAME redo log records do not make changes
			to persistent data. If only MLOG_FILE_NAME redo
			log record exists then reset the parsing buffer pointer
			by changing recovered_lsn and recovered_offset. */
			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
				only_mlog_file = false;
			}

			if (only_mlog_file) {
				new_recovered_lsn = recv_calc_lsn_on_data_add(
					recv_sys->recovered_lsn, len);
				mlog_rec_len += len;
				recv_sys->recovered_offset += len;
				recv_sys->recovered_lsn = new_recovered_lsn;
			}

			total_len += len;
			n_recs++;

			ptr += len;

			if (type == MLOG_MULTI_REC_END) {
				DBUG_PRINT("ib_log",
					   ("scan " LSN_PF
					    ": multi-log end"
					    " total_len " ULINTPF
					    " n=" ULINTPF,
					    recv_sys->recovered_lsn,
					    total_len, n_recs));
				/* The leading MLOG_FILE_NAME records were
				already accounted for in recovered_lsn and
				recovered_offset above. */
				total_len -= mlog_rec_len;
				break;
			}

			DBUG_PRINT("ib_log",
				   ("scan " LSN_PF ": multi-log rec %s"
				    " len " ULINTPF
				    " page " ULINTPF ":" ULINTPF,
				    recv_sys->recovered_lsn,
				    get_mlog_string(type), len, space, page_no));
		}

		new_recovered_lsn = recv_calc_lsn_on_data_add(
			recv_sys->recovered_lsn, total_len);

		if (new_recovered_lsn > recv_sys->scanned_lsn) {
			/* The log record filled a log block, and we require
			that also the next log block should have been scanned
			in */

			return(false);
		}

		/* Add all the records to the hash table */

		ptr = recv_sys->buf + recv_sys->recovered_offset;

		/* Second pass over the same records, starting after any
		leading MLOG_FILE_NAME records consumed in the first pass. */
		for (;;) {
			old_lsn = recv_sys->recovered_lsn;
			/* This will apply MLOG_FILE_ records. We
			had to skip them in the first scan, because we
			did not know if the mini-transaction was
			completely recovered (until MLOG_MULTI_REC_END). */
			len = recv_parse_log_rec(
				&type, ptr, end_ptr, &space, &page_no,
				apply, &body);

			/* Unlike in the first pass, corruption reported here
			only stops the parsing if recv_report_corrupt_log()
			says that processing should not continue. */
			if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)
			    && !recv_report_corrupt_log(
				    ptr, type, space, page_no)) {
				return(true);
			}

			if (UNIV_UNLIKELY(recv_sys->found_corrupt_fs)) {
				return(true);
			}

			ut_a(len != 0);
			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));

			recv_sys->recovered_offset += len;
			recv_sys->recovered_lsn
				= recv_calc_lsn_on_data_add(old_lsn, len);

			switch (type) {
			case MLOG_MULTI_REC_END:
				/* Found the end mark for the records */
				goto loop;
#ifdef UNIV_LOG_LSN_DEBUG
			case MLOG_LSN:
				/* Do not add these records to the hash table.
				The page number and space id fields are misused
				for something else. */
				break;
#endif /* UNIV_LOG_LSN_DEBUG */
			case MLOG_INDEX_LOAD:
				recv_mlog_index_load(space, page_no, old_lsn);
				break;
			case MLOG_FILE_NAME:
			case MLOG_FILE_DELETE:
			case MLOG_FILE_CREATE2:
			case MLOG_FILE_RENAME2:
			case MLOG_TRUNCATE:
				/* These were already handled by
				recv_parse_log_rec() and
				recv_parse_or_apply_log_rec_body(). */
				break;
			default:
				switch (*store) {
				case STORE_NO:
					break;
				case STORE_IF_EXISTS:
					if (fil_space_get_flags(space)
					    == ULINT_UNDEFINED) {
						break;
					}
					/* fall through */
				case STORE_YES:
					/* Every record of the
					mini-transaction is stored with the
					end LSN of the whole
					mini-transaction. */
					recv_add_to_hash_table(
						type, space, page_no,
						body, ptr + len,
						old_lsn,
						new_recovered_lsn);
				}
			}

			ptr += len;
		}
	}

	goto loop;
}
3108
3109 /** Adds data from a new log block to the parsing buffer of recv_sys if
3110 recv_sys->parse_start_lsn is non-zero.
3111 @param[in] log_block log block to add
3112 @param[in] scanned_lsn lsn of how far we were able to find
3113 data in this log block
3114 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3115 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
3116 {
3117 ulint more_len;
3118 ulint data_len;
3119 ulint start_offset;
3120 ulint end_offset;
3121
3122 ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
3123
3124 if (!recv_sys->parse_start_lsn) {
3125 /* Cannot start parsing yet because no start point for
3126 it found */
3127 return(false);
3128 }
3129
3130 data_len = log_block_get_data_len(log_block);
3131
3132 if (recv_sys->parse_start_lsn >= scanned_lsn) {
3133
3134 return(false);
3135
3136 } else if (recv_sys->scanned_lsn >= scanned_lsn) {
3137
3138 return(false);
3139
3140 } else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
3141 more_len = (ulint) (scanned_lsn - recv_sys->parse_start_lsn);
3142 } else {
3143 more_len = (ulint) (scanned_lsn - recv_sys->scanned_lsn);
3144 }
3145
3146 if (more_len == 0) {
3147 return(false);
3148 }
3149
3150 ut_ad(data_len >= more_len);
3151
3152 start_offset = data_len - more_len;
3153
3154 if (start_offset < LOG_BLOCK_HDR_SIZE) {
3155 start_offset = LOG_BLOCK_HDR_SIZE;
3156 }
3157
3158 end_offset = data_len;
3159
3160 if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
3161 end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
3162 }
3163
3164 ut_ad(start_offset <= end_offset);
3165
3166 if (start_offset < end_offset) {
3167 ut_memcpy(recv_sys->buf + recv_sys->len,
3168 log_block + start_offset, end_offset - start_offset);
3169
3170 recv_sys->len += end_offset - start_offset;
3171
3172 ut_a(recv_sys->len <= RECV_PARSING_BUF_SIZE);
3173 }
3174
3175 return(true);
3176 }
3177
3178 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()3179 void recv_sys_justify_left_parsing_buf()
3180 {
3181 memmove(recv_sys->buf,
3182 recv_sys->buf + recv_sys->recovered_offset,
3183 recv_sys->len - recv_sys->recovered_offset);
3184
3185 recv_sys->len -= recv_sys->recovered_offset;
3186
3187 recv_sys->recovered_offset = 0;
3188 }
3189
3190 /** Scan redo log from a buffer and stores new log data to the parsing buffer.
3191 Parse and hash the log records if new data found.
3192 Apply log records automatically when the hash table becomes full.
3193 @param[in] available_mem we let the hash table of recs to
3194 grow to this size, at the maximum
3195 @param[in,out] store_to_hash whether the records should be
3196 stored to the hash table; this is
3197 reset if just debug checking is
3198 needed, or when the available_mem
3199 runs out
3200 @param[in] log_block log segment
3201 @param[in] checkpoint_lsn latest checkpoint LSN
3202 @param[in] start_lsn buffer start LSN
3203 @param[in] end_lsn buffer end LSN
3204 @param[in,out] contiguous_lsn it is known that all groups contain
3205 contiguous log data upto this lsn
3206 @param[out] group_scanned_lsn scanning succeeded upto this lsn
3207 @return true if not able to scan any more in this log group */
recv_scan_log_recs(ulint available_mem,store_t * store_to_hash,const byte * log_block,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t end_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)3208 static bool recv_scan_log_recs(
3209 ulint available_mem,
3210 store_t* store_to_hash,
3211 const byte* log_block,
3212 lsn_t checkpoint_lsn,
3213 lsn_t start_lsn,
3214 lsn_t end_lsn,
3215 lsn_t* contiguous_lsn,
3216 lsn_t* group_scanned_lsn)
3217 {
3218 lsn_t scanned_lsn = start_lsn;
3219 bool finished = false;
3220 ulint data_len;
3221 bool more_data = false;
3222 bool apply = recv_sys->mlog_checkpoint_lsn != 0;
3223 ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
3224 const bool last_phase = (*store_to_hash == STORE_IF_EXISTS);
3225 ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3226 ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3227 ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
3228
3229 const byte* const log_end = log_block
3230 + ulint(end_lsn - start_lsn);
3231 do {
3232 ut_ad(!finished);
3233
3234 if (log_block_get_flush_bit(log_block)) {
3235 /* This block was a start of a log flush operation:
3236 we know that the previous flush operation must have
3237 been completed for all log groups before this block
3238 can have been flushed to any of the groups. Therefore,
3239 we know that log data is contiguous up to scanned_lsn
3240 in all non-corrupt log groups. */
3241
3242 if (scanned_lsn > *contiguous_lsn) {
3243 *contiguous_lsn = scanned_lsn;
3244 }
3245 }
3246
3247 data_len = log_block_get_data_len(log_block);
3248
3249 if (scanned_lsn + data_len > recv_sys->scanned_lsn
3250 && log_block_get_checkpoint_no(log_block)
3251 < recv_sys->scanned_checkpoint_no
3252 && (recv_sys->scanned_checkpoint_no
3253 - log_block_get_checkpoint_no(log_block)
3254 > 0x80000000UL)) {
3255
3256 /* Garbage from a log buffer flush which was made
3257 before the most recent database recovery */
3258 finished = true;
3259 break;
3260 }
3261
3262 if (!recv_sys->parse_start_lsn
3263 && (log_block_get_first_rec_group(log_block) > 0)) {
3264
3265 /* We found a point from which to start the parsing
3266 of log records */
3267
3268 recv_sys->parse_start_lsn = scanned_lsn
3269 + log_block_get_first_rec_group(log_block);
3270 recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
3271 recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
3272 }
3273
3274 scanned_lsn += data_len;
3275
3276 if (data_len == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
3277 && scanned_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3278 && log_block[LOG_BLOCK_HDR_SIZE] == MLOG_CHECKPOINT
3279 && checkpoint_lsn == mach_read_from_8(LOG_BLOCK_HDR_SIZE
3280 + 1 + log_block)) {
3281 /* The redo log is logically empty. */
3282 ut_ad(recv_sys->mlog_checkpoint_lsn == 0
3283 || recv_sys->mlog_checkpoint_lsn
3284 == checkpoint_lsn);
3285 recv_sys->mlog_checkpoint_lsn = checkpoint_lsn;
3286 DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
3287 scanned_lsn));
3288 finished = true;
3289 break;
3290 }
3291
3292 if (scanned_lsn > recv_sys->scanned_lsn) {
3293 ut_ad(!srv_log_files_created);
3294 if (!recv_needed_recovery) {
3295 recv_needed_recovery = true;
3296
3297 if (srv_read_only_mode) {
3298 ib::warn() << "innodb_read_only"
3299 " prevents crash recovery";
3300 return(true);
3301 }
3302
3303 ib::info() << "Starting crash recovery from"
3304 " checkpoint LSN="
3305 << recv_sys->scanned_lsn;
3306 }
3307
3308 /* We were able to find more log data: add it to the
3309 parsing buffer if parse_start_lsn is already
3310 non-zero */
3311
3312 DBUG_EXECUTE_IF(
3313 "reduce_recv_parsing_buf",
3314 recv_parsing_buf_size
3315 = (70 * 1024);
3316 );
3317
3318 if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE
3319 >= recv_parsing_buf_size) {
3320 ib::error() << "Log parsing buffer overflow."
3321 " Recovery may have failed!";
3322
3323 recv_sys->found_corrupt_log = true;
3324
3325 if (!srv_force_recovery) {
3326 ib::error()
3327 << "Set innodb_force_recovery"
3328 " to ignore this error.";
3329 return(true);
3330 }
3331 } else if (!recv_sys->found_corrupt_log) {
3332 more_data = recv_sys_add_to_parsing_buf(
3333 log_block, scanned_lsn);
3334 }
3335
3336 recv_sys->scanned_lsn = scanned_lsn;
3337 recv_sys->scanned_checkpoint_no
3338 = log_block_get_checkpoint_no(log_block);
3339 }
3340
3341 /* During last phase of scanning, there can be redo logs
3342 left in recv_sys->buf to parse & store it in recv_sys->heap */
3343 if (last_phase
3344 && recv_sys->recovered_lsn < recv_sys->scanned_lsn) {
3345 more_data = true;
3346 }
3347
3348 if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3349 /* Log data for this group ends here */
3350 finished = true;
3351 break;
3352 } else {
3353 log_block += OS_FILE_LOG_BLOCK_SIZE;
3354 }
3355 } while (log_block < log_end);
3356
3357 *group_scanned_lsn = scanned_lsn;
3358
3359 mutex_enter(&recv_sys->mutex);
3360
3361 if (more_data && !recv_sys->found_corrupt_log) {
3362 /* Try to parse more log records */
3363
3364 if (recv_parse_log_recs(checkpoint_lsn,
3365 store_to_hash, available_mem,
3366 apply)) {
3367 ut_ad(recv_sys->found_corrupt_log
3368 || recv_sys->found_corrupt_fs
3369 || recv_sys->mlog_checkpoint_lsn
3370 == recv_sys->recovered_lsn);
3371 finished = true;
3372 goto func_exit;
3373 }
3374
3375 recv_sys_heap_check(store_to_hash, available_mem);
3376
3377 if (recv_sys->recovered_offset > recv_parsing_buf_size / 4) {
3378 /* Move parsing buffer data to the buffer start */
3379 recv_sys_justify_left_parsing_buf();
3380 }
3381
3382 /* Need to re-parse the redo log which're stored
3383 in recv_sys->buf */
3384 if (last_phase && *store_to_hash == STORE_NO) {
3385 finished = false;
3386 }
3387 }
3388
3389 func_exit:
3390 mutex_exit(&recv_sys->mutex);
3391 return(finished);
3392 }
3393
3394 /** Scans log from a buffer and stores new log data to the parsing buffer.
3395 Parses and hashes the log records if new data found.
3396 @param[in] checkpoint_lsn latest checkpoint log sequence number
3397 @param[in,out] contiguous_lsn log sequence number
3398 until which all redo log has been scanned
3399 @param[in] last_phase whether changes
3400 can be applied to the tablespaces
3401 @return whether rescan is needed (not everything was stored) */
3402 static
3403 bool
recv_group_scan_log_recs(lsn_t checkpoint_lsn,lsn_t * contiguous_lsn,bool last_phase)3404 recv_group_scan_log_recs(
3405 lsn_t checkpoint_lsn,
3406 lsn_t* contiguous_lsn,
3407 bool last_phase)
3408 {
3409 DBUG_ENTER("recv_group_scan_log_recs");
3410 DBUG_ASSERT(!last_phase || recv_sys->mlog_checkpoint_lsn > 0);
3411
3412 mutex_enter(&recv_sys->mutex);
3413 recv_sys->len = 0;
3414 recv_sys->recovered_offset = 0;
3415 recv_sys->n_addrs = 0;
3416 recv_sys_empty_hash();
3417 srv_start_lsn = *contiguous_lsn;
3418 recv_sys->parse_start_lsn = *contiguous_lsn;
3419 recv_sys->scanned_lsn = *contiguous_lsn;
3420 recv_sys->recovered_lsn = *contiguous_lsn;
3421 recv_sys->scanned_checkpoint_no = 0;
3422 recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3423 recv_previous_parsed_rec_offset = 0;
3424 recv_previous_parsed_rec_is_multi = 0;
3425 ut_ad(recv_max_page_lsn == 0);
3426 ut_ad(last_phase || !recv_writer_thread_active);
3427 mutex_exit(&recv_sys->mutex);
3428
3429 lsn_t start_lsn;
3430 lsn_t end_lsn;
3431 store_t store_to_hash = recv_sys->mlog_checkpoint_lsn == 0
3432 ? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
3433 ulint available_mem = (buf_pool_get_n_pages() * 2 / 3)
3434 << srv_page_size_shift;
3435
3436 log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
3437 ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3438
3439 do {
3440 if (last_phase && store_to_hash == STORE_NO) {
3441 store_to_hash = STORE_IF_EXISTS;
3442 /* We must not allow change buffer
3443 merge here, because it would generate
3444 redo log records before we have
3445 finished the redo log scan. */
3446 recv_apply_hashed_log_recs(false);
3447 /* Rescan the redo logs from last stored lsn */
3448 end_lsn = recv_sys->recovered_lsn;
3449 }
3450
3451 start_lsn = ut_uint64_align_down(end_lsn,
3452 OS_FILE_LOG_BLOCK_SIZE);
3453 end_lsn = start_lsn;
3454 log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
3455 } while (end_lsn != start_lsn
3456 && !recv_scan_log_recs(
3457 available_mem, &store_to_hash, log_sys.buf,
3458 checkpoint_lsn,
3459 start_lsn, end_lsn,
3460 contiguous_lsn, &log_sys.log.scanned_lsn));
3461
3462 if (recv_sys->found_corrupt_log || recv_sys->found_corrupt_fs) {
3463 DBUG_RETURN(false);
3464 }
3465
3466 DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
3467 last_phase ? "rescan" : "scan",
3468 log_sys.log.scanned_lsn));
3469
3470 DBUG_RETURN(store_to_hash == STORE_NO);
3471 }
3472
3473 /** Report a missing tablespace for which page-redo log exists.
3474 @param[in] err previous error code
3475 @param[in] i tablespace descriptor
3476 @return new error code */
3477 static
3478 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3479 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3480 {
3481 if (is_mariabackup_restore_or_export()) {
3482 ib::warn() << "Tablespace " << i->first << " was not"
3483 " found at " << i->second.name << " when"
3484 " restoring a (partial?) backup. All redo log"
3485 " for this file will be ignored!";
3486 return(err);
3487 }
3488
3489 if (srv_force_recovery == 0) {
3490 ib::error() << "Tablespace " << i->first << " was not"
3491 " found at " << i->second.name << ".";
3492
3493 if (err == DB_SUCCESS) {
3494 ib::error() << "Set innodb_force_recovery=1 to"
3495 " ignore this and to permanently lose"
3496 " all changes to the tablespace.";
3497 err = DB_TABLESPACE_NOT_FOUND;
3498 }
3499 } else {
3500 ib::warn() << "Tablespace " << i->first << " was not"
3501 " found at " << i->second.name << ", and"
3502 " innodb_force_recovery was set. All redo log"
3503 " for this tablespace will be ignored!";
3504 }
3505
3506 return(err);
3507 }
3508
3509 /** Report the missing tablespace and discard the redo logs for the deleted
3510 tablespace.
3511 @param[in] rescan rescan of redo logs is needed
3512 if hash table ran out of memory
3513 @param[out] missing_tablespace missing tablespace exists or not
3514 @return error code or DB_SUCCESS. */
3515 static MY_ATTRIBUTE((warn_unused_result))
3516 dberr_t
recv_validate_tablespace(bool rescan,bool & missing_tablespace)3517 recv_validate_tablespace(bool rescan, bool& missing_tablespace)
3518 {
3519 dberr_t err = DB_SUCCESS;
3520
3521 for (ulint h = 0; h < hash_get_n_cells(recv_sys->addr_hash); h++) {
3522 for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
3523 HASH_GET_FIRST(recv_sys->addr_hash, h));
3524 recv_addr != 0;
3525 recv_addr = static_cast<recv_addr_t*>(
3526 HASH_GET_NEXT(addr_hash, recv_addr))) {
3527
3528 const ulint space = recv_addr->space;
3529
3530 if (is_predefined_tablespace(space)) {
3531 continue;
3532 }
3533
3534 recv_spaces_t::iterator i = recv_spaces.find(space);
3535 ut_ad(i != recv_spaces.end());
3536
3537 switch (i->second.status) {
3538 case file_name_t::MISSING:
3539 err = recv_init_missing_space(err, i);
3540 i->second.status = file_name_t::DELETED;
3541 /* fall through */
3542 case file_name_t::DELETED:
3543 recv_addr->state = RECV_DISCARDED;
3544 /* fall through */
3545 case file_name_t::NORMAL:
3546 continue;
3547 }
3548 ut_ad(0);
3549 }
3550 }
3551
3552 if (err != DB_SUCCESS) {
3553 return(err);
3554 }
3555
3556 /* When rescan is not needed then recv_sys->addr_hash will have
3557 all space id belongs to redo log. If rescan is needed and
3558 innodb_force_recovery > 0 then InnoDB can ignore missing tablespace. */
3559 for (recv_spaces_t::iterator i = recv_spaces.begin();
3560 i != recv_spaces.end(); i++) {
3561
3562 if (UNIV_LIKELY(i->second.status != file_name_t::MISSING)) {
3563 continue;
3564 }
3565
3566 missing_tablespace = true;
3567
3568 if (srv_force_recovery > 0) {
3569 ib::warn() << "Tablespace " << i->first
3570 <<" was not found at " << i->second.name
3571 <<", and innodb_force_recovery was set."
3572 <<" All redo log for this tablespace"
3573 <<" will be ignored!";
3574 continue;
3575 }
3576
3577 if (!rescan) {
3578 ib::info() << "Tablespace " << i->first
3579 << " was not found at '"
3580 << i->second.name << "', but there"
3581 <<" were no modifications either.";
3582 }
3583 }
3584
3585 if (!rescan || srv_force_recovery > 0) {
3586 missing_tablespace = false;
3587 }
3588
3589 return DB_SUCCESS;
3590 }
3591
3592 /** Check if all tablespaces were found for crash recovery.
3593 @param[in] rescan rescan of redo logs is needed
3594 @param[out] missing_tablespace missing table exists
3595 @return error code or DB_SUCCESS */
3596 static MY_ATTRIBUTE((warn_unused_result))
3597 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3598 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3599 {
3600 bool flag_deleted = false;
3601
3602 ut_ad(!srv_read_only_mode);
3603 ut_ad(recv_needed_recovery);
3604
3605 for (recv_spaces_t::iterator i = recv_spaces.begin();
3606 i != recv_spaces.end(); i++) {
3607 ut_ad(!is_predefined_tablespace(i->first));
3608 ut_ad(i->second.status != file_name_t::DELETED || !i->second.space);
3609
3610 if (i->second.status == file_name_t::DELETED) {
3611 /* The tablespace was deleted,
3612 so we can ignore any redo log for it. */
3613 flag_deleted = true;
3614 } else if (i->second.space != NULL) {
3615 /* The tablespace was found, and there
3616 are some redo log records for it. */
3617 fil_names_dirty(i->second.space);
3618 i->second.space->enable_lsn = i->second.enable_lsn;
3619 } else if (i->second.name == "") {
3620 ib::error() << "Missing MLOG_FILE_NAME"
3621 " or MLOG_FILE_DELETE"
3622 " before MLOG_CHECKPOINT for tablespace "
3623 << i->first;
3624 recv_sys->found_corrupt_log = true;
3625 return(DB_CORRUPTION);
3626 } else {
3627 i->second.status = file_name_t::MISSING;
3628 flag_deleted = true;
3629 }
3630
3631 ut_ad(i->second.status == file_name_t::DELETED || i->second.name != "");
3632 }
3633
3634 if (flag_deleted) {
3635 return recv_validate_tablespace(rescan, missing_tablespace);
3636 }
3637
3638 return DB_SUCCESS;
3639 }
3640
3641 /** Start recovering from a redo log checkpoint.
3642 @see recv_recovery_from_checkpoint_finish
3643 @param[in] flush_lsn FIL_PAGE_FILE_FLUSH_LSN
3644 of first system tablespace page
3645 @return error code or DB_SUCCESS */
3646 dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)3647 recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
3648 {
3649 ulint max_cp_field;
3650 lsn_t checkpoint_lsn;
3651 bool rescan;
3652 ib_uint64_t checkpoint_no;
3653 lsn_t contiguous_lsn;
3654 byte* buf;
3655 dberr_t err = DB_SUCCESS;
3656
3657 ut_ad(srv_operation == SRV_OPERATION_NORMAL
3658 || is_mariabackup_restore_or_export());
3659
3660 /* Initialize red-black tree for fast insertions into the
3661 flush_list during recovery process. */
3662 buf_flush_init_flush_rbt();
3663
3664 if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
3665
3666 ib::info() << "innodb_force_recovery=6 skips redo log apply";
3667
3668 return(DB_SUCCESS);
3669 }
3670
3671 recv_recovery_on = true;
3672
3673 log_mutex_enter();
3674
3675 err = recv_find_max_checkpoint(&max_cp_field);
3676
3677 if (err != DB_SUCCESS) {
3678
3679 srv_start_lsn = recv_sys->recovered_lsn = log_sys.lsn;
3680 log_mutex_exit();
3681 return(err);
3682 }
3683
3684 log_header_read(max_cp_field);
3685
3686 buf = log_sys.checkpoint_buf;
3687
3688 checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
3689 checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
3690
3691 /* Start reading the log from the checkpoint lsn. The variable
3692 contiguous_lsn contains an lsn up to which the log is known to
3693 be contiguously written. */
3694 recv_sys->mlog_checkpoint_lsn = 0;
3695
3696 ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
3697
3698 const lsn_t end_lsn = mach_read_from_8(
3699 buf + LOG_CHECKPOINT_END_LSN);
3700
3701 ut_ad(recv_sys->n_addrs == 0);
3702 contiguous_lsn = checkpoint_lsn;
3703 switch (log_sys.log.format) {
3704 case 0:
3705 log_mutex_exit();
3706 return recv_log_format_0_recover(checkpoint_lsn,
3707 buf[20 + 32 * 9] == 2);
3708 default:
3709 if (end_lsn == 0) {
3710 break;
3711 }
3712 if (end_lsn >= checkpoint_lsn) {
3713 contiguous_lsn = end_lsn;
3714 break;
3715 }
3716 recv_sys->found_corrupt_log = true;
3717 log_mutex_exit();
3718 return(DB_ERROR);
3719 }
3720
3721 /* Look for MLOG_CHECKPOINT. */
3722 recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
3723 /* The first scan should not have stored or applied any records. */
3724 ut_ad(recv_sys->n_addrs == 0);
3725 ut_ad(!recv_sys->found_corrupt_fs);
3726
3727 if (srv_read_only_mode && recv_needed_recovery) {
3728 log_mutex_exit();
3729 return(DB_READ_ONLY);
3730 }
3731
3732 if (recv_sys->found_corrupt_log && !srv_force_recovery) {
3733 log_mutex_exit();
3734 ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
3735 return(DB_ERROR);
3736 }
3737
3738 if (recv_sys->mlog_checkpoint_lsn == 0) {
3739 lsn_t scan_lsn = log_sys.log.scanned_lsn;
3740 if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
3741 log_mutex_exit();
3742 ib::error err;
3743 err << "Missing MLOG_CHECKPOINT";
3744 if (end_lsn) {
3745 err << " at " << end_lsn;
3746 }
3747 err << " between the checkpoint " << checkpoint_lsn
3748 << " and the end " << scan_lsn << ".";
3749 return(DB_ERROR);
3750 }
3751
3752 log_sys.log.scanned_lsn = checkpoint_lsn;
3753 rescan = false;
3754 } else {
3755 contiguous_lsn = checkpoint_lsn;
3756 rescan = recv_group_scan_log_recs(
3757 checkpoint_lsn, &contiguous_lsn, false);
3758
3759 if ((recv_sys->found_corrupt_log && !srv_force_recovery)
3760 || recv_sys->found_corrupt_fs) {
3761 log_mutex_exit();
3762 return(DB_ERROR);
3763 }
3764 }
3765
3766 /* NOTE: we always do a 'recovery' at startup, but only if
3767 there is something wrong we will print a message to the
3768 user about recovery: */
3769
3770 if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3771 && recv_sys->mlog_checkpoint_lsn == checkpoint_lsn) {
3772 /* The redo log is logically empty. */
3773 } else if (checkpoint_lsn != flush_lsn) {
3774 ut_ad(!srv_log_files_created);
3775
3776 if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
3777 ib::warn() << "Are you sure you are using the"
3778 " right ib_logfiles to start up the database?"
3779 " Log sequence number in the ib_logfiles is "
3780 << checkpoint_lsn << ", less than the"
3781 " log sequence number in the first system"
3782 " tablespace file header, " << flush_lsn << ".";
3783 }
3784
3785 if (!recv_needed_recovery) {
3786
3787 ib::info() << "The log sequence number " << flush_lsn
3788 << " in the system tablespace does not match"
3789 " the log sequence number " << checkpoint_lsn
3790 << " in the ib_logfiles!";
3791
3792 if (srv_read_only_mode) {
3793 ib::error() << "innodb_read_only"
3794 " prevents crash recovery";
3795 log_mutex_exit();
3796 return(DB_READ_ONLY);
3797 }
3798
3799 recv_needed_recovery = true;
3800 }
3801 }
3802
3803 log_sys.lsn = recv_sys->recovered_lsn;
3804
3805 if (recv_needed_recovery) {
3806 bool missing_tablespace = false;
3807
3808 err = recv_init_crash_recovery_spaces(
3809 rescan, missing_tablespace);
3810
3811 if (err != DB_SUCCESS) {
3812 log_mutex_exit();
3813 return(err);
3814 }
3815
3816 /* If there is any missing tablespace and rescan is needed
3817 then there is a possiblity that hash table will not contain
3818 all space ids redo logs. Rescan the remaining unstored
3819 redo logs for the validation of missing tablespace. */
3820 ut_ad(rescan || !missing_tablespace);
3821
3822 while (missing_tablespace) {
3823 DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
3824 "the missing tablespace. Scan "
3825 "from last stored LSN " LSN_PF,
3826 recv_sys->last_stored_lsn));
3827
3828 lsn_t recent_stored_lsn = recv_sys->last_stored_lsn;
3829 rescan = recv_group_scan_log_recs(
3830 checkpoint_lsn, &recent_stored_lsn, false);
3831
3832 ut_ad(!recv_sys->found_corrupt_fs);
3833
3834 missing_tablespace = false;
3835
3836 err = recv_sys->found_corrupt_log
3837 ? DB_ERROR
3838 : recv_validate_tablespace(
3839 rescan, missing_tablespace);
3840
3841 if (err != DB_SUCCESS) {
3842 log_mutex_exit();
3843 return err;
3844 }
3845
3846 rescan = true;
3847 }
3848
3849 recv_sys->parse_start_lsn = checkpoint_lsn;
3850
3851 if (srv_operation == SRV_OPERATION_NORMAL) {
3852 buf_dblwr_process();
3853 }
3854
3855 ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
3856
3857 /* Spawn the background thread to flush dirty pages
3858 from the buffer pools. */
3859 recv_writer_thread_active = true;
3860 os_thread_create(recv_writer_thread, 0, 0);
3861
3862 if (rescan) {
3863 contiguous_lsn = checkpoint_lsn;
3864
3865 recv_group_scan_log_recs(
3866 checkpoint_lsn, &contiguous_lsn, true);
3867
3868 if ((recv_sys->found_corrupt_log
3869 && !srv_force_recovery)
3870 || recv_sys->found_corrupt_fs) {
3871 log_mutex_exit();
3872 return(DB_ERROR);
3873 }
3874 }
3875 } else {
3876 ut_ad(!rescan || recv_sys->n_addrs == 0);
3877 }
3878
3879 if (log_sys.log.scanned_lsn < checkpoint_lsn
3880 || log_sys.log.scanned_lsn < recv_max_page_lsn) {
3881
3882 ib::error() << "We scanned the log up to "
3883 << log_sys.log.scanned_lsn
3884 << ". A checkpoint was at " << checkpoint_lsn << " and"
3885 " the maximum LSN on a database page was "
3886 << recv_max_page_lsn << ". It is possible that the"
3887 " database is now corrupt!";
3888 }
3889
3890 if (recv_sys->recovered_lsn < checkpoint_lsn) {
3891 log_mutex_exit();
3892
3893 ib::error() << "Recovered only to lsn:"
3894 << recv_sys->recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;
3895
3896 return(DB_ERROR);
3897 }
3898
3899 log_sys.next_checkpoint_lsn = checkpoint_lsn;
3900 log_sys.next_checkpoint_no = checkpoint_no + 1;
3901
3902 recv_synchronize_groups();
3903
3904 if (!recv_needed_recovery) {
3905 ut_a(checkpoint_lsn == recv_sys->recovered_lsn);
3906 } else {
3907 srv_start_lsn = recv_sys->recovered_lsn;
3908 }
3909
3910 log_sys.buf_free = ulong(log_sys.lsn % OS_FILE_LOG_BLOCK_SIZE);
3911 log_sys.buf_next_to_write = log_sys.buf_free;
3912 log_sys.write_lsn = log_sys.lsn;
3913
3914 log_sys.last_checkpoint_lsn = checkpoint_lsn;
3915
3916 if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) {
3917 /* Write a MLOG_CHECKPOINT marker as the first thing,
3918 before generating any other redo log. This ensures
3919 that subsequent crash recovery will be possible even
3920 if the server were killed soon after this. */
3921 fil_names_clear(log_sys.last_checkpoint_lsn, true);
3922 }
3923
3924 MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
3925 log_sys.lsn - log_sys.last_checkpoint_lsn);
3926
3927 log_sys.next_checkpoint_no = ++checkpoint_no;
3928
3929 mutex_enter(&recv_sys->mutex);
3930
3931 recv_sys->apply_log_recs = TRUE;
3932 recv_no_ibuf_operations = is_mariabackup_restore_or_export();
3933 ut_d(recv_no_log_write = recv_no_ibuf_operations);
3934
3935 mutex_exit(&recv_sys->mutex);
3936
3937 log_mutex_exit();
3938
3939 recv_lsn_checks_on = true;
3940
3941 /* The database is now ready to start almost normal processing of user
3942 transactions: transaction rollbacks and the application of the log
3943 records in the hash table can be run in background. */
3944
3945 return(DB_SUCCESS);
3946 }
3947
3948 /** Complete recovery from a checkpoint. */
3949 void
recv_recovery_from_checkpoint_finish(void)3950 recv_recovery_from_checkpoint_finish(void)
3951 {
3952 /* Make sure that the recv_writer thread is done. This is
3953 required because it grabs various mutexes and we want to
3954 ensure that when we enable sync_order_checks there is no
3955 mutex currently held by any thread. */
3956 mutex_enter(&recv_sys->writer_mutex);
3957
3958 /* Free the resources of the recovery system */
3959 recv_recovery_on = false;
3960
3961 /* By acquring the mutex we ensure that the recv_writer thread
3962 won't trigger any more LRU batches. Now wait for currently
3963 in progress batches to finish. */
3964 buf_flush_wait_LRU_batch_end();
3965
3966 mutex_exit(&recv_sys->writer_mutex);
3967
3968 ulint count = 0;
3969 while (recv_writer_thread_active) {
3970 ++count;
3971 os_thread_sleep(100000);
3972 if (srv_print_verbose_log && count > 600) {
3973 ib::info() << "Waiting for recv_writer to"
3974 " finish flushing of buffer pool";
3975 count = 0;
3976 }
3977 }
3978
3979 recv_sys_debug_free();
3980
3981 /* Free up the flush_rbt. */
3982 buf_flush_free_flush_rbt();
3983 }
3984
3985 /********************************************************//**
3986 Initiates the rollback of active transactions. */
3987 void
recv_recovery_rollback_active(void)3988 recv_recovery_rollback_active(void)
3989 /*===============================*/
3990 {
3991 ut_ad(!recv_writer_thread_active);
3992
3993 /* Switch latching order checks on in sync0debug.cc, if
3994 --innodb-sync-debug=true (default) */
3995 ut_d(sync_check_enable());
3996
3997 /* We can't start any (DDL) transactions if UNDO logging
3998 has been disabled, additionally disable ROLLBACK of recovered
3999 user transactions. */
4000 if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
4001 && !srv_read_only_mode) {
4002
4003 /* Drop partially created indexes. */
4004 row_merge_drop_temp_indexes();
4005 /* Drop garbage tables. */
4006 row_mysql_drop_garbage_tables();
4007
4008 /* Drop any auxiliary tables that were not dropped when the
4009 parent table was dropped. This can happen if the parent table
4010 was dropped but the server crashed before the auxiliary tables
4011 were dropped. */
4012 fts_drop_orphaned_tables();
4013
4014 /* Rollback the uncommitted transactions which have no user
4015 session */
4016
4017 trx_rollback_is_active = true;
4018 os_thread_create(trx_rollback_all_recovered, 0, 0);
4019 }
4020 }
4021
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)4022 bool recv_dblwr_t::validate_page(const page_id_t page_id,
4023 const byte *page,
4024 const fil_space_t *space,
4025 byte *tmp_buf)
4026 {
4027 if (page_id.page_no() == 0)
4028 {
4029 ulint flags= fsp_header_get_flags(page);
4030 if (!fsp_flags_is_valid(flags, page_id.space()))
4031 {
4032 ulint cflags= fsp_flags_convert_from_101(flags);
4033 if (cflags == ULINT_UNDEFINED)
4034 {
4035 ib::warn() << "Ignoring a doublewrite copy of page " << page_id
4036 << "due to invalid flags " << ib::hex(flags);
4037 return false;
4038 }
4039
4040 flags= cflags;
4041 }
4042
4043 /* Page 0 is never page_compressed or encrypted. */
4044 return !buf_page_is_corrupted(true, page, page_size_t(flags));
4045 }
4046
4047 ut_ad(tmp_buf);
4048 byte *tmp_frame= tmp_buf;
4049 byte *tmp_page= tmp_buf + srv_page_size;
4050 const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
4051 const page_size_t page_size(space->flags);
4052 const bool expect_encrypted= space->crypt_data &&
4053 space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
4054
4055 if (expect_encrypted &&
4056 mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
4057 {
4058 if (!fil_space_verify_crypt_checksum(page, page_size))
4059 return false;
4060 if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
4061 return true;
4062 if (page_size.is_compressed())
4063 return false;
4064 memcpy(tmp_page, page, page_size.physical());
4065 if (!fil_space_decrypt(space, tmp_frame, tmp_page))
4066 return false;
4067 }
4068
4069 switch (page_type) {
4070 case FIL_PAGE_PAGE_COMPRESSED:
4071 memcpy(tmp_page, page, page_size.physical());
4072 /* fall through */
4073 case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
4074 if (page_size.is_compressed())
4075 return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
4076 ulint decomp= fil_page_decompress(tmp_frame, tmp_page);
4077 if (!decomp)
4078 return false; /* decompression failed */
4079 if (decomp == srv_page_size)
4080 return false; /* the page was not compressed (invalid page type) */
4081 return !buf_page_is_corrupted(true, tmp_page, page_size, space);
4082 }
4083
4084 return !buf_page_is_corrupted(true, page, page_size, space);
4085 }
4086
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)4087 byte *recv_dblwr_t::find_page(const page_id_t page_id,
4088 const fil_space_t *space, byte *tmp_buf)
4089 {
4090 byte *result= NULL;
4091 lsn_t max_lsn= 0;
4092
4093 for (list::const_iterator i = pages.begin(); i != pages.end(); ++i)
4094 {
4095 byte *page= *i;
4096 if (page_get_page_no(page) != page_id.page_no() ||
4097 page_get_space_id(page) != page_id.space())
4098 continue;
4099 const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
4100 if (lsn <= max_lsn ||
4101 !validate_page(page_id, page, space, tmp_buf))
4102 {
4103 /* Mark processed for subsequent iterations in buf_dblwr_process() */
4104 memset(page + FIL_PAGE_LSN, 0, 8);
4105 continue;
4106 }
4107 max_lsn= lsn;
4108 result= page;
4109 }
4110
4111 return result;
4112 }
4113
4114 #ifndef DBUG_OFF
4115 /** Return string name of the redo log record type.
4116 @param[in] type record log record enum
4117 @return string name of record log record */
get_mlog_string(mlog_id_t type)4118 static const char* get_mlog_string(mlog_id_t type)
4119 {
4120 switch (type) {
4121 case MLOG_SINGLE_REC_FLAG:
4122 return("MLOG_SINGLE_REC_FLAG");
4123
4124 case MLOG_1BYTE:
4125 return("MLOG_1BYTE");
4126
4127 case MLOG_2BYTES:
4128 return("MLOG_2BYTES");
4129
4130 case MLOG_4BYTES:
4131 return("MLOG_4BYTES");
4132
4133 case MLOG_8BYTES:
4134 return("MLOG_8BYTES");
4135
4136 case MLOG_REC_INSERT:
4137 return("MLOG_REC_INSERT");
4138
4139 case MLOG_REC_CLUST_DELETE_MARK:
4140 return("MLOG_REC_CLUST_DELETE_MARK");
4141
4142 case MLOG_REC_SEC_DELETE_MARK:
4143 return("MLOG_REC_SEC_DELETE_MARK");
4144
4145 case MLOG_REC_UPDATE_IN_PLACE:
4146 return("MLOG_REC_UPDATE_IN_PLACE");
4147
4148 case MLOG_REC_DELETE:
4149 return("MLOG_REC_DELETE");
4150
4151 case MLOG_LIST_END_DELETE:
4152 return("MLOG_LIST_END_DELETE");
4153
4154 case MLOG_LIST_START_DELETE:
4155 return("MLOG_LIST_START_DELETE");
4156
4157 case MLOG_LIST_END_COPY_CREATED:
4158 return("MLOG_LIST_END_COPY_CREATED");
4159
4160 case MLOG_PAGE_REORGANIZE:
4161 return("MLOG_PAGE_REORGANIZE");
4162
4163 case MLOG_PAGE_CREATE:
4164 return("MLOG_PAGE_CREATE");
4165
4166 case MLOG_UNDO_INSERT:
4167 return("MLOG_UNDO_INSERT");
4168
4169 case MLOG_UNDO_ERASE_END:
4170 return("MLOG_UNDO_ERASE_END");
4171
4172 case MLOG_UNDO_INIT:
4173 return("MLOG_UNDO_INIT");
4174
4175 case MLOG_UNDO_HDR_REUSE:
4176 return("MLOG_UNDO_HDR_REUSE");
4177
4178 case MLOG_UNDO_HDR_CREATE:
4179 return("MLOG_UNDO_HDR_CREATE");
4180
4181 case MLOG_REC_MIN_MARK:
4182 return("MLOG_REC_MIN_MARK");
4183
4184 case MLOG_IBUF_BITMAP_INIT:
4185 return("MLOG_IBUF_BITMAP_INIT");
4186
4187 #ifdef UNIV_LOG_LSN_DEBUG
4188 case MLOG_LSN:
4189 return("MLOG_LSN");
4190 #endif /* UNIV_LOG_LSN_DEBUG */
4191
4192 case MLOG_WRITE_STRING:
4193 return("MLOG_WRITE_STRING");
4194
4195 case MLOG_MULTI_REC_END:
4196 return("MLOG_MULTI_REC_END");
4197
4198 case MLOG_DUMMY_RECORD:
4199 return("MLOG_DUMMY_RECORD");
4200
4201 case MLOG_FILE_DELETE:
4202 return("MLOG_FILE_DELETE");
4203
4204 case MLOG_COMP_REC_MIN_MARK:
4205 return("MLOG_COMP_REC_MIN_MARK");
4206
4207 case MLOG_COMP_PAGE_CREATE:
4208 return("MLOG_COMP_PAGE_CREATE");
4209
4210 case MLOG_COMP_REC_INSERT:
4211 return("MLOG_COMP_REC_INSERT");
4212
4213 case MLOG_COMP_REC_CLUST_DELETE_MARK:
4214 return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4215
4216 case MLOG_COMP_REC_UPDATE_IN_PLACE:
4217 return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4218
4219 case MLOG_COMP_REC_DELETE:
4220 return("MLOG_COMP_REC_DELETE");
4221
4222 case MLOG_COMP_LIST_END_DELETE:
4223 return("MLOG_COMP_LIST_END_DELETE");
4224
4225 case MLOG_COMP_LIST_START_DELETE:
4226 return("MLOG_COMP_LIST_START_DELETE");
4227
4228 case MLOG_COMP_LIST_END_COPY_CREATED:
4229 return("MLOG_COMP_LIST_END_COPY_CREATED");
4230
4231 case MLOG_COMP_PAGE_REORGANIZE:
4232 return("MLOG_COMP_PAGE_REORGANIZE");
4233
4234 case MLOG_FILE_CREATE2:
4235 return("MLOG_FILE_CREATE2");
4236
4237 case MLOG_ZIP_WRITE_NODE_PTR:
4238 return("MLOG_ZIP_WRITE_NODE_PTR");
4239
4240 case MLOG_ZIP_WRITE_BLOB_PTR:
4241 return("MLOG_ZIP_WRITE_BLOB_PTR");
4242
4243 case MLOG_ZIP_WRITE_HEADER:
4244 return("MLOG_ZIP_WRITE_HEADER");
4245
4246 case MLOG_ZIP_PAGE_COMPRESS:
4247 return("MLOG_ZIP_PAGE_COMPRESS");
4248
4249 case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4250 return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4251
4252 case MLOG_ZIP_PAGE_REORGANIZE:
4253 return("MLOG_ZIP_PAGE_REORGANIZE");
4254
4255 case MLOG_ZIP_WRITE_TRX_ID:
4256 return("MLOG_ZIP_WRITE_TRX_ID");
4257
4258 case MLOG_FILE_RENAME2:
4259 return("MLOG_FILE_RENAME2");
4260
4261 case MLOG_FILE_NAME:
4262 return("MLOG_FILE_NAME");
4263
4264 case MLOG_CHECKPOINT:
4265 return("MLOG_CHECKPOINT");
4266
4267 case MLOG_PAGE_CREATE_RTREE:
4268 return("MLOG_PAGE_CREATE_RTREE");
4269
4270 case MLOG_COMP_PAGE_CREATE_RTREE:
4271 return("MLOG_COMP_PAGE_CREATE_RTREE");
4272
4273 case MLOG_INIT_FILE_PAGE2:
4274 return("MLOG_INIT_FILE_PAGE2");
4275
4276 case MLOG_INDEX_LOAD:
4277 return("MLOG_INDEX_LOAD");
4278
4279 case MLOG_TRUNCATE:
4280 return("MLOG_TRUNCATE");
4281
4282 case MLOG_FILE_WRITE_CRYPT_DATA:
4283 return("MLOG_FILE_WRITE_CRYPT_DATA");
4284 }
4285 DBUG_ASSERT(0);
4286 return(NULL);
4287 }
4288 #endif /* !DBUG_OFF */
4289