1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file trx/trx0purge.cc
29 Purge old versions
30 
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "trx0purge.h"
37 
38 #ifdef UNIV_NONINL
39 #include "trx0purge.ic"
40 #endif
41 
42 #include "fsp0fsp.h"
43 #include "fut0fut.h"
44 #include "mach0data.h"
45 #include "mtr0log.h"
46 #include "os0thread.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0purge.h"
50 #include "row0upd.h"
51 #include "srv0mon.h"
52 #include "fsp0sysspace.h"
53 #include "srv0srv.h"
54 #include "srv0start.h"
55 #include "sync0sync.h"
56 #include "trx0rec.h"
57 #include "trx0roll.h"
58 #include "trx0rseg.h"
59 #include "trx0trx.h"
60 
/** Maximum allowable purge history length.  <=0 means 'infinite'. */
ulong		srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong		srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
trx_undo_rec_t	trx_purge_dummy_rec;

#ifdef UNIV_DEBUG
/** Debug flag: when set, purge only updates its read view and does not
actually purge anything (set/read elsewhere; definition only here). */
my_bool		srv_purge_view_update_only_debug;
/** Debug flag asserted false in trx_purge_add_update_undo_to_history();
when set, committing (adding undo to history) is disallowed. */
bool		trx_commit_disallowed = false;
#endif /* UNIV_DEBUG */

/** Sentinel value marking an "empty" TrxUndoRsegs element (its trx_no is
UINT64_UNDEFINED); used by TrxUndoRsegsIterator as the initial/reset state. */
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);
81 
/** Constructor.
Binds the iterator to the given purge system and starts it in the "empty"
state: the current element is the NullElement sentinel and m_iter points at
its end, so the first set_next() call will consume from the purge queue. */
TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
	:
	m_purge_sys(purge_sys),
	m_trx_undo_rsegs(NullElement),
	m_iter(m_trx_undo_rsegs.end())
{
}
90 
91 /** Sets the next rseg to purge in m_purge_sys.
92 @return page size of the table for which the log is.
93 NOTE: if rseg is NULL when this function returns this means that
94 there are no rollback segments to purge and then the returned page
95 size object should not be used. */
96 const page_size_t
set_next()97 TrxUndoRsegsIterator::set_next()
98 {
99 	mutex_enter(&m_purge_sys->pq_mutex);
100 
101 	/* Only purge consumes events from the priority queue, user
102 	threads only produce the events. */
103 
104 	/* Check if there are more rsegs to process in the
105 	current element. */
106 	if (m_iter != m_trx_undo_rsegs.end()) {
107 
108 		/* We are still processing rollback segment from
109 		the same transaction and so expected transaction
110 		number shouldn't increase. Undo increment of
111 		expected trx_no done by caller assuming rollback
112 		segments from given transaction are done. */
113 		m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
114 
115 	} else if (!m_purge_sys->purge_queue->empty()) {
116 
117 		/* Read the next element from the queue.
118 		Combine elements if they have same transaction number.
119 		This can happen if a transaction shares redo rollback segment
120 		with another transaction that has already added it to purge
121 		queue and former transaction also needs to schedule non-redo
122 		rollback segment for purge. */
123 		m_trx_undo_rsegs = NullElement;
124 
125 		while (!m_purge_sys->purge_queue->empty()) {
126 
127 			if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
128 				m_trx_undo_rsegs =
129 					purge_sys->purge_queue->top();
130 			} else if (purge_sys->purge_queue->top().get_trx_no() ==
131 					m_trx_undo_rsegs.get_trx_no()) {
132 				m_trx_undo_rsegs.append(
133 					purge_sys->purge_queue->top());
134 			} else {
135 				break;
136 			}
137 
138 			m_purge_sys->purge_queue->pop();
139 		}
140 
141 		m_iter = m_trx_undo_rsegs.begin();
142 
143 	} else {
144 		/* Queue is empty, reset iterator. */
145 		m_trx_undo_rsegs = NullElement;
146 		m_iter = m_trx_undo_rsegs.end();
147 
148 		mutex_exit(&m_purge_sys->pq_mutex);
149 
150 		m_purge_sys->rseg = NULL;
151 
152 		/* return a dummy object, not going to be used by the caller */
153 		return(univ_page_size);
154 	}
155 
156 	m_purge_sys->rseg = *m_iter++;
157 
158 	mutex_exit(&m_purge_sys->pq_mutex);
159 
160 	ut_a(m_purge_sys->rseg != NULL);
161 
162 	mutex_enter(&m_purge_sys->rseg->mutex);
163 
164 	ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
165 	ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
166 
167 	/* We assume in purge of externally stored fields that
168 	space id is in the range of UNDO tablespace space ids
169 	unless space is system tablespace */
170 	ut_a(srv_is_undo_tablespace(m_purge_sys->rseg->space)
171 	     || is_system_tablespace(
172 			m_purge_sys->rseg->space));
173 
174 	const page_size_t	page_size(m_purge_sys->rseg->page_size);
175 
176 	ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
177 
178 	m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
179 	m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
180 	m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
181 
182 	mutex_exit(&m_purge_sys->rseg->mutex);
183 
184 	return(page_size);
185 }
186 
187 /****************************************************************//**
188 Builds a purge 'query' graph. The actual purge is performed by executing
189 this query graph.
190 @return own: the query graph */
191 static
192 que_t*
trx_purge_graph_build(trx_t * trx,ulint n_purge_threads)193 trx_purge_graph_build(
194 /*==================*/
195 	trx_t*		trx,			/*!< in: transaction */
196 	ulint		n_purge_threads)	/*!< in: number of purge
197 						threads */
198 {
199 	ulint		i;
200 	mem_heap_t*	heap;
201 	que_fork_t*	fork;
202 
203 	heap = mem_heap_create(512);
204 	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
205 	fork->trx = trx;
206 
207 	for (i = 0; i < n_purge_threads; ++i) {
208 		que_thr_t*	thr;
209 
210 		thr = que_thr_create(fork, heap, NULL);
211 
212 		thr->child = row_purge_node_create(thr, heap);
213 	}
214 
215 	return(fork);
216 }
217 
/********************************************************************//**
Creates the global purge system control structure and inits the history
mutex. */
void
trx_purge_sys_create(
/*=================*/
	ulint		n_purge_threads,	/*!< in: number of purge
						threads */
	purge_pq_t*	purge_queue)		/*!< in, own: UNDO log min
						binary heap */
{
	/* Zero-fill the structure; members needing non-trivial
	construction are placement-new'ed on that storage below. */
	purge_sys = static_cast<trx_purge_t*>(
		ut_zalloc_nokey(sizeof(*purge_sys)));

	purge_sys->state = PURGE_STATE_INIT;
	purge_sys->event = os_event_create(0);

	/* Run constructors on the zero-filled members. */
	new (&purge_sys->iter) purge_iter_t;
	new (&purge_sys->limit) purge_iter_t;
	new (&purge_sys->undo_trunc) undo::Truncate;
#ifdef UNIV_DEBUG
	new (&purge_sys->done) purge_iter_t;
#endif /* UNIV_DEBUG */

	/* Take ownership of purge_queue, we are responsible for freeing it. */
	purge_sys->purge_queue = purge_queue;

	rw_lock_create(trx_purge_latch_key,
		       &purge_sys->latch, SYNC_PURGE_LATCH);

	mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);

	ut_a(n_purge_threads > 0);

	purge_sys->sess = sess_open();

	purge_sys->trx = purge_sys->sess->trx;

	ut_a(purge_sys->trx->sess == purge_sys->sess);

	/* A purge transaction is not a real transaction, we use a transaction
	here only because the query threads code requires it. It is otherwise
	quite unnecessary. We should get rid of it eventually. */
	purge_sys->trx->id = 0;
	purge_sys->trx->start_time = ut_time_monotonic();
	purge_sys->trx->state = TRX_STATE_ACTIVE;
	purge_sys->trx->op_info = "purge trx";

	/* Build the query graph that the purge worker threads execute. */
	purge_sys->query = trx_purge_graph_build(
		purge_sys->trx, n_purge_threads);

	new(&purge_sys->view) ReadView();

	/* Purge may remove anything older than the oldest open read view. */
	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));
}
277 
/************************************************************************
Frees the global purge system control structure. */
void
trx_purge_sys_close(void)
/*======================*/
{
	que_graph_free(purge_sys->query);

	ut_a(purge_sys->trx->id == 0);
	ut_a(purge_sys->sess->trx == purge_sys->trx);

	/* The pseudo purge transaction was never really committed;
	reset its state before the session releases it. */
	purge_sys->trx->state = TRX_STATE_NOT_STARTED;

	sess_close(purge_sys->sess);

	purge_sys->sess = NULL;

	/* Members placement-new'ed in trx_purge_sys_create() are
	destroyed explicitly here. */
	purge_sys->view.close();
	purge_sys->view.~ReadView();

	rw_lock_free(&purge_sys->latch);
	mutex_free(&purge_sys->pq_mutex);

	/* We own the purge queue (see trx_purge_sys_create()). */
	if (purge_sys->purge_queue != NULL) {
		UT_DELETE(purge_sys->purge_queue);
		purge_sys->purge_queue = NULL;
	}

	os_event_destroy(purge_sys->event);

	purge_sys->event = NULL;

	UT_DELETE(purge_sys->rseg_iter);

	ut_free(purge_sys);

	purge_sys = NULL;
}
316 
317 /*================ UNDO LOG HISTORY LIST =============================*/
318 
/********************************************************************//**
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse. */
void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*		trx,		/*!< in: transaction */
	trx_undo_ptr_t*	undo_ptr,	/*!< in/out: update undo log. */
	page_t*		undo_page,	/*!< in: update undo log header page,
					x-latched */
	bool		update_rseg_history_len,
					/*!< in: if true: update rseg history
					len else skip updating it. */
	ulint		n_added_logs,	/*!< in: number of logs added */
	mtr_t*		mtr)		/*!< in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_ulogf_t*	undo_header;

	undo = undo_ptr->update_undo;
	rseg = undo->rseg;

	rseg_header = trx_rsegf_get(
		undo->rseg->space, undo->rseg->page_no, undo->rseg->page_size,
		mtr);

	undo_header = undo_page + undo->hdr_offset;

	if (undo->state != TRX_UNDO_CACHED) {
		ulint		hist_size;
#ifdef UNIV_DEBUG
		trx_usegf_t*	seg_header = undo_page + TRX_UNDO_SEG_HDR;
#endif /* UNIV_DEBUG */

		/* The undo log segment will not be reused */

		if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
			ib::fatal() << "undo->id is " << undo->id;
		}

		/* Detach the segment from its rseg slot so the slot can
		be reused for a new undo log. */
		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);

		hist_size = mtr_read_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);

		ut_ad(undo->size == flst_get_len(
			      seg_header + TRX_UNDO_PAGE_LIST));

		/* Account this segment's pages in the history size. */
		mlog_write_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE,
			hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	ut_ad(!trx_commit_disallowed);

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);

	if (update_rseg_history_len) {
		os_atomic_increment_ulint(
			&trx_sys->rseg_history_len, n_added_logs);
		/* Wake purge once enough history has accumulated for a
		full purge batch across all purge threads. */
		if (trx_sys->rseg_history_len
		    > srv_n_purge_threads * srv_purge_batch_size) {
			srv_wake_purge_thread_if_not_active();
		}
	}

	/* Write the trx number to the undo log header */
	mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);

	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	/* If the rseg had no logs pending purge, record this log as the
	one the purge iterator should start from. */
	if (rseg->last_page_no == FIL_NULL) {
		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}
408 
/** Remove undo log header from the history list.
@param[in,out]	rseg_hdr	rollback segment header
@param[in]	log_hdr		undo log segment header
@param[in,out]	mtr		mini transaction. */
static
void
trx_purge_remove_log_hdr(
	trx_rsegf_t*	rseg_hdr,
	trx_ulogf_t*	log_hdr,
	mtr_t*		mtr)
{
	/* Unlink the undo log header from the rseg's history list... */
	flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
		    log_hdr + TRX_UNDO_HISTORY_NODE, mtr);

	/* ...and account for it in the global history length counter. */
	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, 1);
}
425 
/** Frees an undo log segment which is in the history list. Removes the
undo log hdr from the history list.
@param[in,out]	rseg		rollback segment
@param[in]	hdr_addr	file address of log_hdr
@param[in]	noredo		skip redo logging. */
static
void
trx_purge_free_segment(
	trx_rseg_t*	rseg,
	fil_addr_t	hdr_addr,
	bool		noredo)
{
	mtr_t		mtr;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ulint		seg_size;
	ulint		hist_size;
	/* For the no-redo case the "del marks" marking below is skipped,
	so start out as if it were already done. */
	bool		marked		= noredo;

	/* Free the segment in steps, committing the mtr between steps so
	that we do not hold too many page latches at once. */
	for (;;) {
		page_t*	undo_page;

		mtr_start(&mtr);
		if (noredo) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}
		// ut_ad(noredo == trx_sys_is_noredo_rseg_slot(rseg->id));

		mutex_enter(&rseg->mutex);

		rseg_hdr = trx_rsegf_get(
			rseg->space, rseg->page_no, rseg->page_size, &mtr);

		undo_page = trx_undo_page_get(
			page_id_t(rseg->space, hdr_addr.page), rseg->page_size,
			&mtr);

		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
		log_hdr = undo_page + hdr_addr.boffset;

		/* Mark the last undo log totally purged, so that if the
		system crashes, the tail of the undo log will not get accessed
		again. The list of pages in the undo log tail gets inconsistent
		during the freeing of the segment, and therefore purge should
		not try to access them again. */

		if (!marked) {
			marked = true;
			mlog_write_ulint(
				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				MLOG_2BYTES, &mtr);
		}

		/* Returns nonzero when everything but the header page has
		been freed; we then break out still holding the rseg mutex
		and the open mtr. */
		if (fseg_free_step_not_header(
			    seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr)) {

			break;
		}

		mutex_exit(&rseg->mutex);

		mtr_commit(&mtr);
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

	do {

		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.cc. */

	} while (!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr));

	/* Deduct the freed pages from the rseg's history size counter. */
	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}
528 
/********************************************************************//**
Removes unnecessary history data from a rollback segment. */
static
void
trx_purge_truncate_rseg_history(
/*============================*/
	trx_rseg_t*		rseg,		/*!< in: rollback segment */
	const purge_iter_t*	limit)		/*!< in: truncate offset */
{
	fil_addr_t	hdr_addr;
	fil_addr_t	prev_hdr_addr;
	trx_rsegf_t*	rseg_hdr;
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	mtr_t		mtr;
	trx_id_t	undo_trx_no;
	/* Skip redo logging for a non-redo rseg slot, unless this rseg
	is still registered as pending purge (e.g. after restart). */
	const bool	noredo		= trx_sys_is_noredo_rseg_slot(
		rseg->id) &&
		(trx_sys->pending_purge_rseg_array[rseg->id] != rseg);

	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	/* Walk the history list from its end, i.e. the oldest log. */
	hdr_addr = trx_purge_get_log_from_hist(
		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
	if (hdr_addr.page == FIL_NULL) {

		/* History list exhausted: nothing left to truncate. */
		mutex_exit(&(rseg->mutex));

		mtr_commit(&mtr);

		return;
	}

	undo_page = trx_undo_page_get(page_id_t(rseg->space, hdr_addr.page),
				      rseg->page_size, &mtr);

	log_hdr = undo_page + hdr_addr.boffset;

	undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	if (undo_trx_no >= limit->trx_no) {

		/* limit space_id should match the rollback segment
		space id to avoid freeing of the page belongs to
		different rollback segment for the same trx_no. */
		if (undo_trx_no == limit->trx_no
		    && rseg->space == limit->undo_rseg_space) {

			/* Partially truncate the log containing the
			purge limit. */
			trx_undo_truncate_start(
				rseg, hdr_addr.page,
				hdr_addr.boffset, limit->undo_no);
		}

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		return;
	}

	/* Remember the predecessor before this header is unlinked or
	its whole segment freed. */
	prev_hdr_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {

		/* We can free the whole log segment */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		/* calls the trx_purge_remove_log_hdr()
		inside trx_purge_free_segment(). */
		trx_purge_free_segment(rseg, hdr_addr, noredo);

	} else {
		/* Remove the log hdr from the rseg history. */
		trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);
	}

	/* Restart a fresh mtr and continue from the remembered
	predecessor in the history list. */
	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	hdr_addr = prev_hdr_addr;

	goto loop;
}
635 
/** UNDO log truncate logger. Needed to track the state of a truncate
operation across a crash. An auxiliary redo log file
undo_<space_id>_trunc.log will be created while the truncate of the UNDO
tablespace is in progress. This file is required during recovery
to complete the truncate. */
640 
641 namespace undo {
642 
643 	/** Populate log file name based on space_id
644 	@param[in]	space_id	id of the undo tablespace.
645 	@return DB_SUCCESS or error code */
populate_log_file_name(ulint space_id,char * & log_file_name)646 	dberr_t populate_log_file_name(
647 		ulint	space_id,
648 		char*&	log_file_name)
649 	{
650 		ulint log_file_name_sz =
651 			strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
652 			+ strlen(undo::s_log_prefix)
653 			+ strlen(undo::s_log_ext);
654 
655 		log_file_name = new (std::nothrow) char[log_file_name_sz];
656 		if (log_file_name == 0) {
657 			return(DB_OUT_OF_MEMORY);
658 		}
659 
660 		memset(log_file_name, 0, log_file_name_sz);
661 
662 		strcpy(log_file_name, srv_log_group_home_dir);
663 		ulint	log_file_name_len = strlen(log_file_name);
664 
665 		if (log_file_name[log_file_name_len - 1]
666 				!= OS_PATH_SEPARATOR) {
667 
668 			log_file_name[log_file_name_len]
669 				= OS_PATH_SEPARATOR;
670 			log_file_name_len = strlen(log_file_name);
671 		}
672 
673 		ut_snprintf(log_file_name + log_file_name_len,
674 			    log_file_name_sz - log_file_name_len,
675 			    "%s%lu_%s", undo::s_log_prefix,
676 			    (ulong) space_id, s_log_ext);
677 
678 		return(DB_SUCCESS);
679 	}
680 
	/** Create the truncate log file.
	@param[in]	space_id	id of the undo tablespace to truncate.
	@return DB_SUCCESS or error code. */
	dberr_t init(ulint space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablespace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return(err);
		}

		/* Step-2: Create the log file, open it and write 0 to
		indicate init phase. */
		bool            ret;
		pfs_os_file_t	handle = os_file_create(
			innodb_log_file_key, log_file_name, OS_FILE_CREATE,
			OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
		if (!ret) {
			delete[] log_file_name;
			return(DB_IO_ERROR);
		}

		/* Allocate one extra page so the write buffer can be
		aligned to a page boundary. */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			delete[] log_file_name;
			return(DB_OUT_OF_MEMORY);
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		/* The buffer is zero-filled: writing it marks the "init"
		phase (no magic number yet). */
		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		delete[] log_file_name;

		return(err);
	}
733 
	/** Mark completion of undo truncate action by writing magic number to
	the log file and then removing it from the disk.
	If we are going to remove it from disk then why write magic number ?
	This is to safeguard from unlink (file-system) anomalies that will keep
	the link to the file even after unlink action is successful and
	ref-count = 0.
	@param[in]	space_id	id of the undo tablespace to truncate.*/
	void done(
		ulint	space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablespace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return;
		}

		/* Step-2: Open log file and write magic number to
		indicate done phase. */
		bool    ret;
		pfs_os_file_t	handle =
			os_file_create_simple_no_error_handling(
				innodb_log_file_key, log_file_name,
				OS_FILE_OPEN, OS_FILE_READ_WRITE,
				srv_read_only_mode, &ret);

		if (!ret) {
			/* Could not open: still unlink the file so that
			recovery does not see a stale truncate log. */
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		/* Allocate one extra page so the write buffer can be
		aligned to a page boundary. */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		/* The magic number marks the truncate as complete. */
		mach_write_to_4(log_buf, undo::s_magic);

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		ut_ad(err == DB_SUCCESS);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
	}
799 
800 	/** Check if TRUNCATE_DDL_LOG file exist.
801 	@param[in]	space_id	id of the undo tablespace.
802 	@return true if exist else false. */
is_log_present(ulint space_id)803 	bool is_log_present(
804 		ulint	space_id)
805 	{
806 		dberr_t		err;
807 		char*		log_file_name;
808 
809 		/* Step-1: Populate log file name. */
810 		err = populate_log_file_name(space_id, log_file_name);
811 		if (err != DB_SUCCESS) {
812 			return(false);
813 		}
814 
815 		/* Step-2: Check for existence of the file. */
816 		bool		exist;
817 		os_file_type_t	type;
818 		os_file_status(log_file_name, &exist, &type);
819 
820 		/* Step-3: If file exists, check it for presence of magic
821 		number.  If found, then delete the file and report file
822 		doesn't exist as presence of magic number suggest that
823 		truncate action was complete. */
824 
825 		if (exist) {
826 			bool    ret;
827 			pfs_os_file_t	handle =
828 				os_file_create_simple_no_error_handling(
829 					innodb_log_file_key, log_file_name,
830 					OS_FILE_OPEN, OS_FILE_READ_WRITE,
831 					srv_read_only_mode, &ret);
832 			if (!ret) {
833 				os_file_delete(innodb_log_file_key,
834 					       log_file_name);
835 				delete[] log_file_name;
836 				return(false);
837 			}
838 
839 			ulint	sz = UNIV_PAGE_SIZE;
840 			void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
841 			if (buf == NULL) {
842 				os_file_close(handle);
843 				os_file_delete(innodb_log_file_key,
844 					       log_file_name);
845 				delete[] log_file_name;
846 				return(false);
847 			}
848 
849 			byte*	log_buf = static_cast<byte*>(
850 				ut_align(buf, UNIV_PAGE_SIZE));
851 
852 			IORequest	request(IORequest::READ);
853 
854 			request.disable_compression();
855 
856 			dberr_t	err;
857 
858 			err = os_file_read(request, handle, log_buf, 0, sz);
859 
860 			os_file_close(handle);
861 
862 			if (err != DB_SUCCESS) {
863 
864 				ib::info()
865 					<< "Unable to read '"
866 					<< log_file_name << "' : "
867 					<< ut_strerr(err);
868 
869 				os_file_delete(
870 					innodb_log_file_key, log_file_name);
871 
872 				ut_free(buf);
873 
874 				delete[] log_file_name;
875 
876 				return(false);
877 			}
878 
879 			ulint	magic_no = mach_read_from_4(log_buf);
880 
881 			ut_free(buf);
882 
883 			if (magic_no == undo::s_magic) {
884 				/* Found magic number. */
885 				os_file_delete(innodb_log_file_key,
886 					       log_file_name);
887 				delete[] log_file_name;
888 				return(false);
889 			}
890 		}
891 
892 		delete[] log_file_name;
893 
894 		return(exist);
895 	}
896 };
897 
898 /** Iterate over all the UNDO tablespaces and check if any of the UNDO
899 tablespace qualifies for TRUNCATE (size > threshold).
900 @param[in,out]	undo_trunc	undo truncate tracker */
901 static
902 void
trx_purge_mark_undo_for_truncate(undo::Truncate * undo_trunc)903 trx_purge_mark_undo_for_truncate(
904 	undo::Truncate*	undo_trunc)
905 {
906 	/* Step-1: If UNDO Tablespace
907 		- already marked for truncate (OR)
908 		- truncate disabled
909 	return immediately else search for qualifying tablespace. */
910 	if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
911 		return;
912 	}
913 
914 	/* Step-2: Validation/Qualification checks
915 	a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
916 	   is being truncated server can continue to operate.
917 	b. At-least 2 UNDO redo rseg/undo logs (besides the default rseg-0)
918 	b. At-least 1 UNDO tablespace size > threshold. */
919 	if (srv_undo_tablespaces_active < 2
920 	    || (srv_rollback_segments < (1 + srv_tmp_undo_logs + 2))) {
921 		return;
922 	}
923 
924 	/* Avoid bias selection and so start the scan from immediate next
925 	of last selected UNDO tablespace for truncate. */
926 	ulint space_id = undo_trunc->get_scan_start();
927 
928 	for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {
929 
930 		ut_ad(srv_undo_space_id_start != 0);
931 
932 		if (fil_space_get_size(space_id)
933 		    > (srv_max_undo_log_size / srv_page_size)) {
934 			/* Tablespace qualifies for truncate. */
935 			undo_trunc->mark(space_id);
936 			undo::Truncate::add_space_to_trunc_list(space_id);
937 			break;
938 		}
939 
940 		space_id++;
941 
942 		if (space_id >= (srv_undo_space_id_start
943 				 + srv_undo_tablespaces_active)) {
944 			/* Note: UNDO tablespace ids starts from 1. */
945 			space_id = srv_undo_space_id_start;
946 		}
947 
948 		if (undo_trunc->is_marked()) {
949 			break;
950 		}
951 	}
952 
953 	undo_trunc->set_scan_start(space_id);
954 
955 	/* Couldn't make any selection. */
956 	if (!undo_trunc->is_marked()) {
957 		return;
958 	}
959 
960 #ifdef UNIV_DEBUG
961 	ib::info() << "UNDO tablespace with space identifier "
962 		<< undo_trunc->get_marked_space_id() << " marked for truncate";
963 #endif /* UNIV_DEBUG */
964 
965 	/* Step-3: Iterate over all the rsegs of selected UNDO tablespace
966 	and mark them temporarily unavailable for allocation.*/
967 	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
968 		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
969 
970 		if (rseg != NULL && !trx_sys_is_noredo_rseg_slot(rseg->id)) {
971 			if (rseg->space
972 				== undo_trunc->get_marked_space_id()) {
973 
974 				/* Once set this rseg will not be allocated
975 				to new booting transaction but we will wait
976 				for existing active transaction to finish. */
977 				rseg->skip_allocation = true;
978 				undo_trunc->add_rseg_to_trunc(rseg);
979 			}
980 		}
981 	}
982 }
983 
/* Definition of the static member declared in undo::Truncate: the list of
UNDO tablespace ids scheduled for truncation (populated via
add_space_to_trunc_list() in trx_purge_mark_undo_for_truncate()). */
undo::undo_spaces_t	undo::Truncate::s_spaces_to_truncate;
985 
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
marked for truncate.
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_cleanse_purge_queue(
	undo::Truncate*	undo_trunc)
{
	mutex_enter(&purge_sys->pq_mutex);
	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
	purge_elem_list_t			purge_elem_list;

	/* Remove rseg instances that are in the purge queue before we start
	truncate of corresponding UNDO truncate. */
	while (!purge_sys->purge_queue->empty()) {
		purge_elem_list.push_back(purge_sys->purge_queue->top());
		purge_sys->purge_queue->pop();
	}
	ut_ad(purge_sys->purge_queue->empty());

	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
	     it != purge_elem_list.end();
	     ++it) {

		/* Drop the rseg(s) of this element that live in the
		tablespace being truncated.  erase() invalidates it2, so
		we must break out of the inner loop immediately after it;
		this assumes at most one matching rseg per element. */
		for (TrxUndoRsegs::iterator it2 = it->begin();
		     it2 != it->end();
		     ++it2) {

			if ((*it2)->space
				== undo_trunc->get_marked_space_id()) {
				it->erase(it2);
				break;
			}
		}

		const ulint	size = it->size();
		if (size != 0) {
			/* size != 0 suggest that there exist other rsegs that
			needs processing so add this element to purge queue.
			Note: Other rseg could be non-redo rsegs. */
			purge_sys->purge_queue->push(*it);
		}
	}
	mutex_exit(&purge_sys->pq_mutex);
}
1031 
/** Iterate over selected UNDO tablespace and check if all the rsegs
that resides in the tablespace are free.
@param[in]	limit		truncate_limit
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_initiate_truncate(
	purge_iter_t*	limit,
	undo::Truncate*	undo_trunc)
{
	/* Step-1: Early check to findout if any of the the UNDO tablespace
	is marked for truncate. */
	if (!undo_trunc->is_marked()) {
		/* No tablespace marked for truncate yet. */
		return;
	}

	/* Step-2: Scan over each rseg and ensure that it doesn't hold any
	active undo records. */
	bool all_free = true;

	for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {

		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);

		mutex_enter(&rseg->mutex);

		if (rseg->trx_ref_count > 0) {
			/* This rseg is still being held by an active
			transaction. */
			all_free = false;
			mutex_exit(&rseg->mutex);
			continue;
		}

		ut_ad(rseg->trx_ref_count == 0);
		ut_ad(rseg->skip_allocation);

		ulint	size_of_rsegs = rseg->curr_size;

		if (size_of_rsegs == 1) {
			/* The rseg is down to a single page (its header);
			it holds no undo data, so it does not block the
			truncate. */
			mutex_exit(&rseg->mutex);
			continue;
		} else {

			/* There could be cached undo segment. Check if records
			in these segments can be purged. Normal purge history
			will not touch these cached segment. */
			ulint		cached_undo_size = 0;

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->update_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				/* A cached log whose trx_id lies beyond the
				purge limit still has unpurged records. */
				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->insert_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			ut_ad(size_of_rsegs >= (cached_undo_size + 1));

			if (size_of_rsegs > (cached_undo_size + 1)) {
				/* There are pages besides cached pages that
				still hold active data. */
				all_free = false;
			}
		}

		mutex_exit(&rseg->mutex);
	}

	if (!all_free) {
		/* rseg still holds active data.*/
		return;
	}


	/* Step-3: Start the actual truncate.
	a. log-checkpoint
	b. Write the DDL log to protect truncate action from CRASH
	c. Remove rseg instance if added to purge queue before we
	   initiate truncate.
	d. Execute actual truncate
	e. Remove the DDL log. */
	DBUG_EXECUTE_IF("ib_undo_trunc_before_checkpoint",
			ib::info() << "ib_undo_trunc_before_checkpoint";
			DBUG_SUICIDE(););

	/* After truncate if server crashes then redo logging done for this
	undo tablespace might not stand valid as tablespace has been
	truncated. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	ib::info() << "Truncating UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id();

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_start",
			ib::info() << "ib_undo_trunc_before_ddl_log_start";
			DBUG_SUICIDE(););

	/* The return status of start_logging() is only verified in debug
	builds; in release builds the call is made and its result ignored. */
#ifdef UNIV_DEBUG
	dberr_t	err =
#endif /* UNIV_DEBUG */
		undo_trunc->start_logging(
			undo_trunc->get_marked_space_id());
	ut_ad(err == DB_SUCCESS);

	DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
			ib::info() << "ib_undo_trunc_before_truncate";
			DBUG_SUICIDE(););

	trx_purge_cleanse_purge_queue(undo_trunc);

	bool	success = trx_undo_truncate_tablespace(undo_trunc);
	if (!success) {
		/* Note: In case of error we don't enable the rsegs
		and neither unmark the tablespace so the tablespace
		continue to remain inactive. */
		ib::error() << "Failed to truncate UNDO tablespace with"
			" space identifier "
			<< undo_trunc->get_marked_space_id();
		return;
	}

	if (purge_sys->rseg != NULL
	    && purge_sys->rseg->last_page_no == FIL_NULL) {
		/* If purge_sys->rseg is pointing to rseg that was recently
		truncated then move to next rseg element.
		Note: Ideally purge_sys->rseg should be NULL because purge
		should complete processing of all the records but there is
		purge_batch_size that can force the purge loop to exit before
		all the records are purged and in this case purge_sys->rseg
		could point to a valid rseg waiting for next purge cycle. */
		purge_sys->next_stored = FALSE;
		purge_sys->rseg = NULL;
	}

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
			ib::info() << "ib_undo_trunc_before_ddl_log_end";
			DBUG_SUICIDE(););

	/* Second checkpoint: make the truncate durable before the DDL log
	is removed, so recovery never replays stale redo into the new
	(truncated) tablespace. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	undo_trunc->done_logging(undo_trunc->get_marked_space_id());

	/* Completed truncate. Now it is safe to re-use the tablespace. */
	for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);
		rseg->skip_allocation = false;
	}

	ib::info() << "Completed truncate of UNDO tablespace with space"
		" identifier " << undo_trunc->get_marked_space_id();

	undo_trunc->reset();
	undo::Truncate::clear_trunc_list();

	DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
			ib::info() << "ib_undo_trunc_trunc_done";
			DBUG_SUICIDE(););
}
1208 
1209 /********************************************************************//**
1210 Removes unnecessary history data from rollback segments. NOTE that when this
1211 function is called, the caller must not have any latches on undo log pages! */
1212 static
1213 void
trx_purge_truncate_history(purge_iter_t * limit,const ReadView * view)1214 trx_purge_truncate_history(
1215 /*========================*/
1216 	purge_iter_t*		limit,		/*!< in: truncate limit */
1217 	const ReadView*		view)		/*!< in: purge view */
1218 {
1219 	ulint		i;
1220 
1221 	/* We play safe and set the truncate limit at most to the purge view
1222 	low_limit number, though this is not necessary */
1223 
1224 	if (limit->trx_no >= view->low_limit_no()) {
1225 		limit->trx_no = view->low_limit_no();
1226 		limit->undo_no = 0;
1227 		limit->undo_rseg_space = ULINT_UNDEFINED;
1228 	}
1229 
1230 	ut_ad(limit->trx_no <= purge_sys->view.low_limit_no());
1231 
1232 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1233 		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
1234 
1235 		if (rseg != NULL) {
1236 			ut_a(rseg->id == i);
1237 			trx_purge_truncate_rseg_history(rseg, limit);
1238 		}
1239 	}
1240 
1241 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1242 		trx_rseg_t*	rseg = trx_sys->pending_purge_rseg_array[i];
1243 
1244 		if (rseg != NULL) {
1245 			ut_a(rseg->id == i);
1246 			trx_purge_truncate_rseg_history(rseg, limit);
1247 		}
1248 	}
1249 
1250 	/* UNDO tablespace truncate. We will try to truncate as much as we
1251 	can (greedy approach). This will ensure when the server is idle we
1252 	try and truncate all the UNDO tablespaces. */
1253 	ulint	nchances = srv_undo_tablespaces_active;
1254 	for (i = 0; i < nchances; i++) {
1255 		trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
1256 		trx_purge_initiate_truncate(limit, &purge_sys->undo_trunc);
1257 	}
1258 }
1259 
/***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
static
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg,		/*!< in: rollback segment */
	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
					handled */
{
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	fil_addr_t	prev_log_addr;
	trx_id_t	trx_no;
	ibool		del_marks;
	mtr_t		mtr;

	mutex_enter(&(rseg->mutex));

	ut_a(rseg->last_page_no != FIL_NULL);

	/* Advance the purge iterator past the log we just finished;
	invalidate next_stored so the next position is recomputed. */
	purge_sys->iter.trx_no = rseg->last_trx_no + 1;
	purge_sys->iter.undo_no = 0;
	purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
	purge_sys->next_stored = FALSE;

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(
		page_id_t(rseg->space, rseg->last_page_no),
		rseg->page_size, &mtr);

	log_hdr = undo_page + rseg->last_offset;

	/* Increase the purge page count by one for every handled log */

	(*n_pages_handled)++;

	/* Follow the history list backwards to the previous (older in file
	order, next to purge) log header. */
	prev_log_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

#ifdef UNIV_DEBUG
		trx_sys_mutex_enter();

		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in large
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 2000 000 undo logs now. */

		if (trx_sys->rseg_history_len > 2000000) {
			ib::warn() << "Purge reached the head of the history"
				" list, but its length is still reported as "
				<< trx_sys->rseg_history_len << " which is"
				" unusually high.";
			ib::info() << "This can happen for multiple reasons";
			ib::info() << "1. A long running transaction is"
				" withholding purging of undo logs or a read"
				" view is open. Please try to commit the long"
				" running transaction.";
			ib::info() << "2. Try increasing the number of purge"
				" threads to expedite purging of undo logs.";
		}

		trx_sys_mutex_exit();
#endif
		return;
	}

	/* Release both the rseg mutex and the page latch before reading
	the previous log header; the rseg state is re-latched and updated
	afterwards. */
	mutex_exit(&rseg->mutex);

	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space,
							prev_log_addr.page),
					      rseg->page_size, &mtr)
		+ prev_log_addr.boffset;

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	/* Point the rseg at the newly found oldest unpurged log. */
	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	TrxUndoRsegs elem(rseg->last_trx_no);
	elem.push_back(rseg);

	/* Purge can also produce events, however these are already ordered
	in the rollback segment and any user generated event will be greater
	than the events that Purge produces. ie. Purge can never produce
	events from an empty rollback segment. */

	mutex_enter(&purge_sys->pq_mutex);

	purge_sys->purge_queue->push(elem);

	mutex_exit(&purge_sys->pq_mutex);

	mutex_exit(&rseg->mutex);
}
1381 
1382 /** Position the purge sys "iterator" on the undo record to use for purging.
1383 @param[in,out]	purge_sys	purge instance
1384 @param[in]	page_size	page size */
1385 static
1386 void
trx_purge_read_undo_rec(trx_purge_t * purge_sys,const page_size_t & page_size)1387 trx_purge_read_undo_rec(
1388 	trx_purge_t*		purge_sys,
1389 	const page_size_t&	page_size)
1390 {
1391 	ulint		offset;
1392 	ulint		page_no;
1393 	ib_uint64_t	undo_no;
1394 	ulint		undo_rseg_space;
1395 
1396 	purge_sys->hdr_offset = purge_sys->rseg->last_offset;
1397 	page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
1398 
1399 	if (purge_sys->rseg->last_del_marks) {
1400 		mtr_t		mtr;
1401 		trx_undo_rec_t*	undo_rec = NULL;
1402 
1403 		mtr_start(&mtr);
1404 
1405 		undo_rec = trx_undo_get_first_rec(
1406 			purge_sys->rseg->space,
1407 			page_size,
1408 			purge_sys->hdr_page_no,
1409 			purge_sys->hdr_offset, RW_S_LATCH, &mtr);
1410 
1411 		if (undo_rec != NULL) {
1412 			offset = page_offset(undo_rec);
1413 			undo_no = trx_undo_rec_get_undo_no(undo_rec);
1414 			undo_rseg_space = purge_sys->rseg->space;
1415 			page_no = page_get_page_no(page_align(undo_rec));
1416 		} else {
1417 			offset = 0;
1418 			undo_no = 0;
1419 			undo_rseg_space = ULINT_UNDEFINED;
1420 		}
1421 
1422 		mtr_commit(&mtr);
1423 	} else {
1424 		offset = 0;
1425 		undo_no = 0;
1426 		undo_rseg_space = ULINT_UNDEFINED;
1427 	}
1428 
1429 	purge_sys->offset = offset;
1430 	purge_sys->page_no = page_no;
1431 	purge_sys->iter.undo_no = undo_no;
1432 	purge_sys->iter.undo_rseg_space = undo_rseg_space;
1433 
1434 	purge_sys->next_stored = TRUE;
1435 }
1436 
1437 /***********************************************************************//**
1438 Chooses the next undo log to purge and updates the info in purge_sys. This
1439 function is used to initialize purge_sys when the next record to purge is
1440 not known, and also to update the purge system info on the next record when
1441 purge has handled the whole undo log for a transaction. */
1442 static
1443 void
trx_purge_choose_next_log(void)1444 trx_purge_choose_next_log(void)
1445 /*===========================*/
1446 {
1447 	ut_ad(purge_sys->next_stored == FALSE);
1448 
1449 	const page_size_t&	page_size = purge_sys->rseg_iter->set_next();
1450 
1451 	if (purge_sys->rseg != NULL) {
1452 		trx_purge_read_undo_rec(purge_sys, page_size);
1453 	} else {
1454 		/* There is nothing to do yet. */
1455 		os_thread_yield();
1456 	}
1457 }
1458 
/***********************************************************************//**
Gets the next record to purge and updates the info in the purge system.
@return copy of an undo log record or pointer to the dummy undo log record */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
					handled */
	mem_heap_t*	heap)		/*!< in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	mtr_t		mtr;

	ut_ad(purge_sys->next_stored);
	ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no());

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	const page_size_t	page_size(purge_sys->rseg->page_size);

	if (offset == 0) {
		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_dummy_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
						page_size, &mtr);

	rec = undo_page + offset;

	rec2 = rec;

	/* Scan forward from the current record for the next record that
	actually requires a purge operation; rec2 ends up on that record,
	or NULL if this undo log has no more such records. */
	for (;;) {
		ulint		type;
		trx_undo_rec_t*	next_rec;
		ulint		cmpl_info;

		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(
			rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);

		if (next_rec == NULL) {
			/* Page exhausted: move to the next page of the log
			(may return NULL if the log is exhausted too). */
			rec2 = trx_undo_get_next_rec(
				rec2, purge_sys->hdr_page_no,
				purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		/* Delete-marks always need purging. */
		if (type == TRX_UNDO_DEL_MARK_REC) {

			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		/* Records with externally stored (BLOB) columns need
		purging. */
		if (trx_undo_rec_get_extern_storage(rec2)) {
			break;
		}

		/* Updates that may have changed an ordering field of a
		secondary index need purging; other updates are skipped. */
		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			break;
		}
	}

	if (rec2 == NULL) {
		/* Current undo log fully scanned: advance to the next
		history log and choose the next position. The mtr must be
		committed first because those calls latch pages themselves. */
		mtr_commit(&mtr);

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		/* Re-latch the original page: we still return a copy of
		the record at (page_no, offset) below. */
		mtr_start(&mtr);

		undo_page = trx_undo_page_get_s_latched(
			page_id_t(space, page_no), page_size, &mtr);

	} else {
		page = page_align(rec2);

		/* Store rec2's position as the next record to process. */
		purge_sys->offset = rec2 - page;
		purge_sys->page_no = page_get_page_no(page);
		purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->iter.undo_rseg_space = space;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			(*n_pages_handled)++;
		}
	}

	/* Return a heap copy of the CURRENT record (at the position the
	iterator held on entry); the iterator itself has been advanced. */
	rec_copy = trx_undo_rec_copy(undo_page, offset, heap);
	mtr_commit(&mtr);
	return(rec_copy);
}
1584 
1585 /********************************************************************//**
1586 Fetches the next undo log record from the history list to purge. It must be
1587 released with the corresponding release function.
1588 @return copy of an undo log record or pointer to trx_purge_dummy_rec,
1589 if the whole undo log can skipped in purge; NULL if none left */
1590 static MY_ATTRIBUTE((warn_unused_result))
1591 trx_undo_rec_t*
trx_purge_fetch_next_rec(roll_ptr_t * roll_ptr,ulint * n_pages_handled,mem_heap_t * heap)1592 trx_purge_fetch_next_rec(
1593 /*=====================*/
1594 	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
1595 	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
1596 					handled */
1597 	mem_heap_t*	heap)		/*!< in: memory heap where copied */
1598 {
1599 	if (!purge_sys->next_stored) {
1600 		trx_purge_choose_next_log();
1601 
1602 		if (!purge_sys->next_stored) {
1603 			DBUG_PRINT("ib_purge",
1604 				   ("no logs left in the history list"));
1605 			return(NULL);
1606 		}
1607 	}
1608 
1609 	if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) {
1610 
1611 		return(NULL);
1612 	}
1613 
1614 	/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1615 	os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1616 
1617 	*roll_ptr = trx_undo_build_roll_ptr(
1618 		FALSE, purge_sys->rseg->id,
1619 		purge_sys->page_no, purge_sys->offset);
1620 
1621 	/* The following call will advance the stored values of the
1622 	purge iterator. */
1623 
1624 	return(trx_purge_get_next_rec(n_pages_handled, heap));
1625 }
1626 
/*******************************************************************//**
This function runs a purge batch: fetches undo records and distributes them
round-robin over the purge nodes of the first n_purge_threads query threads.
@return number of undo log pages handled in the batch */
static
ulint
trx_purge_attach_undo_recs(
/*=======================*/
	ulint		n_purge_threads,/*!< in: number of purge threads */
	trx_purge_t*	purge_sys,	/*!< in/out: purge instance */
	ulint		batch_size)	/*!< in: no. of pages to purge */
{
	que_thr_t*	thr;
	ulint		i = 0;
	ulint		n_pages_handled = 0;
	ulint		n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);

	ut_a(n_purge_threads > 0);

	purge_sys->limit = purge_sys->iter;

	/* Debug code to validate some pre-requisites and reset done flag. */
	for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	     thr != NULL && i < n_purge_threads;
	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {

		purge_node_t*		node;

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;

		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
		ut_a(node->undo_recs == NULL);
		ut_a(node->done);

		node->done = FALSE;
	}

	/* There should never be fewer nodes than threads, the inverse
	however is allowed because we only use purge threads as needed. */
	ut_a(i == n_purge_threads);

	/* Fetch and parse the UNDO records. The UNDO records are added
	to a per purge node vector. */
	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	ut_a(n_thrs > 0 && thr != NULL);

	ut_ad(trx_purge_check_limit());

	i = 0;

	for (;;) {
		purge_node_t*		node;
		trx_purge_rec_t*	purge_rec;

		ut_a(!thr->is_active);

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;
		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);

		/* The record wrapper is allocated from the node's own heap,
		so its lifetime follows the node. */
		purge_rec = static_cast<trx_purge_rec_t*>(
			mem_heap_zalloc(node->heap, sizeof(*purge_rec)));

		/* Track the max {trx_id, undo_no} for truncating the
		UNDO logs once we have purged the records. */

		if (trx_purge_check_limit()) {
			purge_sys->limit = purge_sys->iter;
		}

		/* Fetch the next record, and advance the purge_sys->iter. */
		purge_rec->undo_rec = trx_purge_fetch_next_rec(
			&purge_rec->roll_ptr, &n_pages_handled, node->heap);

		if (purge_rec->undo_rec != NULL) {

			if (node->undo_recs == NULL) {
				node->undo_recs = ib_vector_create(
					ib_heap_allocator_create(node->heap),
					sizeof(trx_purge_rec_t),
					batch_size);
			} else {
				ut_a(!ib_vector_is_empty(node->undo_recs));
			}

			ib_vector_push(node->undo_recs, purge_rec);

			/* Stop the batch when enough undo pages were
			touched. */
			if (n_pages_handled >= batch_size) {

				break;
			}
		} else {
			/* No more records: history list exhausted or purge
			view limit reached. */
			break;
		}

		/* Round-robin: after handing one record to each of the
		first n_purge_threads threads, wrap back to the first. */
		thr = UT_LIST_GET_NEXT(thrs, thr);

		if (!(++i % n_purge_threads)) {
			thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
		}

		ut_a(thr != NULL);
	}

	ut_ad(trx_purge_check_limit());

	return(n_pages_handled);
}
1735 
1736 /*******************************************************************//**
1737 Calculate the DML delay required.
1738 @return delay in microseconds or ULINT_MAX */
1739 static
1740 ulint
trx_purge_dml_delay(void)1741 trx_purge_dml_delay(void)
1742 /*=====================*/
1743 {
1744 	/* Determine how much data manipulation language (DML) statements
1745 	need to be delayed in order to reduce the lagging of the purge
1746 	thread. */
1747 	ulint	delay = 0; /* in microseconds; default: no delay */
1748 
1749 	/* If purge lag is set (ie. > 0) then calculate the new DML delay.
1750 	Note: we do a dirty read of the trx_sys_t data structure here,
1751 	without holding trx_sys->mutex. */
1752 
1753 	if (srv_max_purge_lag > 0
1754 	    && trx_sys->rseg_history_len
1755 	       > srv_n_purge_threads * srv_purge_batch_size) {
1756 		float	ratio;
1757 
1758 		ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1759 
1760 		if (ratio > 1.0) {
1761 			/* If the history list length exceeds the
1762 			srv_max_purge_lag, the data manipulation
1763 			statements are delayed by at least 5000
1764 			microseconds. */
1765 			delay = (ulint) ((ratio - .5) * 10000);
1766 		}
1767 
1768 		if (delay > srv_max_purge_lag_delay) {
1769 			delay = srv_max_purge_lag_delay;
1770 		}
1771 
1772 		MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1773 	}
1774 
1775 	return(delay);
1776 }
1777 
/*******************************************************************//**
Wait for pending purge jobs to complete. */
static
void
trx_purge_wait_for_workers_to_complete(
/*===================================*/
	trx_purge_t*	purge_sys)	/*!< in: purge instance */
{
	ulint		n_submitted = purge_sys->n_submitted;

	/* Ensure that the work queue empties out. The CAS with identical
	old and new values is used purely as an atomic "is n_completed ==
	n_submitted?" test: it writes nothing new when they are equal. */
	while (!os_compare_and_swap_ulint(
			&purge_sys->n_completed, n_submitted, n_submitted)) {

		/* Kick a worker thread in case tasks are queued but no
		worker is running. */
		if (srv_get_task_queue_length() > 0) {
			srv_release_threads(SRV_WORKER, 1);
		}

		os_thread_yield();
	}

	/* None of the worker threads should be doing any work. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* There should be no outstanding tasks as long
	as the worker threads are active. */
	ut_a(srv_get_task_queue_length() == 0);
}
1806 
1807 /******************************************************************//**
1808 Remove old historical changes from the rollback segments. */
1809 static
1810 void
trx_purge_truncate(void)1811 trx_purge_truncate(void)
1812 /*====================*/
1813 {
1814 	ut_ad(trx_purge_check_limit());
1815 
1816 	if (purge_sys->limit.trx_no == 0) {
1817 		trx_purge_truncate_history(&purge_sys->iter, &purge_sys->view);
1818 	} else {
1819 		trx_purge_truncate_history(&purge_sys->limit, &purge_sys->view);
1820 	}
1821 }
1822 
/*******************************************************************//**
This function runs a purge batch: clones the oldest read view, fetches a
batch of undo records, distributes them to the purge query threads
(n_purge_threads - 1 are queued for worker threads, one is run by this,
the coordinator, thread), then optionally truncates the history.
@return number of undo log pages handled in the batch */
ulint
trx_purge(
/*======*/
	ulint	n_purge_threads,	/*!< in: number of purge tasks
					to submit to the work queue */
	ulint	batch_size,		/*!< in: the maximum number of records
					to purge in one batch */
	bool	truncate)		/*!< in: truncate history if true */
{
	que_thr_t*	thr = NULL;
	ulint		n_pages_handled;

	ut_a(n_purge_threads > 0);

	srv_dml_needed_delay = trx_purge_dml_delay();

	/* The number of tasks submitted should be completed. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* Clone the oldest active read view: records older than this view
	are no longer visible to anyone and may be purged. view_active is
	toggled off while the clone is in progress. */
	rw_lock_x_lock(&purge_sys->latch);

	purge_sys->view_active = false;

	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	rw_lock_x_unlock(&purge_sys->latch);

#ifdef UNIV_DEBUG
	if (srv_purge_view_update_only_debug) {
		return(0);
	}
#endif /* UNIV_DEBUG */

	/* Fetch the UNDO recs that need to be purged. */
	n_pages_handled = trx_purge_attach_undo_recs(
		n_purge_threads, purge_sys, batch_size);

	/* Do we do an asynchronous purge or not ? */
	if (n_purge_threads > 1) {
		ulint	i = 0;

		/* Submit the tasks to the work queue. */
		for (i = 0; i < n_purge_threads - 1; ++i) {
			thr = que_fork_scheduler_round_robin(
				purge_sys->query, thr);

			ut_a(thr != NULL);

			srv_que_task_enqueue_low(thr);
		}

		/* The last task is run below by this (the coordinator)
		thread itself, via the goto. */
		thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
		ut_a(thr != NULL);

		purge_sys->n_submitted += n_purge_threads - 1;

		goto run_synchronously;

	/* Do it synchronously. */
	} else {
		thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
		ut_ad(thr);

run_synchronously:
		++purge_sys->n_submitted;

		que_run_threads(thr);

		os_atomic_inc_ulint(
			&purge_sys->pq_mutex, &purge_sys->n_completed, 1);

		if (n_purge_threads > 1) {
			trx_purge_wait_for_workers_to_complete(purge_sys);
		}
	}

	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

#ifdef UNIV_DEBUG
	rw_lock_x_lock(&purge_sys->latch);
	if (purge_sys->limit.trx_no == 0) {
		purge_sys->done = purge_sys->iter;
	} else {
		purge_sys->done = purge_sys->limit;
	}
	rw_lock_x_unlock(&purge_sys->latch);
#endif /* UNIV_DEBUG */

	if (truncate) {
		trx_purge_truncate();
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);

	return(n_pages_handled);
}
1925 
1926 /*******************************************************************//**
1927 Get the purge state.
1928 @return purge state. */
1929 purge_state_t
trx_purge_state(void)1930 trx_purge_state(void)
1931 /*=================*/
1932 {
1933 	purge_state_t	state;
1934 
1935 	rw_lock_x_lock(&purge_sys->latch);
1936 
1937 	state = purge_sys->state;
1938 
1939 	rw_lock_x_unlock(&purge_sys->latch);
1940 
1941 	return(state);
1942 }
1943 
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void
trx_purge_stop(void)
/*================*/
{
	purge_state_t	state;
	/* Reset the event BEFORE changing state, and remember the signal
	count; os_event_wait_low() below then cannot miss a signal that
	arrives in between. */
	int64_t		sig_count = os_event_reset(purge_sys->event);

	ut_a(srv_n_purge_threads > 0);

	rw_lock_x_lock(&purge_sys->latch);

	ut_a(purge_sys->state != PURGE_STATE_INIT);
	ut_a(purge_sys->state != PURGE_STATE_EXIT);
	ut_a(purge_sys->state != PURGE_STATE_DISABLED);

	/* n_stop counts nested stop requests; trx_purge_run() decrements
	it and only resumes when it reaches zero. */
	++purge_sys->n_stop;

	state = purge_sys->state;

	if (state == PURGE_STATE_RUN) {
		ib::info() << "Stopping purge";

		/* We need to wakeup the purge thread in case it is suspended,
		so that it can acknowledge the state change. */

		srv_purge_wakeup();
	}

	purge_sys->state = PURGE_STATE_STOP;

	rw_lock_x_unlock(&purge_sys->latch);

	if (state != PURGE_STATE_STOP) {

		/* Wait for purge coordinator to signal that it
		is suspended. */
		os_event_wait_low(purge_sys->event, sig_count);
	} else {
		/* Purge was already asked to stop; poll until the
		coordinator reports it is no longer running. */
		bool	once = true;

		rw_lock_x_lock(&purge_sys->latch);

		/* Wait for purge to signal that it has actually stopped. */
		while (purge_sys->running) {

			if (once) {
				ib::info() << "Waiting for purge to stop";
				once = false;
			}

			/* Drop the latch while sleeping so the coordinator
			can update purge_sys->running. */
			rw_lock_x_unlock(&purge_sys->latch);

			os_thread_sleep(10000);

			rw_lock_x_lock(&purge_sys->latch);
		}

		rw_lock_x_unlock(&purge_sys->latch);
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
}
2008 
/*******************************************************************//**
Resume purge, move to PURGE_STATE_RUN. */
void
trx_purge_run(void)
/*===============*/
{
	rw_lock_x_lock(&purge_sys->latch);

	/* Validate the current state: resuming is only legal from
	RUN or STOP. */
	switch (purge_sys->state) {
	case PURGE_STATE_INIT:
	case PURGE_STATE_EXIT:
	case PURGE_STATE_DISABLED:
		ut_error;

	case PURGE_STATE_RUN:
	case PURGE_STATE_STOP:
		break;
	}

	if (purge_sys->n_stop > 0) {

		ut_a(purge_sys->state == PURGE_STATE_STOP);

		/* Stop requests nest (see trx_purge_stop()); only the
		last matching resume actually restarts purge. */
		--purge_sys->n_stop;

		if (purge_sys->n_stop == 0) {

			ib::info() << "Resuming purge";

			purge_sys->state = PURGE_STATE_RUN;
		}

		MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
	} else {
		/* No pending stop request: purge must already be running. */
		ut_a(purge_sys->state == PURGE_STATE_RUN);
	}

	rw_lock_x_unlock(&purge_sys->latch);

	/* Wake the purge coordinator in case it is suspended. */
	srv_purge_wakeup();
}
2050