1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0purge.cc
Purge obsolete records

Created 3/14/1997 Heikki Tuuri
25 *******************************************************/
26 
27 #include "row0purge.h"
28 #include "fsp0fsp.h"
29 #include "mach0data.h"
30 #include "dict0stats.h"
31 #include "trx0rseg.h"
32 #include "trx0trx.h"
33 #include "trx0roll.h"
34 #include "trx0undo.h"
35 #include "trx0purge.h"
36 #include "trx0rec.h"
37 #include "que0que.h"
38 #include "row0row.h"
39 #include "row0upd.h"
40 #include "row0vers.h"
41 #include "row0mysql.h"
42 #include "row0log.h"
43 #include "log0log.h"
44 #include "srv0mon.h"
45 #include "srv0start.h"
46 #include "handler.h"
47 #include "ha_innodb.h"
48 #include "fil0fil.h"
49 #include "debug_sync.h"
50 
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module, make sure that no code path is
introduced where a call to log_free_check() is bypassed. */
60 
61 /***********************************************************//**
62 Repositions the pcur in the purge node on the clustered index record,
63 if found. If the record is not found, close pcur.
64 @return TRUE if the record was found */
65 static
66 ibool
67 row_purge_reposition_pcur(
68 /*======================*/
69 	ulint		mode,	/*!< in: latching mode */
70 	purge_node_t*	node,	/*!< in: row purge node */
71 	mtr_t*		mtr)	/*!< in: mtr */
72 {
73 	if (node->found_clust) {
74 		ut_ad(node->validate_pcur());
75 
76 		node->found_clust = btr_pcur_restore_position(mode, &node->pcur, mtr);
77 
78 	} else {
79 		node->found_clust = row_search_on_row_ref(
80 			&node->pcur, mode, node->table, node->ref, mtr);
81 
82 		if (node->found_clust) {
83 			btr_pcur_store_position(&node->pcur, mtr);
84 		}
85 	}
86 
87 	/* Close the current cursor if we fail to position it correctly. */
88 	if (!node->found_clust) {
89 		btr_pcur_close(&node->pcur);
90 	}
91 
92 	return(node->found_clust);
93 }
94 
95 /***********************************************************//**
96 Removes a delete marked clustered index record if possible.
97 @retval true if the row was not found, or it was successfully removed
98 @retval false if the row was modified after the delete marking */
99 static MY_ATTRIBUTE((nonnull, warn_unused_result))
100 bool
101 row_purge_remove_clust_if_poss_low(
102 /*===============================*/
103 	purge_node_t*	node,	/*!< in/out: row purge node */
104 	ulint		mode)	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
105 {
106 	dict_index_t*		index;
readData(f)107 	bool			success		= true;
108 	mtr_t			mtr;
109 	rec_t*			rec;
110 	mem_heap_t*		heap		= NULL;
111 	rec_offs*		offsets;
112 	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
113 	rec_offs_init(offsets_);
114 
115 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_S)
116 	      || node->vcol_info.is_used());
117 
118 	index = dict_table_get_first_index(node->table);
119 
120 	log_free_check();
121 	mtr_start(&mtr);
122 	index->set_modified(mtr);
readTimers(f)123 
124 	if (!row_purge_reposition_pcur(mode, node, &mtr)) {
125 		/* The record was already removed. */
126 		goto func_exit;
127 	}
128 
129 	rec = btr_pcur_get_rec(&node->pcur);
130 
131 	offsets = rec_get_offsets(rec, index, offsets_, index->n_core_fields,
132 				  ULINT_UNDEFINED, &heap);
readCounters(f)133 
134 	if (node->roll_ptr != row_get_rec_roll_ptr(rec, index, offsets)) {
135 		/* Someone else has modified the record later: do not remove */
136 		goto func_exit;
137 	}
138 
139 	ut_ad(rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
140 	/* In delete-marked records, DB_TRX_ID must
141 	always refer to an existing undo log record. */
142 	ut_ad(row_get_rec_trx_id(rec, index, offsets));
143 
144 	if (mode == BTR_MODIFY_LEAF) {
145 		success = btr_cur_optimistic_delete(
146 			btr_pcur_get_btr_cur(&node->pcur), 0, &mtr);
147 	} else {
148 		dberr_t	err;
usefulValues(l)149 		ut_ad(mode == (BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE));
150 		btr_cur_pessimistic_delete(
151 			&err, FALSE, btr_pcur_get_btr_cur(&node->pcur), 0,
152 			false, &mtr);
uselessValues(l)153 
154 		switch (err) {
155 		case DB_SUCCESS:
156 			break;
157 		case DB_OUT_OF_FILE_SPACE:
158 			success = false;
159 			break;
160 		default:
161 			ut_error;
162 		}
163 	}
164 
165 func_exit:
166 	if (heap) {
167 		mem_heap_free(heap);
168 	}
169 
170 	/* Persistent cursor is closed if reposition fails. */
171 	if (node->found_clust) {
normalizeValues(data, countField, factor)172 		btr_pcur_commit_specify_mtr(&node->pcur, &mtr);
173 	} else {
174 		mtr_commit(&mtr);
175 	}
176 
setRadarFigure(titles)177 	return(success);
178 }
179 
180 /***********************************************************//**
181 Removes a clustered index record if it has not been modified after the delete
182 marking.
183 @retval true if the row was not found, or it was successfully removed
184 @retval false the purge needs to be suspended because of running out
185 of file space. */
186 static MY_ATTRIBUTE((nonnull, warn_unused_result))
187 bool
188 row_purge_remove_clust_if_poss(
189 /*===========================*/
190 	purge_node_t*	node)	/*!< in/out: row purge node */
drawRadarChart(data, kind, filebase, params, color)191 {
192 	if (row_purge_remove_clust_if_poss_low(node, BTR_MODIFY_LEAF)) {
193 		return(true);
194 	}
195 
196 	for (ulint n_tries = 0;
197 	     n_tries < BTR_CUR_RETRY_DELETE_N_TIMES;
198 	     n_tries++) {
199 		if (row_purge_remove_clust_if_poss_low(
200 			    node, BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE)) {
201 			return(true);
202 		}
203 
204 		os_thread_sleep(BTR_CUR_RETRY_SLEEP_TIME);
205 	}
206 
207 	return(false);
208 }
209 
210 /** Tries to store secondary index cursor before openin mysql table for
211 virtual index condition computation.
212 @param[in,out]	node		row purge node
213 @param[in]	index		secondary index
214 @param[in,out]	sec_pcur	secondary index cursor
derivedTimerStats(data)215 @param[in,out]	sec_mtr		mini-transaction which holds
216 				secondary index entry */
217 static void row_purge_store_vsec_cur(
218         purge_node_t*   node,
219         dict_index_t*   index,
220         btr_pcur_t*     sec_pcur,
221         mtr_t*          sec_mtr)
222 {
223 	row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, sec_mtr);
224 
225 	if (!node->found_clust) {
226 		return;
227 	}
228 
229 	node->vcol_info.set_requested();
230 
231 	btr_pcur_store_position(sec_pcur, sec_mtr);
232 
233 	btr_pcurs_commit_specify_mtr(&node->pcur, sec_pcur, sec_mtr);
234 }
235 
236 /** Tries to restore secondary index cursor after opening the mysql table
237 @param[in,out]	node	row purge node
238 @param[in]	index	secondary index
239 @param[in,out]	sec_mtr	mini-transaction which holds secondary index entry
240 @param[in]	is_tree	true=pessimistic purge,
drawMainPie(data, filebase, colors)241 			false=optimistic (leaf-page only)
242 @return false in case of restore failure. */
243 static bool row_purge_restore_vsec_cur(
244 	purge_node_t*	node,
245 	dict_index_t*	index,
246 	btr_pcur_t*	sec_pcur,
247 	mtr_t*		sec_mtr,
248 	bool		is_tree)
249 {
250 	sec_mtr->start();
251 	index->set_modified(*sec_mtr);
252 
253 	return btr_pcur_restore_position(
254 		is_tree ? BTR_PURGE_TREE : BTR_PURGE_LEAF,
255 		sec_pcur, sec_mtr);
256 }
257 
258 /** Determines if it is possible to remove a secondary index entry.
259 Removal is possible if the secondary index entry does not refer to any
260 not delete marked version of a clustered index record where DB_TRX_ID
261 is newer than the purge view.
262 
263 NOTE: This function should only be called by the purge thread, only
264 while holding a latch on the leaf page of the secondary index entry
265 (or keeping the buffer pool watch on the page).  It is possible that
266 this function first returns true and then false, if a user transaction
267 inserts a record that the secondary index entry would refer to.
main()268 However, in that case, the user transaction would also re-insert the
269 secondary index entry after purge has removed it and released the leaf
270 page latch.
271 @param[in,out]	node		row purge node
272 @param[in]	index		secondary index
273 @param[in]	entry		secondary index entry
274 @param[in,out]	sec_pcur	secondary index cursor or NULL
275 				if it is called for purge buffering
276 				operation.
277 @param[in,out]	sec_mtr		mini-transaction which holds
278 				secondary index entry or NULL if it is
279 				called for purge buffering operation.
280 @param[in]	is_tree		true=pessimistic purge,
281 				false=optimistic (leaf-page only)
282 @return true if the secondary index record can be purged */
283 bool
284 row_purge_poss_sec(
285 	purge_node_t*	node,
286 	dict_index_t*	index,
287 	const dtuple_t*	entry,
288 	btr_pcur_t*	sec_pcur,
289 	mtr_t*		sec_mtr,
290 	bool		is_tree)
291 {
292 	bool	can_delete;
293 	mtr_t	mtr;
294 
295 	ut_ad(!dict_index_is_clust(index));
296 
297 	const bool	store_cur = sec_mtr && !node->vcol_info.is_used()
298 		&& dict_index_has_virtual(index);
299 
300 	if (store_cur) {
301 		row_purge_store_vsec_cur(node, index, sec_pcur, sec_mtr);
302 		ut_ad(sec_mtr->has_committed()
303 		      == node->vcol_info.is_requested());
304 
305 		/* The PRIMARY KEY value was not found in the clustered
306 		index. The secondary index record found. We can purge
307 		the secondary index record. */
308 		if (!node->vcol_info.is_requested()) {
309 			ut_ad(!node->found_clust);
310 			return true;
311 		}
312 	}
313 
314 retry_purge_sec:
315 	mtr_start(&mtr);
316 
317 	can_delete = !row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, &mtr)
318 		|| !row_vers_old_has_index_entry(true,
319 						 btr_pcur_get_rec(&node->pcur),
320 						 &mtr, index, entry,
321 						 node->roll_ptr, node->trx_id,
322 						 &node->vcol_info);
323 
324 	if (node->vcol_info.is_first_fetch()) {
325 		ut_ad(store_cur);
326 
327 		const TABLE* t= node->vcol_info.table();
328 		DBUG_LOG("purge", "retry " << t
329 			 << (is_tree ? " tree" : " leaf")
330 			 << index->name << "," << index->table->name
331 			 << ": " << rec_printer(entry).str());
332 
333 		ut_ad(mtr.has_committed());
334 
335 		if (t) {
336 			node->vcol_info.set_used();
337 			goto retry_purge_sec;
338 		}
339 
340 		node->table = NULL;
341 		sec_pcur = NULL;
342 		return false;
343 	}
344 
345 	/* Persistent cursor is closed if reposition fails. */
346 	if (node->found_clust) {
347 		btr_pcur_commit_specify_mtr(&node->pcur, &mtr);
348 	} else {
349 		mtr.commit();
350 	}
351 
352 	ut_ad(mtr.has_committed());
353 
354 	/* If the virtual column info is not used then reset the virtual column
355 	info. */
356 	if (node->vcol_info.is_requested()
357 	    && !node->vcol_info.is_used()) {
358 		node->vcol_info.reset();
359 	}
360 
361 	if (store_cur && !row_purge_restore_vsec_cur(
362 		    node, index, sec_pcur, sec_mtr, is_tree)) {
363 		return false;
364 	}
365 
366 	return can_delete;
367 }
368 
369 /***************************************************************
370 Removes a secondary index entry if possible, by modifying the
371 index tree.  Does not try to buffer the delete.
372 @return TRUE if success or if not found */
373 static MY_ATTRIBUTE((nonnull, warn_unused_result))
374 ibool
375 row_purge_remove_sec_if_poss_tree(
376 /*==============================*/
377 	purge_node_t*	node,	/*!< in: row purge node */
378 	dict_index_t*	index,	/*!< in: index */
379 	const dtuple_t*	entry)	/*!< in: index entry */
380 {
381 	btr_pcur_t		pcur;
382 	ibool			success	= TRUE;
383 	dberr_t			err;
384 	mtr_t			mtr;
385 	enum row_search_result	search_result;
386 
387 	log_free_check();
388 	mtr.start();
389 	index->set_modified(mtr);
390 
391 	if (!index->is_committed()) {
392 		/* The index->online_status may change if the index is
393 		or was being created online, but not committed yet. It
394 		is protected by index->lock. */
395 		mtr_sx_lock_index(index, &mtr);
396 
397 		if (dict_index_is_online_ddl(index)) {
398 			/* Online secondary index creation will not
399 			copy any delete-marked records. Therefore
400 			there is nothing to be purged. We must also
401 			skip the purge when a completed index is
402 			dropped by rollback_inplace_alter_table(). */
403 			goto func_exit_no_pcur;
404 		}
405 	} else {
406 		/* For secondary indexes,
407 		index->online_status==ONLINE_INDEX_COMPLETE if
408 		index->is_committed(). */
409 		ut_ad(!dict_index_is_online_ddl(index));
410 	}
411 
412 	search_result = row_search_index_entry(
413 				index, entry,
414 				BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
415 				&pcur, &mtr);
416 
417 	switch (search_result) {
418 	case ROW_NOT_FOUND:
419 		/* Not found.  This is a legitimate condition.  In a
420 		rollback, InnoDB will remove secondary recs that would
421 		be purged anyway.  Then the actual purge will not find
422 		the secondary index record.  Also, the purge itself is
423 		eager: if it comes to consider a secondary index
424 		record, and notices it does not need to exist in the
425 		index, it will remove it.  Then if/when the purge
426 		comes to consider the secondary index record a second
427 		time, it will not exist any more in the index. */
428 
429 		/* fputs("PURGE:........sec entry not found\n", stderr); */
430 		/* dtuple_print(stderr, entry); */
431 		goto func_exit;
432 	case ROW_FOUND:
433 		break;
434 	case ROW_BUFFERED:
435 	case ROW_NOT_DELETED_REF:
436 		/* These are invalid outcomes, because the mode passed
437 		to row_search_index_entry() did not include any of the
438 		flags BTR_INSERT, BTR_DELETE, or BTR_DELETE_MARK. */
439 		ut_error;
440 	}
441 
442 	/* We should remove the index record if no later version of the row,
443 	which cannot be purged yet, requires its existence. If some requires,
444 	we should do nothing. */
445 
446 	if (row_purge_poss_sec(node, index, entry, &pcur, &mtr, true)) {
447 
448 		/* Remove the index record, which should have been
449 		marked for deletion. */
450 		if (!rec_get_deleted_flag(btr_cur_get_rec(
451 						btr_pcur_get_btr_cur(&pcur)),
452 					  dict_table_is_comp(index->table))) {
453 			ib::error()
454 				<< "tried to purge non-delete-marked record"
455 				" in index " << index->name
456 				<< " of table " << index->table->name
457 				<< ": tuple: " << *entry
458 				<< ", record: " << rec_index_print(
459 					btr_cur_get_rec(
460 						btr_pcur_get_btr_cur(&pcur)),
461 					index);
462 
463 			ut_ad(0);
464 
465 			goto func_exit;
466 		}
467 
468 		btr_cur_pessimistic_delete(&err, FALSE,
469 					   btr_pcur_get_btr_cur(&pcur),
470 					   0, false, &mtr);
471 		switch (UNIV_EXPECT(err, DB_SUCCESS)) {
472 		case DB_SUCCESS:
473 			break;
474 		case DB_OUT_OF_FILE_SPACE:
475 			success = FALSE;
476 			break;
477 		default:
478 			ut_error;
479 		}
480 	}
481 
482 	if (node->vcol_op_failed()) {
483 		ut_ad(mtr.has_committed());
484 		ut_ad(!pcur.old_rec_buf);
485 		ut_ad(pcur.pos_state == BTR_PCUR_NOT_POSITIONED);
486 		return false;
487 	}
488 
489 func_exit:
490 	btr_pcur_close(&pcur); // FIXME: need this?
491 func_exit_no_pcur:
492 	mtr.commit();
493 
494 	return(success);
495 }
496 
497 /***************************************************************
498 Removes a secondary index entry without modifying the index tree,
499 if possible.
500 @retval true if success or if not found
501 @retval false if row_purge_remove_sec_if_poss_tree() should be invoked */
502 static MY_ATTRIBUTE((nonnull, warn_unused_result))
503 bool
504 row_purge_remove_sec_if_poss_leaf(
505 /*==============================*/
506 	purge_node_t*	node,	/*!< in: row purge node */
507 	dict_index_t*	index,	/*!< in: index */
508 	const dtuple_t*	entry)	/*!< in: index entry */
509 {
510 	mtr_t			mtr;
511 	btr_pcur_t		pcur;
512 	enum btr_latch_mode	mode;
513 	enum row_search_result	search_result;
514 	bool			success	= true;
515 
516 	log_free_check();
517 	ut_ad(index->table == node->table);
518 	ut_ad(!index->table->is_temporary());
519 	mtr.start();
520 	index->set_modified(mtr);
521 
522 	if (!index->is_committed()) {
523 		/* For uncommitted spatial index, we also skip the purge. */
524 		if (dict_index_is_spatial(index)) {
525 			goto func_exit_no_pcur;
526 		}
527 
528 		/* The index->online_status may change if the the
529 		index is or was being created online, but not
530 		committed yet. It is protected by index->lock. */
531 		mtr_s_lock_index(index, &mtr);
532 
533 		if (dict_index_is_online_ddl(index)) {
534 			/* Online secondary index creation will not
535 			copy any delete-marked records. Therefore
536 			there is nothing to be purged. We must also
537 			skip the purge when a completed index is
538 			dropped by rollback_inplace_alter_table(). */
539 			goto func_exit_no_pcur;
540 		}
541 
542 		mode = BTR_PURGE_LEAF_ALREADY_S_LATCHED;
543 	} else {
544 		/* For secondary indexes,
545 		index->online_status==ONLINE_INDEX_COMPLETE if
546 		index->is_committed(). */
547 		ut_ad(!dict_index_is_online_ddl(index));
548 
549 		/* Change buffering is disabled for spatial index and
550 		virtual index. */
551 		mode = (dict_index_is_spatial(index)
552 			|| dict_index_has_virtual(index))
553 			? BTR_MODIFY_LEAF
554 			: BTR_PURGE_LEAF;
555 	}
556 
557 	/* Set the purge node for the call to row_purge_poss_sec(). */
558 	pcur.btr_cur.purge_node = node;
559 	if (dict_index_is_spatial(index)) {
560 		rw_lock_sx_lock(dict_index_get_lock(index));
561 		pcur.btr_cur.thr = NULL;
562 	} else {
563 		/* Set the query thread, so that ibuf_insert_low() will be
564 		able to invoke thd_get_trx(). */
565 		pcur.btr_cur.thr = static_cast<que_thr_t*>(
566 			que_node_get_parent(node));
567 	}
568 
569 	search_result = row_search_index_entry(
570 		index, entry, mode, &pcur, &mtr);
571 
572 	if (dict_index_is_spatial(index)) {
573 		rw_lock_sx_unlock(dict_index_get_lock(index));
574 	}
575 
576 	switch (search_result) {
577 	case ROW_FOUND:
578 		/* Before attempting to purge a record, check
579 		if it is safe to do so. */
580 		if (row_purge_poss_sec(node, index, entry, &pcur, &mtr, false)) {
581 			btr_cur_t* btr_cur = btr_pcur_get_btr_cur(&pcur);
582 
583 			/* Only delete-marked records should be purged. */
584 			if (!rec_get_deleted_flag(
585 				btr_cur_get_rec(btr_cur),
586 				dict_table_is_comp(index->table))) {
587 
588 				ib::error()
589 					<< "tried to purge non-delete-marked"
590 					" record" " in index " << index->name
591 					<< " of table " << index->table->name
592 					<< ": tuple: " << *entry
593 					<< ", record: "
594 					<< rec_index_print(
595 						btr_cur_get_rec(btr_cur),
596 						index);
597 				ut_ad(0);
598 
599 				btr_pcur_close(&pcur);
600 
601 				goto func_exit_no_pcur;
602 			}
603 
604 			if (dict_index_is_spatial(index)) {
605 				const page_t*   page;
606 				const trx_t*	trx = NULL;
607 
608 				if (btr_cur->rtr_info != NULL
609 				    && btr_cur->rtr_info->thr != NULL) {
610 					trx = thr_get_trx(
611 						btr_cur->rtr_info->thr);
612 				}
613 
614 				page = btr_cur_get_page(btr_cur);
615 
616 				if (!lock_test_prdt_page_lock(
617 					    trx,
618 					    page_get_space_id(page),
619 					    page_get_page_no(page))
620 				    && page_get_n_recs(page) < 2
621 				    && btr_cur_get_block(btr_cur)
622 				    ->page.id.page_no() !=
623 				    dict_index_get_page(index)) {
624 					/* this is the last record on page,
625 					and it has a "page" lock on it,
626 					which mean search is still depending
627 					on it, so do not delete */
628 					DBUG_LOG("purge",
629 						 "skip purging last"
630 						 " record on page "
631 						 << btr_cur_get_block(btr_cur)
632 						 ->page.id);
633 
634 					btr_pcur_close(&pcur);
635 					mtr.commit();
636 					return(success);
637 				}
638 			}
639 
640 			if (!btr_cur_optimistic_delete(btr_cur, 0, &mtr)) {
641 
642 				/* The index entry could not be deleted. */
643 				success = false;
644 			}
645 		}
646 
647 		if (node->vcol_op_failed()) {
648 			btr_pcur_close(&pcur);
649 			return false;
650 		}
651 
652 		/* (The index entry is still needed,
653 		or the deletion succeeded) */
654 		/* fall through */
655 	case ROW_NOT_DELETED_REF:
656 		/* The index entry is still needed. */
657 	case ROW_BUFFERED:
658 		/* The deletion was buffered. */
659 	case ROW_NOT_FOUND:
660 		/* The index entry does not exist, nothing to do. */
661 		btr_pcur_close(&pcur); // FIXME: do we need these? when is btr_cur->rtr_info set?
662 func_exit_no_pcur:
663 		mtr.commit();
664 		return(success);
665 	}
666 
667 	ut_error;
668 	return(false);
669 }
670 
671 /***********************************************************//**
672 Removes a secondary index entry if possible. */
673 UNIV_INLINE MY_ATTRIBUTE((nonnull(1,2)))
674 void
675 row_purge_remove_sec_if_poss(
676 /*=========================*/
677 	purge_node_t*	node,	/*!< in: row purge node */
678 	dict_index_t*	index,	/*!< in: index */
679 	const dtuple_t*	entry)	/*!< in: index entry */
680 {
681 	ibool	success;
682 	ulint	n_tries		= 0;
683 
684 	/*	fputs("Purge: Removing secondary record\n", stderr); */
685 
686 	if (!entry) {
687 		/* The node->row must have lacked some fields of this
688 		index. This is possible when the undo log record was
689 		written before this index was created. */
690 		return;
691 	}
692 
693 	if (row_purge_remove_sec_if_poss_leaf(node, index, entry)) {
694 
695 		return;
696 	}
697 retry:
698 	if (node->vcol_op_failed()) {
699 		return;
700 	}
701 
702 	success = row_purge_remove_sec_if_poss_tree(node, index, entry);
703 	/* The delete operation may fail if we have little
704 	file space left: TODO: easiest to crash the database
705 	and restart with more file space */
706 
707 	if (!success && n_tries < BTR_CUR_RETRY_DELETE_N_TIMES) {
708 
709 		n_tries++;
710 
711 		os_thread_sleep(BTR_CUR_RETRY_SLEEP_TIME);
712 
713 		goto retry;
714 	}
715 
716 	ut_a(success);
717 }
718 
719 /** Skip uncommitted virtual indexes on newly added virtual column.
720 @param[in,out]	index	dict index object */
721 static
722 inline
723 void
724 row_purge_skip_uncommitted_virtual_index(
725 	dict_index_t*&	index)
726 {
727 	/* We need to skip virtual indexes which is not
728 	committed yet. It's safe because these indexes are
729 	newly created by alter table, and because we do
730 	not support LOCK=NONE when adding an index on newly
731 	added virtual column.*/
732 	while (index != NULL && dict_index_has_virtual(index)
733 	       && !index->is_committed() && index->has_new_v_col()) {
734 		index = dict_table_get_next_index(index);
735 	}
736 }
737 
738 /***********************************************************//**
739 Purges a delete marking of a record.
740 @retval true if the row was not found, or it was successfully removed
741 @retval false the purge needs to be suspended because of
742 running out of file space */
743 static MY_ATTRIBUTE((nonnull, warn_unused_result))
744 bool
745 row_purge_del_mark(
746 /*===============*/
747 	purge_node_t*	node)	/*!< in/out: row purge node */
748 {
749 	mem_heap_t*	heap;
750 
751 	heap = mem_heap_create(1024);
752 
753 	while (node->index != NULL) {
754 		/* skip corrupted secondary index */
755 		dict_table_skip_corrupt_index(node->index);
756 
757 		row_purge_skip_uncommitted_virtual_index(node->index);
758 
759 		if (!node->index) {
760 			break;
761 		}
762 
763 		if (node->index->type != DICT_FTS) {
764 			dtuple_t*	entry = row_build_index_entry_low(
765 				node->row, NULL, node->index,
766 				heap, ROW_BUILD_FOR_PURGE);
767 			row_purge_remove_sec_if_poss(node, node->index, entry);
768 
769 			if (node->vcol_op_failed()) {
770 				mem_heap_free(heap);
771 				return false;
772 			}
773 
774 			mem_heap_empty(heap);
775 		}
776 
777 		node->index = dict_table_get_next_index(node->index);
778 	}
779 
780 	mem_heap_free(heap);
781 
782 	return(row_purge_remove_clust_if_poss(node));
783 }
784 
785 /** Reset DB_TRX_ID, DB_ROLL_PTR of a clustered index record
786 whose old history can no longer be observed.
787 @param[in,out]	node	purge node
788 @param[in,out]	mtr	mini-transaction (will be started and committed) */
789 static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr)
790 {
791 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_S)
792 	      || node->vcol_info.is_used());
793 	/* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */
794 	mtr->start();
795 
796 	if (row_purge_reposition_pcur(BTR_MODIFY_LEAF, node, mtr)) {
797 		dict_index_t*	index = dict_table_get_first_index(
798 			node->table);
799 		ulint	trx_id_pos = index->n_uniq ? index->n_uniq : 1;
800 		rec_t*	rec = btr_pcur_get_rec(&node->pcur);
801 		mem_heap_t*	heap = NULL;
802 		/* Reserve enough offsets for the PRIMARY KEY and 2 columns
803 		so that we can access DB_TRX_ID, DB_ROLL_PTR. */
804 		rec_offs offsets_[REC_OFFS_HEADER_SIZE + MAX_REF_PARTS + 2];
805 		rec_offs_init(offsets_);
806 		rec_offs*	offsets = rec_get_offsets(
807 			rec, index, offsets_, index->n_core_fields,
808 			trx_id_pos + 2, &heap);
809 		ut_ad(heap == NULL);
810 
811 		ut_ad(dict_index_get_nth_field(index, trx_id_pos)
812 		      ->col->mtype == DATA_SYS);
813 		ut_ad(dict_index_get_nth_field(index, trx_id_pos)
814 		      ->col->prtype == (DATA_TRX_ID | DATA_NOT_NULL));
815 		ut_ad(dict_index_get_nth_field(index, trx_id_pos + 1)
816 		      ->col->mtype == DATA_SYS);
817 		ut_ad(dict_index_get_nth_field(index, trx_id_pos + 1)
818 		      ->col->prtype == (DATA_ROLL_PTR | DATA_NOT_NULL));
819 
820 		/* Only update the record if DB_ROLL_PTR matches (the
821 		record has not been modified after this transaction
822 		became purgeable) */
823 		if (node->roll_ptr
824 		    == row_get_rec_roll_ptr(rec, index, offsets)) {
825 			ut_ad(!rec_get_deleted_flag(rec,
826 						    rec_offs_comp(offsets)));
827 			DBUG_LOG("purge", "reset DB_TRX_ID="
828 				 << ib::hex(row_get_rec_trx_id(
829 						    rec, index, offsets)));
830 
831 			index->set_modified(*mtr);
832 			if (page_zip_des_t* page_zip
833 			    = buf_block_get_page_zip(
834 				    btr_pcur_get_block(&node->pcur))) {
835 				page_zip_write_trx_id_and_roll_ptr(
836 					page_zip, rec, offsets, trx_id_pos,
837 					0, 1ULL << ROLL_PTR_INSERT_FLAG_POS,
838 					mtr);
839 			} else {
840 				ulint	len;
841 				byte*	ptr = rec_get_nth_field(
842 					rec, offsets, trx_id_pos, &len);
843 				ut_ad(len == DATA_TRX_ID_LEN);
844 				mlog_write_string(ptr, reset_trx_id,
845 						  sizeof reset_trx_id, mtr);
846 			}
847 		}
848 	}
849 
850 	mtr->commit();
851 }
852 
853 /***********************************************************//**
854 Purges an update of an existing record. Also purges an update of a delete
855 marked record if that record contained an externally stored field. */
856 static
857 void
858 row_purge_upd_exist_or_extern_func(
859 /*===============================*/
860 #ifdef UNIV_DEBUG
861 	const que_thr_t*thr,		/*!< in: query thread */
862 #endif /* UNIV_DEBUG */
863 	purge_node_t*	node,		/*!< in: row purge node */
864 	trx_undo_rec_t*	undo_rec)	/*!< in: record to purge */
865 {
866 	mem_heap_t*	heap;
867 
868 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_S)
869 	      || node->vcol_info.is_used());
870 	ut_ad(!node->table->skip_alter_undo);
871 
872 	if (node->rec_type == TRX_UNDO_UPD_DEL_REC
873 	    || (node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
874 
875 		goto skip_secondaries;
876 	}
877 
878 	heap = mem_heap_create(1024);
879 
880 	while (node->index != NULL) {
881 		dict_table_skip_corrupt_index(node->index);
882 
883 		row_purge_skip_uncommitted_virtual_index(node->index);
884 
885 		if (!node->index) {
886 			break;
887 		}
888 
889 		if (row_upd_changes_ord_field_binary(node->index, node->update,
890 						     thr, NULL, NULL)) {
891 			/* Build the older version of the index entry */
892 			dtuple_t*	entry = row_build_index_entry_low(
893 				node->row, NULL, node->index,
894 				heap, ROW_BUILD_FOR_PURGE);
895 			row_purge_remove_sec_if_poss(node, node->index, entry);
896 
897 			if (node->vcol_op_failed()) {
898 				ut_ad(!node->table);
899 				mem_heap_free(heap);
900 				return;
901 			}
902 			ut_ad(node->table);
903 
904 			mem_heap_empty(heap);
905 		}
906 
907 		node->index = dict_table_get_next_index(node->index);
908 	}
909 
910 	mem_heap_free(heap);
911 
912 skip_secondaries:
913 	mtr_t		mtr;
914 	dict_index_t*	index = dict_table_get_first_index(node->table);
915 	/* Free possible externally stored fields */
916 	for (ulint i = 0; i < upd_get_n_fields(node->update); i++) {
917 
918 		const upd_field_t*	ufield
919 			= upd_get_nth_field(node->update, i);
920 
921 		if (dfield_is_ext(&ufield->new_val)) {
922 			trx_rseg_t*	rseg;
923 			buf_block_t*	block;
924 			ulint		internal_offset;
925 			byte*		data_field;
926 			ibool		is_insert;
927 			ulint		rseg_id;
928 			ulint		page_no;
929 			ulint		offset;
930 
931 			/* We use the fact that new_val points to
932 			undo_rec and get thus the offset of
933 			dfield data inside the undo record. Then we
934 			can calculate from node->roll_ptr the file
935 			address of the new_val data */
936 
937 			internal_offset = ulint(
938 				static_cast<const byte*>
939 				(dfield_get_data(&ufield->new_val))
940 				- undo_rec);
941 
942 			ut_a(internal_offset < srv_page_size);
943 
944 			trx_undo_decode_roll_ptr(node->roll_ptr,
945 						 &is_insert, &rseg_id,
946 						 &page_no, &offset);
947 
948 			rseg = trx_sys.rseg_array[rseg_id];
949 
950 			ut_a(rseg != NULL);
951 			ut_ad(rseg->id == rseg_id);
952 			ut_ad(rseg->is_persistent());
953 
954 			mtr.start();
955 
956 			/* We have to acquire an SX-latch to the clustered
957 			index tree (exclude other tree changes) */
958 
959 			mtr_sx_lock_index(index, &mtr);
960 
961 			index->set_modified(mtr);
962 
963 			/* NOTE: we must also acquire an X-latch to the
964 			root page of the tree. We will need it when we
965 			free pages from the tree. If the tree is of height 1,
966 			the tree X-latch does NOT protect the root page,
967 			because it is also a leaf page. Since we will have a
968 			latch on an undo log page, we would break the
969 			latching order if we would only later latch the
970 			root page of such a tree! */
971 
972 			btr_root_get(index, &mtr);
973 
974 			block = buf_page_get(
975 				page_id_t(rseg->space->id, page_no),
976 				univ_page_size, RW_X_LATCH, &mtr);
977 
978 			buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
979 
980 			data_field = buf_block_get_frame(block)
981 				+ offset + internal_offset;
982 
983 			ut_a(dfield_get_len(&ufield->new_val)
984 			     >= BTR_EXTERN_FIELD_REF_SIZE);
985 			btr_free_externally_stored_field(
986 				index,
987 				data_field + dfield_get_len(&ufield->new_val)
988 				- BTR_EXTERN_FIELD_REF_SIZE,
989 				NULL, NULL, NULL, 0, false, &mtr);
990 			mtr.commit();
991 		}
992 	}
993 
994 	row_purge_reset_trx_id(node, &mtr);
995 }
996 
/* Call wrapper for row_purge_upd_exist_or_extern_func(). The thr argument
is forwarded only when UNIV_DEBUG is defined; in other builds the macro
drops it, so the expression passed as thr is never evaluated there. */
#ifdef UNIV_DEBUG
# define row_purge_upd_exist_or_extern(thr,node,undo_rec)	\
	row_purge_upd_exist_or_extern_func(thr,node,undo_rec)
#else /* UNIV_DEBUG */
# define row_purge_upd_exist_or_extern(thr,node,undo_rec)	\
	row_purge_upd_exist_or_extern_func(node,undo_rec)
#endif /* UNIV_DEBUG */
1004 
/***********************************************************//**
Parses the row reference and other info in a modify undo log record.

Fills in node->rec_type, node->cmpl_info and node->trx_id from the record
header, opens the table identified in the record into node->table, and,
depending on the record type, builds node->ref, node->update and node->row
from node->heap.

On every path that returns true, this function has S-latched
dict_operation_lock and opened node->table, and does not itself release
either of them. On the paths that return false after the latch was taken,
the latch is released (and the table, if it was opened, is closed) before
returning.
@return true if purge operation required */
static
bool
row_purge_parse_undo_rec(
/*=====================*/
	purge_node_t*		node,		/*!< in/out: row purge node;
						receives the parsed fields */
	trx_undo_rec_t*		undo_rec,	/*!< in: record to purge */
	bool*			updated_extern, /*!< out: true if an externally
						stored field was updated */
	que_thr_t*		thr)		/*!< in: query thread; only
						checked by a debug assertion
						in this function */
{
	dict_index_t*	clust_index;
	byte*		ptr;
	undo_no_t	undo_no;
	table_id_t	table_id;
	roll_ptr_t	roll_ptr;
	ulint		info_bits;
	ulint		type;

	ut_ad(node != NULL);
	ut_ad(thr != NULL);

	/* Decode the fixed header of the undo record; ptr is advanced
	past it and is used as the read cursor for the rest of parsing. */
	ptr = trx_undo_rec_get_pars(
		undo_rec, &type, &node->cmpl_info,
		updated_extern, &undo_no, &table_id);

	node->rec_type = type;

	switch (type) {
	case TRX_UNDO_RENAME_TABLE:
		/* Reported to the caller as "no purge operation required". */
		return false;
	case TRX_UNDO_INSERT_METADATA:
	case TRX_UNDO_INSERT_REC:
		/* These records do not store any transaction identifier.

		FIXME: Update SYS_TABLES.ID on both DISCARD TABLESPACE
		and IMPORT TABLESPACE to get rid of the repeated lookups! */
		node->trx_id = TRX_ID_MAX;
		break;
	default:
		/* The explicit case labels below exist only in UNIV_DEBUG
		builds, where an unexpected type trips the assertion and is
		skipped. In other builds every remaining type lands on
		"default" and is parsed as an update/delete-mark record. */
#ifdef UNIV_DEBUG
		ut_ad(!"unknown undo log record type");
		return false;
	case TRX_UNDO_UPD_DEL_REC:
	case TRX_UNDO_UPD_EXIST_REC:
	case TRX_UNDO_DEL_MARK_REC:
#endif /* UNIV_DEBUG */
		ptr = trx_undo_update_rec_get_sys_cols(ptr, &node->trx_id,
						       &roll_ptr, &info_bits);
		break;
	}

	/* Checked before taking any latch or opening the table. */
	if (node->is_skipped(table_id)) {
		return false;
	}

	/* Prevent DROP TABLE etc. from running when we are doing the purge
	for this row */

try_again:
	rw_lock_s_lock_inline(&dict_operation_lock, 0, __FILE__, __LINE__);

	node->table = dict_table_open_on_id(
		table_id, FALSE, DICT_TABLE_OP_NORMAL);

	/* Value handed to node->skip() at err_exit; overwritten on the
	"inaccessible" path below. */
	trx_id_t trx_id = TRX_ID_MAX;

	if (node->table == NULL) {
		/* The table has been dropped: no need to do purge */
		goto err_exit;
	}

	ut_ad(!node->table->is_temporary());

	if (!fil_table_accessible(node->table)) {
		goto inaccessible;
	}

	switch (type) {
	case TRX_UNDO_INSERT_METADATA:
	case TRX_UNDO_INSERT_REC:
		break;
	default:
		/* Virtual column setup is only done when the table has
		virtual columns, has no vc_templ yet, and at least one
		virtual column is indexed. */
		if (!node->table->n_v_cols || node->table->vc_templ
		    || !dict_table_has_indexed_v_cols(node->table)) {
			break;
		}
		/* Need server fully up for virtual column computation */
		if (!mysqld_server_started) {

			/* Release everything acquired since try_again, then
			either give up (shutdown in progress) or wait one
			second and start over. */
			dict_table_close(node->table, FALSE, FALSE);
			rw_lock_s_unlock(&dict_operation_lock);
			if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED) {
				return(false);
			}
			os_thread_sleep(1000000);
			goto try_again;
		}

		/* NOTE(review): set_used() is called both before and after
		set_table(); this looks deliberate - confirm against the
		definition of node->vcol_info before simplifying. */
		node->vcol_info.set_requested();
		node->vcol_info.set_used();
		node->vcol_info.set_table(innobase_init_vc_templ(node->table));
		node->vcol_info.set_used();
	}

	clust_index = dict_table_get_first_index(node->table);

	if (!clust_index || clust_index->is_corrupted()) {
		/* The table was corrupt in the data dictionary.
		dict_set_corrupted() works on an index, and
		we do not have an index to call it with. */
inaccessible:
		/* Also entered by goto when fil_table_accessible() failed.
		The table is open here; use its def_trx_id for the skip
		entry, or TRX_ID_MAX when that is zero. */
		DBUG_ASSERT(table_id == node->table->id);
		trx_id = node->table->def_trx_id;
		if (!trx_id) {
			trx_id = TRX_ID_MAX;
		}

		dict_table_close(node->table, FALSE, FALSE);
		node->table = NULL;
err_exit:
		/* Also entered by goto when the table could not be opened
		(node->table is NULL and trx_id is still TRX_ID_MAX). */
		rw_lock_s_unlock(&dict_operation_lock);
		node->skip(table_id, trx_id);
		return(false);
	}

	if (type == TRX_UNDO_INSERT_METADATA) {
		/* No row reference is parsed for this type; the shared
		trx_undo_metadata tuple is used instead. */
		node->ref = &trx_undo_metadata;
		return(true);
	}

	ptr = trx_undo_rec_get_row_ref(ptr, clust_index, &(node->ref),
				       node->heap);

	if (type == TRX_UNDO_INSERT_REC) {
		/* Insert records need only the row reference. */
		return(true);
	}

	ptr = trx_undo_update_rec_get_update(ptr, clust_index, type,
					     node->trx_id,
					     roll_ptr, info_bits,
					     node->heap, &(node->update));

	/* Read to the partial row the fields that occur in indexes */

	if (!(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
		ut_ad(!(node->update->info_bits & REC_INFO_MIN_REC_FLAG));
		ptr = trx_undo_rec_get_partial_row(
			ptr, clust_index, node->update, &node->row,
			type == TRX_UNDO_UPD_DEL_REC,
			node->heap);
	} else if (node->update->info_bits & REC_INFO_MIN_REC_FLAG) {
		/* The update carries REC_INFO_MIN_REC_FLAG: replace the
		parsed row reference with trx_undo_metadata. */
		node->ref = &trx_undo_metadata;
	}

	return(true);
}
1164 
1165 /***********************************************************//**
1166 Purges the parsed record.
1167 @return true if purged, false if skipped */
1168 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1169 bool
1170 row_purge_record_func(
1171 /*==================*/
1172 	purge_node_t*	node,		/*!< in: row purge node */
1173 	trx_undo_rec_t*	undo_rec,	/*!< in: record to purge */
1174 #if defined UNIV_DEBUG || defined WITH_WSREP
1175 	const que_thr_t*thr,		/*!< in: query thread */
1176 #endif /* UNIV_DEBUG || WITH_WSREP */
1177 	bool		updated_extern)	/*!< in: whether external columns
1178 					were updated */
1179 {
1180 	dict_index_t*	clust_index;
1181 	bool		purged		= true;
1182 
1183 	ut_ad(!node->found_clust);
1184 	ut_ad(!node->table->skip_alter_undo);
1185 
1186 	clust_index = dict_table_get_first_index(node->table);
1187 
1188 	node->index = dict_table_get_next_index(clust_index);
1189 	ut_ad(!trx_undo_roll_ptr_is_insert(node->roll_ptr));
1190 
1191 	switch (node->rec_type) {
1192 	case TRX_UNDO_DEL_MARK_REC:
1193 		purged = row_purge_del_mark(node);
1194 		if (purged) {
1195 			if (node->table->stat_initialized
1196 			    && srv_stats_include_delete_marked) {
1197 				dict_stats_update_if_needed(
1198 					node->table, *thr->graph->trx);
1199 			}
1200 			MONITOR_INC(MONITOR_N_DEL_ROW_PURGE);
1201 		}
1202 		break;
1203 	case TRX_UNDO_INSERT_METADATA:
1204 	case TRX_UNDO_INSERT_REC:
1205 		node->roll_ptr |= 1ULL << ROLL_PTR_INSERT_FLAG_POS;
1206 		/* fall through */
1207 	default:
1208 		if (!updated_extern) {
1209 			mtr_t		mtr;
1210 			row_purge_reset_trx_id(node, &mtr);
1211 			break;
1212 		}
1213 		/* fall through */
1214 	case TRX_UNDO_UPD_EXIST_REC:
1215 		row_purge_upd_exist_or_extern(thr, node, undo_rec);
1216 		MONITOR_INC(MONITOR_N_UPD_EXIST_EXTERN);
1217 		break;
1218 	}
1219 
1220 	if (node->found_clust) {
1221 		btr_pcur_close(&node->pcur);
1222 		node->found_clust = FALSE;
1223 	}
1224 
1225 	if (node->table != NULL) {
1226 		dict_table_close(node->table, FALSE, FALSE);
1227 		node->table = NULL;
1228 	}
1229 
1230 	return(purged);
1231 }
1232 
/* Call wrapper for row_purge_record_func(). The thr argument is forwarded
only when UNIV_DEBUG or WITH_WSREP is defined, matching the conditional
parameter in the function signature; otherwise the macro drops it. */
#if defined UNIV_DEBUG || defined WITH_WSREP
# define row_purge_record(node,undo_rec,thr,updated_extern)	\
	row_purge_record_func(node,undo_rec,thr,updated_extern)
#else /* UNIV_DEBUG || WITH_WSREP */
# define row_purge_record(node,undo_rec,thr,updated_extern)	\
	row_purge_record_func(node,undo_rec,updated_extern)
#endif /* UNIV_DEBUG || WITH_WSREP */
1240 
1241 /***********************************************************//**
1242 Fetches an undo log record and does the purge for the recorded operation.
1243 If none left, or the current purge completed, returns the control to the
1244 parent node, which is always a query thread node. */
1245 static MY_ATTRIBUTE((nonnull))
1246 void
1247 row_purge(
1248 /*======*/
1249 	purge_node_t*	node,		/*!< in: row purge node */
1250 	trx_undo_rec_t*	undo_rec,	/*!< in: record to purge */
1251 	que_thr_t*	thr)		/*!< in: query thread */
1252 {
1253 	if (undo_rec != &trx_purge_dummy_rec) {
1254 		bool	updated_extern;
1255 
1256 		while (row_purge_parse_undo_rec(
1257 			       node, undo_rec, &updated_extern, thr)) {
1258 
1259 			bool purged = row_purge_record(
1260 				node, undo_rec, thr, updated_extern);
1261 
1262 			if (!node->vcol_info.is_used()) {
1263 				rw_lock_s_unlock(&dict_operation_lock);
1264 			}
1265 
1266 			ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_S));
1267 
1268 			if (purged
1269 			    || srv_shutdown_state > SRV_SHUTDOWN_INITIATED
1270 			    || node->vcol_op_failed()) {
1271 				return;
1272 			}
1273 
1274 			/* Retry the purge in a second. */
1275 			os_thread_sleep(1000000);
1276 		}
1277 	}
1278 }
1279 
1280 /***********************************************************//**
1281 Reset the purge query thread. */
1282 UNIV_INLINE
1283 void
1284 row_purge_end(
1285 /*==========*/
1286 	que_thr_t*	thr)	/*!< in: query thread */
1287 {
1288 	ut_ad(thr);
1289 
1290 	thr->run_node = static_cast<purge_node_t*>(thr->run_node)->end();
1291 
1292 	ut_a(thr->run_node != NULL);
1293 }
1294 
1295 /***********************************************************//**
1296 Does the purge operation for a single undo log record. This is a high-level
1297 function used in an SQL execution graph.
1298 @return query thread to run next or NULL */
1299 que_thr_t*
1300 row_purge_step(
1301 /*===========*/
1302 	que_thr_t*	thr)	/*!< in: query thread */
1303 {
1304 	purge_node_t*	node;
1305 
1306 	node = static_cast<purge_node_t*>(thr->run_node);
1307 
1308 	node->start();
1309 
1310 #ifdef UNIV_DEBUG
1311 	srv_slot_t *slot = thr->thread_slot;
1312 	ut_ad(slot);
1313 
1314 	rw_lock_x_lock(&slot->debug_sync_lock);
1315 	while (UT_LIST_GET_LEN(slot->debug_sync)) {
1316 		srv_slot_t::debug_sync_t *sync =
1317 					UT_LIST_GET_FIRST(slot->debug_sync);
1318 		bool result = debug_sync_set_action(current_thd,
1319 						    sync->str,
1320 						    strlen(sync->str));
1321 		ut_a(!result);
1322 
1323 		UT_LIST_REMOVE(slot->debug_sync, sync);
1324 		ut_free(sync);
1325 	}
1326 	rw_lock_x_unlock(&slot->debug_sync_lock);
1327 #endif
1328 
1329 	if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) {
1330 		trx_purge_rec_t*purge_rec;
1331 
1332 		purge_rec = static_cast<trx_purge_rec_t*>(
1333 			ib_vector_pop(node->undo_recs));
1334 
1335 		node->roll_ptr = purge_rec->roll_ptr;
1336 
1337 		row_purge(node, purge_rec->undo_rec, thr);
1338 
1339 		if (ib_vector_is_empty(node->undo_recs)) {
1340 			row_purge_end(thr);
1341 		} else {
1342 			thr->run_node = node;
1343 			node->vcol_info.reset();
1344 		}
1345 	} else {
1346 		row_purge_end(thr);
1347 	}
1348 
1349 	innobase_reset_background_thd(thr_get_trx(thr)->mysql_thd);
1350 
1351 	return(thr);
1352 }
1353 
1354 #ifdef UNIV_DEBUG
1355 /***********************************************************//**
1356 Validate the persisent cursor. The purge node has two references
1357 to the clustered index record - one via the ref member, and the
1358 other via the persistent cursor.  These two references must match
1359 each other if the found_clust flag is set.
1360 @return true if the stored copy of persistent cursor is consistent
1361 with the ref member.*/
1362 bool
1363 purge_node_t::validate_pcur()
1364 {
1365 	if (!found_clust) {
1366 		return(true);
1367 	}
1368 
1369 	if (index == NULL) {
1370 		return(true);
1371 	}
1372 
1373 	if (index->type == DICT_FTS) {
1374 		return(true);
1375 	}
1376 
1377 	if (!pcur.old_stored) {
1378 		return(true);
1379 	}
1380 
1381 	dict_index_t*	clust_index = pcur.btr_cur.index;
1382 
1383 	rec_offs* offsets = rec_get_offsets(
1384 		pcur.old_rec, clust_index, NULL, pcur.old_n_core_fields,
1385 		pcur.old_n_fields, &heap);
1386 
1387 	/* Here we are comparing the purge ref record and the stored initial
1388 	part in persistent cursor. Both cases we store n_uniq fields of the
1389 	cluster index and so it is fine to do the comparison. We note this
1390 	dependency here as pcur and ref belong to different modules. */
1391 	int st = cmp_dtuple_rec(ref, pcur.old_rec, offsets);
1392 
1393 	if (st != 0) {
1394 		ib::error() << "Purge node pcur validation failed";
1395 		ib::error() << rec_printer(ref).str();
1396 		ib::error() << rec_printer(pcur.old_rec, offsets).str();
1397 		return(false);
1398 	}
1399 
1400 	return(true);
1401 }
1402 #endif /* UNIV_DEBUG */
1403