/*****************************************************************************

Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

/** @file mtr/mtr0mtr.cc
 Mini-transaction buffer

 Created 11/26/1995 Heikki Tuuri
 *******************************************************/

#include "mtr0mtr.h"

#include "buf0buf.h"
#include "buf0flu.h"
#include "clone0api.h"
#include "fsp0sysspace.h"
#include "log0meb.h"
#ifndef UNIV_HOTBACKUP
#include "clone0clone.h"
#include "log0log.h"
#include "log0recv.h"
#include "mtr0log.h"
#endif /* !UNIV_HOTBACKUP */
#include "my_dbug.h"
#ifndef UNIV_HOTBACKUP
#include "page0types.h"
#include "trx0purge.h"
#endif /* !UNIV_HOTBACKUP */

static_assert(static_cast<int>(MTR_MEMO_PAGE_S_FIX) ==
                  static_cast<int>(RW_S_LATCH),
              "");

static_assert(static_cast<int>(MTR_MEMO_PAGE_X_FIX) ==
                  static_cast<int>(RW_X_LATCH),
              "");

static_assert(static_cast<int>(MTR_MEMO_PAGE_SX_FIX) ==
                  static_cast<int>(RW_SX_LATCH),
              "");

/** Iterate over a memo block in reverse. */
template <typename Functor>
struct Iterate {
  /** Release specific object */
  explicit Iterate(Functor &functor) : m_functor(functor) { /* Do nothing */
  }

  /** @return false if the functor returns false. */
  bool operator()(mtr_buf_t::block_t *block) {
    const mtr_memo_slot_t *start =
        reinterpret_cast<const mtr_memo_slot_t *>(block->begin());

    mtr_memo_slot_t *slot = reinterpret_cast<mtr_memo_slot_t *>(block->end());

    ut_ad(!(block->used() % sizeof(*slot)));

    while (slot-- != start) {
      if (!m_functor(slot)) {
        return (false);
      }
    }

    return (true);
  }

  Functor &m_functor;
};

/** Find specific object */
struct Find {
  /** Constructor */
  Find(const void *object, ulint type)
      : m_slot(), m_type(type), m_object(object) {
    ut_a(object != nullptr);
  }

  /** @return false if the object was found. */
  bool operator()(mtr_memo_slot_t *slot) {
    if (m_object == slot->object && m_type == slot->type) {
      m_slot = slot;
      return (false);
    }

    return (true);
  }

  /** Slot if found */
  mtr_memo_slot_t *m_slot;

  /** Type of the object to look for */
  ulint m_type;

  /** The object instance to look for */
  const void *m_object;
};

/** Find a page frame */
struct Find_page {
  /** Constructor
  @param[in]  ptr    pointer to within a page frame
  @param[in]  flags  MTR_MEMO flags to look for */
  Find_page(const void *ptr, ulint flags)
      : m_ptr(ptr), m_flags(flags), m_slot(nullptr) {
    /* We can only look for page-related flags. */
    ut_ad(!(flags &
            ~(MTR_MEMO_PAGE_S_FIX | MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX |
              MTR_MEMO_BUF_FIX | MTR_MEMO_MODIFY)));
  }

  /** Visit a memo entry.
  @param[in]  slot   memo entry to visit
  @retval     false  if a page was found
  @retval     true   if the iteration should continue */
  bool operator()(mtr_memo_slot_t *slot) {
    ut_ad(m_slot == nullptr);

    if (!(m_flags & slot->type) || slot->object == nullptr) {
      return (true);
    }

    buf_block_t *block = reinterpret_cast<buf_block_t *>(slot->object);

    if (m_ptr < block->frame ||
        m_ptr >= block->frame + block->page.size.logical()) {
      return (true);
    }

    m_slot = slot;
    return (false);
  }

  /** @return the slot that was found */
  mtr_memo_slot_t *get_slot() const {
    ut_ad(m_slot != nullptr);
    return (m_slot);
  }
  /** @return the block that was found */
  buf_block_t *get_block() const {
    return (reinterpret_cast<buf_block_t *>(get_slot()->object));
  }

 private:
  /** Pointer inside a page frame to look for */
  const void *const m_ptr;
  /** MTR_MEMO flags to look for */
  const ulint m_flags;
  /** The slot corresponding to m_ptr */
  mtr_memo_slot_t *m_slot;
};

/** Release latches and decrement the buffer fix count.
@param[in]  slot  memo slot */
static void memo_slot_release(mtr_memo_slot_t *slot) {
  switch (slot->type) {
#ifndef UNIV_HOTBACKUP
    buf_block_t *block;
#endif /* !UNIV_HOTBACKUP */

    case MTR_MEMO_BUF_FIX:
    case MTR_MEMO_PAGE_S_FIX:
    case MTR_MEMO_PAGE_SX_FIX:
    case MTR_MEMO_PAGE_X_FIX:
#ifndef UNIV_HOTBACKUP
      block = reinterpret_cast<buf_block_t *>(slot->object);

      buf_block_unfix(block);
      buf_page_release_latch(block, slot->type);
#endif /* !UNIV_HOTBACKUP */
      break;

    case MTR_MEMO_S_LOCK:
      rw_lock_s_unlock(reinterpret_cast<rw_lock_t *>(slot->object));
      break;

    case MTR_MEMO_SX_LOCK:
      rw_lock_sx_unlock(reinterpret_cast<rw_lock_t *>(slot->object));
      break;

    case MTR_MEMO_X_LOCK:
      rw_lock_x_unlock(reinterpret_cast<rw_lock_t *>(slot->object));
      break;

#ifdef UNIV_DEBUG
    default:
      ut_ad(slot->type == MTR_MEMO_MODIFY);
#endif /* UNIV_DEBUG */
  }

  slot->object = nullptr;
}

/** Release the latches and blocks acquired by the mini-transaction. */
struct Release_all {
  /** @return true always. */
  bool operator()(mtr_memo_slot_t *slot) const {
    if (slot->object != nullptr) {
      memo_slot_release(slot);
    }

    return (true);
  }
};

/** Check that all slots have been handled. */
struct Debug_check {
  /** @return true always. */
  bool operator()(const mtr_memo_slot_t *slot) const {
    ut_a(slot->object == nullptr);
    return (true);
  }
};

/** Add blocks modified by the mini-transaction to the flush list. */
struct Add_dirty_blocks_to_flush_list {
  /** Constructor.
  @param[in]      start_lsn  LSN of the first entry that was
                             added to REDO by the MTR
  @param[in]      end_lsn    LSN after the last entry was
                             added to REDO by the MTR
  @param[in,out]  observer   flush observer */
  Add_dirty_blocks_to_flush_list(lsn_t start_lsn, lsn_t end_lsn,
                                 FlushObserver *observer);

  /** Add the modified page to the buffer flush list. */
  void add_dirty_page_to_flush_list(mtr_memo_slot_t *slot) const {
    ut_ad(m_end_lsn > m_start_lsn || (m_end_lsn == 0 && m_start_lsn == 0));

#ifndef UNIV_HOTBACKUP
    buf_block_t *block;

    block = reinterpret_cast<buf_block_t *>(slot->object);

    buf_flush_note_modification(block, m_start_lsn, m_end_lsn,
                                m_flush_observer);
#endif /* !UNIV_HOTBACKUP */
  }

  /** @return true always. */
  bool operator()(mtr_memo_slot_t *slot) const {
    if (slot->object != nullptr) {
      if (slot->type == MTR_MEMO_PAGE_X_FIX ||
          slot->type == MTR_MEMO_PAGE_SX_FIX) {
        add_dirty_page_to_flush_list(slot);

      } else if (slot->type == MTR_MEMO_BUF_FIX) {
        buf_block_t *block;
        block = reinterpret_cast<buf_block_t *>(slot->object);
        if (block->made_dirty_with_no_latch) {
          add_dirty_page_to_flush_list(slot);
          block->made_dirty_with_no_latch = false;
        }
      }
    }

    return (true);
  }

  /** Mini-transaction REDO end LSN */
  const lsn_t m_end_lsn;

  /** Mini-transaction REDO start LSN */
  const lsn_t m_start_lsn;

  /** Flush observer */
  FlushObserver *const m_flush_observer;
};

/** Constructor.
@param[in]      start_lsn  LSN of the first entry that was added
                           to REDO by the MTR
@param[in]      end_lsn    LSN after the last entry was added
                           to REDO by the MTR
@param[in,out]  observer   flush observer */
Add_dirty_blocks_to_flush_list::Add_dirty_blocks_to_flush_list(
    lsn_t start_lsn, lsn_t end_lsn, FlushObserver *observer)
    : m_end_lsn(end_lsn), m_start_lsn(start_lsn), m_flush_observer(observer) {
  /* Do nothing */
}

class mtr_t::Command {
 public:
  /** Constructor.
  Takes ownership of the mtr->m_impl, is responsible for deleting it.
  @param[in,out]  mtr  mini-transaction */
  explicit Command(mtr_t *mtr) : m_locks_released() { init(mtr); }

  void init(mtr_t *mtr) {
    m_impl = &mtr->m_impl;
    m_sync = mtr->m_sync;
  }

  /** Destructor */
  ~Command() { ut_ad(m_impl == nullptr); }

  /** Write the redo log record, add dirty pages to the flush list and
  release the resources. */
  void execute();

  /** Add blocks modified in this mini-transaction to the flush list. */
  void add_dirty_blocks_to_flush_list(lsn_t start_lsn, lsn_t end_lsn);

  /** Release both the latches and blocks used in the mini-transaction. */
  void release_all();

  /** Release the resources */
  void release_resources();

 private:
#ifndef UNIV_HOTBACKUP
  /** Prepare to write the mini-transaction log to the redo log buffer.
  @return number of bytes to write in finish_write() */
  ulint prepare_write();
#endif /* !UNIV_HOTBACKUP */

  /** true if it is a sync mini-transaction. */
  bool m_sync;

  /** The mini-transaction state. */
  mtr_t::Impl *m_impl;

  /** Set to 1 after the user thread releases the latches. The log
  writer thread must wait for this to be set to 1. */
  volatile ulint m_locks_released;
};

/* Mode update matrix. The array is indexed as [old mode][new mode].
All new modes for a specific old mode are in one horizontal line.
true : update to new mode
false: ignore new mode
   A  - MTR_LOG_ALL
   N  - MTR_LOG_NONE
   NR - MTR_LOG_NO_REDO
   S  - MTR_LOG_SHORT_INSERTS */
bool mtr_t::s_mode_update[MTR_LOG_MODE_MAX][MTR_LOG_MODE_MAX] = {
    /*      |  A      N     NR     S  */
    /* A */ {false, true, true, true},   /* A is default and we allow to switch
                                            to all other modes. */
    /* N */ {true, false, true, false},  /* For both A & NR, we can shortly
                                            switch to N and return back. */
    /* NR*/ {false, true, false, false}, /* Default is NR when global redo is
                                            disabled. Allow to move to N. */
    /* S */ {true, false, false, false}  /* Only allow return back to A after
                                            short switch from A to S. */
};
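
/* Illustrative sketch (editor's addition, not part of the upstream file):
how the matrix above is consulted by set_log_mode().  A caller that
temporarily suppresses logging and then restores the previous mode could
look like this, using only the mtr_t API declared in mtr0mtr.h:

  mtr_t mtr;
  mtr.start();                                  // default mode is A (MTR_LOG_ALL)
  const mtr_log_t old_mode =
      mtr.set_log_mode(MTR_LOG_NONE);           // A -> N is allowed
  ... perform work that must not generate redo ...
  mtr.set_log_mode(old_mode);                   // N -> A restores the default
  mtr.commit();

Transitions for which s_mode_update[old][new] is false (for example
NR -> A) are silently ignored and the mtr keeps its current mode. */
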
#ifdef UNIV_DEBUG
/* Mode update validity matrix. The array is indexed as [old mode][new mode]. */
bool mtr_t::s_mode_update_valid[MTR_LOG_MODE_MAX][MTR_LOG_MODE_MAX] = {
    /*      | A      N    NR    S  */
    /* A */ {true, true, true, true}, /* No assert case. */

    /* N */ {true, true, true, true},

    /* NR*/ {true, true, true, true}, /* We generally never return back from
                                         NR to A but need to allow for LOB
                                         restarting B-tree mtr. */

    /* S */ {true, false, false, true} /* Short Insert state is set transiently
                                          and we don't expect N or NR switch. */
};
#endif /* UNIV_DEBUG */

#ifndef UNIV_HOTBACKUP
mtr_t::Logging mtr_t::s_logging;
#endif /* !UNIV_HOTBACKUP */

mtr_log_t mtr_t::set_log_mode(mtr_log_t mode) {
  ut_ad(mode < MTR_LOG_MODE_MAX);

  const auto old_mode = m_impl.m_log_mode;
  ut_ad(s_mode_update_valid[old_mode][mode]);

#ifdef UNIV_DEBUG
  if (mode == MTR_LOG_NO_REDO && old_mode == MTR_LOG_ALL) {
    /* Should change to no redo mode before generating any redo. */
    ut_ad(m_impl.m_n_log_recs == 0);
  }
#endif /* UNIV_DEBUG */

  if (s_mode_update[old_mode][mode]) {
    m_impl.m_log_mode = mode;
  }

#ifndef UNIV_HOTBACKUP
  /* If we are explicitly setting no logging, this mtr doesn't need
  logging and we can safely unmark it. */
  if (mode == MTR_LOG_NO_REDO && mode == old_mode) {
    check_nolog_and_unmark();
    m_impl.m_log_mode = mode;
  }
#endif /* !UNIV_HOTBACKUP */

  return (old_mode);
}

/** Check if a mini-transaction is dirtying a clean page.
@return true if the mtr is dirtying a clean page. */
bool mtr_t::is_block_dirtied(const buf_block_t *block) {
  ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
  ut_ad(block->page.buf_fix_count > 0);

  /* It is OK to read oldest_modification because no
  other thread can be performing a write of it and it
  is only during write that the value is reset to 0. */
  return (block->page.oldest_modification == 0);
}

#ifndef UNIV_HOTBACKUP
/** Write the block contents to the REDO log */
struct mtr_write_log_t {
  /** Append a block to the redo log buffer.
  @return whether the appending should continue */
  bool operator()(const mtr_buf_t::block_t *block) {
    lsn_t start_lsn;
    lsn_t end_lsn;

    ut_ad(block != nullptr);

    if (block->used() == 0) {
      return (true);
    }

    start_lsn = m_lsn;

    end_lsn = log_buffer_write(*log_sys, m_handle, block->begin(),
                               block->used(), start_lsn);

    ut_a(end_lsn % OS_FILE_LOG_BLOCK_SIZE <
         OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE);

    m_left_to_write -= block->used();

    if (m_left_to_write == 0
        /* This write was up to the end of a record group, i.e. the last
        record in the group has been written.

        Therefore the next group of records starts at m_lsn.
        We need to find out if the next group is the first group
        that starts in this log block.

        In such a case we need to set first_rec_group.

        Now, we could have two cases:
        1. This group of log records started in the block previous
           to the block containing m_lsn.
        2. This group of log records started in the same block
           as the block containing m_lsn.

        Only in case 1) is the next group of records the first group
        of log records in the block containing m_lsn. */
        && m_handle.start_lsn / OS_FILE_LOG_BLOCK_SIZE !=
               end_lsn / OS_FILE_LOG_BLOCK_SIZE) {
      log_buffer_set_first_record_group(*log_sys, m_handle, end_lsn);
    }

    log_buffer_write_completed(*log_sys, m_handle, start_lsn, end_lsn);

    m_lsn = end_lsn;

    return (true);
  }

  Log_handle m_handle;
  lsn_t m_lsn;
  ulint m_left_to_write;
};
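
/* Worked example (editor's addition, illustrative only), assuming the usual
512-byte OS_FILE_LOG_BLOCK_SIZE: if a record group was reserved with
m_handle.start_lsn = 1100 and the last record ends at end_lsn = 1200, both
LSNs integer-divide to the same block index (1100 / 512 == 1200 / 512 == 2),
which is case 2) above and first_rec_group is left untouched.  If instead the
group started at start_lsn = 1000 and ended at end_lsn = 1100, then
1000 / 512 != 1100 / 512: the group began in the previous block, so the next
group starting at 1100 is the first one in its block and first_rec_group of
that block is set to 1100. */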
#endif /* !UNIV_HOTBACKUP */

/** Start a mini-transaction.
@param sync       true if it is a synchronous mini-transaction
@param read_only  true if read only mini-transaction */
void mtr_t::start(bool sync, bool read_only) {
  ut_ad(m_impl.m_state == MTR_STATE_INIT ||
        m_impl.m_state == MTR_STATE_COMMITTED);

  UNIV_MEM_INVALID(this, sizeof(*this));

  UNIV_MEM_INVALID(&m_impl, sizeof(m_impl));

  m_sync = sync;

  m_commit_lsn = 0;

  new (&m_impl.m_log) mtr_buf_t();
  new (&m_impl.m_memo) mtr_buf_t();

  m_impl.m_mtr = this;
  m_impl.m_log_mode = MTR_LOG_ALL;
  m_impl.m_inside_ibuf = false;
  m_impl.m_modifications = false;
  m_impl.m_made_dirty = false;
  m_impl.m_n_log_recs = 0;
  m_impl.m_state = MTR_STATE_ACTIVE;
  m_impl.m_flush_observer = nullptr;
  m_impl.m_marked_nolog = false;

#ifndef UNIV_HOTBACKUP
  check_nolog_and_mark();
#endif /* !UNIV_HOTBACKUP */
  ut_d(m_impl.m_magic_n = MTR_MAGIC_N);
}

#ifndef UNIV_HOTBACKUP
void mtr_t::check_nolog_and_mark() {
  /* Safe check to make this call idempotent. */
  if (m_impl.m_marked_nolog) {
    return;
  }

  size_t shard_index = default_indexer_t<>::get_rnd_index();
  m_impl.m_marked_nolog = s_logging.mark_mtr(shard_index);

  /* Disable redo logging by this mtr if logging is globally off. */
  if (m_impl.m_marked_nolog) {
    ut_ad(m_impl.m_log_mode == MTR_LOG_ALL);
    m_impl.m_log_mode = MTR_LOG_NO_REDO;
    m_impl.m_shard_index = shard_index;
  }
}

void mtr_t::check_nolog_and_unmark() {
  if (m_impl.m_marked_nolog) {
    s_logging.unmark_mtr(m_impl.m_shard_index);

    m_impl.m_marked_nolog = false;
    m_impl.m_shard_index = 0;

    if (m_impl.m_log_mode == MTR_LOG_NO_REDO) {
      /* Reset back to default mode. */
      m_impl.m_log_mode = MTR_LOG_ALL;
    }
  }
}
#endif /* !UNIV_HOTBACKUP */

/** Release the resources */
void mtr_t::Command::release_resources() {
  ut_ad(m_impl->m_magic_n == MTR_MAGIC_N);

  /* Currently only used in commit */
  ut_ad(m_impl->m_state == MTR_STATE_COMMITTING);

#ifdef UNIV_DEBUG
  Debug_check release;
  Iterate<Debug_check> iterator(release);

  m_impl->m_memo.for_each_block_in_reverse(iterator);
#endif /* UNIV_DEBUG */

  /* Reset the mtr buffers */
  m_impl->m_log.erase();

  m_impl->m_memo.erase();

  m_impl->m_state = MTR_STATE_COMMITTED;

  m_impl = nullptr;
}

/** Commit a mini-transaction. */
void mtr_t::commit() {
  ut_ad(is_active());
  ut_ad(!is_inside_ibuf());
  ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
  m_impl.m_state = MTR_STATE_COMMITTING;

  DBUG_EXECUTE_IF("mtr_commit_crash", DBUG_SUICIDE(););

  Command cmd(this);

  if (m_impl.m_n_log_recs > 0 ||
      (m_impl.m_modifications && m_impl.m_log_mode == MTR_LOG_NO_REDO)) {
    ut_ad(!srv_read_only_mode || m_impl.m_log_mode == MTR_LOG_NO_REDO);

    cmd.execute();
  } else {
    cmd.release_all();
    cmd.release_resources();
  }
#ifndef UNIV_HOTBACKUP
  check_nolog_and_unmark();
#endif /* !UNIV_HOTBACKUP */
}
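
/* Typical usage sketch (editor's addition, illustrative only): a
mini-transaction brackets a set of page modifications so that their redo
records, dirty-page registration and latch releases happen together at
commit.  Assuming the usual buf0buf.h / mtr0log.h helpers (buf_page_get,
buf_block_get_frame, mlog_write_ulint), with page_id / page_size standing
for a valid page_id_t / page_size_t pair:

  mtr_t mtr;
  mtr.start();
  buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);
  byte *frame = buf_block_get_frame(block);
  mlog_write_ulint(frame + FIL_PAGE_TYPE, FIL_PAGE_INDEX, MLOG_2BYTES, &mtr);
  mtr.commit();  // writes the redo records, adds the dirty page to the
                 // flush list and releases the page latch
*/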

#ifndef UNIV_HOTBACKUP
/** Acquire a tablespace X-latch.
@param[in]  space  tablespace instance
@param[in]  file   file name from where called
@param[in]  line   line number in file */
void mtr_t::x_lock_space(fil_space_t *space, const char *file, ulint line) {
  ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
  ut_ad(is_active());

  x_lock(&space->latch, file, line);
}

/** Release an object in the memo stack. */
void mtr_t::memo_release(const void *object, ulint type) {
  ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
  ut_ad(is_active());

  /* We cannot release a page that has been written to in the
  middle of a mini-transaction. */
  ut_ad(!m_impl.m_modifications || type != MTR_MEMO_PAGE_X_FIX);

  Find find(object, type);
  Iterate<Find> iterator(find);

  if (!m_impl.m_memo.for_each_block_in_reverse(iterator)) {
    memo_slot_release(find.m_slot);
  }
}

/** Release a page latch.
@param[in]  ptr   pointer to within a page frame
@param[in]  type  object type: MTR_MEMO_PAGE_X_FIX, ... */
void mtr_t::release_page(const void *ptr, mtr_memo_type_t type) {
  ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
  ut_ad(is_active());

  /* We cannot release a page that has been written to in the
  middle of a mini-transaction. */
  ut_ad(!m_impl.m_modifications || type != MTR_MEMO_PAGE_X_FIX);

  Find_page find(ptr, type);
  Iterate<Find_page> iterator(find);

  if (!m_impl.m_memo.for_each_block_in_reverse(iterator)) {
    memo_slot_release(find.get_slot());
    return;
  }

  /* The page was not found! */
  ut_ad(0);
}

/** Prepare to write the mini-transaction log to the redo log buffer.
@return number of bytes to write in finish_write() */
ulint mtr_t::Command::prepare_write() {
  switch (m_impl->m_log_mode) {
    case MTR_LOG_SHORT_INSERTS:
      ut_ad(0);
      /* fall through (write no redo log) */
    case MTR_LOG_NO_REDO:
    case MTR_LOG_NONE:
      ut_ad(m_impl->m_log.size() == 0);
      return (0);
    case MTR_LOG_ALL:
      break;
    default:
      ut_ad(false);
      return (0);
  }

  /* An ibuf merge could happen when loading a page to apply log
  records during recovery. During the ibuf merge an mtr is used. */

  ut_a(!recv_recovery_is_on() || !recv_no_ibuf_operations);

  ulint len = m_impl->m_log.size();
  ut_ad(len > 0);

  ulint n_recs = m_impl->m_n_log_recs;
  ut_ad(n_recs > 0);

  ut_ad(log_sys != nullptr);

  ut_ad(m_impl->m_n_log_recs == n_recs);

  /* This was not the first time of dirtying a
  tablespace since the latest checkpoint. */

  ut_ad(n_recs == m_impl->m_n_log_recs);

  if (n_recs <= 1) {
    ut_ad(n_recs == 1);

    /* Flag the single log record as the
    only record in this mini-transaction. */

    *m_impl->m_log.front()->begin() |= MLOG_SINGLE_REC_FLAG;

  } else {
    /* Because this mini-transaction comprises
    multiple log records, append MLOG_MULTI_REC_END
    at the end. */

    mlog_catenate_ulint(&m_impl->m_log, MLOG_MULTI_REC_END, MLOG_1BYTE);
    ++len;
  }

  ut_ad(m_impl->m_log_mode == MTR_LOG_ALL);
  ut_ad(m_impl->m_log.size() == len);
  ut_ad(len > 0);

  return (len);
}
#endif /* !UNIV_HOTBACKUP */

/** Release the latches and blocks acquired by this mini-transaction */
void mtr_t::Command::release_all() {
  Release_all release;
  Iterate<Release_all> iterator(release);

  m_impl->m_memo.for_each_block_in_reverse(iterator);

  /* Note that we have released the latches. */
  m_locks_released = 1;
}

/** Add blocks modified in this mini-transaction to the flush list. */
void mtr_t::Command::add_dirty_blocks_to_flush_list(lsn_t start_lsn,
                                                    lsn_t end_lsn) {
  Add_dirty_blocks_to_flush_list add_to_flush(start_lsn, end_lsn,
                                              m_impl->m_flush_observer);

  Iterate<Add_dirty_blocks_to_flush_list> iterator(add_to_flush);

  m_impl->m_memo.for_each_block_in_reverse(iterator);
}

/** Write the redo log record, add dirty pages to the flush list and release
the resources. */
void mtr_t::Command::execute() {
  ut_ad(m_impl->m_log_mode != MTR_LOG_NONE);

  ulint len;

#ifndef UNIV_HOTBACKUP
  len = prepare_write();

  if (len > 0) {
    mtr_write_log_t write_log;

    write_log.m_left_to_write = len;

    auto handle = log_buffer_reserve(*log_sys, len);

    write_log.m_handle = handle;
    write_log.m_lsn = handle.start_lsn;

    m_impl->m_log.for_each_block(write_log);

    ut_ad(write_log.m_left_to_write == 0);
    ut_ad(write_log.m_lsn == handle.end_lsn);

    log_wait_for_space_in_log_recent_closed(*log_sys, handle.start_lsn);

    DEBUG_SYNC_C("mtr_redo_before_add_dirty_blocks");

    add_dirty_blocks_to_flush_list(handle.start_lsn, handle.end_lsn);

    log_buffer_close(*log_sys, handle);

    m_impl->m_mtr->m_commit_lsn = handle.end_lsn;

  } else {
    DEBUG_SYNC_C("mtr_noredo_before_add_dirty_blocks");

    add_dirty_blocks_to_flush_list(0, 0);
  }
#endif /* !UNIV_HOTBACKUP */

  release_all();
  release_resources();
}

#ifndef UNIV_HOTBACKUP
int mtr_t::Logging::enable(THD *thd) {
  if (is_enabled()) {
    return (0);
  }
  /* Allow mtrs to generate redo log. Concurrent clone and redo
  log archiving is still restricted till we reach a recoverable state. */
  ut_ad(m_state.load() == DISABLED);
  m_state.store(ENABLED_RESTRICT);

  /* 1. Wait for all no-log mtrs to finish and add dirty pages to disk. */
  auto err = wait_no_log_mtr(thd);
  if (err != 0) {
    m_state.store(DISABLED);
    return (err);
  }

  /* 2. Wait for dirty pages to flush by forcing checkpoint at current LSN.
  All no-logging page modifications are done with the LSN at which we stopped
  redo logging. We need to have one write mini-transaction after enabling redo
  to progress the system LSN and take a checkpoint. An easy way is to flush
  the max transaction ID, which is generally done at TRX_SYS_TRX_ID_WRITE_MARGIN
  interval but is safe to do at any time. */
  trx_sys_mutex_enter();
  trx_sys_flush_max_trx_id();
  trx_sys_mutex_exit();

  /* This ensures that the page modified by the previous mtr and all other
  pages modified before it are flushed to disk. Since there could be a large
  number of left over pages from a LOAD operation, we still don't enable
  double-write at this stage. */
  log_make_latest_checkpoint(*log_sys);
  m_state.store(ENABLED_DBLWR);

  /* 3. Take another checkpoint after enabling double write to ensure any
  pages written without double write are already synced to disk. */
  log_make_latest_checkpoint(*log_sys);

  /* 4. Mark that it is safe to recover from crash. */
  log_persist_enable(*log_sys);

  ib::warn(ER_IB_WRN_REDO_ENABLED);
  m_state.store(ENABLED);

  return (0);
}

int mtr_t::Logging::disable(THD *) {
  if (is_disabled()) {
    return (0);
  }

  /* Disallow archiving to start. */
  ut_ad(m_state.load() == ENABLED);
  m_state.store(ENABLED_RESTRICT);

  /* Check if redo log archiving is active. */
  if (meb::redo_log_archive_is_active()) {
    m_state.store(ENABLED);
    my_error(ER_INNODB_REDO_ARCHIVING_ENABLED, MYF(0));
    return (ER_INNODB_REDO_ARCHIVING_ENABLED);
  }

  /* Concurrent clone is blocked by BACKUP MDL lock except when
  clone_ddl_timeout = 0. Force any existing clone to abort. */
  clone_mark_abort(true);
  ut_ad(!clone_check_active());

  /* Mark that it is unsafe to crash going forward. */
  log_persist_disable(*log_sys);

  ib::warn(ER_IB_WRN_REDO_DISABLED);
  m_state.store(DISABLED);

  clone_mark_active();

  /* Reset sync LSN if beyond current system LSN. */
  reset_buf_flush_sync_lsn();

  return (0);
}
int mtr_t::Logging::wait_no_log_mtr(THD *thd) {
  auto wait_cond = [&](bool alert, bool &result) {
    if (Counter::total(m_count_nologging_mtr) == 0) {
      result = false;
      return (0);
    }
    result = true;

    if (thd_killed(thd)) {
      my_error(ER_QUERY_INTERRUPTED, MYF(0));
      return (ER_QUERY_INTERRUPTED);
    }
    return (0);
  };

  /* Sleep for 10 milliseconds between checks. */
  Clone_Msec sleep_time(10);
  /* Generate an alert message every 5 seconds. */
  Clone_Sec alert_interval(5);
  /* Wait for 5 minutes. */
  Clone_Sec time_out(Clone_Min(5));

  bool is_timeout = false;
  auto err = Clone_Sys::wait(sleep_time, time_out, alert_interval, wait_cond,
                             nullptr, is_timeout);

  if (err == 0 && is_timeout) {
    ut_ad(false);
    my_error(ER_INTERNAL_ERROR, MYF(0),
             "Innodb wait for no-log mtr timed out.");
    err = ER_INTERNAL_ERROR;
  }

  return (err);
}

#ifdef UNIV_DEBUG
/** Check if memo contains the given item.
@return true if contains */
bool mtr_t::memo_contains(mtr_buf_t *memo, const void *object, ulint type) {
  Find find(object, type);
  Iterate<Find> iterator(find);

  return (!memo->for_each_block_in_reverse(iterator));
}

/** Debug check for flags */
struct FlaggedCheck {
  FlaggedCheck(const void *ptr, ulint flags) : m_ptr(ptr), m_flags(flags) {
    // Do nothing
  }

  bool operator()(const mtr_memo_slot_t *slot) const {
    if (m_ptr == slot->object && (m_flags & slot->type)) {
      return (false);
    }

    return (true);
  }

  const void *m_ptr;
  ulint m_flags;
};

/** Check if memo contains the given item.
@param ptr    object to search
@param flags  specify types of object (can be ORred) of
              MTR_MEMO_PAGE_S_FIX ... values
@return true if contains */
bool mtr_t::memo_contains_flagged(const void *ptr, ulint flags) const {
  ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
  ut_ad(is_committing() || is_active());

  FlaggedCheck check(ptr, flags);
  Iterate<FlaggedCheck> iterator(check);

  return (!m_impl.m_memo.for_each_block_in_reverse(iterator));
}

/** Check if memo contains the given page.
@param[in]  ptr    pointer to within buffer frame
@param[in]  flags  specify types of object with OR of
                   MTR_MEMO_PAGE_S_FIX... values
@return the block
@retval NULL  if not found */
buf_block_t *mtr_t::memo_contains_page_flagged(const byte *ptr,
                                               ulint flags) const {
  Find_page check(ptr, flags);
  Iterate<Find_page> iterator(check);

  return (m_impl.m_memo.for_each_block_in_reverse(iterator)
              ? nullptr
              : check.get_block());
}

/** Mark the given latched page as modified.
@param[in]  ptr  pointer to within buffer frame */
void mtr_t::memo_modify_page(const byte *ptr) {
  buf_block_t *block = memo_contains_page_flagged(
      ptr, MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX);
  ut_ad(block != nullptr);

  if (!memo_contains(get_memo(), block, MTR_MEMO_MODIFY)) {
    memo_push(block, MTR_MEMO_MODIFY);
  }
}

/** Print info of an mtr handle. */
void mtr_t::print() const {
  ib::info(ER_IB_MSG_1275) << "Mini-transaction handle: memo size "
                           << m_impl.m_memo.size() << " bytes log size "
                           << get_log()->size() << " bytes";
}

lsn_t mtr_commit_mlog_test(log_t &log, size_t payload) {
  constexpr size_t MAX_PAYLOAD_SIZE = 1024;
  ut_a(payload <= MAX_PAYLOAD_SIZE);

  /* Create MLOG_TEST record in the memory. */
  byte record[MLOG_TEST_REC_OVERHEAD + MAX_PAYLOAD_SIZE];

  byte *record_end =
      Log_test::create_mlog_rec(record, 1, MLOG_TEST_VALUE, payload);

  const size_t rec_len = record_end - record;

  mtr_t mtr;
  mtr_start(&mtr);

  /* Copy the created MLOG_TEST to mtr's local buffer. */
  byte *dst = nullptr;
  bool success = mlog_open(&mtr, rec_len, dst);
  ut_a(success);
  std::memcpy(dst, record, rec_len);
  mlog_close(&mtr, dst + rec_len);

  mtr.added_rec();

  ut_ad(mtr.get_expected_log_size() == MLOG_TEST_REC_OVERHEAD + payload);

  mtr_commit(&mtr);

  return (mtr.commit_lsn());
}
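
/* Usage sketch (editor's addition, illustrative only): the returned LSN can
be used to force the test record onto disk, mirroring what
mtr_t::wait_for_flush() does further below:

  const lsn_t lsn = mtr_commit_mlog_test(*log_sys, 100);
  log_write_up_to(*log_sys, lsn, true);
*/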

static void mtr_commit_mlog_test_filling_block_low(log_t &log,
                                                   size_t req_space_left,
                                                   size_t recursive_level) {
  ut_a(req_space_left <= LOG_BLOCK_DATA_SIZE);

  /* Compute how much free space we have in the current log block. */
  const lsn_t current_lsn = log_get_lsn(log);
  size_t cur_space_left = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE -
                          current_lsn % OS_FILE_LOG_BLOCK_SIZE;

  /* Subtract minimum space required for a single MLOG_TEST. */
  if (cur_space_left < MLOG_TEST_REC_OVERHEAD) {
    /* Even the smallest MLOG_TEST does not fit in the remaining space,
    so we will need to use the next log block too. */
    cur_space_left += LOG_BLOCK_DATA_SIZE;
  }
  cur_space_left -= MLOG_TEST_REC_OVERHEAD;

  /* Compute how big a payload is required to leave exactly the provided
  req_space_left bytes free in the last block. */
  size_t payload;
  if (cur_space_left < req_space_left) {
    /* We were asked to leave more free bytes than currently remain in
    the last block, so we need to use the next log block. */
    payload = cur_space_left + LOG_BLOCK_DATA_SIZE - req_space_left;
  } else {
    payload = cur_space_left - req_space_left;
  }
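
  /* Worked example (editor's addition, illustrative only), assuming
  OS_FILE_LOG_BLOCK_SIZE = 512, LOG_BLOCK_TRL_SIZE = 4 and
  LOG_BLOCK_DATA_SIZE = 496: if current_lsn % 512 == 500, then
  cur_space_left = 512 - 4 - 500 = 8 bytes.  If MLOG_TEST_REC_OVERHEAD is
  larger than 8, the record spills into the next block and cur_space_left
  becomes 8 + 496 minus the overhead.  The payload is then chosen so that
  exactly req_space_left bytes remain free at the end of the last block
  that the record touches. */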

  /* Check if size of the record fits the maximum allowed size, which
  is defined by the dyn_buf_t used in mtr_t (mtr_buf_t). */

  if (MLOG_TEST_REC_OVERHEAD + payload <= mtr_buf_t::MAX_DATA_SIZE) {
    mtr_commit_mlog_test(*log_sys, payload);
  } else {
    /* It does not fit, so we need to write as much as possible here,
    but keep in mind that next record will need to take at least
    MLOG_TEST_REC_OVERHEAD bytes. Fortunately the MAX_DATA_SIZE is
    always at least twice larger than the MLOG_TEST_REC_OVERHEAD,
    so the payload has to be larger than MLOG_TEST_REC_OVERHEAD. */
    ut_ad(mtr_buf_t::MAX_DATA_SIZE >= MLOG_TEST_REC_OVERHEAD * 2);
    ut_a(payload > MLOG_TEST_REC_OVERHEAD);

    /* Subtract space which we will consume by usage of next record.
    The remaining space is maximum we are allowed to consume within
    this record. */
    payload -= MLOG_TEST_REC_OVERHEAD;

    if (MLOG_TEST_REC_OVERHEAD + payload > mtr_buf_t::MAX_DATA_SIZE) {
      /* We still cannot fit mtr_buf_t::MAX_DATA_SIZE bytes, so write
      as much as possible within this record. */
      payload = mtr_buf_t::MAX_DATA_SIZE - MLOG_TEST_REC_OVERHEAD;
    }

    /* Write this MLOG_TEST record. */
    mtr_commit_mlog_test(*log_sys, payload);

    /* Compute upper bound for maximum level of recursion that is ever possible.
    This is to verify the guarantee that we don't go too deep.

    We do not want to depend on actual difference between the
    mtr_buf_t::MAX_DATA_SIZE and LOG_BLOCK_DATA_SIZE.

    Note that mtr_buf_t::MAX_DATA_SIZE is the maximum size of log record we
    could add. The LOG_BLOCK_DATA_SIZE consists of LOG_BLOCK_DATA_SIZE /
    mtr_buf_t::MAX_DATA_SIZE records of mtr_buf_t::MAX_DATA_SIZE size each (0 if
    MAX_DATA_SIZE is larger than the LOG_BLOCK_DATA_SIZE). If we shifted these
    records then possibly 2 more records are required at boundaries (beginning
    and ending) to cover the whole range. If the last record would not end at
    proper offset, we decrease its payload. If we needed to move its end to even
    smaller offset from beginning of log block than we reach with payload=0,
    then we subtract up to MLOG_TEST_REC_OVERHEAD bytes from payload of previous
    record, which is always possible because:
      MAX_DATA_SIZE - MLOG_TEST_REC_OVERHEAD >= MLOG_TEST_REC_OVERHEAD.

    If the initial free space minus MLOG_TEST_REC_OVERHEAD is smaller than the
    requested free space, then we need to move forward by at most
    LOG_BLOCK_DATA_SIZE bytes. For that we need at most LOG_BLOCK_DATA_SIZE /
    mtr_buf_t::MAX_DATA_SIZE + 2 records shifted in the way described above.

    This solution is reached by the loop of writing MAX_DATA_SIZE records until
    the distance to target is <= MAX_DATA_SIZE + MLOG_TEST_REC_OVERHEAD, in
    which case we adjust size of next record to end it exactly
    MLOG_TEST_REC_OVERHEAD bytes before the target (this is why we subtract
    MLOG_TEST_REC_OVERHEAD from payload). Then next recursive call will have an
    easy task of adding record with payload=0. The loop mentioned above is
    implemented by the recursion. */
    constexpr auto MAX_REC_N =
        LOG_BLOCK_DATA_SIZE / mtr_buf_t::MAX_DATA_SIZE + 2;

    ut_a(recursive_level + 1 <= MAX_REC_N);

    /* Write next MLOG_TEST record(s). */
    mtr_commit_mlog_test_filling_block_low(log, req_space_left,
                                           recursive_level + 1);
  }
}

void mtr_commit_mlog_test_filling_block(log_t &log, size_t req_space_left) {
  mtr_commit_mlog_test_filling_block_low(log, req_space_left, 1);
}

void mtr_t::wait_for_flush() {
  ut_ad(commit_lsn() > 0);
  log_write_up_to(*log_sys, commit_lsn(), true);
}

#endif /* UNIV_DEBUG */
#endif /* !UNIV_HOTBACKUP */