1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file mtr/mtr0mtr.cc
22 Mini-transaction buffer
23 
24 Created 11/26/1995 Heikki Tuuri
25 *******************************************************/
26 
27 #include "mtr0mtr.h"
28 
29 #include "buf0buf.h"
30 #include "buf0flu.h"
31 #include "fsp0sysspace.h"
32 #include "page0types.h"
33 #include "mtr0log.h"
34 #include "log0recv.h"
35 
36 /** Iterate over a memo block in reverse. */
37 template <typename Functor>
38 struct CIterate {
CIterateCIterate39 	CIterate() : functor() {}
40 
CIterateCIterate41 	CIterate(const Functor& functor) : functor(functor) {}
42 
43 	/** @return false if the functor returns false. */
operator ()CIterate44 	bool operator()(mtr_buf_t::block_t* block) const
45 	{
46 		const mtr_memo_slot_t*	start =
47 			reinterpret_cast<const mtr_memo_slot_t*>(
48 				block->begin());
49 
50 		mtr_memo_slot_t*	slot =
51 			reinterpret_cast<mtr_memo_slot_t*>(
52 				block->end());
53 
54 		ut_ad(!(block->used() % sizeof(*slot)));
55 
56 		while (slot-- != start) {
57 
58 			if (!functor(slot)) {
59 				return(false);
60 			}
61 		}
62 
63 		return(true);
64 	}
65 
66 	Functor functor;
67 };
68 
69 template <typename Functor>
70 struct Iterate {
IterateIterate71 	Iterate() : functor() {}
72 
IterateIterate73 	Iterate(const Functor& functor) : functor(functor) {}
74 
75 	/** @return false if the functor returns false. */
operator ()Iterate76 	bool operator()(mtr_buf_t::block_t* block)
77 	{
78 		const mtr_memo_slot_t*	start =
79 			reinterpret_cast<const mtr_memo_slot_t*>(
80 				block->begin());
81 
82 		mtr_memo_slot_t*	slot =
83 			reinterpret_cast<mtr_memo_slot_t*>(
84 				block->end());
85 
86 		ut_ad(!(block->used() % sizeof(*slot)));
87 
88 		while (slot-- != start) {
89 
90 			if (!functor(slot)) {
91 				return(false);
92 			}
93 		}
94 
95 		return(true);
96 	}
97 
98 	Functor functor;
99 };
100 
101 /** Find specific object */
102 struct Find {
103 
104 	/** Constructor */
FindFind105 	Find(const void* object, ulint type)
106 		:
107 		m_slot(),
108 		m_type(type),
109 		m_object(object)
110 	{
111 		ut_a(object != NULL);
112 	}
113 
114 	/** @return false if the object was found. */
operator ()Find115 	bool operator()(mtr_memo_slot_t* slot)
116 	{
117 		if (m_object == slot->object && m_type == slot->type) {
118 			m_slot = slot;
119 			return(false);
120 		}
121 
122 		return(true);
123 	}
124 
125 	/** Slot if found */
126 	mtr_memo_slot_t*m_slot;
127 
128 	/** Type of the object to look for */
129 	ulint		m_type;
130 
131 	/** The object instance to look for */
132 	const void*	m_object;
133 };
134 
/** Find a page frame */
struct FindPage
{
	/** Constructor
	@param[in]	ptr	pointer to within a page frame
	@param[in]	flags	MTR_MEMO flags to look for */
	FindPage(const void* ptr, ulint flags)
		: m_ptr(ptr), m_flags(flags), m_slot(NULL)
	{
		/* There must be some flags to look for. */
		ut_ad(flags);
		/* We can only look for page-related flags. */
		ut_ad(!(flags & ulint(~(MTR_MEMO_PAGE_S_FIX
					| MTR_MEMO_PAGE_X_FIX
					| MTR_MEMO_PAGE_SX_FIX
					| MTR_MEMO_BUF_FIX
					| MTR_MEMO_MODIFY))));
	}

	/** Visit a memo entry.
	@param[in]	slot	memo entry to visit
	@retval	false	if a page was found
	@retval	true	if the iteration should continue */
	bool operator()(mtr_memo_slot_t* slot)
	{
		/* We stop the iteration on the first match, so a
		previous match must not exist. */
		ut_ad(m_slot == NULL);

		if (!(m_flags & slot->type) || slot->object == NULL) {
			/* Wrong slot type, or an already-released slot. */
			return(true);
		}

		buf_block_t* block = reinterpret_cast<buf_block_t*>(
			slot->object);

		/* Match if m_ptr points anywhere inside this block's
		page frame. */
		if (m_ptr < block->frame
		    || m_ptr >= block->frame + srv_page_size) {
			return(true);
		}

		/* When looking for a page latch, the current thread
		must actually hold the latch on the block. */
		ut_ad(!(m_flags & (MTR_MEMO_PAGE_S_FIX
				   | MTR_MEMO_PAGE_SX_FIX
				   | MTR_MEMO_PAGE_X_FIX))
		      || rw_lock_own_flagged(&block->lock, m_flags));

		m_slot = slot;
		return(false);
	}

	/** @return the slot that was found */
	mtr_memo_slot_t* get_slot() const
	{
		ut_ad(m_slot != NULL);
		return(m_slot);
	}
	/** @return the block that was found */
	buf_block_t* get_block() const
	{
		return(reinterpret_cast<buf_block_t*>(get_slot()->object));
	}
private:
	/** Pointer inside a page frame to look for */
	const void*const	m_ptr;
	/** MTR_MEMO flags to look for */
	const ulint		m_flags;
	/** The slot corresponding to m_ptr */
	mtr_memo_slot_t*	m_slot;
};
202 
/** Release latches and decrement the buffer fix count.
@param slot	memo slot */
static void memo_slot_release(mtr_memo_slot_t *slot)
{
  switch (slot->type) {
#ifdef UNIV_DEBUG
  default:
    ut_ad(!"invalid type");
    break;
  case MTR_MEMO_MODIFY:
    /* A modification marker carries no latch; nothing to release. */
    break;
#endif /* UNIV_DEBUG */
  case MTR_MEMO_S_LOCK:
    rw_lock_s_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
    break;
  case MTR_MEMO_SX_LOCK:
    rw_lock_sx_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
    break;
  case MTR_MEMO_SPACE_X_LOCK:
    {
      fil_space_t *space= static_cast<fil_space_t*>(slot->object);
      /* Publish the new tablespace size before releasing the
      X-latch that protected the size change. */
      space->committed_size= space->size;
      rw_lock_x_unlock(&space->latch);
    }
    break;
  case MTR_MEMO_X_LOCK:
    rw_lock_x_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
    break;
  case MTR_MEMO_BUF_FIX:
  case MTR_MEMO_PAGE_S_FIX:
  case MTR_MEMO_PAGE_SX_FIX:
  case MTR_MEMO_PAGE_X_FIX:
    /* Release any page latch (type-dependent) and then the
    buffer-fix that pinned the block in the buffer pool. */
    buf_block_t *block= reinterpret_cast<buf_block_t*>(slot->object);
    buf_page_release_latch(block, slot->type);
    block->unfix();
    break;
  }
  /* Mark the slot as released. */
  slot->object= NULL;
}
242 
243 /** Release the latches acquired by the mini-transaction. */
244 struct ReleaseLatches {
245   /** @return true always. */
operator ()ReleaseLatches246   bool operator()(mtr_memo_slot_t *slot) const
247   {
248     if (!slot->object)
249       return true;
250     switch (slot->type) {
251 #ifdef UNIV_DEBUG
252     default:
253       ut_ad(!"invalid type");
254       break;
255     case MTR_MEMO_MODIFY:
256       break;
257 #endif /* UNIV_DEBUG */
258     case MTR_MEMO_S_LOCK:
259       rw_lock_s_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
260       break;
261     case MTR_MEMO_SPACE_X_LOCK:
262       {
263         fil_space_t *space= static_cast<fil_space_t*>(slot->object);
264         space->committed_size= space->size;
265         rw_lock_x_unlock(&space->latch);
266       }
267       break;
268     case MTR_MEMO_X_LOCK:
269       rw_lock_x_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
270       break;
271     case MTR_MEMO_SX_LOCK:
272       rw_lock_sx_unlock(reinterpret_cast<rw_lock_t*>(slot->object));
273       break;
274     case MTR_MEMO_BUF_FIX:
275     case MTR_MEMO_PAGE_S_FIX:
276     case MTR_MEMO_PAGE_SX_FIX:
277     case MTR_MEMO_PAGE_X_FIX:
278       buf_block_t *block= reinterpret_cast<buf_block_t*>(slot->object);
279       buf_page_release_latch(block, slot->type);
280       block->unfix();
281       break;
282     }
283     slot->object= NULL;
284     return true;
285   }
286 };
287 
288 /** Release the latches and blocks acquired by the mini-transaction. */
289 struct ReleaseAll {
290   /** @return true always. */
operator ()ReleaseAll291   bool operator()(mtr_memo_slot_t *slot) const
292   {
293     if (slot->object)
294       memo_slot_release(slot);
295     return true;
296   }
297 };
298 
#ifdef UNIV_DEBUG
/** Check that all slots have been handled. */
struct DebugCheck {
	/** Assert that the slot was already released.
	@return true always */
	bool operator()(const mtr_memo_slot_t* slot) const
	{
		ut_ad(slot->object == NULL);
		return(true);
	}
};
#endif
310 
/** Release a resource acquired by the mini-transaction. */
struct ReleaseBlocks {
	/** Constructor
	@param[in]	start_lsn	mini-transaction REDO start LSN
	@param[in]	end_lsn		mini-transaction REDO end LSN
	@param[in]	observer	flush observer, or NULL */
	ReleaseBlocks(lsn_t start_lsn, lsn_t end_lsn, FlushObserver* observer)
		:
		m_end_lsn(end_lsn),
		m_start_lsn(start_lsn),
		m_flush_observer(observer)
	{
		/* Do nothing */
	}

	/** Add the modified page to the buffer flush list. */
	void add_dirty_page_to_flush_list(mtr_memo_slot_t* slot) const
	{
		/* Both LSNs must have been assigned by the commit. */
		ut_ad(m_end_lsn > 0);
		ut_ad(m_start_lsn > 0);

		buf_block_t*	block;

		block = reinterpret_cast<buf_block_t*>(slot->object);

		buf_flush_note_modification(block, m_start_lsn,
					    m_end_lsn, m_flush_observer);
	}

	/** Visit a memo slot; pages that were latched in X or SX mode
	may have been modified, so note them in the flush list.
	@return true always. */
	bool operator()(mtr_memo_slot_t* slot) const
	{
		if (slot->object != NULL) {

			if (slot->type == MTR_MEMO_PAGE_X_FIX
			    || slot->type == MTR_MEMO_PAGE_SX_FIX) {

				add_dirty_page_to_flush_list(slot);
			}
		}

		return(true);
	}

	/** Mini-transaction REDO end LSN */
	lsn_t		m_end_lsn;

	/** Mini-transaction REDO start LSN */
	lsn_t		m_start_lsn;

	/** Flush observer */
	FlushObserver*	m_flush_observer;
};
361 
/** Write the block contents to the REDO log */
struct mtr_write_log_t {
	/** Append a block to the redo log buffer.
	@param[in]	block	mtr_buf_t block holding log records
	@return whether the appending should continue (always true) */
	bool operator()(const mtr_buf_t::block_t* block) const
	{
		log_write_low(block->begin(), block->used());
		return(true);
	}
};
372 
/** Append records to the system-wide redo log buffer.
NOTE(review): log_reserve_and_open()/log_close() appear to require
the redo log mutex to be held by the caller — confirm at call sites.
@param[in]	log	redo log records */
void
mtr_write_log(
	const mtr_buf_t*	log)
{
	const ulint	len = log->size();
	mtr_write_log_t	write_log;

	ut_ad(!recv_no_log_write);
	DBUG_PRINT("ib_log",
		   (ULINTPF " extra bytes written at " LSN_PF,
		    len, log_sys.lsn));

	/* Reserve len bytes in the redo log buffer, copy every
	block of the mtr_buf_t into it, then close the log. */
	log_reserve_and_open(len);
	log->for_each_block(write_log);
	log_close();
}
391 
392 /** Start a mini-transaction. */
start()393 void mtr_t::start()
394 {
395   MEM_UNDEFINED(this, sizeof *this);
396 
397   new(&m_memo) mtr_buf_t();
398   new(&m_log) mtr_buf_t();
399 
400   m_made_dirty= false;
401   m_inside_ibuf= false;
402   m_modifications= false;
403   m_n_log_recs= 0;
404   m_log_mode= MTR_LOG_ALL;
405   ut_d(m_user_space_id= TRX_SYS_SPACE);
406   m_user_space= NULL;
407   m_state= MTR_STATE_ACTIVE;
408   m_flush_observer= NULL;
409   m_commit_lsn= 0;
410 }
411 
/** Release the resources */
inline void mtr_t::release_resources()
{
  /* In debug builds, verify that every memo slot has already
  been released before we discard the memo. */
  ut_d(m_memo.for_each_block_in_reverse(CIterate<DebugCheck>()));
  m_log.erase();
  m_memo.erase();
  m_state= MTR_STATE_COMMITTED;
}
420 
/** Commit a mini-transaction: write any redo log generated,
add dirtied pages to the flush list, and release all latches. */
void
mtr_t::commit()
{
  ut_ad(is_active());
  ut_ad(!is_inside_ibuf());

  /* This is a dirty read, for debugging. */
  ut_ad(!m_modifications || !recv_no_log_write);
  ut_ad(!m_modifications || m_log_mode != MTR_LOG_NONE);

  /* Only take the redo-log path if something was modified and
  either log records were generated or logging is disabled
  for this mini-transaction (MTR_LOG_NO_REDO). */
  if (m_modifications
      && (m_n_log_recs || m_log_mode == MTR_LOG_NO_REDO))
  {
    ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO);

    lsn_t start_lsn;

    /* prepare_write() acquires the log mutex; a zero length
    means there is nothing to append (MTR_LOG_NO_REDO), and
    m_commit_lsn was already set to the current log LSN. */
    if (const ulint len= prepare_write())
      start_lsn= finish_write(len);
    else
      start_lsn= m_commit_lsn;

    if (m_made_dirty)
      log_flush_order_mutex_enter();

    /* It is now safe to release the log mutex because the
    flush_order mutex will ensure that we are the first one
    to insert into the flush list. */
    log_mutex_exit();

    /* Note all X/SX-latched pages as modified, in flush order. */
    m_memo.for_each_block_in_reverse(CIterate<const ReleaseBlocks>
                                     (ReleaseBlocks(start_lsn, m_commit_lsn,
                                                    m_flush_observer)));
    if (m_made_dirty)
      log_flush_order_mutex_exit();

    m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
  }
  else
    /* Nothing was logged: just release every latch and fix. */
    m_memo.for_each_block_in_reverse(CIterate<ReleaseAll>());

  release_resources();
}
465 
#ifdef UNIV_DEBUG
/** Check that all pages belong to a shrunk tablespace. */
struct Shrink
{
  /** The tablespace that is being shrunk */
  const fil_space_t &space;
  Shrink(const fil_space_t &space) : space(space) {}

  /** Validate one memo slot against the shrunk tablespace.
  @param slot  memo slot to check
  @return true to continue the iteration */
  bool operator()(const mtr_memo_slot_t *slot) const
  {
    if (!slot->object)
      return true;
    switch (slot->type) {
    default:
      ut_ad("invalid type" == 0);
      return false;
    case MTR_MEMO_MODIFY:
      break;
    case MTR_MEMO_SPACE_X_LOCK:
      /* The only tablespace latch must be on the shrunk space. */
      ut_ad(&space == slot->object);
      return true;
    case MTR_MEMO_PAGE_X_FIX:
    case MTR_MEMO_PAGE_SX_FIX:
      const buf_page_t &bpage= static_cast<buf_block_t*>(slot->object)->page;
      const page_id_t &id= bpage.id;
      if (id.space() == 0 && id.page_no() == TRX_SYS_PAGE_NO)
      {
        /* The TRX_SYS page may be latched while shrinking an
        undo tablespace. */
        ut_ad(srv_is_undo_tablespace(space.id));
        break;
      }
      /* Any other latched page must lie inside the new,
      reduced bounds of the shrunk tablespace, and must not
      be dirty. */
      ut_ad(id.space() == space.id);
      ut_ad(id.page_no() < space.size);
      ut_ad(bpage.state == BUF_BLOCK_FILE_PAGE);
      ut_ad(!bpage.oldest_modification);
      break;
    }
    return true;
  }
};
#endif
505 
/** Commit a mini-transaction that is shrinking a tablespace.
The redo log is durably written and the data file truncated before
any latches are released, so that recovery cannot observe pages
beyond the reduced size.
@param space   tablespace that is being shrunk */
void mtr_t::commit_shrink(fil_space_t &space)
{
  ut_ad(is_active());
  ut_ad(!is_inside_ibuf());
  ut_ad(!high_level_read_only);
  ut_ad(m_modifications);
  ut_ad(m_made_dirty);
  ut_ad(!recv_recovery_is_on());
  ut_ad(m_log_mode == MTR_LOG_ALL);
  /* A shrinkable tablespace must consist of a single file. */
  ut_ad(UT_LIST_GET_LEN(space.chain) == 1);

  log_write_and_flush_prepare();

  const lsn_t start_lsn= finish_write(prepare_write());

  log_flush_order_mutex_enter();
  /* Durably write the reduced FSP_SIZE before truncating the data file. */
  log_write_and_flush();

  os_file_truncate(space.chain.start->name, space.chain.start->handle,
                   os_offset_t(space.size) << srv_page_size_shift, true);

  /* Debug check: all latched pages belong to the shrunk space. */
  ut_d(m_memo.for_each_block_in_reverse(CIterate<Shrink>(space)));

  m_memo.for_each_block_in_reverse(CIterate<const ReleaseBlocks>
                                   (ReleaseBlocks(start_lsn, m_commit_lsn,
                                                  m_flush_observer)));
  log_flush_order_mutex_exit();

  /* Clear the truncation-in-progress flags under fil_system.mutex. */
  mutex_enter(&fil_system.mutex);
  ut_ad(space.is_being_truncated);
  space.is_being_truncated= false;
  space.set_stopping(false);
  mutex_exit(&fil_system.mutex);

  m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
  srv_stats.log_write_requests.inc();

  release_resources();
}
548 
/** Commit a mini-transaction that did not modify any pages,
but generated some redo log on a higher level, such as
MLOG_FILE_NAME records and a MLOG_CHECKPOINT marker.
The caller must invoke log_mutex_enter() and log_mutex_exit().
This is to be used at log_checkpoint().
@param[in]	checkpoint_lsn		the LSN of the log checkpoint
@param[in]	write_mlog_checkpoint	Write MLOG_CHECKPOINT marker
					if it is enabled. */
void
mtr_t::commit_checkpoint(
	lsn_t	checkpoint_lsn,
	bool	write_mlog_checkpoint)
{
	ut_ad(log_mutex_own());
	ut_ad(is_active());
	ut_ad(!is_inside_ibuf());
	ut_ad(get_log_mode() == MTR_LOG_ALL);
	ut_ad(!m_made_dirty);
	ut_ad(m_memo.size() == 0);
	ut_ad(!srv_read_only_mode);
	ut_ad(write_mlog_checkpoint || m_n_log_recs > 1);

	/* Frame the log records: a single record is flagged in
	place, multiple records are terminated by MLOG_MULTI_REC_END. */
	switch (m_n_log_recs) {
	case 0:
		break;
	case 1:
		*m_log.front()->begin() |= MLOG_SINGLE_REC_FLAG;
		break;
	default:
		mlog_catenate_ulint(&m_log, MLOG_MULTI_REC_END, MLOG_1BYTE);
	}

	if (write_mlog_checkpoint) {
		/* Append MLOG_CHECKPOINT followed by the 8-byte
		checkpoint LSN. */
		byte*	ptr = m_log.push<byte*>(SIZE_OF_MLOG_CHECKPOINT);
		compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
		*ptr = MLOG_CHECKPOINT;
		mach_write_to_8(ptr + 1, checkpoint_lsn);
	}

	finish_write(m_log.size());
	release_resources();

	if (write_mlog_checkpoint) {
		DBUG_PRINT("ib_log",
			   ("MLOG_CHECKPOINT(" LSN_PF ") written at " LSN_PF,
			    checkpoint_lsn, log_sys.lsn));
	}
}
597 
598 #ifdef UNIV_DEBUG
599 /** Check if a tablespace is associated with the mini-transaction
600 (needed for generating a MLOG_FILE_NAME record)
601 @param[in]	space	tablespace
602 @return whether the mini-transaction is associated with the space */
603 bool
is_named_space(ulint space) const604 mtr_t::is_named_space(ulint space) const
605 {
606 	ut_ad(!m_user_space || m_user_space->id != TRX_SYS_SPACE);
607 
608 	switch (get_log_mode()) {
609 	case MTR_LOG_NONE:
610 	case MTR_LOG_NO_REDO:
611 		return(true);
612 	case MTR_LOG_ALL:
613 	case MTR_LOG_SHORT_INSERTS:
614 		return(m_user_space_id == space
615 		       || is_predefined_tablespace(space));
616 	}
617 
618 	ut_error;
619 	return(false);
620 }
621 /** Check if a tablespace is associated with the mini-transaction
622 (needed for generating a MLOG_FILE_NAME record)
623 @param[in]	space	tablespace
624 @return whether the mini-transaction is associated with the space */
is_named_space(const fil_space_t * space) const625 bool mtr_t::is_named_space(const fil_space_t* space) const
626 {
627   ut_ad(!m_user_space || m_user_space->id != TRX_SYS_SPACE);
628 
629   switch (get_log_mode()) {
630   case MTR_LOG_NONE:
631   case MTR_LOG_NO_REDO:
632     return true;
633   case MTR_LOG_ALL:
634   case MTR_LOG_SHORT_INSERTS:
635     return m_user_space == space || is_predefined_tablespace(space->id);
636   }
637 
638   ut_error;
639   return false;
640 }
641 #endif /* UNIV_DEBUG */
642 
/** Acquire a tablespace X-latch.
NOTE: use mtr_x_lock_space().
@param[in]	space_id	tablespace ID
@param[in]	file		file name from where called
@param[in]	line		line number in file
@return the tablespace object (never NULL) */
fil_space_t*
mtr_t::x_lock_space(ulint space_id, const char* file, unsigned line)
{
	fil_space_t*	space;

	ut_ad(is_active());

	if (space_id == TRX_SYS_SPACE) {
		/* The system tablespace is always available. */
		space = fil_system.sys_space;
	} else if ((space = m_user_space) && space_id == space->id) {
		/* Reuse the tablespace already associated with this
		mini-transaction. */
	} else {
		/* Look up any other tablespace by its ID. */
		space = fil_space_get(space_id);
		ut_ad(get_log_mode() != MTR_LOG_NO_REDO
		      || space->purpose == FIL_TYPE_TEMPORARY
		      || space->purpose == FIL_TYPE_IMPORT
		      || space->redo_skipped_count > 0);
	}

	ut_ad(space);
	ut_ad(space->id == space_id);
	x_lock_space(space, file, line);
	return(space);
}
672 
673 /** Release an object in the memo stack.
674 @return true if released */
675 bool
memo_release(const void * object,ulint type)676 mtr_t::memo_release(const void* object, ulint type)
677 {
678 	ut_ad(is_active());
679 
680 	/* We cannot release a page that has been written to in the
681 	middle of a mini-transaction. */
682 	ut_ad(!m_modifications || type != MTR_MEMO_PAGE_X_FIX);
683 
684 	Iterate<Find> iteration(Find(object, type));
685 
686 	if (!m_memo.for_each_block_in_reverse(iteration)) {
687 		memo_slot_release(iteration.functor.m_slot);
688 		return(true);
689 	}
690 
691 	return(false);
692 }
693 
694 /** Release a page latch.
695 @param[in]	ptr	pointer to within a page frame
696 @param[in]	type	object type: MTR_MEMO_PAGE_X_FIX, ... */
697 void
release_page(const void * ptr,mtr_memo_type_t type)698 mtr_t::release_page(const void* ptr, mtr_memo_type_t type)
699 {
700 	ut_ad(is_active());
701 
702 	/* We cannot release a page that has been written to in the
703 	middle of a mini-transaction. */
704 	ut_ad(!m_modifications || type != MTR_MEMO_PAGE_X_FIX);
705 
706 	Iterate<FindPage> iteration(FindPage(ptr, type));
707 
708 	if (!m_memo.for_each_block_in_reverse(iteration)) {
709 		memo_slot_release(iteration.functor.get_slot());
710 		return;
711 	}
712 
713 	/* The page was not found! */
714 	ut_ad(0);
715 }
716 
/** Prepare to write the mini-transaction log to the redo log buffer.
On return the redo log mutex is held.
@return number of bytes to write in finish_write();
0 if nothing is to be written (MTR_LOG_NO_REDO) */
inline ulint mtr_t::prepare_write()
{
	ut_ad(!recv_no_log_write);

	if (UNIV_UNLIKELY(m_log_mode != MTR_LOG_ALL)) {
		ut_ad(m_log_mode == MTR_LOG_NO_REDO);
		ut_ad(m_log.size() == 0);
		/* Nothing to write: just record the current LSN as
		the commit LSN, keeping the log mutex for the caller. */
		log_mutex_enter();
		m_commit_lsn = log_sys.lsn;
		return 0;
	}

	ulint	len	= m_log.size();
	ulint	n_recs	= m_n_log_recs;
	ut_ad(len > 0);
	ut_ad(n_recs > 0);

	/* Grow the redo log buffer if this mini-transaction alone
	would fill more than half of it. */
	if (len > srv_log_buffer_size / 2) {
		log_buffer_extend(ulong((len + 1) * 2));
	}

	ut_ad(m_n_log_recs == n_recs);

	fil_space_t*	space = m_user_space;

	if (space != NULL && is_predefined_tablespace(space->id)) {
		/* Omit MLOG_FILE_NAME for predefined tablespaces. */
		space = NULL;
	}

	log_mutex_enter();

	if (fil_names_write_if_was_clean(space, this)) {
		/* This mini-transaction was the first one to modify
		this tablespace since the latest checkpoint, so
		some MLOG_FILE_NAME records were appended to m_log. */
		ut_ad(m_n_log_recs > n_recs);
		mlog_catenate_ulint(&m_log, MLOG_MULTI_REC_END, MLOG_1BYTE);
		len = m_log.size();
	} else {
		/* This was not the first time of dirtying a
		tablespace since the latest checkpoint. */

		ut_ad(n_recs == m_n_log_recs);

		if (n_recs <= 1) {
			ut_ad(n_recs == 1);

			/* Flag the single log record as the
			only record in this mini-transaction. */
			*m_log.front()->begin() |= MLOG_SINGLE_REC_FLAG;
		} else {
			/* Because this mini-transaction comprises
			multiple log records, append MLOG_MULTI_REC_END
			at the end. */

			mlog_catenate_ulint(&m_log, MLOG_MULTI_REC_END,
					    MLOG_1BYTE);
			len++;
		}
	}

	/* check and attempt a checkpoint if exceeding capacity */
	log_margin_checkpoint_age(len);

	return(len);
}
786 
/** Append the redo log records to the redo log buffer.
The redo log mutex must be held (acquired by prepare_write()).
Sets m_commit_lsn to the end LSN of the write.
@param[in] len	number of bytes to write
@return start_lsn */
inline lsn_t mtr_t::finish_write(ulint len)
{
	ut_ad(m_log_mode == MTR_LOG_ALL);
	ut_ad(log_mutex_own());
	ut_ad(m_log.size() == len);
	ut_ad(len > 0);

	lsn_t start_lsn;

	if (m_log.is_small()) {
		/* Fast path: the whole payload fits in the first
		block and may be copied in one go. */
		const mtr_buf_t::block_t* front = m_log.front();
		ut_ad(len <= front->used());

		m_commit_lsn = log_reserve_and_write_fast(front->begin(), len,
							  &start_lsn);

		if (m_commit_lsn) {
			return start_lsn;
		}
		/* The fast write failed (m_commit_lsn == 0);
		fall through to the generic path. */
	}

	/* Open the database log for log_write_low */
	start_lsn = log_reserve_and_open(len);

	mtr_write_log_t	write_log;
	m_log.for_each_block(write_log);

	m_commit_lsn = log_close();
	return start_lsn;
}
820 
821 /** Find out whether a block was not X-latched by the mini-transaction */
822 struct FindBlockX
823 {
824   const buf_block_t &block;
825 
FindBlockXFindBlockX826   FindBlockX(const buf_block_t &block): block(block) {}
827 
828   /** @return whether the block was not found x-latched */
operator ()FindBlockX829   bool operator()(const mtr_memo_slot_t *slot) const
830   {
831     return slot->object != &block || slot->type != MTR_MEMO_PAGE_X_FIX;
832   }
833 };
834 
#ifdef UNIV_DEBUG
/** Assert that the block is not present in the mini-transaction */
struct FindNoBlock
{
  const buf_block_t &block;

  FindNoBlock(const buf_block_t &block): block(block) {}

  /** @return whether the block was not found */
  bool operator()(const mtr_memo_slot_t *slot) const
  {
    /* Keep iterating while the slot refers to another object. */
    return &block != slot->object;
  }
};
#endif /* UNIV_DEBUG */
850 
/** Check if the mini-transaction holds an exclusive page latch
on the block.
@param block  buffer pool block
@return whether an MTR_MEMO_PAGE_X_FIX entry exists for the block */
bool mtr_t::have_x_latch(const buf_block_t &block) const
{
  if (m_memo.for_each_block(CIterate<FindBlockX>(FindBlockX(block))))
  {
    /* No X-latch entry exists. In debug builds, assert that the
    block is not registered in the memo in any other way either. */
    ut_ad(m_memo.for_each_block(CIterate<FindNoBlock>(FindNoBlock(block))));
    ut_ad(!memo_contains_flagged(&block,
                                 MTR_MEMO_PAGE_S_FIX | MTR_MEMO_PAGE_SX_FIX |
                                 MTR_MEMO_BUF_FIX | MTR_MEMO_MODIFY));
    return false;
  }
  /* The memo claims an X-latch; the thread must really hold it. */
  ut_ad(rw_lock_own(&block.lock, RW_LOCK_X));
  return true;
}
864 
865 #ifdef UNIV_DEBUG
/** Check if memo contains the given item.
@return	true if contains */
bool
mtr_t::memo_contains(
	const mtr_buf_t*	memo,
	const void*		object,
	ulint			type)
{
	Iterate<Find> iteration(Find(object, type));
	if (memo->for_each_block_in_reverse(iteration)) {
		/* No matching slot was found. */
		return(false);
	}

	/* For rw-lock slot types, additionally assert that the
	calling thread really owns the latch in the claimed mode.
	Other types (e.g. page fixes) fall through unchecked. */
	const rw_lock_t *lock = static_cast<const rw_lock_t*>(object);

	switch (type) {
	case MTR_MEMO_X_LOCK:
		ut_ad(rw_lock_own(lock, RW_LOCK_X));
		break;
	case MTR_MEMO_SX_LOCK:
		ut_ad(rw_lock_own(lock, RW_LOCK_SX));
		break;
	case MTR_MEMO_S_LOCK:
		ut_ad(rw_lock_own(lock, RW_LOCK_S));
		break;
	}

	return(true);
}
895 
/** Debug check for flags */
struct FlaggedCheck {
	/** Constructor
	@param[in]	ptr	object (page frame or rw-lock) to find
	@param[in]	flags	MTR_MEMO flags to look for (can be ORred) */
	FlaggedCheck(const void* ptr, ulint flags)
		:
		m_ptr(ptr),
		m_flags(flags)
	{
		/* There must be some flags to look for. */
		ut_ad(flags);
		/* Look for rw-lock-related and page-related flags. */
		ut_ad(!(flags & ulint(~(MTR_MEMO_PAGE_S_FIX
					| MTR_MEMO_PAGE_X_FIX
					| MTR_MEMO_PAGE_SX_FIX
					| MTR_MEMO_BUF_FIX
					| MTR_MEMO_MODIFY
					| MTR_MEMO_X_LOCK
					| MTR_MEMO_SX_LOCK
					| MTR_MEMO_S_LOCK))));
		/* Either some rw-lock-related or page-related flags
		must be specified, but not both at the same time. */
		ut_ad(!(flags & (MTR_MEMO_PAGE_S_FIX
				 | MTR_MEMO_PAGE_X_FIX
				 | MTR_MEMO_PAGE_SX_FIX
				 | MTR_MEMO_BUF_FIX
				 | MTR_MEMO_MODIFY))
		      == !!(flags & (MTR_MEMO_X_LOCK
				     | MTR_MEMO_SX_LOCK
				     | MTR_MEMO_S_LOCK)));
	}

	/** Visit a memo entry.
	@param[in]	slot	memo entry to visit
	@retval	false	if m_ptr was found
	@retval	true	if the iteration should continue */
	bool operator()(const mtr_memo_slot_t* slot) const
	{
		if (m_ptr != slot->object || !(m_flags & slot->type)) {
			return(true);
		}

		if (ulint flags = m_flags & (MTR_MEMO_PAGE_S_FIX
					     | MTR_MEMO_PAGE_SX_FIX
					     | MTR_MEMO_PAGE_X_FIX)) {
			/* Page-latch search: verify that the thread
			really owns the block latch in one of the
			requested modes. */
			rw_lock_t* lock = &static_cast<buf_block_t*>(
				const_cast<void*>(m_ptr))->lock;
			ut_ad(rw_lock_own_flagged(lock, flags));
		} else {
			/* rw-lock search. NOTE(review): the >> 5 shift
			appears to map the MTR_MEMO_{S,SX,X}_LOCK bits
			onto the corresponding rw-lock flag values —
			confirm against the enum definitions in
			mtr0types.h. */
			rw_lock_t* lock = static_cast<rw_lock_t*>(
				const_cast<void*>(m_ptr));
			ut_ad(rw_lock_own_flagged(lock, m_flags >> 5));
		}

		return(false);
	}

	const void*const	m_ptr;
	const ulint		m_flags;
};
954 
955 /** Check if memo contains the given item.
956 @param object		object to search
957 @param flags		specify types of object (can be ORred) of
958 			MTR_MEMO_PAGE_S_FIX ... values
959 @return true if contains */
960 bool
memo_contains_flagged(const void * ptr,ulint flags) const961 mtr_t::memo_contains_flagged(const void* ptr, ulint flags) const
962 {
963 	ut_ad(is_active());
964 
965 	return !m_memo.for_each_block_in_reverse(
966 		CIterate<FlaggedCheck>(FlaggedCheck(ptr, flags)));
967 }
968 
969 /** Check if memo contains the given page.
970 @param[in]	ptr	pointer to within buffer frame
971 @param[in]	flags	specify types of object with OR of
972 			MTR_MEMO_PAGE_S_FIX... values
973 @return	the block
974 @retval	NULL	if not found */
975 buf_block_t*
memo_contains_page_flagged(const byte * ptr,ulint flags) const976 mtr_t::memo_contains_page_flagged(
977 	const byte*	ptr,
978 	ulint		flags) const
979 {
980 	Iterate<FindPage> iteration(FindPage(ptr, flags));
981 	return m_memo.for_each_block_in_reverse(iteration)
982 		? NULL : iteration.functor.get_block();
983 }
984 
985 /** Mark the given latched page as modified.
986 @param[in]	ptr	pointer to within buffer frame */
987 void
memo_modify_page(const byte * ptr)988 mtr_t::memo_modify_page(const byte* ptr)
989 {
990 	buf_block_t*	block = memo_contains_page_flagged(
991 		ptr, MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX);
992 	ut_ad(block != NULL);
993 
994 	if (!memo_contains(get_memo(), block, MTR_MEMO_MODIFY)) {
995 		memo_push(block, MTR_MEMO_MODIFY);
996 	}
997 }
998 
999 /** Print info of an mtr handle. */
1000 void
print() const1001 mtr_t::print() const
1002 {
1003 	ib::info() << "Mini-transaction handle: memo size "
1004 		<< m_memo.size() << " bytes log size "
1005 		<< get_log()->size() << " bytes";
1006 }
1007 
1008 #endif /* UNIV_DEBUG */
1009