1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file trx/trx0rseg.cc
22 Rollback segment
23 
24 Created 3/26/1996 Heikki Tuuri
25 *******************************************************/
26 
27 #include "trx0rseg.h"
28 #include "trx0undo.h"
29 #include "fut0lst.h"
30 #include "srv0srv.h"
31 #include "trx0purge.h"
32 #include "srv0mon.h"
33 
34 #include <algorithm>
35 
36 #ifdef WITH_WSREP
37 #include <mysql/service_wsrep.h>
38 
#ifdef UNIV_DEBUG
/** The latest known WSREP XID sequence number.
Only present in debug builds, where it backs the assertion in
trx_rseg_update_wsrep_checkpoint() that sequence numbers keep
increasing while the UUID stays the same. */
static long long wsrep_seqno = -1;
#endif /* UNIV_DEBUG */
/** The latest known WSREP XID UUID.
Set when the checkpoint is recovered from the rollback segments, and
compared against incoming XIDs to detect a change of the UUID. */
static unsigned char wsrep_uuid[16];
45 
46 /** Write the WSREP XID information into rollback segment header.
47 @param[in,out]	rseg_header	rollback segment header
48 @param[in]	xid		WSREP XID
49 @param[in,out]	mtr		mini transaction */
50 static void
51 trx_rseg_write_wsrep_checkpoint(
52 	trx_rsegf_t*	rseg_header,
53 	const XID*	xid,
54 	mtr_t*		mtr)
55 {
56 	DBUG_ASSERT(xid->gtrid_length >= 0);
57 	DBUG_ASSERT(xid->bqual_length >= 0);
58 	DBUG_ASSERT(xid->gtrid_length + xid->bqual_length < XIDDATASIZE);
59 
60 	mlog_write_ulint(TRX_RSEG_WSREP_XID_FORMAT + rseg_header,
61 			 uint32_t(xid->formatID),
62 			 MLOG_4BYTES, mtr);
63 
64 	mlog_write_ulint(TRX_RSEG_WSREP_XID_GTRID_LEN + rseg_header,
65 			 uint32_t(xid->gtrid_length),
66 			 MLOG_4BYTES, mtr);
67 
68 	mlog_write_ulint(TRX_RSEG_WSREP_XID_BQUAL_LEN + rseg_header,
69 			 uint32_t(xid->bqual_length),
70 			 MLOG_4BYTES, mtr);
71 
72 	const ulint xid_length = static_cast<ulint>(xid->gtrid_length
73 						    + xid->bqual_length);
74 	mlog_write_string(TRX_RSEG_WSREP_XID_DATA + rseg_header,
75 			  reinterpret_cast<const byte*>(xid->data),
76 			  xid_length, mtr);
77 	if (UNIV_LIKELY(xid_length < XIDDATASIZE)) {
78 		mlog_memset(TRX_RSEG_WSREP_XID_DATA + rseg_header + xid_length,
79 			    XIDDATASIZE - xid_length, 0, mtr);
80 	}
81 }
82 
83 /** Update the WSREP XID information in rollback segment header.
84 @param[in,out]	rseg_header	rollback segment header
85 @param[in]	xid		WSREP XID
86 @param[in,out]	mtr		mini-transaction */
87 void
88 trx_rseg_update_wsrep_checkpoint(
89 	trx_rsegf_t*	rseg_header,
90 	const XID*	xid,
91 	mtr_t*		mtr)
92 {
93 	ut_ad(wsrep_is_wsrep_xid(xid));
94 
95 #ifdef UNIV_DEBUG
96 	/* Check that seqno is monotonically increasing */
97 	long long xid_seqno = wsrep_xid_seqno(xid);
98 	const byte* xid_uuid = wsrep_xid_uuid(xid);
99 
100 	if (xid_seqno != -1
101 	    && !memcmp(xid_uuid, wsrep_uuid, sizeof wsrep_uuid)) {
102 		ut_ad(xid_seqno > wsrep_seqno);
103 	} else {
104 		memcpy(wsrep_uuid, xid_uuid, sizeof wsrep_uuid);
105 	}
106 	wsrep_seqno = xid_seqno;
107 #endif /* UNIV_DEBUG */
108 	trx_rseg_write_wsrep_checkpoint(rseg_header, xid, mtr);
109 }
110 
111 /** Clear the WSREP XID information from rollback segment header.
112 @param[in,out]	rseg_header	Rollback segment header
113 @param[in,out]	mtr 		mini-transaction */
114 static void
115 trx_rseg_clear_wsrep_checkpoint(
116 	trx_rsegf_t*	rseg_header,
117 	mtr_t*		mtr)
118 {
119 	mlog_memset(rseg_header + TRX_RSEG_WSREP_XID_INFO,
120 		    TRX_RSEG_WSREP_XID_DATA + XIDDATASIZE
121 		    - TRX_RSEG_WSREP_XID_INFO, 0, mtr);
122 }
123 
124 static void
125 trx_rseg_update_wsrep_checkpoint(const XID* xid, mtr_t* mtr)
126 {
127 	const byte* xid_uuid = wsrep_xid_uuid(xid);
128 	/* We must make check against wsrep_uuid here, the
129 	trx_rseg_update_wsrep_checkpoint() writes over wsrep_uuid with
130 	xid contents in debug mode and the memcmp() will never give nonzero
131 	result. */
132 	const bool must_clear_rsegs = memcmp(wsrep_uuid, xid_uuid,
133 					     sizeof wsrep_uuid);
134 	const trx_rseg_t* rseg = trx_sys.rseg_array[0];
135 
136 	trx_rsegf_t* rseg_header = trx_rsegf_get(rseg->space, rseg->page_no,
137 						 mtr);
138 	if (UNIV_UNLIKELY(mach_read_from_4(rseg_header + TRX_RSEG_FORMAT))) {
139 		trx_rseg_format_upgrade(rseg_header, mtr);
140 	}
141 
142 	trx_rseg_update_wsrep_checkpoint(rseg_header, xid, mtr);
143 
144 	if (must_clear_rsegs) {
145 		/* Because the UUID part of the WSREP XID differed
146 		from current_xid_uuid, the WSREP group UUID was
147 		changed, and we must reset the XID in all rollback
148 		segment headers. */
149 		for (ulint rseg_id = 1; rseg_id < TRX_SYS_N_RSEGS; ++rseg_id) {
150 			if (const trx_rseg_t* rseg =
151 			    trx_sys.rseg_array[rseg_id]) {
152 				trx_rseg_clear_wsrep_checkpoint(
153 					trx_rsegf_get(rseg->space,
154 						      rseg->page_no, mtr),
155 				        mtr);
156 			}
157 		}
158 	}
159 }
160 
161 /** Update WSREP checkpoint XID in first rollback segment header
162 as part of wsrep_set_SE_checkpoint() when it is guaranteed that there
163 are no wsrep transactions committing.
164 If the UUID part of the WSREP XID does not match to the UUIDs of XIDs already
165 stored into rollback segments, the WSREP XID in all the remaining rollback
166 segments will be reset.
167 @param[in]	xid		WSREP XID */
168 void trx_rseg_update_wsrep_checkpoint(const XID* xid)
169 {
170 	mtr_t	mtr;
171 	mtr.start();
172 	trx_rseg_update_wsrep_checkpoint(xid, &mtr);
173 	mtr.commit();
174 }
175 
176 /** Read the WSREP XID information in rollback segment header.
177 @param[in]	rseg_header	Rollback segment header
178 @param[out]	xid		Transaction XID
179 @return	whether the WSREP XID was present */
180 static
181 bool trx_rseg_read_wsrep_checkpoint(const trx_rsegf_t* rseg_header, XID& xid)
182 {
183 	int formatID = static_cast<int>(
184 		mach_read_from_4(
185 			TRX_RSEG_WSREP_XID_FORMAT + rseg_header));
186 	if (formatID == 0) {
187 		return false;
188 	}
189 
190 	xid.formatID = formatID;
191 	xid.gtrid_length = static_cast<int>(
192 		mach_read_from_4(
193 			TRX_RSEG_WSREP_XID_GTRID_LEN + rseg_header));
194 
195 	xid.bqual_length = static_cast<int>(
196 		mach_read_from_4(
197 			TRX_RSEG_WSREP_XID_BQUAL_LEN + rseg_header));
198 
199 	memcpy(xid.data, TRX_RSEG_WSREP_XID_DATA + rseg_header, XIDDATASIZE);
200 
201 	return true;
202 }
203 
204 /** Read the WSREP XID from the TRX_SYS page (in case of upgrade).
205 @param[in]	page	TRX_SYS page
206 @param[out]	xid	WSREP XID (if present)
207 @return	whether the WSREP XID is present */
208 static bool trx_rseg_init_wsrep_xid(const page_t* page, XID& xid)
209 {
210 	if (mach_read_from_4(TRX_SYS + TRX_SYS_WSREP_XID_INFO
211 			     + TRX_SYS_WSREP_XID_MAGIC_N_FLD
212 			     + page)
213 	    != TRX_SYS_WSREP_XID_MAGIC_N) {
214 		return false;
215 	}
216 
217 	xid.formatID = static_cast<int>(
218 		mach_read_from_4(
219 			TRX_SYS + TRX_SYS_WSREP_XID_INFO
220 			+ TRX_SYS_WSREP_XID_FORMAT + page));
221 	xid.gtrid_length = static_cast<int>(
222 		mach_read_from_4(
223 			TRX_SYS + TRX_SYS_WSREP_XID_INFO
224 			+ TRX_SYS_WSREP_XID_GTRID_LEN + page));
225 	xid.bqual_length = static_cast<int>(
226 		mach_read_from_4(
227 			TRX_SYS + TRX_SYS_WSREP_XID_INFO
228 			+ TRX_SYS_WSREP_XID_BQUAL_LEN + page));
229 	memcpy(xid.data,
230 	       TRX_SYS + TRX_SYS_WSREP_XID_INFO
231 	       + TRX_SYS_WSREP_XID_DATA + page, XIDDATASIZE);
232 	return true;
233 }
234 
235 /** Recover the latest WSREP checkpoint XID.
236 @param[out]	xid	WSREP XID
237 @return	whether the WSREP XID was found */
238 bool trx_rseg_read_wsrep_checkpoint(XID& xid)
239 {
240 	mtr_t		mtr;
241 	long long       max_xid_seqno = -1;
242 	bool		found = false;
243 
244 	for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS;
245 	     rseg_id++, mtr.commit()) {
246 		mtr.start();
247 		const buf_block_t* sys = trx_sysf_get(&mtr, false);
248 		const uint32_t page_no = trx_sysf_rseg_get_page_no(
249 			sys, rseg_id);
250 
251 		if (page_no == FIL_NULL) {
252 			continue;
253 		}
254 
255 		const trx_rsegf_t* rseg_header = trx_rsegf_get_new(
256 			trx_sysf_rseg_get_space(sys, rseg_id), page_no, &mtr);
257 
258 		if (mach_read_from_4(rseg_header + TRX_RSEG_FORMAT)) {
259 			continue;
260 		}
261 
262 		XID tmp_xid;
263 		long long tmp_seqno = 0;
264 		if (trx_rseg_read_wsrep_checkpoint(rseg_header, tmp_xid)
265 		    && (tmp_seqno = wsrep_xid_seqno(&tmp_xid))
266 		    > max_xid_seqno) {
267 			found = true;
268 			max_xid_seqno = tmp_seqno;
269 			xid = tmp_xid;
270 			memcpy(wsrep_uuid, wsrep_xid_uuid(&tmp_xid),
271 			       sizeof wsrep_uuid);
272 		}
273 	}
274 
275 	return found;
276 }
277 #endif /* WITH_WSREP */
278 
279 /** Upgrade a rollback segment header page to MariaDB 10.3 format.
280 @param[in,out]	rseg_header	rollback segment header page
281 @param[in,out]	mtr		mini-transaction */
282 void trx_rseg_format_upgrade(trx_rsegf_t* rseg_header, mtr_t* mtr)
283 {
284 	ut_ad(page_offset(rseg_header) == TRX_RSEG);
285 	byte* rseg_format = TRX_RSEG_FORMAT + rseg_header;
286 	mlog_write_ulint(rseg_format, 0, MLOG_4BYTES, mtr);
287 	/* Clear also possible garbage at the end of the page. Old
288 	InnoDB versions did not initialize unused parts of pages. */
289 	mlog_memset(TRX_RSEG_MAX_TRX_ID + 8 + rseg_header,
290 		    srv_page_size
291 		    - (FIL_PAGE_DATA_END
292 		       + TRX_RSEG + TRX_RSEG_MAX_TRX_ID + 8), 0, mtr);
293 }
294 
295 /** Create a rollback segment header.
296 @param[in,out]	space		system, undo, or temporary tablespace
297 @param[in]	rseg_id		rollback segment identifier
298 @param[in]	max_trx_id	new value of TRX_RSEG_MAX_TRX_ID
299 @param[in,out]	sys_header	the TRX_SYS page (NULL for temporary rseg)
300 @param[in,out]	mtr		mini-transaction
301 @return the created rollback segment
302 @retval	NULL	on failure */
303 buf_block_t*
304 trx_rseg_header_create(
305 	fil_space_t*	space,
306 	ulint		rseg_id,
307 	trx_id_t	max_trx_id,
308 	buf_block_t*	sys_header,
309 	mtr_t*		mtr)
310 {
311 	buf_block_t*	block;
312 
313 	ut_ad(mtr_memo_contains(mtr, space, MTR_MEMO_SPACE_X_LOCK));
314 	ut_ad(!sys_header == (space == fil_system.temp_space));
315 
316 	/* Allocate a new file segment for the rollback segment */
317 	block = fseg_create(space, TRX_RSEG + TRX_RSEG_FSEG_HEADER, mtr);
318 
319 	if (block == NULL) {
320 		/* No space left */
321 		return block;
322 	}
323 
324 	buf_block_dbg_add_level(block, SYNC_RSEG_HEADER_NEW);
325 
326 	ut_ad(0 == mach_read_from_4(TRX_RSEG_FORMAT + TRX_RSEG
327 				    + block->frame));
328 	ut_ad(0 == mach_read_from_4(TRX_RSEG_HISTORY_SIZE + TRX_RSEG
329 				    + block->frame));
330 	ut_ad(0 == mach_read_from_4(TRX_RSEG_MAX_TRX_ID + TRX_RSEG
331 				    + block->frame));
332 
333 	/* Initialize the history list */
334 	if (max_trx_id) {
335 		mlog_write_ull(TRX_RSEG + TRX_RSEG_MAX_TRX_ID + block->frame,
336 			       max_trx_id, mtr);
337 	}
338 
339 	flst_init(block, TRX_RSEG_HISTORY + TRX_RSEG, mtr);
340 
341 	/* Reset the undo log slots */
342 	mlog_memset(block, TRX_RSEG_UNDO_SLOTS + TRX_RSEG,
343 		    TRX_RSEG_N_SLOTS * 4, 0xff, mtr);
344 
345 	if (sys_header) {
346 		/* Add the rollback segment info to the free slot in
347 		the trx system header */
348 
349 		mlog_write_ulint(TRX_SYS + TRX_SYS_RSEGS
350 				 + TRX_SYS_RSEG_SPACE
351 				 + rseg_id * TRX_SYS_RSEG_SLOT_SIZE
352 				 + sys_header->frame,
353 				 space->id, MLOG_4BYTES, mtr);
354 		mlog_write_ulint(TRX_SYS + TRX_SYS_RSEGS
355 				 + TRX_SYS_RSEG_PAGE_NO
356 				 + rseg_id * TRX_SYS_RSEG_SLOT_SIZE
357 				 + sys_header->frame,
358 				 block->page.id.page_no(), MLOG_4BYTES, mtr);
359 	}
360 
361 	return block;
362 }
363 
364 /** Free a rollback segment in memory. */
365 void
366 trx_rseg_mem_free(trx_rseg_t* rseg)
367 {
368 	trx_undo_t*	undo;
369 	trx_undo_t*	next_undo;
370 
371 	mutex_free(&rseg->mutex);
372 
373 	/* There can't be any active transactions. */
374 	ut_a(UT_LIST_GET_LEN(rseg->undo_list) == 0);
375 
376 	for (undo = UT_LIST_GET_FIRST(rseg->undo_cached);
377 	     undo != NULL;
378 	     undo = next_undo) {
379 
380 		next_undo = UT_LIST_GET_NEXT(undo_list, undo);
381 
382 		UT_LIST_REMOVE(rseg->undo_cached, undo);
383 
384 		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
385 
386 		ut_free(undo);
387 	}
388 
389 	ut_free(rseg);
390 }
391 
392 /** Create a rollback segment object.
393 @param[in]	id		rollback segment id
394 @param[in]	space		space where the segment is placed
395 @param[in]	page_no		page number of the segment header */
396 static
397 trx_rseg_t*
398 trx_rseg_mem_create(ulint id, fil_space_t* space, ulint page_no)
399 {
400 	trx_rseg_t* rseg = static_cast<trx_rseg_t*>(
401 		ut_zalloc_nokey(sizeof *rseg));
402 
403 	rseg->id = id;
404 	rseg->space = space;
405 	rseg->page_no = page_no;
406 	rseg->last_page_no = FIL_NULL;
407 	rseg->curr_size = 1;
408 
409 	mutex_create(rseg->is_persistent()
410 		     ? LATCH_ID_REDO_RSEG : LATCH_ID_NOREDO_RSEG,
411 		     &rseg->mutex);
412 
413 	UT_LIST_INIT(rseg->undo_list, &trx_undo_t::undo_list);
414 	UT_LIST_INIT(rseg->undo_cached, &trx_undo_t::undo_list);
415 
416 	return(rseg);
417 }
418 
419 /** Read the undo log lists.
420 @param[in,out]  rseg            rollback segment
421 @param[in,out]  max_trx_id      maximum observed transaction identifier
422 @param[in]      rseg_header     rollback segment header
423 @return error code */
424 static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id,
425                                    const trx_rsegf_t *rseg_header)
426 {
427   ut_ad(srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN);
428 
429   for (ulint i= 0; i < TRX_RSEG_N_SLOTS; i++)
430   {
431     uint32_t page_no= trx_rsegf_get_nth_undo(rseg_header, i);
432     if (page_no != FIL_NULL)
433     {
434       const trx_undo_t *undo= trx_undo_mem_create_at_db_start(rseg, i, page_no,
435                                                               max_trx_id);
436       if (!undo)
437         return DB_CORRUPTION;
438       rseg->curr_size+= undo->size;
439       MONITOR_INC(MONITOR_NUM_UNDO_SLOT_USED);
440     }
441   }
442 
443   return DB_SUCCESS;
444 }
445 
/** Restore the state of a persistent rollback segment.
Reads the rollback segment header page and, from it:
- raises max_trx_id to the stored TRX_RSEG_MAX_TRX_ID, if the header has
TRX_RSEG_FORMAT 0;
- picks up the binlog position (and, with WSREP, the WSREP XID) stored
in the header, if a binlog name is present;
- unless running as SRV_OPERATION_RESTORE, initializes rseg->curr_size and
the undo log lists, and derives the purge-related state of rseg from the
last entry of the history list.
@param[in,out]	rseg		persistent rollback segment
@param[in,out]	max_trx_id	maximum observed transaction identifier
@param[in,out]	mtr		mini-transaction
@return error code */
static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
                                    mtr_t *mtr)
{
	/* This is based on trx_rsegf_get_new().
	We need to access buf_block_t. */
	buf_block_t *block = buf_page_get(
		page_id_t(rseg->space->id, rseg->page_no), 0, RW_S_LATCH, mtr);
	buf_block_dbg_add_level(block, SYNC_RSEG_HEADER_NEW);

	const trx_rsegf_t* rseg_header = TRX_RSEG + block->frame;

	/* The fields below are only consulted when TRX_RSEG_FORMAT is 0. */
	if (mach_read_from_4(rseg_header + TRX_RSEG_FORMAT) == 0) {
		trx_id_t id = mach_read_from_8(rseg_header
					       + TRX_RSEG_MAX_TRX_ID);

		if (id > max_trx_id) {
			max_trx_id = id;
		}

		/* A nonzero first byte means that a binlog file name
		is stored in this header. */
		if (rseg_header[TRX_RSEG_BINLOG_NAME]) {
			/* Use the larger of the LSN stored on the page and
			block->page.newest_modification. */
			lsn_t lsn = std::max(block->page.newest_modification,
					     mach_read_from_8(FIL_PAGE_LSN
							      + block->frame));
			compile_time_assert(TRX_RSEG_BINLOG_NAME_LEN == sizeof
					    trx_sys.recovered_binlog_filename);
			/* Among all rollback segment headers, keep the
			binlog position of the page with the greatest LSN. */
			if (lsn > trx_sys.recovered_binlog_lsn) {
				trx_sys.recovered_binlog_lsn = lsn;
				trx_sys.recovered_binlog_offset
					= mach_read_from_8(
						rseg_header
						+ TRX_RSEG_BINLOG_OFFSET);
				memcpy(trx_sys.recovered_binlog_filename,
				       rseg_header + TRX_RSEG_BINLOG_NAME,
				       TRX_RSEG_BINLOG_NAME_LEN);
			}

#ifdef WITH_WSREP
			/* The return value is ignored: if no XID is stored,
			trx_sys.recovered_wsrep_xid is left as it was. */
			trx_rseg_read_wsrep_checkpoint(
				rseg_header, trx_sys.recovered_wsrep_xid);
#endif
		}
	}

	if (srv_operation == SRV_OPERATION_RESTORE) {
		/* mariabackup --prepare only deals with
		the redo log and the data files, not with
		transactions or the data dictionary. */
		return DB_SUCCESS;
	}

	/* Initialize the undo log lists according to the rseg header */

	/* Start from the stored history size plus one;
	trx_undo_lists_init() adds the size of each undo log it finds. */
	rseg->curr_size = mach_read_from_4(rseg_header + TRX_RSEG_HISTORY_SIZE)
		+ 1;
	if (dberr_t err = trx_undo_lists_init(rseg, max_trx_id, rseg_header)) {
		return err;
	}

	/* The rest applies only if the history list is not empty. */
	if (auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY)) {
		trx_sys.rseg_history_len += len;

		/* Locate the undo log header that the last node of
		the history list belongs to. */
		fil_addr_t	node_addr = trx_purge_get_log_from_hist(
			flst_get_last(rseg_header + TRX_RSEG_HISTORY, mtr));

		rseg->last_page_no = static_cast<uint32_t>(node_addr.page);

		const trx_ulogf_t*	undo_log_hdr = trx_undo_page_get(
			page_id_t(rseg->space->id, node_addr.page), mtr)
			+ node_addr.boffset;

		/* Both TRX_UNDO_TRX_ID and TRX_UNDO_TRX_NO of that log
		count towards the maximum observed identifier. */
		trx_id_t id = mach_read_from_8(undo_log_hdr + TRX_UNDO_TRX_ID);
		if (id > max_trx_id) {
			max_trx_id = id;
		}
		id = mach_read_from_8(undo_log_hdr + TRX_UNDO_TRX_NO);
		if (id > max_trx_id) {
			max_trx_id = id;
		}
		/* Here id holds the TRX_UNDO_TRX_NO of the last log. */
		rseg->set_last_commit(node_addr.boffset, id);
		unsigned purge = mach_read_from_2(
			undo_log_hdr + TRX_UNDO_NEEDS_PURGE);
		ut_ad(purge <= 1);
		rseg->needs_purge = purge != 0;

		if (rseg->last_page_no != FIL_NULL) {

			/* There is no need to cover this operation by the purge
			mutex because we are still bootstrapping. */
			purge_sys.purge_queue.push(*rseg);
		}
	}

	return DB_SUCCESS;
}
545 
546 /** Read binlog metadata from the TRX_SYS page, in case we are upgrading
547 from MySQL or a MariaDB version older than 10.3.5. */
548 static void trx_rseg_init_binlog_info(const page_t* page)
549 {
550 	if (mach_read_from_4(TRX_SYS + TRX_SYS_MYSQL_LOG_INFO
551 			     + TRX_SYS_MYSQL_LOG_MAGIC_N_FLD
552 			     + page)
553 	    == TRX_SYS_MYSQL_LOG_MAGIC_N) {
554 		memcpy(trx_sys.recovered_binlog_filename,
555 		       TRX_SYS_MYSQL_LOG_INFO + TRX_SYS_MYSQL_LOG_NAME
556 		       + TRX_SYS + page, TRX_SYS_MYSQL_LOG_NAME_LEN);
557 		trx_sys.recovered_binlog_offset = mach_read_from_8(
558 			TRX_SYS_MYSQL_LOG_INFO + TRX_SYS_MYSQL_LOG_OFFSET
559 			+ TRX_SYS + page);
560 	}
561 
562 #ifdef WITH_WSREP
563 	trx_rseg_init_wsrep_xid(page, trx_sys.recovered_wsrep_xid);
564 #endif
565 }
566 
/** Initialize or recover the rollback segments at startup.
For every rollback segment slot that is in use in the TRX_SYS page, an
in-memory rollback segment object is created, registered in
trx_sys.rseg_array and restored from its header page. Along the way the
maximum transaction identifier, the binlog position and (with WSREP) the
WSREP XID are collected. On success, trx_sys.init_max_trx_id() is called
with the maximum observed identifier plus one.
@return error code
@retval DB_SUCCESS on success; otherwise the error returned by
trx_rseg_mem_restore(), after freeing all rollback segment objects */
dberr_t trx_rseg_array_init()
{
	trx_id_t max_trx_id = 0;

	/* Reset the recovered binlog (and WSREP) state before collecting. */
	*trx_sys.recovered_binlog_filename = '\0';
	trx_sys.recovered_binlog_offset = 0;
#ifdef WITH_WSREP
	trx_sys.recovered_wsrep_xid.null();
	/* Copy of the WSREP XID that was found in the TRX_SYS page */
	XID wsrep_sys_xid;
	wsrep_sys_xid.null();
	/* Whether restoring some rollback segment produced a recovered
	XID that differs from the one in the TRX_SYS page */
	bool wsrep_xid_in_rseg_found = false;
#endif
	mtr_t mtr;
	dberr_t err = DB_SUCCESS;

	/* Each slot is processed in a mini-transaction of its own. */
	for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
		mtr.start();
		if (const buf_block_t* sys = trx_sysf_get(&mtr, false)) {
			if (rseg_id == 0) {
				/* In case this is an upgrade from
				before MariaDB 10.3.5, fetch the base
				information from the TRX_SYS page. */
				max_trx_id = mach_read_from_8(
					TRX_SYS + TRX_SYS_TRX_ID_STORE
					+ sys->frame);
				trx_rseg_init_binlog_info(sys->frame);
#ifdef WITH_WSREP
				/* Remember the XID of the TRX_SYS page;
				trx_sys.recovered_wsrep_xid may be overwritten
				by trx_rseg_mem_restore() below. */
				wsrep_sys_xid.set(&trx_sys.recovered_wsrep_xid);
#endif
			}

			const uint32_t	page_no = trx_sysf_rseg_get_page_no(
				sys, rseg_id);
			if (page_no != FIL_NULL) {
				trx_rseg_t* rseg = trx_rseg_mem_create(
					rseg_id,
					fil_space_get(trx_sysf_rseg_get_space(
							      sys, rseg_id)),
					page_no);
				ut_ad(rseg->is_persistent());
				ut_ad(rseg->id == rseg_id);
				ut_ad(!trx_sys.rseg_array[rseg_id]);
				/* Register the object before restoring it, so
				that the cleanup below frees it on failure. */
				trx_sys.rseg_array[rseg_id] = rseg;
				if ((err = trx_rseg_mem_restore(
					     rseg, max_trx_id, &mtr))
				    != DB_SUCCESS) {
					/* Commit here, because "break" skips
					the commit at the end of the loop. */
					mtr.commit();
					break;
				}
#ifdef WITH_WSREP
				if (!wsrep_sys_xid.is_null() &&
				    !wsrep_sys_xid.eq(&trx_sys.recovered_wsrep_xid)) {
					wsrep_xid_in_rseg_found = true;
					/* With equal UUIDs, the XID of the
					TRX_SYS page must not be newer than
					the recovered one. */
					ut_ad(memcmp(wsrep_xid_uuid(&wsrep_sys_xid),
						     wsrep_xid_uuid(&trx_sys.recovered_wsrep_xid),
						     sizeof wsrep_uuid)
					      || wsrep_xid_seqno(
						      &wsrep_sys_xid)
					      <= wsrep_xid_seqno(
						      &trx_sys.recovered_wsrep_xid));
				}
#endif
			}
		}

		mtr.commit();
	}

	if (err != DB_SUCCESS) {
		/* Restoring failed: free every rollback segment object
		created so far and clear its trx_sys.rseg_array entry. */
		for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
			if (trx_rseg_t*& rseg = trx_sys.rseg_array[rseg_id]) {
				/* trx_rseg_mem_free() requires undo_list to
				be empty, so release its elements first. */
				while (trx_undo_t* u= UT_LIST_GET_FIRST(
					     rseg->undo_list)) {
					UT_LIST_REMOVE(rseg->undo_list, u);
					ut_free(u);
				}
				trx_rseg_mem_free(rseg);
				rseg = NULL;
			}
		}
		return err;
	}

#ifdef WITH_WSREP
	if (!wsrep_sys_xid.is_null()) {
		/* Upgrade from a version prior to 10.3.5,
		where WSREP XID was stored in TRX_SYS page.
		If no rollback segment has a WSREP XID set,
		we must copy the XID found in TRX_SYS page
		to rollback segments. */
		mtr.start();

		if (!wsrep_xid_in_rseg_found) {
			trx_rseg_update_wsrep_checkpoint(&wsrep_sys_xid, &mtr);
		}

		/* Finally, clear WSREP XID in TRX_SYS page. */
		const buf_block_t* sys = trx_sysf_get(&mtr);
		mlog_memset(TRX_SYS + TRX_SYS_WSREP_XID_INFO + sys->frame,
			    TRX_SYS_WSREP_XID_LEN, 0, &mtr);
		mtr.commit();
	}
#endif

	trx_sys.init_max_trx_id(max_trx_id + 1);
	return DB_SUCCESS;
}
675 
676 /** Create a persistent rollback segment.
677 @param[in]	space_id	system or undo tablespace id
678 @return pointer to new rollback segment
679 @retval	NULL	on failure */
680 trx_rseg_t*
681 trx_rseg_create(ulint space_id)
682 {
683 	trx_rseg_t*		rseg = NULL;
684 	mtr_t			mtr;
685 
686 	mtr.start();
687 
688 	/* To obey the latching order, acquire the file space
689 	x-latch before the trx_sys.mutex. */
690 	fil_space_t*	space = mtr_x_lock_space(space_id, &mtr);
691 	ut_ad(space->purpose == FIL_TYPE_TABLESPACE);
692 
693 	if (buf_block_t* sys_header = trx_sysf_get(&mtr)) {
694 		ulint	rseg_id = trx_sys_rseg_find_free(sys_header);
695 		if (buf_block_t* rblock = rseg_id == ULINT_UNDEFINED
696 		    ? NULL
697 		    : trx_rseg_header_create(space, rseg_id, 0, sys_header,
698 					     &mtr)) {
699 			ut_ad(trx_sysf_rseg_get_space(sys_header, rseg_id)
700 			      == space_id);
701 			rseg = trx_rseg_mem_create(rseg_id, space,
702 						   rblock->page.id.page_no());
703 			ut_ad(rseg->id == rseg_id);
704 			ut_ad(rseg->is_persistent());
705 			ut_ad(!trx_sys.rseg_array[rseg->id]);
706 			trx_sys.rseg_array[rseg->id] = rseg;
707 		}
708 	}
709 
710 	mtr.commit();
711 
712 	return(rseg);
713 }
714 
715 /** Create the temporary rollback segments. */
716 void
717 trx_temp_rseg_create()
718 {
719 	mtr_t		mtr;
720 
721 	for (ulong i = 0; i < TRX_SYS_N_RSEGS; i++) {
722 		mtr.start();
723 		mtr.set_log_mode(MTR_LOG_NO_REDO);
724 		mtr_x_lock_space(fil_system.temp_space, &mtr);
725 
726 		buf_block_t* rblock = trx_rseg_header_create(
727 			fil_system.temp_space, i, 0, NULL, &mtr);
728 		trx_rseg_t* rseg = trx_rseg_mem_create(
729 			i, fil_system.temp_space, rblock->page.id.page_no());
730 		ut_ad(!rseg->is_persistent());
731 		ut_ad(!trx_sys.temp_rsegs[i]);
732 		trx_sys.temp_rsegs[i] = rseg;
733 		mtr.commit();
734 	}
735 }
736 
737 /********************************************************************
738 Get the number of unique rollback tablespaces in use except space id 0.
739 The last space id will be the sentinel value ULINT_UNDEFINED. The array
740 will be sorted on space id. Note: space_ids should have have space for
741 TRX_SYS_N_RSEGS + 1 elements.
742 @return number of unique rollback tablespaces in use. */
743 ulint
744 trx_rseg_get_n_undo_tablespaces(
745 /*============================*/
746 	ulint*		space_ids)	/*!< out: array of space ids of
747 					UNDO tablespaces */
748 {
749 	mtr_t mtr;
750 	mtr.start();
751 
752 	buf_block_t* sys_header = trx_sysf_get(&mtr, false);
753 	if (!sys_header) {
754 		mtr.commit();
755 		return 0;
756 	}
757 
758 	ulint* end = space_ids;
759 
760 	for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
761 		uint32_t page_no = trx_sysf_rseg_get_page_no(sys_header,
762 							     rseg_id);
763 
764 		if (page_no == FIL_NULL) {
765 			continue;
766 		}
767 
768 		if (ulint space = trx_sysf_rseg_get_space(sys_header,
769 							  rseg_id)) {
770 			if (std::find(space_ids, end, space) == end) {
771 				*end++ = space;
772 			}
773 		}
774 	}
775 
776 	mtr.commit();
777 
778 	ut_a(end - space_ids <= TRX_SYS_N_RSEGS);
779 	*end = ULINT_UNDEFINED;
780 
781 	std::sort(space_ids, end);
782 
783 	return ulint(end - space_ids);
784 }
785 
786 /** Update the offset information about the end of the binlog entry
787 which corresponds to the transaction just being committed.
788 In a replication slave, this updates the master binlog position
789 up to which replication has proceeded.
790 @param[in,out]	rseg_header	rollback segment header
791 @param[in]	trx		committing transaction
792 @param[in,out]	mtr		mini-transaction */
793 void
794 trx_rseg_update_binlog_offset(byte* rseg_header, const trx_t* trx, mtr_t* mtr)
795 {
796 	DBUG_LOG("trx", "trx_mysql_binlog_offset: " << trx->mysql_log_offset);
797 
798 	const size_t len = strlen(trx->mysql_log_file_name) + 1;
799 
800 	ut_ad(len > 1);
801 
802 	if (UNIV_UNLIKELY(len > TRX_RSEG_BINLOG_NAME_LEN)) {
803 		return;
804 	}
805 
806 	mlog_write_ull(rseg_header + TRX_RSEG_BINLOG_OFFSET,
807 		       trx->mysql_log_offset, mtr);
808 	byte* p = rseg_header + TRX_RSEG_BINLOG_NAME;
809 	const byte* binlog_name = reinterpret_cast<const byte*>
810 		(trx->mysql_log_file_name);
811 
812 	if (memcmp(binlog_name, p, len)) {
813 		mlog_write_string(p, binlog_name, len, mtr);
814 	}
815 }
816