1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file trx/trx0purge.cc
29 Purge old versions
30 
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "trx0purge.h"
37 
38 #ifdef UNIV_NONINL
39 #include "trx0purge.ic"
40 #endif
41 
42 #include "fsp0fsp.h"
43 #include "fut0fut.h"
44 #include "mach0data.h"
45 #include "mtr0log.h"
46 #include "os0thread.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0mysql.h"
50 #include "row0purge.h"
51 #include "row0upd.h"
52 #include "srv0mon.h"
53 #include "fsp0sysspace.h"
54 #include "srv0srv.h"
55 #include "srv0start.h"
56 #include "sync0sync.h"
57 #include "trx0rec.h"
58 #include "trx0roll.h"
59 #include "trx0rseg.h"
60 #include "trx0trx.h"
61 
/** Maximum allowable purge history length.  <=0 means 'infinite'. */
ulong		srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong		srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

#ifdef UNIV_DEBUG
/* NOTE(review): semantics inferred from the name — presumably makes purge
only advance its read view without removing records; confirm at the reader. */
my_bool		srv_purge_view_update_only_debug;
/* Asserted clear (ut_ad) in trx_purge_add_update_undo_to_history(): no
commit may add undo to the history list while this flag is set. */
bool		trx_commit_disallowed = false;
#endif /* UNIV_DEBUG */

/** Sentinel value: an element whose trx_no is UINT64_UNDEFINED, used by
TrxUndoRsegsIterator to mean "no current element". */
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);

/** A sentinel undo record used as a return value when we have a whole
undo log which can be skipped by purge */
static trx_undo_rec_t	trx_purge_ignore_rec;
82 
/** Constructor.  Starts with the NullElement sentinel and the iterator
positioned at end(), so that the first set_next() call falls through to
reading from the purge priority queue. */
TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
	:
	m_purge_sys(purge_sys),
	m_trx_undo_rsegs(NullElement),
	m_iter(m_trx_undo_rsegs.end())
{
}
91 
92 /** Sets the next rseg to purge in m_purge_sys.
93 @return page size of the table for which the log is.
94 NOTE: if rseg is NULL when this function returns this means that
95 there are no rollback segments to purge and then the returned page
96 size object should not be used. */
97 const page_size_t
set_next()98 TrxUndoRsegsIterator::set_next()
99 {
100 	mutex_enter(&m_purge_sys->pq_mutex);
101 
102 	/* Only purge consumes events from the priority queue, user
103 	threads only produce the events. */
104 
105 	/* Check if there are more rsegs to process in the
106 	current element. */
107 	if (m_iter != m_trx_undo_rsegs.end()) {
108 
109 		/* We are still processing rollback segment from
110 		the same transaction and so expected transaction
111 		number shouldn't increase. Undo increment of
112 		expected trx_no done by caller assuming rollback
113 		segments from given transaction are done. */
114 		m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
115 
116 	} else if (!m_purge_sys->purge_queue->empty()) {
117 
118 		/* Read the next element from the queue.
119 		Combine elements if they have same transaction number.
120 		This can happen if a transaction shares redo rollback segment
121 		with another transaction that has already added it to purge
122 		queue and former transaction also needs to schedule non-redo
123 		rollback segment for purge. */
124 		m_trx_undo_rsegs = NullElement;
125 
126 		while (!m_purge_sys->purge_queue->empty()) {
127 
128 			if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
129 				m_trx_undo_rsegs =
130 					purge_sys->purge_queue->top();
131 			} else if (purge_sys->purge_queue->top().get_trx_no() ==
132 					m_trx_undo_rsegs.get_trx_no()) {
133 				m_trx_undo_rsegs.append(
134 					purge_sys->purge_queue->top());
135 			} else {
136 				break;
137 			}
138 
139 			m_purge_sys->purge_queue->pop();
140 		}
141 
142 		m_iter = m_trx_undo_rsegs.begin();
143 
144 	} else {
145 		/* Queue is empty, reset iterator. */
146 		m_trx_undo_rsegs = NullElement;
147 		m_iter = m_trx_undo_rsegs.end();
148 
149 		mutex_exit(&m_purge_sys->pq_mutex);
150 
151 		m_purge_sys->rseg = NULL;
152 
153 		/* return a dummy object, not going to be used by the caller */
154 		return(univ_page_size);
155 	}
156 
157 	m_purge_sys->rseg = *m_iter++;
158 
159 	mutex_exit(&m_purge_sys->pq_mutex);
160 
161 	ut_a(m_purge_sys->rseg != NULL);
162 
163 	mutex_enter(&m_purge_sys->rseg->mutex);
164 
165 	ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
166 	ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
167 
168 	/* We assume in purge of externally stored fields that
169 	space id is in the range of UNDO tablespace space ids
170 	unless space is system tablespace */
171 	ut_a(srv_is_undo_tablespace(m_purge_sys->rseg->space)
172 	     || is_system_tablespace(
173 			m_purge_sys->rseg->space));
174 
175 	const page_size_t	page_size(m_purge_sys->rseg->page_size);
176 
177 	ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
178 
179 	m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
180 	m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
181 	m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
182 
183 	mutex_exit(&m_purge_sys->rseg->mutex);
184 
185 	return(page_size);
186 }
187 
188 /****************************************************************//**
189 Builds a purge 'query' graph. The actual purge is performed by executing
190 this query graph.
191 @return own: the query graph */
192 static
193 que_t*
trx_purge_graph_build(trx_t * trx,ulint n_purge_threads)194 trx_purge_graph_build(
195 /*==================*/
196 	trx_t*		trx,			/*!< in: transaction */
197 	ulint		n_purge_threads)	/*!< in: number of purge
198 						threads */
199 {
200 	ulint		i;
201 	mem_heap_t*	heap;
202 	que_fork_t*	fork;
203 
204 	heap = mem_heap_create(512);
205 	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
206 	fork->trx = trx;
207 
208 	for (i = 0; i < n_purge_threads; ++i) {
209 		que_thr_t*	thr;
210 		row_prebuilt_t*	prebuilt;
211 
212 		prebuilt = static_cast<row_prebuilt_t*>(
213 			mem_heap_zalloc(heap, sizeof(*prebuilt)));
214 
215 		thr = que_thr_create(fork, heap, prebuilt);
216 
217 		thr->child = row_purge_node_create(thr, heap);
218 	}
219 
220 	return(fork);
221 }
222 
/********************************************************************//**
Creates the global purge system control structure and inits the history
mutex. */
void
trx_purge_sys_create(
/*=================*/
	ulint		n_purge_threads,	/*!< in: number of purge
						threads */
	purge_pq_t*	purge_queue)		/*!< in, own: UNDO log min
						binary heap */
{
	purge_sys = static_cast<trx_purge_t*>(
		ut_zalloc_nokey(sizeof(*purge_sys)));

	purge_sys->state = PURGE_STATE_INIT;
	purge_sys->event = os_event_create(0);

	/* The struct was zero-allocated above; placement-new the members
	that have non-trivial constructors. */
	new (&purge_sys->iter) purge_iter_t;
	new (&purge_sys->limit) purge_iter_t;
	new (&purge_sys->undo_trunc) undo::Truncate;
#ifdef UNIV_DEBUG
	new (&purge_sys->done) purge_iter_t;
#endif /* UNIV_DEBUG */

	/* Take ownership of purge_queue, we are responsible for freeing it. */
	purge_sys->purge_queue = purge_queue;

	rw_lock_create(trx_purge_latch_key,
		       &purge_sys->latch, SYNC_PURGE_LATCH);

	mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);

	ut_a(n_purge_threads > 0);

	purge_sys->sess = sess_open();

	purge_sys->trx = purge_sys->sess->trx;

	ut_a(purge_sys->trx->sess == purge_sys->sess);

	/* A purge transaction is not a real transaction, we use a transaction
	here only because the query threads code requires it. It is otherwise
	quite unnecessary. We should get rid of it eventually. */
	purge_sys->trx->id = 0;
	purge_sys->trx->start_time = ut_time_monotonic();
	purge_sys->trx->state = TRX_STATE_ACTIVE;
	purge_sys->trx->op_info = "purge trx";

	purge_sys->query = trx_purge_graph_build(
		purge_sys->trx, n_purge_threads);

	new(&purge_sys->view) ReadView();

	/* Purge may only remove versions older than the oldest open
	read view; start with a clone of it. */
	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));

	/* Allocate 8K bytes for the initial heap. */
	purge_sys->heap = mem_heap_create(8 * 1024);
}
285 
/************************************************************************
Frees the global purge system control structure.  Tears down, in order:
the per-thread compress heaps, the query graph, the purge session/trx,
the read view, latches, queue, event, heap, iterator, and finally the
purge_sys struct itself. */
void
trx_purge_sys_close(void)
/*======================*/
{
	/* Free any compress heaps that query threads allocated on their
	prebuilt structs. */
	for (que_thr_t* thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
		thr != NULL;
		thr = UT_LIST_GET_NEXT(thrs, thr)) {
		if (thr->prebuilt != 0 &&
			thr->prebuilt->compress_heap != 0) {
			row_mysql_prebuilt_free_compress_heap(thr->prebuilt);
		}
	}

	que_graph_free(purge_sys->query);

	ut_a(purge_sys->trx->id == 0);
	ut_a(purge_sys->sess->trx == purge_sys->trx);

	/* The pseudo purge trx was set ACTIVE in trx_purge_sys_create();
	mark it not started before closing the session. */
	purge_sys->trx->state = TRX_STATE_NOT_STARTED;

	sess_close(purge_sys->sess);

	purge_sys->sess = NULL;

	/* The view was placement-new'ed; close it and run its dtor
	explicitly since ut_free() below will not. */
	purge_sys->view.close();
	purge_sys->view.~ReadView();

	rw_lock_free(&purge_sys->latch);
	mutex_free(&purge_sys->pq_mutex);

	/* We took ownership of the queue in trx_purge_sys_create(). */
	if (purge_sys->purge_queue != NULL) {
		UT_DELETE(purge_sys->purge_queue);
		purge_sys->purge_queue = NULL;
	}

	os_event_destroy(purge_sys->event);

	purge_sys->event = NULL;

	mem_heap_free(purge_sys->heap);

	purge_sys->heap = NULL;

	UT_DELETE(purge_sys->rseg_iter);

	ut_free(purge_sys);

	purge_sys = NULL;
}
337 
338 /*================ UNDO LOG HISTORY LIST =============================*/
339 
/********************************************************************//**
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse.
Called at transaction commit with the undo header page x-latched in mtr. */
void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*		trx,		/*!< in: transaction */
	trx_undo_ptr_t*	undo_ptr,	/*!< in/out: update undo log. */
	page_t*		undo_page,	/*!< in: update undo log header page,
					x-latched */
	bool		update_rseg_history_len,
					/*!< in: if true: update rseg history
					len else skip updating it. */
	ulint		n_added_logs,	/*!< in: number of logs added */
	mtr_t*		mtr)		/*!< in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_ulogf_t*	undo_header;

	undo = undo_ptr->update_undo;
	rseg = undo->rseg;

	rseg_header = trx_rsegf_get(
		undo->rseg->space, undo->rseg->page_no, undo->rseg->page_size,
		mtr);

	undo_header = undo_page + undo->hdr_offset;

	if (undo->state != TRX_UNDO_CACHED) {
		ulint		hist_size;
#ifdef UNIV_DEBUG
		trx_usegf_t*	seg_header = undo_page + TRX_UNDO_SEG_HDR;
#endif /* UNIV_DEBUG */

		/* The undo log segment will not be reused: detach it from
		its rseg slot and account its pages in the history size. */

		if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
			ib::fatal() << "undo->id is " << undo->id;
		}

		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);

		hist_size = mtr_read_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);

		ut_ad(undo->size == flst_get_len(
			      seg_header + TRX_UNDO_PAGE_LIST));

		mlog_write_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE,
			hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	ut_ad(!trx_commit_disallowed);

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);

	if (update_rseg_history_len) {
		os_atomic_increment_ulint(
			&trx_sys->rseg_history_len, n_added_logs);
		/* Kick purge if the backlog exceeds one full batch per
		purge thread. */
		if (trx_sys->rseg_history_len
		    > srv_n_purge_threads * srv_purge_batch_size) {
			srv_wake_purge_thread_if_not_active();
		}
	}

	/* Write the trx number to the undo log header */
	mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);

	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	/* If the rseg had no pending history (last_page_no == FIL_NULL),
	this log becomes its current purge position. */
	if (rseg->last_page_no == FIL_NULL) {
		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}
429 
/** Remove undo log header from the history list and decrement the
global history length counter by one.
@param[in,out]	rseg_hdr	rollback segment header
@param[in]	log_hdr		undo log segment header
@param[in,out]	mtr		mini transaction. */
static
void
trx_purge_remove_log_hdr(
	trx_rsegf_t*	rseg_hdr,
	trx_ulogf_t*	log_hdr,
	mtr_t*		mtr)
{
	flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
		    log_hdr + TRX_UNDO_HISTORY_NODE, mtr);

	/* Keep trx_sys->rseg_history_len in sync with the on-disk list. */
	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, 1);
}
446 
/** Frees an undo log segment which is in the history list. Removes the
undo log hdr from the history list.
@param[in,out]	rseg		rollback segment
@param[in]	hdr_addr	file address of log_hdr
@param[in]	noredo		skip redo logging. */
static
void
trx_purge_free_segment(
	trx_rseg_t*	rseg,
	fil_addr_t	hdr_addr,
	bool		noredo)
{
	mtr_t		mtr;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ulint		seg_size;
	ulint		hist_size;
	bool		marked		= noredo;

	/* Free the non-header pages of the segment in bounded steps; each
	iteration re-starts the mtr and re-takes the rseg mutex so that we
	never hold them for too long. On break, the mtr and the rseg mutex
	are still held and the page pointers are valid. */
	for (;;) {
		page_t*	undo_page;

		mtr_start(&mtr);
		if (noredo) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}
		ut_ad(noredo == trx_sys_is_noredo_rseg_slot(rseg->id));

		mutex_enter(&rseg->mutex);

		rseg_hdr = trx_rsegf_get(
			rseg->space, rseg->page_no, rseg->page_size, &mtr);

		undo_page = trx_undo_page_get(
			page_id_t(rseg->space, hdr_addr.page), rseg->page_size,
			&mtr);

		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
		log_hdr = undo_page + hdr_addr.boffset;

		/* Mark the last undo log totally purged, so that if the
		system crashes, the tail of the undo log will not get accessed
		again. The list of pages in the undo log tail gets inconsistent
		during the freeing of the segment, and therefore purge should
		not try to access them again. */

		if (!marked) {
			marked = true;
			mlog_write_ulint(
				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				MLOG_2BYTES, &mtr);
		}

		if (fseg_free_step_not_header(
			    seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr)) {

			break;
		}

		mutex_exit(&rseg->mutex);

		mtr_commit(&mtr);
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

	do {

		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.cc. */

	} while (!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr));

	/* Deduct the freed pages from the rseg's history size counter. */
	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}
549 
550 /********************************************************************//**
551 Removes unnecessary history data from a rollback segment. */
552 static
553 void
trx_purge_truncate_rseg_history(trx_rseg_t * rseg,const purge_iter_t * limit)554 trx_purge_truncate_rseg_history(
555 /*============================*/
556 	trx_rseg_t*		rseg,		/*!< in: rollback segment */
557 	const purge_iter_t*	limit)		/*!< in: truncate offset */
558 {
559 	fil_addr_t	hdr_addr;
560 	fil_addr_t	prev_hdr_addr;
561 	trx_rsegf_t*	rseg_hdr;
562 	page_t*		undo_page;
563 	trx_ulogf_t*	log_hdr;
564 	trx_usegf_t*	seg_hdr;
565 	mtr_t		mtr;
566 	trx_id_t	undo_trx_no;
567 	const bool	noredo		= trx_sys_is_noredo_rseg_slot(
568 		rseg->id);
569 
570 	mtr_start(&mtr);
571 	if (noredo) {
572 		mtr.set_log_mode(MTR_LOG_NO_REDO);
573 	}
574 	mutex_enter(&(rseg->mutex));
575 
576 	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
577 				 rseg->page_size, &mtr);
578 
579 	hdr_addr = trx_purge_get_log_from_hist(
580 		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
581 loop:
582 	if (hdr_addr.page == FIL_NULL) {
583 
584 		mutex_exit(&(rseg->mutex));
585 
586 		mtr_commit(&mtr);
587 
588 		return;
589 	}
590 
591 	undo_page = trx_undo_page_get(page_id_t(rseg->space, hdr_addr.page),
592 				      rseg->page_size, &mtr);
593 
594 	log_hdr = undo_page + hdr_addr.boffset;
595 
596 	undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
597 
598 	if (undo_trx_no >= limit->trx_no) {
599 
600 		/* limit space_id should match the rollback segment
601 		space id to avoid freeing of the page belongs to
602 		different rollback segment for the same trx_no. */
603 		if (undo_trx_no == limit->trx_no
604 		    && rseg->space == limit->undo_rseg_space) {
605 
606 			trx_undo_truncate_start(
607 				rseg, hdr_addr.page,
608 				hdr_addr.boffset, limit->undo_no);
609 		}
610 
611 		mutex_exit(&(rseg->mutex));
612 		mtr_commit(&mtr);
613 
614 		return;
615 	}
616 
617 	prev_hdr_addr = trx_purge_get_log_from_hist(
618 		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
619 
620 	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
621 
622 	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
623 	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
624 
625 		/* We can free the whole log segment */
626 
627 		mutex_exit(&(rseg->mutex));
628 		mtr_commit(&mtr);
629 
630 		/* calls the trx_purge_remove_log_hdr()
631 		inside trx_purge_free_segment(). */
632 		trx_purge_free_segment(rseg, hdr_addr, noredo);
633 
634 	} else {
635 		/* Remove the log hdr from the rseg history. */
636 		trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
637 
638 		mutex_exit(&(rseg->mutex));
639 		mtr_commit(&mtr);
640 	}
641 
642 	mtr_start(&mtr);
643 	if (noredo) {
644 		mtr.set_log_mode(MTR_LOG_NO_REDO);
645 	}
646 	mutex_enter(&(rseg->mutex));
647 
648 	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
649 				 rseg->page_size, &mtr);
650 
651 	hdr_addr = prev_hdr_addr;
652 
653 	goto loop;
654 }
655 
/** UNDO log truncate logger. Needed to track state of truncate during crash.
An auxiliary redo log file undo_<space_id>_trunc.log will be created while the
truncate of the UNDO is in progress. This file is required during recovery
to complete the truncate. */

namespace undo {

	/** Populate log file name based on space_id
	@param[in]	space_id	id of the undo tablespace.
	@param[out]	log_file_name	allocated here with new[]; the
					caller owns it and must delete[] it.
	@return DB_SUCCESS or error code */
	dberr_t populate_log_file_name(
		ulint	space_id,
		char*&	log_file_name)
	{
		/* dir + prefix + ext, plus 22 bytes of slack for the
		path separator, the space id digits and the '_'. */
		ulint log_file_name_sz =
			strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
			+ strlen(undo::s_log_prefix)
			+ strlen(undo::s_log_ext);

		log_file_name = new (std::nothrow) char[log_file_name_sz];
		if (log_file_name == 0) {
			return(DB_OUT_OF_MEMORY);
		}

		memset(log_file_name, 0, log_file_name_sz);

		strcpy(log_file_name, srv_log_group_home_dir);
		ulint	log_file_name_len = strlen(log_file_name);

		/* Append a path separator unless the configured log
		directory already ends with one. */
		if (log_file_name[log_file_name_len - 1]
				!= OS_PATH_SEPARATOR) {

			log_file_name[log_file_name_len]
				= OS_PATH_SEPARATOR;
			log_file_name_len = strlen(log_file_name);
		}

		ut_snprintf(log_file_name + log_file_name_len,
			    log_file_name_sz - log_file_name_len,
			    "%s%lu_%s", undo::s_log_prefix,
			    (ulong) space_id, s_log_ext);

		return(DB_SUCCESS);
	}

	/** Create the truncate log file.
	Writes one zero-filled page: a file with no magic number present
	marks a truncate as in-progress for crash recovery.
	@param[in]	space_id	id of the undo tablespace to truncate.
	@return DB_SUCCESS or error code. */
	dberr_t init(ulint space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablespace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return(err);
		}

		/* Step-2: Create the log file, open it and write 0 to
		indicate init phase. */
		bool            ret;
		pfs_os_file_t	handle = os_file_create(
			innodb_log_file_key, log_file_name, OS_FILE_CREATE,
			OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
		if (!ret) {
			delete[] log_file_name;
			return(DB_IO_ERROR);
		}

		/* Over-allocate by one page so the I/O buffer can be
		aligned to UNIV_PAGE_SIZE as required by os_file_write. */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			delete[] log_file_name;
			return(DB_OUT_OF_MEMORY);
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		delete[] log_file_name;

		return(err);
	}

	/** Mark completion of undo truncate action by writing magic number to
	the log file and then removing it from the disk.
	If we are going to remove it from disk then why write magic number ?
	This is to safeguard from unlink (file-system) anomalies that will keep
	the link to the file even after unlink action is successful and
	ref-count = 0.
	@param[in]	space_id	id of the undo tablespace to truncate.*/
	void done(
		ulint	space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Create the log file name using the pre-decided
		prefix/suffix and table id of undo tablespace to truncate. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return;
		}

		/* Step-2: Open log file and write magic number to
		indicate done phase. */
		bool    ret;
		pfs_os_file_t	handle =
			os_file_create_simple_no_error_handling(
				innodb_log_file_key, log_file_name,
				OS_FILE_OPEN, OS_FILE_READ_WRITE,
				srv_read_only_mode, &ret);

		if (!ret) {
			/* Could not open: just try to remove the file. */
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			os_file_delete(innodb_log_file_key, log_file_name);
			delete[] log_file_name;
			return;
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		mach_write_to_4(log_buf, undo::s_magic);

		IORequest	request(IORequest::WRITE);

		request.disable_compression();

		err = os_file_write(
			request, log_file_name, handle, log_buf, 0, sz);

		ut_ad(err == DB_SUCCESS);

		os_file_flush(handle);
		os_file_close(handle);

		ut_free(buf);
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
	}

	/** Check if TRUNCATE_DDL_LOG file exist.
	A file that exists but already carries the magic number is
	removed and reported as absent (the truncate had completed).
	@param[in]	space_id	id of the undo tablespace.
	@return true if exist else false. */
	bool is_log_present(
		ulint	space_id)
	{
		dberr_t		err;
		char*		log_file_name;

		/* Step-1: Populate log file name. */
		err = populate_log_file_name(space_id, log_file_name);
		if (err != DB_SUCCESS) {
			return(false);
		}

		/* Step-2: Check for existence of the file. */
		bool		exist;
		os_file_type_t	type;
		os_file_status(log_file_name, &exist, &type);

		/* Step-3: If file exists, check it for presence of magic
		number.  If found, then delete the file and report file
		doesn't exist as presence of magic number suggest that
		truncate action was complete. */

		if (exist) {
			bool    ret;
			pfs_os_file_t	handle =
				os_file_create_simple_no_error_handling(
					innodb_log_file_key, log_file_name,
					OS_FILE_OPEN, OS_FILE_READ_WRITE,
					srv_read_only_mode, &ret);
			if (!ret) {
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}

			ulint	sz = UNIV_PAGE_SIZE;
			void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
			if (buf == NULL) {
				os_file_close(handle);
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}

			byte*	log_buf = static_cast<byte*>(
				ut_align(buf, UNIV_PAGE_SIZE));

			IORequest	request(IORequest::READ);

			request.disable_compression();

			/* NOTE(review): this declaration shadows the outer
			'err'; harmless here since the outer value is no
			longer needed. */
			dberr_t	err;

			err = os_file_read(request, handle, log_buf, 0, sz);

			os_file_close(handle);

			if (err != DB_SUCCESS) {

				ib::info()
					<< "Unable to read '"
					<< log_file_name << "' : "
					<< ut_strerr(err);

				os_file_delete(
					innodb_log_file_key, log_file_name);

				ut_free(buf);

				delete[] log_file_name;

				return(false);
			}

			ulint	magic_no = mach_read_from_4(log_buf);

			ut_free(buf);

			if (magic_no == undo::s_magic) {
				/* Found magic number. */
				os_file_delete(innodb_log_file_key,
					       log_file_name);
				delete[] log_file_name;
				return(false);
			}
		}

		delete[] log_file_name;

		return(exist);
	}
};
917 
918 /** Iterate over all the UNDO tablespaces and check if any of the UNDO
919 tablespace qualifies for TRUNCATE (size > threshold).
920 @param[in,out]	undo_trunc	undo truncate tracker */
921 static
922 void
trx_purge_mark_undo_for_truncate(undo::Truncate * undo_trunc)923 trx_purge_mark_undo_for_truncate(
924 	undo::Truncate*	undo_trunc)
925 {
926 	/* Step-1: If UNDO Tablespace
927 		- already marked for truncate (OR)
928 		- truncate disabled
929 	return immediately else search for qualifying tablespace. */
930 	if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
931 		return;
932 	}
933 
934 	/* Step-2: Validation/Qualification checks
935 	a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
936 	   is being truncated server can continue to operate.
937 	b. At-least 2 UNDO redo rseg/undo logs (besides the default rseg-0)
938 	b. At-least 1 UNDO tablespace size > threshold. */
939 	if (srv_undo_tablespaces_active < 2
940 	    || (srv_rollback_segments < (1 + srv_tmp_undo_logs + 2))) {
941 		return;
942 	}
943 
944 	/* Avoid bias selection and so start the scan from immediate next
945 	of last selected UNDO tablespace for truncate. */
946 	ulint space_id = undo_trunc->get_scan_start();
947 
948 	for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {
949 
950 		ut_ad(srv_undo_space_id_start != 0);
951 
952 		if (fil_space_get_size(space_id)
953 		    > (srv_max_undo_log_size / srv_page_size)) {
954 			/* Tablespace qualifies for truncate. */
955 			undo_trunc->mark(space_id);
956 			undo::Truncate::add_space_to_trunc_list(space_id);
957 			break;
958 		}
959 
960 		space_id++;
961 
962 		if (space_id >= (srv_undo_space_id_start
963 				 + srv_undo_tablespaces_active)) {
964 			/* Note: UNDO tablespace ids starts from 1. */
965 			space_id = srv_undo_space_id_start;
966 		}
967 
968 		if (undo_trunc->is_marked()) {
969 			break;
970 		}
971 	}
972 
973 	undo_trunc->set_scan_start(space_id);
974 
975 	/* Couldn't make any selection. */
976 	if (!undo_trunc->is_marked()) {
977 		return;
978 	}
979 
980 #ifdef UNIV_DEBUG
981 	ib::info() << "UNDO tablespace with space identifier "
982 		<< undo_trunc->get_marked_space_id() << " marked for truncate";
983 #endif /* UNIV_DEBUG */
984 
985 	/* Step-3: Iterate over all the rsegs of selected UNDO tablespace
986 	and mark them temporarily unavailable for allocation.*/
987 	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
988 		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
989 
990 		if (rseg != NULL && !trx_sys_is_noredo_rseg_slot(rseg->id)) {
991 			if (rseg->space
992 				== undo_trunc->get_marked_space_id()) {
993 
994 				/* Once set this rseg will not be allocated
995 				to new booting transaction but we will wait
996 				for existing active transaction to finish. */
997 				rseg->skip_allocation = true;
998 				undo_trunc->add_rseg_to_trunc(rseg);
999 			}
1000 		}
1001 	}
1002 }
1003 
/** Definition of the static list of undo tablespace ids registered for
truncate via undo::Truncate::add_space_to_trunc_list() (see
trx_purge_mark_undo_for_truncate() above). */
undo::undo_spaces_t	undo::Truncate::s_spaces_to_truncate;
1005 
1006 /** Cleanse purge queue to remove the rseg that reside in undo-tablespace
1007 marked for truncate.
1008 @param[in,out]	undo_trunc	undo truncate tracker */
1009 static
1010 void
trx_purge_cleanse_purge_queue(undo::Truncate * undo_trunc)1011 trx_purge_cleanse_purge_queue(
1012 	undo::Truncate*	undo_trunc)
1013 {
1014 	mutex_enter(&purge_sys->pq_mutex);
1015 	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
1016 	purge_elem_list_t			purge_elem_list;
1017 
1018 	/* Remove rseg instances that are in the purge queue before we start
1019 	truncate of corresponding UNDO truncate. */
1020 	while (!purge_sys->purge_queue->empty()) {
1021 		purge_elem_list.push_back(purge_sys->purge_queue->top());
1022 		purge_sys->purge_queue->pop();
1023 	}
1024 	ut_ad(purge_sys->purge_queue->empty());
1025 
1026 	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
1027 	     it != purge_elem_list.end();
1028 	     ++it) {
1029 
1030 		for (TrxUndoRsegs::iterator it2 = it->begin();
1031 		     it2 != it->end();
1032 		     ++it2) {
1033 
1034 			if ((*it2)->space
1035 				== undo_trunc->get_marked_space_id()) {
1036 				it->erase(it2);
1037 				break;
1038 			}
1039 		}
1040 
1041 		const ulint	size = it->size();
1042 		if (size != 0) {
1043 			/* size != 0 suggest that there exist other rsegs that
1044 			needs processing so add this element to purge queue.
1045 			Note: Other rseg could be non-redo rsegs. */
1046 			purge_sys->purge_queue->push(*it);
1047 		}
1048 	}
1049 	mutex_exit(&purge_sys->pq_mutex);
1050 }
1051 
/** Iterate over selected UNDO tablespace and check if all the rsegs
that resides in the tablespace are free.
@param[in]	limit		truncate_limit
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_initiate_truncate(
	purge_iter_t*	limit,
	undo::Truncate*	undo_trunc)
{
	/* Step-1: Early check to find out if any of the UNDO tablespaces
	is marked for truncate. */
	if (!undo_trunc->is_marked()) {
		/* No tablespace marked for truncate yet. */
		return;
	}

	/* Step-2: Scan over each rseg and ensure that it doesn't hold any
	active undo records. */
	bool all_free = true;

	for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {

		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);

		mutex_enter(&rseg->mutex);

		if (rseg->trx_ref_count > 0) {
			/* This rseg is still being held by an active
			transaction. */
			all_free = false;
			mutex_exit(&rseg->mutex);
			continue;
		}

		ut_ad(rseg->trx_ref_count == 0);
		ut_ad(rseg->skip_allocation);

		ulint	size_of_rsegs = rseg->curr_size;

		if (size_of_rsegs == 1) {
			/* Only the rseg header page remains: nothing
			blocks truncate in this rseg. */
			mutex_exit(&rseg->mutex);
			continue;
		} else {

			/* There could be cached undo segment. Check if records
			in these segments can be purged. Normal purge history
			will not touch these cached segment. */
			ulint		cached_undo_size = 0;

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->update_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				if (limit->trx_no < undo->trx_id) {
					/* Truncate limit has not passed this
					cached segment yet; cannot free it. */
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->insert_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			/* +1 accounts for the rseg header page. */
			ut_ad(size_of_rsegs >= (cached_undo_size + 1));

			if (size_of_rsegs > (cached_undo_size + 1)) {
				/* There are pages besides cached pages that
				still hold active data. */
				all_free = false;
			}
		}

		mutex_exit(&rseg->mutex);
	}

	if (!all_free) {
		/* rseg still holds active data.*/
		return;
	}


	/* Step-3: Start the actual truncate.
	a. log-checkpoint
	b. Write the DDL log to protect truncate action from CRASH
	c. Remove rseg instance if added to purge queue before we
	   initiate truncate.
	d. Execute actual truncate
	e. Remove the DDL log. */
	DBUG_EXECUTE_IF("ib_undo_trunc_before_checkpoint",
			ib::info() << "ib_undo_trunc_before_checkpoint";
			DBUG_SUICIDE(););

	/* After truncate if server crashes then redo logging done for this
	undo tablespace might not stand valid as tablespace has been
	truncated. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	ib::info() << "Truncating UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id();

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_start",
			ib::info() << "ib_undo_trunc_before_ddl_log_start";
			DBUG_SUICIDE(););

	/* err only exists in debug builds: ut_ad() compiles to nothing in
	release builds, so the return value is intentionally unchecked
	there. */
#ifdef UNIV_DEBUG
	dberr_t	err =
#endif /* UNIV_DEBUG */
		undo_trunc->start_logging(
			undo_trunc->get_marked_space_id());
	ut_ad(err == DB_SUCCESS);

	DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
			ib::info() << "ib_undo_trunc_before_truncate";
			DBUG_SUICIDE(););

	/* Drop queued references to rsegs of the marked tablespace before
	their pages are thrown away. */
	trx_purge_cleanse_purge_queue(undo_trunc);

	bool	success = trx_undo_truncate_tablespace(undo_trunc);
	if (!success) {
		/* Note: In case of error we don't enable the rsegs
		and neither unmark the tablespace so the tablespace
		continue to remain inactive. */
		ib::error() << "Failed to truncate UNDO tablespace with"
			" space identifier "
			<< undo_trunc->get_marked_space_id();
		return;
	}

	if (purge_sys->rseg != NULL
	    && purge_sys->rseg->last_page_no == FIL_NULL) {
		/* If purge_sys->rseg is pointing to rseg that was recently
		truncated then move to next rseg element.
		Note: Ideally purge_sys->rseg should be NULL because purge
		should complete processing of all the records but there is
		purge_batch_size that can force the purge loop to exit before
		all the records are purged and in this case purge_sys->rseg
		could point to a valid rseg waiting for next purge cycle. */
		purge_sys->next_stored = FALSE;
		purge_sys->rseg = NULL;
	}

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
			ib::info() << "ib_undo_trunc_before_ddl_log_end";
			DBUG_SUICIDE(););

	log_make_checkpoint_at(LSN_MAX, TRUE);

	/* Remove the DDL log now that the truncate is durable. */
	undo_trunc->done_logging(undo_trunc->get_marked_space_id());

	/* Completed truncate. Now it is safe to re-use the tablespace. */
	for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);
		rseg->skip_allocation = false;
	}

	ib::info() << "Completed truncate of UNDO tablespace with space"
		" identifier " << undo_trunc->get_marked_space_id();

	undo_trunc->reset();
	undo::Truncate::clear_trunc_list();

	DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
			ib::info() << "ib_undo_trunc_trunc_done";
			DBUG_SUICIDE(););
}
1228 
1229 /********************************************************************//**
1230 Removes unnecessary history data from rollback segments. NOTE that when this
1231 function is called, the caller must not have any latches on undo log pages! */
1232 static
1233 void
trx_purge_truncate_history(purge_iter_t * limit,const ReadView * view)1234 trx_purge_truncate_history(
1235 /*========================*/
1236 	purge_iter_t*		limit,		/*!< in: truncate limit */
1237 	const ReadView*		view)		/*!< in: purge view */
1238 {
1239 	ulint		i;
1240 
1241 	/* We play safe and set the truncate limit at most to the purge view
1242 	low_limit number, though this is not necessary */
1243 
1244 	if (limit->trx_no >= view->low_limit_no()) {
1245 		limit->trx_no = view->low_limit_no();
1246 		limit->undo_no = 0;
1247 		limit->undo_rseg_space = ULINT_UNDEFINED;
1248 	}
1249 
1250 	ut_ad(limit->trx_no <= purge_sys->view.low_limit_no());
1251 
1252 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1253 		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
1254 
1255 		if (rseg != NULL) {
1256 			ut_a(rseg->id == i);
1257 			trx_purge_truncate_rseg_history(rseg, limit);
1258 		}
1259 	}
1260 
1261 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1262 		trx_rseg_t*	rseg = trx_sys->pending_purge_rseg_array[i];
1263 
1264 		if (rseg != NULL) {
1265 			ut_a(rseg->id == i);
1266 			trx_purge_truncate_rseg_history(rseg, limit);
1267 		}
1268 	}
1269 
1270 	/* UNDO tablespace truncate. We will try to truncate as much as we
1271 	can (greedy approach). This will ensure when the server is idle we
1272 	try and truncate all the UNDO tablespaces. */
1273 	ulint	nchances = srv_undo_tablespaces_active;
1274 	for (i = 0; i < nchances; i++) {
1275 		trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
1276 		trx_purge_initiate_truncate(limit, &purge_sys->undo_trunc);
1277 	}
1278 }
1279 
/***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
static
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg,		/*!< in: rollback segment */
	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
					handled */
{
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	fil_addr_t	prev_log_addr;
	trx_id_t	trx_no;
	ibool		del_marks;
	mtr_t		mtr;

	mutex_enter(&(rseg->mutex));

	ut_a(rseg->last_page_no != FIL_NULL);

	/* Move the purge iterator past the log just finished: everything
	up to and including rseg->last_trx_no has been handled. */
	purge_sys->iter.trx_no = rseg->last_trx_no + 1;
	purge_sys->iter.undo_no = 0;
	purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
	purge_sys->next_stored = FALSE;

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(
		page_id_t(rseg->space, rseg->last_page_no),
		rseg->page_size, &mtr);

	log_hdr = undo_page + rseg->last_offset;

	/* Increase the purge page count by one for every handled log */

	(*n_pages_handled)++;

	/* Follow the history list backwards to the previous log header. */
	prev_log_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

#ifdef UNIV_DEBUG
		trx_sys_mutex_enter();

		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in large
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 2000 000 undo logs now. */

		if (trx_sys->rseg_history_len > 2000000) {
			ib::warn() << "Purge reached the head of the history"
				" list, but its length is still reported as "
				<< trx_sys->rseg_history_len << " which is"
				" unusually high.";
			ib::info() << "This can happen for multiple reasons";
			ib::info() << "1. A long running transaction is"
				" withholding purging of undo logs or a read"
				" view is open. Please try to commit the long"
				" running transaction.";
			ib::info() << "2. Try increasing the number of purge"
				" threads to expedite purging of undo logs.";
		}

		trx_sys_mutex_exit();
#endif
		return;
	}

	/* Release the rseg mutex and the page latch before reading the
	previous log header in a fresh mini-transaction. */
	mutex_exit(&rseg->mutex);

	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space,
							prev_log_addr.page),
					      rseg->page_size, &mtr)
		+ prev_log_addr.boffset;

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	/* The previous log becomes the rseg's current last-unpurged log. */
	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	TrxUndoRsegs elem(rseg->last_trx_no);
	elem.push_back(rseg);

	/* Purge can also produce events, however these are already ordered
	in the rollback segment and any user generated event will be greater
	than the events that Purge produces. ie. Purge can never produce
	events from an empty rollback segment. */

	mutex_enter(&purge_sys->pq_mutex);

	purge_sys->purge_queue->push(elem);

	mutex_exit(&purge_sys->pq_mutex);

	mutex_exit(&rseg->mutex);
}
1401 
1402 /** Position the purge sys "iterator" on the undo record to use for purging.
1403 @param[in,out]	purge_sys	purge instance
1404 @param[in]	page_size	page size */
1405 static
1406 void
trx_purge_read_undo_rec(trx_purge_t * purge_sys,const page_size_t & page_size)1407 trx_purge_read_undo_rec(
1408 	trx_purge_t*		purge_sys,
1409 	const page_size_t&	page_size)
1410 {
1411 	ulint		offset;
1412 	ulint		page_no;
1413 	ib_uint64_t	undo_no;
1414 	ulint		undo_rseg_space;
1415 
1416 	purge_sys->hdr_offset = purge_sys->rseg->last_offset;
1417 	page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
1418 
1419 	if (purge_sys->rseg->last_del_marks) {
1420 		mtr_t		mtr;
1421 		trx_undo_rec_t*	undo_rec = NULL;
1422 
1423 		mtr_start(&mtr);
1424 
1425 		undo_rec = trx_undo_get_first_rec(
1426 			purge_sys->rseg->space,
1427 			page_size,
1428 			purge_sys->hdr_page_no,
1429 			purge_sys->hdr_offset, RW_S_LATCH, &mtr);
1430 
1431 		if (undo_rec != NULL) {
1432 			offset = page_offset(undo_rec);
1433 			undo_no = trx_undo_rec_get_undo_no(undo_rec);
1434 			undo_rseg_space = purge_sys->rseg->space;
1435 			page_no = page_get_page_no(page_align(undo_rec));
1436 		} else {
1437 			offset = 0;
1438 			undo_no = 0;
1439 			undo_rseg_space = ULINT_UNDEFINED;
1440 		}
1441 
1442 		mtr_commit(&mtr);
1443 	} else {
1444 		offset = 0;
1445 		undo_no = 0;
1446 		undo_rseg_space = ULINT_UNDEFINED;
1447 	}
1448 
1449 	purge_sys->offset = offset;
1450 	purge_sys->page_no = page_no;
1451 	purge_sys->iter.undo_no = undo_no;
1452 	purge_sys->iter.undo_rseg_space = undo_rseg_space;
1453 
1454 	purge_sys->next_stored = TRUE;
1455 }
1456 
1457 /***********************************************************************//**
1458 Chooses the next undo log to purge and updates the info in purge_sys. This
1459 function is used to initialize purge_sys when the next record to purge is
1460 not known, and also to update the purge system info on the next record when
1461 purge has handled the whole undo log for a transaction. */
1462 static
1463 void
trx_purge_choose_next_log(void)1464 trx_purge_choose_next_log(void)
1465 /*===========================*/
1466 {
1467 	ut_ad(purge_sys->next_stored == FALSE);
1468 
1469 	const page_size_t&	page_size = purge_sys->rseg_iter->set_next();
1470 
1471 	if (purge_sys->rseg != NULL) {
1472 		trx_purge_read_undo_rec(purge_sys, page_size);
1473 	} else {
1474 		/* There is nothing to do yet. */
1475 		os_thread_yield();
1476 	}
1477 }
1478 
/***********************************************************************//**
Gets the next record to purge and updates the info in the purge system.
@return copy of an undo log record or pointer to the dummy undo log record */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
					handled */
	mem_heap_t*	heap)		/*!< in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	mtr_t		mtr;

	ut_ad(purge_sys->next_stored);
	ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no());

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	const page_size_t	page_size(purge_sys->rseg->page_size);

	if (offset == 0) {

		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_ignore_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
						page_size, &mtr);

	rec = undo_page + offset;

	rec2 = rec;

	/* Scan forward until a record that requires a purge operation is
	found, or the undo log is exhausted (rec2 == NULL). */
	for (;;) {
		ulint		type;
		trx_undo_rec_t*	next_rec;
		ulint		cmpl_info;

		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(
			rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);

		if (next_rec == NULL) {
			/* End of page: continue on the next page of this
			undo log (may return NULL at end of log). */
			rec2 = trx_undo_get_next_rec(
				rec2, purge_sys->hdr_page_no,
				purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		if (type == TRX_UNDO_DEL_MARK_REC) {
			/* Delete-mark records always need purging. */
			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		if (trx_undo_rec_get_extern_storage(rec2)) {
			/* Records with externally stored fields need
			purging. */
			break;
		}

		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			/* Update of an existing record where ordering
			fields may have changed needs purging. */
			break;
		}
	}

	if (rec2 == NULL) {
		mtr_commit(&mtr);

		/* This undo log is exhausted: advance to the next log in
		the history, then choose the next log/record. */
		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		mtr_start(&mtr);

		/* Re-latch the original page: the record at `offset` is
		still copied and returned below. */
		undo_page = trx_undo_page_get_s_latched(
			page_id_t(space, page_no), page_size, &mtr);

	} else {
		page = page_align(rec2);

		/* Remember the found record as the next one to purge. */
		purge_sys->offset = rec2 - page;
		purge_sys->page_no = page_get_page_no(page);
		purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->iter.undo_rseg_space = space;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			(*n_pages_handled)++;
		}
	}

	/* Copy the current record into the caller's heap before releasing
	the page latch. */
	rec_copy = trx_undo_rec_copy(undo_page, offset, heap);
	mtr_commit(&mtr);
	return(rec_copy);
}
1605 
1606 /********************************************************************//**
1607 Fetches the next undo log record from the history list to purge. It must be
1608 released with the corresponding release function.
1609 @return copy of an undo log record or pointer to trx_purge_ignore_rec,
1610 if the whole undo log can skipped in purge; NULL if none left */
1611 static MY_ATTRIBUTE((warn_unused_result))
1612 trx_undo_rec_t*
trx_purge_fetch_next_rec(roll_ptr_t * roll_ptr,ulint * n_pages_handled,mem_heap_t * heap)1613 trx_purge_fetch_next_rec(
1614 /*=====================*/
1615 	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
1616 	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
1617 					handled */
1618 	mem_heap_t*	heap)		/*!< in: memory heap where copied */
1619 {
1620 	if (!purge_sys->next_stored) {
1621 		trx_purge_choose_next_log();
1622 
1623 		if (!purge_sys->next_stored) {
1624 			DBUG_PRINT("ib_purge",
1625 				   ("no logs left in the history list"));
1626 			return(NULL);
1627 		}
1628 	}
1629 
1630 	if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) {
1631 
1632 		return(NULL);
1633 	}
1634 
1635 	/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1636 	os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1637 
1638 	*roll_ptr = trx_undo_build_roll_ptr(
1639 		FALSE, purge_sys->rseg->id,
1640 		purge_sys->page_no, purge_sys->offset);
1641 
1642 	/* The following call will advance the stored values of the
1643 	purge iterator. */
1644 
1645 	return(trx_purge_get_next_rec(n_pages_handled, heap));
1646 }
1647 
1648 /** This function runs a purge batch.
1649 @param[in]	n_purge_threads	number of purge threads
1650 @param[in,out]	purge_sys	purge instance
1651 @param[in]	batch_size	no. of pages to purge
1652 @return number of undo log pages handled in the batch */
1653 static
1654 ulint
trx_purge_attach_undo_recs(const ulint n_purge_threads,trx_purge_t * purge_sys,ulint batch_size)1655 trx_purge_attach_undo_recs(
1656 	const ulint	n_purge_threads,
1657 	trx_purge_t*	purge_sys,
1658 	ulint		batch_size)
1659 {
1660 	que_thr_t*	thr;
1661 	ulint		n_pages_handled = 0;
1662 
1663 	ut_a(n_purge_threads > 0);
1664 	ut_a(n_purge_threads <= SRV_MAX_N_PURGE_THREADS);
1665 
1666 	purge_sys->limit = purge_sys->iter;
1667 
1668 	que_thr_t*	run_thrs[SRV_MAX_N_PURGE_THREADS];
1669 
1670 	/* Validate some pre-requisites and reset done flag. */
1671 	ulint		i = 0;
1672 
1673 	for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
1674 	     thr != NULL && i < n_purge_threads;
1675 	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
1676 
1677 		purge_node_t*		node;
1678 
1679 		/* Get the purge node. */
1680 		node = static_cast<purge_node_t*>(thr->child);
1681 
1682 		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1683 		ut_a(node->recs == NULL);
1684 		ut_a(node->done);
1685 
1686 		node->done = false;
1687 
1688 		ut_a(!thr->is_active);
1689 
1690 		run_thrs[i] = thr;
1691 	}
1692 
1693 	/* There should never be fewer nodes than threads, the inverse
1694 	however is allowed because we only use purge threads as needed. */
1695 	ut_a(i == n_purge_threads);
1696 	ut_ad(trx_purge_check_limit());
1697 
1698 	mem_heap_t*	heap = purge_sys->heap;
1699 
1700 	mem_heap_empty(heap);
1701 
1702 	typedef std::map<
1703 	    table_id_t, purge_node_t::Recs *, std::less<table_id_t>,
1704 	    mem_heap_allocator<std::pair<const table_id_t, purge_node_t::Recs *> > >
1705 	    GroupBy;
1706 
1707 	GroupBy group_by((GroupBy::key_compare()), mem_heap_allocator<GroupBy::value_type>(heap));
1708 
1709 	for (ulint i = 0; n_pages_handled < batch_size; ++i) {
1710 
1711 		/* Track the max {trx_id, undo_no} for truncating the
1712 		UNDO logs once we have purged the records. */
1713 
1714 		if (trx_purge_check_limit()) {
1715 			purge_sys->limit = purge_sys->iter;
1716 		}
1717 
1718 		purge_node_t::rec_t	rec;
1719 
1720 		/* Fetch the next record, and advance the purge_sys->iter. */
1721 		rec.undo_rec = trx_purge_fetch_next_rec(
1722 			&rec.roll_ptr, &n_pages_handled, heap);
1723 
1724 		if (rec.undo_rec == &trx_purge_ignore_rec) {
1725 
1726 			continue;
1727 
1728 		} else if (rec.undo_rec == NULL) {
1729 
1730 			break;
1731 		}
1732 
1733 		table_id_t	table_id;
1734 
1735 		table_id = trx_undo_rec_get_table_id(rec.undo_rec);
1736 
1737 		GroupBy::iterator lb = group_by.lower_bound(table_id);
1738 
1739 		if (lb != group_by.end()
1740 		    && !(group_by.key_comp()(table_id, lb->first))) {
1741 
1742 			lb->second->push_back(rec);
1743 
1744 		} else {
1745 			typedef GroupBy::value_type value_type;
1746 
1747 			void*			ptr;
1748 			purge_node_t::Recs*	recs;
1749 
1750 			ptr = mem_heap_alloc(heap, sizeof(purge_node_t::Recs));
1751 
1752 			/* Call the destructor explicitly in row_purge_end() */
1753 			recs = new (ptr) purge_node_t::Recs(mem_heap_allocator<purge_node_t::rec_t>(heap));
1754 
1755 			recs->push_back(rec);
1756 
1757 			group_by.insert(lb, value_type(table_id, recs));
1758 		}
1759 	}
1760 
1761 	/* Objective is to ensure that all the table entries in one
1762 	batch are handled by the same thread. Ths is to avoid contention
1763 	on the dict_index_t::lock */
1764 
1765 	const GroupBy& group_by_const = group_by;
1766 	GroupBy::const_iterator	end = group_by_const.end();
1767 
1768 	for (GroupBy::const_iterator it = group_by_const.begin(); it != end; ) {
1769 
1770 		for (ulint i = 0; i < n_purge_threads && it != end; ++i, ++it) {
1771 
1772 			purge_node_t*	node;
1773 
1774 			node = static_cast<purge_node_t*>(run_thrs[i]->child);
1775 
1776 			ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1777 
1778 			if (node->recs == NULL) {
1779 				node->recs = it->second;
1780 			} else {
1781 				node->recs->insert(
1782 					node->recs->end(),
1783 					it->second->begin(),
1784 					it->second->end());
1785 			}
1786 		}
1787 	}
1788 
1789 	ut_ad(trx_purge_check_limit());
1790 
1791 	return(n_pages_handled);
1792 }
1793 
1794 /*******************************************************************//**
1795 Calculate the DML delay required.
1796 @return delay in microseconds or ULINT_MAX */
1797 static
1798 ulint
trx_purge_dml_delay(void)1799 trx_purge_dml_delay(void)
1800 /*=====================*/
1801 {
1802 	/* Determine how much data manipulation language (DML) statements
1803 	need to be delayed in order to reduce the lagging of the purge
1804 	thread. */
1805 	ulint	delay = 0; /* in microseconds; default: no delay */
1806 
1807 	/* If purge lag is set (ie. > 0) then calculate the new DML delay.
1808 	Note: we do a dirty read of the trx_sys_t data structure here,
1809 	without holding trx_sys->mutex. */
1810 
1811 	if (srv_max_purge_lag > 0
1812 	    && trx_sys->rseg_history_len
1813 	       > srv_n_purge_threads * srv_purge_batch_size) {
1814 		float	ratio;
1815 
1816 		ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1817 
1818 		if (ratio > 1.0) {
1819 			/* If the history list length exceeds the
1820 			srv_max_purge_lag, the data manipulation
1821 			statements are delayed by at least 5000
1822 			microseconds. */
1823 			delay = (ulint) ((ratio - .5) * 10000);
1824 		}
1825 
1826 		if (delay > srv_max_purge_lag_delay) {
1827 			delay = srv_max_purge_lag_delay;
1828 		}
1829 
1830 		MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1831 	}
1832 
1833 	return(delay);
1834 }
1835 
1836 /*******************************************************************//**
1837 Wait for pending purge jobs to complete. */
1838 static
1839 void
trx_purge_wait_for_workers_to_complete(trx_purge_t * purge_sys)1840 trx_purge_wait_for_workers_to_complete(
1841 /*===================================*/
1842 	trx_purge_t*	purge_sys)	/*!< in: purge instance */
1843 {
1844 	ulint		i = 0;
1845 	ulint		n_submitted = purge_sys->n_submitted;
1846 
1847 	/* Ensure that the work queue empties out. */
1848 	while (!os_compare_and_swap_ulint(
1849 			&purge_sys->n_completed, n_submitted, n_submitted)) {
1850 
1851 		if (++i < 10) {
1852 			os_thread_yield();
1853 		} else {
1854 
1855 			if (srv_get_task_queue_length() > 0) {
1856 				srv_release_threads(SRV_WORKER, 1);
1857 			}
1858 
1859 			os_thread_sleep(20);
1860 			i = 0;
1861 		}
1862 	}
1863 
1864 	/* None of the worker threads should be doing any work. */
1865 	ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1866 
1867 	/* There should be no outstanding tasks as long
1868 	as the worker threads are active. */
1869 	ut_a(srv_get_task_queue_length() == 0);
1870 }
1871 
1872 /******************************************************************//**
1873 Remove old historical changes from the rollback segments. */
1874 static
1875 void
trx_purge_truncate(void)1876 trx_purge_truncate(void)
1877 /*====================*/
1878 {
1879 	ut_ad(trx_purge_check_limit());
1880 
1881 	if (purge_sys->limit.trx_no == 0) {
1882 		trx_purge_truncate_history(&purge_sys->iter, &purge_sys->view);
1883 	} else {
1884 		trx_purge_truncate_history(&purge_sys->limit, &purge_sys->view);
1885 	}
1886 }
1887 
/*******************************************************************//**
This function runs a purge batch.
@return number of undo log pages handled in the batch */
ulint
trx_purge(
/*======*/
	ulint	n_purge_threads,	/*!< in: number of purge tasks
					to submit to the work queue */
	ulint	batch_size,		/*!< in: the maximum number of records
					to purge in one batch */
	bool	truncate)		/*!< in: truncate history if true */
{
	que_thr_t*	thr = NULL;
	ulint		n_pages_handled;

	ut_a(n_purge_threads > 0);

	srv_dml_needed_delay = trx_purge_dml_delay();

	/* The number of tasks submitted should be completed. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* Refresh the purge view under the purge latch; view_active is
	cleared while the clone is in progress. */
	rw_lock_x_lock(&purge_sys->latch);

	purge_sys->view_active = false;

	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	rw_lock_x_unlock(&purge_sys->latch);

#ifdef UNIV_DEBUG
	if (srv_purge_view_update_only_debug) {
		/* Debug mode: only refresh the view, purge nothing. */
		return(0);
	}
#endif /* UNIV_DEBUG */

	/* Fetch the UNDO recs that need to be purged. */
	n_pages_handled = trx_purge_attach_undo_recs(
		n_purge_threads, purge_sys, batch_size);

	/* Do we do an asynchronous purge or not ? */
	if (n_purge_threads > 1) {

		/* Submit the tasks to the work queue. */
		for (ulint i = 0; i < n_purge_threads - 1; ++i) {
			thr = que_fork_scheduler_round_robin(
				purge_sys->query, thr);

			ut_a(thr != NULL);

			srv_que_task_enqueue_low(thr);
		}

		/* The last task is run by this (coordinator) thread
		itself, via the run_synchronously path below. */
		thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
		ut_a(thr != NULL);

		purge_sys->n_submitted += n_purge_threads - 1;

		goto run_synchronously;

	/* Do it synchronously. */
	} else {
		thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
		ut_ad(thr);

run_synchronously:
		++purge_sys->n_submitted;

		que_run_threads(thr);

		os_atomic_inc_ulint(
			&purge_sys->pq_mutex, &purge_sys->n_completed, 1);

		if (n_purge_threads > 1) {
			/* Wait for the queued worker tasks to drain. */
			trx_purge_wait_for_workers_to_complete(purge_sys);
		}
	}

	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

#ifdef UNIV_DEBUG
	/* Record (debug only) how far this batch advanced purge. */
	rw_lock_x_lock(&purge_sys->latch);
	if (purge_sys->limit.trx_no == 0) {
		purge_sys->done = purge_sys->iter;
	} else {
		purge_sys->done = purge_sys->limit;
	}
	rw_lock_x_unlock(&purge_sys->latch);
#endif /* UNIV_DEBUG */

	if (truncate) {
		trx_purge_truncate();
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);

	return(n_pages_handled);
}
1989 
1990 /*******************************************************************//**
1991 Get the purge state.
1992 @return purge state. */
1993 purge_state_t
trx_purge_state(void)1994 trx_purge_state(void)
1995 /*=================*/
1996 {
1997 	purge_state_t	state;
1998 
1999 	rw_lock_x_lock(&purge_sys->latch);
2000 
2001 	state = purge_sys->state;
2002 
2003 	rw_lock_x_unlock(&purge_sys->latch);
2004 
2005 	return(state);
2006 }
2007 
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void
trx_purge_stop(void)
/*================*/
{
	purge_state_t	state;
	/* Reset the event first so that a wait below only returns once
	purge signals it after this point. */
	int64_t		sig_count = os_event_reset(purge_sys->event);

	ut_a(srv_n_purge_threads > 0);

	rw_lock_x_lock(&purge_sys->latch);

	ut_a(purge_sys->state != PURGE_STATE_INIT);
	ut_a(purge_sys->state != PURGE_STATE_EXIT);
	ut_a(purge_sys->state != PURGE_STATE_DISABLED);

	/* Count nested stop requests; trx_purge_run() decrements this. */
	++purge_sys->n_stop;

	state = purge_sys->state;

	if (state == PURGE_STATE_RUN) {
		ib::info() << "Stopping purge";

		/* We need to wakeup the purge thread in case it is suspended,
		so that it can acknowledge the state change. */

		srv_purge_wakeup();
	}

	purge_sys->state = PURGE_STATE_STOP;

	rw_lock_x_unlock(&purge_sys->latch);

	if (state != PURGE_STATE_STOP) {

		/* Wait for purge coordinator to signal that it
		is suspended. */
		os_event_wait_low(purge_sys->event, sig_count);
	} else {
		/* Purge was already asked to stop: poll until it reports
		that it is no longer running. */
		bool	once = true;

		rw_lock_x_lock(&purge_sys->latch);

		/* Wait for purge to signal that it has actually stopped. */
		while (purge_sys->running) {

			if (once) {
				ib::info() << "Waiting for purge to stop";
				once = false;
			}

			/* Release the latch while sleeping so that purge
			can update purge_sys->running. */
			rw_lock_x_unlock(&purge_sys->latch);

			os_thread_sleep(10000);

			rw_lock_x_lock(&purge_sys->latch);
		}

		rw_lock_x_unlock(&purge_sys->latch);
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
}
2072 
2073 /*******************************************************************//**
2074 Resume purge, move to PURGE_STATE_RUN. */
2075 void
trx_purge_run(void)2076 trx_purge_run(void)
2077 /*===============*/
2078 {
2079 	rw_lock_x_lock(&purge_sys->latch);
2080 
2081 	switch (purge_sys->state) {
2082 	case PURGE_STATE_INIT:
2083 	case PURGE_STATE_EXIT:
2084 	case PURGE_STATE_DISABLED:
2085 		ut_error;
2086 
2087 	case PURGE_STATE_RUN:
2088 	case PURGE_STATE_STOP:
2089 		break;
2090 	}
2091 
2092 	if (purge_sys->n_stop > 0) {
2093 
2094 		ut_a(purge_sys->state == PURGE_STATE_STOP);
2095 
2096 		--purge_sys->n_stop;
2097 
2098 		if (purge_sys->n_stop == 0) {
2099 
2100 			ib::info() << "Resuming purge";
2101 
2102 			purge_sys->state = PURGE_STATE_RUN;
2103 		}
2104 
2105 		MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
2106 	} else {
2107 		ut_a(purge_sys->state == PURGE_STATE_RUN);
2108 	}
2109 
2110 	rw_lock_x_unlock(&purge_sys->latch);
2111 
2112 	srv_purge_wakeup();
2113 }
2114