1 /*****************************************************************************
2
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2013, 2020, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /**************************************************//**
22 @file log/log0recv.cc
23 Recovery
24
25 Created 9/20/1997 Heikki Tuuri
26 *******************************************************/
27
28 #include "univ.i"
29
30 #include <map>
31 #include <string>
32 #include <my_service_manager.h>
33
34 #include "log0recv.h"
35
36 #ifdef HAVE_MY_AES_H
37 #include <my_aes.h>
38 #endif
39
40 #include "log0crypt.h"
41 #include "mem0mem.h"
42 #include "buf0buf.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0cur.h"
47 #include "page0zip.h"
48 #include "btr0btr.h"
49 #include "btr0cur.h"
50 #include "ibuf0ibuf.h"
51 #include "trx0undo.h"
52 #include "trx0rec.h"
53 #include "fil0fil.h"
54 #include "buf0rea.h"
55 #include "srv0srv.h"
56 #include "srv0start.h"
57 #include "trx0roll.h"
58 #include "row0merge.h"
59 #include "fil0pagecompress.h"
60
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t) - REDZONE_SIZE)

/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32

/** The recovery system */
recv_sys_t recv_sys;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise. Note that this is FALSE while a background thread is
rolling back incomplete transactions.
Reset to false by recv_sys_var_init(). */
volatile bool recv_recovery_on;

/** TRUE when recv_init_crash_recovery() has been called.
Reset to false by recv_sys_var_init(). */
bool recv_needed_recovery;
#ifdef UNIV_DEBUG
/** TRUE if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys.mutex. */
bool recv_no_log_write = false;
#endif /* UNIV_DEBUG */

/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
recv_recovery_from_checkpoint_start().
Reset to false by recv_sys_var_init(). */
bool recv_lsn_checks_on;

/** If TRUE, the buffer pool file pages must be invalidated after
recovery and no ibuf operations are allowed. This becomes TRUE if
the log record hash table becomes too full, and log records must be
merged to file pages already before the recovery is finished: in that
case no ibuf operations are allowed, as they could modify the pages
read in the buffer pool before the pages have been recovered to the
up-to-date state.

TRUE also means that recovery is running and no operations on the log
files are allowed yet; in that respect the variable name is misleading.
Reset to false by recv_sys_var_init(). */
bool recv_no_ibuf_operations;

/** The type of the previous parsed redo log record.
Reset to MLOG_SINGLE_REC_FLAG by recv_sys_var_init(). */
static mlog_id_t recv_previous_parsed_rec_type;
/** The offset of the previous parsed redo log record.
Reset to 0 by recv_sys_var_init(). */
static ulint recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record.
Reset to 0 by recv_sys_var_init(). */
static ulint recv_previous_parsed_rec_is_multi;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt.
Reset to 0 by recv_sys_var_init() and by recv_sys_t::create(). */
static lsn_t recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
/** Performance schema key of the transaction rollback thread */
mysql_pfs_key_t trx_rollback_clean_thread_key;
/** Performance schema key registered by recv_writer_thread */
mysql_pfs_key_t recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Is recv_writer_thread active?
Cleared by recv_writer_thread just before the thread exits. */
bool recv_writer_thread_active;

#ifndef DBUG_OFF
/** Return string name of the redo log record type.
Only declared when DBUG is enabled; used for debug tracing.
@param[in] type redo log record type
@return string name of the redo log record type */
static const char* get_mlog_string(mlog_id_t type);
#endif /* !DBUG_OFF */
125
126 /** Tablespace item during recovery */
127 struct file_name_t {
128 /** Tablespace file name (MLOG_FILE_NAME) */
129 std::string name;
130 /** Tablespace object (NULL if not valid or not found) */
131 fil_space_t* space;
132
133 /** Tablespace status. */
134 enum fil_status {
135 /** Normal tablespace */
136 NORMAL,
137 /** Deleted tablespace */
138 DELETED,
139 /** Missing tablespace */
140 MISSING
141 };
142
143 /** Status of the tablespace */
144 fil_status status;
145
146 /** FSP_SIZE of tablespace */
147 ulint size = 0;
148
149 /** the log sequence number of the last observed MLOG_INDEX_LOAD
150 record for the tablespace */
151 lsn_t enable_lsn = 0;
152
153 /** Dummy flags before they have been read from the .ibd file */
154 static constexpr uint32_t initial_flags = FSP_FLAGS_FCRC32_MASK_MARKER;
155 /** FSP_SPACE_FLAGS of tablespace */
156 uint32_t flags = initial_flags;
157
158 /** Constructor */
file_name_tfile_name_t159 file_name_t(std::string name_, bool deleted)
160 : name(std::move(name_)), space(NULL),
161 status(deleted ? DELETED: NORMAL) {}
162
163 /** Report a MLOG_INDEX_LOAD operation, meaning that
164 mlog_init for any earlier LSN must be skipped.
165 @param lsn log sequence number of the MLOG_INDEX_LOAD */
mlog_index_loadfile_name_t166 void mlog_index_load(lsn_t lsn)
167 {
168 if (enable_lsn < lsn) enable_lsn = lsn;
169 }
170 };
171
172 /** Map of dirty tablespaces during recovery */
173 typedef std::map<
174 ulint,
175 file_name_t,
176 std::less<ulint>,
177 ut_allocator<std::pair<const ulint, file_name_t> > > recv_spaces_t;
178
179 static recv_spaces_t recv_spaces;
180
/** States of recv_addr_t: the recovery progress of a single page */
enum recv_addr_state {
	/** not yet processed */
	RECV_NOT_PROCESSED,
	/** not processed; the page will be reinitialized
	instead of being read */
	RECV_WILL_NOT_READ,
	/** page is being read */
	RECV_BEING_READ,
	/** log records are being applied on the page */
	RECV_BEING_PROCESSED,
	/** log records have been applied on the page */
	RECV_PROCESSED,
	/** log records have been discarded because the tablespace
	does not exist */
	RECV_DISCARDED
};
197
/** Hashed page file address struct: the redo log records that were
stored for one page, as an element of recv_sys.addr_hash */
struct recv_addr_t{
	/** recovery state of the page */
	recv_addr_state state;
	/** tablespace identifier */
	unsigned space:32;
	/** page number */
	unsigned page_no:32;
	/** list of log records for this page */
	UT_LIST_BASE_NODE_T(recv_t) rec_list;
	/** hash node in the hash bucket chain; recv_addr_trim()
	follows this pointer to reach the next element of a bucket */
	hash_node_t addr_hash;
};
211
/** Report optimized DDL operation (without redo log),
corresponding to MLOG_INDEX_LOAD.
This is a callback pointer; no initializer is given here.
@param[in] space_id tablespace identifier
*/
void (*log_optimized_ddl_op)(ulint space_id);

/** Report an operation to create, delete, or rename a file during backup.
This is a callback pointer; fil_name_parse() invokes it only when it
is not NULL.
@param[in] space_id tablespace identifier
@param[in] flags tablespace flags (NULL if not create)
@param[in] name file name (not NUL-terminated)
@param[in] len length of name, in bytes
@param[in] new_name new file name (NULL if not rename)
@param[in] new_len length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, const byte* flags,
		    const byte* name, ulint len,
		    const byte* new_name, ulint new_len);
228
229 /** Information about initializing page contents during redo log processing */
230 class mlog_init_t
231 {
232 public:
233 /** A page initialization operation that was parsed from
234 the redo log */
235 struct init {
236 /** log sequence number of the page initialization */
237 lsn_t lsn;
238 /** Whether btr_page_create() avoided a read of the page.
239
240 At the end of the last recovery batch, ibuf_merge()
241 will invoke change buffer merge for pages that reside
242 in the buffer pool. (In the last batch, loading pages
243 would trigger change buffer merge.) */
244 bool created;
245 };
246
247 private:
248 typedef std::map<const page_id_t, init,
249 std::less<const page_id_t>,
250 ut_allocator<std::pair<const page_id_t, init> > >
251 map;
252 /** Map of page initialization operations.
253 FIXME: Merge this to recv_sys.addr_hash! */
254 map inits;
255 public:
256 /** Record that a page will be initialized by the redo log.
257 @param[in] space tablespace identifier
258 @param[in] page_no page number
259 @param[in] lsn log sequence number */
add(ulint space,ulint page_no,lsn_t lsn)260 void add(ulint space, ulint page_no, lsn_t lsn)
261 {
262 ut_ad(mutex_own(&recv_sys.mutex));
263 const init init = { lsn, false };
264 std::pair<map::iterator, bool> p = inits.insert(
265 map::value_type(page_id_t(space, page_no), init));
266 ut_ad(!p.first->second.created);
267 if (!p.second && p.first->second.lsn < init.lsn) {
268 p.first->second = init;
269 }
270 }
271
272 /** Get the last stored lsn of the page id and its respective
273 init/load operation.
274 @param[in] page_id page id
275 @param[in,out] init initialize log or load log
276 @return the latest page initialization;
277 not valid after releasing recv_sys.mutex. */
last(page_id_t page_id)278 init& last(page_id_t page_id)
279 {
280 ut_ad(mutex_own(&recv_sys.mutex));
281 return inits.find(page_id)->second;
282 }
283
284 /** At the end of each recovery batch, reset the 'created' flags. */
reset()285 void reset()
286 {
287 ut_ad(mutex_own(&recv_sys.mutex));
288 ut_ad(recv_no_ibuf_operations);
289 for (map::value_type& i : inits) {
290 i.second.created = false;
291 }
292 }
293
294 /** On the last recovery batch, merge buffered changes to those
295 pages that were initialized by buf_page_create() and still reside
296 in the buffer pool. Stale pages are not allowed in the buffer pool.
297
298 Note: When MDEV-14481 implements redo log apply in the
299 background, we will have to ensure that buf_page_get_gen()
300 will not deliver stale pages to users (pages on which the
301 change buffer was not merged yet). Normally, the change
302 buffer merge is performed on I/O completion. Maybe, add a
303 flag to buf_page_t and perform the change buffer merge on
304 the first actual access?
305 @param[in,out] mtr dummy mini-transaction */
ibuf_merge(mtr_t & mtr)306 void ibuf_merge(mtr_t& mtr)
307 {
308 ut_ad(mutex_own(&recv_sys.mutex));
309 ut_ad(!recv_no_ibuf_operations);
310 mtr.start();
311
312 for (const map::value_type& i : inits) {
313 if (!i.second.created) {
314 continue;
315 }
316 if (buf_block_t* block = buf_page_get_low(
317 i.first, 0, RW_X_LATCH, NULL,
318 BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
319 &mtr, NULL)) {
320 mutex_exit(&recv_sys.mutex);
321 ibuf_merge_or_delete_for_page(
322 block, i.first,
323 block->zip_size());
324 mtr.commit();
325 mtr.start();
326 mutex_enter(&recv_sys.mutex);
327 }
328 }
329
330 mtr.commit();
331 }
332
333 /** Clear the data structure */
clear()334 void clear() { inits.clear(); }
335 };
336
337 static mlog_init_t mlog_init;
338
339 /** Process a MLOG_CREATE2 record that indicates that a tablespace
340 is being shrunk in size.
341 @param[in] space_id tablespace identifier
342 @param[in] pages trimmed size of the file, in pages
343 @param[in] lsn log sequence number of the operation */
recv_addr_trim(ulint space_id,unsigned pages,lsn_t lsn)344 static void recv_addr_trim(ulint space_id, unsigned pages, lsn_t lsn)
345 {
346 DBUG_ENTER("recv_addr_trim");
347 DBUG_LOG("ib_log",
348 "discarding log beyond end of tablespace "
349 << page_id_t(space_id, pages) << " before LSN " << lsn);
350 ut_ad(mutex_own(&recv_sys.mutex));
351 for (ulint i = recv_sys.addr_hash->n_cells; i--; ) {
352 hash_cell_t* const cell = hash_get_nth_cell(
353 recv_sys.addr_hash, i);
354 for (recv_addr_t* addr = static_cast<recv_addr_t*>(cell->node),
355 *next;
356 addr; addr = next) {
357 next = static_cast<recv_addr_t*>(addr->addr_hash);
358
359 if (addr->space != space_id || addr->page_no < pages) {
360 continue;
361 }
362
363 for (recv_t* recv = UT_LIST_GET_FIRST(addr->rec_list);
364 recv; ) {
365 recv_t* n = UT_LIST_GET_NEXT(rec_list, recv);
366 if (recv->start_lsn < lsn) {
367 DBUG_PRINT("ib_log",
368 ("Discarding %s for"
369 " page %u:%u at " LSN_PF,
370 get_mlog_string(
371 recv->type),
372 addr->space, addr->page_no,
373 recv->start_lsn));
374 UT_LIST_REMOVE(addr->rec_list, recv);
375 }
376 recv = n;
377 }
378 }
379 }
380 if (fil_space_t* space = fil_space_get(space_id)) {
381 ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
382 fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
383 ut_ad(file->is_open());
384 os_file_truncate(file->name, file->handle,
385 os_offset_t(pages) << srv_page_size_shift,
386 true);
387 }
388 DBUG_VOID_RETURN;
389 }
390
391 /** Process a file name from a MLOG_FILE_* record.
392 @param[in,out] name file name
393 @param[in] len length of the file name
394 @param[in] space_id the tablespace ID
395 @param[in] deleted whether this is a MLOG_FILE_DELETE record */
396 static
397 void
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)398 fil_name_process(
399 char* name,
400 ulint len,
401 ulint space_id,
402 bool deleted)
403 {
404 if (srv_operation == SRV_OPERATION_BACKUP) {
405 return;
406 }
407
408 ut_ad(srv_operation == SRV_OPERATION_NORMAL
409 || is_mariabackup_restore_or_export());
410
411 /* We will also insert space=NULL into the map, so that
412 further checks can ensure that a MLOG_FILE_NAME record was
413 scanned before applying any page records for the space_id. */
414
415 os_normalize_path(name);
416 file_name_t fname(std::string(name, len - 1), deleted);
417 std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
418 std::make_pair(space_id, fname));
419 ut_ad(p.first->first == space_id);
420
421 file_name_t& f = p.first->second;
422
423 if (deleted) {
424 /* Got MLOG_FILE_DELETE */
425
426 if (!p.second && f.status != file_name_t::DELETED) {
427 f.status = file_name_t::DELETED;
428 if (f.space != NULL) {
429 fil_space_free(space_id, false);
430 f.space = NULL;
431 }
432 }
433
434 ut_ad(f.space == NULL);
435 } else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
436 || f.name != fname.name) {
437 fil_space_t* space;
438
439 /* Check if the tablespace file exists and contains
440 the space_id. If not, ignore the file after displaying
441 a note. Abort if there are multiple files with the
442 same space_id. */
443 switch (fil_ibd_load(space_id, name, space)) {
444 case FIL_LOAD_OK:
445 ut_ad(space != NULL);
446
447 if (!f.space) {
448 if (f.size
449 || f.flags != f.initial_flags) {
450 fil_space_set_recv_size_and_flags(
451 space->id, f.size, f.flags);
452 }
453
454 f.space = space;
455 goto same_space;
456 } else if (f.space == space) {
457 same_space:
458 f.name = fname.name;
459 f.status = file_name_t::NORMAL;
460 } else {
461 ib::error() << "Tablespace " << space_id
462 << " has been found in two places: '"
463 << f.name << "' and '" << name << "'."
464 " You must delete one of them.";
465 recv_sys.found_corrupt_fs = true;
466 }
467 break;
468
469 case FIL_LOAD_ID_CHANGED:
470 ut_ad(space == NULL);
471 break;
472
473 case FIL_LOAD_NOT_FOUND:
474 /* No matching tablespace was found; maybe it
475 was renamed, and we will find a subsequent
476 MLOG_FILE_* record. */
477 ut_ad(space == NULL);
478
479 if (srv_force_recovery) {
480 /* Without innodb_force_recovery,
481 missing tablespaces will only be
482 reported in
483 recv_init_crash_recovery_spaces().
484 Enable some more diagnostics when
485 forcing recovery. */
486
487 ib::info()
488 << "At LSN: " << recv_sys.recovered_lsn
489 << ": unable to open file " << name
490 << " for tablespace " << space_id;
491 }
492 break;
493
494 case FIL_LOAD_INVALID:
495 ut_ad(space == NULL);
496 if (srv_force_recovery == 0) {
497 ib::warn() << "We do not continue the crash"
498 " recovery, because the table may"
499 " become corrupt if we cannot apply"
500 " the log records in the InnoDB log to"
501 " it. To fix the problem and start"
502 " mysqld:";
503 ib::info() << "1) If there is a permission"
504 " problem in the file and mysqld"
505 " cannot open the file, you should"
506 " modify the permissions.";
507 ib::info() << "2) If the tablespace is not"
508 " needed, or you can restore an older"
509 " version from a backup, then you can"
510 " remove the .ibd file, and use"
511 " --innodb_force_recovery=1 to force"
512 " startup without this file.";
513 ib::info() << "3) If the file system or the"
514 " disk is broken, and you cannot"
515 " remove the .ibd file, you can set"
516 " --innodb_force_recovery.";
517 recv_sys.found_corrupt_fs = true;
518 break;
519 }
520
521 ib::info() << "innodb_force_recovery was set to "
522 << srv_force_recovery << ". Continuing crash"
523 " recovery even though we cannot access the"
524 " files for tablespace " << space_id << ".";
525 break;
526 }
527 }
528 }
529
530 /** Parse or process a MLOG_FILE_* record.
531 @param[in] ptr redo log record
532 @param[in] end end of the redo log buffer
533 @param[in] page_id first page number in the file
534 @param[in] type MLOG_FILE_NAME or MLOG_FILE_DELETE
535 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
536 @param[in] apply whether to apply the record
537 @return pointer to next redo log record
538 @retval NULL if this log record was truncated */
539 static
540 byte*
fil_name_parse(byte * ptr,const byte * end,const page_id_t page_id,mlog_id_t type,bool apply)541 fil_name_parse(
542 byte* ptr,
543 const byte* end,
544 const page_id_t page_id,
545 mlog_id_t type,
546 bool apply)
547 {
548 if (type == MLOG_FILE_CREATE2) {
549 if (end < ptr + 4) {
550 return(NULL);
551 }
552 ptr += 4;
553 }
554
555 if (end < ptr + 2) {
556 return(NULL);
557 }
558
559 ulint len = mach_read_from_2(ptr);
560 ptr += 2;
561 if (end < ptr + len) {
562 return(NULL);
563 }
564
565 /* MLOG_FILE_* records should only be written for
566 user-created tablespaces. The name must be long enough
567 and end in .ibd. */
568 bool corrupt = is_predefined_tablespace(page_id.space())
569 || len < sizeof "/a.ibd\0"
570 || (!page_id.page_no() != !memcmp(ptr + len - 5, DOT_IBD, 5));
571
572 if (!corrupt && !memchr(ptr, OS_PATH_SEPARATOR, len)) {
573 if (byte* c = static_cast<byte*>
574 (memchr(ptr, OS_PATH_SEPARATOR_ALT, len))) {
575 ut_ad(c >= ptr);
576 ut_ad(c < ptr + len);
577 do {
578 *c = OS_PATH_SEPARATOR;
579 } while ((c = static_cast<byte*>
580 (memchr(ptr, OS_PATH_SEPARATOR_ALT,
581 len - ulint(c - ptr)))) != NULL);
582 } else {
583 corrupt = true;
584 }
585 }
586
587 byte* end_ptr = ptr + len;
588
589 switch (type) {
590 default:
591 ut_ad(0); // the caller checked this
592 /* fall through */
593 case MLOG_FILE_NAME:
594 if (UNIV_UNLIKELY(corrupt)) {
595 ib::error() << "MLOG_FILE_NAME incorrect:" << ptr;
596 recv_sys.found_corrupt_log = true;
597 break;
598 }
599
600 fil_name_process(
601 reinterpret_cast<char*>(ptr), len, page_id.space(),
602 false);
603 break;
604 case MLOG_FILE_DELETE:
605 if (UNIV_UNLIKELY(corrupt)) {
606 ib::error() << "MLOG_FILE_DELETE incorrect:" << ptr;
607 recv_sys.found_corrupt_log = true;
608 break;
609 }
610
611 fil_name_process(reinterpret_cast<char*>(ptr), len,
612 page_id.space(), true);
613 /* fall through */
614 case MLOG_FILE_CREATE2:
615 if (page_id.page_no()) {
616 ut_ad(page_id.page_no()
617 == SRV_UNDO_TABLESPACE_SIZE_IN_PAGES);
618 ut_a(srv_is_undo_tablespace(page_id.space()));
619 compile_time_assert(
620 UT_ARR_SIZE(recv_sys.truncated_undo_spaces)
621 == TRX_SYS_MAX_UNDO_SPACES);
622 recv_sys_t::trunc& t = recv_sys.truncated_undo_spaces[
623 page_id.space() - srv_undo_space_id_start];
624 t.lsn = recv_sys.recovered_lsn;
625 t.pages = uint32_t(page_id.page_no());
626 } else if (log_file_op) {
627 log_file_op(page_id.space(),
628 type == MLOG_FILE_CREATE2 ? ptr - 4 : NULL,
629 ptr, len, NULL, 0);
630 }
631 break;
632 case MLOG_FILE_RENAME2:
633 if (UNIV_UNLIKELY(corrupt)) {
634 ib::error() << "MLOG_FILE_RENAME2 incorrect:" << ptr;
635 recv_sys.found_corrupt_log = true;
636 }
637
638 /* The new name follows the old name. */
639 byte* new_name = end_ptr + 2;
640 if (end < new_name) {
641 return(NULL);
642 }
643
644 ulint new_len = mach_read_from_2(end_ptr);
645
646 if (end < end_ptr + 2 + new_len) {
647 return(NULL);
648 }
649
650 end_ptr += 2 + new_len;
651
652 corrupt = corrupt
653 || new_len < sizeof "/a.ibd\0"
654 || memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0;
655
656 if (!corrupt && !memchr(new_name, OS_PATH_SEPARATOR, new_len)) {
657 if (byte* c = static_cast<byte*>
658 (memchr(new_name, OS_PATH_SEPARATOR_ALT,
659 new_len))) {
660 ut_ad(c >= new_name);
661 ut_ad(c < new_name + new_len);
662 do {
663 *c = OS_PATH_SEPARATOR;
664 } while ((c = static_cast<byte*>
665 (memchr(ptr, OS_PATH_SEPARATOR_ALT,
666 new_len
667 - ulint(c - new_name))))
668 != NULL);
669 } else {
670 corrupt = true;
671 }
672 }
673
674 if (UNIV_UNLIKELY(corrupt)) {
675 ib::error() << "MLOG_FILE_RENAME2 new_name incorrect:" << ptr
676 << " new_name: " << new_name;
677 recv_sys.found_corrupt_log = true;
678 break;
679 }
680
681 fil_name_process(
682 reinterpret_cast<char*>(ptr), len,
683 page_id.space(), false);
684 fil_name_process(
685 reinterpret_cast<char*>(new_name), new_len,
686 page_id.space(), false);
687
688 if (log_file_op) {
689 log_file_op(page_id.space(), NULL,
690 ptr, len, new_name, new_len);
691 }
692
693 if (!apply) {
694 break;
695 }
696 if (!fil_op_replay_rename(
697 page_id.space(), page_id.page_no(),
698 reinterpret_cast<const char*>(ptr),
699 reinterpret_cast<const char*>(new_name))) {
700 recv_sys.found_corrupt_fs = true;
701 }
702 }
703
704 return(end_ptr);
705 }
706
707 /** Clean up after recv_sys_t::create() */
close()708 void recv_sys_t::close()
709 {
710 ut_ad(this == &recv_sys);
711 ut_ad(!recv_writer_thread_active);
712
713 if (is_initialised()) {
714 dblwr.pages.clear();
715
716 if (addr_hash) {
717 hash_table_free(addr_hash);
718 addr_hash = NULL;
719 }
720
721 if (heap) {
722 mem_heap_free(heap);
723 heap = NULL;
724 }
725
726 if (flush_start) {
727 os_event_destroy(flush_start);
728 }
729
730 if (flush_end) {
731 os_event_destroy(flush_end);
732 }
733
734 if (buf) {
735 ut_free_dodump(buf, buf_size);
736 buf = NULL;
737 }
738
739 buf_size = 0;
740 mutex_free(&writer_mutex);
741 mutex_free(&mutex);
742 }
743
744 recv_spaces.clear();
745 mlog_init.clear();
746 }
747
748 /************************************************************
749 Reset the state of the recovery system variables. */
750 void
recv_sys_var_init(void)751 recv_sys_var_init(void)
752 /*===================*/
753 {
754 recv_recovery_on = false;
755 recv_needed_recovery = false;
756 recv_lsn_checks_on = false;
757 recv_no_ibuf_operations = false;
758 recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
759 recv_previous_parsed_rec_offset = 0;
760 recv_previous_parsed_rec_is_multi = 0;
761 recv_max_page_lsn = 0;
762 }
763
764 /******************************************************************//**
765 recv_writer thread tasked with flushing dirty pages from the buffer
766 pools.
767 @return a dummy parameter */
768 extern "C"
769 os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)770 DECLARE_THREAD(recv_writer_thread)(
771 /*===============================*/
772 void* arg MY_ATTRIBUTE((unused)))
773 /*!< in: a dummy parameter required by
774 os_thread_create */
775 {
776 my_thread_init();
777 ut_ad(!srv_read_only_mode);
778
779 #ifdef UNIV_PFS_THREAD
780 pfs_register_thread(recv_writer_thread_key);
781 #endif /* UNIV_PFS_THREAD */
782
783 #ifdef UNIV_DEBUG_THREAD_CREATION
784 ib::info() << "recv_writer thread running, id "
785 << os_thread_pf(os_thread_get_curr_id());
786 #endif /* UNIV_DEBUG_THREAD_CREATION */
787
788 while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
789
790 /* Wait till we get a signal to clean the LRU list.
791 Bounded by max wait time of 100ms. */
792 int64_t sig_count = os_event_reset(buf_flush_event);
793 os_event_wait_time_low(buf_flush_event, 100000, sig_count);
794
795 mutex_enter(&recv_sys.writer_mutex);
796
797 if (!recv_recovery_is_on()) {
798 mutex_exit(&recv_sys.writer_mutex);
799 break;
800 }
801
802 /* Flush pages from end of LRU if required */
803 os_event_reset(recv_sys.flush_end);
804 recv_sys.flush_type = BUF_FLUSH_LRU;
805 os_event_set(recv_sys.flush_start);
806 os_event_wait(recv_sys.flush_end);
807
808 mutex_exit(&recv_sys.writer_mutex);
809 }
810
811 recv_writer_thread_active = false;
812
813 my_thread_end();
814 /* We count the number of threads in os_thread_exit().
815 A created thread should always use that to exit and not
816 use return() to exit. */
817 os_thread_exit();
818
819 OS_THREAD_DUMMY_RETURN;
820 }
821
822 /** Initialize the redo log recovery subsystem. */
create()823 void recv_sys_t::create()
824 {
825 ut_ad(this == &recv_sys);
826 ut_ad(!is_initialised());
827 ut_ad(!flush_start);
828 ut_ad(!flush_end);
829 mutex_create(LATCH_ID_RECV_SYS, &mutex);
830 mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
831
832 heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
833
834 if (!srv_read_only_mode) {
835 flush_start = os_event_create(0);
836 flush_end = os_event_create(0);
837 }
838
839 flush_type = BUF_FLUSH_LRU;
840 apply_log_recs = false;
841 apply_batch_on = false;
842
843 buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
844 buf_size = RECV_PARSING_BUF_SIZE;
845 len = 0;
846 parse_start_lsn = 0;
847 scanned_lsn = 0;
848 scanned_checkpoint_no = 0;
849 recovered_offset = 0;
850 recovered_lsn = 0;
851 found_corrupt_log = false;
852 found_corrupt_fs = false;
853 mlog_checkpoint_lsn = 0;
854
855 addr_hash = hash_create(buf_pool_get_curr_size() / 512);
856 n_addrs = 0;
857 progress_time = time(NULL);
858 recv_max_page_lsn = 0;
859
860 memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
861 last_stored_lsn = 0;
862 }
863
864 /** Empty a fully processed set of stored redo log records. */
empty()865 inline void recv_sys_t::empty()
866 {
867 ut_ad(mutex_own(&mutex));
868 ut_a(n_addrs == 0);
869
870 hash_table_free(addr_hash);
871 mem_heap_empty(heap);
872
873 addr_hash = hash_create(buf_pool_get_curr_size() / 512);
874 }
875
876 /** Free most recovery data structures. */
debug_free()877 void recv_sys_t::debug_free()
878 {
879 ut_ad(this == &recv_sys);
880 ut_ad(is_initialised());
881 mutex_enter(&mutex);
882
883 hash_table_free(addr_hash);
884 mem_heap_free(heap);
885 ut_free_dodump(buf, buf_size);
886
887 buf = NULL;
888 heap = NULL;
889 addr_hash = NULL;
890
891 /* wake page cleaner up to progress */
892 if (!srv_read_only_mode) {
893 ut_ad(!recv_recovery_is_on());
894 ut_ad(!recv_writer_thread_active);
895 os_event_reset(buf_flush_event);
896 os_event_set(flush_start);
897 }
898
899 mutex_exit(&mutex);
900 }
901
902 /** Read a log segment to log_sys.buf.
903 @param[in,out] start_lsn in: read area start,
904 out: the last read valid lsn
905 @param[in] end_lsn read area end
906 @return whether no invalid blocks (e.g checksum mismatch) were found */
read_log_seg(lsn_t * start_lsn,lsn_t end_lsn)907 bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
908 {
909 ulint len;
910 bool success = true;
911 ut_ad(log_sys.mutex.is_owned());
912 ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
913 ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
914 byte* buf = log_sys.buf;
915 loop:
916 lsn_t source_offset = calc_lsn_offset(*start_lsn);
917
918 ut_a(end_lsn - *start_lsn <= ULINT_MAX);
919 len = (ulint) (end_lsn - *start_lsn);
920
921 ut_ad(len != 0);
922
923 const bool at_eof = (source_offset % file_size) + len > file_size;
924 if (at_eof) {
925 /* If the above condition is true then len (which is ulint)
926 is > the expression below, so the typecast is ok */
927 len = ulint(file_size - (source_offset % file_size));
928 }
929
930 log_sys.n_log_ios++;
931
932 MONITOR_INC(MONITOR_LOG_IO);
933
934 ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);
935
936 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
937
938 fil_io(IORequestLogRead, true,
939 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
940 0,
941 ulint(source_offset & (srv_page_size - 1)),
942 len, buf, NULL);
943
944 for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
945 buf += OS_FILE_LOG_BLOCK_SIZE,
946 (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
947 const ulint block_number = log_block_get_hdr_no(buf);
948
949 if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
950 /* Garbage or an incompletely written log block.
951 We will not report any error, because this can
952 happen when InnoDB was killed while it was
953 writing redo log. We simply treat this as an
954 abrupt end of the redo log. */
955 fail:
956 end_lsn = *start_lsn;
957 success = false;
958 break;
959 }
960
961 if (innodb_log_checksums || is_encrypted()) {
962 ulint crc = log_block_calc_checksum_crc32(buf);
963 ulint cksum = log_block_get_checksum(buf);
964
965 DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
966 static int block_counter;
967 if (block_counter++ == 0) {
968 cksum = crc + 1;
969 }
970 });
971
972 DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });
973
974 if (crc != cksum) {
975 ib::error_or_warn(srv_operation != SRV_OPERATION_BACKUP)
976 << "Invalid log block checksum."
977 << " block: " << block_number
978 << " checkpoint no: "
979 << log_block_get_checkpoint_no(buf)
980 << " expected: " << crc
981 << " found: " << cksum;
982 goto fail;
983 }
984
985 if (is_encrypted()
986 && !log_crypt(buf, *start_lsn,
987 OS_FILE_LOG_BLOCK_SIZE,
988 LOG_DECRYPT)) {
989 goto fail;
990 }
991 }
992
993 ulint dl = log_block_get_data_len(buf);
994 if (dl < LOG_BLOCK_HDR_SIZE
995 || (dl != OS_FILE_LOG_BLOCK_SIZE
996 && dl > log_sys.trailer_offset())) {
997 recv_sys.found_corrupt_log = true;
998 goto fail;
999 }
1000 }
1001
1002 if (recv_sys.report(time(NULL))) {
1003 ib::info() << "Read redo log up to LSN=" << *start_lsn;
1004 service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
1005 "Read redo log up to LSN=" LSN_PF,
1006 *start_lsn);
1007 }
1008
1009 if (*start_lsn != end_lsn) {
1010 goto loop;
1011 }
1012
1013 return(success);
1014 }
1015
1016
1017
1018 /********************************************************//**
1019 Copies a log segment from the most up-to-date log group to the other log
1020 groups, so that they all contain the latest log data. Also writes the info
1021 about the latest checkpoint to the groups, and inits the fields in the group
1022 memory structs to up-to-date values. */
1023 static
1024 void
recv_synchronize_groups()1025 recv_synchronize_groups()
1026 {
1027 const lsn_t recovered_lsn = recv_sys.recovered_lsn;
1028
1029 /* Read the last recovered log block to the recovery system buffer:
1030 the block is always incomplete */
1031
1032 lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
1033 OS_FILE_LOG_BLOCK_SIZE);
1034 log_sys.log.read_log_seg(&start_lsn,
1035 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
1036 log_sys.log.set_fields(recovered_lsn);
1037
1038 /* Copy the checkpoint info to the log; remember that we have
1039 incremented checkpoint_no by one, and the info will not be written
1040 over the max checkpoint info, thus making the preservation of max
1041 checkpoint info on disk certain */
1042
1043 if (!srv_read_only_mode) {
1044 log_write_checkpoint_info(true, 0);
1045 log_mutex_enter();
1046 }
1047 }
1048
1049 /** Check the consistency of a log header block.
1050 @param[in] log header block
1051 @return true if ok */
1052 static
1053 bool
recv_check_log_header_checksum(const byte * buf)1054 recv_check_log_header_checksum(
1055 const byte* buf)
1056 {
1057 return(log_block_get_checksum(buf)
1058 == log_block_calc_checksum_crc32(buf));
1059 }
1060
1061 /** Find the latest checkpoint in the format-0 log header.
1062 @param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1063 @return error code or DB_SUCCESS */
1064 static MY_ATTRIBUTE((warn_unused_result))
1065 dberr_t
recv_find_max_checkpoint_0(ulint * max_field)1066 recv_find_max_checkpoint_0(ulint* max_field)
1067 {
1068 ib_uint64_t max_no = 0;
1069 ib_uint64_t checkpoint_no;
1070 byte* buf = log_sys.checkpoint_buf;
1071
1072 ut_ad(log_sys.log.format == 0);
1073
1074 /** Offset of the first checkpoint checksum */
1075 static const uint CHECKSUM_1 = 288;
1076 /** Offset of the second checkpoint checksum */
1077 static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1078 /** Most significant bits of the checkpoint offset */
1079 static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1080 /** Least significant bits of the checkpoint offset */
1081 static const uint OFFSET_LOW32 = 16;
1082
1083 bool found = false;
1084
1085 for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1086 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1087 log_header_read(field);
1088
1089 if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1090 != mach_read_from_4(buf + CHECKSUM_1)
1091 || static_cast<uint32_t>(
1092 ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1093 CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1094 != mach_read_from_4(buf + CHECKSUM_2)) {
1095 DBUG_LOG("ib_log",
1096 "invalid pre-10.2.2 checkpoint " << field);
1097 continue;
1098 }
1099
1100 checkpoint_no = mach_read_from_8(
1101 buf + LOG_CHECKPOINT_NO);
1102
1103 if (!log_crypt_101_read_checkpoint(buf)) {
1104 ib::error() << "Decrypting checkpoint failed";
1105 continue;
1106 }
1107
1108 DBUG_PRINT("ib_log",
1109 ("checkpoint " UINT64PF " at " LSN_PF " found",
1110 checkpoint_no,
1111 mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
1112
1113 if (checkpoint_no >= max_no) {
1114 found = true;
1115 *max_field = field;
1116 max_no = checkpoint_no;
1117
1118 log_sys.log.set_lsn(mach_read_from_8(
1119 buf + LOG_CHECKPOINT_LSN));
1120 log_sys.log.set_lsn_offset(
1121 lsn_t(mach_read_from_4(buf + OFFSET_HIGH32))
1122 << 32
1123 | mach_read_from_4(buf + OFFSET_LOW32));
1124 }
1125 }
1126
1127 if (found) {
1128 return(DB_SUCCESS);
1129 }
1130
1131 ib::error() << "Upgrade after a crash is not supported."
1132 " This redo log was created before MariaDB 10.2.2,"
1133 " and we did not find a valid checkpoint."
1134 " Please follow the instructions at"
1135 " https://mariadb.com/kb/en/library/upgrading/";
1136 return(DB_ERROR);
1137 }
1138
1139 /** Determine if a pre-MySQL 5.7.9/MariaDB 10.2.2 redo log is clean.
1140 @param[in] lsn checkpoint LSN
1141 @param[in] crypt whether the log might be encrypted
1142 @return error code
1143 @retval DB_SUCCESS if the redo log is clean
1144 @retval DB_ERROR if the redo log is corrupted or dirty */
recv_log_format_0_recover(lsn_t lsn,bool crypt)1145 static dberr_t recv_log_format_0_recover(lsn_t lsn, bool crypt)
1146 {
1147 log_mutex_enter();
1148 const lsn_t source_offset = log_sys.log.calc_lsn_offset(lsn);
1149 log_mutex_exit();
1150 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
1151 byte* buf = log_sys.buf;
1152
1153 static const char* NO_UPGRADE_RECOVERY_MSG =
1154 "Upgrade after a crash is not supported."
1155 " This redo log was created before MariaDB 10.2.2";
1156
1157 fil_io(IORequestLogRead, true,
1158 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1159 0,
1160 ulint((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1161 & (srv_page_size - 1)),
1162 OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1163
1164 if (log_block_calc_checksum_format_0(buf)
1165 != log_block_get_checksum(buf)
1166 && !log_crypt_101_read_block(buf)) {
1167 ib::error() << NO_UPGRADE_RECOVERY_MSG
1168 << ", and it appears corrupted.";
1169 return(DB_CORRUPTION);
1170 }
1171
1172 if (log_block_get_data_len(buf)
1173 == (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1174 } else if (crypt) {
1175 ib::error() << "Cannot decrypt log for upgrading."
1176 " The encrypted log was created"
1177 " before MariaDB 10.2.2.";
1178 return DB_ERROR;
1179 } else {
1180 ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";
1181 return(DB_ERROR);
1182 }
1183
1184 /* Mark the redo log for upgrading. */
1185 srv_log_file_size = 0;
1186 recv_sys.parse_start_lsn = recv_sys.recovered_lsn
1187 = recv_sys.scanned_lsn
1188 = recv_sys.mlog_checkpoint_lsn = lsn;
1189 log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1190 = log_sys.lsn = log_sys.write_lsn
1191 = log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1192 = lsn;
1193 log_sys.next_checkpoint_no = 0;
1194 return(DB_SUCCESS);
1195 }
1196
1197 /** Find the latest checkpoint in the log header.
1198 @param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1199 @return error code or DB_SUCCESS */
1200 dberr_t
recv_find_max_checkpoint(ulint * max_field)1201 recv_find_max_checkpoint(ulint* max_field)
1202 {
1203 ib_uint64_t max_no;
1204 ib_uint64_t checkpoint_no;
1205 ulint field;
1206 byte* buf;
1207
1208 max_no = 0;
1209 *max_field = 0;
1210
1211 buf = log_sys.checkpoint_buf;
1212
1213 log_header_read(0);
1214 /* Check the header page checksum. There was no
1215 checksum in the first redo log format (version 0). */
1216 log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1217 log_sys.log.subformat = log_sys.log.format != log_t::FORMAT_3_23
1218 ? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
1219 : 0;
1220 if (log_sys.log.format != log_t::FORMAT_3_23
1221 && !recv_check_log_header_checksum(buf)) {
1222 ib::error() << "Invalid redo log header checksum.";
1223 return(DB_CORRUPTION);
1224 }
1225
1226 char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
1227
1228 memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
1229 /* Ensure that the string is NUL-terminated. */
1230 creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;
1231
1232 switch (log_sys.log.format) {
1233 case log_t::FORMAT_3_23:
1234 return(recv_find_max_checkpoint_0(max_field));
1235 case log_t::FORMAT_10_2:
1236 case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
1237 case log_t::FORMAT_10_3:
1238 case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
1239 case log_t::FORMAT_10_4:
1240 case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
1241 break;
1242 default:
1243 ib::error() << "Unsupported redo log format."
1244 " The redo log was created with " << creator << ".";
1245 return(DB_ERROR);
1246 }
1247
1248 for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1249 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1250
1251 log_header_read(field);
1252
1253 const ulint crc32 = log_block_calc_checksum_crc32(buf);
1254 const ulint cksum = log_block_get_checksum(buf);
1255
1256 if (crc32 != cksum) {
1257 DBUG_PRINT("ib_log",
1258 ("invalid checkpoint,"
1259 " at " ULINTPF
1260 ", checksum " ULINTPFx
1261 " expected " ULINTPFx,
1262 field, cksum, crc32));
1263 continue;
1264 }
1265
1266 if (log_sys.is_encrypted()
1267 && !log_crypt_read_checkpoint_buf(buf)) {
1268 ib::error() << "Reading checkpoint"
1269 " encryption info failed.";
1270 continue;
1271 }
1272
1273 checkpoint_no = mach_read_from_8(
1274 buf + LOG_CHECKPOINT_NO);
1275
1276 DBUG_PRINT("ib_log",
1277 ("checkpoint " UINT64PF " at " LSN_PF " found",
1278 checkpoint_no, mach_read_from_8(
1279 buf + LOG_CHECKPOINT_LSN)));
1280
1281 if (checkpoint_no >= max_no) {
1282 *max_field = field;
1283 max_no = checkpoint_no;
1284 log_sys.log.set_lsn(mach_read_from_8(
1285 buf + LOG_CHECKPOINT_LSN));
1286 log_sys.log.set_lsn_offset(mach_read_from_8(
1287 buf + LOG_CHECKPOINT_OFFSET));
1288 log_sys.next_checkpoint_no = checkpoint_no;
1289 }
1290 }
1291
1292 if (*max_field == 0) {
1293 /* Before 10.2.2, we could get here during database
1294 initialization if we created an ib_logfile0 file that
1295 was filled with zeroes, and were killed. After
1296 10.2.2, we would reject such a file already earlier,
1297 when checking the file header. */
1298 ib::error() << "No valid checkpoint found"
1299 " (corrupted redo log)."
1300 " You can try --innodb-force-recovery=6"
1301 " as a last resort.";
1302 return(DB_ERROR);
1303 }
1304
1305 return(DB_SUCCESS);
1306 }
1307
/** Try to parse a single log record body and also apply it if
specified.
File-level records (MLOG_FILE_NAME, MLOG_FILE_DELETE, MLOG_FILE_CREATE2,
MLOG_FILE_RENAME2) are passed to fil_name_parse(); MLOG_INDEX_LOAD is
only skipped; MLOG_TRUNCATE is rejected. All other records are page-level
records that are dispatched by type to the respective parse function.
When block is not NULL, the record is applied to block->frame; otherwise
it is only parsed.
recv_sys.found_corrupt_log is set when MLOG_TRUNCATE or an unknown record
type is encountered, when a tablespace is unknown to recv_spaces after
MLOG_CHECKPOINT was seen, or when fil_parse_write_crypt_data() reports
an error.
@param[in]	type		redo log entry type
@param[in]	ptr		redo log record body
@param[in]	end_ptr		end of buffer
@param[in]	page_id		page identifier
@param[in]	apply		whether to apply the record
@param[in,out]	block		buffer block, or NULL if
				a page log record should not be applied
				or if it is a MLOG_FILE_ operation
@param[in,out]	mtr		mini-transaction, or NULL if
				a page log record should not be applied
@return log record end, NULL if not a complete record (or if the
record was rejected as described above) */
static
byte*
recv_parse_or_apply_log_rec_body(
	mlog_id_t	type,
	byte*		ptr,
	byte*		end_ptr,
	const page_id_t	page_id,
	bool		apply,
	buf_block_t*	block,
	mtr_t*		mtr)
{
	/* block and mtr must be either both NULL or both non-NULL. */
	ut_ad(!block == !mtr);
	ut_ad(!apply || recv_sys.mlog_checkpoint_lsn);

	/* Handle the record types that are not page-level operations;
	each of these cases returns directly. */
	switch (type) {
	case MLOG_FILE_NAME:
	case MLOG_FILE_DELETE:
	case MLOG_FILE_CREATE2:
	case MLOG_FILE_RENAME2:
		ut_ad(block == NULL);
		/* Collect the file names when parsing the log,
		before applying any log records. */
		return fil_name_parse(ptr, end_ptr, page_id, type, apply);
	case MLOG_INDEX_LOAD:
		/* The body is 8 bytes long; it is only skipped here. */
		if (end_ptr < ptr + 8) {
			return(NULL);
		}
		return(ptr + 8);
	case MLOG_TRUNCATE:
		ib::error() << "Cannot crash-upgrade from "
			"old-style TRUNCATE TABLE";
		recv_sys.found_corrupt_log = true;
		return NULL;
	default:
		break;
	}

	/* Set by mlog_parse_index() for the record types that carry
	index information; freed at the end of this function. */
	dict_index_t*	index	= NULL;
	page_t*		page;
	page_zip_des_t*	page_zip;
#ifdef UNIV_DEBUG
	/* Only used in debug assertions on the page type. */
	ulint		page_type;
#endif /* UNIV_DEBUG */

	if (block) {
		/* Applying a page log record. */
		ut_ad(apply);
		page = block->frame;
		page_zip = buf_block_get_page_zip(block);
		ut_d(page_type = fil_page_get_type(page));
	} else if (apply
		   && !is_predefined_tablespace(page_id.space())
		   && recv_spaces.find(page_id.space()) == recv_spaces.end()) {
		/* The tablespace of this record is not known to
		recv_spaces. */
		if (recv_sys.recovered_lsn < recv_sys.mlog_checkpoint_lsn) {
			/* We have not seen all records between the
			checkpoint and MLOG_CHECKPOINT. There should be
			a MLOG_FILE_DELETE for this tablespace later. */
			recv_spaces.insert(
				std::make_pair(page_id.space(),
					       file_name_t("", false)));
			/* Continue with parsing only (jumps into the
			else branch below). */
			goto parse_log;
		}

		ib::error() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE"
			" for redo log record " << type << page_id << " at "
			<< recv_sys.recovered_lsn << ".";
		recv_sys.found_corrupt_log = true;
		return(NULL);
	} else {
parse_log:
		/* Parsing a page log record. */
		page = NULL;
		page_zip = NULL;
		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
	}

	/* Remember the start of the record body: the MLOG_4BYTES
	handling below reads the 2-byte page offset from it after ptr
	has been advanced. */
	const byte*	old_ptr = ptr;

	switch (type) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
		/* The LSN is checked in recv_parse_log_rec(). */
		break;
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
	case MLOG_MEMSET:
#ifdef UNIV_DEBUG
		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
		    && end_ptr >= ptr + 2) {
			/* It is OK to set FIL_PAGE_TYPE and certain
			list node fields on an empty page.  Any other
			write is not OK. */

			/* NOTE: There may be bogus assertion failures for
			dict_hdr_create(), trx_rseg_header_create(),
			trx_sys_create_doublewrite_buf(), and
			trx_sysf_create().
			These are only called during database creation. */
			ulint	offs = mach_read_from_2(ptr);

			switch (type) {
			default:
				ut_error;
			case MLOG_2BYTES:
				/* Note that this can fail when the
				redo log been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(offs == FIL_PAGE_TYPE
				      || srv_is_undo_tablespace(
					      page_id.space())
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			case MLOG_4BYTES:
				/* Note that this can fail when the
				redo log been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(0
				      /* fil_crypt_rotate_page() writes this */
				      || offs == FIL_PAGE_SPACE_ID
				      || srv_is_undo_tablespace(
					      page_id.space())
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_SPACE
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER/* flst_init */
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			}
		}
#endif /* UNIV_DEBUG */
		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
		/* When a 4-byte write was applied to page 0 of a
		tablespace, copy certain FSP header fields from the page
		to the fil_space_t object of the tablespace. */
		if (ptr != NULL && page != NULL
		    && page_id.page_no() == 0 && type == MLOG_4BYTES) {
			ulint	offs = mach_read_from_2(old_ptr);
			switch (offs) {
				/* These declarations precede the first
				case label; they are assigned only in the
				FSP header cases below. */
				fil_space_t*	space;
				ulint		val;
			default:
				break;
			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
			case FSP_HEADER_OFFSET + FSP_SIZE:
			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
				space = fil_space_get(page_id.space());
				ut_a(space != NULL);
				/* The value as it is on the page after
				the record was applied. */
				val = mach_read_from_4(page + offs);

				switch (offs) {
				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
					space->flags = val;
					break;
				case FSP_HEADER_OFFSET + FSP_SIZE:
					space->size_in_header = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
					space->free_limit = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
					space->free_len = val;
					ut_ad(val == flst_get_len(
						      page + offs));
					break;
				}
			}
		}
		break;
	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_INSERT,
				     &index))) {
			/* The row format of the page must match the one
			of the parsed index. */
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_del_mark_set_clust_rec(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_REC_SEC_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
							 page, page_zip);
		break;
	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
							    page_zip, index);
		}
		break;
	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_DELETE
				     || type == MLOG_COMP_LIST_START_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
							 block, index, mtr);
		}
		break;
	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_COPY_CREATED,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_copy_rec_list_to_created_page(
				ptr, end_ptr, block, index, mtr);
		}
		break;
	case MLOG_PAGE_REORGANIZE:
	case MLOG_COMP_PAGE_REORGANIZE:
	case MLOG_ZIP_PAGE_REORGANIZE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type != MLOG_PAGE_REORGANIZE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_parse_page_reorganize(
				ptr, end_ptr, index,
				type == MLOG_ZIP_PAGE_REORGANIZE,
				block, mtr);
		}
		break;
	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
		/* Allow anything in page_type when creating a page. */
		ut_a(!page_zip);
		/* ptr is not advanced for this record type. */
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
		break;
	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
		/* ptr is not advanced for this record type. */
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
				  true);
		break;
	case MLOG_UNDO_INSERT:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_ERASE_END:
		/* ptr is not advanced for this record type. */
		if (page) {
			ut_ad(page_type == FIL_PAGE_UNDO_LOG);
			trx_undo_erase_page_end(page);
		}
		break;
	case MLOG_UNDO_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = trx_undo_parse_page_init(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_REUSE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header_reuse(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_CREATE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header(ptr, end_ptr, page, mtr);
		break;
	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
		will be followed by MLOG_COMP_REC_DELETE
		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
		in the same mini-transaction. */
		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
		ptr = btr_parse_set_min_rec_mark(
			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
			page, mtr);
		break;
	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_IBUF_BITMAP_INIT:
		/* Allow anything in page_type when creating a page. */
		if (block) ibuf_bitmap_init_apply(block);
		break;
	case MLOG_INIT_FILE_PAGE2:
		/* Allow anything in page_type when creating a page. */
		if (block) fsp_apply_init_file_page(block);
		break;
	case MLOG_INIT_FREE_PAGE:
		/* The page can be zero-filled and its previous
		contents can be ignored. We do not write or apply
		this record yet. */
		break;
	case MLOG_WRITE_STRING:
		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
		break;
	case MLOG_ZIP_WRITE_NODE_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_BLOB_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_HEADER:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_header(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_ZIP_PAGE_COMPRESS:
		/* Allow anything in page_type when creating a page. */
		ptr = page_zip_parse_compress(ptr, end_ptr, block);
		break;
	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
		if (NULL != (ptr = mlog_parse_index(
				ptr, end_ptr, TRUE, &index))) {

			ut_a(!page || ((ibool)!!page_is_comp(page)
				== dict_table_is_comp(index->table)));
			ptr = page_zip_parse_compress_no_data(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_ZIP_WRITE_TRX_ID:
		/* This must be a clustered index leaf page. */
		ut_ad(!page || page_type == FIL_PAGE_INDEX);
		ptr = page_zip_parse_write_trx_id(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_FILE_WRITE_CRYPT_DATA:
		/* err is written by fil_parse_write_crypt_data(). */
		dberr_t err;
		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, &err));

		if (err != DB_SUCCESS) {
			recv_sys.found_corrupt_log = TRUE;
		}
		break;
	default:
		/* Unknown record type: treat the log as corrupted. */
		ptr = NULL;
		ib::error() << "Incorrect log record type "
			<< ib::hex(unsigned(type));

		recv_sys.found_corrupt_log = true;
	}

	if (index) {
		/* Release the index object that mlog_parse_index()
		returned through &index, together with its table. */
		dict_table_t*	table = index->table;

		dict_mem_index_free(index);
		dict_mem_table_free(table);
	}

	return(ptr);
}
1750
1751 /*********************************************************************//**
1752 Calculates the fold value of a page file address: used in inserting or
1753 searching for a log record in the hash table.
1754 @return folded value */
1755 UNIV_INLINE
1756 ulint
recv_fold(ulint space,ulint page_no)1757 recv_fold(
1758 /*======*/
1759 ulint space, /*!< in: space */
1760 ulint page_no)/*!< in: page number */
1761 {
1762 return(ut_fold_ulint_pair(space, page_no));
1763 }
1764
1765 /*********************************************************************//**
1766 Calculates the hash value of a page file address: used in inserting or
1767 searching for a log record in the hash table.
1768 @return folded value */
1769 UNIV_INLINE
1770 ulint
recv_hash(ulint space,ulint page_no)1771 recv_hash(
1772 /*======*/
1773 ulint space, /*!< in: space */
1774 ulint page_no)/*!< in: page number */
1775 {
1776 return(hash_calc_hash(recv_fold(space, page_no), recv_sys.addr_hash));
1777 }
1778
1779 /*********************************************************************//**
1780 Gets the hashed file address struct for a page.
1781 @return file address struct, NULL if not found from the hash table */
1782 static
1783 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)1784 recv_get_fil_addr_struct(
1785 /*=====================*/
1786 ulint space, /*!< in: space id */
1787 ulint page_no)/*!< in: page number */
1788 {
1789 ut_ad(mutex_own(&recv_sys.mutex));
1790
1791 recv_addr_t* recv_addr;
1792
1793 for (recv_addr = static_cast<recv_addr_t*>(
1794 HASH_GET_FIRST(recv_sys.addr_hash,
1795 recv_hash(space, page_no)));
1796 recv_addr != 0;
1797 recv_addr = static_cast<recv_addr_t*>(
1798 HASH_GET_NEXT(addr_hash, recv_addr))) {
1799
1800 if (recv_addr->space == space
1801 && recv_addr->page_no == page_no) {
1802
1803 return(recv_addr);
1804 }
1805 }
1806
1807 return(NULL);
1808 }
1809
1810 /** Store a redo log record for applying.
1811 @param type record type
1812 @param space tablespace identifier
1813 @param page_no page number
1814 @param body record body
1815 @param rec_end end of record
1816 @param lsn start LSN of the mini-transaction
1817 @param end_lsn end LSN of the mini-transaction */
add(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t lsn,lsn_t end_lsn)1818 inline void recv_sys_t::add(mlog_id_t type, ulint space, ulint page_no,
1819 byte* body, byte* rec_end, lsn_t lsn,
1820 lsn_t end_lsn)
1821 {
1822 ut_ad(type != MLOG_FILE_DELETE);
1823 ut_ad(type != MLOG_FILE_CREATE2);
1824 ut_ad(type != MLOG_FILE_RENAME2);
1825 ut_ad(type != MLOG_FILE_NAME);
1826 ut_ad(type != MLOG_DUMMY_RECORD);
1827 ut_ad(type != MLOG_CHECKPOINT);
1828 ut_ad(type != MLOG_INDEX_LOAD);
1829 ut_ad(type != MLOG_TRUNCATE);
1830
1831 recv_t* recv= static_cast<recv_t*>(mem_heap_alloc(heap, sizeof *recv));
1832
1833 recv->type = type;
1834 recv->len = ulint(rec_end - body);
1835 recv->start_lsn = lsn;
1836 recv->end_lsn = end_lsn;
1837
1838 recv_addr_t* recv_addr = recv_get_fil_addr_struct(space, page_no);
1839
1840 if (recv_addr == NULL) {
1841 recv_addr = static_cast<recv_addr_t*>(
1842 mem_heap_alloc(heap, sizeof(recv_addr_t)));
1843
1844 recv_addr->space = space;
1845 recv_addr->page_no = page_no;
1846 recv_addr->state = RECV_NOT_PROCESSED;
1847
1848 UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
1849
1850 HASH_INSERT(recv_addr_t, addr_hash, addr_hash,
1851 recv_fold(space, page_no), recv_addr);
1852 n_addrs++;
1853 }
1854
1855 switch (type) {
1856 case MLOG_INIT_FILE_PAGE2:
1857 case MLOG_ZIP_PAGE_COMPRESS:
1858 case MLOG_INIT_FREE_PAGE:
1859 /* Ignore any earlier redo log records for this page. */
1860 ut_ad(recv_addr->state == RECV_NOT_PROCESSED
1861 || recv_addr->state == RECV_WILL_NOT_READ);
1862 recv_addr->state = RECV_WILL_NOT_READ;
1863 mlog_init.add(space, page_no, lsn);
1864 default:
1865 break;
1866 }
1867
1868 UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
1869
1870 recv_data_t** prev_field = &recv->data;
1871
1872 /* Store the log record body in chunks of less than srv_page_size:
1873 heap grows into the buffer pool, and bigger chunks could not
1874 be allocated */
1875
1876 while (rec_end > body) {
1877 ulint rec_len = ulint(rec_end - body);
1878
1879 if (rec_len > RECV_DATA_BLOCK_SIZE) {
1880 rec_len = RECV_DATA_BLOCK_SIZE;
1881 }
1882
1883 recv_data_t* recv_data = static_cast<recv_data_t*>(
1884 mem_heap_alloc(heap, sizeof(recv_data_t) + rec_len));
1885
1886 *prev_field = recv_data;
1887
1888 memcpy(recv_data + 1, body, rec_len);
1889
1890 prev_field = &recv_data->next;
1891
1892 body += rec_len;
1893 }
1894
1895 *prev_field = NULL;
1896 }
1897
1898 /*********************************************************************//**
1899 Copies the log record body from recv to buf. */
1900 static
1901 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)1902 recv_data_copy_to_buf(
1903 /*==================*/
1904 byte* buf, /*!< in: buffer of length at least recv->len */
1905 recv_t* recv) /*!< in: log record */
1906 {
1907 recv_data_t* recv_data;
1908 ulint part_len;
1909 ulint len;
1910
1911 len = recv->len;
1912 recv_data = recv->data;
1913
1914 while (len > 0) {
1915 if (len > RECV_DATA_BLOCK_SIZE) {
1916 part_len = RECV_DATA_BLOCK_SIZE;
1917 } else {
1918 part_len = len;
1919 }
1920
1921 ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
1922 part_len);
1923 buf += part_len;
1924 len -= part_len;
1925
1926 recv_data = recv_data->next;
1927 }
1928 }
1929
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
The caller must hold recv_sys.mutex. The mutex is released while the
records are applied and is acquired again before returning, so it is
held on return. The mini-transaction is committed by this function, and
recv_sys.n_addrs is decremented.
@param[in,out]	block		buffer pool page
@param[in,out]	mtr		mini-transaction
@param[in,out]	recv_addr	recovery address
@param[in,out]	init		page initialization operation, or NULL */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
			      recv_addr_t* recv_addr,
			      mlog_init_t::init* init = NULL)
{
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(mutex_own(&recv_sys.mutex));
	ut_ad(recv_sys.apply_log_recs);
	ut_ad(recv_needed_recovery);
	ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
	ut_ad(recv_addr->state != RECV_PROCESSED);
	ut_ad(!init || init->created);
	ut_ad(!init || init->lsn);

	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
		fprintf(stderr, "Applying log to page %u:%u\n",
			recv_addr->space, recv_addr->page_no);
	}

	DBUG_LOG("ib_log", "Applying log to page " << block->page.id);

	/* Mark the address as being processed before releasing the
	mutex for the duration of the apply. */
	recv_addr->state = RECV_BEING_PROCESSED;
	mutex_exit(&recv_sys.mutex);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

	/* The page may have been modified in the buffer pool.
	FIL_PAGE_LSN would only be updated right before flushing. */
	lsn_t page_lsn = buf_page_get_newest_modification(&block->page);
	if (!page_lsn) {
		page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
	}

	/* free_page: whether a MLOG_INIT_FREE_PAGE record was processed.
	start_lsn: start LSN of the first applied record that is not
	MLOG_INIT_FREE_PAGE, or 0 if there was none. */
	bool free_page = false;
	lsn_t start_lsn = 0, end_lsn = 0;
	const lsn_t init_lsn = init ? init->lsn : 0;

	for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
	     recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
		ut_ad(recv->start_lsn);
		end_lsn = recv->end_lsn;
		ut_ad(end_lsn <= log_sys.log.scanned_lsn);

		if (recv->start_lsn < page_lsn) {
			/* Ignore this record, because there are later changes
			for this page. */
			DBUG_LOG("ib_log", "apply skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << page_lsn);
		} else if (recv->start_lsn < init_lsn) {
			/* Ignore this record, because it is older than
			the page initialization operation. */
			DBUG_LOG("ib_log", "init skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << init_lsn);
		} else {
			if (recv->type == MLOG_INIT_FREE_PAGE) {
				/* This does not really modify the page. */
				free_page = true;
			} else if (!start_lsn) {
				start_lsn = recv->start_lsn;
			}

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr, "apply " LSN_PF ":"
					" %d len " ULINTPF " page %u:%u\n",
					recv->start_lsn, recv->type, recv->len,
					recv_addr->space, recv_addr->page_no);
			}

			DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
				 << get_mlog_string(recv->type)
				 << " len " << recv->len
				 << " page " << block->page.id);

			byte*	buf;

			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				/* We have to copy the record body to
				a separate buffer */
				buf = static_cast<byte*>
					(ut_malloc_nokey(recv->len));
				recv_data_copy_to_buf(buf, recv);
			} else {
				/* The body fits in a single chunk; use the
				bytes that follow the chunk header in place. */
				buf = reinterpret_cast<byte*>(recv->data)
					+ sizeof *recv->data;
			}

			/* The return value is not checked here. */
			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				block->page.id, true, block, &mtr);

			/* NOTE(review): this replaces the end_lsn that
			was taken from recv->end_lsn above with
			start_lsn + body length; confirm that this is
			intended. */
			end_lsn = recv->start_lsn + recv->len;
			/* Stamp the LSN into the page header and the
			page trailer (and into the compressed page). */
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
						end_lsn);
			}

			/* Free the separate buffer, if one was allocated
			above. */
			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				ut_free(buf);
			}
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_ad(!fil_page_index_page_check(page)
	      || !page_zip
	      || page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */

	if (start_lsn) {
		/* At least one record modified the page: register the
		modification with the LSN range of the applied records. */
		log_flush_order_mutex_enter();
		buf_flush_note_modification(block, start_lsn, end_lsn, NULL);
		log_flush_order_mutex_exit();
	} else if (free_page && init) {
		/* There have been no operations than MLOG_INIT_FREE_PAGE.
		Any buffered changes must not be merged. A subsequent
		buf_page_create() from a user thread should discard
		any buffered changes. */
		init->created = false;
		ut_ad(!mtr.has_modifications());
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();
	mtr.commit();

	/* Read the clock before acquiring the mutex. */
	time_t now = time(NULL);

	mutex_enter(&recv_sys.mutex);

	/* page_lsn is the LSN that the page had before the records
	were applied. */
	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
	recv_addr->state = RECV_PROCESSED;

	ut_a(recv_sys.n_addrs > 0);
	/* Report progress while there are still pages left, at
	moments chosen by recv_sys.report(). */
	if (ulint n = --recv_sys.n_addrs) {
		if (recv_sys.report(now)) {
			ib::info() << "To recover: " << n << " pages from log";
			service_manager_extend_timeout(
				INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
		}
	}
}
2092
2093 /** Reduces recv_sys.n_addrs for the corrupted page.
2094 This function should called when srv_force_recovery > 0.
2095 @param[in] page_id page id of the corrupted page */
recv_recover_corrupt_page(page_id_t page_id)2096 void recv_recover_corrupt_page(page_id_t page_id)
2097 {
2098 ut_ad(srv_force_recovery);
2099 mutex_enter(&recv_sys.mutex);
2100
2101 if (!recv_sys.apply_log_recs) {
2102 } else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2103 page_id.space(), page_id.page_no())) {
2104 switch (recv_addr->state) {
2105 case RECV_WILL_NOT_READ:
2106 ut_ad(!"wrong state");
2107 break;
2108 case RECV_BEING_PROCESSED:
2109 case RECV_PROCESSED:
2110 break;
2111 default:
2112 recv_addr->state = RECV_PROCESSED;
2113 ut_ad(recv_sys.n_addrs);
2114 recv_sys.n_addrs--;
2115 }
2116 }
2117
2118 mutex_exit(&recv_sys.mutex);
2119 }
2120
2121 /** Apply any buffered redo log to a page that was just read from a data file.
2122 @param[in,out] bpage buffer pool page */
recv_recover_page(buf_page_t * bpage)2123 void recv_recover_page(buf_page_t* bpage)
2124 {
2125 mtr_t mtr;
2126 mtr.start();
2127 mtr.set_log_mode(MTR_LOG_NONE);
2128
2129 ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
2130 buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
2131
2132 /* Move the ownership of the x-latch on the page to
2133 this OS thread, so that we can acquire a second
2134 x-latch on it. This is needed for the operations to
2135 the page to pass the debug checks. */
2136 rw_lock_x_lock_move_ownership(&block->lock);
2137 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2138 ibool success = buf_page_get_known_nowait(
2139 RW_X_LATCH, block, BUF_KEEP_OLD,
2140 __FILE__, __LINE__, &mtr);
2141 ut_a(success);
2142
2143 mutex_enter(&recv_sys.mutex);
2144 if (!recv_sys.apply_log_recs) {
2145 } else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2146 bpage->id.space(), bpage->id.page_no())) {
2147 switch (recv_addr->state) {
2148 case RECV_BEING_PROCESSED:
2149 case RECV_PROCESSED:
2150 break;
2151 default:
2152 recv_recover_page(block, mtr, recv_addr);
2153 goto func_exit;
2154 }
2155 }
2156
2157 mtr.commit();
2158 func_exit:
2159 mutex_exit(&recv_sys.mutex);
2160 ut_ad(mtr.has_committed());
2161 }
2162
2163 /** Reads in pages which have hashed log records, from an area around a given
2164 page number.
2165 @param[in] page_id page id */
recv_read_in_area(const page_id_t page_id)2166 static void recv_read_in_area(const page_id_t page_id)
2167 {
2168 ulint page_nos[RECV_READ_AHEAD_AREA];
2169 ulint page_no = page_id.page_no()
2170 - (page_id.page_no() % RECV_READ_AHEAD_AREA);
2171 ulint* p = page_nos;
2172
2173 for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
2174 page_no < up_limit; page_no++) {
2175 recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2176 page_id.space(), page_no);
2177 if (recv_addr
2178 && recv_addr->state == RECV_NOT_PROCESSED
2179 && !buf_page_peek(page_id_t(page_id.space(), page_no))) {
2180 recv_addr->state = RECV_BEING_READ;
2181 *p++ = page_no;
2182 }
2183 }
2184
2185 mutex_exit(&recv_sys.mutex);
2186 buf_read_recv_pages(FALSE, page_id.space(), page_nos,
2187 ulint(p - page_nos));
2188 mutex_enter(&recv_sys.mutex);
2189 }
2190
2191 /** This is another low level function for the recovery system
2192 to create a page which has buffered page intialization redo log records.
2193 @param[in] page_id page to be created using redo logs
2194 @param[in,out] recv_addr Hashed redo logs for the given page id
2195 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id,recv_addr_t * recv_addr)2196 static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id,
2197 recv_addr_t* recv_addr)
2198 {
2199 mtr_t mtr;
2200 mlog_init_t::init &i= mlog_init.last(page_id);
2201 const lsn_t end_lsn= UT_LIST_GET_LAST(recv_addr->rec_list)->end_lsn;
2202
2203 if (end_lsn < i.lsn)
2204 {
2205 DBUG_LOG("ib_log", "skip log for page "
2206 << page_id
2207 << " LSN " << end_lsn
2208 << " < " << i.lsn);
2209 recv_addr->state= RECV_PROCESSED;
2210 ignore:
2211 ut_a(recv_sys.n_addrs);
2212 recv_sys.n_addrs--;
2213 return NULL;
2214 }
2215
2216 fil_space_t *space= fil_space_acquire_for_io(recv_addr->space);
2217 if (!space)
2218 {
2219 recv_addr->state= RECV_PROCESSED;
2220 goto ignore;
2221 }
2222
2223 if (space->enable_lsn)
2224 {
2225 init_fail:
2226 space->release_for_io();
2227 recv_addr->state= RECV_NOT_PROCESSED;
2228 return NULL;
2229 }
2230
2231 /* Determine if a tablespace could be for an internal table
2232 for FULLTEXT INDEX. For those tables, no MLOG_INDEX_LOAD record
2233 used to be written when redo logging was disabled. Hence, we
2234 cannot optimize away page reads, because all the redo
2235 log records for initializing and modifying the page in the
2236 past could be older than the page in the data file.
2237
2238 The check is too broad, causing all
2239 tables whose names start with FTS_ to skip the optimization. */
2240
2241 if (strstr(space->name, "/FTS_"))
2242 goto init_fail;
2243
2244 mtr.start();
2245 mtr.set_log_mode(MTR_LOG_NONE);
2246 buf_block_t *block= buf_page_create(page_id, space->zip_size(), &mtr);
2247 if (recv_addr->state == RECV_PROCESSED)
2248 /* The page happened to exist in the buffer pool, or it was
2249 just being read in. Before buf_page_get_with_no_latch() returned,
2250 all changes must have been applied to the page already. */
2251 mtr.commit();
2252 else
2253 {
2254 i.created= true;
2255 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2256 recv_recover_page(block, mtr, recv_addr, &i);
2257 ut_ad(mtr.has_committed());
2258 }
2259
2260 space->release_for_io();
2261 return block;
2262 }
2263
2264 /** This is a low level function for the recovery system
2265 to create a page which has buffered intialized redo log records.
2266 @param[in] page_id page to be created using redo logs
2267 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id)2268 buf_block_t* recv_recovery_create_page_low(const page_id_t page_id)
2269 {
2270 buf_block_t* block= nullptr;
2271 mutex_enter(&recv_sys.mutex);
2272 recv_addr_t* recv_addr= recv_get_fil_addr_struct(page_id.space(),
2273 page_id.page_no());
2274 if (recv_addr && recv_addr->state == RECV_WILL_NOT_READ)
2275 block= recv_recovery_create_page_low(page_id, recv_addr);
2276 mutex_exit(&recv_sys.mutex);
2277 return block;
2278 }
2279
/** Apply the hash table of stored log records to persistent data pages.
The function first waits for any batch that is already running, then walks
every cell of recv_sys.addr_hash and, for each page with records, either
applies them to the copy in the buffer pool, issues a read of the surrounding
area, or creates the page from its initialization records. It then waits
until recv_sys.n_addrs has dropped to zero and no page reads are pending.
The function returns early, without clearing recv_sys.apply_batch_on, when
log or file system corruption has been flagged.
The caller must hold the log mutex exactly when last_batch is false
(see the debug assertion below).
@param[in]	last_batch	whether the change buffer merge will be
				performed as part of the operation */
void recv_apply_hashed_log_recs(bool last_batch)
{
	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || is_mariabackup_restore_or_export());

	mutex_enter(&recv_sys.mutex);

	/* Only one batch runs at a time: poll until the running batch
	has cleared apply_batch_on, giving up if the log is corrupted. */
	while (recv_sys.apply_batch_on) {
		bool abort = recv_sys.found_corrupt_log;
		mutex_exit(&recv_sys.mutex);

		if (abort) {
			return;
		}

		os_thread_sleep(500000);
		mutex_enter(&recv_sys.mutex);
	}

	ut_ad(!last_batch == log_mutex_own());

	recv_no_ibuf_operations
		= !last_batch || is_mariabackup_restore_or_export();

	if (ulint n = recv_sys.n_addrs) {
		/* There are pages to recover. Refuse to proceed for this
		combination of log subformat and open undo tablespaces
		unless recovery is being forced. */
		if (!log_sys.log.subformat && !srv_force_recovery
		    && srv_undo_tablespaces_open) {
			ib::error() << "Recovery of separately logged"
				" TRUNCATE operations is no longer supported."
				" Set innodb_force_recovery=1"
				" if no *trunc.log files exist";
			recv_sys.found_corrupt_log = true;
			mutex_exit(&recv_sys.mutex);
			return;
		}

		const char* msg = last_batch
			? "Starting final batch to recover "
			: "Starting a batch to recover ";
		ib::info() << msg << n << " pages from redo log.";
		sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
			   msg, n);
	}
	recv_sys.apply_log_recs = true;
	recv_sys.apply_batch_on = true;

	/* Trim the hashed records of every open undo tablespace for
	which a truncation LSN has been recorded. */
	for (ulint id = srv_undo_tablespaces_open; id--; ) {
		recv_sys_t::trunc& t = recv_sys.truncated_undo_spaces[id];
		if (t.lsn) {
			recv_addr_trim(id + srv_undo_space_id_start, t.pages,
				       t.lsn);
		}
	}

	mtr_t mtr;

	for (ulint i = 0; i < hash_get_n_cells(recv_sys.addr_hash); i++) {
		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
			     HASH_GET_FIRST(recv_sys.addr_hash, i));
		     recv_addr;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {
			if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
				/* No records are left for this page (or,
				via the goto below, they were discarded):
				just stop counting the page. */
ignore:
				ut_a(recv_sys.n_addrs);
				recv_sys.n_addrs--;
				continue;
			}

			switch (recv_addr->state) {
			case RECV_BEING_READ:
			case RECV_BEING_PROCESSED:
			case RECV_PROCESSED:
				/* Already handled, or being handled
				elsewhere. */
				continue;
			case RECV_DISCARDED:
				goto ignore;
			case RECV_NOT_PROCESSED:
			case RECV_WILL_NOT_READ:
				break;
			}

			const page_id_t page_id(recv_addr->space,
						recv_addr->page_no);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
				/* Also entered via goto when creating the
				page from its initialization records
				returned NULL. */
apply:
				mtr.start();
				mtr.set_log_mode(MTR_LOG_NONE);
				if (buf_block_t* block = buf_page_get_low(
					    page_id, 0, RW_X_LATCH, NULL,
					    BUF_GET_IF_IN_POOL,
					    __FILE__, __LINE__, &mtr, NULL)) {
					/* The page is in the buffer pool:
					apply the records right away. The
					callee commits mtr. */
					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);
					recv_recover_page(block, mtr,
							  recv_addr);
					ut_ad(mtr.has_committed());
				} else {
					/* Not in the buffer pool: request a
					read of the surrounding area. This
					temporarily releases
					recv_sys.mutex. */
					mtr.commit();
					recv_read_in_area(page_id);
				}
			} else if (!recv_recovery_create_page_low(
					   page_id, recv_addr)) {
				goto apply;
			}
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys.n_addrs || buf_get_n_pending_read_ios()) {
		const bool abort = recv_sys.found_corrupt_log
			|| recv_sys.found_corrupt_fs;

		if (recv_sys.found_corrupt_fs && !srv_force_recovery) {
			ib::info() << "Set innodb_force_recovery=1"
				" to ignore corrupted pages.";
		}

		mutex_exit(&(recv_sys.mutex));

		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys.mutex));
	}

	if (!last_batch) {
		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		mutex_exit(&(recv_sys.mutex));
		log_mutex_exit();

		/* Stop the recv_writer thread from issuing any LRU
		flush batches. */
		mutex_enter(&recv_sys.writer_mutex);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		/* Request a BUF_FLUSH_LIST flush through the flush_start
		event and wait for flush_end to be signalled. */
		os_event_reset(recv_sys.flush_end);
		recv_sys.flush_type = BUF_FLUSH_LIST;
		os_event_set(recv_sys.flush_start);
		os_event_wait(recv_sys.flush_end);

		buf_pool_invalidate();

		/* Allow batches from recv_writer thread. */
		mutex_exit(&recv_sys.writer_mutex);

		/* Re-acquire the mutexes in the order log mutex,
		recv_sys.mutex. */
		log_mutex_enter();
		mutex_enter(&(recv_sys.mutex));
		mlog_init.reset();
	} else if (!recv_no_ibuf_operations) {
		/* We skipped this in buf_page_create(). */
		mlog_init.ibuf_merge(mtr);
	}

	/* The batch is complete: clear the flags and empty recv_sys. */
	recv_sys.apply_log_recs = false;
	recv_sys.apply_batch_on = false;

	recv_sys.empty();

	mutex_exit(&recv_sys.mutex);
}
2452
2453 /** Parse the redo log to set the space recovery size and flags
2454 @param[in] ptr pointer to parsing redo buffer
2455 @param[in] end_ptr end of the parsing redo buffer
2456 @param[in] space tablespace id */
2457 static
recv_parse_set_size_and_flags(const byte * ptr,byte * end_ptr,ulint space)2458 void recv_parse_set_size_and_flags(const byte *ptr, byte *end_ptr,
2459 ulint space)
2460 {
2461 switch (const uint16_t offset= mach_read_from_2(ptr))
2462 {
2463 default:
2464 break;
2465 case FSP_HEADER_OFFSET + FSP_SIZE:
2466 case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
2467 ptr += 2;
2468 ulint val= mach_parse_compressed(&ptr, end_ptr);
2469 recv_spaces_t::iterator it= recv_spaces.find(space);
2470
2471 ut_ad(!recv_sys.mlog_checkpoint_lsn || space == TRX_SYS_SPACE ||
2472 srv_is_undo_tablespace(space) || it != recv_spaces.end());
2473
2474 if (offset == FSP_HEADER_OFFSET + FSP_SIZE)
2475 fil_space_set_recv_size_and_flags(
2476 space, val, FSP_FLAGS_FCRC32_MASK_MARKER);
2477 else
2478 fil_space_set_recv_size_and_flags(
2479 space, 0, static_cast<uint32_t>(val));
2480
2481 if (it == recv_spaces.end() || it->second.space)
2482 return;
2483
2484 if (offset == FSP_HEADER_OFFSET + FSP_SIZE)
2485 it->second.size= val;
2486 else
2487 it->second.flags= static_cast<uint32_t>(val);
2488 }
2489 }
2490
2491 /** Tries to parse a single log record.
2492 @param[out] type log record type
2493 @param[in] ptr pointer to a buffer
2494 @param[in] end_ptr end of the buffer
2495 @param[out] space_id tablespace identifier
2496 @param[out] page_no page number
2497 @param[in] apply whether to apply MLOG_FILE_* records
2498 @param[out] body start of log record body
2499 @return length of the record, or 0 if the record was not complete */
2500 static
2501 ulint
recv_parse_log_rec(mlog_id_t * type,byte * ptr,byte * end_ptr,ulint * space,ulint * page_no,bool apply,byte ** body)2502 recv_parse_log_rec(
2503 mlog_id_t* type,
2504 byte* ptr,
2505 byte* end_ptr,
2506 ulint* space,
2507 ulint* page_no,
2508 bool apply,
2509 byte** body)
2510 {
2511 byte* new_ptr;
2512
2513 *body = NULL;
2514
2515 MEM_UNDEFINED(type, sizeof *type);
2516 MEM_UNDEFINED(space, sizeof *space);
2517 MEM_UNDEFINED(page_no, sizeof *page_no);
2518 MEM_UNDEFINED(body, sizeof *body);
2519
2520 if (ptr == end_ptr) {
2521
2522 return(0);
2523 }
2524
2525 switch (*ptr) {
2526 #ifdef UNIV_LOG_LSN_DEBUG
2527 case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
2528 case MLOG_LSN:
2529 new_ptr = mlog_parse_initial_log_record(
2530 ptr, end_ptr, type, space, page_no);
2531 if (new_ptr != NULL) {
2532 const lsn_t lsn = static_cast<lsn_t>(
2533 *space) << 32 | *page_no;
2534 ut_a(lsn == recv_sys.recovered_lsn);
2535 }
2536
2537 *type = MLOG_LSN;
2538 return(new_ptr - ptr);
2539 #endif /* UNIV_LOG_LSN_DEBUG */
2540 case MLOG_MULTI_REC_END:
2541 case MLOG_DUMMY_RECORD:
2542 *type = static_cast<mlog_id_t>(*ptr);
2543 return(1);
2544 case MLOG_CHECKPOINT:
2545 if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
2546 return(0);
2547 }
2548 *type = static_cast<mlog_id_t>(*ptr);
2549 return(SIZE_OF_MLOG_CHECKPOINT);
2550 case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
2551 case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
2552 case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
2553 ib::error() << "Incorrect log record type "
2554 << ib::hex(unsigned(*ptr));
2555 recv_sys.found_corrupt_log = true;
2556 return(0);
2557 }
2558
2559 new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
2560 page_no);
2561 *body = new_ptr;
2562
2563 if (UNIV_UNLIKELY(!new_ptr)) {
2564
2565 return(0);
2566 }
2567
2568 const byte* old_ptr = new_ptr;
2569 new_ptr = recv_parse_or_apply_log_rec_body(
2570 *type, new_ptr, end_ptr, page_id_t(*space, *page_no), apply,
2571 NULL, NULL);
2572
2573 if (UNIV_UNLIKELY(new_ptr == NULL)) {
2574 return(0);
2575 }
2576
2577 if (*page_no == 0 && *type == MLOG_4BYTES && apply) {
2578 recv_parse_set_size_and_flags(old_ptr, end_ptr, *space);
2579 }
2580
2581 return ulint(new_ptr - ptr);
2582 }
2583
2584 /*******************************************************//**
2585 Calculates the new value for lsn when more data is added to the log. */
2586 static
2587 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)2588 recv_calc_lsn_on_data_add(
2589 /*======================*/
2590 lsn_t lsn, /*!< in: old lsn */
2591 ib_uint64_t len) /*!< in: this many bytes of data is
2592 added, log block headers not included */
2593 {
2594 unsigned frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
2595 unsigned payload_size = log_sys.payload_size();
2596 ut_ad(frag_len < payload_size);
2597 lsn_t lsn_len = len;
2598 lsn_len += (lsn_len + frag_len) / payload_size
2599 * (OS_FILE_LOG_BLOCK_SIZE - payload_size);
2600
2601 return(lsn + lsn_len);
2602 }
2603
2604 /** Prints diagnostic info of corrupt log.
2605 @param[in] ptr pointer to corrupt log record
2606 @param[in] type type of the log record (could be garbage)
2607 @param[in] space tablespace ID (could be garbage)
2608 @param[in] page_no page number (could be garbage)
2609 @return whether processing should continue */
2610 ATTRIBUTE_COLD
2611 static
2612 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)2613 recv_report_corrupt_log(
2614 const byte* ptr,
2615 int type,
2616 ulint space,
2617 ulint page_no)
2618 {
2619 ib::error() <<
2620 "############### CORRUPT LOG RECORD FOUND ##################";
2621
2622 const ulint ptr_offset = ulint(ptr - recv_sys.buf);
2623
2624 ib::info() << "Log record type " << type << ", page " << space << ":"
2625 << page_no << ". Log parsing proceeded successfully up to "
2626 << recv_sys.recovered_lsn << ". Previous log record type "
2627 << recv_previous_parsed_rec_type << ", is multi "
2628 << recv_previous_parsed_rec_is_multi << " Recv offset "
2629 << ptr_offset << ", prev "
2630 << recv_previous_parsed_rec_offset;
2631
2632 ut_ad(ptr <= recv_sys.buf + recv_sys.len);
2633
2634 const ulint limit = 100;
2635 const ulint prev_offset = std::min(recv_previous_parsed_rec_offset,
2636 ptr_offset);
2637 const ulint before = std::min(prev_offset, limit);
2638 const ulint after = std::min(recv_sys.len - ptr_offset, limit);
2639
2640 ib::info() << "Hex dump starting " << before << " bytes before and"
2641 " ending " << after << " bytes after the corrupted record:";
2642
2643 const byte* start = recv_sys.buf + prev_offset - before;
2644
2645 ut_print_buf(stderr, start, ulint(ptr - start) + after);
2646 putc('\n', stderr);
2647
2648 if (!srv_force_recovery) {
2649 ib::info() << "Set innodb_force_recovery to ignore this error.";
2650 return(false);
2651 }
2652
2653 ib::warn() << "The log file may have been corrupt and it is possible"
2654 " that the log scan did not proceed far enough in recovery!"
2655 " Please run CHECK TABLE on your InnoDB tables to check"
2656 " that they are ok! If mysqld crashes after this recovery; "
2657 << FORCE_RECOVERY_MSG;
2658 return(true);
2659 }
2660
2661 /** Report a MLOG_INDEX_LOAD operation.
2662 @param[in] space_id tablespace id
2663 @param[in] page_no page number
2664 @param[in] lsn log sequence number */
2665 ATTRIBUTE_COLD static void
recv_mlog_index_load(ulint space_id,ulint page_no,lsn_t lsn)2666 recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
2667 {
2668 recv_spaces_t::iterator it = recv_spaces.find(space_id);
2669 if (it != recv_spaces.end()) {
2670 it->second.mlog_index_load(lsn);
2671 }
2672
2673 if (log_optimized_ddl_op) {
2674 log_optimized_ddl_op(space_id);
2675 }
2676 }
2677
2678 /** Check whether read redo log memory exceeds the available memory
2679 of buffer pool. Store last_stored_lsn if it is not in last phase
2680 @param[in] store whether to store page operations
2681 @param[in] available_mem Available memory in buffer pool to
2682 read redo logs. */
recv_sys_heap_check(store_t * store,ulint available_mem)2683 static bool recv_sys_heap_check(store_t* store, ulint available_mem)
2684 {
2685 if (*store != STORE_NO && mem_heap_get_size(recv_sys.heap) >= available_mem)
2686 {
2687 if (*store == STORE_YES)
2688 recv_sys.last_stored_lsn= recv_sys.recovered_lsn;
2689
2690 *store= STORE_NO;
2691 DBUG_PRINT("ib_log",("Ran out of memory and last "
2692 "stored lsn " LSN_PF " last stored offset "
2693 ULINTPF "\n",
2694 recv_sys.recovered_lsn, recv_sys.recovered_offset));
2695 return true;
2696 }
2697
2698 return false;
2699 }
2700
/** Parse log records from a buffer and optionally store them to a
hash table to wait merging to file pages.
Each pass through the loop consumes one mini-transaction from
recv_sys.buf, starting at recv_sys.recovered_offset: either a single
record, or a group of records terminated by MLOG_MULTI_REC_END. A group
is scanned twice: first to check that it is completely buffered, then to
store (and possibly apply) its records. recv_sys.recovered_offset and
recv_sys.recovered_lsn are advanced past everything that was consumed.
The caller must hold the log mutex and recv_sys.mutex.
@param[in]	checkpoint_lsn	the LSN of the latest checkpoint
@param[in,out]	store		whether to store page operations;
				set to STORE_NO when memory runs out
@param[in]	available_mem	memory to read the redo logs
@param[in]	apply		whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed
@retval false if the buffered data was exhausted, a record or
mini-transaction was incomplete, or memory ran out in the last phase */
bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
			 ulint available_mem, bool apply)
{
	byte*		ptr;
	byte*		end_ptr;
	bool		single_rec;
	ulint		len;
	lsn_t		new_recovered_lsn;
	lsn_t		old_lsn;
	mlog_id_t	type;
	ulint		space;
	ulint		page_no;
	byte*		body;
	/* Remember the initial mode, because *store may be reset to
	STORE_NO by recv_sys_heap_check(). */
	const bool	last_phase = (*store == STORE_IF_EXISTS);

	ut_ad(log_mutex_own());
	ut_ad(mutex_own(&recv_sys.mutex));
	ut_ad(recv_sys.parse_start_lsn != 0);
loop:
	ptr = recv_sys.buf + recv_sys.recovered_offset;

	end_ptr = recv_sys.buf + recv_sys.len;

	if (ptr == end_ptr) {

		return(false);
	}

	/* Check for memory overflow and ignore the parsing of remaining
	redo log records if InnoDB ran out of memory */
	if (recv_sys_heap_check(store, available_mem) && last_phase) {
		return false;
	}

	/* Decide from the first byte whether this is a stand-alone
	record or the start of a multi-record mini-transaction. */
	switch (*ptr) {
	case MLOG_CHECKPOINT:
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_DUMMY_RECORD:
		single_rec = true;
		break;
	default:
		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
	}

	if (single_rec) {
		/* The mtr did not modify multiple pages */

		old_lsn = recv_sys.recovered_lsn;

		/* Try to parse a log record, fetching its type, space id,
		page no, and a pointer to the body of the log record */

		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
					 &page_no, apply, &body);

		if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)) {
			recv_report_corrupt_log(ptr, type, space, page_no);
			return(true);
		}

		if (UNIV_UNLIKELY(recv_sys.found_corrupt_fs)) {
			return(true);
		}

		if (len == 0) {
			/* The record is not completely buffered yet. */
			return(false);
		}

		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);

		if (new_recovered_lsn > recv_sys.scanned_lsn) {
			/* The log record filled a log block, and we require
			that also the next log block should have been scanned
			in */

			return(false);
		}

		/* Remember the record for the diagnostics printed by
		recv_report_corrupt_log(). */
		recv_previous_parsed_rec_type = type;
		recv_previous_parsed_rec_offset = recv_sys.recovered_offset;
		recv_previous_parsed_rec_is_multi = 0;

		recv_sys.recovered_offset += len;
		recv_sys.recovered_lsn = new_recovered_lsn;

		switch (type) {
			lsn_t	lsn;
		case MLOG_DUMMY_RECORD:
			/* Do nothing */
			break;
		case MLOG_CHECKPOINT:
			compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
			/* The 8 bytes after the type byte hold the
			checkpoint LSN. */
			lsn = mach_read_from_8(ptr + 1);

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr,
					"MLOG_CHECKPOINT(" LSN_PF ") %s at "
					LSN_PF "\n", lsn,
					lsn != checkpoint_lsn ? "ignored"
					: recv_sys.mlog_checkpoint_lsn
					? "reread" : "read",
					recv_sys.recovered_lsn);
			}

			DBUG_PRINT("ib_log",
				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
				    LSN_PF,
				    lsn,
				    lsn != checkpoint_lsn ? "ignored"
				    : recv_sys.mlog_checkpoint_lsn
				    ? "reread" : "read",
				    recv_sys.recovered_lsn));

			if (lsn == checkpoint_lsn) {
				if (recv_sys.mlog_checkpoint_lsn) {
					/* There can be multiple
					MLOG_CHECKPOINT lsn for the
					same checkpoint. */
					break;
				}
				/* First MLOG_CHECKPOINT for the latest
				checkpoint: record where it ends and
				report it to the caller. */
				recv_sys.mlog_checkpoint_lsn
					= recv_sys.recovered_lsn;
				return(true);
			}
			break;
#ifdef UNIV_LOG_LSN_DEBUG
		case MLOG_LSN:
			/* Do not add these records to the hash table.
			The page number and space id fields are misused
			for something else. */
			break;
#endif /* UNIV_LOG_LSN_DEBUG */
		default:
			/* A page-level record: store it if requested.
			With STORE_IF_EXISTS, it is stored only if
			flags can be obtained for the tablespace. */
			switch (*store) {
			case STORE_NO:
				break;
			case STORE_IF_EXISTS:
				if (fil_space_get_flags(space)
				    == ULINT_UNDEFINED) {
					break;
				}
				/* fall through */
			case STORE_YES:
				recv_sys.add(
					type, space, page_no, body,
					ptr + len, old_lsn,
					recv_sys.recovered_lsn);
			}
			/* fall through */
		case MLOG_INDEX_LOAD:
			if (type == MLOG_INDEX_LOAD) {
				recv_mlog_index_load(space, page_no, old_lsn);
			}
			/* fall through */
		case MLOG_FILE_NAME:
		case MLOG_FILE_DELETE:
		case MLOG_FILE_CREATE2:
		case MLOG_FILE_RENAME2:
		case MLOG_TRUNCATE:
			/* These were already handled by
			recv_parse_log_rec() and
			recv_parse_or_apply_log_rec_body(). */
			DBUG_PRINT("ib_log",
				("scan " LSN_PF ": log rec %s"
				" len " ULINTPF
				" page " ULINTPF ":" ULINTPF,
				old_lsn, get_mlog_string(type),
				len, space, page_no));
		}
	} else {
		/* Check that all the records associated with the single mtr
		are included within the buffer */

		ulint	total_len	= 0;
		ulint	n_recs		= 0;
		/* Whether every record seen so far in this
		mini-transaction was MLOG_FILE_NAME */
		bool	only_mlog_file	= true;
		/* Total length of the leading MLOG_FILE_NAME records
		that were already consumed during this first scan */
		ulint	mlog_rec_len	= 0;

		for (;;) {
			/* First scan: parse only (apply=false), because
			the mini-transaction may turn out to be
			incomplete. */
			len = recv_parse_log_rec(
				&type, ptr, end_ptr, &space, &page_no,
				false, &body);

			if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)) {
corrupted_log:
				recv_report_corrupt_log(
					ptr, type, space, page_no);
				return(true);
			}

			if (ptr == end_ptr) {
			} else if (type == MLOG_CHECKPOINT
				   || (*ptr & MLOG_SINGLE_REC_FLAG)) {
				/* Neither may occur inside a
				multi-record mini-transaction. */
				recv_sys.found_corrupt_log = true;
				goto corrupted_log;
			}

			if (recv_sys.found_corrupt_fs) {
				return(true);
			}

			if (len == 0) {
				/* The mini-transaction is not completely
				buffered yet. */
				return(false);
			}

			recv_previous_parsed_rec_type = type;
			recv_previous_parsed_rec_offset
				= recv_sys.recovered_offset + total_len;
			recv_previous_parsed_rec_is_multi = 1;

			/* MLOG_FILE_NAME redo log records doesn't make changes
			to persistent data. If only MLOG_FILE_NAME redo
			log record exists then reset the parsing buffer pointer
			by changing recovered_lsn and recovered_offset. */
			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
				only_mlog_file = false;
			}

			if (only_mlog_file) {
				new_recovered_lsn = recv_calc_lsn_on_data_add(
					recv_sys.recovered_lsn, len);
				mlog_rec_len += len;
				recv_sys.recovered_offset += len;
				recv_sys.recovered_lsn = new_recovered_lsn;
			}

			total_len += len;
			n_recs++;

			ptr += len;

			if (type == MLOG_MULTI_REC_END) {
				DBUG_PRINT("ib_log",
					   ("scan " LSN_PF
					    ": multi-log end"
					    " total_len " ULINTPF
					    " n=" ULINTPF,
					    recv_sys.recovered_lsn,
					    total_len, n_recs));
				/* The leading MLOG_FILE_NAME records were
				already accounted for in recovered_lsn
				and recovered_offset. */
				total_len -= mlog_rec_len;
				break;
			}

			DBUG_PRINT("ib_log",
				   ("scan " LSN_PF ": multi-log rec %s"
				    " len " ULINTPF
				    " page " ULINTPF ":" ULINTPF,
				    recv_sys.recovered_lsn,
				    get_mlog_string(type), len, space, page_no));
		}

		/* The LSN at the end of the whole mini-transaction */
		new_recovered_lsn = recv_calc_lsn_on_data_add(
			recv_sys.recovered_lsn, total_len);

		if (new_recovered_lsn > recv_sys.scanned_lsn) {
			/* The log record filled a log block, and we require
			that also the next log block should have been scanned
			in */

			return(false);
		}

		/* Add all the records to the hash table */

		ptr = recv_sys.buf + recv_sys.recovered_offset;

		for (;;) {
			old_lsn = recv_sys.recovered_lsn;
			/* This will apply MLOG_FILE_ records. We
			had to skip them in the first scan, because we
			did not know if the mini-transaction was
			completely recovered (until MLOG_MULTI_REC_END). */
			len = recv_parse_log_rec(
				&type, ptr, end_ptr, &space, &page_no,
				apply, &body);

			/* Unlike in the first scan, parsing continues
			after reported corruption if
			recv_report_corrupt_log() allows it. */
			if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)
			    && !recv_report_corrupt_log(
				    ptr, type, space, page_no)) {
				return(true);
			}

			if (UNIV_UNLIKELY(recv_sys.found_corrupt_fs)) {
				return(true);
			}

			ut_a(len != 0);
			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));

			recv_sys.recovered_offset += len;
			recv_sys.recovered_lsn
				= recv_calc_lsn_on_data_add(old_lsn, len);

			switch (type) {
			case MLOG_MULTI_REC_END:
				/* Found the end mark for the records */
				goto loop;
#ifdef UNIV_LOG_LSN_DEBUG
			case MLOG_LSN:
				/* Do not add these records to the hash table.
				The page number and space id fields are misused
				for something else. */
				break;
#endif /* UNIV_LOG_LSN_DEBUG */
			case MLOG_INDEX_LOAD:
				recv_mlog_index_load(space, page_no, old_lsn);
				break;
			case MLOG_FILE_NAME:
			case MLOG_FILE_DELETE:
			case MLOG_FILE_CREATE2:
			case MLOG_FILE_RENAME2:
			case MLOG_TRUNCATE:
				/* These were already handled by
				recv_parse_log_rec() and
				recv_parse_or_apply_log_rec_body(). */
				break;
			default:
				/* A page-level record: store it with the
				end LSN of the whole mini-transaction. */
				switch (*store) {
				case STORE_NO:
					break;
				case STORE_IF_EXISTS:
					if (fil_space_get_flags(space)
					    == ULINT_UNDEFINED) {
						break;
					}
					/* fall through */
				case STORE_YES:
					recv_sys.add(
						type, space, page_no,
						body, ptr + len,
						old_lsn,
						new_recovered_lsn);
				}
			}

			ptr += len;
		}
	}

	goto loop;
}
3052
3053 /** Adds data from a new log block to the parsing buffer of recv_sys if
3054 recv_sys.parse_start_lsn is non-zero.
3055 @param[in] log_block log block to add
3056 @param[in] scanned_lsn lsn of how far we were able to find
3057 data in this log block
3058 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3059 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
3060 {
3061 ulint more_len;
3062 ulint data_len;
3063 ulint start_offset;
3064 ulint end_offset;
3065
3066 ut_ad(scanned_lsn >= recv_sys.scanned_lsn);
3067
3068 if (!recv_sys.parse_start_lsn) {
3069 /* Cannot start parsing yet because no start point for
3070 it found */
3071 return(false);
3072 }
3073
3074 data_len = log_block_get_data_len(log_block);
3075
3076 if (recv_sys.parse_start_lsn >= scanned_lsn) {
3077
3078 return(false);
3079
3080 } else if (recv_sys.scanned_lsn >= scanned_lsn) {
3081
3082 return(false);
3083
3084 } else if (recv_sys.parse_start_lsn > recv_sys.scanned_lsn) {
3085 more_len = (ulint) (scanned_lsn - recv_sys.parse_start_lsn);
3086 } else {
3087 more_len = (ulint) (scanned_lsn - recv_sys.scanned_lsn);
3088 }
3089
3090 if (more_len == 0) {
3091 return(false);
3092 }
3093
3094 ut_ad(data_len >= more_len);
3095
3096 start_offset = data_len - more_len;
3097
3098 if (start_offset < LOG_BLOCK_HDR_SIZE) {
3099 start_offset = LOG_BLOCK_HDR_SIZE;
3100 }
3101
3102 end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
3103
3104 ut_ad(start_offset <= end_offset);
3105
3106 if (start_offset < end_offset) {
3107 ut_memcpy(recv_sys.buf + recv_sys.len,
3108 log_block + start_offset, end_offset - start_offset);
3109
3110 recv_sys.len += end_offset - start_offset;
3111
3112 ut_a(recv_sys.len <= RECV_PARSING_BUF_SIZE);
3113 }
3114
3115 return(true);
3116 }
3117
3118 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()3119 void recv_sys_justify_left_parsing_buf()
3120 {
3121 memmove(recv_sys.buf, recv_sys.buf + recv_sys.recovered_offset,
3122 recv_sys.len - recv_sys.recovered_offset);
3123
3124 recv_sys.len -= recv_sys.recovered_offset;
3125
3126 recv_sys.recovered_offset = 0;
3127 }
3128
/** Scan redo log from a buffer and store new log data to the parsing buffer.
Parse and hash the log records if new data is found.
Apply log records automatically when the hash table becomes full.
The buffer is walked in steps of OS_FILE_LOG_BLOCK_SIZE; both start_lsn
and end_lsn must be multiples of that size (debug assertions below).
@param[in]	available_mem		we let the hash table of recs to
					grow to this size, at the maximum
@param[in,out]	store_to_hash		whether the records should be
					stored to the hash table; this is
					reset if just debug checking is
					needed, or when the available_mem
					runs out
@param[in]	log_block		log segment
@param[in]	checkpoint_lsn		latest checkpoint LSN
@param[in]	start_lsn		buffer start LSN
@param[in]	end_lsn			buffer end LSN
@param[in,out]	contiguous_lsn		it is known that all groups contain
					contiguous log data up to this lsn
@param[out]	group_scanned_lsn	scanning succeeded up to this lsn;
					NOTE: not assigned on the two early
					"return(true)" paths inside the loop
@return true if not able to scan any more in this log group */
static bool recv_scan_log_recs(
	ulint		available_mem,
	store_t*	store_to_hash,
	const byte*	log_block,
	lsn_t		checkpoint_lsn,
	lsn_t		start_lsn,
	lsn_t		end_lsn,
	lsn_t*		contiguous_lsn,
	lsn_t*		group_scanned_lsn)
{
	lsn_t		scanned_lsn	= start_lsn;
	bool		finished	= false;
	ulint		data_len;
	bool		more_data	= false;
	/* Nonzero mlog_checkpoint_lsn on entry; forwarded to
	recv_parse_log_recs() below. */
	bool		apply		= recv_sys.mlog_checkpoint_lsn != 0;
	/* Overflow threshold; may be lowered by the debug injection
	"reduce_recv_parsing_buf" below. */
	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
	/* Sampled on entry, because *store_to_hash may be changed later. */
	const bool	last_phase = (*store_to_hash == STORE_IF_EXISTS);
	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
	ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
	ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);

	/* One past the last byte of the segment to scan. */
	const byte* const	log_end = log_block
		+ ulint(end_lsn - start_lsn);
	do {
		ut_ad(!finished);

		if (log_block_get_flush_bit(log_block)) {
			/* This block was a start of a log flush operation:
			we know that the previous flush operation must have
			been completed for all log groups before this block
			can have been flushed to any of the groups. Therefore,
			we know that log data is contiguous up to scanned_lsn
			in all non-corrupt log groups. */

			if (scanned_lsn > *contiguous_lsn) {
				*contiguous_lsn = scanned_lsn;
			}
		}

		data_len = log_block_get_data_len(log_block);

		/* The block would extend the scanned range, but its
		checkpoint number is lower than the last scanned one by
		more than 0x80000000. */
		if (scanned_lsn + data_len > recv_sys.scanned_lsn
		    && log_block_get_checkpoint_no(log_block)
		    < recv_sys.scanned_checkpoint_no
		    && (recv_sys.scanned_checkpoint_no
			- log_block_get_checkpoint_no(log_block)
			> 0x80000000UL)) {

			/* Garbage from a log buffer flush which was made
			before the most recent database recovery */
			finished = true;
			break;
		}

		if (!recv_sys.parse_start_lsn
		    && (log_block_get_first_rec_group(log_block) > 0)) {

			/* We found a point from which to start the parsing
			of log records */

			recv_sys.parse_start_lsn = scanned_lsn
				+ log_block_get_first_rec_group(log_block);
			recv_sys.scanned_lsn = recv_sys.parse_start_lsn;
			recv_sys.recovered_lsn = recv_sys.parse_start_lsn;
		}

		scanned_lsn += data_len;

		/* Detect a block that contains nothing but a single
		MLOG_CHECKPOINT record referring to checkpoint_lsn. */
		if (data_len == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
		    && scanned_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
		    && log_block[LOG_BLOCK_HDR_SIZE] == MLOG_CHECKPOINT
		    && checkpoint_lsn == mach_read_from_8(LOG_BLOCK_HDR_SIZE
							  + 1 + log_block)) {
			/* The redo log is logically empty. */
			ut_ad(recv_sys.mlog_checkpoint_lsn == 0
			      || recv_sys.mlog_checkpoint_lsn
			      == checkpoint_lsn);
			recv_sys.mlog_checkpoint_lsn = checkpoint_lsn;
			DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
					      scanned_lsn));
			finished = true;
			break;
		}

		if (scanned_lsn > recv_sys.scanned_lsn) {
			ut_ad(!srv_log_files_created);
			if (!recv_needed_recovery) {
				recv_needed_recovery = true;

				if (srv_read_only_mode) {
					ib::warn() << "innodb_read_only"
						" prevents crash recovery";
					/* Early exit: recv_sys.mutex is not
					held here and *group_scanned_lsn
					is left untouched. */
					return(true);
				}

				ib::info() << "Starting crash recovery from"
					" checkpoint LSN="
					<< recv_sys.scanned_lsn;
			}

			/* We were able to find more log data: add it to the
			parsing buffer if parse_start_lsn is already
			non-zero */

			DBUG_EXECUTE_IF(
				"reduce_recv_parsing_buf",
				recv_parsing_buf_size
					= (70 * 1024);
				);

			/* Require room for 4 more log blocks in the
			parsing buffer before adding this one. */
			if (recv_sys.len + 4 * OS_FILE_LOG_BLOCK_SIZE
			    >= recv_parsing_buf_size) {
				ib::error() << "Log parsing buffer overflow."
					" Recovery may have failed!";

				recv_sys.found_corrupt_log = true;

				if (!srv_force_recovery) {
					ib::error()
						<< "Set innodb_force_recovery"
						" to ignore this error.";
					/* Early exit, same caveats as the
					read-only return above. */
					return(true);
				}
			} else if (!recv_sys.found_corrupt_log) {
				more_data = recv_sys_add_to_parsing_buf(
					log_block, scanned_lsn);
			}

			recv_sys.scanned_lsn = scanned_lsn;
			recv_sys.scanned_checkpoint_no
				= log_block_get_checkpoint_no(log_block);
		}

		/* During last phase of scanning, there can be redo logs
		left in recv_sys.buf to parse & store it in recv_sys.heap */
		if (last_phase
		    && recv_sys.recovered_lsn < recv_sys.scanned_lsn) {
			more_data = true;
		}

		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
			/* Log data for this group ends here */
			finished = true;
			break;
		} else {
			log_block += OS_FILE_LOG_BLOCK_SIZE;
		}
	} while (log_block < log_end);

	*group_scanned_lsn = scanned_lsn;

	/* Parsing and buffer compaction are done under recv_sys.mutex. */
	mutex_enter(&recv_sys.mutex);

	if (more_data && !recv_sys.found_corrupt_log) {
		/* Try to parse more log records */

		if (recv_parse_log_recs(checkpoint_lsn,
					store_to_hash, available_mem,
					apply)) {
			ut_ad(recv_sys.found_corrupt_log
			      || recv_sys.found_corrupt_fs
			      || recv_sys.mlog_checkpoint_lsn
			      == recv_sys.recovered_lsn);
			finished = true;
			goto func_exit;
		}

		recv_sys_heap_check(store_to_hash, available_mem);

		/* Compact only once more than a quarter of the buffer
		has been consumed. */
		if (recv_sys.recovered_offset > recv_parsing_buf_size / 4) {
			/* Move parsing buffer data to the buffer start */
			recv_sys_justify_left_parsing_buf();
		}

		/* Need to re-parse the redo log records that are stored
		in recv_sys.buf: *store_to_hash was switched to STORE_NO
		during the last phase, so report "not finished". */
		if (last_phase && *store_to_hash == STORE_NO) {
			finished = false;
		}
	}

func_exit:
	mutex_exit(&recv_sys.mutex);
	return(finished);
}
3332
/** Scans log from a buffer and stores new log data to the parsing buffer.
Parses and hashes the log records if new data found.
The parsing state in recv_sys is reset on entry, and the log is then read
in segments of at most RECV_SCAN_SIZE bytes into log_sys.buf.
@param[in]	checkpoint_lsn		latest checkpoint log sequence number
@param[in,out]	contiguous_lsn		log sequence number
until which all redo log has been scanned; aligned down to
OS_FILE_LOG_BLOCK_SIZE on entry
@param[in]	last_phase		whether changes
can be applied to the tablespaces
@return whether rescan is needed (not everything was stored);
always false if log or file system corruption was flagged */
static
bool
recv_group_scan_log_recs(
	lsn_t		checkpoint_lsn,
	lsn_t*		contiguous_lsn,
	bool		last_phase)
{
	DBUG_ENTER("recv_group_scan_log_recs");
	DBUG_ASSERT(!last_phase || recv_sys.mlog_checkpoint_lsn > 0);

	/* Reset the parsing buffer, the hash table and the parser
	state, so that the scan starts afresh from *contiguous_lsn. */
	mutex_enter(&recv_sys.mutex);
	recv_sys.len = 0;
	recv_sys.recovered_offset = 0;
	recv_sys.n_addrs = 0;
	recv_sys.empty();
	srv_start_lsn = *contiguous_lsn;
	recv_sys.parse_start_lsn = *contiguous_lsn;
	recv_sys.scanned_lsn = *contiguous_lsn;
	recv_sys.recovered_lsn = *contiguous_lsn;
	recv_sys.scanned_checkpoint_no = 0;
	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
	recv_previous_parsed_rec_offset = 0;
	recv_previous_parsed_rec_is_multi = 0;
	ut_ad(recv_max_page_lsn == 0);
	ut_ad(last_phase || !recv_writer_thread_active);
	mutex_exit(&recv_sys.mutex);

	lsn_t	start_lsn;
	lsn_t	end_lsn;
	/* Nothing is stored until MLOG_CHECKPOINT has been found
	(mlog_checkpoint_lsn != 0); in the last phase, records are
	stored with STORE_IF_EXISTS. */
	store_t	store_to_hash	= recv_sys.mlog_checkpoint_lsn == 0
		? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
	/* Two thirds of the buffer pool pages, expressed in bytes. */
	ulint	available_mem = (buf_pool_get_n_pages() * 2 / 3)
		<< srv_page_size_shift;

	log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
		ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);

	/* Keep reading segments until read_log_seg() makes no progress
	(end_lsn == start_lsn) or recv_scan_log_recs() returns true. */
	do {
		if (last_phase && store_to_hash == STORE_NO) {
			/* The previous iteration stopped storing records
			(store_to_hash was reset to STORE_NO): apply what
			was stored and resume storing. */
			store_to_hash = STORE_IF_EXISTS;
			/* We must not allow change buffer
			merge here, because it would generate
			redo log records before we have
			finished the redo log scan. */
			recv_apply_hashed_log_recs(false);
			/* Rescan the redo logs from last stored lsn */
			end_lsn = recv_sys.recovered_lsn;
		}

		start_lsn = ut_uint64_align_down(end_lsn,
						 OS_FILE_LOG_BLOCK_SIZE);
		end_lsn = start_lsn;
		log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
	} while (end_lsn != start_lsn
		 && !recv_scan_log_recs(
			 available_mem, &store_to_hash, log_sys.buf,
			 checkpoint_lsn,
			 start_lsn, end_lsn,
			 contiguous_lsn, &log_sys.log.scanned_lsn));

	if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
		DBUG_RETURN(false);
	}

	DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
			      last_phase ? "rescan" : "scan",
			      log_sys.log.scanned_lsn));

	/* STORE_NO at this point means that some records were skipped. */
	DBUG_RETURN(store_to_hash == STORE_NO);
}
3411
3412 /** Report a missing tablespace for which page-redo log exists.
3413 @param[in] err previous error code
3414 @param[in] i tablespace descriptor
3415 @return new error code */
3416 static
3417 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3418 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3419 {
3420 if (is_mariabackup_restore_or_export()) {
3421 if (i->second.name.find(TEMP_TABLE_PATH_PREFIX)
3422 != std::string::npos) {
3423 ib::warn() << "Tablespace " << i->first << " was not"
3424 " found at " << i->second.name << " when"
3425 " restoring a (partial?) backup. All redo log"
3426 " for this file will be ignored!";
3427 }
3428 return(err);
3429 }
3430
3431 if (srv_force_recovery == 0) {
3432 ib::error() << "Tablespace " << i->first << " was not"
3433 " found at " << i->second.name << ".";
3434
3435 if (err == DB_SUCCESS) {
3436 ib::error() << "Set innodb_force_recovery=1 to"
3437 " ignore this and to permanently lose"
3438 " all changes to the tablespace.";
3439 err = DB_TABLESPACE_NOT_FOUND;
3440 }
3441 } else {
3442 ib::warn() << "Tablespace " << i->first << " was not"
3443 " found at " << i->second.name << ", and"
3444 " innodb_force_recovery was set. All redo log"
3445 " for this tablespace will be ignored!";
3446 }
3447
3448 return(err);
3449 }
3450
3451 /** Report the missing tablespace and discard the redo logs for the deleted
3452 tablespace.
3453 @param[in] rescan rescan of redo logs is needed
3454 if hash table ran out of memory
3455 @param[out] missing_tablespace missing tablespace exists or not
3456 @return error code or DB_SUCCESS. */
3457 static MY_ATTRIBUTE((warn_unused_result))
3458 dberr_t
recv_validate_tablespace(bool rescan,bool & missing_tablespace)3459 recv_validate_tablespace(bool rescan, bool& missing_tablespace)
3460 {
3461 dberr_t err = DB_SUCCESS;
3462
3463 for (ulint h = 0; h < hash_get_n_cells(recv_sys.addr_hash); h++) {
3464 for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
3465 HASH_GET_FIRST(recv_sys.addr_hash, h));
3466 recv_addr != 0;
3467 recv_addr = static_cast<recv_addr_t*>(
3468 HASH_GET_NEXT(addr_hash, recv_addr))) {
3469
3470 const ulint space = recv_addr->space;
3471
3472 if (is_predefined_tablespace(space)) {
3473 continue;
3474 }
3475
3476 recv_spaces_t::iterator i = recv_spaces.find(space);
3477 ut_ad(i != recv_spaces.end());
3478
3479 switch (i->second.status) {
3480 case file_name_t::MISSING:
3481 err = recv_init_missing_space(err, i);
3482 i->second.status = file_name_t::DELETED;
3483 /* fall through */
3484 case file_name_t::DELETED:
3485 recv_addr->state = RECV_DISCARDED;
3486 /* fall through */
3487 case file_name_t::NORMAL:
3488 continue;
3489 }
3490 ut_ad(0);
3491 }
3492 }
3493
3494 if (err != DB_SUCCESS) {
3495 return(err);
3496 }
3497
3498 /* When rescan is not needed, recv_sys.addr_hash will contain the
3499 entire redo log. If rescan is needed or innodb_force_recovery
3500 is set, we can ignore missing tablespaces. */
3501 for (const recv_spaces_t::value_type& rs : recv_spaces) {
3502 if (UNIV_LIKELY(rs.second.status != file_name_t::MISSING)) {
3503 continue;
3504 }
3505
3506 missing_tablespace = true;
3507
3508 if (srv_force_recovery > 0) {
3509 ib::warn() << "Tablespace " << rs.first
3510 <<" was not found at " << rs.second.name
3511 <<", and innodb_force_recovery was set."
3512 <<" All redo log for this tablespace"
3513 <<" will be ignored!";
3514 continue;
3515 }
3516
3517 if (!rescan) {
3518 ib::info() << "Tablespace " << rs.first
3519 << " was not found at '"
3520 << rs.second.name << "', but there"
3521 <<" were no modifications either.";
3522 }
3523 }
3524
3525 if (!rescan || srv_force_recovery > 0) {
3526 missing_tablespace = false;
3527 }
3528
3529 return DB_SUCCESS;
3530 }
3531
3532 /** Check if all tablespaces were found for crash recovery.
3533 @param[in] rescan rescan of redo logs is needed
3534 @param[out] missing_tablespace missing table exists
3535 @return error code or DB_SUCCESS */
3536 static MY_ATTRIBUTE((warn_unused_result))
3537 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3538 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3539 {
3540 bool flag_deleted = false;
3541
3542 ut_ad(!srv_read_only_mode);
3543 ut_ad(recv_needed_recovery);
3544
3545 for (recv_spaces_t::value_type& rs : recv_spaces) {
3546 ut_ad(!is_predefined_tablespace(rs.first));
3547 ut_ad(rs.second.status != file_name_t::DELETED
3548 || !rs.second.space);
3549
3550 if (rs.second.status == file_name_t::DELETED) {
3551 /* The tablespace was deleted,
3552 so we can ignore any redo log for it. */
3553 flag_deleted = true;
3554 } else if (rs.second.space != NULL) {
3555 /* The tablespace was found, and there
3556 are some redo log records for it. */
3557 fil_names_dirty(rs.second.space);
3558 rs.second.space->enable_lsn = rs.second.enable_lsn;
3559 } else if (rs.second.name == "") {
3560 ib::error() << "Missing MLOG_FILE_NAME"
3561 " or MLOG_FILE_DELETE"
3562 " before MLOG_CHECKPOINT for tablespace "
3563 << rs.first;
3564 recv_sys.found_corrupt_log = true;
3565 return(DB_CORRUPTION);
3566 } else {
3567 rs.second.status = file_name_t::MISSING;
3568 flag_deleted = true;
3569 }
3570
3571 ut_ad(rs.second.status == file_name_t::DELETED
3572 || rs.second.name != "");
3573 }
3574
3575 if (flag_deleted) {
3576 return recv_validate_tablespace(rescan, missing_tablespace);
3577 }
3578
3579 return DB_SUCCESS;
3580 }
3581
/** Start recovering from a redo log checkpoint.
The log is scanned once to locate MLOG_CHECKPOINT, then again to store
the records; further scans follow if tablespaces are missing or if not
everything could be stored. On success, log_sys is positioned at the
recovered LSN.
@see recv_recovery_from_checkpoint_finish
@param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
{
	ulint		max_cp_field;
	lsn_t		checkpoint_lsn;
	bool		rescan;
	ib_uint64_t	checkpoint_no;
	lsn_t		contiguous_lsn;
	byte*		buf;
	dberr_t		err = DB_SUCCESS;

	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || is_mariabackup_restore_or_export());

	/* Initialize red-black tree for fast insertions into the
	flush_list during recovery process. */
	buf_flush_init_flush_rbt();

	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {

		ib::info() << "innodb_force_recovery=6 skips redo log apply";

		return(DB_SUCCESS);
	}

	recv_recovery_on = true;

	/* The log mutex is held from here until every return below;
	each exit path releases it explicitly. */
	log_mutex_enter();

	err = recv_find_max_checkpoint(&max_cp_field);

	if (err != DB_SUCCESS) {

		srv_start_lsn = recv_sys.recovered_lsn = log_sys.lsn;
		log_mutex_exit();
		return(err);
	}

	log_header_read(max_cp_field);

	buf = log_sys.checkpoint_buf;

	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);

	/* Start reading the log from the checkpoint lsn. The variable
	contiguous_lsn contains an lsn up to which the log is known to
	be contiguously written. */
	recv_sys.mlog_checkpoint_lsn = 0;

	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);

	const lsn_t	end_lsn = mach_read_from_8(
		buf + LOG_CHECKPOINT_END_LSN);

	ut_ad(recv_sys.n_addrs == 0);
	contiguous_lsn = checkpoint_lsn;
	switch (log_sys.log.format) {
	case 0:
		log_mutex_exit();
		/* NOTE(review): buf[20 + 32 * 9] is a hard-coded offset
		into the checkpoint buffer; its meaning is not visible
		in this function. */
		return recv_log_format_0_recover(checkpoint_lsn,
						 buf[20 + 32 * 9] == 2);
	default:
		/* end_lsn == 0: start the scan at checkpoint_lsn.
		end_lsn >= checkpoint_lsn: start the scan at end_lsn.
		Anything else is treated as corruption. */
		if (end_lsn == 0) {
			break;
		}
		if (end_lsn >= checkpoint_lsn) {
			contiguous_lsn = end_lsn;
			break;
		}
		recv_sys.found_corrupt_log = true;
		log_mutex_exit();
		return(DB_ERROR);
	}

	/* Look for MLOG_CHECKPOINT. */
	recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
	/* The first scan should not have stored or applied any records. */
	ut_ad(recv_sys.n_addrs == 0);
	ut_ad(!recv_sys.found_corrupt_fs);

	if (srv_read_only_mode && recv_needed_recovery) {
		log_mutex_exit();
		return(DB_READ_ONLY);
	}

	if (recv_sys.found_corrupt_log && !srv_force_recovery) {
		log_mutex_exit();
		ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
		return(DB_ERROR);
	}

	if (recv_sys.mlog_checkpoint_lsn == 0) {
		/* No MLOG_CHECKPOINT was found by the first scan. */
		lsn_t scan_lsn = log_sys.log.scanned_lsn;
		if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
			log_mutex_exit();
			/* Note: this local logger object shadows the
			dberr_t variable "err" of the function scope. */
			ib::error err;
			err << "Missing MLOG_CHECKPOINT";
			if (end_lsn) {
				err << " at " << end_lsn;
			}
			err << " between the checkpoint " << checkpoint_lsn
			    << " and the end " << scan_lsn << ".";
			return(DB_ERROR);
		}

		log_sys.log.scanned_lsn = checkpoint_lsn;
		rescan = false;
	} else {
		/* MLOG_CHECKPOINT was found: scan again from the
		checkpoint, this time storing the records. */
		contiguous_lsn = checkpoint_lsn;
		rescan = recv_group_scan_log_recs(
			checkpoint_lsn, &contiguous_lsn, false);

		if ((recv_sys.found_corrupt_log && !srv_force_recovery)
		    || recv_sys.found_corrupt_fs) {
			log_mutex_exit();
			return(DB_ERROR);
		}
	}

	/* NOTE: we always do a 'recovery' at startup, but only if
	there is something wrong we will print a message to the
	user about recovery: */

	if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
	    && recv_sys.mlog_checkpoint_lsn == checkpoint_lsn) {
		/* The redo log is logically empty. */
	} else if (checkpoint_lsn != flush_lsn) {
		ut_ad(!srv_log_files_created);

		if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
			ib::warn() << "Are you sure you are using the"
				" right ib_logfiles to start up the database?"
				" Log sequence number in the ib_logfiles is "
				<< checkpoint_lsn << ", less than the"
				" log sequence number in the first system"
				" tablespace file header, " << flush_lsn << ".";
		}

		if (!recv_needed_recovery) {

			ib::info() << "The log sequence number " << flush_lsn
				<< " in the system tablespace does not match"
				" the log sequence number " << checkpoint_lsn
				<< " in the ib_logfiles!";

			if (srv_read_only_mode) {
				ib::error() << "innodb_read_only"
					" prevents crash recovery";
				log_mutex_exit();
				return(DB_READ_ONLY);
			}

			recv_needed_recovery = true;
		}
	}

	log_sys.lsn = recv_sys.recovered_lsn;

	if (recv_needed_recovery) {
		bool missing_tablespace = false;

		err = recv_init_crash_recovery_spaces(
			rescan, missing_tablespace);

		if (err != DB_SUCCESS) {
			log_mutex_exit();
			return(err);
		}

		/* If there is any missing tablespace and rescan is needed
		then there is a possibility that hash table will not contain
		all space ids redo logs. Rescan the remaining unstored
		redo logs for the validation of missing tablespace. */
		ut_ad(rescan || !missing_tablespace);

		while (missing_tablespace) {
			DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
					      "the missing tablespace. Scan "
					      "from last stored LSN " LSN_PF,
					      recv_sys.last_stored_lsn));

			lsn_t recent_stored_lsn = recv_sys.last_stored_lsn;
			rescan = recv_group_scan_log_recs(
				checkpoint_lsn, &recent_stored_lsn, false);

			ut_ad(!recv_sys.found_corrupt_fs);

			missing_tablespace = false;

			err = recv_sys.found_corrupt_log
				? DB_ERROR
				: recv_validate_tablespace(
					rescan, missing_tablespace);

			if (err != DB_SUCCESS) {
				log_mutex_exit();
				return err;
			}

			/* After a partial scan, the final pass below
			is always executed. */
			rescan = true;
		}

		recv_sys.parse_start_lsn = checkpoint_lsn;

		if (srv_operation == SRV_OPERATION_NORMAL) {
			buf_dblwr_process();
		}

		ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);

		/* Spawn the background thread to flush dirty pages
		from the buffer pools. */
		recv_writer_thread_active = true;
		os_thread_create(recv_writer_thread, 0, 0);

		if (rescan) {
			/* Final pass (last_phase=true), starting again
			from the checkpoint. */
			contiguous_lsn = checkpoint_lsn;

			recv_group_scan_log_recs(
				checkpoint_lsn, &contiguous_lsn, true);

			if ((recv_sys.found_corrupt_log
			     && !srv_force_recovery)
			    || recv_sys.found_corrupt_fs) {
				log_mutex_exit();
				return(DB_ERROR);
			}
		}
	} else {
		ut_ad(!rescan || recv_sys.n_addrs == 0);
	}

	/* This condition is only reported; it does not fail startup. */
	if (log_sys.log.scanned_lsn < checkpoint_lsn
	    || log_sys.log.scanned_lsn < recv_max_page_lsn) {

		ib::error() << "We scanned the log up to "
			<< log_sys.log.scanned_lsn
			<< ". A checkpoint was at " << checkpoint_lsn << " and"
			" the maximum LSN on a database page was "
			<< recv_max_page_lsn << ". It is possible that the"
			" database is now corrupt!";
	}

	if (recv_sys.recovered_lsn < checkpoint_lsn) {
		log_mutex_exit();

		ib::error() << "Recovered only to lsn:"
			    << recv_sys.recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;

		return(DB_ERROR);
	}

	log_sys.next_checkpoint_lsn = checkpoint_lsn;
	/* Assigned again further below via ++checkpoint_no,
	which yields the same value. */
	log_sys.next_checkpoint_no = checkpoint_no + 1;

	recv_synchronize_groups();

	if (!recv_needed_recovery) {
		ut_a(checkpoint_lsn == recv_sys.recovered_lsn);
	} else {
		srv_start_lsn = recv_sys.recovered_lsn;
	}

	/* Position the log buffer at the offset of log_sys.lsn
	within its log block. */
	log_sys.buf_free = ulong(log_sys.lsn % OS_FILE_LOG_BLOCK_SIZE);
	log_sys.buf_next_to_write = log_sys.buf_free;
	log_sys.write_lsn = log_sys.lsn;

	log_sys.last_checkpoint_lsn = checkpoint_lsn;

	if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) {
		/* Write a MLOG_CHECKPOINT marker as the first thing,
		before generating any other redo log. This ensures
		that subsequent crash recovery will be possible even
		if the server were killed soon after this. */
		fil_names_clear(log_sys.last_checkpoint_lsn, true);
	}

	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
		    log_sys.lsn - log_sys.last_checkpoint_lsn);

	log_sys.next_checkpoint_no = ++checkpoint_no;

	mutex_enter(&recv_sys.mutex);

	recv_sys.apply_log_recs = true;
	recv_no_ibuf_operations = is_mariabackup_restore_or_export();
	ut_d(recv_no_log_write = recv_no_ibuf_operations);

	mutex_exit(&recv_sys.mutex);

	log_mutex_exit();

	recv_lsn_checks_on = true;

	/* The database is now ready to start almost normal processing of user
	transactions: transaction rollbacks and the application of the log
	records in the hash table can be run in background. */

	return(DB_SUCCESS);
}
3888
3889 /** Complete recovery from a checkpoint. */
3890 void
recv_recovery_from_checkpoint_finish(void)3891 recv_recovery_from_checkpoint_finish(void)
3892 {
3893 /* Make sure that the recv_writer thread is done. This is
3894 required because it grabs various mutexes and we want to
3895 ensure that when we enable sync_order_checks there is no
3896 mutex currently held by any thread. */
3897 mutex_enter(&recv_sys.writer_mutex);
3898
3899 /* Free the resources of the recovery system */
3900 recv_recovery_on = false;
3901
3902 /* By acquring the mutex we ensure that the recv_writer thread
3903 won't trigger any more LRU batches. Now wait for currently
3904 in progress batches to finish. */
3905 buf_flush_wait_LRU_batch_end();
3906
3907 mutex_exit(&recv_sys.writer_mutex);
3908
3909 ulint count = 0;
3910 while (recv_writer_thread_active) {
3911 ++count;
3912 os_thread_sleep(100000);
3913 if (srv_print_verbose_log && count > 600) {
3914 ib::info() << "Waiting for recv_writer to"
3915 " finish flushing of buffer pool";
3916 count = 0;
3917 }
3918 }
3919
3920 recv_sys.debug_free();
3921
3922 /* Free up the flush_rbt. */
3923 buf_flush_free_flush_rbt();
3924 }
3925
3926 /********************************************************//**
3927 Initiates the rollback of active transactions. */
3928 void
recv_recovery_rollback_active(void)3929 recv_recovery_rollback_active(void)
3930 /*===============================*/
3931 {
3932 ut_ad(!recv_writer_thread_active);
3933
3934 /* Switch latching order checks on in sync0debug.cc, if
3935 --innodb-sync-debug=true (default) */
3936 ut_d(sync_check_enable());
3937
3938 /* We can't start any (DDL) transactions if UNDO logging
3939 has been disabled, additionally disable ROLLBACK of recovered
3940 user transactions. */
3941 if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
3942 && !srv_read_only_mode) {
3943
3944 /* Drop partially created indexes. */
3945 row_merge_drop_temp_indexes();
3946 /* Drop garbage tables. */
3947 row_mysql_drop_garbage_tables();
3948
3949 /* Drop any auxiliary tables that were not dropped when the
3950 parent table was dropped. This can happen if the parent table
3951 was dropped but the server crashed before the auxiliary tables
3952 were dropped. */
3953 fts_drop_orphaned_tables();
3954
3955 /* Rollback the uncommitted transactions which have no user
3956 session */
3957
3958 trx_rollback_is_active = true;
3959 os_thread_create(trx_rollback_all_recovered, 0, 0);
3960 }
3961 }
3962
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)3963 bool recv_dblwr_t::validate_page(const page_id_t page_id,
3964 const byte *page,
3965 const fil_space_t *space,
3966 byte *tmp_buf)
3967 {
3968 if (page_id.page_no() == 0)
3969 {
3970 ulint flags= fsp_header_get_flags(page);
3971 if (!fil_space_t::is_valid_flags(flags, page_id.space()))
3972 {
3973 ulint cflags= fsp_flags_convert_from_101(flags);
3974 if (cflags == ULINT_UNDEFINED)
3975 {
3976 ib::warn() << "Ignoring a doublewrite copy of page " << page_id
3977 << "due to invalid flags " << ib::hex(flags);
3978 return false;
3979 }
3980
3981 flags= cflags;
3982 }
3983
3984 /* Page 0 is never page_compressed or encrypted. */
3985 return !buf_page_is_corrupted(true, page, flags);
3986 }
3987
3988 ut_ad(tmp_buf);
3989 byte *tmp_frame= tmp_buf;
3990 byte *tmp_page= tmp_buf + srv_page_size;
3991 const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
3992 const bool expect_encrypted= space->crypt_data &&
3993 space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
3994
3995 if (space->full_crc32())
3996 return !buf_page_is_corrupted(true, page, space->flags);
3997
3998 if (expect_encrypted &&
3999 mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
4000 {
4001 if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
4002 return false;
4003 if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
4004 return true;
4005 if (space->zip_size())
4006 return false;
4007 memcpy(tmp_page, page, space->physical_size());
4008 if (!fil_space_decrypt(space, tmp_frame, tmp_page))
4009 return false;
4010 }
4011
4012 switch (page_type) {
4013 case FIL_PAGE_PAGE_COMPRESSED:
4014 memcpy(tmp_page, page, space->physical_size());
4015 /* fall through */
4016 case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
4017 if (space->zip_size())
4018 return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
4019 ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
4020 if (!decomp)
4021 return false; /* decompression failed */
4022 if (decomp == srv_page_size)
4023 return false; /* the page was not compressed (invalid page type) */
4024 return !buf_page_is_corrupted(true, tmp_page, space->flags);
4025 }
4026
4027 return !buf_page_is_corrupted(true, page, space->flags);
4028 }
4029
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)4030 byte *recv_dblwr_t::find_page(const page_id_t page_id,
4031 const fil_space_t *space, byte *tmp_buf)
4032 {
4033 byte *result= NULL;
4034 lsn_t max_lsn= 0;
4035
4036 for (byte *page : pages)
4037 {
4038 if (page_get_page_no(page) != page_id.page_no() ||
4039 page_get_space_id(page) != page_id.space())
4040 continue;
4041 const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
4042 if (lsn <= max_lsn ||
4043 !validate_page(page_id, page, space, tmp_buf))
4044 {
4045 /* Mark processed for subsequent iterations in buf_dblwr_process() */
4046 memset(page + FIL_PAGE_LSN, 0, 8);
4047 continue;
4048 }
4049 max_lsn= lsn;
4050 result= page;
4051 }
4052
4053 return result;
4054 }
4055
4056 #ifndef DBUG_OFF
4057 /** Return string name of the redo log record type.
4058 @param[in] type record log record enum
4059 @return string name of record log record */
get_mlog_string(mlog_id_t type)4060 static const char* get_mlog_string(mlog_id_t type)
4061 {
4062 switch (type) {
4063 case MLOG_SINGLE_REC_FLAG:
4064 return("MLOG_SINGLE_REC_FLAG");
4065
4066 case MLOG_1BYTE:
4067 return("MLOG_1BYTE");
4068
4069 case MLOG_2BYTES:
4070 return("MLOG_2BYTES");
4071
4072 case MLOG_4BYTES:
4073 return("MLOG_4BYTES");
4074
4075 case MLOG_8BYTES:
4076 return("MLOG_8BYTES");
4077
4078 case MLOG_REC_INSERT:
4079 return("MLOG_REC_INSERT");
4080
4081 case MLOG_REC_CLUST_DELETE_MARK:
4082 return("MLOG_REC_CLUST_DELETE_MARK");
4083
4084 case MLOG_REC_SEC_DELETE_MARK:
4085 return("MLOG_REC_SEC_DELETE_MARK");
4086
4087 case MLOG_REC_UPDATE_IN_PLACE:
4088 return("MLOG_REC_UPDATE_IN_PLACE");
4089
4090 case MLOG_REC_DELETE:
4091 return("MLOG_REC_DELETE");
4092
4093 case MLOG_LIST_END_DELETE:
4094 return("MLOG_LIST_END_DELETE");
4095
4096 case MLOG_LIST_START_DELETE:
4097 return("MLOG_LIST_START_DELETE");
4098
4099 case MLOG_LIST_END_COPY_CREATED:
4100 return("MLOG_LIST_END_COPY_CREATED");
4101
4102 case MLOG_PAGE_REORGANIZE:
4103 return("MLOG_PAGE_REORGANIZE");
4104
4105 case MLOG_PAGE_CREATE:
4106 return("MLOG_PAGE_CREATE");
4107
4108 case MLOG_UNDO_INSERT:
4109 return("MLOG_UNDO_INSERT");
4110
4111 case MLOG_UNDO_ERASE_END:
4112 return("MLOG_UNDO_ERASE_END");
4113
4114 case MLOG_UNDO_INIT:
4115 return("MLOG_UNDO_INIT");
4116
4117 case MLOG_UNDO_HDR_REUSE:
4118 return("MLOG_UNDO_HDR_REUSE");
4119
4120 case MLOG_UNDO_HDR_CREATE:
4121 return("MLOG_UNDO_HDR_CREATE");
4122
4123 case MLOG_REC_MIN_MARK:
4124 return("MLOG_REC_MIN_MARK");
4125
4126 case MLOG_IBUF_BITMAP_INIT:
4127 return("MLOG_IBUF_BITMAP_INIT");
4128
4129 #ifdef UNIV_LOG_LSN_DEBUG
4130 case MLOG_LSN:
4131 return("MLOG_LSN");
4132 #endif /* UNIV_LOG_LSN_DEBUG */
4133
4134 case MLOG_WRITE_STRING:
4135 return("MLOG_WRITE_STRING");
4136
4137 case MLOG_MULTI_REC_END:
4138 return("MLOG_MULTI_REC_END");
4139
4140 case MLOG_DUMMY_RECORD:
4141 return("MLOG_DUMMY_RECORD");
4142
4143 case MLOG_FILE_DELETE:
4144 return("MLOG_FILE_DELETE");
4145
4146 case MLOG_COMP_REC_MIN_MARK:
4147 return("MLOG_COMP_REC_MIN_MARK");
4148
4149 case MLOG_COMP_PAGE_CREATE:
4150 return("MLOG_COMP_PAGE_CREATE");
4151
4152 case MLOG_COMP_REC_INSERT:
4153 return("MLOG_COMP_REC_INSERT");
4154
4155 case MLOG_COMP_REC_CLUST_DELETE_MARK:
4156 return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4157
4158 case MLOG_COMP_REC_UPDATE_IN_PLACE:
4159 return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4160
4161 case MLOG_COMP_REC_DELETE:
4162 return("MLOG_COMP_REC_DELETE");
4163
4164 case MLOG_COMP_LIST_END_DELETE:
4165 return("MLOG_COMP_LIST_END_DELETE");
4166
4167 case MLOG_COMP_LIST_START_DELETE:
4168 return("MLOG_COMP_LIST_START_DELETE");
4169
4170 case MLOG_COMP_LIST_END_COPY_CREATED:
4171 return("MLOG_COMP_LIST_END_COPY_CREATED");
4172
4173 case MLOG_COMP_PAGE_REORGANIZE:
4174 return("MLOG_COMP_PAGE_REORGANIZE");
4175
4176 case MLOG_FILE_CREATE2:
4177 return("MLOG_FILE_CREATE2");
4178
4179 case MLOG_ZIP_WRITE_NODE_PTR:
4180 return("MLOG_ZIP_WRITE_NODE_PTR");
4181
4182 case MLOG_ZIP_WRITE_BLOB_PTR:
4183 return("MLOG_ZIP_WRITE_BLOB_PTR");
4184
4185 case MLOG_ZIP_WRITE_HEADER:
4186 return("MLOG_ZIP_WRITE_HEADER");
4187
4188 case MLOG_ZIP_PAGE_COMPRESS:
4189 return("MLOG_ZIP_PAGE_COMPRESS");
4190
4191 case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4192 return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4193
4194 case MLOG_ZIP_PAGE_REORGANIZE:
4195 return("MLOG_ZIP_PAGE_REORGANIZE");
4196
4197 case MLOG_ZIP_WRITE_TRX_ID:
4198 return("MLOG_ZIP_WRITE_TRX_ID");
4199
4200 case MLOG_FILE_RENAME2:
4201 return("MLOG_FILE_RENAME2");
4202
4203 case MLOG_FILE_NAME:
4204 return("MLOG_FILE_NAME");
4205
4206 case MLOG_CHECKPOINT:
4207 return("MLOG_CHECKPOINT");
4208
4209 case MLOG_PAGE_CREATE_RTREE:
4210 return("MLOG_PAGE_CREATE_RTREE");
4211
4212 case MLOG_COMP_PAGE_CREATE_RTREE:
4213 return("MLOG_COMP_PAGE_CREATE_RTREE");
4214
4215 case MLOG_INIT_FILE_PAGE2:
4216 return("MLOG_INIT_FILE_PAGE2");
4217
4218 case MLOG_INDEX_LOAD:
4219 return("MLOG_INDEX_LOAD");
4220
4221 case MLOG_TRUNCATE:
4222 return("MLOG_TRUNCATE");
4223
4224 case MLOG_MEMSET:
4225 return("MLOG_MEMSET");
4226
4227 case MLOG_INIT_FREE_PAGE:
4228 return("MLOG_INIT_FREE_PAGE");
4229
4230 case MLOG_FILE_WRITE_CRYPT_DATA:
4231 return("MLOG_FILE_WRITE_CRYPT_DATA");
4232 }
4233 DBUG_ASSERT(0);
4234 return(NULL);
4235 }
4236 #endif /* !DBUG_OFF */
4237