1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2020, Oracle and/or its affiliates.
4 Copyright (c) 2012, Facebook Inc.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License, version 2.0, as published by the
8 Free Software Foundation.
9 
10 This program is also distributed with certain software (including but not
11 limited to OpenSSL) that is licensed under separate terms, as designated in a
12 particular file or component or in included license documentation. The authors
13 of MySQL hereby grant you an additional permission to link the program and
14 your derivative works with the separately licensed software that they have
15 included with MySQL.
16 
17 This program is distributed in the hope that it will be useful, but WITHOUT
18 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20 for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
25 
26 *****************************************************************************/
27 
28 /** @file log/log0recv.cc
29  Recovery
30 
31  Created 9/20/1997 Heikki Tuuri
32  *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include <my_aes.h>
37 #include <sys/types.h>
38 
39 #include <array>
40 #include <iomanip>
41 #include <map>
42 #include <new>
43 #include <string>
44 #include <vector>
45 
46 #include "arch0arch.h"
47 #include "log0recv.h"
48 
49 #include "btr0btr.h"
50 #include "btr0cur.h"
51 #include "buf0buf.h"
52 #include "buf0flu.h"
53 #include "clone0api.h"
54 #include "dict0dd.h"
55 #include "fil0fil.h"
56 #include "ha_prototypes.h"
57 #include "ibuf0ibuf.h"
58 #include "log0log.h"
59 #include "mem0mem.h"
60 #include "mtr0log.h"
61 #include "mtr0mtr.h"
62 #include "os0thread-create.h"
63 #include "page0cur.h"
64 #include "page0zip.h"
65 #include "trx0rec.h"
66 #include "trx0undo.h"
67 #include "ut0new.h"
68 #include "xb0xb.h"
69 
70 #include "my_dbug.h"
71 
72 #ifndef UNIV_HOTBACKUP
73 #include "buf0rea.h"
74 #include "row0merge.h"
75 #include "srv0srv.h"
76 #include "srv0start.h"
77 #include "trx0purge.h"
78 #else /* !UNIV_HOTBACKUP */
79 #include "../meb/mutex.h"
80 #endif /* !UNIV_HOTBACKUP */
81 
82 std::list<space_id_t> recv_encr_ts_list;
83 
84 /** Log records are stored in the hash table in chunks at most of this size;
85 this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
86 #define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))
87 
88 /** Read-ahead area in applying log records to file pages */
89 static const size_t RECV_READ_AHEAD_AREA = 32;
90 
91 /** The recovery system */
92 recv_sys_t *recv_sys = nullptr;
93 
94 /** true when applying redo log records during crash recovery; false
95 otherwise.  Note that this is false while a background thread is
96 rolling back incomplete transactions. */
97 volatile bool recv_recovery_on;
98 volatile lsn_t backup_redo_log_flushed_lsn;
99 
100 #ifdef UNIV_HOTBACKUP
101 std::list<std::pair<space_id_t, lsn_t>> index_load_list;
102 
103 extern bool meb_is_space_loaded(const space_id_t space_id);
104 
105 /* Re-define mutex macros to use the Mutex class defined by the MEB
106 source. MEB calls the routines in "fil0fil.cc" in parallel and,
107 therefore, the mutex protecting the critical sections of the tablespace
108 memory cache must be included also in the MEB compilation of this
109 module. (For other modules the mutex macros are defined as no ops in the
110 MEB compilation in "meb/src/include/bh_univ.i".) */
111 
112 #undef mutex_enter
113 #undef mutex_exit
114 #undef mutex_own
115 #undef mutex_validate
116 
117 #define mutex_enter(M) recv_mutex.lock()
118 #define mutex_exit(M) recv_mutex.unlock()
119 #define mutex_own(M) 1
120 #define mutex_validate(M) 1
121 
122 /* Re-define the mutex macros for the mutex protecting the critical
123 sections of the log subsystem using an object of the meb::Mutex class. */
124 
125 meb::Mutex recv_mutex;
126 extern meb::Mutex log_mutex;
127 meb::Mutex apply_log_mutex;
128 
129 #undef log_mutex_enter
130 #undef log_mutex_exit
131 #define log_mutex_enter() log_mutex.lock()
132 #define log_mutex_exit() log_mutex.unlock()
133 
134 /** Print important values from a page header.
135 @param[in]	page	page */
meb_print_page_header(const page_t * page)136 void meb_print_page_header(const page_t *page) {
137   ib::trace_1() << "space " << mach_read_from_4(page + FIL_PAGE_SPACE_ID)
138                 << " nr " << mach_read_from_4(page + FIL_PAGE_OFFSET) << " lsn "
139                 << mach_read_from_8(page + FIL_PAGE_LSN) << " type "
140                 << mach_read_from_2(page + FIL_PAGE_TYPE);
141 }
142 #endif /* UNIV_HOTBACKUP */
143 
144 #ifndef UNIV_HOTBACKUP
145 PSI_memory_key mem_log_recv_page_hash_key;
146 PSI_memory_key mem_log_recv_space_hash_key;
147 #endif /* !UNIV_HOTBACKUP */
148 
149 /** true when recv_init_crash_recovery() has been called. */
150 bool recv_needed_recovery;
151 
152 /** true if buf_page_is_corrupted() should check if the log sequence
153 number (FIL_PAGE_LSN) is in the future.  Initially false, and set by
154 recv_recovery_from_checkpoint_start(). */
155 bool recv_lsn_checks_on;
156 
157 /** If the following is true, the buffer pool file pages must be invalidated
158 after recovery and no ibuf operations are allowed; this becomes true if
159 the log record hash table becomes too full, and log records must be merged
160 to file pages already before the recovery is finished: in this case no
161 ibuf operations are allowed, as they could modify the pages read in the
162 buffer pool before the pages have been recovered to the up-to-date state.
163 
164 true means that recovery is running and no operations on the log files
165 are allowed yet: the variable name is misleading. */
166 bool recv_no_ibuf_operations;
167 
/** true when the redo log is being backed up */
169 bool recv_is_making_a_backup = false;
170 
171 /** true when recovering from a backed up redo log file */
172 bool recv_is_from_backup = false;
173 
/** Buffer pool size used by this file when recv_sys_init() decides how
many free frames to reserve: a fixed 5 MiB. Because this constant is below
the 10 MB threshold tested there, that check never raises
recv_n_pool_free_frames to 512.
NOTE(review): presumably this macro stands in for a real
buf_pool_get_curr_size() provided by the buffer pool module - confirm that
overriding it here is intentional. */
#define buf_pool_get_curr_size() (5 * 1024 * 1024)
175 
176 /** The following counter is used to decide when to print info on
177 log scan */
178 static ulint recv_scan_print_counter;
179 
180 /** The type of the previous parsed redo log record */
181 static mlog_id_t recv_previous_parsed_rec_type;
182 
183 /** The offset of the previous parsed redo log record */
184 static ulint recv_previous_parsed_rec_offset;
185 
186 /** The 'multi' flag of the previous parsed redo log record */
187 static ulint recv_previous_parsed_rec_is_multi;
188 
189 /** This many frames must be left free in the buffer pool when we scan
190 the log and store the scanned log records in the buffer pool: we will
191 use these free frames to read in pages when we start applying the
192 log records to the database.
193 This is the default value. If the actual size of the buffer pool is
194 larger than 10 MB we'll set this value to 512. */
195 ulint recv_n_pool_free_frames;
196 
197 /** The maximum lsn we see for a page during the recovery process. If this
198 is bigger than the lsn we are able to scan up to, that is an indication that
199 the recovery failed and the database may be corrupt. */
200 static lsn_t recv_max_page_lsn;
201 
202 #ifndef UNIV_HOTBACKUP
203 #ifdef UNIV_PFS_THREAD
204 mysql_pfs_key_t recv_writer_thread_key;
205 #endif /* UNIV_PFS_THREAD */
206 
recv_writer_is_active()207 static bool recv_writer_is_active() {
208   return (srv_thread_is_active(srv_threads.m_recv_writer));
209 }
210 
211 #endif /* !UNIV_HOTBACKUP */
212 
213 /* prototypes */
214 
215 #ifndef UNIV_HOTBACKUP
216 
217 /** Initialize crash recovery environment. Can be called iff
218 recv_needed_recovery == false. */
219 static void recv_init_crash_recovery();
220 #endif /* !UNIV_HOTBACKUP */
221 
222 /** Calculates the new value for lsn when more data is added to the log.
223 @param[in]	lsn		Old LSN
224 @param[in]	len		This many bytes of data is added, log block
225                                 headers not included
226 @return LSN after data addition */
recv_calc_lsn_on_data_add(lsn_t lsn,uint64_t len)227 lsn_t recv_calc_lsn_on_data_add(lsn_t lsn, uint64_t len) {
228   ulint frag_len;
229   uint64_t lsn_len;
230 
231   frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
232 
233   ut_ad(frag_len <
234         OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE - LOG_BLOCK_TRL_SIZE);
235 
236   lsn_len = len;
237 
238   lsn_len +=
239       (lsn_len + frag_len) /
240       (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE - LOG_BLOCK_TRL_SIZE) *
241       (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
242 
243   return (lsn + lsn_len);
244 }
245 
246 /** Destructor */
~MetadataRecover()247 MetadataRecover::~MetadataRecover() {
248   for (auto &table : m_tables) {
249     UT_DELETE(table.second);
250   }
251 }
252 
253 /** Get the dynamic metadata of a specified table, create a new one
254 if not exist
255 @param[in]	id	table id
256 @return the metadata of the specified table */
getMetadata(table_id_t id)257 PersistentTableMetadata *MetadataRecover::getMetadata(table_id_t id) {
258   PersistentTableMetadata *metadata = nullptr;
259   PersistentTables::iterator iter = m_tables.find(id);
260 
261   if (iter == m_tables.end()) {
262     metadata = UT_NEW_NOKEY(PersistentTableMetadata(id, 0));
263 
264     m_tables.insert(std::make_pair(id, metadata));
265   } else {
266     metadata = iter->second;
267     ut_ad(metadata->get_table_id() == id);
268   }
269 
270   ut_ad(metadata != nullptr);
271   return (metadata);
272 }
273 
274 /** Parse a dynamic metadata redo log of a table and store
275 the metadata locally
276 @param[in]	id	table id
277 @param[in]	version	table dynamic metadata version
278 @param[in]	ptr	redo log start
279 @param[in]	end	end of redo log
280 @retval ptr to next redo log record, nullptr if this log record
281 was truncated */
parseMetadataLog(table_id_t id,uint64_t version,byte * ptr,byte * end)282 byte *MetadataRecover::parseMetadataLog(table_id_t id, uint64_t version,
283                                         byte *ptr, byte *end) {
284   if (ptr + 2 > end) {
285     /* At least we should get type byte and another one byte
286     for data, if not, it's an incomplete log */
287     return (nullptr);
288   }
289 
290   persistent_type_t type = static_cast<persistent_type_t>(ptr[0]);
291 
292   ut_ad(dict_persist->persisters != nullptr);
293 
294   Persister *persister = dict_persist->persisters->get(type);
295   PersistentTableMetadata *metadata = getMetadata(id);
296 
297   bool corrupt;
298   ulint consumed = persister->read(*metadata, ptr, end - ptr, &corrupt);
299 
300   if (corrupt) {
301     recv_sys->found_corrupt_log = true;
302   } else if (consumed != 0) {
303     metadata->set_version(version);
304   }
305 
306   if (consumed == 0) {
307     return (nullptr);
308   } else {
309     return (ptr + consumed);
310   }
311 }
312 
313 /** Apply the collected persistent dynamic metadata to in-memory
314 table objects */
apply()315 void MetadataRecover::apply() {
316   PersistentTables::iterator iter;
317 
318   for (iter = m_tables.begin(); iter != m_tables.end(); ++iter) {
319     table_id_t table_id = iter->first;
320     PersistentTableMetadata *metadata = iter->second;
321     dict_table_t *table;
322 
323     table = dd_table_open_on_id(table_id, nullptr, nullptr, false, true);
324 
325     /* If the table is nullptr, it might be already dropped */
326     if (table == nullptr) {
327       continue;
328     }
329 
330     mutex_enter(&dict_sys->mutex);
331 
332     /* At this time, the metadata in DDTableBuffer has
333     already been applied to table object, we can apply
334     the latest status of metadata read from redo logs to
335     the table now. We can read the dirty_status directly
336     since it's in recovery phase */
337 
338     /* The table should be either CLEAN or applied BUFFERED
339     metadata from DDTableBuffer just now */
340     ut_ad(table->dirty_status.load() == METADATA_CLEAN ||
341           table->dirty_status.load() == METADATA_BUFFERED);
342 
343     bool buffered = (table->dirty_status.load() == METADATA_BUFFERED);
344 
345     mutex_enter(&dict_persist->mutex);
346 
347     uint64_t autoinc_persisted = table->autoinc_persisted;
348     bool is_dirty = dict_table_apply_dynamic_metadata(table, metadata);
349 
350     if (is_dirty) {
351       /* This table was not marked as METADATA_BUFFERED
352       before the redo logs are applied, so it's not in
353       the list */
354       if (!buffered) {
355         ut_ad(!table->in_dirty_dict_tables_list);
356 
357         UT_LIST_ADD_LAST(dict_persist->dirty_dict_tables, table);
358       }
359 
360       table->dirty_status.store(METADATA_DIRTY);
361       ut_d(table->in_dirty_dict_tables_list = true);
362       ++dict_persist->num_dirty_tables;
363 
364       /* For those tables which are not initialized by
365       innobase_initialize_autoinc(), the next counter should be advanced to
366       point to the next auto increment value.  This is simlilar to
367       metadata_applier::operator(). */
368       if (autoinc_persisted != table->autoinc_persisted &&
369           table->autoinc != ~0ULL) {
370         ++table->autoinc;
371       }
372     }
373 
374     mutex_exit(&dict_persist->mutex);
375     mutex_exit(&dict_sys->mutex);
376 
377     dd_table_close(table, nullptr, nullptr, false);
378   }
379 }
380 
381 /** Creates the recovery system. */
recv_sys_create()382 void recv_sys_create() {
383   if (recv_sys != nullptr) {
384     return;
385   }
386 
387   recv_sys = static_cast<recv_sys_t *>(ut_zalloc_nokey(sizeof(*recv_sys)));
388 
389   mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
390   mutex_create(LATCH_ID_RECV_WRITER, &recv_sys->writer_mutex);
391 
392   recv_sys->spaces = nullptr;
393 }
394 
395 /** Resize the recovery parsing buffer upto log_buffer_size */
recv_sys_resize_buf()396 bool recv_sys_resize_buf() {
397   ut_ad(recv_sys->buf_len <= srv_log_buffer_size);
398 
399 #ifndef UNIV_HOTBACKUP
400   /* If the buffer cannot be extended further, return false. */
401   if (recv_sys->buf_len == srv_log_buffer_size) {
402     ib::error(ER_IB_MSG_723, srv_log_buffer_size);
403     return false;
404   }
405 #else  /* !UNIV_HOTBACKUP */
406   if ((recv_sys->buf_len >= srv_log_buffer_size) ||
407       (recv_sys->len >= srv_log_buffer_size)) {
408     ib::fatal(ER_IB_ERR_LOG_PARSING_BUFFER_OVERFLOW)
409         << "Log parsing buffer overflow. Log parse failed. "
410         << "Please increase --limit-memory above "
411         << srv_log_buffer_size / 1024 / 1024 << " (MB)";
412   }
413 #endif /* !UNIV_HOTBACKUP */
414 
415   /* Extend the buffer by double the current size with the resulting
416   size not more than srv_log_buffer_size. */
417   recv_sys->buf_len = ((recv_sys->buf_len * 2) >= srv_log_buffer_size)
418                           ? srv_log_buffer_size
419                           : recv_sys->buf_len * 2;
420 
421   /* Resize the buffer to the new size. */
422   recv_sys->buf =
423       static_cast<byte *>(ut_realloc(recv_sys->buf, recv_sys->buf_len));
424 
425   ut_ad(recv_sys->buf != nullptr);
426 
427   /* Return error and fail the recovery if not enough memory available */
428   if (recv_sys->buf == nullptr) {
429     ib::error(ER_IB_MSG_740);
430     return false;
431   }
432 
433   ib::info(ER_IB_MSG_739, recv_sys->buf_len);
434   return true;
435 }
436 
437 /** Free up recovery data structures. */
recv_sys_finish()438 static void recv_sys_finish() {
439 #ifndef UNIV_HOTBACKUP
440   recv_sys->dblwr->recovered();
441 #endif /* !UNIV_HOTBACKUP */
442 
443   if (recv_sys->spaces != nullptr) {
444     for (auto &space : *recv_sys->spaces) {
445       if (space.second.m_heap != nullptr) {
446         mem_heap_free(space.second.m_heap);
447         space.second.m_heap = nullptr;
448       }
449     }
450 
451     UT_DELETE(recv_sys->spaces);
452   }
453 
454   ut_free(recv_sys->buf);
455   ut_free(recv_sys->last_block_buf_start);
456   UT_DELETE(recv_sys->metadata_recover);
457 
458   recv_sys->buf = nullptr;
459   recv_sys->spaces = nullptr;
460   recv_sys->metadata_recover = nullptr;
461   recv_sys->last_block_buf_start = nullptr;
462 }
463 
464 /** Release recovery system mutexes. */
recv_sys_close()465 void recv_sys_close() {
466   if (recv_sys == nullptr) {
467     return;
468   }
469 
470   recv_sys_finish();
471 
472 #ifndef UNIV_HOTBACKUP
473   if (recv_sys->flush_start != nullptr) {
474     os_event_destroy(recv_sys->flush_start);
475   }
476 
477   if (recv_sys->flush_end != nullptr) {
478     os_event_destroy(recv_sys->flush_end);
479   }
480 
481 #endif /* !UNIV_HOTBACKUP */
482 
483   UT_DELETE(recv_sys->dblwr);
484 
485   call_destructor(&recv_sys->deleted);
486   call_destructor(&recv_sys->missing_ids);
487 
488   mutex_free(&recv_sys->mutex);
489 
490   ut_ad(!recv_writer_is_active());
491   mutex_free(&recv_sys->writer_mutex);
492 
493   ut_free(recv_sys);
494   recv_sys = nullptr;
495 }
496 
497 #ifndef UNIV_HOTBACKUP
498 /** Reset the state of the recovery system variables. */
recv_sys_var_init()499 void recv_sys_var_init() {
500   recv_recovery_on = false;
501   recv_needed_recovery = false;
502   recv_lsn_checks_on = false;
503   recv_no_ibuf_operations = false;
504   recv_scan_print_counter = 0;
505   recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
506   recv_previous_parsed_rec_offset = 0;
507   recv_previous_parsed_rec_is_multi = 0;
508   recv_n_pool_free_frames = 256;
509   recv_max_page_lsn = 0;
510 }
511 #endif /* !UNIV_HOTBACKUP */
512 
513 /** Get the number of bytes used by all the heaps
514 @return number of bytes used */
515 #ifndef UNIV_HOTBACKUP
recv_heap_used()516 static size_t recv_heap_used()
517 #else  /* !UNIV_HOTBACKUP */
518 size_t meb_heap_used()
519 #endif /* !UNIV_HOTBACKUP */
520 {
521   size_t size = 0;
522 
523   for (auto &space : *recv_sys->spaces) {
524     if (space.second.m_heap != nullptr) {
525       size += mem_heap_get_size(space.second.m_heap);
526     }
527   }
528 
529   return (size);
530 }
531 
532 /** Prints diagnostic info of corrupt log.
533 @param[in]	ptr	pointer to corrupt log record
534 @param[in]	type	type of the log record (could be garbage)
535 @param[in]	space	tablespace ID (could be garbage)
536 @param[in]	page_no	page number (could be garbage)
537 @return whether processing should continue */
recv_report_corrupt_log(const byte * ptr,int type,space_id_t space,page_no_t page_no)538 static bool recv_report_corrupt_log(const byte *ptr, int type, space_id_t space,
539                                     page_no_t page_no) {
540   ib::error(ER_IB_MSG_694);
541 
542   ib::info(
543       ER_IB_MSG_695, type, ulong{space}, ulong{page_no},
544       ulonglong{recv_sys->recovered_lsn}, int{recv_previous_parsed_rec_type},
545       ulonglong{recv_previous_parsed_rec_is_multi},
546       ssize_t{ptr - recv_sys->buf}, ulonglong{recv_previous_parsed_rec_offset});
547 
548 #ifdef UNIV_HOTBACKUP
549   ut_ad(ptr >= recv_sys->buf);
550 #endif /* UNIV_HOTBACKUP */
551   ut_ad(ptr <= recv_sys->buf + recv_sys->len);
552 
553   const ulint limit = 100;
554   const ulint before = std::min(recv_previous_parsed_rec_offset, limit);
555   const ulint after = std::min(recv_sys->len - (ptr - recv_sys->buf), limit);
556 
557   ib::info(ER_IB_MSG_696, ulonglong{before}, ulonglong{after});
558 
559   ut_print_buf(
560       stderr, recv_sys->buf + recv_previous_parsed_rec_offset - before,
561       ptr - recv_sys->buf + before + after - recv_previous_parsed_rec_offset);
562   putc('\n', stderr);
563 
564 #ifndef UNIV_HOTBACKUP
565   if (srv_force_recovery == 0) {
566     ib::info(ER_IB_MSG_697);
567 
568     return (false);
569   }
570 
571   ib::warn(ER_IB_MSG_698, FORCE_RECOVERY_MSG);
572 #endif /* !UNIV_HOTBACKUP */
573 
574   return (true);
575 }
576 
recv_sys_init(ulint max_mem)577 void recv_sys_init(ulint max_mem) {
578   if (recv_sys->spaces != nullptr) {
579     return;
580   }
581 
582   mutex_enter(&recv_sys->mutex);
583 
584 #ifndef UNIV_HOTBACKUP
585   if (!srv_read_only_mode) {
586     recv_sys->flush_start = os_event_create();
587     recv_sys->flush_end = os_event_create();
588   }
589 #else  /* !UNIV_HOTBACKUP */
590   recv_is_from_backup = true;
591   recv_sys->apply_file_operations = false;
592 #endif /* !UNIV_HOTBACKUP */
593 
594   /* Set appropriate value of recv_n_pool_free_frames. If capacity
595   is at least 10M and 25% above 512 pages then bump free frames to
596   512. */
597   if (buf_pool_get_curr_size() >= (10 * 1024 * 1024) &&
598       (buf_pool_get_curr_size() >= ((512 + 128) * UNIV_PAGE_SIZE))) {
599     /* Buffer pool of size greater than 10 MB. */
600     recv_n_pool_free_frames = 512;
601   }
602 
603   recv_sys->buf = static_cast<byte *>(ut_malloc_nokey(RECV_PARSING_BUF_SIZE));
604   recv_sys->buf_len = RECV_PARSING_BUF_SIZE;
605 
606   recv_sys->len = 0;
607   recv_sys->recovered_offset = 0;
608 
609   using Spaces = recv_sys_t::Spaces;
610 
611   recv_sys->spaces = UT_NEW(Spaces(), mem_log_recv_space_hash_key);
612 
613   recv_sys->n_addrs = 0;
614 
615   recv_sys->apply_log_recs = false;
616   recv_sys->apply_batch_on = false;
617   recv_sys->is_cloned_db = false;
618 
619   recv_sys->last_block_buf_start =
620       static_cast<byte *>(ut_malloc_nokey(2 * OS_FILE_LOG_BLOCK_SIZE));
621 
622   recv_sys->last_block = static_cast<byte *>(
623       ut_align(recv_sys->last_block_buf_start, OS_FILE_LOG_BLOCK_SIZE));
624 
625   recv_sys->found_corrupt_log = false;
626   recv_sys->found_corrupt_fs = false;
627 
628   recv_max_page_lsn = 0;
629 
630   recv_sys->dblwr = UT_NEW_NOKEY(dblwr::recv::DBLWR());
631 
632   new (&recv_sys->deleted) recv_sys_t::Missing_Ids();
633 
634   new (&recv_sys->missing_ids) recv_sys_t::Missing_Ids();
635 
636   recv_sys->metadata_recover = UT_NEW_NOKEY(MetadataRecover());
637 
638   mutex_exit(&recv_sys->mutex);
639 }
640 
641 /** Empties the hash table when it has been fully processed. */
recv_sys_empty_hash()642 static void recv_sys_empty_hash() {
643   ut_ad(mutex_own(&recv_sys->mutex));
644 
645   if (recv_sys->n_addrs != 0) {
646     ib::fatal(ER_IB_MSG_699, ulonglong{recv_sys->n_addrs});
647   }
648 
649   for (auto &space : *recv_sys->spaces) {
650     if (space.second.m_heap != nullptr) {
651       mem_heap_free(space.second.m_heap);
652       space.second.m_heap = nullptr;
653     }
654   }
655 
656   UT_DELETE(recv_sys->spaces);
657 
658   using Spaces = recv_sys_t::Spaces;
659 
660   recv_sys->spaces = UT_NEW(Spaces(), mem_log_recv_space_hash_key);
661 }
662 
663 /** Check the consistency of a log header block.
664 @param[in]	buf	header block
665 @return true if ok */
666 #ifndef UNIV_HOTBACKUP
667 static
668 #endif /* !UNIV_HOTBACKUP */
669     bool
recv_check_log_header_checksum(const byte * buf)670     recv_check_log_header_checksum(const byte *buf) {
671   auto c1 = log_block_get_checksum(buf);
672   auto c2 = log_block_calc_checksum_crc32(buf);
673 
674   return (c1 == c2);
675 }
676 
677 /** Check the 4-byte checksum to the trailer checksum field of a log
678 block.
679 @param[in]	block	pointer to a log block
680 @return whether the checksum matches */
681 #if !defined(UNIV_HOTBACKUP) && !defined(XTRABACKUP)
682 static
683 #endif /* !UNIV_HOTBACKUP && !XTRABACKUP */
684     bool
log_block_checksum_is_ok(const byte * block)685     log_block_checksum_is_ok(const byte *block) {
686   return (!srv_log_checksums ||
687           log_block_get_checksum(block) == log_block_calc_checksum(block));
688 }
689 
690 /** Get the page map for a tablespace. It will create one if one isn't found.
691 @param[in]	space_id	Tablespace ID for which page map required.
692 @param[in]	create		false if lookup only
693 @return the space data or null if not found */
recv_get_page_map(space_id_t space_id,bool create)694 static recv_sys_t::Space *recv_get_page_map(space_id_t space_id, bool create) {
695   auto it = recv_sys->spaces->find(space_id);
696 
697   if (it != recv_sys->spaces->end()) {
698     return (&it->second);
699 
700   } else if (create) {
701     mem_heap_t *heap;
702 
703     heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
704 
705     using Space = recv_sys_t::Space;
706     using value_type = recv_sys_t::Spaces::value_type;
707 
708     auto where =
709         recv_sys->spaces->insert(it, value_type(space_id, Space(heap)));
710 
711     return (&where->second);
712   }
713 
714   return (nullptr);
715 }
716 
717 /** Gets the list of log records for a <space, page>.
718 @param[in]	space_id	Tablespace ID
719 @param[in]	page_no		Page number
720 @return the redo log entries or nullptr if not found */
recv_get_rec(space_id_t space_id,page_no_t page_no)721 static recv_addr_t *recv_get_rec(space_id_t space_id, page_no_t page_no) {
722   recv_sys_t::Space *space;
723 
724   space = recv_get_page_map(space_id, false);
725 
726   if (space != nullptr) {
727     auto it = space->m_pages.find(page_no);
728 
729     if (it != space->m_pages.end()) {
730       return (it->second);
731     }
732   }
733 
734   return (nullptr);
735 }
736 
737 #ifndef UNIV_HOTBACKUP
738 /** Store the collected persistent dynamic metadata to
739 mysql.innodb_dynamic_metadata */
store()740 void MetadataRecover::store() {
741   ut_ad(dict_sys->dynamic_metadata != nullptr);
742   ut_ad(dict_persist->table_buffer != nullptr);
743 
744   DDTableBuffer *table_buffer = dict_persist->table_buffer;
745 
746   if (empty()) {
747     return;
748   }
749 
750   mutex_enter(&dict_persist->mutex);
751 
752   for (auto meta : m_tables) {
753     table_id_t table_id = meta.first;
754     PersistentTableMetadata *metadata = meta.second;
755     byte buffer[REC_MAX_DATA_SIZE];
756     ulint size;
757 
758     size = dict_persist->persisters->write(*metadata, buffer);
759 
760     dberr_t error =
761         table_buffer->replace(table_id, metadata->get_version(), buffer, size);
762     if (error != DB_SUCCESS) {
763       ut_ad(0);
764     }
765   }
766 
767   mutex_exit(&dict_persist->mutex);
768 }
769 
/** recv_writer thread tasked with flushing dirty pages from the buffer
pools.

The thread returns at once if recovery is no longer on when it starts.
Otherwise it loops while srv_shutdown_state is SRV_SHUTDOWN_NONE: it
sleeps, then, holding recv_sys->writer_mutex, requests an LRU flush by
setting recv_sys->flush_start and waits for recv_sys->flush_end. The loop
ends as soon as recv_recovery_on is found to be false. Must not be started
in read-only mode. */
static void recv_writer_thread() {
  ut_ad(!srv_read_only_mode);

  /* The code flow is as follows:
  Step 1: In recv_recovery_from_checkpoint_start().
  Step 2: This recv_writer thread is started.
  Step 3: In recv_recovery_from_checkpoint_finish().
  Step 4: Wait for recv_writer thread to complete.
  Step 5: Assert that recv_writer thread is not active anymore.

  It is possible that the thread that is started in step 2,
  becomes active only after step 4 and hence the assert in
  step 5 fails.  So mark this thread active only if necessary. */
  mutex_enter(&recv_sys->writer_mutex);

  /* Recovery already finished before this thread got to run: nothing to
  do. */
  if (!recv_recovery_on) {
    mutex_exit(&recv_sys->writer_mutex);
    return;
  }
  mutex_exit(&recv_sys->writer_mutex);

  while (srv_shutdown_state.load() == SRV_SHUTDOWN_NONE) {
    /* The shutdown state may change between the loop test and this check,
    so SRV_SHUTDOWN_EXIT_THREADS is accepted here as well. */
    ut_a(srv_shutdown_state_matches([](auto state) {
      return state == SRV_SHUTDOWN_NONE || state == SRV_SHUTDOWN_EXIT_THREADS;
    }));

    /* Pause between flush requests.
    NOTE(review): presumably the argument is in microseconds (100 ms) -
    confirm against os_thread_sleep(). */
    os_thread_sleep(100000);

    mutex_enter(&recv_sys->writer_mutex);

    /* Recovery was switched off while we slept: stop. */
    if (!recv_recovery_on) {
      mutex_exit(&recv_sys->writer_mutex);
      break;
    }

    /* No flush is requested while log_test is set. */
    if (log_test != nullptr) {
      mutex_exit(&recv_sys->writer_mutex);
      continue;
    }

    /* Flush pages from end of LRU if required */
    /* flush_end is reset before flush_start is set, so the wait below
    cannot be satisfied by an earlier completion signal. */
    os_event_reset(recv_sys->flush_end);
    recv_sys->flush_type = BUF_FLUSH_LRU;
    os_event_set(recv_sys->flush_start);
    os_event_wait(recv_sys->flush_end);

    mutex_exit(&recv_sys->writer_mutex);
  }
}
821 
822 /** Frees the recovery system. */
recv_sys_free()823 void recv_sys_free() {
824   mutex_enter(&recv_sys->mutex);
825 
826   recv_sys_finish();
827 
828   /* wake page cleaner up to progress */
829   if (!srv_read_only_mode) {
830     ut_ad(!recv_recovery_on);
831     ut_ad(!recv_writer_is_active());
832     if (buf_flush_event != nullptr) {
833       os_event_reset(buf_flush_event);
834     }
835     os_event_set(recv_sys->flush_start);
836   }
837 
838   /* Free encryption data structures. */
839   if (recv_sys->keys != nullptr) {
840     for (auto &key : *recv_sys->keys) {
841       if (key.ptr != nullptr) {
842         ut_free(key.ptr);
843         key.ptr = nullptr;
844       }
845 
846       if (key.iv != nullptr) {
847         ut_free(key.iv);
848         key.iv = nullptr;
849       }
850     }
851 
852     recv_sys->keys->swap(*recv_sys->keys);
853 
854     UT_DELETE(recv_sys->keys);
855     recv_sys->keys = nullptr;
856   }
857 
858   mutex_exit(&recv_sys->mutex);
859 }
860 
861 /** Copy of the LOG_HEADER_CREATOR field. */
862 static char log_header_creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
863 
864 /** Determine if a redo log from a version before MySQL 8.0.3 is clean.
865 @param[in,out]	log		redo log
866 @param[in]	checkpoint_no	checkpoint number
867 @param[in]	checkpoint_lsn	checkpoint LSN
868 @return error code
869 @retval DB_SUCCESS	if the redo log is clean
870 @retval DB_ERROR	if the redo log is corrupted or dirty */
recv_log_recover_pre_8_0_4(log_t & log,checkpoint_no_t checkpoint_no,lsn_t checkpoint_lsn)871 static dberr_t recv_log_recover_pre_8_0_4(log_t &log,
872                                           checkpoint_no_t checkpoint_no,
873                                           lsn_t checkpoint_lsn) {
874   lsn_t source_offset;
875   lsn_t block_lsn;
876   page_no_t page_no;
877   byte *buf;
878 
879   source_offset = log_files_real_offset_for_lsn(log, checkpoint_lsn);
880 
881   block_lsn = ut_uint64_align_down(checkpoint_lsn, OS_FILE_LOG_BLOCK_SIZE);
882 
883   page_no = (page_no_t)(source_offset / univ_page_size.physical());
884 
885   buf = log.buf + block_lsn % log.buf_size;
886 
887   static const char *NO_UPGRADE_RECOVERY_MSG =
888       "Upgrade after a crash is not supported."
889       " This redo log was created with ";
890 
891   static const char *NO_UPGRADE_RTFM_MSG =
892       ". Please follow the instructions at " REFMAN "upgrading.html";
893 
894   dberr_t err;
895 
896   err = fil_redo_io(IORequestLogRead, page_id_t(log.files_space_id, page_no),
897                     univ_page_size,
898                     (ulint)((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1)) %
899                             univ_page_size.physical()),
900                     OS_FILE_LOG_BLOCK_SIZE, buf);
901 
902   ut_a(err == DB_SUCCESS);
903 
904   if (log_block_calc_checksum(buf) != log_block_get_checksum(buf)) {
905     ib::error(ER_IB_MSG_700)
906         << NO_UPGRADE_RECOVERY_MSG << log_header_creator
907         << ", and it appears corrupted" << NO_UPGRADE_RTFM_MSG;
908 
909     return (DB_CORRUPTION);
910   }
911 
912   /* On a clean shutdown, the redo log will be logically empty
913   after the checkpoint LSN. */
914 
915   if (log_block_get_data_len(buf) !=
916       (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
917     ib::error(ER_IB_MSG_701)
918         << NO_UPGRADE_RECOVERY_MSG << log_header_creator << NO_UPGRADE_RTFM_MSG;
919 
920     return (DB_ERROR);
921   }
922 
923   /* Mark the redo log for upgrading. */
924   srv_log_file_size = 0;
925 
926   recv_sys->parse_start_lsn = checkpoint_lsn;
927   recv_sys->bytes_to_ignore_before_checkpoint = 0;
928   recv_sys->recovered_lsn = checkpoint_lsn;
929   recv_sys->previous_recovered_lsn = checkpoint_lsn;
930   recv_sys->checkpoint_lsn = checkpoint_lsn;
931   recv_sys->scanned_lsn = checkpoint_lsn;
932   recv_sys->last_block_first_rec_group = 0;
933 
934   ut_d(log.first_block_is_correct_for_lsn = checkpoint_lsn);
935 
936   /* We are not going to rewrite the block, but just in case we prefer to
937   have first_rec_group which points on checkpoint_lsn (instead of pointing
938   on mini transactions from earlier formats). This is extra safety if one
939   day this block would become rewritten because of some new bug (using new
940   format). */
941   log_block_set_first_rec_group(buf, checkpoint_lsn % OS_FILE_LOG_BLOCK_SIZE);
942 
943   log_start(log, checkpoint_no + 1, checkpoint_lsn, checkpoint_lsn);
944 
945   return (DB_SUCCESS);
946 }
947 
948 /** Find the latest checkpoint in the log header.
949 @param[in,out]	log		redo log
950 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
951 @return error code or DB_SUCCESS */
952 MY_ATTRIBUTE((warn_unused_result))
recv_find_max_checkpoint(log_t & log,ulint * max_field)953 dberr_t recv_find_max_checkpoint(log_t &log, ulint *max_field) {
954   bool found_checkpoint = false;
955 
956   *max_field = 0;
957 
958   byte *buf = log.checkpoint_buf;
959 
960   log.state = log_state_t::CORRUPTED;
961 
962   log_files_header_read(log, 0);
963 
964   /* Check the header page checksum. There was no
965   checksum in the first redo log format (version 0). */
966   log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
967 
968   if (log_detected_format == UINT32_MAX) {
969     log_detected_format = log.format;
970   } else {
971     // TODO: make it debug assert
972     ut_a(log.format == log_detected_format);
973   }
974 
975   if (log.format != 0 && !recv_check_log_header_checksum(buf)) {
976     ib::error(ER_IB_MSG_1264) << "Invalid redo log header checksum.";
977 
978     return (DB_CORRUPTION);
979   }
980 
981   memcpy(log_header_creator, buf + LOG_HEADER_CREATOR,
982          sizeof log_header_creator);
983 
984   log_header_creator[(sizeof log_header_creator) - 1] = 0;
985 
986   switch (log.format) {
987     case 0:
988       ib::error(ER_IB_MSG_1265) << "Unsupported redo log format (" << log.format
989                                 << "). The redo log was created"
990                                 << " before MySQL 5.7.9";
991 
992       return (DB_ERROR);
993 
994     case LOG_HEADER_FORMAT_5_7_9:
995     case LOG_HEADER_FORMAT_8_0_1:
996 
997       ib::info(ER_IB_MSG_704, ulong{log.format});
998 
999     case LOG_HEADER_FORMAT_8_0_3:
1000     case LOG_HEADER_FORMAT_CURRENT:
1001       /* The checkpoint page format is identical upto v4. */
1002       break;
1003 
1004     default:
1005       ib::error(ER_IB_MSG_705, ulong{log.format}, REFMAN);
1006 
1007       return (DB_ERROR);
1008   }
1009 
1010   log.m_first_file_lsn = mach_read_from_8(buf + LOG_HEADER_START_LSN);
1011 
1012   uint32_t flags = mach_read_from_4(buf + LOG_HEADER_FLAGS);
1013 
1014   if (LOG_HEADER_CHECK_FLAG(flags, LOG_HEADER_FLAG_NO_LOGGING)) {
1015     /* Exit if server is crashed while running without redo logging. */
1016     if (LOG_HEADER_CHECK_FLAG(flags, LOG_HEADER_FLAG_CRASH_UNSAFE)) {
1017       ib::error(ER_IB_ERR_RECOVERY_REDO_DISABLED);
1018       /* Allow to proceed with SRV_FORCE_NO_LOG_REDO[6] */
1019       if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
1020         return (DB_ERROR);
1021       }
1022     }
1023     auto err = mtr_t::s_logging.disable(nullptr);
1024     /* Currently never fails. */
1025     ut_a(err == 0);
1026     srv_redo_log = false;
1027   }
1028 
1029   uint64_t max_no = 0;
1030   constexpr ulint CKP1 = LOG_CHECKPOINT_1;
1031   constexpr ulint CKP2 = LOG_CHECKPOINT_2;
1032 
1033   for (auto i = CKP1; i <= CKP2; i += CKP2 - CKP1) {
1034     log_files_header_read(log, static_cast<uint32_t>(i));
1035 
1036     if (!recv_check_log_header_checksum(buf)) {
1037       DBUG_PRINT("ib_log", ("invalid checkpoint, at %lu, checksum %x", i,
1038                             (unsigned)log_block_get_checksum(buf)));
1039       continue;
1040     }
1041 
1042     log.state = log_state_t::OK;
1043 
1044     log.current_file_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
1045 
1046     log.current_file_real_offset =
1047         mach_read_from_8(buf + LOG_CHECKPOINT_OFFSET);
1048 
1049     if (log.current_file_real_offset % log.file_size < LOG_FILE_HDR_SIZE) {
1050       log.current_file_real_offset -=
1051           log.current_file_real_offset % log.file_size;
1052 
1053       log.current_file_real_offset += LOG_FILE_HDR_SIZE;
1054     }
1055 
1056     log_files_update_offsets(log, log.current_file_lsn);
1057 
1058     uint64_t checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
1059 
1060     DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF, checkpoint_no,
1061                           log.current_file_lsn));
1062 
1063     if (checkpoint_no >= max_no) {
1064       *max_field = i;
1065       max_no = checkpoint_no;
1066       found_checkpoint = true;
1067     }
1068   }
1069 
1070   if (!found_checkpoint) {
1071     /* Before 5.7.9, we could get here during database
1072     initialization if we created an ib_logfile0 file that
1073     was filled with zeroes, and were killed. After
1074     5.7.9, we would reject such a file already earlier,
1075     when checking the file header. */
1076 
1077     ib::error(ER_IB_MSG_706);
1078     return (DB_ERROR);
1079   }
1080 
1081   return (DB_SUCCESS);
1082 }
1083 
1084 /** Reads in pages which have hashed log records, from an area around a given
1085 page number.
1086 @param[in]	page_id		Read the pages around this page number
1087 @return number of pages found */
recv_read_in_area(const page_id_t & page_id)1088 static ulint recv_read_in_area(const page_id_t &page_id) {
1089   page_no_t low_limit;
1090 
1091   low_limit = page_id.page_no() - (page_id.page_no() % RECV_READ_AHEAD_AREA);
1092 
1093   ulint n = 0;
1094 
1095   std::array<page_no_t, RECV_READ_AHEAD_AREA> page_nos;
1096 
1097   for (page_no_t page_no = low_limit;
1098        page_no < low_limit + RECV_READ_AHEAD_AREA; ++page_no) {
1099     recv_addr_t *recv_addr;
1100 
1101     recv_addr = recv_get_rec(page_id.space(), page_no);
1102 
1103     const page_id_t cur_page_id(page_id.space(), page_no);
1104 
1105     if (recv_addr != nullptr && !buf_page_peek(cur_page_id)) {
1106       mutex_enter(&recv_sys->mutex);
1107 
1108       if (recv_addr->state == RECV_NOT_PROCESSED) {
1109         recv_addr->state = RECV_BEING_READ;
1110 
1111         page_nos[n] = page_no;
1112 
1113         ++n;
1114       }
1115 
1116       mutex_exit(&recv_sys->mutex);
1117     }
1118   }
1119 
1120   if (n > 0) {
1121     /* There are pages that need to be read. Go ahead and read them
1122     for recovery. */
1123     buf_read_recv_pages(false, page_id.space(), &page_nos[0], n);
1124   }
1125 
1126   return (n);
1127 }
1128 
1129 /** Apply the log records to a page
1130 @param[in,out]	recv_addr	Redo log records to apply */
recv_apply_log_rec(recv_addr_t * recv_addr)1131 static void recv_apply_log_rec(recv_addr_t *recv_addr) {
1132   if (recv_addr->state == RECV_DISCARDED) {
1133     ut_a(recv_sys->n_addrs > 0);
1134     --recv_sys->n_addrs;
1135     return;
1136   }
1137 
1138   bool found;
1139   const page_id_t page_id(recv_addr->space, recv_addr->page_no);
1140 
1141   const page_size_t page_size =
1142       fil_space_get_page_size(recv_addr->space, &found);
1143 
1144   if (!found || recv_sys->missing_ids.find(recv_addr->space) !=
1145                     recv_sys->missing_ids.end()) {
1146     /* Tablespace was discarded or dropped after changes were
1147     made to it. Or, we have ignored redo log for this tablespace
1148     earlier and somehow it has been found now. We can't apply
1149     this redo log out of order. */
1150 
1151     recv_addr->state = RECV_PROCESSED;
1152 
1153     ut_a(recv_sys->n_addrs > 0);
1154     --recv_sys->n_addrs;
1155 
1156     /* If the tablespace has been explicitly deleted, we
1157     can safely ignore it. */
1158 
1159     if (recv_sys->deleted.find(recv_addr->space) == recv_sys->deleted.end()) {
1160       recv_sys->missing_ids.insert(recv_addr->space);
1161     }
1162 
1163   } else if (recv_addr->state == RECV_NOT_PROCESSED) {
1164     mutex_exit(&recv_sys->mutex);
1165 
1166     if (buf_page_peek(page_id)) {
1167       mtr_t mtr;
1168 
1169       mtr_start(&mtr);
1170 
1171       buf_block_t *block;
1172 
1173       block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);
1174 
1175       buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
1176 
1177       recv_recover_page(false, block);
1178 
1179       mtr_commit(&mtr);
1180 
1181     } else {
1182       recv_read_in_area(page_id);
1183     }
1184 
1185     mutex_enter(&recv_sys->mutex);
1186   }
1187 }
1188 
1189 /** Empties the hash table of stored log records, applying them to appropriate
1190 pages.
1191 @param[in,out]	log		Redo log
1192 @param[in]	allow_ibuf	if true, ibuf operations are allowed during
1193                                 the application; if false, no ibuf operations
1194                                 are allowed, and after the application all
1195                                 file pages are flushed to disk and invalidated
1196                                 in buffer pool: this alternative means that
1197                                 no new log records can be generated during
1198                                 the application; the caller must in this case
1199                                 own the log mutex */
recv_apply_hashed_log_recs(log_t & log,bool allow_ibuf)1200 void recv_apply_hashed_log_recs(log_t &log, bool allow_ibuf) {
1201   for (;;) {
1202     mutex_enter(&recv_sys->mutex);
1203 
1204     if (!recv_sys->apply_batch_on) {
1205       break;
1206     }
1207 
1208     mutex_exit(&recv_sys->mutex);
1209 
1210     os_thread_sleep(500000);
1211   }
1212 
1213   if (!allow_ibuf) {
1214     recv_no_ibuf_operations = true;
1215   }
1216 
1217   recv_sys->apply_log_recs = true;
1218   recv_sys->apply_batch_on = true;
1219 
1220   auto batch_size = recv_sys->n_addrs;
1221 
1222   ib::info(ER_IB_MSG_707, ulonglong{batch_size});
1223 
1224   static const size_t PCT = 10;
1225 
1226   size_t pct = PCT;
1227   size_t applied = 0;
1228   auto unit = batch_size / PCT;
1229 
1230   if (unit <= PCT) {
1231     pct = 100;
1232     unit = batch_size;
1233   }
1234 
1235   auto start_time = ut_time_monotonic();
1236 
1237   for (const auto &space : *recv_sys->spaces) {
1238     bool dropped;
1239 
1240     if (space.first != TRX_SYS_SPACE &&
1241         !fil_tablespace_open_for_recovery(space.first)) {
1242       /* Tablespace was dropped. It should not have been scanned unless it
1243       is an undo space that was under construction. */
1244 
1245       if (fil_tablespace_lookup_for_recovery(space.first)) {
1246         ut_ad(fsp_is_undo_tablespace(space.first));
1247       }
1248 
1249       dropped = true;
1250     } else {
1251       dropped = false;
1252     }
1253 
1254     for (auto pages : space.second.m_pages) {
1255       ut_ad(pages.second->space == space.first);
1256 
1257       if (dropped) {
1258         pages.second->state = RECV_DISCARDED;
1259       }
1260 
1261       recv_apply_log_rec(pages.second);
1262 
1263       ++applied;
1264 
1265       if (unit == 0 || (applied % unit) == 0) {
1266         ib::info(ER_IB_MSG_708) << pct << "%";
1267 
1268         pct += PCT;
1269 
1270         start_time = ut_time_monotonic();
1271 
1272       } else if (ut_time_monotonic() - start_time >= PRINT_INTERVAL_SECS) {
1273         start_time = ut_time_monotonic();
1274 
1275         ib::info(ER_IB_MSG_709)
1276             << std::setprecision(2)
1277             << ((double)applied * 100) / (double)batch_size << "%";
1278       }
1279     }
1280   }
1281 
1282   /* Wait until all the pages have been processed */
1283 
1284   while (recv_sys->n_addrs != 0) {
1285     mutex_exit(&recv_sys->mutex);
1286 
1287     os_thread_sleep(500000);
1288 
1289     mutex_enter(&recv_sys->mutex);
1290   }
1291 
1292   if (!allow_ibuf) {
1293     /* Flush all the file pages to disk and invalidate them in
1294     the buffer pool */
1295 
1296     ut_d(log.disable_redo_writes = true);
1297 
1298     mutex_exit(&recv_sys->mutex);
1299 
1300     /* Stop the recv_writer thread from issuing any LRU
1301     flush batches. */
1302     mutex_enter(&recv_sys->writer_mutex);
1303 
1304     /* Wait for any currently run batch to end. */
1305     buf_flush_wait_LRU_batch_end();
1306 
1307     os_event_reset(recv_sys->flush_end);
1308 
1309     recv_sys->flush_type = BUF_FLUSH_LIST;
1310 
1311     os_event_set(recv_sys->flush_start);
1312 
1313     os_event_wait(recv_sys->flush_end);
1314 
1315     buf_pool_invalidate();
1316 
1317     /* Allow batches from recv_writer thread. */
1318     mutex_exit(&recv_sys->writer_mutex);
1319 
1320     ut_d(log.disable_redo_writes = false);
1321 
1322     mutex_enter(&recv_sys->mutex);
1323 
1324     recv_no_ibuf_operations = false;
1325   }
1326 
1327   recv_sys->apply_log_recs = false;
1328   recv_sys->apply_batch_on = false;
1329 
1330   recv_sys_empty_hash();
1331 
1332   mutex_exit(&recv_sys->mutex);
1333 
1334   ib::info(ER_IB_MSG_710);
1335 }
1336 
1337 #else /* !UNIV_HOTBACKUP */
1338 /** Scans the log segment and n_bytes_scanned is set to the length of valid
1339 log scanned.
1340 @param[in]	buf			buffer containing log data
1341 @param[in]	buf_len			data length in that buffer
1342 @param[in,out]	scanned_lsn		LSN of buffer start, we return scanned
1343 lsn
1344 @param[in,out]	scanned_checkpoint_no	4 lowest bytes of the highest scanned
1345 checkpoint number so far
1346 @param[out]	block_no	highest block no in scanned buffer.
1347 @param[out]	n_bytes_scanned		how much we were able to scan, smaller
1348 than buf_len if log data ended here
1349 +@param[out]	has_encrypted_log	set true, if buffer contains encrypted
1350 +redo log, set false otherwise */
meb_scan_log_seg(byte * buf,ulint buf_len,lsn_t * scanned_lsn,uint32_t * scanned_checkpoint_no,uint32_t * block_no,ulint * n_bytes_scanned,bool * has_encrypted_log)1351 void meb_scan_log_seg(byte *buf, ulint buf_len, lsn_t *scanned_lsn,
1352                       uint32_t *scanned_checkpoint_no, uint32_t *block_no,
1353                       ulint *n_bytes_scanned, bool *has_encrypted_log) {
1354   *n_bytes_scanned = 0;
1355   *has_encrypted_log = false;
1356 
1357   for (auto log_block = buf; log_block < buf + buf_len;
1358        log_block += OS_FILE_LOG_BLOCK_SIZE) {
1359     uint32_t no = log_block_get_hdr_no(log_block);
1360     bool is_encrypted = log_block_get_encrypt_bit(log_block);
1361 
1362     if (is_encrypted) {
1363       *has_encrypted_log = true;
1364       return;
1365     }
1366 
1367     if (no != log_block_convert_lsn_to_no(*scanned_lsn) ||
1368         !log_block_checksum_is_ok(log_block)) {
1369       ib::trace_2() << "Scanned lsn: " << *scanned_lsn << " header no: " << no
1370                     << " converted no: "
1371                     << log_block_convert_lsn_to_no(*scanned_lsn)
1372                     << " checksum: " << log_block_checksum_is_ok(log_block)
1373                     << " block cp no: "
1374                     << log_block_get_checkpoint_no(log_block);
1375 
1376       /* Garbage or an incompletely written log block */
1377 
1378       log_block += OS_FILE_LOG_BLOCK_SIZE;
1379       break;
1380     }
1381 
1382     if (*scanned_checkpoint_no > 0 &&
1383         log_block_get_checkpoint_no(log_block) < *scanned_checkpoint_no &&
1384         *scanned_checkpoint_no - log_block_get_checkpoint_no(log_block) >
1385             0x80000000UL) {
1386       /* Garbage from a log buffer flush which was made
1387       before the most recent database recovery */
1388 
1389       ib::trace_2() << "Scanned cp no: " << *scanned_checkpoint_no
1390                     << " block cp no "
1391                     << log_block_get_checkpoint_no(log_block);
1392 
1393       break;
1394     }
1395 
1396     ulint data_len = log_block_get_data_len(log_block);
1397 
1398     *scanned_checkpoint_no = log_block_get_checkpoint_no(log_block);
1399     *scanned_lsn += data_len;
1400 
1401     *n_bytes_scanned += data_len;
1402 
1403     if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
1404       /* Log data ends here */
1405 
1406       break;
1407     }
1408     *block_no = no;
1409   }
1410 }
1411 
1412 /** Apply a single log record stored in the hash table.
1413 @param[in,out]	recv_addr	a parsed log record
1414 @param[in,out]	block           a buffer pool frame for applying the record */
meb_apply_log_record(recv_addr_t * recv_addr,buf_block_t * block)1415 void meb_apply_log_record(recv_addr_t *recv_addr, buf_block_t *block) {
1416   bool found;
1417   const page_id_t page_id(recv_addr->space, recv_addr->page_no);
1418 
1419   const page_size_t &page_size =
1420       fil_space_get_page_size(recv_addr->space, &found);
1421 
1422   ib::trace_3() << "recv_addr {State: " << recv_addr->state
1423                 << ", Space id: " << recv_addr->space
1424                 << ", Page no: " << recv_addr->page_no
1425                 << ", Page size: " << page_size << ", found: " << found << "\n";
1426 
1427   if (!found) {
1428     recv_addr->state = RECV_DISCARDED;
1429 
1430     mutex_enter(&recv_sys->mutex);
1431 
1432     ut_a(recv_sys->n_addrs);
1433     --recv_sys->n_addrs;
1434 
1435     mutex_exit(&recv_sys->mutex);
1436 
1437     return;
1438   }
1439 
1440   mutex_enter(&recv_sys->mutex);
1441 
1442   /* We simulate a page read made by the buffer pool, to
1443   make sure the recovery apparatus works ok. We must init
1444   the block. */
1445 
1446   meb_page_init(page_id, page_size, block);
1447 
1448   /* Extend the tablespace's last file if the page_no
1449   does not fall inside its bounds; we assume the last
1450   file is auto-extending, and mysqlbackup copied the file
1451   when it still was smaller */
1452 
1453   fil_space_t *space = fil_space_get(recv_addr->space);
1454 
1455   bool success;
1456 
1457   success = fil_space_extend(space, recv_addr->page_no + 1);
1458 
1459   if (!success) {
1460     ib::fatal(ER_IB_MSG_711) << "Cannot extend tablespace " << recv_addr->space
1461                              << " to hold " << recv_addr->page_no << " pages";
1462   }
1463 
1464   mutex_exit(&recv_sys->mutex);
1465 
1466   /* Read the page from the tablespace file. */
1467 
1468   dberr_t err;
1469 
1470   if (page_size.is_compressed()) {
1471     err = fil_io(IORequestRead, true, page_id, page_size, 0,
1472                  page_size.physical(), block->page.zip.data, nullptr);
1473 
1474     if (err == DB_SUCCESS && !buf_zip_decompress(block, TRUE)) {
1475       ut_error;
1476     }
1477   } else {
1478     err = fil_io(IORequestRead, true, page_id, page_size, 0,
1479                  page_size.logical(), block->frame, nullptr);
1480   }
1481 
1482   if (err != DB_SUCCESS) {
1483     ib::fatal(ER_IB_MSG_712)
1484         << "Cannot read from tablespace " << recv_addr->space << " page number "
1485         << recv_addr->page_no;
1486   }
1487 
1488   apply_log_mutex.lock();
1489 
1490   /* Apply the log records to this page */
1491   recv_recover_page(false, block);
1492 
1493   apply_log_mutex.unlock();
1494 
1495   mutex_enter(&recv_sys->mutex);
1496 
1497   /* Write the page back to the tablespace file using the
1498   fil0fil.cc routines */
1499 
1500   buf_flush_init_for_writing(block, block->frame, buf_block_get_page_zip(block),
1501                              mach_read_from_8(block->frame + FIL_PAGE_LSN),
1502                              fsp_is_checksum_disabled(block->page.id.space()),
1503                              true /* skip_lsn_check */);
1504 
1505   mutex_exit(&recv_sys->mutex);
1506 
1507   if (page_size.is_compressed()) {
1508     err = fil_io(IORequestWrite, true, page_id, page_size, 0,
1509                  page_size.physical(), block->page.zip.data, nullptr);
1510   } else {
1511     err = fil_io(IORequestWrite, true, page_id, page_size, 0,
1512                  page_size.logical(), block->frame, nullptr);
1513   }
1514 
1515   if (err != DB_SUCCESS) {
1516     ib::fatal(ER_IB_MSG_713)
1517         << "Cannot write to tablespace " << recv_addr->space << " page number "
1518         << recv_addr->page_no;
1519   }
1520 }
1521 
1522 /** Apply a single log record stored in the hash table using default block.
1523 @param[in,out]	recv_addr	a parsed log record */
meb_apply_log_rec_func(recv_addr_t * recv_addr)1524 void meb_apply_log_rec_func(recv_addr_t *recv_addr) {
1525   meb_apply_log_record(recv_addr, back_block1);
1526 }
1527 
/** No-op "wait until done" callback for meb_apply_log_recs_via_callback():
returns immediately without doing anything. */
void meb_nowait_func() {}
1530 
1531 /** Applies log records in the hash table to a backup. */
meb_apply_log_recs()1532 void meb_apply_log_recs() {
1533   meb_apply_log_recs_via_callback(meb_apply_log_rec_func, meb_nowait_func);
1534 }
1535 
1536 /** Apply all log records in the hash table to a backup using callback
1537 functions. This function employes two callback functions that allow redo
1538 log records to be applied in parallel. The apply_log_record_function
1539 assigns a parsed redo log record for application. The
1540 apply_log_record_function is called repeatedly until all log records in
1541 the hash table are assigned for application. After that the
1542 wait_till_done_function is called once. The wait_till_done_function
1543 function blocks until the application of all the redo log records
1544 previously assigned with apply_log_record_function calls is complete.
1545 Even though this function assigns the log records in the hash table
1546 sequentially, the application of the log records may be done in parallel
1547 if the apply_log_record_function delegates the actual application work
1548 to multiple worker threads running in parallel.
1549 @param[in]  apply_log_record_function	a function that assigns one redo log
1550 record for application
1551 @param[in]  wait_till_done_function     a function that blocks until all
1552 assigned redo log records have been applied */
meb_apply_log_recs_via_callback(void (* apply_log_record_function)(recv_addr_t *),void (* wait_till_done_function)())1553 void meb_apply_log_recs_via_callback(
1554     void (*apply_log_record_function)(recv_addr_t *),
1555     void (*wait_till_done_function)()) {
1556   ulint n_hash_cells = recv_sys->n_addrs;
1557   ulint i = 0;
1558 
1559   recv_sys->apply_log_recs = true;
1560   recv_sys->apply_batch_on = true;
1561 
1562   ib::info(ER_IB_MSG_714) << "Starting to apply a batch of log records to the"
1563                           << " database...";
1564 
1565   fputs("InnoDB: Progress in percent: ", stderr);
1566 
1567   for (const auto &space : *recv_sys->spaces) {
1568     for (auto pages : space.second.m_pages) {
1569       ut_ad(pages.second->space == space.first);
1570 
1571       (*apply_log_record_function)(pages.second);
1572     }
1573 
1574     ++i;
1575     if ((100 * i) / n_hash_cells != (100 * (i + 1)) / n_hash_cells) {
1576       fprintf(stderr, "%lu ", (ulong)((100 * i) / n_hash_cells));
1577       fflush(stderr);
1578     }
1579   }
1580 
1581   /* wait till all the redo log records have been applied */
1582   (*wait_till_done_function)();
1583 
1584   /* write logs in next line */
1585   fprintf(stderr, "\n");
1586   recv_sys->apply_log_recs = false;
1587   recv_sys->apply_batch_on = false;
1588   recv_sys_empty_hash();
1589 }
1590 
1591 #endif /* !UNIV_HOTBACKUP */
1592 
1593 /** Try to parse a single log record body and also applies it if
1594 specified.
1595 @param[in]	type		redo log entry type
1596 @param[in]	ptr		redo log record body
1597 @param[in]	end_ptr		end of buffer
1598 @param[in]	space_id	tablespace identifier
1599 @param[in]	page_no		page number
1600 @param[in,out]	block		buffer block, or nullptr if
1601                                 a page log record should not be applied
1602                                 or if it is a MLOG_FILE_ operation
1603 @param[in,out]	mtr		mini-transaction, or nullptr if
1604                                 a page log record should not be applied
1605 @param[in]	parsed_bytes	Number of bytes parsed so far
1606 @return log record end, nullptr if not a complete record */
recv_parse_or_apply_log_rec_body(mlog_id_t type,byte * ptr,byte * end_ptr,space_id_t space_id,page_no_t page_no,buf_block_t * block,mtr_t * mtr,ulint parsed_bytes)1607 static byte *recv_parse_or_apply_log_rec_body(
1608     mlog_id_t type, byte *ptr, byte *end_ptr, space_id_t space_id,
1609     page_no_t page_no, buf_block_t *block, mtr_t *mtr, ulint parsed_bytes) {
1610   bool applying_redo = (block != nullptr);
1611 
1612   switch (type) {
1613 #ifndef UNIV_HOTBACKUP
1614     case MLOG_FILE_DELETE:
1615       /* error out backup if undo truncation happens during backup */
1616       if (srv_backup_mode && fsp_is_undo_tablespace(space_id) &&
1617           backup_redo_log_flushed_lsn < recv_sys->recovered_lsn) {
1618         ib::info() << "Last flushed lsn: " << backup_redo_log_flushed_lsn
1619                    << " undo_delete lsn " << recv_sys->recovered_lsn;
1620 
1621         ib::error(ER_IB_MSG_716)
1622             << "An undo ddl truncation (could be automatic)"
1623             << " operation has been"
1624             << " performed. \n"
1625             << " PXB will not be able to"
1626             << " take a consistent backup."
1627             << " Retry the backup"
1628             << " operation later or with --lock-ddl";
1629         exit(EXIT_FAILURE);
1630       }
1631 
1632       return (fil_tablespace_redo_delete(
1633           ptr, end_ptr, page_id_t(space_id, page_no), parsed_bytes,
1634           recv_sys->bytes_to_ignore_before_checkpoint != 0 ||
1635               recv_sys->recovered_lsn + parsed_bytes <
1636                   backup_redo_log_flushed_lsn));
1637 
1638     case MLOG_FILE_CREATE:
1639 
1640       return (fil_tablespace_redo_create(
1641           ptr, end_ptr, page_id_t(space_id, page_no), parsed_bytes,
1642           recv_sys->bytes_to_ignore_before_checkpoint != 0 ||
1643               recv_sys->recovered_lsn + parsed_bytes <
1644                   backup_redo_log_flushed_lsn));
1645 
1646     case MLOG_FILE_RENAME:
1647 
1648       return (fil_tablespace_redo_rename(
1649           ptr, end_ptr, page_id_t(space_id, page_no), parsed_bytes,
1650           recv_sys->bytes_to_ignore_before_checkpoint != 0 ||
1651               recv_sys->recovered_lsn + parsed_bytes <
1652                   backup_redo_log_flushed_lsn));
1653 #endif /* !UNIV_HOTBACKUP */
1654     case MLOG_INDEX_LOAD:
1655 #if defined(UNIV_HOTBACKUP) || defined(XTRABACKUP)
      /* While scanning redo logs during the backup phase, a
      MLOG_INDEX_LOAD type redo log record indicates that a DDL
      (create index, alter table...) was performed with
      'algorithm=inplace'. This redo log indicates that

      1. The DDL was started after MEB started backing up, in which
      case MEB will not be able to take a consistent backup and should
      fail. or
      2. There is a possibility of this record existing in the REDO
      even after the completion of the index create operation. This is
      because InnoDB does not checkpoint after flushing the
      index pages.

      If MEB gets the last_redo_flush_lsn and that is less than the
      lsn of the current record MEB fails the backup process.
      Error out in case of online backup and emit a warning in case
      of offline backup and continue. */
1673       if (!recv_recovery_on) {
1674         if (!opt_lock_ddl_per_table) {
1675           if (backup_redo_log_flushed_lsn < recv_sys->recovered_lsn) {
1676             ib::info() << "Last flushed lsn: " << backup_redo_log_flushed_lsn
1677                        << " load_index lsn " << recv_sys->recovered_lsn;
1678 
1679             if (backup_redo_log_flushed_lsn == 0) {
1680               ib::error(ER_IB_MSG_715) << "PXB was not able"
1681                                        << " to determine the"
1682                                        << " InnoDB Engine"
1683                                        << " Status";
1684             }
1685 
1686             ib::error(ER_IB_MSG_716) << "An optimized (without"
1687                                      << " redo logging) DDL"
1688                                      << " operation has been"
1689                                      << " performed. All modified"
1690                                      << " pages may not have been"
1691                                      << " flushed to the disk yet.\n"
1692                                      << "    PXB will not be able to"
1693                                      << " take a consistent backup."
1694                                      << " Retry the backup"
1695                                      << " operation";
1696             exit(EXIT_FAILURE);
1697           }
1698           /** else the index is flushed to disk before
1699           backup started hence no error */
1700         } else {
1701           /* offline backup */
1702           ib::info() << "Last flushed lsn: " << backup_redo_log_flushed_lsn
1703                      << " load_index lsn " << recv_sys->recovered_lsn;
1704 
1705           ib::warn(ER_IB_MSG_717);
1706         }
1707       }
1708 #endif /* UNIV_HOTBACKUP */
1709       if (end_ptr < ptr + 8) {
1710         return (nullptr);
1711       }
1712 
1713       return (ptr + 8);
1714 
1715     case MLOG_WRITE_STRING:
1716 
1717 #ifdef UNIV_HOTBACKUP
1718       if (recv_recovery_on && meb_is_space_loaded(space_id)) {
1719 #endif /* UNIV_HOTBACKUP */
1720         /* For encrypted tablespace, we need to get the encryption key
1721         information before the page 0 is recovered. Otherwise, redo will not
1722         find the key to decrypt the data pages. */
1723         if (page_no == 0 && !applying_redo &&
1724             !fsp_is_system_or_temp_tablespace(space_id) &&
1725             /* For cloned db header page has the encryption information. */
1726             !recv_sys->is_cloned_db) {
1727           if (fil_tablespace_redo_encryption(ptr, end_ptr, space_id) == nullptr)
1728             return (nullptr);
1729         }
1730 #ifdef UNIV_HOTBACKUP
1731       }
1732 #endif /* UNIV_HOTBACKUP */
1733 
1734       break;
1735 
1736     default:
1737       break;
1738   }
1739 
1740   page_t *page;
1741   page_zip_des_t *page_zip;
1742   dict_index_t *index = nullptr;
1743 
1744 #ifdef UNIV_DEBUG
1745   ulint page_type;
1746 #endif /* UNIV_DEBUG */
1747 
1748 #if defined(UNIV_HOTBACKUP) && defined(UNIV_DEBUG)
1749   ib::trace_3() << "recv_parse_or_apply_log_rec_body { type: "
1750                 << get_mlog_string(type) << ", space_id: " << space_id
1751                 << ", page_no: " << page_no
1752                 << ", ptr : " << static_cast<const void *>(ptr)
1753                 << ", end_ptr: " << static_cast<const void *>(end_ptr)
1754                 << ", block: " << static_cast<const void *>(block)
1755                 << ", mtr: " << static_cast<const void *>(mtr) << " }";
1756 #endif /* UNIV_HOTBACKUP && UNIV_DEBUG */
1757 
1758   if (applying_redo) {
1759     /* Applying a page log record. */
1760     ut_ad(mtr != nullptr);
1761 
1762     page = block->frame;
1763     page_zip = buf_block_get_page_zip(block);
1764 
1765     ut_d(page_type = fil_page_get_type(page));
1766 #if defined(UNIV_HOTBACKUP) && defined(UNIV_DEBUG)
1767     if (page_type == 0) {
1768       meb_print_page_header(page);
1769     }
1770 #endif /* UNIV_HOTBACKUP && UNIV_DEBUG */
1771 
1772   } else {
1773     /* Parsing a page log record. */
1774     ut_ad(mtr == nullptr);
1775     page = nullptr;
1776     page_zip = nullptr;
1777 
1778     ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
1779   }
1780 
1781   const byte *old_ptr = ptr;
1782 
1783   switch (type) {
1784 #ifdef UNIV_LOG_LSN_DEBUG
1785     case MLOG_LSN:
1786       /* The LSN is checked in recv_parse_log_rec(). */
1787       break;
1788 #endif /* UNIV_LOG_LSN_DEBUG */
1789     case MLOG_4BYTES:
1790 
1791       ut_ad(page == nullptr || end_ptr > ptr + 2);
1792 
1793       /* Most FSP flags can only be changed by CREATE or ALTER with
1794       ALGORITHM=COPY, so they do not change once the file
1795       is created. The SDI flag is the only one that can be
1796       changed by a recoverable transaction. So if there is
1797       change in FSP flags, update the in-memory space structure
1798       (fil_space_t) */
1799 
1800       if (page != nullptr && page_no == 0 &&
1801           mach_read_from_2(ptr) == FSP_HEADER_OFFSET + FSP_SPACE_FLAGS) {
1802         ptr = mlog_parse_nbytes(MLOG_4BYTES, ptr, end_ptr, page, page_zip);
1803 
1804         /* When applying log, we have complete records.
1805         They can be incomplete (ptr=nullptr) only during
1806         scanning (page==nullptr) */
1807 
1808         ut_ad(ptr != nullptr);
1809 
1810         fil_space_t *space = fil_space_acquire(space_id);
1811 
1812         ut_ad(space != nullptr);
1813 
1814         fil_space_set_flags(space, mach_read_from_4(FSP_HEADER_OFFSET +
1815                                                     FSP_SPACE_FLAGS + page));
1816         fil_space_release(space);
1817 
1818         break;
1819       }
1820 
1821       // fall through
1822 
1823     case MLOG_1BYTE:
1824       /* If 'ALTER TABLESPACE ... ENCRYPTION' was in progress and page 0 has
1825       REDO entry for this, set encryption_op_in_progress flag now so that any
1826       other page of this tablespace in redo log is written accordingly. */
1827       if (page_no == 0 && page != nullptr && end_ptr >= ptr + 2) {
1828         ulint offs = mach_read_from_2(ptr);
1829 
1830         fil_space_t *space = fil_space_acquire(space_id);
1831         ut_ad(space != nullptr);
1832         ulint offset = fsp_header_get_encryption_progress_offset(
1833             page_size_t(space->flags));
1834 
1835         if (offs == offset) {
1836           ptr = mlog_parse_nbytes(MLOG_1BYTE, ptr, end_ptr, page, page_zip);
1837           byte op = mach_read_from_1(page + offset);
1838           switch (op) {
1839             case Encryption::ENCRYPT_IN_PROGRESS:
1840               space->encryption_op_in_progress = ENCRYPTION;
1841               break;
1842             case Encryption::DECRYPT_IN_PROGRESS:
1843               space->encryption_op_in_progress = DECRYPTION;
1844               break;
1845             default:
1846               /* Don't reset operation in progress yet. It'll be done in
1847               fsp_resume_encryption_unencryption(). */
1848               break;
1849           }
1850         }
1851         fil_space_release(space);
1852       }
1853 
1854       // fall through
1855 
1856     case MLOG_2BYTES:
1857     case MLOG_8BYTES:
1858 #ifdef UNIV_DEBUG
1859       if (page && page_type == FIL_PAGE_TYPE_ALLOCATED && end_ptr >= ptr + 2) {
1860         /* It is OK to set FIL_PAGE_TYPE and certain
1861         list node fields on an empty page.  Any other
1862         write is not OK. */
1863 
1864         /* NOTE: There may be bogus assertion failures for
1865         dict_hdr_create(), trx_rseg_header_create(),
1866         trx_sys_create_doublewrite_buf(), and
1867         trx_sysf_create().
1868         These are only called during database creation. */
1869 
1870         ulint offs = mach_read_from_2(ptr);
1871 
1872         switch (type) {
1873           default:
1874             ut_error;
1875           case MLOG_2BYTES:
1876             /* Note that this can fail when the
1877             redo log been written with something
1878             older than InnoDB Plugin 1.0.4. */
1879             ut_ad(
1880                 offs == FIL_PAGE_TYPE ||
1881                 offs == IBUF_TREE_SEG_HEADER + IBUF_HEADER + FSEG_HDR_OFFSET ||
1882                 offs == PAGE_BTR_IBUF_FREE_LIST + PAGE_HEADER + FIL_ADDR_BYTE ||
1883                 offs == PAGE_BTR_IBUF_FREE_LIST + PAGE_HEADER + FIL_ADDR_BYTE +
1884                             FIL_ADDR_SIZE ||
1885                 offs == PAGE_BTR_SEG_LEAF + PAGE_HEADER + FSEG_HDR_OFFSET ||
1886                 offs == PAGE_BTR_SEG_TOP + PAGE_HEADER + FSEG_HDR_OFFSET ||
1887                 offs == PAGE_BTR_IBUF_FREE_LIST_NODE + PAGE_HEADER +
1888                             FIL_ADDR_BYTE + 0 /*FLST_PREV*/
1889                 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE + PAGE_HEADER +
1890                                FIL_ADDR_BYTE + FIL_ADDR_SIZE /*FLST_NEXT*/);
1891             break;
1892           case MLOG_4BYTES:
1893             /* Note that this can fail when the
1894             redo log been written with something
1895             older than InnoDB Plugin 1.0.4. */
1896             ut_ad(
1897                 0 ||
1898                 offs == IBUF_TREE_SEG_HEADER + IBUF_HEADER + FSEG_HDR_SPACE ||
1899                 offs == IBUF_TREE_SEG_HEADER + IBUF_HEADER + FSEG_HDR_PAGE_NO ||
1900                 offs == PAGE_BTR_IBUF_FREE_LIST + PAGE_HEADER /* flst_init */
1901                 ||
1902                 offs == PAGE_BTR_IBUF_FREE_LIST + PAGE_HEADER + FIL_ADDR_PAGE ||
1903                 offs == PAGE_BTR_IBUF_FREE_LIST + PAGE_HEADER + FIL_ADDR_PAGE +
1904                             FIL_ADDR_SIZE ||
1905                 offs == PAGE_BTR_SEG_LEAF + PAGE_HEADER + FSEG_HDR_PAGE_NO ||
1906                 offs == PAGE_BTR_SEG_LEAF + PAGE_HEADER + FSEG_HDR_SPACE ||
1907                 offs == PAGE_BTR_SEG_TOP + PAGE_HEADER + FSEG_HDR_PAGE_NO ||
1908                 offs == PAGE_BTR_SEG_TOP + PAGE_HEADER + FSEG_HDR_SPACE ||
1909                 offs == PAGE_BTR_IBUF_FREE_LIST_NODE + PAGE_HEADER +
1910                             FIL_ADDR_PAGE + 0 /*FLST_PREV*/
1911                 || offs == PAGE_BTR_IBUF_FREE_LIST_NODE + PAGE_HEADER +
1912                                FIL_ADDR_PAGE + FIL_ADDR_SIZE /*FLST_NEXT*/);
1913             break;
1914         }
1915       }
1916 #endif /* UNIV_DEBUG */
1917 
1918       ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
1919 
1920       if (ptr != nullptr && page != nullptr && page_no == 0 &&
1921           type == MLOG_4BYTES) {
1922         ulint offs = mach_read_from_2(old_ptr);
1923 
1924         switch (offs) {
1925           fil_space_t *space;
1926           uint32_t val;
1927           default:
1928             break;
1929 
1930           case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1931           case FSP_HEADER_OFFSET + FSP_SIZE:
1932           case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1933           case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1934 
1935             space = fil_space_get(space_id);
1936 
1937             ut_a(space != nullptr);
1938 
1939             val = mach_read_from_4(page + offs);
1940 
1941             switch (offs) {
1942               case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1943                 space->flags = val;
1944                 break;
1945 
1946               case FSP_HEADER_OFFSET + FSP_SIZE:
1947 
1948                 space->size_in_header = val;
1949 
1950                 if (space->size >= val) {
1951                   break;
1952                 }
1953 
1954                 ib::info(ER_IB_MSG_718, ulong{space->id}, space->name,
1955                          ulong{val});
1956 
1957                 if (fil_space_extend(space, val)) {
1958                   break;
1959                 }
1960 
1961                 ib::error(ER_IB_MSG_719, ulong{space->id}, space->name,
1962                           ulong{val});
1963                 break;
1964 
1965               case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1966                 space->free_limit = val;
1967                 break;
1968 
1969               case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1970                 space->free_len = val;
1971                 ut_ad(val == flst_get_len(page + offs));
1972                 break;
1973             }
1974         }
1975       }
1976       break;
1977 
1978     case MLOG_REC_INSERT:
1979     case MLOG_COMP_REC_INSERT:
1980 
1981       ut_ad(!page || fil_page_type_is_index(page_type));
1982 
1983       if (nullptr !=
1984           (ptr = mlog_parse_index(ptr, end_ptr, type == MLOG_COMP_REC_INSERT,
1985                                   &index))) {
1986         ut_a(!page ||
1987              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
1988 
1989         ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr, block, index, mtr);
1990       }
1991 
1992       break;
1993 
1994     case MLOG_REC_CLUST_DELETE_MARK:
1995     case MLOG_COMP_REC_CLUST_DELETE_MARK:
1996 
1997       ut_ad(!page || fil_page_type_is_index(page_type));
1998 
1999       if (nullptr != (ptr = mlog_parse_index(
2000                           ptr, end_ptr, type == MLOG_COMP_REC_CLUST_DELETE_MARK,
2001                           &index))) {
2002         ut_a(!page ||
2003              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2004 
2005         ptr = btr_cur_parse_del_mark_set_clust_rec(ptr, end_ptr, page, page_zip,
2006                                                    index);
2007       }
2008 
2009       break;
2010 
2011     case MLOG_COMP_REC_SEC_DELETE_MARK:
2012 
2013       ut_ad(!page || fil_page_type_is_index(page_type));
2014 
2015       /* This log record type is obsolete, but we process it for
2016       backward compatibility with MySQL 5.0.3 and 5.0.4. */
2017 
2018       ut_a(!page || page_is_comp(page));
2019       ut_a(!page_zip);
2020 
2021       ptr = mlog_parse_index(ptr, end_ptr, true, &index);
2022 
2023       if (ptr == nullptr) {
2024         break;
2025       }
2026 
2027       /* Fall through */
2028 
2029     case MLOG_REC_SEC_DELETE_MARK:
2030 
2031       ut_ad(!page || fil_page_type_is_index(page_type));
2032 
2033       ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr, page, page_zip);
2034       break;
2035 
2036     case MLOG_REC_UPDATE_IN_PLACE:
2037     case MLOG_COMP_REC_UPDATE_IN_PLACE:
2038 
2039       ut_ad(!page || fil_page_type_is_index(page_type));
2040 
2041       if (nullptr !=
2042           (ptr = mlog_parse_index(
2043                ptr, end_ptr, type == MLOG_COMP_REC_UPDATE_IN_PLACE, &index))) {
2044         ut_a(!page ||
2045              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2046 
2047         ptr =
2048             btr_cur_parse_update_in_place(ptr, end_ptr, page, page_zip, index);
2049       }
2050 
2051       break;
2052 
2053     case MLOG_LIST_END_DELETE:
2054     case MLOG_COMP_LIST_END_DELETE:
2055     case MLOG_LIST_START_DELETE:
2056     case MLOG_COMP_LIST_START_DELETE:
2057 
2058       ut_ad(!page || fil_page_type_is_index(page_type));
2059 
2060       if (nullptr !=
2061           (ptr = mlog_parse_index(ptr, end_ptr,
2062                                   type == MLOG_COMP_LIST_END_DELETE ||
2063                                       type == MLOG_COMP_LIST_START_DELETE,
2064                                   &index))) {
2065         ut_a(!page ||
2066              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2067 
2068         ptr = page_parse_delete_rec_list(type, ptr, end_ptr, block, index, mtr);
2069       }
2070 
2071       break;
2072 
2073     case MLOG_LIST_END_COPY_CREATED:
2074     case MLOG_COMP_LIST_END_COPY_CREATED:
2075 
2076       ut_ad(!page || fil_page_type_is_index(page_type));
2077 
2078       if (nullptr != (ptr = mlog_parse_index(
2079                           ptr, end_ptr, type == MLOG_COMP_LIST_END_COPY_CREATED,
2080                           &index))) {
2081         ut_a(!page ||
2082              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2083 
2084         ptr = page_parse_copy_rec_list_to_created_page(ptr, end_ptr, block,
2085                                                        index, mtr);
2086       }
2087 
2088       break;
2089 
2090     case MLOG_PAGE_REORGANIZE:
2091     case MLOG_COMP_PAGE_REORGANIZE:
2092     case MLOG_ZIP_PAGE_REORGANIZE:
2093 
2094       ut_ad(!page || fil_page_type_is_index(page_type));
2095 
2096       if (nullptr !=
2097           (ptr = mlog_parse_index(ptr, end_ptr, type != MLOG_PAGE_REORGANIZE,
2098                                   &index))) {
2099         ut_a(!page ||
2100              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2101 
2102         ptr = btr_parse_page_reorganize(
2103             ptr, end_ptr, index, type == MLOG_ZIP_PAGE_REORGANIZE, block, mtr);
2104       }
2105 
2106       break;
2107 
2108     case MLOG_PAGE_CREATE:
2109     case MLOG_COMP_PAGE_CREATE:
2110 
2111       /* Allow anything in page_type when creating a page. */
2112       ut_a(!page_zip);
2113 
2114       page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, FIL_PAGE_INDEX);
2115 
2116       break;
2117 
2118     case MLOG_PAGE_CREATE_RTREE:
2119     case MLOG_COMP_PAGE_CREATE_RTREE:
2120 
2121       page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
2122                         FIL_PAGE_RTREE);
2123 
2124       break;
2125 
2126     case MLOG_PAGE_CREATE_SDI:
2127     case MLOG_COMP_PAGE_CREATE_SDI:
2128 
2129       page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_SDI, FIL_PAGE_SDI);
2130 
2131       break;
2132 
2133     case MLOG_UNDO_INSERT:
2134 
2135       ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2136 
2137       ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
2138 
2139       break;
2140 
2141     case MLOG_UNDO_ERASE_END:
2142 
2143       ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2144 
2145       ptr = trx_undo_parse_erase_page_end(ptr, end_ptr, page, mtr);
2146 
2147       break;
2148 
2149     case MLOG_UNDO_INIT:
2150 
2151       /* Allow anything in page_type when creating a page. */
2152 
2153       ptr = trx_undo_parse_page_init(ptr, end_ptr, page, mtr);
2154 
2155       break;
2156     case MLOG_UNDO_HDR_CREATE:
2157     case MLOG_UNDO_HDR_REUSE:
2158 
2159       ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2160 
2161       ptr = trx_undo_parse_page_header(type, ptr, end_ptr, page, mtr);
2162 
2163       break;
2164 
2165     case MLOG_REC_MIN_MARK:
2166     case MLOG_COMP_REC_MIN_MARK:
2167 
2168       ut_ad(!page || fil_page_type_is_index(page_type));
2169 
2170       /* On a compressed page, MLOG_COMP_REC_MIN_MARK
2171       will be followed by MLOG_COMP_REC_DELETE
2172       or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_nullptr)
2173       in the same mini-transaction. */
2174 
2175       ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
2176 
2177       ptr = btr_parse_set_min_rec_mark(
2178           ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK, page, mtr);
2179 
2180       break;
2181 
2182     case MLOG_REC_DELETE:
2183     case MLOG_COMP_REC_DELETE:
2184 
2185       ut_ad(!page || fil_page_type_is_index(page_type));
2186 
2187       if (nullptr !=
2188           (ptr = mlog_parse_index(ptr, end_ptr, type == MLOG_COMP_REC_DELETE,
2189                                   &index))) {
2190         ut_a(!page ||
2191              (ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
2192 
2193         ptr = page_cur_parse_delete_rec(ptr, end_ptr, block, index, mtr);
2194       }
2195 
2196       break;
2197 
2198     case MLOG_IBUF_BITMAP_INIT:
2199 
2200       /* Allow anything in page_type when creating a page. */
2201 
2202       ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
2203 
2204       break;
2205 
2206     case MLOG_INIT_FILE_PAGE:
2207     case MLOG_INIT_FILE_PAGE2:
2208 
2209       /* Allow anything in page_type when creating a page. */
2210 
2211       ptr = fsp_parse_init_file_page(ptr, end_ptr, block);
2212 
2213       break;
2214 
2215     case MLOG_WRITE_STRING:
2216 
2217       ut_ad(!page || page_type != FIL_PAGE_TYPE_ALLOCATED || page_no == 0);
2218 
2219       ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
2220 
2221       break;
2222 
2223     case MLOG_ZIP_WRITE_NODE_PTR:
2224 
2225       ut_ad(!page || fil_page_type_is_index(page_type));
2226 
2227       ptr = page_zip_parse_write_node_ptr(ptr, end_ptr, page, page_zip);
2228 
2229       break;
2230 
2231     case MLOG_ZIP_WRITE_BLOB_PTR:
2232 
2233       ut_ad(!page || fil_page_type_is_index(page_type));
2234 
2235       ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr, page, page_zip);
2236 
2237       break;
2238 
2239     case MLOG_ZIP_WRITE_HEADER:
2240 
2241       ut_ad(!page || fil_page_type_is_index(page_type));
2242 
2243       ptr = page_zip_parse_write_header(ptr, end_ptr, page, page_zip);
2244 
2245       break;
2246 
2247     case MLOG_ZIP_PAGE_COMPRESS:
2248 
2249       /* Allow anything in page_type when creating a page. */
2250       ptr = page_zip_parse_compress(ptr, end_ptr, page, page_zip);
2251       break;
2252 
2253     case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
2254 
2255       if (nullptr != (ptr = mlog_parse_index(ptr, end_ptr, true, &index))) {
2256         ut_a(!page || ((ibool) !!page_is_comp(page) ==
2257                        dict_table_is_comp(index->table)));
2258 
2259         ptr = page_zip_parse_compress_no_data(ptr, end_ptr, page, page_zip,
2260                                               index);
2261       }
2262 
2263       break;
2264 
2265     case MLOG_TEST:
2266 #ifndef UNIV_HOTBACKUP
2267       if (log_test != nullptr) {
2268         ptr = log_test->parse_mlog_rec(ptr, end_ptr);
2269       } else {
2270         /* Just parse and ignore record to pass it and go forward. Note that
2271         this record is also used in the innodb.log_first_rec_group mtr test. The
2272         record is written in the buf0flu.cc when flushing page in that case. */
2273         Log_test::Key key;
2274         Log_test::Value value;
2275         lsn_t start_lsn, end_lsn;
2276 
2277         ptr = Log_test::parse_mlog_rec(ptr, end_ptr, key, value, start_lsn,
2278                                        end_lsn);
2279       }
2280       break;
2281 #endif /* !UNIV_HOTBACKUP */
2282        /* Fall through. */
2283 
2284     default:
2285       ptr = nullptr;
2286       recv_sys->found_corrupt_log = true;
2287   }
2288 
2289   if (index != nullptr) {
2290     dict_table_t *table = index->table;
2291 
2292     dict_mem_index_free(index);
2293     dict_mem_table_free(table);
2294   }
2295 
2296   return (ptr);
2297 }
2298 
2299 /** Adds a new log record to the hash table of log records.
2300 @param[in]	type		log record type
2301 @param[in]	space_id	Tablespace id
2302 @param[in]	page_no		page number
2303 @param[in]	body		log record body
2304 @param[in]	rec_end		log record end
2305 @param[in]	start_lsn	start lsn of the mtr
2306 @param[in]	end_lsn		end lsn of the mtr */
recv_add_to_hash_table(mlog_id_t type,space_id_t space_id,page_no_t page_no,byte * body,byte * rec_end,lsn_t start_lsn,lsn_t end_lsn)2307 static void recv_add_to_hash_table(mlog_id_t type, space_id_t space_id,
2308                                    page_no_t page_no, byte *body, byte *rec_end,
2309                                    lsn_t start_lsn, lsn_t end_lsn) {
2310   ut_ad(type != MLOG_FILE_DELETE);
2311   ut_ad(type != MLOG_FILE_CREATE);
2312   ut_ad(type != MLOG_FILE_RENAME);
2313   ut_ad(type != MLOG_DUMMY_RECORD);
2314   ut_ad(type != MLOG_INDEX_LOAD);
2315 
2316   recv_sys_t::Space *space;
2317 
2318   space = recv_get_page_map(space_id, true);
2319 
2320   recv_t *recv;
2321 
2322   recv = static_cast<recv_t *>(mem_heap_alloc(space->m_heap, sizeof(*recv)));
2323 
2324   recv->type = type;
2325   recv->end_lsn = end_lsn;
2326   recv->len = rec_end - body;
2327   recv->start_lsn = start_lsn;
2328 
2329   auto it = space->m_pages.find(page_no);
2330 
2331   recv_addr_t *recv_addr;
2332 
2333   if (it != space->m_pages.end()) {
2334     recv_addr = it->second;
2335 
2336   } else {
2337     recv_addr = static_cast<recv_addr_t *>(
2338         mem_heap_alloc(space->m_heap, sizeof(*recv_addr)));
2339 
2340     recv_addr->space = space_id;
2341     recv_addr->page_no = page_no;
2342     recv_addr->state = RECV_NOT_PROCESSED;
2343 
2344     UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
2345 
2346     using value_type = recv_sys_t::Pages::value_type;
2347 
2348     space->m_pages.insert(it, value_type(page_no, recv_addr));
2349 
2350     ++recv_sys->n_addrs;
2351   }
2352 
2353   UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
2354 
2355   recv_data_t **prev_field;
2356 
2357   prev_field = &recv->data;
2358 
2359   /* Store the log record body in chunks of less than UNIV_PAGE_SIZE:
2360   the heap grows into the buffer pool, and bigger chunks could not
2361   be allocated */
2362 
2363   while (rec_end > body) {
2364     ulint len = rec_end - body;
2365 
2366     if (len > RECV_DATA_BLOCK_SIZE) {
2367       len = RECV_DATA_BLOCK_SIZE;
2368     }
2369 
2370     recv_data_t *recv_data;
2371 
2372     recv_data = static_cast<recv_data_t *>(
2373         mem_heap_alloc(space->m_heap, sizeof(*recv_data) + len));
2374 
2375     *prev_field = recv_data;
2376 
2377     memcpy(recv_data + 1, body, len);
2378 
2379     prev_field = &recv_data->next;
2380 
2381     body += len;
2382   }
2383 
2384   *prev_field = nullptr;
2385 }
2386 
2387 /** Copies the log record body from recv to buf.
2388 @param[in]	buf		Buffer of length at least recv->len
2389 @param[in]	recv		Log record */
recv_data_copy_to_buf(byte * buf,recv_t * recv)2390 static void recv_data_copy_to_buf(byte *buf, recv_t *recv) {
2391   ulint len = recv->len;
2392   recv_data_t *recv_data = recv->data;
2393 
2394   while (len > 0) {
2395     ulint part_len;
2396 
2397     if (len > RECV_DATA_BLOCK_SIZE) {
2398       part_len = RECV_DATA_BLOCK_SIZE;
2399     } else {
2400       part_len = len;
2401     }
2402 
2403     memcpy(buf, ((byte *)recv_data) + sizeof(*recv_data), part_len);
2404 
2405     buf += part_len;
2406     len -= part_len;
2407 
2408     recv_data = recv_data->next;
2409   }
2410 }
2411 
/** Applies the hashed log records to the page, if the page lsn is less than the
lsn of a log record. This can be called when a buffer page has just been
read in, or also for a page already in the buffer pool.

Outline: under recv_sys->mutex the per-page record list is looked up and
marked RECV_BEING_PROCESSED; the mutex is released while the records are
applied inside a non-logging mini-transaction; finally the mutex is taken
again to mark the list RECV_PROCESSED and to decrement recv_sys->n_addrs. */
#ifndef UNIV_HOTBACKUP
/**
@param[in]	just_read_in	true if the IO handler calls this for a freshly
                                read page */
#endif /* !UNIV_HOTBACKUP */
/**
@param[in,out]	block		Buffer block */
void recv_recover_page_func(
#ifndef UNIV_HOTBACKUP
    bool just_read_in,
#endif /* !UNIV_HOTBACKUP */
    buf_block_t *block) {
  mutex_enter(&recv_sys->mutex);

  if (recv_sys->apply_log_recs == false) {
    /* Log records should not be applied now */

    mutex_exit(&recv_sys->mutex);

    return;
  }

  recv_addr_t *recv_addr;

  recv_addr = recv_get_rec(block->page.id.space(), block->page.id.page_no());

  /* Nothing to do if there are no records for this page, or if the records
  are already being applied or have been applied. */
  if (recv_addr == nullptr || recv_addr->state == RECV_BEING_PROCESSED ||
      recv_addr->state == RECV_PROCESSED) {
#ifndef UNIV_HOTBACKUP
    ut_ad(recv_addr == nullptr || recv_needed_recovery ||
          recv_sys->scanned_lsn < recv_sys->checkpoint_lsn);
#endif /* !UNIV_HOTBACKUP */

    mutex_exit(&recv_sys->mutex);

    return;
  }

#ifndef UNIV_HOTBACKUP
  /* Report the page to the page archiver when it is active. Note that
  bpage is a local copy of the page descriptor, not a reference to
  block->page. */
  buf_page_t bpage = block->page;

  if (!fsp_is_system_temporary(bpage.id.space()) &&
      (arch_page_sys != nullptr && arch_page_sys->is_active())) {
    page_t *frame;
    lsn_t frame_lsn;

    /* Prefer the compressed page image when there is one. */
    frame = bpage.zip.data;

    if (!frame) {
      frame = block->frame;
    }
    frame_lsn = mach_read_from_8(frame + FIL_PAGE_LSN);

    arch_page_sys->track_page(&bpage, LSN_MAX, frame_lsn, true);
  }
#endif /* !UNIV_HOTBACKUP */

#ifndef UNIV_HOTBACKUP
  /* this is explicitly false in case of meb, skip the assert */
  ut_ad(recv_needed_recovery ||
        recv_sys->scanned_lsn < recv_sys->checkpoint_lsn);

  DBUG_PRINT("ib_log", ("Applying log to page %u:%u", recv_addr->space,
                        recv_addr->page_no));

#ifdef UNIV_DEBUG
  /* Upper bound used below to sanity-check the end lsn of every record. */
  lsn_t max_lsn;

  ut_d(max_lsn = log_sys->scanned_lsn);
#endif /* UNIV_DEBUG */
#else  /* !UNIV_HOTBACKUP */
  ib::trace_2() << "Applying log to space " << recv_addr->space << " page "
                << recv_addr->page_no;
#endif /* !UNIV_HOTBACKUP */

  /* Claim the record list before releasing the mutex: any other caller
  for the same page will now take the early return above. */
  recv_addr->state = RECV_BEING_PROCESSED;

  mutex_exit(&recv_sys->mutex);

  /* The records are applied inside a mini-transaction that generates no
  redo log of its own. */
  mtr_t mtr;

  mtr_start(&mtr);

  mtr_set_log_mode(&mtr, MTR_LOG_NONE);

  page_t *page = block->frame;

  page_zip_des_t *page_zip = buf_block_get_page_zip(block);

#ifndef UNIV_HOTBACKUP
  if (just_read_in) {
    /* Move the ownership of the x-latch on the page to
    this OS thread, so that we can acquire a second
    x-latch on it.  This is needed for the operations to
    the page to pass the debug checks. */

    rw_lock_x_lock_move_ownership(&block->lock);
  }

  bool success = buf_page_get_known_nowait(
      RW_X_LATCH, block, Cache_hint::KEEP_OLD, __FILE__, __LINE__, &mtr);
  ut_a(success);

  buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
#endif /* !UNIV_HOTBACKUP */

  /* Read the newest modification lsn from the page */
  lsn_t page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);

#ifndef UNIV_HOTBACKUP

  /* It may be that the page has been modified in the buffer
  pool: read the newest modification LSN there */

  lsn_t page_newest_lsn;

  page_newest_lsn = buf_page_get_newest_modification(&block->page);

  if (page_newest_lsn) {
    page_lsn = page_newest_lsn;
  }
#else  /* !UNIV_HOTBACKUP */
  /* In recovery from a backup we do not really use the buffer pool */
  lsn_t page_newest_lsn = 0;
  /* Count applied and skipped log records */
  size_t applied_recs = 0;
  size_t skipped_recs = 0;
#endif /* !UNIV_HOTBACKUP */

#ifndef UNIV_HOTBACKUP
  /* End lsn (recv->end_lsn) of the last record visited by the loop below;
  passed to buf_flush_recv_note_modification() after the loop. */
  lsn_t end_lsn = 0;
#endif /* !UNIV_HOTBACKUP */
  /* Start lsn of the first record that was actually applied. */
  lsn_t start_lsn = 0;
  bool modification_to_page = false;

  for (auto recv = UT_LIST_GET_FIRST(recv_addr->rec_list); recv != nullptr;
       recv = UT_LIST_GET_NEXT(rec_list, recv)) {
#ifndef UNIV_HOTBACKUP
    end_lsn = recv->end_lsn;

    ut_ad(end_lsn <= max_lsn);
#endif /* !UNIV_HOTBACKUP */

    byte *buf;

    if (recv->len > RECV_DATA_BLOCK_SIZE) {
      /* We have to copy the record body to a separate
      buffer */

      buf = static_cast<byte *>(ut_malloc_nokey(recv->len));

      recv_data_copy_to_buf(buf, recv);
    } else {
      /* The whole body fits in the first data chunk: use the bytes that
      follow the chunk header in place. */
      buf = ((byte *)(recv->data)) + sizeof(recv_data_t);
    }

    if (recv->type == MLOG_INIT_FILE_PAGE) {
      /* The page is re-initialized by this record: fall back to the
      buffer pool modification lsn and clear the lsn fields stored in
      the page image(s). */
      page_lsn = page_newest_lsn;

      memset(FIL_PAGE_LSN + page, 0, 8);
      memset(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page, 0, 8);

      if (page_zip) {
        memset(FIL_PAGE_LSN + page_zip->data, 0, 8);
      }
    }

    /* Ignore applying the redo logs for tablespace that is
    truncated. Truncated tablespaces are handled explicitly
    post-recovery, where we will restore the tablespace back
    to a normal state.

    Applying redo at this stage will cause problems because the
    redo will have action recorded on page before tablespace
    was re-inited and that would lead to a problem later. */

    if (recv->start_lsn >= page_lsn
#ifndef UNIV_HOTBACKUP
        && undo::is_active(recv_addr->space)
#endif /* !UNIV_HOTBACKUP */
    ) {

      /* NOTE: this declaration shadows the function-level end_lsn (which
      exists only in non-hotbackup builds). The value computed here,
      start_lsn + len, is what gets stamped into the page; the outer
      variable keeps recv->end_lsn. */
      lsn_t end_lsn;

      if (!modification_to_page) {
#ifndef UNIV_HOTBACKUP
        ut_a(recv_needed_recovery);
#endif /* !UNIV_HOTBACKUP */
        modification_to_page = true;
        start_lsn = recv->start_lsn;
      }

      DBUG_PRINT("ib_log", ("apply " LSN_PF ":"
                            " %s len " ULINTPF " page %u:%u",
                            recv->start_lsn, get_mlog_string(recv->type),
                            recv->len, recv_addr->space, recv_addr->page_no));

      /* Apply the record to the page. The returned pointer is not
      checked here. */
      recv_parse_or_apply_log_rec_body(recv->type, buf, buf + recv->len,
                                       recv_addr->space, recv_addr->page_no,
                                       block, &mtr, ULINT_UNDEFINED);

      end_lsn = recv->start_lsn + recv->len;

      /* Stamp the new lsn into the page header and trailer, and into the
      compressed page image if there is one. */
      mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);

      mach_write_to_8(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page,
                      end_lsn);

      if (page_zip) {
        mach_write_to_8(FIL_PAGE_LSN + page_zip->data, end_lsn);
      }
      /* In hotbackup builds the brace that closes the "apply" branch is
      followed by an else branch counting the skipped records. */
#ifdef UNIV_HOTBACKUP
      ++applied_recs;
    } else {
      ++skipped_recs;
#endif /* UNIV_HOTBACKUP */
    }

    /* Release the temporary buffer allocated above for a long record. */
    if (recv->len > RECV_DATA_BLOCK_SIZE) {
      ut_free(buf);
    }
  }

#ifdef UNIV_ZIP_DEBUG
  if (fil_page_index_page_check(page)) {
    page_zip_des_t *page_zip = buf_block_get_page_zip(block);

    ut_a(!page_zip || page_zip_validate_low(page_zip, page, nullptr, FALSE));
  }
#endif /* UNIV_ZIP_DEBUG */

#ifndef UNIV_HOTBACKUP
  if (modification_to_page) {
    /* start_lsn is the start lsn of the first applied record and end_lsn
    is recv->end_lsn of the last record in the list. */
    buf_flush_recv_note_modification(block, start_lsn, end_lsn);
  }
#else  /* !UNIV_HOTBACKUP */
  UT_NOT_USED(start_lsn);
#endif /* !UNIV_HOTBACKUP */

  /* Make sure that committing mtr does not change the modification
  LSN values of page */

  mtr.discard_modifications();

  mtr_commit(&mtr);

  mutex_enter(&recv_sys->mutex);

  /* NOTE: page_lsn is the lsn determined before the records were applied
  (or reset by an MLOG_INIT_FILE_PAGE record), not the lsn that was
  stamped into the page above. */
  if (recv_max_page_lsn < page_lsn) {
    recv_max_page_lsn = page_lsn;
  }

  recv_addr->state = RECV_PROCESSED;

  /* One fewer page with pending log records. */
  ut_a(recv_sys->n_addrs > 0);
  --recv_sys->n_addrs;

  mutex_exit(&recv_sys->mutex);

#ifdef UNIV_HOTBACKUP
  ib::trace_2() << "Applied " << applied_recs << " Skipped " << skipped_recs;
#endif /* UNIV_HOTBACKUP */
}
2678 
2679 /** Tries to parse a single log record.
2680 @param[out]	type		log record type
2681 @param[in]	ptr		pointer to a buffer
2682 @param[in]	end_ptr		end of the buffer
2683 @param[out]	space_id	tablespace identifier
2684 @param[out]	page_no		page number
2685 @param[out]	body		start of log record body
2686 @return length of the record, or 0 if the record was not complete */
recv_parse_log_rec(mlog_id_t * type,byte * ptr,byte * end_ptr,space_id_t * space_id,page_no_t * page_no,byte ** body)2687 static ulint recv_parse_log_rec(mlog_id_t *type, byte *ptr, byte *end_ptr,
2688                                 space_id_t *space_id, page_no_t *page_no,
2689                                 byte **body) {
2690   byte *new_ptr;
2691 
2692   *body = nullptr;
2693 
2694   UNIV_MEM_INVALID(type, sizeof *type);
2695   UNIV_MEM_INVALID(space_id, sizeof *space_id);
2696   UNIV_MEM_INVALID(page_no, sizeof *page_no);
2697   UNIV_MEM_INVALID(body, sizeof *body);
2698 
2699   if (ptr == end_ptr) {
2700     return (0);
2701   }
2702 
2703   switch (*ptr) {
2704 #ifdef UNIV_LOG_LSN_DEBUG
2705     case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
2706     case MLOG_LSN:
2707 
2708       new_ptr =
2709           mlog_parse_initial_log_record(ptr, end_ptr, type, space_id, page_no);
2710 
2711       if (new_ptr != nullptr) {
2712         const lsn_t lsn = static_cast<lsn_t>(*space_id) << 32 | *page_no;
2713 
2714         ut_a(lsn == recv_sys->recovered_lsn);
2715       }
2716 
2717       *type = MLOG_LSN;
2718       return (new_ptr == nullptr ? 0 : new_ptr - ptr);
2719 #endif /* UNIV_LOG_LSN_DEBUG */
2720 
2721     case MLOG_MULTI_REC_END:
2722     case MLOG_DUMMY_RECORD:
2723       *page_no = FIL_NULL;
2724       *space_id = SPACE_UNKNOWN;
2725       *type = static_cast<mlog_id_t>(*ptr);
2726       return (1);
2727 
2728     case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
2729     case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
2730       recv_sys->found_corrupt_log = true;
2731       return (0);
2732 
2733     case MLOG_TABLE_DYNAMIC_META:
2734     case MLOG_TABLE_DYNAMIC_META | MLOG_SINGLE_REC_FLAG:
2735 
2736       table_id_t id;
2737       uint64 version;
2738 
2739       *page_no = FIL_NULL;
2740       *space_id = SPACE_UNKNOWN;
2741 
2742       new_ptr =
2743           mlog_parse_initial_dict_log_record(ptr, end_ptr, type, &id, &version);
2744 
2745       if (new_ptr != nullptr) {
2746         new_ptr = recv_sys->metadata_recover->parseMetadataLog(
2747             id, version, new_ptr, end_ptr);
2748       }
2749 
2750       return (new_ptr == nullptr ? 0 : new_ptr - ptr);
2751   }
2752 
2753   new_ptr =
2754       mlog_parse_initial_log_record(ptr, end_ptr, type, space_id, page_no);
2755 
2756   *body = new_ptr;
2757 
2758   if (new_ptr == nullptr) {
2759     return (0);
2760   }
2761 
2762   new_ptr = recv_parse_or_apply_log_rec_body(*type, new_ptr, end_ptr, *space_id,
2763                                              *page_no, nullptr, nullptr,
2764                                              new_ptr - ptr);
2765 
2766   if (new_ptr == nullptr) {
2767     return (0);
2768   }
2769 
2770   return (new_ptr - ptr);
2771 }
2772 
2773 /** Subtracts next number of bytes to ignore before we reach the checkpoint
2774 or returns information that there was nothing more to skip.
2775 @param[in]	next_parsed_bytes	number of next bytes that were parsed,
2776 which are supposed to be subtracted from bytes to ignore before checkpoint
2777 @retval	true	there were still bytes to ignore
2778 @retval false	there was already 0 bytes to ignore, nothing changed. */
recv_update_bytes_to_ignore_before_checkpoint(size_t next_parsed_bytes)2779 static bool recv_update_bytes_to_ignore_before_checkpoint(
2780     size_t next_parsed_bytes) {
2781   auto &to_ignore = recv_sys->bytes_to_ignore_before_checkpoint;
2782 
2783   if (to_ignore != 0) {
2784     if (to_ignore >= next_parsed_bytes) {
2785       to_ignore -= next_parsed_bytes;
2786     } else {
2787       to_ignore = 0;
2788     }
2789     return (true);
2790   }
2791 
2792   return (false);
2793 }
2794 
2795 /** Tracks changes of recovered_lsn and tracks proper values for what
2796 first_rec_group should be for consecutive blocks. Must be called when
2797 recv_sys->recovered_lsn is changed to next lsn pointing at boundary
2798 between consecutive parsed mini transactions. */
recv_track_changes_of_recovered_lsn()2799 static void recv_track_changes_of_recovered_lsn() {
2800   if (recv_sys->parse_start_lsn == 0) {
2801     return;
2802   }
2803   /* If we have already found the first block with mtr beginning there,
2804   we started to track boundaries between blocks. Since then we track
2805   all proper values of first_rec_group for consecutive blocks.
2806   The reason for that is to ensure that the first_rec_group of the last
2807   block is correct. Even though we do not depend during this recovery
2808   on that value, it would become important if we crashed later, because
2809   the last recovered block would become the first used block in redo and
2810   since then we would depend on a proper value of first_rec_group there.
2811   The checksums of log blocks should detect if it was incorrect, but the
2812   checksums might be disabled in the configuration. */
2813   const auto old_block =
2814       recv_sys->previous_recovered_lsn / OS_FILE_LOG_BLOCK_SIZE;
2815 
2816   const auto new_block = recv_sys->recovered_lsn / OS_FILE_LOG_BLOCK_SIZE;
2817 
2818   if (old_block != new_block) {
2819     ut_a(new_block > old_block);
2820 
2821     recv_sys->last_block_first_rec_group =
2822         recv_sys->recovered_lsn % OS_FILE_LOG_BLOCK_SIZE;
2823   }
2824 
2825   recv_sys->previous_recovered_lsn = recv_sys->recovered_lsn;
2826 }
2827 
2828 /** Parse and store a single log record entry.
2829 @param[in]	ptr		start of buffer
2830 @param[in]	end_ptr		end of buffer
2831 @return true if end of processing */
recv_single_rec(byte * ptr,byte * end_ptr)2832 static bool recv_single_rec(byte *ptr, byte *end_ptr) {
2833   /* The mtr did not modify multiple pages */
2834 
2835   lsn_t old_lsn = recv_sys->recovered_lsn;
2836 
2837   /* Try to parse a log record, fetching its type, space id,
2838   page no, and a pointer to the body of the log record */
2839 
2840   byte *body;
2841   mlog_id_t type;
2842   page_no_t page_no;
2843   space_id_t space_id;
2844 
2845   ulint len =
2846       recv_parse_log_rec(&type, ptr, end_ptr, &space_id, &page_no, &body);
2847 
2848   if (recv_sys->found_corrupt_log) {
2849     recv_report_corrupt_log(ptr, type, space_id, page_no);
2850 
2851 #ifdef UNIV_HOTBACKUP
2852     return (true);
2853 #endif /* UNIV_HOTBACKUP */
2854 
2855   } else if (len == 0 || recv_sys->found_corrupt_fs) {
2856     return (true);
2857   }
2858 
2859   lsn_t new_recovered_lsn;
2860 
2861   new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
2862 
2863   if (new_recovered_lsn > recv_sys->scanned_lsn) {
2864     /* The log record filled a log block, and we
2865     require that also the next log block should
2866     have been scanned in */
2867 
2868     return (true);
2869   }
2870 
2871   recv_previous_parsed_rec_type = type;
2872   recv_previous_parsed_rec_is_multi = 0;
2873   recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
2874 
2875   recv_sys->recovered_offset += len;
2876   recv_sys->recovered_lsn = new_recovered_lsn;
2877 
2878   recv_track_changes_of_recovered_lsn();
2879 
2880   if (recv_update_bytes_to_ignore_before_checkpoint(len)) {
2881     return (false);
2882   }
2883 
2884   switch (type) {
2885     case MLOG_DUMMY_RECORD:
2886       /* Do nothing */
2887       break;
2888 
2889 #ifdef UNIV_LOG_LSN_DEBUG
2890     case MLOG_LSN:
2891       /* Do not add these records to the hash table.
2892       The page number and space id fields are misused
2893       for something else. */
2894       break;
2895 #endif /* UNIV_LOG_LSN_DEBUG */
2896 
2897     default:
2898 
2899       if (recv_recovery_on) {
2900 #ifndef UNIV_HOTBACKUP
2901         if (space_id == TRX_SYS_SPACE ||
2902             fil_tablespace_lookup_for_recovery(space_id)) {
2903 #endif /* !UNIV_HOTBACKUP */
2904 
2905           recv_add_to_hash_table(type, space_id, page_no, body, ptr + len,
2906                                  old_lsn, recv_sys->recovered_lsn);
2907 
2908 #ifndef UNIV_HOTBACKUP
2909         } else {
2910           recv_sys->missing_ids.insert(space_id);
2911         }
2912 #endif /* !UNIV_HOTBACKUP */
2913       }
2914 
2915       /* fall through */
2916 
2917     case MLOG_INDEX_LOAD:
2918     case MLOG_FILE_DELETE:
2919     case MLOG_FILE_RENAME:
2920     case MLOG_FILE_CREATE:
2921     case MLOG_TABLE_DYNAMIC_META:
2922 
2923       /* These were already handled by
2924       recv_parse_log_rec() and
2925       recv_parse_or_apply_log_rec_body(). */
2926 
2927       DBUG_PRINT("ib_log",
2928                  ("scan " LSN_PF ": log rec %s"
2929                   " len " ULINTPF " " PAGE_ID_PF,
2930                   old_lsn, get_mlog_string(type), len, space_id, page_no));
2931       break;
2932   }
2933 
2934   return (false);
2935 }
2936 
/** Parse and store a multiple record log entry (a group of log records
terminated by MLOG_MULTI_REC_END).
The group is processed in two passes: the first pass only verifies that
the whole group, including its end marker, is available in the buffer;
the second pass parses the records again, advances
recv_sys->recovered_offset and recv_sys->recovered_lsn record by record
and adds the records to the hash table.
@param[in]	ptr		start of buffer
@param[in]	end_ptr		end of buffer
@return true if end of processing */
static bool recv_multi_rec(byte *ptr, byte *end_ptr) {
  /* Check that all the records associated with the single mtr
  are included within the buffer */

  ulint n_recs = 0;
  ulint total_len = 0;

  for (;;) {
    mlog_id_t type = MLOG_BIGGEST_TYPE;
    byte *body;
    page_no_t page_no = 0;
    space_id_t space_id = 0;

    ulint len =
        recv_parse_log_rec(&type, ptr, end_ptr, &space_id, &page_no, &body);

    if (recv_sys->found_corrupt_log) {
      recv_report_corrupt_log(ptr, type, space_id, page_no);

      return (true);

    } else if (len == 0) {
      /* The record is not complete in the buffer. */
      return (true);

    } else if ((*ptr & MLOG_SINGLE_REC_FLAG)) {
      /* A record carrying the single record flag must not appear
      inside a multi record group: treat it as corruption. */
      recv_sys->found_corrupt_log = true;

      recv_report_corrupt_log(ptr, type, space_id, page_no);

      return (true);

    } else if (recv_sys->found_corrupt_fs) {
      return (true);
    }

    recv_previous_parsed_rec_type = type;

    /* recv_sys->recovered_offset is not advanced during this pass,
    hence the offset of the record is computed from total_len. */
    recv_previous_parsed_rec_offset = recv_sys->recovered_offset + total_len;

    recv_previous_parsed_rec_is_multi = 1;

    total_len += len;
    ++n_recs;

    ptr += len;

    if (type == MLOG_MULTI_REC_END) {
      DBUG_PRINT("ib_log", ("scan " LSN_PF ": multi-log end total_len " ULINTPF
                            " n=" ULINTPF,
                            recv_sys->recovered_lsn, total_len, n_recs));

      break;
    }

    DBUG_PRINT("ib_log",
               ("scan " LSN_PF ": multi-log rec %s len " ULINTPF " " PAGE_ID_PF,
                recv_sys->recovered_lsn, get_mlog_string(type), len, space_id,
                page_no));
  }

  /* LSN right after the end of the whole group. */
  lsn_t new_recovered_lsn =
      recv_calc_lsn_on_data_add(recv_sys->recovered_lsn, total_len);

  if (new_recovered_lsn > recv_sys->scanned_lsn) {
    /* The log record filled a log block, and we require
    that also the next log block should have been scanned in */

    return (true);
  }

  /* Add all the records to the hash table */

  /* Restart from the first record of the group: ptr was advanced by
  the first pass, recv_sys->recovered_offset was not. */
  ptr = recv_sys->buf + recv_sys->recovered_offset;

  for (;;) {
    lsn_t old_lsn = recv_sys->recovered_lsn;

    /* This will apply MLOG_FILE_ records. */

    mlog_id_t type = MLOG_BIGGEST_TYPE;
    byte *body;
    page_no_t page_no = 0;
    space_id_t space_id = 0;

    ulint len =
        recv_parse_log_rec(&type, ptr, end_ptr, &space_id, &page_no, &body);

    /* Stop only if the corruption report returns false; otherwise
    the processing of the group continues. */
    if (recv_sys->found_corrupt_log &&
        !recv_report_corrupt_log(ptr, type, space_id, page_no)) {
      return (true);

    } else if (recv_sys->found_corrupt_fs) {
      return (true);
    }

    /* The first pass has already parsed this record successfully. */
    ut_a(len != 0);
    ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));

    recv_sys->recovered_offset += len;

    recv_sys->recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);

    /* The record is stored only when no bytes were left to ignore
    before the checkpoint at the time it was reached. */
    const bool apply = !recv_update_bytes_to_ignore_before_checkpoint(len);

    switch (type) {
      case MLOG_MULTI_REC_END:
        recv_track_changes_of_recovered_lsn();
        /* Found the end mark for the records */
        return (false);

#ifdef UNIV_LOG_LSN_DEBUG
      case MLOG_LSN:
        /* Do not add these records to the hash table.
        The page number and space id fields are misused
        for something else. */
        break;
#endif /* UNIV_LOG_LSN_DEBUG */

      case MLOG_FILE_DELETE:
      case MLOG_FILE_CREATE:
      case MLOG_FILE_RENAME:
      case MLOG_TABLE_DYNAMIC_META:
        /* case MLOG_TRUNCATE: Disabled for WL6378 */
        /* These were already handled by
        recv_parse_or_apply_log_rec_body(). */
        break;

      default:

        if (!apply) {
          break;
        }

        if (recv_recovery_on) {
#ifndef UNIV_HOTBACKUP
          if (space_id == TRX_SYS_SPACE ||
              fil_tablespace_lookup_for_recovery(space_id)) {
#endif /* !UNIV_HOTBACKUP */

            /* Every record of the group is stored with the end
            LSN of the whole group (new_recovered_lsn). */
            recv_add_to_hash_table(type, space_id, page_no, body, ptr + len,
                                   old_lsn, new_recovered_lsn);

#ifndef UNIV_HOTBACKUP
          } else {
            recv_sys->missing_ids.insert(space_id);
          }
#endif /* !UNIV_HOTBACKUP */
        }
    }

    ptr += len;
  }

  /* Not reached: the loop above is left only through return. */
  return (false);
}
3096 
3097 /** Parse log records from a buffer and optionally store them to a
3098 hash table to wait merging to file pages.
3099 @param[in]	checkpoint_lsn	the LSN of the latest checkpoint */
recv_parse_log_recs(lsn_t checkpoint_lsn)3100 void recv_parse_log_recs(lsn_t checkpoint_lsn) {
3101   ut_ad(recv_sys->parse_start_lsn != 0);
3102 
3103   for (;;) {
3104     byte *ptr = recv_sys->buf + recv_sys->recovered_offset;
3105 
3106     byte *end_ptr = recv_sys->buf + recv_sys->len;
3107 
3108     if (ptr == end_ptr) {
3109       return;
3110     }
3111 
3112     bool single_rec;
3113 
3114     switch (*ptr) {
3115 #ifdef UNIV_LOG_LSN_DEBUG
3116       case MLOG_LSN:
3117 #endif /* UNIV_LOG_LSN_DEBUG */
3118       case MLOG_DUMMY_RECORD:
3119         single_rec = true;
3120         break;
3121       default:
3122         single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
3123     }
3124 
3125     if (single_rec) {
3126       if (recv_single_rec(ptr, end_ptr)) {
3127         return;
3128       }
3129 
3130     } else if (recv_multi_rec(ptr, end_ptr)) {
3131       return;
3132     }
3133   }
3134 }
3135 
3136 /** Adds data from a new log block to the parsing buffer of recv_sys if
3137 recv_sys->parse_start_lsn is non-zero.
3138 @param[in]  log_block   log block
3139 @param[in]  scanned_lsn  lsn of how far we were able
3140                          to find data in this log block
3141 @param[in]  len          0 if full block or length of the data to add
3142 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn,ulint len)3143 bool recv_sys_add_to_parsing_buf(const byte *log_block, lsn_t scanned_lsn,
3144                                  ulint len) {
3145   ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
3146 
3147   if (!recv_sys->parse_start_lsn) {
3148     /* Cannot start parsing yet because no start point for
3149     it found */
3150 
3151     return (false);
3152   }
3153 
3154   ulint more_len;
3155   ulint data_len = len > 0 ? len : log_block_get_data_len(log_block);
3156 
3157   if (recv_sys->parse_start_lsn >= scanned_lsn) {
3158     return (false);
3159 
3160   } else if (recv_sys->scanned_lsn >= scanned_lsn) {
3161     return (false);
3162 
3163   } else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
3164     more_len = (ulint)(scanned_lsn - recv_sys->parse_start_lsn);
3165 
3166   } else {
3167     more_len = (ulint)(scanned_lsn - recv_sys->scanned_lsn);
3168   }
3169 
3170   if (more_len == 0) {
3171     return (false);
3172   }
3173 
3174   ut_ad(data_len >= more_len);
3175 
3176   ulint start_offset = data_len - more_len;
3177 
3178   if (start_offset < LOG_BLOCK_HDR_SIZE) {
3179     start_offset = LOG_BLOCK_HDR_SIZE;
3180   }
3181 
3182   ulint end_offset = data_len;
3183 
3184   if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
3185     end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
3186   }
3187 
3188   ut_ad(start_offset <= end_offset);
3189 
3190   if (start_offset < end_offset) {
3191     memcpy(recv_sys->buf + recv_sys->len, log_block + start_offset,
3192            end_offset - start_offset);
3193 
3194     recv_sys->len += end_offset - start_offset;
3195 
3196     ut_a(recv_sys->len <= recv_sys->buf_len);
3197   }
3198 
3199   return (true);
3200 }
3201 
3202 /** Moves the parsing buffer data left to the buffer start. */
recv_reset_buffer()3203 void recv_reset_buffer() {
3204   ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset,
3205              recv_sys->len - recv_sys->recovered_offset);
3206 
3207   recv_sys->len -= recv_sys->recovered_offset;
3208 
3209   recv_sys->recovered_offset = 0;
3210 }
3211 
/** Scans log from a buffer and stores new log data to the parsing buffer.
Parses and hashes the log records if new data found.  Unless
UNIV_HOTBACKUP is defined, this function will apply log records
automatically when the hash table becomes full.
The buffer is walked block by block (OS_FILE_LOG_BLOCK_SIZE bytes each).
Scanning stops at the first block whose header number or checksum does
not match, at a block which holds stale data from before the most recent
recovery, at a block which is not completely filled, or when to_lsn has
been reached.
@param[in,out]	log		redo log
@param[in]	max_memory	we let the hash table of recs to grow to
                                this size, at the maximum
@param[in]	buf		buffer containing a log segment or garbage
@param[in]	len		buffer length
@param[in]	checkpoint_lsn	latest checkpoint LSN (only forwarded to
                                recv_parse_log_recs())
@param[in]	start_lsn	buffer start lsn
@param[in,out]	contiguous_lsn	it is known that log contain
                                contiguous log data up to this lsn
@param[out]	read_upto_lsn	scanning succeeded up to this lsn
@param[in]  to_lsn LSN to stop scanning at
@return true if not able to scan any more in this log */
#ifndef UNIV_HOTBACKUP
static bool recv_scan_log_recs(log_t &log,
#else  /* !UNIV_HOTBACKUP */
bool meb_scan_log_recs(
#endif /* !UNIV_HOTBACKUP */
                               ulint max_memory, const byte *buf, ulint len,
                               lsn_t checkpoint_lsn, lsn_t start_lsn,
                               lsn_t *contiguous_lsn, lsn_t *read_upto_lsn,
                               lsn_t to_lsn) {
  const byte *log_block = buf;
  lsn_t scanned_lsn = start_lsn;
  bool finished = false;
  bool more_data = false;

  /* Both the start of the buffer and its length are expected to be
  aligned to whole log blocks. */
  ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
  ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
  ut_ad(len >= OS_FILE_LOG_BLOCK_SIZE);

  do {
    ut_ad(!finished);

    ulint no = log_block_get_hdr_no(log_block);

    ulint expected_no = log_block_convert_lsn_to_no(scanned_lsn);

    if (no != expected_no) {
      /* Garbage or an incompletely written log block.

      We will not report any error, because this can
      happen when InnoDB was killed while it was
      writing redo log. We simply treat this as an
      abrupt end of the redo log. */

      finished = true;

      break;
    }

    if (!log_block_checksum_is_ok(log_block)) {
      /* Report both the stored and the recomputed checksum. */
      uint32_t checksum1 = log_block_get_checksum(log_block);
      uint32_t checksum2 = log_block_calc_checksum(log_block);
      ib::error(ER_IB_MSG_720, ulonglong{no}, ulonglong{scanned_lsn},
                ulong{checksum1}, ulong{checksum2});

      /* Garbage or an incompletely written log block.

      This could be the result of killing the server
      while it was writing this log block. We treat
      this as an abrupt end of the redo log. */

      finished = true;

      break;
    }

    if (log_block_get_flush_bit(log_block)) {
      /* This block was a start of a log flush operation:
      we know that the previous flush operation must have
      been completed before this block can have been flushed.
      Therefore, we know that log data is contiguous up to
      scanned_lsn. */

      if (scanned_lsn > *contiguous_lsn) {
        *contiguous_lsn = scanned_lsn;
      }
    }

    ulint data_len = log_block_get_data_len(log_block);

    /* The block brings new data, but its checkpoint number is lower
    than the last scanned one by more than 0x80000000. */
    if (scanned_lsn + data_len > recv_sys->scanned_lsn &&
        log_block_get_checkpoint_no(log_block) <
            recv_sys->scanned_checkpoint_no &&
        (recv_sys->scanned_checkpoint_no -
             log_block_get_checkpoint_no(log_block) >
         0x80000000UL)) {
      /* Garbage from a log buffer flush which was made
      before the most recent database recovery */

      finished = true;

      break;
    }

    if (!recv_sys->parse_start_lsn &&
        log_block_get_first_rec_group(log_block) > 0) {
      /* We found a point from which to start the parsing
      of log records */

      recv_sys->parse_start_lsn =
          scanned_lsn + log_block_get_first_rec_group(log_block);

      ib::info(ER_IB_MSG_1261)
          << "Starting to parse redo log at lsn = " << recv_sys->parse_start_lsn
          << ", whereas checkpoint_lsn = " << recv_sys->checkpoint_lsn;

      if (recv_sys->parse_start_lsn < recv_sys->checkpoint_lsn) {
        /* We start to parse log records even before
        checkpoint_lsn, from the beginning of the log
        block which contains the checkpoint_lsn.

        That's because the first group of log records
        in the log block, starts before checkpoint_lsn,
        and checkpoint_lsn could potentially point to
        the middle of some log record. We need to find
        the first group of log records that starts at
        or after checkpoint_lsn. This could be only
        achieved by traversing all groups of log records
        that start within the log block since the first
        one (to discover their beginnings we need to
        parse them). However, we don't want to report
        missing tablespaces for space_id in log records
        before checkpoint_lsn. Hence we need to ignore
        those records and that's why we need a counter
        of bytes to ignore. */

        recv_sys->bytes_to_ignore_before_checkpoint =
            recv_sys->checkpoint_lsn - recv_sys->parse_start_lsn;

        /* Sanity checks: the bytes to ignore fit in the data area
        of one block, and neither LSN points into a block header
        or trailer. */
        ut_a(recv_sys->bytes_to_ignore_before_checkpoint <=
             OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE);

        ut_a(recv_sys->checkpoint_lsn % OS_FILE_LOG_BLOCK_SIZE +
                 LOG_BLOCK_TRL_SIZE <
             OS_FILE_LOG_BLOCK_SIZE);

        ut_a(recv_sys->parse_start_lsn % OS_FILE_LOG_BLOCK_SIZE >=
             LOG_BLOCK_HDR_SIZE);
      }

      recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
      recv_sys->recovered_lsn = recv_sys->parse_start_lsn;

      recv_track_changes_of_recovered_lsn();
    }

    scanned_lsn += data_len;

    if (scanned_lsn > recv_sys->scanned_lsn) {
#ifndef UNIV_HOTBACKUP
      if (srv_read_only_mode) {
        if (scanned_lsn > recv_sys->checkpoint_lsn) {
          ib::warn(ER_IB_MSG_721);

          /* NOTE: this early return leaves *read_upto_lsn
          unwritten. */
          return (true);
        }

      } else if (!recv_needed_recovery &&
                 scanned_lsn > recv_sys->checkpoint_lsn) {
        /* Log data past the checkpoint has been found for the
        first time: set up crash recovery. */
        ib::info(ER_IB_MSG_722, ulonglong{recv_sys->scanned_lsn});

        recv_init_crash_recovery();
      }
#endif /* !UNIV_HOTBACKUP */

      /* We were able to find more log data: add it to the
      parsing buffer if parse_start_lsn is already
      non-zero */

      /* Enlarge the parsing buffer when fewer than four log blocks
      of free space would remain in it. */
      if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE >= recv_sys->buf_len) {
        if (!recv_sys_resize_buf()) {
          recv_sys->found_corrupt_log = true;

#ifndef UNIV_HOTBACKUP
          if (srv_force_recovery == 0) {
            ib::error(ER_IB_MSG_724);
            /* NOTE: this early return leaves *read_upto_lsn
            unwritten. */
            return (true);
          }
#else  /* !UNIV_HOTBACKUP */
          ib::fatal(ER_IB_ERR_NOT_ENOUGH_MEMORY_FOR_PARSE_BUFFER)
              << "Insufficient memory for InnoDB parse buffer; want "
              << recv_sys->buf_len;
#endif /* !UNIV_HOTBACKUP */
        }
      }

      if (!recv_sys->found_corrupt_log) {
        more_data =
            recv_sys_add_to_parsing_buf(log_block, scanned_lsn, data_len);
      }

      recv_sys->scanned_lsn = scanned_lsn;

      recv_sys->scanned_checkpoint_no = log_block_get_checkpoint_no(log_block);
    }

    if (data_len < OS_FILE_LOG_BLOCK_SIZE || scanned_lsn >= to_lsn) {
      /* Log data for this group ends here */
      finished = true;

      break;

    } else {
      log_block += OS_FILE_LOG_BLOCK_SIZE;
    }

  } while (log_block < buf + len);

  *read_upto_lsn = scanned_lsn;

  /* Progress message: printed when the scan has finished and otherwise
  on every 80th call counted by recv_scan_print_counter. */
  if (recv_needed_recovery ||
      (recv_is_from_backup && !recv_is_making_a_backup)) {
    ++recv_scan_print_counter;

    if (finished || (recv_scan_print_counter % 80) == 0) {
      ib::info(ER_IB_MSG_725, ulonglong{scanned_lsn});
    }
  }

  if (more_data && !recv_sys->found_corrupt_log) {
    /* Try to parse more log records */

    recv_parse_log_recs(checkpoint_lsn);

#ifndef UNIV_HOTBACKUP
    /* Apply the hashed records once they use more than max_memory. */
    if (recv_heap_used() > max_memory) {
      recv_apply_hashed_log_recs(log, false);
    }
#endif /* !UNIV_HOTBACKUP */

    if (recv_sys->recovered_offset > recv_sys->buf_len / 4) {
      /* Move parsing buffer data to the buffer start */

      recv_reset_buffer();
    }
  }

  return (finished);
}
3456 
3457 #ifdef UNIV_HOTBACKUP
meb_read_log_encryption(IORequest & encryption_request,byte * encryption_info)3458 bool meb_read_log_encryption(IORequest &encryption_request,
3459                              byte *encryption_info) {
3460   space_id_t log_space_id = dict_sys_t::s_log_space_first_id;
3461   const page_id_t page_id(log_space_id, 0);
3462   byte *log_block_buf_ptr;
3463   byte *log_block_buf;
3464   byte key[Encryption::KEY_LEN];
3465   byte iv[Encryption::KEY_LEN];
3466   fil_space_t *space = fil_space_get(log_space_id);
3467   dberr_t err;
3468 
3469   log_block_buf_ptr =
3470       static_cast<byte *>(ut_malloc_nokey(2 * OS_FILE_LOG_BLOCK_SIZE));
3471   memset(log_block_buf_ptr, 0, 2 * OS_FILE_LOG_BLOCK_SIZE);
3472   log_block_buf =
3473       static_cast<byte *>(ut_align(log_block_buf_ptr, OS_FILE_LOG_BLOCK_SIZE));
3474 
3475   if (encryption_info != nullptr) {
3476     /* encryption info was given as a parameter */
3477     memcpy(log_block_buf + LOG_HEADER_CREATOR_END, encryption_info,
3478            Encryption::INFO_MAX_SIZE);
3479   } else {
3480     /* encryption info was not given as a parameter, read it from the
3481        header of "ib_logfile0" */
3482 
3483     err = fil_redo_io(IORequestLogRead, page_id, univ_page_size,
3484                       LOG_CHECKPOINT_1 + OS_FILE_LOG_BLOCK_SIZE,
3485                       OS_FILE_LOG_BLOCK_SIZE, log_block_buf);
3486     ut_a(err == DB_SUCCESS);
3487   }
3488 
3489   if (memcmp(log_block_buf + LOG_HEADER_CREATOR_END, Encryption::KEY_MAGIC_V3,
3490              Encryption::MAGIC_SIZE) == 0) {
3491     encryption_request = IORequestLogRead;
3492 
3493     if (Encryption::decode_encryption_info(
3494             key, iv, log_block_buf + LOG_HEADER_CREATOR_END, true)) {
3495       /* If redo log encryption is enabled, set the
3496       space flag. Otherwise, we just fill the encryption
3497       information to space object for decrypting old
3498       redo log blocks. */
3499       space->flags |= FSP_FLAGS_MASK_ENCRYPTION;
3500       err = fil_set_encryption(space->id, Encryption::AES, key, iv);
3501 
3502       if (err == DB_SUCCESS) {
3503         ib::info(ER_IB_MSG_1239) << "Read redo log encryption"
3504                                  << " metadata successful.";
3505       } else {
3506         ut_free(log_block_buf_ptr);
3507         ib::error(ER_IB_MSG_1240) << "Can't set redo log tablespace"
3508                                   << " encryption metadata.";
3509         return (false);
3510       }
3511 
3512       encryption_request.encryption_key(
3513           space->encryption_key, space->encryption_klen, space->encryption_iv);
3514 
3515       encryption_request.encryption_algorithm(Encryption::AES);
3516     } else {
3517       ut_free(log_block_buf_ptr);
3518       ib::error(ER_IB_MSG_1241) << "Cannot read the encryption"
3519                                    " information in log file header, please"
3520                                    " check if keyring plugin loaded and"
3521                                    " the key file exists.";
3522       return (false);
3523     }
3524   }
3525 
3526   ut_free(log_block_buf_ptr);
3527   return (true);
3528 }
3529 #endif /* UNIV_HOTBACKUP */
3530 
3531 #ifndef UNIV_HOTBACKUP
3532 /** Reads a specified log segment to a buffer.
3533 @param[in,out]	log		redo log
3534 @param[in,out]	buf		buffer where to read
3535 @param[in]	start_lsn	read area start
3536 @param[in]	end_lsn		read area end */
recv_read_log_seg(log_t & log,byte * buf,lsn_t start_lsn,lsn_t end_lsn)3537 void recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn, lsn_t end_lsn) {
3538   log_background_threads_inactive_validate(log);
3539 
3540   do {
3541     lsn_t source_offset;
3542 
3543     source_offset = log_files_real_offset_for_lsn(log, start_lsn);
3544 
3545     ut_a(end_lsn - start_lsn <= ULINT_MAX);
3546 
3547     ulint len;
3548 
3549     len = (ulint)(end_lsn - start_lsn);
3550 
3551     ut_ad(len != 0);
3552 
3553     if ((source_offset % log.file_size) + len > log.file_size) {
3554       /* If the above condition is true then len
3555       (which is ulint) is > the expression below,
3556       so the typecast is ok */
3557       len = (ulint)(log.file_size - (source_offset % log.file_size));
3558     }
3559 
3560     ++log.n_log_ios;
3561 
3562     ut_a(source_offset / UNIV_PAGE_SIZE <= PAGE_NO_MAX);
3563 
3564     const page_no_t page_no =
3565         static_cast<page_no_t>(source_offset / univ_page_size.physical());
3566 
3567     dberr_t
3568 
3569         err = fil_redo_io(
3570             IORequestLogRead, page_id_t(log.files_space_id, page_no),
3571             univ_page_size, (ulint)(source_offset % univ_page_size.physical()),
3572             len, buf);
3573 
3574     ut_a(err == DB_SUCCESS);
3575 
3576     start_lsn += len;
3577     buf += len;
3578 
3579   } while (start_lsn != end_lsn);
3580 }
3581 
3582 /** Scans log from a buffer and stores new log data to the parsing buffer.
3583 Parses and hashes the log records if new data found.
3584 @param[in,out]	log               redo log
3585 @param[in,out]  contiguous_lsn    log sequence number
3586                                   until which all redo log has been
3587                                   scanned
3588 @param[in,out]  to_lsn            LSN to stop recovery at */
recv_recovery_begin(log_t & log,lsn_t * contiguous_lsn,lsn_t to_lsn)3589 static void recv_recovery_begin(log_t &log, lsn_t *contiguous_lsn,
3590                                 lsn_t to_lsn) {
3591   mutex_enter(&recv_sys->mutex);
3592 
3593   recv_sys->len = 0;
3594   recv_sys->recovered_offset = 0;
3595   recv_sys->n_addrs = 0;
3596   recv_sys_empty_hash();
3597 
3598   /* Since 8.0, we can start recovery at checkpoint_lsn which points
3599   to the middle of log record. In such case we first to need to find
3600   the beginning of the first group of log records, which is at lsn
3601   greater than the checkpoint_lsn. */
3602   recv_sys->parse_start_lsn = 0;
3603 
3604   /* This is updated when we find value for parse_start_lsn. */
3605   recv_sys->bytes_to_ignore_before_checkpoint = 0;
3606 
3607   recv_sys->checkpoint_lsn = *contiguous_lsn;
3608   recv_sys->scanned_lsn = *contiguous_lsn;
3609   recv_sys->recovered_lsn = *contiguous_lsn;
3610 
3611   /* We have to trust that the first_rec_group in the first block is
3612   correct as we can't start parsing earlier to check it ourselves. */
3613   recv_sys->previous_recovered_lsn = *contiguous_lsn;
3614   recv_sys->last_block_first_rec_group = 0;
3615 
3616   recv_sys->scanned_checkpoint_no = 0;
3617   recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3618   recv_previous_parsed_rec_offset = 0;
3619   recv_previous_parsed_rec_is_multi = 0;
3620   ut_ad(recv_max_page_lsn == 0);
3621 
3622   mutex_exit(&recv_sys->mutex);
3623 
3624   ulint max_mem =
3625       UNIV_PAGE_SIZE * (buf_pool_get_n_pages() -
3626                         (recv_n_pool_free_frames * srv_buf_pool_instances));
3627 
3628   *contiguous_lsn =
3629       ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3630 
3631   lsn_t start_lsn = *contiguous_lsn;
3632 
3633   lsn_t checkpoint_lsn = start_lsn;
3634 
3635   bool finished = false;
3636 
3637   while (!finished) {
3638     lsn_t end_lsn = start_lsn + RECV_SCAN_SIZE;
3639 
3640     recv_read_log_seg(log, log.buf, start_lsn, end_lsn);
3641 
3642     finished = recv_scan_log_recs(log, max_mem, log.buf, RECV_SCAN_SIZE,
3643                                   checkpoint_lsn, start_lsn, contiguous_lsn,
3644                                   &log.scanned_lsn, to_lsn);
3645 
3646     start_lsn = end_lsn;
3647   }
3648 
3649   DBUG_PRINT("ib_log", ("scan " LSN_PF " completed", log.scanned_lsn));
3650 }
3651 
3652 /** Initialize crash recovery environment. Can be called iff
3653 recv_needed_recovery == false. */
recv_init_crash_recovery()3654 static void recv_init_crash_recovery() {
3655   ut_ad(!srv_read_only_mode);
3656   ut_a(!recv_needed_recovery);
3657 
3658   recv_needed_recovery = true;
3659 
3660   ib::info(ER_IB_MSG_726);
3661   ib::info(ER_IB_MSG_727);
3662 
3663   recv_sys->dblwr->recover();
3664 
3665   if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
3666     /* Spawn the background thread to flush dirty pages
3667     from the buffer pools. */
3668 
3669     srv_threads.m_recv_writer =
3670         os_thread_create(recv_writer_thread_key, recv_writer_thread);
3671 
3672     srv_threads.m_recv_writer.start();
3673   }
3674 }
3675 #endif /* !UNIV_HOTBACKUP */
3676 
3677 #ifndef UNIV_HOTBACKUP
3678 /** Start recovering from a redo log checkpoint.
3679 @see recv_recovery_from_checkpoint_finish
3680 @param[in,out]	log		redo log
3681 @param[in]  flush_lsn FIL_PAGE_FILE_FLUSH_LSN
3682                                 of first system tablespace page
3683 @param[in]  to_lsn    LSN to store recovery at
3684 @return error code or DB_SUCCESS */
recv_recovery_from_checkpoint_start(log_t & log,lsn_t flush_lsn,lsn_t to_lsn)3685 dberr_t recv_recovery_from_checkpoint_start(log_t &log, lsn_t flush_lsn,
3686                                             lsn_t to_lsn) {
3687   /* Initialize red-black tree for fast insertions into the
3688   flush_list during recovery process. */
3689   buf_flush_init_flush_rbt();
3690 
3691   if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
3692     ib::info(ER_IB_MSG_728);
3693 
3694     /* We leave redo log not started and this is read-only mode. */
3695     ut_a(log.sn == 0);
3696     ut_a(srv_read_only_mode);
3697 
3698     return (DB_SUCCESS);
3699   }
3700 
3701   recv_recovery_on = true;
3702 
3703   /* Look for the latest checkpoint */
3704 
3705   dberr_t err;
3706   ulint max_cp_field;
3707 
3708   err = recv_find_max_checkpoint(log, &max_cp_field);
3709 
3710   if (err != DB_SUCCESS) {
3711     return (err);
3712   }
3713 
3714   log_files_header_read(log, static_cast<uint32_t>(max_cp_field));
3715 
3716   lsn_t checkpoint_lsn;
3717   checkpoint_no_t checkpoint_no;
3718 
3719   checkpoint_lsn = mach_read_from_8(log.checkpoint_buf + LOG_CHECKPOINT_LSN);
3720 
3721   checkpoint_no = mach_read_from_8(log.checkpoint_buf + LOG_CHECKPOINT_NO);
3722 
3723   ut_ad(to_lsn >= checkpoint_lsn);
3724 
3725   /* Read the first log file header to print a note if this is
3726   a recovery from a restored InnoDB Hot Backup */
3727   byte log_hdr_buf[LOG_FILE_HDR_SIZE];
3728   const page_id_t page_id{log.files_space_id, 0};
3729 
3730   err = fil_redo_io(IORequestLogRead, page_id, univ_page_size, 0,
3731                     LOG_FILE_HDR_SIZE, log_hdr_buf);
3732 
3733   ut_a(err == DB_SUCCESS);
3734 
3735   /* Make sure creator is properly '\0'-terminated for output */
3736   log_hdr_buf[LOG_HEADER_CREATOR_END - 1] = '\0';
3737 
3738   const auto p = log_hdr_buf + LOG_HEADER_CREATOR;
3739 
3740   if (memcmp(p, (byte *)"MEB", sizeof("MEB") - 1) == 0) {
3741     /* Disable the double write buffer. MEB ensures that the data pages
3742     are consistent. Therefore the dblwr is superfluous. Secondly, the dblwr
3743     file is not redo logged and we can have pages in there that were written
3744     after the redo log was copied by MEB. */
3745 
3746     /* Restore state after recovery completes. */
3747     recv_sys->dblwr_state = dblwr::enabled;
3748 
3749     dblwr::enabled = false;
3750 
3751     dblwr::set();
3752 
3753     recv_sys->is_meb_recovery = true;
3754 
3755     if (srv_read_only_mode) {
3756       ib::error(ER_IB_MSG_729);
3757 
3758       return (DB_ERROR);
3759     }
3760 
3761     /* This log file was created by mysqlbackup --restore: print
3762     a note to the user about it */
3763 
3764     ib::info(ER_IB_MSG_730,
3765              reinterpret_cast<const char *>(log_hdr_buf) + LOG_HEADER_CREATOR);
3766 
3767     /* Replace the label. */
3768     ut_ad(LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR >=
3769           sizeof LOG_HEADER_CREATOR_CURRENT);
3770 
3771     memset(log_hdr_buf + LOG_HEADER_CREATOR, 0,
3772            LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR);
3773 
3774     strcpy(reinterpret_cast<char *>(log_hdr_buf) + LOG_HEADER_CREATOR,
3775            LOG_HEADER_CREATOR_CURRENT);
3776 
3777     /* Re-calculate the header block checksum. */
3778     log_block_set_checksum(log_hdr_buf,
3779                            log_block_calc_checksum_crc32(log_hdr_buf));
3780 
3781     /* Write to the log file to wipe over the label */
3782     err = fil_redo_io(IORequestLogWrite, page_id, univ_page_size, 0,
3783                       OS_FILE_LOG_BLOCK_SIZE, log_hdr_buf);
3784 
3785     ut_a(err == DB_SUCCESS);
3786 
3787   } else if (0 == ut_memcmp(log_hdr_buf + LOG_HEADER_CREATOR,
3788                             (byte *)LOG_HEADER_CREATOR_CLONE,
3789                             (sizeof LOG_HEADER_CREATOR_CLONE) - 1)) {
3790     /* Refuse clone database recovery in read only mode. */
3791     if (srv_read_only_mode) {
3792       ib::error(ER_IB_MSG_736);
3793       return (DB_READ_ONLY);
3794     }
3795     recv_sys->is_cloned_db = true;
3796     ib::info(ER_IB_MSG_731);
3797   }
3798 
3799   /* Start reading the log from the checkpoint LSN up.
3800   The variable contiguous_lsn contains an LSN up to which
3801   the log is known to be contiguously written to log. */
3802 
3803   ut_ad(RECV_SCAN_SIZE <= log.buf_size);
3804 
3805   ut_ad(recv_sys->n_addrs == 0);
3806 
3807   lsn_t contiguous_lsn;
3808 
3809   contiguous_lsn = checkpoint_lsn;
3810 
3811   switch (log.format) {
3812     case LOG_HEADER_FORMAT_8_0_3:
3813     case LOG_HEADER_FORMAT_CURRENT:
3814       break;
3815 
3816     case LOG_HEADER_FORMAT_5_7_9:
3817     case LOG_HEADER_FORMAT_8_0_1:
3818 
3819       ib::info(ER_IB_MSG_732, ulong{log.format});
3820 
3821       /* Check if the redo log from an older known redo log
3822       version is from a clean shutdown. */
3823       err = recv_log_recover_pre_8_0_4(log, checkpoint_no, checkpoint_lsn);
3824 
3825       return (err);
3826 
3827     default:
3828       ib::error(ER_IB_MSG_733, ulong{log.format},
3829                 ulong{LOG_HEADER_FORMAT_CURRENT});
3830 
3831       ut_ad(0);
3832       recv_sys->found_corrupt_log = true;
3833       return (DB_ERROR);
3834   }
3835 
3836   /* NOTE: we always do a 'recovery' at startup, but only if
3837   there is something wrong we will print a message to the
3838   user about recovery: */
3839 
3840   if (checkpoint_lsn != flush_lsn) {
3841     if (checkpoint_lsn < flush_lsn) {
3842       ib::warn(ER_IB_MSG_734, ulonglong{checkpoint_lsn}, ulonglong{flush_lsn});
3843     }
3844 
3845     if (!recv_needed_recovery) {
3846       ib::info(ER_IB_MSG_735, ulonglong{flush_lsn}, ulonglong{checkpoint_lsn});
3847 
3848       if (srv_read_only_mode) {
3849         ib::error(ER_IB_MSG_736);
3850 
3851         return (DB_READ_ONLY);
3852       }
3853 
3854       recv_init_crash_recovery();
3855     }
3856   }
3857 
3858   contiguous_lsn = checkpoint_lsn;
3859 
3860   recv_recovery_begin(log, &contiguous_lsn, to_lsn);
3861 
3862   lsn_t recovered_lsn;
3863 
3864   recovered_lsn = recv_sys->recovered_lsn;
3865 
3866   ut_a(recv_needed_recovery || checkpoint_lsn == recovered_lsn);
3867 
3868   log.recovered_lsn = recovered_lsn;
3869 
3870   /* If it is at block boundary, add header size. */
3871   auto check_scanned_lsn = log.scanned_lsn;
3872   if (check_scanned_lsn % OS_FILE_LOG_BLOCK_SIZE == 0) {
3873     check_scanned_lsn += LOG_BLOCK_HDR_SIZE;
3874   }
3875 
3876   if (check_scanned_lsn < checkpoint_lsn ||
3877       check_scanned_lsn < recv_max_page_lsn) {
3878     ib::error(ER_IB_MSG_737, ulonglong{log.scanned_lsn},
3879               ulonglong{checkpoint_lsn}, ulonglong{recv_max_page_lsn});
3880   }
3881 
3882   if (recovered_lsn < checkpoint_lsn) {
3883     /* No harm in trying to do RO access. */
3884     if (!srv_read_only_mode) {
3885       ut_error;
3886     }
3887 
3888     return (DB_ERROR);
3889   }
3890 
3891   if ((recv_sys->found_corrupt_log && srv_force_recovery == 0) ||
3892       recv_sys->found_corrupt_fs) {
3893     return (DB_ERROR);
3894   }
3895 
3896   /* Read the last recovered log block. */
3897   lsn_t start_lsn;
3898   lsn_t end_lsn;
3899 
3900   start_lsn = ut_uint64_align_down(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
3901 
3902   end_lsn = ut_uint64_align_up(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
3903 
3904   ut_a(start_lsn < end_lsn);
3905   ut_a(start_lsn % log.buf_size + OS_FILE_LOG_BLOCK_SIZE <= log.buf_size);
3906 
3907   recv_read_log_seg(log, recv_sys->last_block, start_lsn, end_lsn);
3908 
3909   byte *log_buf_block = log.buf + start_lsn % log.buf_size;
3910 
3911   std::memcpy(log_buf_block, recv_sys->last_block, OS_FILE_LOG_BLOCK_SIZE);
3912 
3913   if (recv_sys->last_block_first_rec_group != 0 &&
3914       log_block_get_first_rec_group(log_buf_block) !=
3915           recv_sys->last_block_first_rec_group) {
3916     /* We must not start with invalid first_rec_group in the first block,
3917     because if we crashed, we could be unable to recover. We do NOT have
3918     guarantee that the first_rec_group was correct because recovery did
3919     not report error. The first_rec_group was used only to locate the
3920     beginning of the log for recovery. For later blocks it was not used.
3921     It might be corrupted on disk and stay unnoticed if checksums for
3922     log blocks are disabled. In such case it would be better to repair
3923     it now instead of relying on the broken value and risking data loss.
3924     We emit warning to notice user about the situation. We repair that
3925     only in the log buffer. */
3926 
3927     ib::warn(ER_IB_RECV_FIRST_REC_GROUP_INVALID,
3928              uint(log_block_get_first_rec_group(log_buf_block)),
3929              uint(recv_sys->last_block_first_rec_group));
3930 
3931     log_block_set_first_rec_group(log_buf_block,
3932                                   recv_sys->last_block_first_rec_group);
3933 
3934   } else if (log_block_get_first_rec_group(log_buf_block) == 0) {
3935     /* Again, if it was zero, for any reason, we prefer to fix it
3936     before starting (we emit warning). */
3937 
3938     ib::warn(ER_IB_RECV_FIRST_REC_GROUP_INVALID, uint(0),
3939              uint(recovered_lsn % OS_FILE_LOG_BLOCK_SIZE));
3940 
3941     log_block_set_first_rec_group(log_buf_block,
3942                                   recovered_lsn % OS_FILE_LOG_BLOCK_SIZE);
3943   }
3944 
3945   ut_d(log.first_block_is_correct_for_lsn = recovered_lsn);
3946 
3947   log_start(log, checkpoint_no + 1, checkpoint_lsn, recovered_lsn);
3948 
3949   /* Copy the checkpoint info to the log; remember that we have
3950   incremented checkpoint_no by one, and the info will not be written
3951   over the max checkpoint info, thus making the preservation of max
3952   checkpoint info on disk certain */
3953 
3954   if (!srv_read_only_mode) {
3955     log_files_write_checkpoint(log, checkpoint_lsn);
3956   }
3957 
3958   mutex_enter(&recv_sys->mutex);
3959   recv_sys->apply_log_recs = true;
3960   mutex_exit(&recv_sys->mutex);
3961 
3962   /* The database is now ready to start almost normal processing of user
3963   transactions: transaction rollbacks and the application of the log
3964   records in the hash table can be run in background. */
3965 
3966   return (DB_SUCCESS);
3967 }
3968 
3969 /** Complete the recovery from the latest checkpoint.
3970 @param[in,out]	log		redo log
3971 @param[in]	aborting	true if the server has to abort due to an error
3972 @return recovered persistent metadata or nullptr if aborting*/
recv_recovery_from_checkpoint_finish(log_t & log,bool aborting)3973 MetadataRecover *recv_recovery_from_checkpoint_finish(log_t &log,
3974                                                       bool aborting) {
3975   /* Make sure that the recv_writer thread is done. This is
3976   required because it grabs various mutexes and we want to
3977   ensure that when we enable sync_order_checks there is no
3978   mutex currently held by any thread. */
3979   mutex_enter(&recv_sys->writer_mutex);
3980 
3981   /* Free the resources of the recovery system */
3982   recv_recovery_on = false;
3983 
3984   /* By acquiring the mutex we ensure that the recv_writer thread
3985   won't trigger any more LRU batches. Now wait for currently
3986   in progress batches to finish. */
3987   buf_flush_wait_LRU_batch_end();
3988 
3989   mutex_exit(&recv_sys->writer_mutex);
3990 
3991   ulint count = 0;
3992 
3993   while (recv_writer_is_active()) {
3994     ++count;
3995 
3996     os_thread_sleep(100000);
3997 
3998     if (count >= 600) {
3999       ib::info(ER_IB_MSG_738);
4000       count = 0;
4001     }
4002   }
4003 
4004   MetadataRecover *metadata;
4005 
4006   if (!aborting) {
4007     metadata = recv_sys->metadata_recover;
4008 
4009     recv_sys->metadata_recover = nullptr;
4010   } else {
4011     metadata = nullptr;
4012   }
4013 
4014   recv_sys_free();
4015 
4016   if (!aborting) {
4017     /* Validate a few system page types that were left uninitialized
4018     by older versions of MySQL. */
4019     mtr_t mtr;
4020 
4021     mtr.start();
4022 
4023     buf_block_t *block;
4024 
4025     /* Bitmap page types will be reset in buf_dblwr_check_block()
4026     without redo logging. */
4027 
4028     block = buf_page_get(page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
4029                          univ_page_size, RW_X_LATCH, &mtr);
4030 
4031     fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4032 
4033     /* Already MySQL 3.23.53 initialized FSP_IBUF_TREE_ROOT_PAGE_NO
4034     to FIL_PAGE_INDEX. No need to reset that one. */
4035 
4036     block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
4037                          univ_page_size, RW_X_LATCH, &mtr);
4038 
4039     fil_block_check_type(block, FIL_PAGE_TYPE_TRX_SYS, &mtr);
4040 
4041     block = buf_page_get(page_id_t(TRX_SYS_SPACE, FSP_FIRST_RSEG_PAGE_NO),
4042                          univ_page_size, RW_X_LATCH, &mtr);
4043 
4044     fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4045 
4046     block = buf_page_get(page_id_t(TRX_SYS_SPACE, FSP_DICT_HDR_PAGE_NO),
4047                          univ_page_size, RW_X_LATCH, &mtr);
4048 
4049     fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4050 
4051     mtr.commit();
4052   }
4053 
4054   /* Free up the flush_rbt. */
4055   buf_flush_free_flush_rbt();
4056 
4057   return (metadata);
4058 }
4059 
4060 #endif /* !UNIV_HOTBACKUP */
4061 
4062 #if defined(UNIV_DEBUG) || defined(UNIV_HOTBACKUP)
4063 /** Return string name of the redo log record type.
4064 @param[in]	type	record log record enum
4065 @return string name of record log record */
get_mlog_string(mlog_id_t type)4066 const char *get_mlog_string(mlog_id_t type) {
4067   switch (type) {
4068     case MLOG_SINGLE_REC_FLAG:
4069       return ("MLOG_SINGLE_REC_FLAG");
4070 
4071     case MLOG_1BYTE:
4072       return ("MLOG_1BYTE");
4073 
4074     case MLOG_2BYTES:
4075       return ("MLOG_2BYTES");
4076 
4077     case MLOG_4BYTES:
4078       return ("MLOG_4BYTES");
4079 
4080     case MLOG_8BYTES:
4081       return ("MLOG_8BYTES");
4082 
4083     case MLOG_REC_INSERT:
4084       return ("MLOG_REC_INSERT");
4085 
4086     case MLOG_REC_CLUST_DELETE_MARK:
4087       return ("MLOG_REC_CLUST_DELETE_MARK");
4088 
4089     case MLOG_REC_SEC_DELETE_MARK:
4090       return ("MLOG_REC_SEC_DELETE_MARK");
4091 
4092     case MLOG_REC_UPDATE_IN_PLACE:
4093       return ("MLOG_REC_UPDATE_IN_PLACE");
4094 
4095     case MLOG_REC_DELETE:
4096       return ("MLOG_REC_DELETE");
4097 
4098     case MLOG_LIST_END_DELETE:
4099       return ("MLOG_LIST_END_DELETE");
4100 
4101     case MLOG_LIST_START_DELETE:
4102       return ("MLOG_LIST_START_DELETE");
4103 
4104     case MLOG_LIST_END_COPY_CREATED:
4105       return ("MLOG_LIST_END_COPY_CREATED");
4106 
4107     case MLOG_PAGE_REORGANIZE:
4108       return ("MLOG_PAGE_REORGANIZE");
4109 
4110     case MLOG_PAGE_CREATE:
4111       return ("MLOG_PAGE_CREATE");
4112 
4113     case MLOG_UNDO_INSERT:
4114       return ("MLOG_UNDO_INSERT");
4115 
4116     case MLOG_UNDO_ERASE_END:
4117       return ("MLOG_UNDO_ERASE_END");
4118 
4119     case MLOG_UNDO_INIT:
4120       return ("MLOG_UNDO_INIT");
4121 
4122     case MLOG_UNDO_HDR_REUSE:
4123       return ("MLOG_UNDO_HDR_REUSE");
4124 
4125     case MLOG_UNDO_HDR_CREATE:
4126       return ("MLOG_UNDO_HDR_CREATE");
4127 
4128     case MLOG_REC_MIN_MARK:
4129       return ("MLOG_REC_MIN_MARK");
4130 
4131     case MLOG_IBUF_BITMAP_INIT:
4132       return ("MLOG_IBUF_BITMAP_INIT");
4133 
4134 #ifdef UNIV_LOG_LSN_DEBUG
4135     case MLOG_LSN:
4136       return ("MLOG_LSN");
4137 #endif /* UNIV_LOG_LSN_DEBUG */
4138 
4139     case MLOG_INIT_FILE_PAGE:
4140       return ("MLOG_INIT_FILE_PAGE");
4141 
4142     case MLOG_WRITE_STRING:
4143       return ("MLOG_WRITE_STRING");
4144 
4145     case MLOG_MULTI_REC_END:
4146       return ("MLOG_MULTI_REC_END");
4147 
4148     case MLOG_DUMMY_RECORD:
4149       return ("MLOG_DUMMY_RECORD");
4150 
4151     case MLOG_FILE_DELETE:
4152       return ("MLOG_FILE_DELETE");
4153 
4154     case MLOG_COMP_REC_MIN_MARK:
4155       return ("MLOG_COMP_REC_MIN_MARK");
4156 
4157     case MLOG_COMP_PAGE_CREATE:
4158       return ("MLOG_COMP_PAGE_CREATE");
4159 
4160     case MLOG_COMP_REC_INSERT:
4161       return ("MLOG_COMP_REC_INSERT");
4162 
4163     case MLOG_COMP_REC_CLUST_DELETE_MARK:
4164       return ("MLOG_COMP_REC_CLUST_DELETE_MARK");
4165 
4166     case MLOG_COMP_REC_SEC_DELETE_MARK:
4167       return ("MLOG_COMP_REC_SEC_DELETE_MARK");
4168 
4169     case MLOG_COMP_REC_UPDATE_IN_PLACE:
4170       return ("MLOG_COMP_REC_UPDATE_IN_PLACE");
4171 
4172     case MLOG_COMP_REC_DELETE:
4173       return ("MLOG_COMP_REC_DELETE");
4174 
4175     case MLOG_COMP_LIST_END_DELETE:
4176       return ("MLOG_COMP_LIST_END_DELETE");
4177 
4178     case MLOG_COMP_LIST_START_DELETE:
4179       return ("MLOG_COMP_LIST_START_DELETE");
4180 
4181     case MLOG_COMP_LIST_END_COPY_CREATED:
4182       return ("MLOG_COMP_LIST_END_COPY_CREATED");
4183 
4184     case MLOG_COMP_PAGE_REORGANIZE:
4185       return ("MLOG_COMP_PAGE_REORGANIZE");
4186 
4187     case MLOG_FILE_CREATE:
4188       return ("MLOG_FILE_CREATE");
4189 
4190     case MLOG_ZIP_WRITE_NODE_PTR:
4191       return ("MLOG_ZIP_WRITE_NODE_PTR");
4192 
4193     case MLOG_ZIP_WRITE_BLOB_PTR:
4194       return ("MLOG_ZIP_WRITE_BLOB_PTR");
4195 
4196     case MLOG_ZIP_WRITE_HEADER:
4197       return ("MLOG_ZIP_WRITE_HEADER");
4198 
4199     case MLOG_ZIP_PAGE_COMPRESS:
4200       return ("MLOG_ZIP_PAGE_COMPRESS");
4201 
4202     case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4203       return ("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4204 
4205     case MLOG_ZIP_PAGE_REORGANIZE:
4206       return ("MLOG_ZIP_PAGE_REORGANIZE");
4207 
4208     case MLOG_FILE_RENAME:
4209       return ("MLOG_FILE_RENAME");
4210 
4211     case MLOG_PAGE_CREATE_RTREE:
4212       return ("MLOG_PAGE_CREATE_RTREE");
4213 
4214     case MLOG_COMP_PAGE_CREATE_RTREE:
4215       return ("MLOG_COMP_PAGE_CREATE_RTREE");
4216 
4217     case MLOG_INIT_FILE_PAGE2:
4218       return ("MLOG_INIT_FILE_PAGE2");
4219 
4220     case MLOG_INDEX_LOAD:
4221       return ("MLOG_INDEX_LOAD");
4222 
4223       /* Disabled for WL6378
4224       case MLOG_TRUNCATE:
4225               return("MLOG_TRUNCATE");
4226       */
4227 
4228     case MLOG_TABLE_DYNAMIC_META:
4229       return ("MLOG_TABLE_DYNAMIC_META");
4230 
4231     case MLOG_PAGE_CREATE_SDI:
4232       return ("MLOG_PAGE_CREATE_SDI");
4233 
4234     case MLOG_COMP_PAGE_CREATE_SDI:
4235       return ("MLOG_COMP_PAGE_CREATE_SDI");
4236 
4237     case MLOG_TEST:
4238       return ("MLOG_TEST");
4239   }
4240 
4241   DBUG_ASSERT(0);
4242 
4243   return (nullptr);
4244 }
4245 #endif /* UNIV_DEBUG || UNIV_HOTBACKUP */
4246