1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file trx/trx0purge.cc
29 Purge old versions
30 
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "trx0purge.h"
37 
38 #ifdef UNIV_NONINL
39 #include "trx0purge.ic"
40 #endif
41 
42 #include "fsp0fsp.h"
43 #include "fut0fut.h"
44 #include "mach0data.h"
45 #include "mtr0log.h"
46 #include "os0thread.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0purge.h"
50 #include "row0upd.h"
51 #include "srv0mon.h"
52 #include "fsp0sysspace.h"
53 #include "srv0srv.h"
54 #include "srv0start.h"
55 #include "sync0sync.h"
56 #include "trx0rec.h"
57 #include "trx0roll.h"
58 #include "trx0rseg.h"
59 #include "trx0trx.h"
60 
/** Maximum allowable purge history length.  <=0 means 'infinite'. */
ulong		srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong		srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
trx_undo_rec_t	trx_purge_dummy_rec;

#ifdef UNIV_DEBUG
/* Debug-only flag; presumably restricts purge to updating its view
only — consumed outside this file, confirm against srv0srv.cc. */
my_bool		srv_purge_view_update_only_debug;
/* Debug-only flag; asserted false in
trx_purge_add_update_undo_to_history() below. */
bool		trx_commit_disallowed = false;
#endif /* UNIV_DEBUG */

/** Sentinel value: an empty element whose transaction number is
UINT64_UNDEFINED, used to mean "no current element". */
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);
81 
82 /** Constructor */
TrxUndoRsegsIterator(trx_purge_t * purge_sys)83 TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
84 	:
85 	m_purge_sys(purge_sys),
86 	m_trx_undo_rsegs(NullElement),
87 	m_iter(m_trx_undo_rsegs.end())
88 {
89 }
90 
91 /** Sets the next rseg to purge in m_purge_sys.
92 @return page size of the table for which the log is.
93 NOTE: if rseg is NULL when this function returns this means that
94 there are no rollback segments to purge and then the returned page
95 size object should not be used. */
96 const page_size_t
set_next()97 TrxUndoRsegsIterator::set_next()
98 {
99 	mutex_enter(&m_purge_sys->pq_mutex);
100 
101 	/* Only purge consumes events from the priority queue, user
102 	threads only produce the events. */
103 
104 	/* Check if there are more rsegs to process in the
105 	current element. */
106 	if (m_iter != m_trx_undo_rsegs.end()) {
107 
108 		/* We are still processing rollback segment from
109 		the same transaction and so expected transaction
110 		number shouldn't increase. Undo increment of
111 		expected trx_no done by caller assuming rollback
112 		segments from given transaction are done. */
113 		m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
114 
115 	} else if (!m_purge_sys->purge_queue->empty()) {
116 
117 		/* Read the next element from the queue.
118 		Combine elements if they have same transaction number.
119 		This can happen if a transaction shares redo rollback segment
120 		with another transaction that has already added it to purge
121 		queue and former transaction also needs to schedule non-redo
122 		rollback segment for purge. */
123 		m_trx_undo_rsegs = NullElement;
124 
125 		while (!m_purge_sys->purge_queue->empty()) {
126 
127 			if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
128 				m_trx_undo_rsegs =
129 					purge_sys->purge_queue->top();
130 			} else if (purge_sys->purge_queue->top().get_trx_no() ==
131 					m_trx_undo_rsegs.get_trx_no()) {
132 				m_trx_undo_rsegs.append(
133 					purge_sys->purge_queue->top());
134 			} else {
135 				break;
136 			}
137 
138 			m_purge_sys->purge_queue->pop();
139 		}
140 
141 		m_iter = m_trx_undo_rsegs.begin();
142 
143 	} else {
144 		/* Queue is empty, reset iterator. */
145 		m_trx_undo_rsegs = NullElement;
146 		m_iter = m_trx_undo_rsegs.end();
147 
148 		mutex_exit(&m_purge_sys->pq_mutex);
149 
150 		m_purge_sys->rseg = NULL;
151 
152 		/* return a dummy object, not going to be used by the caller */
153 		return(univ_page_size);
154 	}
155 
156 	m_purge_sys->rseg = *m_iter++;
157 
158 	mutex_exit(&m_purge_sys->pq_mutex);
159 
160 	ut_a(m_purge_sys->rseg != NULL);
161 
162 	mutex_enter(&m_purge_sys->rseg->mutex);
163 
164 	ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
165 	ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
166 
167 	/* We assume in purge of externally stored fields that
168 	space id is in the range of UNDO tablespace space ids
169 	unless space is system tablespace */
170 	ut_a(srv_is_undo_tablespace(m_purge_sys->rseg->space)
171 	     || is_system_tablespace(
172 			m_purge_sys->rseg->space));
173 
174 	const page_size_t	page_size(m_purge_sys->rseg->page_size);
175 
176 	ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
177 
178 	m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
179 	m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
180 	m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
181 
182 	mutex_exit(&m_purge_sys->rseg->mutex);
183 
184 	return(page_size);
185 }
186 
187 /****************************************************************//**
188 Builds a purge 'query' graph. The actual purge is performed by executing
189 this query graph.
190 @return own: the query graph */
191 static
192 que_t*
trx_purge_graph_build(trx_t * trx,ulint n_purge_threads)193 trx_purge_graph_build(
194 /*==================*/
195 	trx_t*		trx,			/*!< in: transaction */
196 	ulint		n_purge_threads)	/*!< in: number of purge
197 						threads */
198 {
199 	ulint		i;
200 	mem_heap_t*	heap;
201 	que_fork_t*	fork;
202 
203 	heap = mem_heap_create(512);
204 	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
205 	fork->trx = trx;
206 
207 	for (i = 0; i < n_purge_threads; ++i) {
208 		que_thr_t*	thr;
209 
210 		thr = que_thr_create(fork, heap, NULL);
211 
212 		thr->child = row_purge_node_create(thr, heap);
213 	}
214 
215 	return(fork);
216 }
217 
/********************************************************************//**
Creates the global purge system control structure and inits the history
mutex. */
void
trx_purge_sys_create(
/*=================*/
	ulint		n_purge_threads,	/*!< in: number of purge
						threads */
	purge_pq_t*	purge_queue)		/*!< in, own: UNDO log min
						binary heap */
{
	purge_sys = static_cast<trx_purge_t*>(
		ut_zalloc_nokey(sizeof(*purge_sys)));

	purge_sys->state = PURGE_STATE_INIT;
	purge_sys->event = os_event_create(0);

	/* The control block was zero-filled above, so the non-trivial
	members must be constructed in place with placement new. */
	new (&purge_sys->iter) purge_iter_t;
	new (&purge_sys->limit) purge_iter_t;
	new (&purge_sys->undo_trunc) undo::Truncate;
#ifdef UNIV_DEBUG
	new (&purge_sys->done) purge_iter_t;
#endif /* UNIV_DEBUG */

	/* Take ownership of purge_queue, we are responsible for freeing it. */
	purge_sys->purge_queue = purge_queue;

	rw_lock_create(trx_purge_latch_key,
		       &purge_sys->latch, SYNC_PURGE_LATCH);

	mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);

	ut_a(n_purge_threads > 0);

	purge_sys->sess = sess_open();

	purge_sys->trx = purge_sys->sess->trx;

	ut_a(purge_sys->trx->sess == purge_sys->sess);

	/* A purge transaction is not a real transaction, we use a transaction
	here only because the query threads code requires it. It is otherwise
	quite unnecessary. We should get rid of it eventually. */
	purge_sys->trx->id = 0;
	purge_sys->trx->start_time = ut_time_monotonic();
	purge_sys->trx->state = TRX_STATE_ACTIVE;
	purge_sys->trx->op_info = "purge trx";

	/* Build the query graph that the purge worker threads execute:
	one query thread (and purge node) per purge thread. */
	purge_sys->query = trx_purge_graph_build(
		purge_sys->trx, n_purge_threads);

	new(&purge_sys->view) ReadView();

	/* The purge view starts as a clone of the oldest view of any
	active transaction. */
	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	/* Bind the rseg iterator to this (global) purge system. */
	purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));
}
277 
/************************************************************************
Frees the global purge system control structure.
NOTE: the teardown order mirrors the setup order in
trx_purge_sys_create(): query graph, session/trx, view, latches,
queue, event, iterator, and finally the control block itself. */
void
trx_purge_sys_close(void)
/*======================*/
{
	que_graph_free(purge_sys->query);

	/* The dummy purge transaction never received a real id and
	must still belong to the purge session. */
	ut_a(purge_sys->trx->id == 0);
	ut_a(purge_sys->sess->trx == purge_sys->trx);

	purge_sys->trx->state = TRX_STATE_NOT_STARTED;

	sess_close(purge_sys->sess);

	purge_sys->sess = NULL;

	/* The view was constructed with placement new in
	trx_purge_sys_create(), so invoke the destructor explicitly. */
	purge_sys->view.close();
	purge_sys->view.~ReadView();

	rw_lock_free(&purge_sys->latch);
	mutex_free(&purge_sys->pq_mutex);

	/* We took ownership of the queue in trx_purge_sys_create(). */
	if (purge_sys->purge_queue != NULL) {
		UT_DELETE(purge_sys->purge_queue);
		purge_sys->purge_queue = NULL;
	}

	os_event_destroy(purge_sys->event);

	purge_sys->event = NULL;

	UT_DELETE(purge_sys->rseg_iter);

	ut_free(purge_sys);

	purge_sys = NULL;
}
316 
317 /*================ UNDO LOG HISTORY LIST =============================*/
318 
/********************************************************************//**
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse. */
void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*		trx,		/*!< in: transaction */
	trx_undo_ptr_t*	undo_ptr,	/*!< in/out: update undo log. */
	page_t*		undo_page,	/*!< in: update undo log header page,
					x-latched */
	bool		update_rseg_history_len,
					/*!< in: if true: update rseg history
					len else skip updating it. */
	ulint		n_added_logs,	/*!< in: number of logs added */
	mtr_t*		mtr)		/*!< in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_ulogf_t*	undo_header;

	undo = undo_ptr->update_undo;
	rseg = undo->rseg;

	rseg_header = trx_rsegf_get(
		undo->rseg->space, undo->rseg->page_no, undo->rseg->page_size,
		mtr);

	undo_header = undo_page + undo->hdr_offset;

	if (undo->state != TRX_UNDO_CACHED) {
		ulint		hist_size;
#ifdef UNIV_DEBUG
		trx_usegf_t*	seg_header = undo_page + TRX_UNDO_SEG_HDR;
#endif /* UNIV_DEBUG */

		/* The undo log segment will not be reused */

		if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
			ib::fatal() << "undo->id is " << undo->id;
		}

		/* Free this undo log's slot in the rollback segment
		header so it can be given to a new undo log. */
		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);

		/* Account the segment's pages in the rseg's history
		size counter; they are reclaimed only when the history
		entry is purged (see trx_purge_free_segment()). */
		hist_size = mtr_read_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);

		ut_ad(undo->size == flst_get_len(
			      seg_header + TRX_UNDO_PAGE_LIST));

		mlog_write_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE,
			hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	ut_ad(!trx_commit_disallowed);

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);

	if (update_rseg_history_len) {
		os_atomic_increment_ulint(
			&trx_sys->rseg_history_len, n_added_logs);
		/* Kick the purge coordinator if the backlog exceeds
		what one purge batch per thread can absorb. */
		if (trx_sys->rseg_history_len
		    > srv_n_purge_threads * srv_purge_batch_size) {
			srv_wake_purge_thread_if_not_active();
		}
	}

	/* Write the trx number to the undo log header */
	mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);

	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	/* If the rseg had no pending history, this log becomes its
	"last" (i.e. oldest unpurged) undo log. */
	if (rseg->last_page_no == FIL_NULL) {
		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}
408 
409 /** Remove undo log header from the history list.
410 @param[in,out]	rseg_hdr	rollback segment header
411 @param[in]	log_hdr		undo log segment header
412 @param[in,out]	mtr		mini transaction. */
413 static
414 void
trx_purge_remove_log_hdr(trx_rsegf_t * rseg_hdr,trx_ulogf_t * log_hdr,mtr_t * mtr)415 trx_purge_remove_log_hdr(
416 	trx_rsegf_t*	rseg_hdr,
417 	trx_ulogf_t*	log_hdr,
418 	mtr_t*		mtr)
419 {
420 	flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
421 		    log_hdr + TRX_UNDO_HISTORY_NODE, mtr);
422 
423 	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, 1);
424 }
425 
/** Frees an undo log segment which is in the history list. Removes the
undo log hdr from the history list.
@param[in,out]	rseg		rollback segment
@param[in]	hdr_addr	file address of log_hdr
@param[in]	noredo		skip redo logging. */
static
void
trx_purge_free_segment(
	trx_rseg_t*	rseg,
	fil_addr_t	hdr_addr,
	bool		noredo)
{
	mtr_t		mtr;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ulint		seg_size;
	ulint		hist_size;
	/* When redo logging is skipped there is nothing to protect
	against crash recovery, so treat the log as already marked. */
	bool		marked		= noredo;

	for (;;) {
		page_t*	undo_page;

		mtr_start(&mtr);
		if (noredo) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}
		ut_ad(noredo == trx_sys_is_noredo_rseg_slot(rseg->id));

		mutex_enter(&rseg->mutex);

		rseg_hdr = trx_rsegf_get(
			rseg->space, rseg->page_no, rseg->page_size, &mtr);

		undo_page = trx_undo_page_get(
			page_id_t(rseg->space, hdr_addr.page), rseg->page_size,
			&mtr);

		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
		log_hdr = undo_page + hdr_addr.boffset;

		/* Mark the last undo log totally purged, so that if the
		system crashes, the tail of the undo log will not get accessed
		again. The list of pages in the undo log tail gets inconsistent
		during the freeing of the segment, and therefore purge should
		not try to access them again. */

		if (!marked) {
			marked = true;
			mlog_write_ulint(
				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				MLOG_2BYTES, &mtr);
		}

		/* NOTE: on this break the rseg mutex is still held and
		the mtr is still active; both are released at the end of
		this function, after the header page has been freed. */
		if (fseg_free_step_not_header(
			    seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr)) {

			break;
		}

		/* Not done yet: release and retry in a fresh mtr so we
		do not hold too many page latches at once. */
		mutex_exit(&rseg->mutex);

		mtr_commit(&mtr);
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

	do {

		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.cc. */

	} while (!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr));

	/* Deduct the freed pages from the history size recorded in the
	rollback segment header and from the in-memory segment size. */
	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}
528 
/********************************************************************//**
Removes unnecessary history data from a rollback segment.
Walks the rseg's history list from its oldest entry towards the purge
limit, freeing fully-purged undo logs along the way. Each iteration
runs in its own mini-transaction so page latches are not accumulated. */
static
void
trx_purge_truncate_rseg_history(
/*============================*/
	trx_rseg_t*		rseg,		/*!< in: rollback segment */
	const purge_iter_t*	limit)		/*!< in: truncate offset */
{
	fil_addr_t	hdr_addr;
	fil_addr_t	prev_hdr_addr;
	trx_rsegf_t*	rseg_hdr;
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	mtr_t		mtr;
	trx_id_t	undo_trx_no;
	const bool	noredo		= trx_sys_is_noredo_rseg_slot(
		rseg->id);

	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	/* Start from the last (oldest) undo log in the history list. */
	hdr_addr = trx_purge_get_log_from_hist(
		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
	if (hdr_addr.page == FIL_NULL) {

		/* History list exhausted. */
		mutex_exit(&(rseg->mutex));

		mtr_commit(&mtr);

		return;
	}

	undo_page = trx_undo_page_get(page_id_t(rseg->space, hdr_addr.page),
				      rseg->page_size, &mtr);

	log_hdr = undo_page + hdr_addr.boffset;

	undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	if (undo_trx_no >= limit->trx_no) {

		/* Reached the purge limit: nothing beyond it may be
		removed wholesale. */

		/* limit space_id should match the rollback segment
		space id to avoid freeing of the page belongs to
		different rollback segment for the same trx_no. */
		if (undo_trx_no == limit->trx_no
		    && rseg->space == limit->undo_rseg_space) {

			/* Only trim the start of this undo log, up to
			the purge limit's undo record number. */
			trx_undo_truncate_start(
				rseg, hdr_addr.page,
				hdr_addr.boffset, limit->undo_no);
		}

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		return;
	}

	/* Remember the previous (newer) entry before this one is
	unlinked, so the walk can continue afterwards. */
	prev_hdr_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {

		/* We can free the whole log segment */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		/* calls the trx_purge_remove_log_hdr()
		inside trx_purge_free_segment(). */
		trx_purge_free_segment(rseg, hdr_addr, noredo);

	} else {
		/* Remove the log hdr from the rseg history. */
		trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);
	}

	/* Re-acquire the latches in a fresh mini-transaction for the
	next iteration of the walk. */
	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	hdr_addr = prev_hdr_addr;

	goto loop;
}
634 
/** UNDO log truncate logger. Needed to track state of truncate during crash.
An auxiliary redo log file undo_<space_id>_trunc.log will created while the
truncate of the UNDO is in progress. This file is required during recovery
to complete the truncate.

Protocol: init() creates the file holding a zeroed page (truncate in
progress); done() writes the magic number and then unlinks the file;
is_log_present() treats "file exists without magic" as an interrupted
truncate. */

namespace undo {

	/** Populate log file name based on space_id.
	On success the buffer is allocated here with new[]; the caller
	owns it and must release it with delete[].
	@param[in]	space_id	id of the undo tablespace.
	@return DB_SUCCESS or error code */
	dberr_t populate_log_file_name(
		ulint	space_id,
		char*&	log_file_name)
	{
		/* The 22 covers a path separator, up to 20 digits of
		the space id and the '_' glue of the format below. */
		ulint log_file_name_sz =
			strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
			+ strlen(undo::s_log_prefix)
			+ strlen(undo::s_log_ext);

		log_file_name = new (std::nothrow) char[log_file_name_sz];
		if (log_file_name == 0) {
			return(DB_OUT_OF_MEMORY);
		}

		memset(log_file_name, 0, log_file_name_sz);

		strcpy(log_file_name, srv_log_group_home_dir);
		ulint	log_file_name_len = strlen(log_file_name);

		/* Append a path separator if the configured directory
		does not already end with one. */
		if (log_file_name[log_file_name_len - 1]
				!= OS_PATH_SEPARATOR) {

			log_file_name[log_file_name_len]
				= OS_PATH_SEPARATOR;
			log_file_name_len = strlen(log_file_name);
		}

		ut_snprintf(log_file_name + log_file_name_len,
			    log_file_name_sz - log_file_name_len,
			    "%s%lu_%s", undo::s_log_prefix,
			    (ulong) space_id, s_log_ext);

		return(DB_SUCCESS);
	}

	/** Create the truncate log file.
	@param[in]	space_id	id of the undo tablespace to truncate.
	@return DB_SUCCESS or error code. */
	dberr_t init(ulint space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablepsace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return(err);
		}

		/* Step-2: Create the log file, open it and write 0 to
		indicate init phase. */
		bool            ret;
		pfs_os_file_t	handle = os_file_create(
			innodb_log_file_key, log_file_name, OS_FILE_CREATE,
			OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
		if (!ret) {
			delete[] log_file_name;
			return(DB_IO_ERROR);
		}

		/* Over-allocate by one page so the I/O buffer can be
		aligned to UNIV_PAGE_SIZE via ut_align() below. */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			delete[] log_file_name;
			return(DB_OUT_OF_MEMORY);
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		/* The buffer is zero-filled, so this writes a page of
		zeros: absence of the magic number marks the truncate
		as still in progress. */
		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		delete[] log_file_name;

		return(err);
	}

	/** Mark completion of undo truncate action by writing magic number to
	the log file and then removing it from the disk.
	If we are going to remove it from disk then why write magic number ?
	This is to safeguard from unlink (file-system) anomalies that will keep
	the link to the file even after unlink action is successful and
	ref-count = 0.
	@param[in]	space_id	id of the undo tablespace to truncate.*/
	void done(
		ulint	space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablepsace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return;
		}

		/* Step-2: Open log file and write magic number to
		indicate done phase. */
		bool    ret;
		pfs_os_file_t	handle =
			os_file_create_simple_no_error_handling(
				innodb_log_file_key, log_file_name,
				OS_FILE_OPEN, OS_FILE_READ_WRITE,
				srv_read_only_mode, &ret);

		if (!ret) {
			/* Could not open: best effort — just remove the
			marker file. */
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		/* Over-allocate by one page for UNIV_PAGE_SIZE
		alignment, as in init(). */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		mach_write_to_4(log_buf, undo::s_magic);

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		ut_ad(err == DB_SUCCESS);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
	}

	/** Check if TRUNCATE_DDL_LOG file exist.
	@param[in]	space_id	id of the undo tablespace.
	@return true if exist else false. */
	bool is_log_present(
		ulint	space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Populate log file name. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return(false);
		}

		/* Step-2: Check for existence of the file. */
		bool		exist;
		os_file_type_t	type;
		os_file_status(log_file_name, &exist, &type);

		/* Step-3: If file exists, check it for presence of magic
		number.  If found, then delete the file and report file
		doesn't exist as presence of magic number suggest that
		truncate action was complete. */

		if (exist) {
			bool    ret;
			pfs_os_file_t	handle =
				os_file_create_simple_no_error_handling(
					innodb_log_file_key, log_file_name,
					OS_FILE_OPEN, OS_FILE_READ_WRITE,
					srv_read_only_mode, &ret);
			if (!ret) {
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}

			/* Over-allocate by one page for UNIV_PAGE_SIZE
			alignment, as in init(). */
			ulint	sz = UNIV_PAGE_SIZE;
			void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
			if (buf == NULL) {
				os_file_close(handle);
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}

			byte*	log_buf = static_cast<byte*>(
				ut_align(buf, UNIV_PAGE_SIZE));

			IORequest	request(IORequest::READ);

			request.disable_compression();

			/* NOTE: this declaration shadows the outer err
			on purpose is unclear; both hold dberr_t. */
			dberr_t	err;

			err = os_file_read(request, handle, log_buf, 0, sz);

			os_file_close(handle);

			if (err != DB_SUCCESS) {

				ib::info()
					<< "Unable to read '"
					<< log_file_name << "' : "
					<< ut_strerr(err);

				os_file_delete(
					innodb_log_file_key, log_file_name);

				ut_free(buf);

				delete[] log_file_name;

				return(false);
			}

			ulint	magic_no = mach_read_from_4(log_buf);

			ut_free(buf);

			if (magic_no == undo::s_magic) {
				/* Found magic number. */
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}
		}

		delete[] log_file_name;

		return(exist);
	}
};
896 
/** Iterate over all the UNDO tablespaces and check if any of the UNDO
tablespace qualifies for TRUNCATE (size > threshold).
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_mark_undo_for_truncate(
	undo::Truncate*	undo_trunc)
{
	/* Step-1: If UNDO Tablespace
		- already marked for truncate (OR)
		- truncate disabled
	return immediately else search for qualifying tablespace. */
	if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
		return;
	}

	/* Step-2: Validation/Qualification checks
	a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
	   is being truncated server can continue to operate.
	b. At-least 2 UNDO redo rseg/undo logs (besides the default rseg-0)
	c. At-least 1 UNDO tablespace size > threshold. */
	if (srv_undo_tablespaces_active < 2
	    || (srv_rollback_segments < (1 + srv_tmp_undo_logs + 2))) {
		return;
	}

	/* Avoid bias selection and so start the scan from immediate next
	of last selected UNDO tablespace for truncate. */
	ulint space_id = undo_trunc->get_scan_start();

	for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {

		ut_ad(srv_undo_space_id_start != 0);

		if (fil_space_get_size(space_id)
		    > (srv_max_undo_log_size / srv_page_size)) {
			/* Tablespace qualifies for truncate. */
			undo_trunc->mark(space_id);
			undo::Truncate::add_space_to_trunc_list(space_id);
			break;
		}

		space_id++;

		/* Wrap around to the first UNDO tablespace once the
		scan moves past the last active one. */
		if (space_id >= (srv_undo_space_id_start
				 + srv_undo_tablespaces_active)) {
			/* Note: UNDO tablespace ids starts from 1. */
			space_id = srv_undo_space_id_start;
		}

		if (undo_trunc->is_marked()) {
			break;
		}
	}

	/* Remember where to resume on the next invocation so that
	successive scans rotate over all the tablespaces. */
	undo_trunc->set_scan_start(space_id);

	/* Couldn't make any selection. */
	if (!undo_trunc->is_marked()) {
		return;
	}

#ifdef UNIV_DEBUG
	ib::info() << "UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id() << " marked for truncate";
#endif /* UNIV_DEBUG */

	/* Step-3: Iterate over all the rsegs of selected UNDO tablespace
	and mark them temporarily unavailable for allocation.*/
	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
		trx_rseg_t*	rseg = trx_sys->rseg_array[i];

		if (rseg != NULL && !trx_sys_is_noredo_rseg_slot(rseg->id)) {
			if (rseg->space
				== undo_trunc->get_marked_space_id()) {

				/* Once set this rseg will not be allocated
				to new booting transaction but we will wait
				for existing active transaction to finish. */
				rseg->skip_allocation = true;
				undo_trunc->add_rseg_to_trunc(rseg);
			}
		}
	}
}
982 
/** Static registry of undo tablespace ids currently marked for
truncate; entries are added via add_space_to_trunc_list() in
trx_purge_mark_undo_for_truncate(). */
undo::undo_spaces_t	undo::Truncate::s_spaces_to_truncate;
984 
985 /** Cleanse purge queue to remove the rseg that reside in undo-tablespace
986 marked for truncate.
987 @param[in,out]	undo_trunc	undo truncate tracker */
988 static
989 void
trx_purge_cleanse_purge_queue(undo::Truncate * undo_trunc)990 trx_purge_cleanse_purge_queue(
991 	undo::Truncate*	undo_trunc)
992 {
993 	mutex_enter(&purge_sys->pq_mutex);
994 	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
995 	purge_elem_list_t			purge_elem_list;
996 
997 	/* Remove rseg instances that are in the purge queue before we start
998 	truncate of corresponding UNDO truncate. */
999 	while (!purge_sys->purge_queue->empty()) {
1000 		purge_elem_list.push_back(purge_sys->purge_queue->top());
1001 		purge_sys->purge_queue->pop();
1002 	}
1003 	ut_ad(purge_sys->purge_queue->empty());
1004 
1005 	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
1006 	     it != purge_elem_list.end();
1007 	     ++it) {
1008 
1009 		for (TrxUndoRsegs::iterator it2 = it->begin();
1010 		     it2 != it->end();
1011 		     ++it2) {
1012 
1013 			if ((*it2)->space
1014 				== undo_trunc->get_marked_space_id()) {
1015 				it->erase(it2);
1016 				break;
1017 			}
1018 		}
1019 
1020 		const ulint	size = it->size();
1021 		if (size != 0) {
1022 			/* size != 0 suggest that there exist other rsegs that
1023 			needs processing so add this element to purge queue.
1024 			Note: Other rseg could be non-redo rsegs. */
1025 			purge_sys->purge_queue->push(*it);
1026 		}
1027 	}
1028 	mutex_exit(&purge_sys->pq_mutex);
1029 }
1030 
/** Iterate over the rsegs of the selected UNDO tablespace and, if all of
them are free of active undo records, perform the actual truncate of the
tablespace. The truncate is protected by a DDL log so that a crash in the
middle can be recovered.
@param[in]	limit		truncate_limit: purge has processed all undo
				records up to this iterator position
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_initiate_truncate(
	purge_iter_t*	limit,
	undo::Truncate*	undo_trunc)
{
	/* Step-1: Early check to findout if any of the the UNDO tablespace
	is marked for truncate. */
	if (!undo_trunc->is_marked()) {
		/* No tablespace marked for truncate yet. */
		return;
	}

	/* Step-2: Scan over each rseg and ensure that it doesn't hold any
	active undo records. */
	bool all_free = true;

	for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {

		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);

		mutex_enter(&rseg->mutex);

		if (rseg->trx_ref_count > 0) {
			/* This rseg is still being held by an active
			transaction. */
			all_free = false;
			mutex_exit(&rseg->mutex);
			continue;
		}

		ut_ad(rseg->trx_ref_count == 0);
		ut_ad(rseg->skip_allocation);

		ulint	size_of_rsegs = rseg->curr_size;

		/* curr_size == 1 means only the rseg header page remains,
		i.e. the rseg is completely empty. */
		if (size_of_rsegs == 1) {
			mutex_exit(&rseg->mutex);
			continue;
		} else {

			/* There could be cached undo segment. Check if records
			in these segments can be purged. Normal purge history
			will not touch these cached segment. */
			ulint		cached_undo_size = 0;

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->update_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				/* A cached segment whose trx_id is beyond
				the truncate limit may still be needed. */
				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->insert_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			/* The rseg consists of its header page plus the
			cached segments; anything beyond that still holds
			live data. */
			ut_ad(size_of_rsegs >= (cached_undo_size + 1));

			if (size_of_rsegs > (cached_undo_size + 1)) {
				/* There are pages besides cached pages that
				still hold active data. */
				all_free = false;
			}
		}

		mutex_exit(&rseg->mutex);
	}

	if (!all_free) {
		/* rseg still holds active data.*/
		return;
	}


	/* Step-3: Start the actual truncate.
	a. log-checkpoint
	b. Write the DDL log to protect truncate action from CRASH
	c. Remove rseg instance if added to purge queue before we
	   initiate truncate.
	d. Execute actual truncate
	e. Remove the DDL log. */
	DBUG_EXECUTE_IF("ib_undo_trunc_before_checkpoint",
			ib::info() << "ib_undo_trunc_before_checkpoint";
			DBUG_SUICIDE(););

	/* After truncate if server crashes then redo logging done for this
	undo tablespace might not stand valid as tablespace has been
	truncated. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	ib::info() << "Truncating UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id();

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_start",
			ib::info() << "ib_undo_trunc_before_ddl_log_start";
			DBUG_SUICIDE(););

	/* err is consumed only by the debug assertion below; in release
	builds the return value is intentionally ignored. */
#ifdef UNIV_DEBUG
	dberr_t	err =
#endif /* UNIV_DEBUG */
		undo_trunc->start_logging(
			undo_trunc->get_marked_space_id());
	ut_ad(err == DB_SUCCESS);

	DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
			ib::info() << "ib_undo_trunc_before_truncate";
			DBUG_SUICIDE(););

	/* Make sure purge will not pick up records from the tablespace
	while it is being truncated. */
	trx_purge_cleanse_purge_queue(undo_trunc);

	bool	success = trx_undo_truncate_tablespace(undo_trunc);
	if (!success) {
		/* Note: In case of error we don't enable the rsegs
		and neither unmark the tablespace so the tablespace
		continue to remain inactive. */
		ib::error() << "Failed to truncate UNDO tablespace with"
			" space identifier "
			<< undo_trunc->get_marked_space_id();
		return;
	}

	if (purge_sys->rseg != NULL
	    && purge_sys->rseg->last_page_no == FIL_NULL) {
		/* If purge_sys->rseg is pointing to rseg that was recently
		truncated then move to next rseg element.
		Note: Ideally purge_sys->rseg should be NULL because purge
		should complete processing of all the records but there is
		purge_batch_size that can force the purge loop to exit before
		all the records are purged and in this case purge_sys->rseg
		could point to a valid rseg waiting for next purge cycle. */
		purge_sys->next_stored = FALSE;
		purge_sys->rseg = NULL;
	}

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
			ib::info() << "ib_undo_trunc_before_ddl_log_end";
			DBUG_SUICIDE(););

	log_make_checkpoint_at(LSN_MAX, TRUE);

	undo_trunc->done_logging(undo_trunc->get_marked_space_id());

	/* Completed truncate. Now it is safe to re-use the tablespace. */
	for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);
		rseg->skip_allocation = false;
	}

	ib::info() << "Completed truncate of UNDO tablespace with space"
		" identifier " << undo_trunc->get_marked_space_id();

	undo_trunc->reset();
	undo::Truncate::clear_trunc_list();

	DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
			ib::info() << "ib_undo_trunc_trunc_done";
			DBUG_SUICIDE(););
}
1207 
1208 /********************************************************************//**
1209 Removes unnecessary history data from rollback segments. NOTE that when this
1210 function is called, the caller must not have any latches on undo log pages! */
1211 static
1212 void
trx_purge_truncate_history(purge_iter_t * limit,const ReadView * view)1213 trx_purge_truncate_history(
1214 /*========================*/
1215 	purge_iter_t*		limit,		/*!< in: truncate limit */
1216 	const ReadView*		view)		/*!< in: purge view */
1217 {
1218 	ulint		i;
1219 
1220 	/* We play safe and set the truncate limit at most to the purge view
1221 	low_limit number, though this is not necessary */
1222 
1223 	if (limit->trx_no >= view->low_limit_no()) {
1224 		limit->trx_no = view->low_limit_no();
1225 		limit->undo_no = 0;
1226 		limit->undo_rseg_space = ULINT_UNDEFINED;
1227 	}
1228 
1229 	ut_ad(limit->trx_no <= purge_sys->view.low_limit_no());
1230 
1231 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1232 		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
1233 
1234 		if (rseg != NULL) {
1235 			ut_a(rseg->id == i);
1236 			trx_purge_truncate_rseg_history(rseg, limit);
1237 		}
1238 	}
1239 
1240 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1241 		trx_rseg_t*	rseg = trx_sys->pending_purge_rseg_array[i];
1242 
1243 		if (rseg != NULL) {
1244 			ut_a(rseg->id == i);
1245 			trx_purge_truncate_rseg_history(rseg, limit);
1246 		}
1247 	}
1248 
1249 	/* UNDO tablespace truncate. We will try to truncate as much as we
1250 	can (greedy approach). This will ensure when the server is idle we
1251 	try and truncate all the UNDO tablespaces. */
1252 	ulint	nchances = srv_undo_tablespaces_active;
1253 	for (i = 0; i < nchances; i++) {
1254 		trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
1255 		trx_purge_initiate_truncate(limit, &purge_sys->undo_trunc);
1256 	}
1257 }
1258 
/***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
static
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg,		/*!< in: rollback segment */
	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
					handled */
{
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	fil_addr_t	prev_log_addr;
	trx_id_t	trx_no;
	ibool		del_marks;
	mtr_t		mtr;

	mutex_enter(&(rseg->mutex));

	ut_a(rseg->last_page_no != FIL_NULL);

	/* Position the purge iterator just past the fully purged log; the
	exact next record is located later by trx_purge_choose_next_log(). */
	purge_sys->iter.trx_no = rseg->last_trx_no + 1;
	purge_sys->iter.undo_no = 0;
	purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
	purge_sys->next_stored = FALSE;

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(
		page_id_t(rseg->space, rseg->last_page_no),
		rseg->page_size, &mtr);

	log_hdr = undo_page + rseg->last_offset;

	/* Increase the purge page count by one for every handled log */

	(*n_pages_handled)++;

	/* Follow the history list backwards to the previous (older) undo
	log header. */
	prev_log_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

#ifdef UNIV_DEBUG
		trx_sys_mutex_enter();

		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in large
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 2000 000 undo logs now. */

		if (trx_sys->rseg_history_len > 2000000) {
			ib::warn() << "Purge reached the head of the history"
				" list, but its length is still reported as "
				<< trx_sys->rseg_history_len << " which is"
				" unusually high.";
			ib::info() << "This can happen for multiple reasons";
			ib::info() << "1. A long running transaction is"
				" withholding purging of undo logs or a read"
				" view is open. Please try to commit the long"
				" running transaction.";
			ib::info() << "2. Try increasing the number of purge"
				" threads to expedite purging of undo logs.";
		}

		trx_sys_mutex_exit();
#endif
		return;
	}

	/* Release the rseg mutex and the page latch before reading the
	previous log header in a fresh mini-transaction. */
	mutex_exit(&rseg->mutex);

	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space,
							prev_log_addr.page),
					      rseg->page_size, &mtr)
		+ prev_log_addr.boffset;

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	/* Advance the rseg bookkeeping to the previous (older) log. */
	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	TrxUndoRsegs elem(rseg->last_trx_no);
	elem.push_back(rseg);

	/* Purge can also produce events, however these are already ordered
	in the rollback segment and any user generated event will be greater
	than the events that Purge produces. ie. Purge can never produce
	events from an empty rollback segment. */

	mutex_enter(&purge_sys->pq_mutex);

	purge_sys->purge_queue->push(elem);

	mutex_exit(&purge_sys->pq_mutex);

	mutex_exit(&rseg->mutex);
}
1380 
1381 /** Position the purge sys "iterator" on the undo record to use for purging.
1382 @param[in,out]	purge_sys	purge instance
1383 @param[in]	page_size	page size */
1384 static
1385 void
trx_purge_read_undo_rec(trx_purge_t * purge_sys,const page_size_t & page_size)1386 trx_purge_read_undo_rec(
1387 	trx_purge_t*		purge_sys,
1388 	const page_size_t&	page_size)
1389 {
1390 	ulint		offset;
1391 	ulint		page_no;
1392 	ib_uint64_t	undo_no;
1393 	ulint		undo_rseg_space;
1394 
1395 	purge_sys->hdr_offset = purge_sys->rseg->last_offset;
1396 	page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
1397 
1398 	if (purge_sys->rseg->last_del_marks) {
1399 		mtr_t		mtr;
1400 		trx_undo_rec_t*	undo_rec = NULL;
1401 
1402 		mtr_start(&mtr);
1403 
1404 		undo_rec = trx_undo_get_first_rec(
1405 			purge_sys->rseg->space,
1406 			page_size,
1407 			purge_sys->hdr_page_no,
1408 			purge_sys->hdr_offset, RW_S_LATCH, &mtr);
1409 
1410 		if (undo_rec != NULL) {
1411 			offset = page_offset(undo_rec);
1412 			undo_no = trx_undo_rec_get_undo_no(undo_rec);
1413 			undo_rseg_space = purge_sys->rseg->space;
1414 			page_no = page_get_page_no(page_align(undo_rec));
1415 		} else {
1416 			offset = 0;
1417 			undo_no = 0;
1418 			undo_rseg_space = ULINT_UNDEFINED;
1419 		}
1420 
1421 		mtr_commit(&mtr);
1422 	} else {
1423 		offset = 0;
1424 		undo_no = 0;
1425 		undo_rseg_space = ULINT_UNDEFINED;
1426 	}
1427 
1428 	purge_sys->offset = offset;
1429 	purge_sys->page_no = page_no;
1430 	purge_sys->iter.undo_no = undo_no;
1431 	purge_sys->iter.undo_rseg_space = undo_rseg_space;
1432 
1433 	purge_sys->next_stored = TRUE;
1434 }
1435 
1436 /***********************************************************************//**
1437 Chooses the next undo log to purge and updates the info in purge_sys. This
1438 function is used to initialize purge_sys when the next record to purge is
1439 not known, and also to update the purge system info on the next record when
1440 purge has handled the whole undo log for a transaction. */
1441 static
1442 void
trx_purge_choose_next_log(void)1443 trx_purge_choose_next_log(void)
1444 /*===========================*/
1445 {
1446 	ut_ad(purge_sys->next_stored == FALSE);
1447 
1448 	const page_size_t&	page_size = purge_sys->rseg_iter->set_next();
1449 
1450 	if (purge_sys->rseg != NULL) {
1451 		trx_purge_read_undo_rec(purge_sys, page_size);
1452 	} else {
1453 		/* There is nothing to do yet. */
1454 		os_thread_yield();
1455 	}
1456 }
1457 
/***********************************************************************//**
Gets the next record to purge and updates the info in the purge system.
@return copy of an undo log record or pointer to the dummy undo log record */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
					handled */
	mem_heap_t*	heap)		/*!< in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	mtr_t		mtr;

	ut_ad(purge_sys->next_stored);
	ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no());

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	const page_size_t	page_size(purge_sys->rseg->page_size);

	if (offset == 0) {
		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_dummy_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
						page_size, &mtr);

	rec = undo_page + offset;

	rec2 = rec;

	/* Scan forward from the current record, looking for the next record
	of this undo log that requires a purge operation. */
	for (;;) {
		ulint		type;
		trx_undo_rec_t*	next_rec;
		ulint		cmpl_info;

		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(
			rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);

		if (next_rec == NULL) {
			/* Continue on the next page; rec2 becomes NULL if
			the undo log has no further records. */
			rec2 = trx_undo_get_next_rec(
				rec2, purge_sys->hdr_page_no,
				purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		/* Delete-mark records always require purge. */
		if (type == TRX_UNDO_DEL_MARK_REC) {

			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		/* Records with externally stored fields require purge. */
		if (trx_undo_rec_get_extern_storage(rec2)) {
			break;
		}

		/* Updates that may have changed an ordering field require
		purge; others are skipped. */
		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			break;
		}
	}

	if (rec2 == NULL) {
		/* This undo log is exhausted: advance the purge system to
		the next log, then re-latch the original page so that the
		record at `offset` can still be copied below. */
		mtr_commit(&mtr);

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		mtr_start(&mtr);

		undo_page = trx_undo_page_get_s_latched(
			page_id_t(space, page_no), page_size, &mtr);

	} else {
		page = page_align(rec2);

		/* Remember rec2 as the record to hand out on the next
		call. */
		purge_sys->offset = rec2 - page;
		purge_sys->page_no = page_get_page_no(page);
		purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->iter.undo_rseg_space = space;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			(*n_pages_handled)++;
		}
	}

	/* Return a heap copy of the record that was current on entry. */
	rec_copy = trx_undo_rec_copy(undo_page, offset, heap);
	mtr_commit(&mtr);
	return(rec_copy);
}
1583 
1584 /********************************************************************//**
1585 Fetches the next undo log record from the history list to purge. It must be
1586 released with the corresponding release function.
1587 @return copy of an undo log record or pointer to trx_purge_dummy_rec,
1588 if the whole undo log can skipped in purge; NULL if none left */
1589 static MY_ATTRIBUTE((warn_unused_result))
1590 trx_undo_rec_t*
trx_purge_fetch_next_rec(roll_ptr_t * roll_ptr,ulint * n_pages_handled,mem_heap_t * heap)1591 trx_purge_fetch_next_rec(
1592 /*=====================*/
1593 	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
1594 	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
1595 					handled */
1596 	mem_heap_t*	heap)		/*!< in: memory heap where copied */
1597 {
1598 	if (!purge_sys->next_stored) {
1599 		trx_purge_choose_next_log();
1600 
1601 		if (!purge_sys->next_stored) {
1602 			DBUG_PRINT("ib_purge",
1603 				   ("no logs left in the history list"));
1604 			return(NULL);
1605 		}
1606 	}
1607 
1608 	if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) {
1609 
1610 		return(NULL);
1611 	}
1612 
1613 	/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1614 	os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1615 
1616 	*roll_ptr = trx_undo_build_roll_ptr(
1617 		FALSE, purge_sys->rseg->id,
1618 		purge_sys->page_no, purge_sys->offset);
1619 
1620 	/* The following call will advance the stored values of the
1621 	purge iterator. */
1622 
1623 	return(trx_purge_get_next_rec(n_pages_handled, heap));
1624 }
1625 
/*******************************************************************//**
Fetches the UNDO records for one purge batch and attaches them, round-robin,
to the purge query threads.
@return number of undo log pages handled in the batch */
static
ulint
trx_purge_attach_undo_recs(
/*=======================*/
	ulint		n_purge_threads,/*!< in: number of purge threads */
	trx_purge_t*	purge_sys,	/*!< in/out: purge instance */
	ulint		batch_size)	/*!< in: no. of pages to purge */
{
	que_thr_t*	thr;
	ulint		i = 0;
	ulint		n_pages_handled = 0;
	ulint		n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);

	ut_a(n_purge_threads > 0);

	purge_sys->limit = purge_sys->iter;

	/* Debug code to validate some pre-requisites and reset done flag. */
	for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	     thr != NULL && i < n_purge_threads;
	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {

		purge_node_t*		node;

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;

		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
		ut_a(node->undo_recs == NULL);
		ut_a(node->done);

		node->done = FALSE;
	}

	/* There should never be fewer nodes than threads, the inverse
	however is allowed because we only use purge threads as needed. */
	ut_a(i == n_purge_threads);

	/* Fetch and parse the UNDO records. The UNDO records are added
	to a per purge node vector. */
	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	ut_a(n_thrs > 0 && thr != NULL);

	ut_ad(trx_purge_check_limit());

	i = 0;

	for (;;) {
		purge_node_t*		node;
		trx_purge_rec_t*	purge_rec;

		ut_a(!thr->is_active);

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;
		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);

		purge_rec = static_cast<trx_purge_rec_t*>(
			mem_heap_zalloc(node->heap, sizeof(*purge_rec)));

		/* Track the max {trx_id, undo_no} for truncating the
		UNDO logs once we have purged the records. */

		if (trx_purge_check_limit()) {
			purge_sys->limit = purge_sys->iter;
		}

		/* Fetch the next record, and advance the purge_sys->iter. */
		purge_rec->undo_rec = trx_purge_fetch_next_rec(
			&purge_rec->roll_ptr, &n_pages_handled, node->heap);

		if (purge_rec->undo_rec != NULL) {

			/* Lazily create the per-node vector on first use. */
			if (node->undo_recs == NULL) {
				node->undo_recs = ib_vector_create(
					ib_heap_allocator_create(node->heap),
					sizeof(trx_purge_rec_t),
					batch_size);
			} else {
				ut_a(!ib_vector_is_empty(node->undo_recs));
			}

			ib_vector_push(node->undo_recs, purge_rec);

			/* Stop once the batch page budget is used up. */
			if (n_pages_handled >= batch_size) {

				break;
			}
		} else {
			/* No more records to purge. */
			break;
		}

		/* Move to the next thread, wrapping around after every
		n_purge_threads records (round-robin distribution). */
		thr = UT_LIST_GET_NEXT(thrs, thr);

		if (!(++i % n_purge_threads)) {
			thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
		}

		ut_a(thr != NULL);
	}

	ut_ad(trx_purge_check_limit());

	return(n_pages_handled);
}
1734 
1735 /*******************************************************************//**
1736 Calculate the DML delay required.
1737 @return delay in microseconds or ULINT_MAX */
1738 static
1739 ulint
trx_purge_dml_delay(void)1740 trx_purge_dml_delay(void)
1741 /*=====================*/
1742 {
1743 	/* Determine how much data manipulation language (DML) statements
1744 	need to be delayed in order to reduce the lagging of the purge
1745 	thread. */
1746 	ulint	delay = 0; /* in microseconds; default: no delay */
1747 
1748 	/* If purge lag is set (ie. > 0) then calculate the new DML delay.
1749 	Note: we do a dirty read of the trx_sys_t data structure here,
1750 	without holding trx_sys->mutex. */
1751 
1752 	if (srv_max_purge_lag > 0
1753 	    && trx_sys->rseg_history_len
1754 	       > srv_n_purge_threads * srv_purge_batch_size) {
1755 		float	ratio;
1756 
1757 		ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1758 
1759 		if (ratio > 1.0) {
1760 			/* If the history list length exceeds the
1761 			srv_max_purge_lag, the data manipulation
1762 			statements are delayed by at least 5000
1763 			microseconds. */
1764 			delay = (ulint) ((ratio - .5) * 10000);
1765 		}
1766 
1767 		if (delay > srv_max_purge_lag_delay) {
1768 			delay = srv_max_purge_lag_delay;
1769 		}
1770 
1771 		MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1772 	}
1773 
1774 	return(delay);
1775 }
1776 
/*******************************************************************//**
Wait for pending purge jobs to complete. */
static
void
trx_purge_wait_for_workers_to_complete(
/*===================================*/
	trx_purge_t*	purge_sys)	/*!< in: purge instance */
{
	ulint		n_submitted = purge_sys->n_submitted;

	/* Ensure that the work queue empties out. The CAS with identical
	compare and swap values acts as an atomic read of n_completed:
	it "succeeds" exactly when n_completed has reached n_submitted. */
	while (!os_compare_and_swap_ulint(
			&purge_sys->n_completed, n_submitted, n_submitted)) {

		/* Nudge a worker thread in case all of them are
		suspended while tasks are still queued. */
		if (srv_get_task_queue_length() > 0) {
			srv_release_threads(SRV_WORKER, 1);
		}

		os_thread_yield();
	}

	/* None of the worker threads should be doing any work. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* There should be no outstanding tasks as long
	as the worker threads are active. */
	ut_a(srv_get_task_queue_length() == 0);
}
1805 
1806 /******************************************************************//**
1807 Remove old historical changes from the rollback segments. */
1808 static
1809 void
trx_purge_truncate(void)1810 trx_purge_truncate(void)
1811 /*====================*/
1812 {
1813 	ut_ad(trx_purge_check_limit());
1814 
1815 	if (purge_sys->limit.trx_no == 0) {
1816 		trx_purge_truncate_history(&purge_sys->iter, &purge_sys->view);
1817 	} else {
1818 		trx_purge_truncate_history(&purge_sys->limit, &purge_sys->view);
1819 	}
1820 }
1821 
/*******************************************************************//**
This function runs a purge batch.
@return number of undo log pages handled in the batch */
ulint
trx_purge(
/*======*/
	ulint	n_purge_threads,	/*!< in: number of purge tasks
					to submit to the work queue */
	ulint	batch_size,		/*!< in: the maximum number of records
					to purge in one batch */
	bool	truncate)		/*!< in: truncate history if true */
{
	que_thr_t*	thr = NULL;
	ulint		n_pages_handled;

	ut_a(n_purge_threads > 0);

	srv_dml_needed_delay = trx_purge_dml_delay();

	/* The number of tasks submitted should be completed. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* Re-clone the oldest read view as the purge view under the purge
	latch; view_active is toggled off while the view is updated. */
	rw_lock_x_lock(&purge_sys->latch);

	purge_sys->view_active = false;

	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	rw_lock_x_unlock(&purge_sys->latch);

#ifdef UNIV_DEBUG
	if (srv_purge_view_update_only_debug) {
		return(0);
	}
#endif /* UNIV_DEBUG */

	/* Fetch the UNDO recs that need to be purged. */
	n_pages_handled = trx_purge_attach_undo_recs(
		n_purge_threads, purge_sys, batch_size);

	/* Do we do an asynchronous purge or not ? */
	if (n_purge_threads > 1) {
		ulint	i = 0;

		/* Submit n_purge_threads - 1 tasks to the work queue; the
		remaining one is executed by this coordinator thread via
		the run_synchronously path below. */
		for (i = 0; i < n_purge_threads - 1; ++i) {
			thr = que_fork_scheduler_round_robin(
				purge_sys->query, thr);

			ut_a(thr != NULL);

			srv_que_task_enqueue_low(thr);
		}

		thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
		ut_a(thr != NULL);

		purge_sys->n_submitted += n_purge_threads - 1;

		goto run_synchronously;

	/* Do it synchronously. */
	} else {
		thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
		ut_ad(thr);

run_synchronously:
		++purge_sys->n_submitted;

		/* Run the last (or only) purge task in this thread. */
		que_run_threads(thr);

		os_atomic_inc_ulint(
			&purge_sys->pq_mutex, &purge_sys->n_completed, 1);

		/* In the multi-threaded case, wait for the workers
		submitted above to drain the queue. */
		if (n_purge_threads > 1) {
			trx_purge_wait_for_workers_to_complete(purge_sys);
		}
	}

	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

#ifdef UNIV_DEBUG
	rw_lock_x_lock(&purge_sys->latch);
	if (purge_sys->limit.trx_no == 0) {
		purge_sys->done = purge_sys->iter;
	} else {
		purge_sys->done = purge_sys->limit;
	}
	rw_lock_x_unlock(&purge_sys->latch);
#endif /* UNIV_DEBUG */

	/* Optionally remove history that is no longer needed now that the
	batch has been applied. */
	if (truncate) {
		trx_purge_truncate();
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);

	return(n_pages_handled);
}
1924 
1925 /*******************************************************************//**
1926 Get the purge state.
1927 @return purge state. */
1928 purge_state_t
trx_purge_state(void)1929 trx_purge_state(void)
1930 /*=================*/
1931 {
1932 	purge_state_t	state;
1933 
1934 	rw_lock_x_lock(&purge_sys->latch);
1935 
1936 	state = purge_sys->state;
1937 
1938 	rw_lock_x_unlock(&purge_sys->latch);
1939 
1940 	return(state);
1941 }
1942 
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void
trx_purge_stop(void)
/*================*/
{
	purge_state_t	state;
	/* Capture the event's signal count before changing state, so that a
	signal sent between the state change and the wait is not missed. */
	int64_t		sig_count = os_event_reset(purge_sys->event);

	ut_a(srv_n_purge_threads > 0);

	rw_lock_x_lock(&purge_sys->latch);

	ut_a(purge_sys->state != PURGE_STATE_INIT);
	ut_a(purge_sys->state != PURGE_STATE_EXIT);
	ut_a(purge_sys->state != PURGE_STATE_DISABLED);

	/* Stop requests nest: purge resumes only when the counter drops
	back to zero (see trx_purge_run()). */
	++purge_sys->n_stop;

	state = purge_sys->state;

	if (state == PURGE_STATE_RUN) {
		ib::info() << "Stopping purge";

		/* We need to wakeup the purge thread in case it is suspended,
		so that it can acknowledge the state change. */

		srv_purge_wakeup();
	}

	purge_sys->state = PURGE_STATE_STOP;

	rw_lock_x_unlock(&purge_sys->latch);

	if (state != PURGE_STATE_STOP) {

		/* Wait for purge coordinator to signal that it
		is suspended. */
		os_event_wait_low(purge_sys->event, sig_count);
	} else {
		bool	once = true;

		rw_lock_x_lock(&purge_sys->latch);

		/* Wait for purge to signal that it has actually stopped. */
		while (purge_sys->running) {

			/* Print the informational message only once. */
			if (once) {
				ib::info() << "Waiting for purge to stop";
				once = false;
			}

			/* Release the latch while sleeping so that the
			coordinator can update purge_sys->running. */
			rw_lock_x_unlock(&purge_sys->latch);

			os_thread_sleep(10000);

			rw_lock_x_lock(&purge_sys->latch);
		}

		rw_lock_x_unlock(&purge_sys->latch);
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
}
2007 
/*******************************************************************//**
Resume purge, move to PURGE_STATE_RUN. */
void
trx_purge_run(void)
/*===============*/
{
	rw_lock_x_lock(&purge_sys->latch);

	switch (purge_sys->state) {
	case PURGE_STATE_INIT:
	case PURGE_STATE_EXIT:
	case PURGE_STATE_DISABLED:
		/* Calling trx_purge_run() in any of these states is a
		programming error. */
		ut_error;

	case PURGE_STATE_RUN:
	case PURGE_STATE_STOP:
		break;
	}

	if (purge_sys->n_stop > 0) {

		ut_a(purge_sys->state == PURGE_STATE_STOP);

		/* Undo one trx_purge_stop() call; purge only resumes when
		the last outstanding stop request is undone. */
		--purge_sys->n_stop;

		if (purge_sys->n_stop == 0) {

			ib::info() << "Resuming purge";

			purge_sys->state = PURGE_STATE_RUN;
		}

		MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
	} else {
		ut_a(purge_sys->state == PURGE_STATE_RUN);
	}

	rw_lock_x_unlock(&purge_sys->latch);

	/* Wake the purge coordinator in case it is suspended. */
	srv_purge_wakeup();
}
2049