1 /*****************************************************************************
2
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file trx/trx0purge.cc
29 Purge old versions
30
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "ha_prototypes.h"
35
36 #include "trx0purge.h"
37
38 #ifdef UNIV_NONINL
39 #include "trx0purge.ic"
40 #endif
41
42 #include "fsp0fsp.h"
43 #include "fut0fut.h"
44 #include "mach0data.h"
45 #include "mtr0log.h"
46 #include "os0thread.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0purge.h"
50 #include "row0upd.h"
51 #include "srv0mon.h"
52 #include "fsp0sysspace.h"
53 #include "srv0srv.h"
54 #include "srv0start.h"
55 #include "sync0sync.h"
56 #include "trx0rec.h"
57 #include "trx0roll.h"
58 #include "trx0rseg.h"
59 #include "trx0trx.h"
60
/** Maximum allowable purge history length. <=0 means 'infinite'. */
ulong		srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong		srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
trx_undo_rec_t	trx_purge_dummy_rec;

#ifdef UNIV_DEBUG
/* NOTE(review): presumably restricts purge to only advancing its read
view without removing records — confirm against the srv0srv.cc users. */
my_bool		srv_purge_view_update_only_debug;
/* Debug flag asserted (ut_ad(!trx_commit_disallowed)) in
trx_purge_add_update_undo_to_history(): while true, no undo log is
expected to be added to the history list. */
bool		trx_commit_disallowed = false;
#endif /* UNIV_DEBUG */

/** Sentinel value: an "empty" TrxUndoRsegs whose trx_no is
UINT64_UNDEFINED; TrxUndoRsegsIterator uses it to mean "no current
element". */
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);
81
/** Constructor.
The iterator starts out positioned at end() of the NullElement sentinel,
so the first set_next() call pulls work from the purge queue.
@param[in,out]	purge_sys	purge system instance this iterator serves */
TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
	:
	m_purge_sys(purge_sys),
	m_trx_undo_rsegs(NullElement),
	m_iter(m_trx_undo_rsegs.end())
{
}
90
91 /** Sets the next rseg to purge in m_purge_sys.
92 @return page size of the table for which the log is.
93 NOTE: if rseg is NULL when this function returns this means that
94 there are no rollback segments to purge and then the returned page
95 size object should not be used. */
96 const page_size_t
set_next()97 TrxUndoRsegsIterator::set_next()
98 {
99 mutex_enter(&m_purge_sys->pq_mutex);
100
101 /* Only purge consumes events from the priority queue, user
102 threads only produce the events. */
103
104 /* Check if there are more rsegs to process in the
105 current element. */
106 if (m_iter != m_trx_undo_rsegs.end()) {
107
108 /* We are still processing rollback segment from
109 the same transaction and so expected transaction
110 number shouldn't increase. Undo increment of
111 expected trx_no done by caller assuming rollback
112 segments from given transaction are done. */
113 m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
114
115 } else if (!m_purge_sys->purge_queue->empty()) {
116
117 /* Read the next element from the queue.
118 Combine elements if they have same transaction number.
119 This can happen if a transaction shares redo rollback segment
120 with another transaction that has already added it to purge
121 queue and former transaction also needs to schedule non-redo
122 rollback segment for purge. */
123 m_trx_undo_rsegs = NullElement;
124
125 while (!m_purge_sys->purge_queue->empty()) {
126
127 if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
128 m_trx_undo_rsegs =
129 purge_sys->purge_queue->top();
130 } else if (purge_sys->purge_queue->top().get_trx_no() ==
131 m_trx_undo_rsegs.get_trx_no()) {
132 m_trx_undo_rsegs.append(
133 purge_sys->purge_queue->top());
134 } else {
135 break;
136 }
137
138 m_purge_sys->purge_queue->pop();
139 }
140
141 m_iter = m_trx_undo_rsegs.begin();
142
143 } else {
144 /* Queue is empty, reset iterator. */
145 m_trx_undo_rsegs = NullElement;
146 m_iter = m_trx_undo_rsegs.end();
147
148 mutex_exit(&m_purge_sys->pq_mutex);
149
150 m_purge_sys->rseg = NULL;
151
152 /* return a dummy object, not going to be used by the caller */
153 return(univ_page_size);
154 }
155
156 m_purge_sys->rseg = *m_iter++;
157
158 mutex_exit(&m_purge_sys->pq_mutex);
159
160 ut_a(m_purge_sys->rseg != NULL);
161
162 mutex_enter(&m_purge_sys->rseg->mutex);
163
164 ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
165 ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
166
167 /* We assume in purge of externally stored fields that
168 space id is in the range of UNDO tablespace space ids
169 unless space is system tablespace */
170 ut_a(srv_is_undo_tablespace(m_purge_sys->rseg->space)
171 || is_system_tablespace(
172 m_purge_sys->rseg->space));
173
174 const page_size_t page_size(m_purge_sys->rseg->page_size);
175
176 ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
177
178 m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
179 m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
180 m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
181
182 mutex_exit(&m_purge_sys->rseg->mutex);
183
184 return(page_size);
185 }
186
187 /****************************************************************//**
188 Builds a purge 'query' graph. The actual purge is performed by executing
189 this query graph.
190 @return own: the query graph */
191 static
192 que_t*
trx_purge_graph_build(trx_t * trx,ulint n_purge_threads)193 trx_purge_graph_build(
194 /*==================*/
195 trx_t* trx, /*!< in: transaction */
196 ulint n_purge_threads) /*!< in: number of purge
197 threads */
198 {
199 ulint i;
200 mem_heap_t* heap;
201 que_fork_t* fork;
202
203 heap = mem_heap_create(512);
204 fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
205 fork->trx = trx;
206
207 for (i = 0; i < n_purge_threads; ++i) {
208 que_thr_t* thr;
209
210 thr = que_thr_create(fork, heap, NULL);
211
212 thr->child = row_purge_node_create(thr, heap);
213 }
214
215 return(fork);
216 }
217
218 /********************************************************************//**
219 Creates the global purge system control structure and inits the history
220 mutex. */
221 void
trx_purge_sys_create(ulint n_purge_threads,purge_pq_t * purge_queue)222 trx_purge_sys_create(
223 /*=================*/
224 ulint n_purge_threads, /*!< in: number of purge
225 threads */
226 purge_pq_t* purge_queue) /*!< in, own: UNDO log min
227 binary heap */
228 {
229 purge_sys = static_cast<trx_purge_t*>(
230 ut_zalloc_nokey(sizeof(*purge_sys)));
231
232 purge_sys->state = PURGE_STATE_INIT;
233 purge_sys->event = os_event_create(0);
234
235 new (&purge_sys->iter) purge_iter_t;
236 new (&purge_sys->limit) purge_iter_t;
237 new (&purge_sys->undo_trunc) undo::Truncate;
238 #ifdef UNIV_DEBUG
239 new (&purge_sys->done) purge_iter_t;
240 #endif /* UNIV_DEBUG */
241
242 /* Take ownership of purge_queue, we are responsible for freeing it. */
243 purge_sys->purge_queue = purge_queue;
244
245 rw_lock_create(trx_purge_latch_key,
246 &purge_sys->latch, SYNC_PURGE_LATCH);
247
248 mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);
249
250 ut_a(n_purge_threads > 0);
251
252 purge_sys->sess = sess_open();
253
254 purge_sys->trx = purge_sys->sess->trx;
255
256 ut_a(purge_sys->trx->sess == purge_sys->sess);
257
258 /* A purge transaction is not a real transaction, we use a transaction
259 here only because the query threads code requires it. It is otherwise
260 quite unnecessary. We should get rid of it eventually. */
261 purge_sys->trx->id = 0;
262 purge_sys->trx->start_time = ut_time_monotonic();
263 purge_sys->trx->state = TRX_STATE_ACTIVE;
264 purge_sys->trx->op_info = "purge trx";
265
266 purge_sys->query = trx_purge_graph_build(
267 purge_sys->trx, n_purge_threads);
268
269 new(&purge_sys->view) ReadView();
270
271 trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
272
273 purge_sys->view_active = true;
274
275 purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));
276 }
277
278 /************************************************************************
279 Frees the global purge system control structure. */
280 void
trx_purge_sys_close(void)281 trx_purge_sys_close(void)
282 /*======================*/
283 {
284 que_graph_free(purge_sys->query);
285
286 ut_a(purge_sys->trx->id == 0);
287 ut_a(purge_sys->sess->trx == purge_sys->trx);
288
289 purge_sys->trx->state = TRX_STATE_NOT_STARTED;
290
291 sess_close(purge_sys->sess);
292
293 purge_sys->sess = NULL;
294
295 purge_sys->view.close();
296 purge_sys->view.~ReadView();
297
298 rw_lock_free(&purge_sys->latch);
299 mutex_free(&purge_sys->pq_mutex);
300
301 if (purge_sys->purge_queue != NULL) {
302 UT_DELETE(purge_sys->purge_queue);
303 purge_sys->purge_queue = NULL;
304 }
305
306 os_event_destroy(purge_sys->event);
307
308 purge_sys->event = NULL;
309
310 UT_DELETE(purge_sys->rseg_iter);
311
312 ut_free(purge_sys);
313
314 purge_sys = NULL;
315 }
316
317 /*================ UNDO LOG HISTORY LIST =============================*/
318
319 /********************************************************************//**
320 Adds the update undo log as the first log in the history list. Removes the
321 update undo log segment from the rseg slot if it is too big for reuse. */
322 void
trx_purge_add_update_undo_to_history(trx_t * trx,trx_undo_ptr_t * undo_ptr,page_t * undo_page,bool update_rseg_history_len,ulint n_added_logs,mtr_t * mtr)323 trx_purge_add_update_undo_to_history(
324 /*=================================*/
325 trx_t* trx, /*!< in: transaction */
326 trx_undo_ptr_t* undo_ptr, /*!< in/out: update undo log. */
327 page_t* undo_page, /*!< in: update undo log header page,
328 x-latched */
329 bool update_rseg_history_len,
330 /*!< in: if true: update rseg history
331 len else skip updating it. */
332 ulint n_added_logs, /*!< in: number of logs added */
333 mtr_t* mtr) /*!< in: mtr */
334 {
335 trx_undo_t* undo;
336 trx_rseg_t* rseg;
337 trx_rsegf_t* rseg_header;
338 trx_ulogf_t* undo_header;
339
340 undo = undo_ptr->update_undo;
341 rseg = undo->rseg;
342
343 rseg_header = trx_rsegf_get(
344 undo->rseg->space, undo->rseg->page_no, undo->rseg->page_size,
345 mtr);
346
347 undo_header = undo_page + undo->hdr_offset;
348
349 if (undo->state != TRX_UNDO_CACHED) {
350 ulint hist_size;
351 #ifdef UNIV_DEBUG
352 trx_usegf_t* seg_header = undo_page + TRX_UNDO_SEG_HDR;
353 #endif /* UNIV_DEBUG */
354
355 /* The undo log segment will not be reused */
356
357 if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
358 ib::fatal() << "undo->id is " << undo->id;
359 }
360
361 trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);
362
363 MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
364
365 hist_size = mtr_read_ulint(
366 rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);
367
368 ut_ad(undo->size == flst_get_len(
369 seg_header + TRX_UNDO_PAGE_LIST));
370
371 mlog_write_ulint(
372 rseg_header + TRX_RSEG_HISTORY_SIZE,
373 hist_size + undo->size, MLOG_4BYTES, mtr);
374 }
375
376 ut_ad(!trx_commit_disallowed);
377
378 /* Add the log as the first in the history list */
379 flst_add_first(rseg_header + TRX_RSEG_HISTORY,
380 undo_header + TRX_UNDO_HISTORY_NODE, mtr);
381
382 if (update_rseg_history_len) {
383 os_atomic_increment_ulint(
384 &trx_sys->rseg_history_len, n_added_logs);
385 if (trx_sys->rseg_history_len
386 > srv_n_purge_threads * srv_purge_batch_size) {
387 srv_wake_purge_thread_if_not_active();
388 }
389 }
390
391 /* Write the trx number to the undo log header */
392 mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
393
394 /* Write information about delete markings to the undo log header */
395
396 if (!undo->del_marks) {
397 mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
398 MLOG_2BYTES, mtr);
399 }
400
401 if (rseg->last_page_no == FIL_NULL) {
402 rseg->last_page_no = undo->hdr_page_no;
403 rseg->last_offset = undo->hdr_offset;
404 rseg->last_trx_no = trx->no;
405 rseg->last_del_marks = undo->del_marks;
406 }
407 }
408
/** Remove undo log header from the history list.
Unlinks the header's node from the rollback segment's TRX_RSEG_HISTORY
base node, then decrements the global history length counter so the
in-memory count stays in sync with the on-disk list.
@param[in,out]	rseg_hdr	rollback segment header
@param[in]	log_hdr		undo log segment header
@param[in,out]	mtr		mini transaction. */
static
void
trx_purge_remove_log_hdr(
	trx_rsegf_t*	rseg_hdr,
	trx_ulogf_t*	log_hdr,
	mtr_t*		mtr)
{
	flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
		    log_hdr + TRX_UNDO_HISTORY_NODE, mtr);

	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, 1);
}
425
/** Frees an undo log segment which is in the history list. Removes the
undo log hdr from the history list.
@param[in,out]	rseg	rollback segment
@param[in]	hdr_addr	file address of log_hdr
@param[in]	noredo	skip redo logging. */
static
void
trx_purge_free_segment(
	trx_rseg_t*	rseg,
	fil_addr_t	hdr_addr,
	bool		noredo)
{
	mtr_t		mtr;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ulint		seg_size;
	ulint		hist_size;
	/* No-redo segments are never marked via redo-logged writes, so
	start with the mark already considered done. */
	bool		marked	= noredo;

	/* Free the non-header pages of the segment a step at a time,
	each step in its own mini-transaction, so that we do not keep
	too many pages buffer-fixed at once.  NOTE: when the loop breaks,
	the rseg mutex is still held and the current mtr is still open —
	the cleanup below continues inside them. */
	for (;;) {
		page_t*	undo_page;

		mtr_start(&mtr);
		if (noredo) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}
		ut_ad(noredo == trx_sys_is_noredo_rseg_slot(rseg->id));

		mutex_enter(&rseg->mutex);

		rseg_hdr = trx_rsegf_get(
			rseg->space, rseg->page_no, rseg->page_size, &mtr);

		undo_page = trx_undo_page_get(
			page_id_t(rseg->space, hdr_addr.page), rseg->page_size,
			&mtr);

		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
		log_hdr = undo_page + hdr_addr.boffset;

		/* Mark the last undo log totally purged, so that if the
		system crashes, the tail of the undo log will not get accessed
		again. The list of pages in the undo log tail gets inconsistent
		during the freeing of the segment, and therefore purge should
		not try to access them again. */

		if (!marked) {
			marked = true;
			mlog_write_ulint(
				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				MLOG_2BYTES, &mtr);
		}

		if (fseg_free_step_not_header(
			    seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr)) {

			/* Only the header page remains; fall through
			with the mutex and mtr still active. */
			break;
		}

		mutex_exit(&rseg->mutex);

		mtr_commit(&mtr);
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

	do {

		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.cc. */

	} while (!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr));

	/* Deduct the freed segment from the rseg's on-disk history size
	and its in-memory current size. */
	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}
528
529 /********************************************************************//**
530 Removes unnecessary history data from a rollback segment. */
531 static
532 void
trx_purge_truncate_rseg_history(trx_rseg_t * rseg,const purge_iter_t * limit)533 trx_purge_truncate_rseg_history(
534 /*============================*/
535 trx_rseg_t* rseg, /*!< in: rollback segment */
536 const purge_iter_t* limit) /*!< in: truncate offset */
537 {
538 fil_addr_t hdr_addr;
539 fil_addr_t prev_hdr_addr;
540 trx_rsegf_t* rseg_hdr;
541 page_t* undo_page;
542 trx_ulogf_t* log_hdr;
543 trx_usegf_t* seg_hdr;
544 mtr_t mtr;
545 trx_id_t undo_trx_no;
546 const bool noredo = trx_sys_is_noredo_rseg_slot(
547 rseg->id);
548
549 mtr_start(&mtr);
550 if (noredo) {
551 mtr.set_log_mode(MTR_LOG_NO_REDO);
552 }
553 mutex_enter(&(rseg->mutex));
554
555 rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
556 rseg->page_size, &mtr);
557
558 hdr_addr = trx_purge_get_log_from_hist(
559 flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
560 loop:
561 if (hdr_addr.page == FIL_NULL) {
562
563 mutex_exit(&(rseg->mutex));
564
565 mtr_commit(&mtr);
566
567 return;
568 }
569
570 undo_page = trx_undo_page_get(page_id_t(rseg->space, hdr_addr.page),
571 rseg->page_size, &mtr);
572
573 log_hdr = undo_page + hdr_addr.boffset;
574
575 undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
576
577 if (undo_trx_no >= limit->trx_no) {
578
579 /* limit space_id should match the rollback segment
580 space id to avoid freeing of the page belongs to
581 different rollback segment for the same trx_no. */
582 if (undo_trx_no == limit->trx_no
583 && rseg->space == limit->undo_rseg_space) {
584
585 trx_undo_truncate_start(
586 rseg, hdr_addr.page,
587 hdr_addr.boffset, limit->undo_no);
588 }
589
590 mutex_exit(&(rseg->mutex));
591 mtr_commit(&mtr);
592
593 return;
594 }
595
596 prev_hdr_addr = trx_purge_get_log_from_hist(
597 flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
598
599 seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
600
601 if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
602 && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
603
604 /* We can free the whole log segment */
605
606 mutex_exit(&(rseg->mutex));
607 mtr_commit(&mtr);
608
609 /* calls the trx_purge_remove_log_hdr()
610 inside trx_purge_free_segment(). */
611 trx_purge_free_segment(rseg, hdr_addr, noredo);
612
613 } else {
614 /* Remove the log hdr from the rseg history. */
615 trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
616
617 mutex_exit(&(rseg->mutex));
618 mtr_commit(&mtr);
619 }
620
621 mtr_start(&mtr);
622 if (noredo) {
623 mtr.set_log_mode(MTR_LOG_NO_REDO);
624 }
625 mutex_enter(&(rseg->mutex));
626
627 rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
628 rseg->page_size, &mtr);
629
630 hdr_addr = prev_hdr_addr;
631
632 goto loop;
633 }
634
635 /** UNDO log truncate logger. Needed to track state of truncate during crash.
636 An auxiliary redo log file undo_<space_id>_trunc.log will created while the
637 truncate of the UNDO is in progress. This file is required during recovery
638 to complete the truncate. */
639
640 namespace undo {
641
642 /** Populate log file name based on space_id
643 @param[in] space_id id of the undo tablespace.
644 @return DB_SUCCESS or error code */
populate_log_file_name(ulint space_id,char * & log_file_name)645 dberr_t populate_log_file_name(
646 ulint space_id,
647 char*& log_file_name)
648 {
649 ulint log_file_name_sz =
650 strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
651 + strlen(undo::s_log_prefix)
652 + strlen(undo::s_log_ext);
653
654 log_file_name = new (std::nothrow) char[log_file_name_sz];
655 if (log_file_name == 0) {
656 return(DB_OUT_OF_MEMORY);
657 }
658
659 memset(log_file_name, 0, log_file_name_sz);
660
661 strcpy(log_file_name, srv_log_group_home_dir);
662 ulint log_file_name_len = strlen(log_file_name);
663
664 if (log_file_name[log_file_name_len - 1]
665 != OS_PATH_SEPARATOR) {
666
667 log_file_name[log_file_name_len]
668 = OS_PATH_SEPARATOR;
669 log_file_name_len = strlen(log_file_name);
670 }
671
672 ut_snprintf(log_file_name + log_file_name_len,
673 log_file_name_sz - log_file_name_len,
674 "%s%lu_%s", undo::s_log_prefix,
675 (ulong) space_id, s_log_ext);
676
677 return(DB_SUCCESS);
678 }
679
/** Create the truncate log file.
Writes one zero-filled page: a zero first word (no magic number yet)
marks the truncate as "in progress" for crash recovery.
@param[in]	space_id	id of the undo tablespace to truncate.
@return DB_SUCCESS or error code. */
dberr_t init(ulint space_id)
{
	dberr_t	err;
	char*	log_file_name;

	/* Step-1: Create the log file name using the pre-decided
	prefix/suffix and table id of undo tablepsace to truncate. */
	err = populate_log_file_name(space_id, log_file_name);
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* Step-2: Create the log file, open it and write 0 to
	indicate init phase. */
	bool		ret;
	pfs_os_file_t	handle = os_file_create(
		innodb_log_file_key, log_file_name, OS_FILE_CREATE,
		OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
	if (!ret) {
		delete[] log_file_name;
		return(DB_IO_ERROR);
	}

	/* Over-allocate by one page so the I/O buffer can be aligned
	to UNIV_PAGE_SIZE below. */
	ulint	sz = UNIV_PAGE_SIZE;
	void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
	if (buf == NULL) {
		os_file_close(handle);
		delete[] log_file_name;
		return(DB_OUT_OF_MEMORY);
	}

	byte*	log_buf = static_cast<byte*>(
		ut_align(buf, UNIV_PAGE_SIZE));

	IORequest	request(IORequest::WRITE);

	request.disable_compression();

	/* Buffer is zero-filled from ut_zalloc_nokey(). */
	err = os_file_write(
		request, log_file_name, handle, log_buf, 0, sz);

	/* Flush before close so the init marker is durable. */
	os_file_flush(handle);
	os_file_close(handle);

	ut_free(buf);
	delete[] log_file_name;

	return(err);
}
732
/** Mark completion of undo truncate action by writing magic number to
the log file and then removing it from the disk.
If we are going to remove it from disk then why write magic number ?
This is to safeguard from unlink (file-system) anomalies that will keep
the link to the file even after unlink action is successfull and
ref-count = 0.
@param[in]	space_id	id of the undo tablespace to truncate.*/
void done(
	ulint	space_id)
{
	dberr_t	err;
	char*	log_file_name;

	/* Step-1: Create the log file name using the pre-decided
	prefix/suffix and table id of undo tablepsace to truncate. */
	err = populate_log_file_name(space_id, log_file_name);
	if (err != DB_SUCCESS) {
		return;
	}

	/* Step-2: Open log file and write magic number to
	indicate done phase. */
	bool		ret;
	pfs_os_file_t	handle =
		os_file_create_simple_no_error_handling(
			innodb_log_file_key, log_file_name,
			OS_FILE_OPEN, OS_FILE_READ_WRITE,
			srv_read_only_mode, &ret);

	if (!ret) {
		/* Cannot open it: best effort — just try to unlink the
		file and return. */
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
		return;
	}

	/* Over-allocate by one page so the I/O buffer can be aligned
	to UNIV_PAGE_SIZE below. */
	ulint	sz = UNIV_PAGE_SIZE;
	void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
	if (buf == NULL) {
		os_file_close(handle);
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
		return;
	}

	byte*	log_buf = static_cast<byte*>(
		ut_align(buf, UNIV_PAGE_SIZE));

	/* First 4 bytes of the page carry the completion magic. */
	mach_write_to_4(log_buf, undo::s_magic);

	IORequest	request(IORequest::WRITE);

	request.disable_compression();

	err = os_file_write(
		request, log_file_name, handle, log_buf, 0, sz);

	ut_ad(err == DB_SUCCESS);

	os_file_flush(handle);
	os_file_close(handle);

	ut_free(buf);
	os_file_delete(innodb_log_file_key, log_file_name);
	delete[] log_file_name;
}
798
799 /** Check if TRUNCATE_DDL_LOG file exist.
800 @param[in] space_id id of the undo tablespace.
801 @return true if exist else false. */
is_log_present(ulint space_id)802 bool is_log_present(
803 ulint space_id)
804 {
805 dberr_t err;
806 char* log_file_name;
807
808 /* Step-1: Populate log file name. */
809 err = populate_log_file_name(space_id, log_file_name);
810 if (err != DB_SUCCESS) {
811 return(false);
812 }
813
814 /* Step-2: Check for existence of the file. */
815 bool exist;
816 os_file_type_t type;
817 os_file_status(log_file_name, &exist, &type);
818
819 /* Step-3: If file exists, check it for presence of magic
820 number. If found, then delete the file and report file
821 doesn't exist as presence of magic number suggest that
822 truncate action was complete. */
823
824 if (exist) {
825 bool ret;
826 pfs_os_file_t handle =
827 os_file_create_simple_no_error_handling(
828 innodb_log_file_key, log_file_name,
829 OS_FILE_OPEN, OS_FILE_READ_WRITE,
830 srv_read_only_mode, &ret);
831 if (!ret) {
832 os_file_delete(innodb_log_file_key,
833 log_file_name);
834 delete[] log_file_name;
835 return(false);
836 }
837
838 ulint sz = UNIV_PAGE_SIZE;
839 void* buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
840 if (buf == NULL) {
841 os_file_close(handle);
842 os_file_delete(innodb_log_file_key,
843 log_file_name);
844 delete[] log_file_name;
845 return(false);
846 }
847
848 byte* log_buf = static_cast<byte*>(
849 ut_align(buf, UNIV_PAGE_SIZE));
850
851 IORequest request(IORequest::READ);
852
853 request.disable_compression();
854
855 dberr_t err;
856
857 err = os_file_read(request, handle, log_buf, 0, sz);
858
859 os_file_close(handle);
860
861 if (err != DB_SUCCESS) {
862
863 ib::info()
864 << "Unable to read '"
865 << log_file_name << "' : "
866 << ut_strerr(err);
867
868 os_file_delete(
869 innodb_log_file_key, log_file_name);
870
871 ut_free(buf);
872
873 delete[] log_file_name;
874
875 return(false);
876 }
877
878 ulint magic_no = mach_read_from_4(log_buf);
879
880 ut_free(buf);
881
882 if (magic_no == undo::s_magic) {
883 /* Found magic number. */
884 os_file_delete(innodb_log_file_key,
885 log_file_name);
886 delete[] log_file_name;
887 return(false);
888 }
889 }
890
891 delete[] log_file_name;
892
893 return(exist);
894 }
895 };
896
/** Iterate over all the UNDO tablespaces and check if any of the UNDO
tablespace qualifies for TRUNCATE (size > threshold).
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_mark_undo_for_truncate(
	undo::Truncate*	undo_trunc)
{
	/* Step-1: If UNDO Tablespace
		- already marked for truncate (OR)
		- truncate disabled
	return immediately else search for qualifying tablespace. */
	if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
		return;
	}

	/* Step-2: Validation/Qualification checks
	a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
	   is being truncated server can continue to operate.
	b. At-least 2 UNDO redo rseg/undo logs (besides the default rseg-0)
	b. At-least 1 UNDO tablespace size > threshold. */
	if (srv_undo_tablespaces_active < 2
	    || (srv_rollback_segments < (1 + srv_tmp_undo_logs + 2))) {
		return;
	}

	/* Avoid bias selection and so start the scan from immediate next
	of last selected UNDO tablespace for truncate. */
	ulint	space_id = undo_trunc->get_scan_start();

	/* Round-robin over all active undo tablespaces; pick the first
	one whose file size exceeds srv_max_undo_log_size (in pages). */
	for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {

		ut_ad(srv_undo_space_id_start != 0);

		if (fil_space_get_size(space_id)
		    > (srv_max_undo_log_size / srv_page_size)) {
			/* Tablespace qualifies for truncate. */
			undo_trunc->mark(space_id);
			undo::Truncate::add_space_to_trunc_list(space_id);
			break;
		}

		space_id++;

		/* Wrap the scan around to the first undo space id. */
		if (space_id >= (srv_undo_space_id_start
				 + srv_undo_tablespaces_active)) {
			/* Note: UNDO tablespace ids starts from 1. */
			space_id = srv_undo_space_id_start;
		}

		if (undo_trunc->is_marked()) {
			break;
		}
	}

	/* Persist the scan position so the next call resumes from
	here, keeping selection round-robin across invocations. */
	undo_trunc->set_scan_start(space_id);

	/* Couldn't make any selection. */
	if (!undo_trunc->is_marked()) {
		return;
	}

#ifdef UNIV_DEBUG
	ib::info() << "UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id() << " marked for truncate";
#endif /* UNIV_DEBUG */

	/* Step-3: Iterate over all the rsegs of selected UNDO tablespace
	and mark them temporarily unavailable for allocation.*/
	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
		trx_rseg_t*	rseg = trx_sys->rseg_array[i];

		if (rseg != NULL && !trx_sys_is_noredo_rseg_slot(rseg->id)) {
			if (rseg->space
			    == undo_trunc->get_marked_space_id()) {

				/* Once set this rseg will not be allocated
				to new booting transaction but we will wait
				for existing active transaction to finish. */
				rseg->skip_allocation = true;
				undo_trunc->add_rseg_to_trunc(rseg);
			}
		}
	}
}
982
/** Static member: undo tablespace ids registered for truncation via
undo::Truncate::add_space_to_trunc_list() (see
trx_purge_mark_undo_for_truncate()). */
undo::undo_spaces_t	undo::Truncate::s_spaces_to_truncate;
984
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
marked for truncate.
Drains the whole queue under pq_mutex, strips the marked tablespace's
rseg from each element, and pushes back every element that still has
rsegs left.
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_cleanse_purge_queue(
	undo::Truncate*	undo_trunc)
{
	mutex_enter(&purge_sys->pq_mutex);
	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
	purge_elem_list_t			purge_elem_list;

	/* Remove rseg instances that are in the purge queue before we start
	truncate of corresponding UNDO truncate. */
	while (!purge_sys->purge_queue->empty()) {
		purge_elem_list.push_back(purge_sys->purge_queue->top());
		purge_sys->purge_queue->pop();
	}
	ut_ad(purge_sys->purge_queue->empty());

	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
	     it != purge_elem_list.end();
	     ++it) {

		/* An element holds the rsegs of one transaction; at most
		one of them can live in the marked tablespace, so break
		after the first erase (also avoids using an invalidated
		iterator). */
		for (TrxUndoRsegs::iterator it2 = it->begin();
		     it2 != it->end();
		     ++it2) {

			if ((*it2)->space
			    == undo_trunc->get_marked_space_id()) {
				it->erase(it2);
				break;
			}
		}

		const ulint	size = it->size();
		if (size != 0) {
			/* size != 0 suggest that there exist other rsegs that
			needs processing so add this element to purge queue.
			Note: Other rseg could be non-redo rsegs. */
			purge_sys->purge_queue->push(*it);
		}
	}
	mutex_exit(&purge_sys->pq_mutex);
}
1030
/** Iterate over selected UNDO tablespace and check if all the rsegs
that resides in the tablespace are free.
If every rollback segment in the marked tablespace is free, performs the
actual truncate: checkpoint, DDL log start, purge-queue cleanse, tablespace
truncate, DDL log end, and finally re-enables allocation in the rsegs.
@param[in]	limit		truncate_limit
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_initiate_truncate(
	purge_iter_t*	limit,
	undo::Truncate*	undo_trunc)
{
	/* Step-1: Early check to findout if any of the the UNDO tablespace
	is marked for truncate. */
	if (!undo_trunc->is_marked()) {
		/* No tablespace marked for truncate yet. */
		return;
	}

	/* Step-2: Scan over each rseg and ensure that it doesn't hold any
	active undo records. */
	bool	all_free = true;

	/* Keep scanning (to gather state) but remember the first blocker
	via all_free; the loop condition stops early once a blocker is
	found. */
	for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {

		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);

		mutex_enter(&rseg->mutex);

		if (rseg->trx_ref_count > 0) {
			/* This rseg is still being held by an active
			transaction. */
			all_free = false;
			mutex_exit(&rseg->mutex);
			continue;
		}

		ut_ad(rseg->trx_ref_count == 0);
		ut_ad(rseg->skip_allocation);

		ulint	size_of_rsegs = rseg->curr_size;

		if (size_of_rsegs == 1) {
			/* Presumably only the rseg header page remains,
			so there is nothing left to purge here. */
			mutex_exit(&rseg->mutex);
			continue;
		} else {

			/* There could be cached undo segment. Check if records
			in these segments can be purged. Normal purge history
			will not touch these cached segment. */
			ulint		cached_undo_size = 0;

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->update_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				/* A cached segment whose trx_id is beyond
				the truncate limit still holds records that
				purge has not reached; the rseg is not free. */
				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			for (trx_undo_t* undo =
				UT_LIST_GET_FIRST(rseg->insert_undo_cached);
			     undo != NULL && all_free;
			     undo = UT_LIST_GET_NEXT(undo_list, undo)) {

				if (limit->trx_no < undo->trx_id) {
					all_free = false;
				} else {
					cached_undo_size += undo->size;
				}
			}

			ut_ad(size_of_rsegs >= (cached_undo_size + 1));

			if (size_of_rsegs > (cached_undo_size + 1)) {
				/* There are pages besides cached pages that
				still hold active data. */
				all_free = false;
			}
		}

		mutex_exit(&rseg->mutex);
	}

	if (!all_free) {
		/* rseg still holds active data.*/
		return;
	}


	/* Step-3: Start the actual truncate.
	a. log-checkpoint
	b. Write the DDL log to protect truncate action from CRASH
	c. Remove rseg instance if added to purge queue before we
	initiate truncate.
	d. Execute actual truncate
	e. Remove the DDL log. */

	/* Crash-injection point for recovery testing. */
	DBUG_EXECUTE_IF("ib_undo_trunc_before_checkpoint",
			ib::info() << "ib_undo_trunc_before_checkpoint";
			DBUG_SUICIDE(););

	/* After truncate if server crashes then redo logging done for this
	undo tablespace might not stand valid as tablespace has been
	truncated. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	ib::info() << "Truncating UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id();

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_start",
			ib::info() << "ib_undo_trunc_before_ddl_log_start";
			DBUG_SUICIDE(););

	/* In release builds the return status of start_logging() is
	(intentionally) ignored; only debug builds assert on it. */
#ifdef UNIV_DEBUG
	dberr_t	err =
#endif /* UNIV_DEBUG */
		undo_trunc->start_logging(
			undo_trunc->get_marked_space_id());
	ut_ad(err == DB_SUCCESS);

	DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
			ib::info() << "ib_undo_trunc_before_truncate";
			DBUG_SUICIDE(););

	/* Drop any rsegs of the marked tablespace that were pushed to the
	purge queue before truncate was initiated. */
	trx_purge_cleanse_purge_queue(undo_trunc);

	bool	success = trx_undo_truncate_tablespace(undo_trunc);
	if (!success) {
		/* Note: In case of error we don't enable the rsegs
		and neither unmark the tablespace so the tablespace
		continue to remain inactive. */
		ib::error() << "Failed to truncate UNDO tablespace with"
			" space identifier "
			<< undo_trunc->get_marked_space_id();
		return;
	}

	if (purge_sys->rseg != NULL
	    && purge_sys->rseg->last_page_no == FIL_NULL) {
		/* If purge_sys->rseg is pointing to rseg that was recently
		truncated then move to next rseg element.
		Note: Ideally purge_sys->rseg should be NULL because purge
		should complete processing of all the records but there is
		purge_batch_size that can force the purge loop to exit before
		all the records are purged and in this case purge_sys->rseg
		could point to a valid rseg waiting for next purge cycle. */
		purge_sys->next_stored = FALSE;
		purge_sys->rseg = NULL;
	}

	DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
			ib::info() << "ib_undo_trunc_before_ddl_log_end";
			DBUG_SUICIDE(););

	/* Second checkpoint: make the post-truncate state durable before
	removing the DDL log that protects the operation. */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	undo_trunc->done_logging(undo_trunc->get_marked_space_id());

	/* Completed truncate. Now it is safe to re-use the tablespace. */
	for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
		trx_rseg_t*	rseg = undo_trunc->get_ith_rseg(i);
		rseg->skip_allocation = false;
	}

	ib::info() << "Completed truncate of UNDO tablespace with space"
		" identifier " << undo_trunc->get_marked_space_id();

	undo_trunc->reset();
	undo::Truncate::clear_trunc_list();

	DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
			ib::info() << "ib_undo_trunc_trunc_done";
			DBUG_SUICIDE(););
}
1207
1208 /********************************************************************//**
1209 Removes unnecessary history data from rollback segments. NOTE that when this
1210 function is called, the caller must not have any latches on undo log pages! */
1211 static
1212 void
trx_purge_truncate_history(purge_iter_t * limit,const ReadView * view)1213 trx_purge_truncate_history(
1214 /*========================*/
1215 purge_iter_t* limit, /*!< in: truncate limit */
1216 const ReadView* view) /*!< in: purge view */
1217 {
1218 ulint i;
1219
1220 /* We play safe and set the truncate limit at most to the purge view
1221 low_limit number, though this is not necessary */
1222
1223 if (limit->trx_no >= view->low_limit_no()) {
1224 limit->trx_no = view->low_limit_no();
1225 limit->undo_no = 0;
1226 limit->undo_rseg_space = ULINT_UNDEFINED;
1227 }
1228
1229 ut_ad(limit->trx_no <= purge_sys->view.low_limit_no());
1230
1231 for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1232 trx_rseg_t* rseg = trx_sys->rseg_array[i];
1233
1234 if (rseg != NULL) {
1235 ut_a(rseg->id == i);
1236 trx_purge_truncate_rseg_history(rseg, limit);
1237 }
1238 }
1239
1240 for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1241 trx_rseg_t* rseg = trx_sys->pending_purge_rseg_array[i];
1242
1243 if (rseg != NULL) {
1244 ut_a(rseg->id == i);
1245 trx_purge_truncate_rseg_history(rseg, limit);
1246 }
1247 }
1248
1249 /* UNDO tablespace truncate. We will try to truncate as much as we
1250 can (greedy approach). This will ensure when the server is idle we
1251 try and truncate all the UNDO tablespaces. */
1252 ulint nchances = srv_undo_tablespaces_active;
1253 for (i = 0; i < nchances; i++) {
1254 trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
1255 trx_purge_initiate_truncate(limit, &purge_sys->undo_trunc);
1256 }
1257 }
1258
/***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log.
Pushes the rseg back onto the purge queue positioned at the previous log in
the history list, or marks the rseg exhausted if no logs remain. */
static
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg,		/*!< in: rollback segment */
	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
					handled */
{
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	fil_addr_t	prev_log_addr;
	trx_id_t	trx_no;
	ibool		del_marks;
	mtr_t		mtr;

	mutex_enter(&(rseg->mutex));

	ut_a(rseg->last_page_no != FIL_NULL);

	/* Advance the purge iterator past the log that was just fully
	purged; the exact next position is not known until the previous
	history log header has been read (next_stored = FALSE). */
	purge_sys->iter.trx_no = rseg->last_trx_no + 1;
	purge_sys->iter.undo_no = 0;
	purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
	purge_sys->next_stored = FALSE;

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(
		page_id_t(rseg->space, rseg->last_page_no),
		rseg->page_size, &mtr);

	log_hdr = undo_page + rseg->last_offset;

	/* Increase the purge page count by one for every handled log */

	(*n_pages_handled)++;

	/* Follow the history list backwards to the previous log header. */
	prev_log_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

#ifdef UNIV_DEBUG
		trx_sys_mutex_enter();

		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in large
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 2000 000 undo logs now. */

		if (trx_sys->rseg_history_len > 2000000) {
			ib::warn() << "Purge reached the head of the history"
				" list, but its length is still reported as "
				<< trx_sys->rseg_history_len << " which is"
				" unusually high.";
			ib::info() << "This can happen for multiple reasons";
			ib::info() << "1. A long running transaction is"
				" withholding purging of undo logs or a read"
				" view is open. Please try to commit the long"
				" running transaction.";
			ib::info() << "2. Try increasing the number of purge"
				" threads to expedite purging of undo logs.";
		}

		trx_sys_mutex_exit();
#endif
		return;
	}

	/* Release the rseg mutex and the page latch before reading the
	previous log header page in a fresh mini-transaction. */
	mutex_exit(&rseg->mutex);

	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space,
							prev_log_addr.page),
					      rseg->page_size, &mtr)
		+ prev_log_addr.boffset;

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	/* Re-position the rseg's "last" bookkeeping on the previous log. */
	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	TrxUndoRsegs	elem(rseg->last_trx_no);
	elem.push_back(rseg);

	/* Purge can also produce events, however these are already ordered
	in the rollback segment and any user generated event will be greater
	than the events that Purge produces. ie. Purge can never produce
	events from an empty rollback segment. */

	mutex_enter(&purge_sys->pq_mutex);

	purge_sys->purge_queue->push(elem);

	mutex_exit(&purge_sys->pq_mutex);

	mutex_exit(&rseg->mutex);
}
1380
1381 /** Position the purge sys "iterator" on the undo record to use for purging.
1382 @param[in,out] purge_sys purge instance
1383 @param[in] page_size page size */
1384 static
1385 void
trx_purge_read_undo_rec(trx_purge_t * purge_sys,const page_size_t & page_size)1386 trx_purge_read_undo_rec(
1387 trx_purge_t* purge_sys,
1388 const page_size_t& page_size)
1389 {
1390 ulint offset;
1391 ulint page_no;
1392 ib_uint64_t undo_no;
1393 ulint undo_rseg_space;
1394
1395 purge_sys->hdr_offset = purge_sys->rseg->last_offset;
1396 page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
1397
1398 if (purge_sys->rseg->last_del_marks) {
1399 mtr_t mtr;
1400 trx_undo_rec_t* undo_rec = NULL;
1401
1402 mtr_start(&mtr);
1403
1404 undo_rec = trx_undo_get_first_rec(
1405 purge_sys->rseg->space,
1406 page_size,
1407 purge_sys->hdr_page_no,
1408 purge_sys->hdr_offset, RW_S_LATCH, &mtr);
1409
1410 if (undo_rec != NULL) {
1411 offset = page_offset(undo_rec);
1412 undo_no = trx_undo_rec_get_undo_no(undo_rec);
1413 undo_rseg_space = purge_sys->rseg->space;
1414 page_no = page_get_page_no(page_align(undo_rec));
1415 } else {
1416 offset = 0;
1417 undo_no = 0;
1418 undo_rseg_space = ULINT_UNDEFINED;
1419 }
1420
1421 mtr_commit(&mtr);
1422 } else {
1423 offset = 0;
1424 undo_no = 0;
1425 undo_rseg_space = ULINT_UNDEFINED;
1426 }
1427
1428 purge_sys->offset = offset;
1429 purge_sys->page_no = page_no;
1430 purge_sys->iter.undo_no = undo_no;
1431 purge_sys->iter.undo_rseg_space = undo_rseg_space;
1432
1433 purge_sys->next_stored = TRUE;
1434 }
1435
1436 /***********************************************************************//**
1437 Chooses the next undo log to purge and updates the info in purge_sys. This
1438 function is used to initialize purge_sys when the next record to purge is
1439 not known, and also to update the purge system info on the next record when
1440 purge has handled the whole undo log for a transaction. */
1441 static
1442 void
trx_purge_choose_next_log(void)1443 trx_purge_choose_next_log(void)
1444 /*===========================*/
1445 {
1446 ut_ad(purge_sys->next_stored == FALSE);
1447
1448 const page_size_t& page_size = purge_sys->rseg_iter->set_next();
1449
1450 if (purge_sys->rseg != NULL) {
1451 trx_purge_read_undo_rec(purge_sys, page_size);
1452 } else {
1453 /* There is nothing to do yet. */
1454 os_thread_yield();
1455 }
1456 }
1457
/***********************************************************************//**
Gets the next record to purge and updates the info in the purge system.
Copies the current record into heap, then advances the purge position to
the next record requiring purge (possibly in the next undo log).
@return copy of an undo log record or pointer to the dummy undo log record */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
					handled */
	mem_heap_t*	heap)		/*!< in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	mtr_t		mtr;

	ut_ad(purge_sys->next_stored);
	ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no());

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	const page_size_t	page_size(purge_sys->rseg->page_size);

	if (offset == 0) {
		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_dummy_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
						page_size, &mtr);

	rec = undo_page + offset;

	rec2 = rec;

	/* Scan forward from the current record for the next record that
	actually needs a purge operation. */
	for (;;) {
		ulint		type;
		trx_undo_rec_t*	next_rec;
		ulint		cmpl_info;

		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(
			rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);

		if (next_rec == NULL) {
			/* End of this page: move to the next page of the
			undo log (rec2 becomes NULL at end of the log). */
			rec2 = trx_undo_get_next_rec(
				rec2, purge_sys->hdr_page_no,
				purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		/* Delete marks always require purge. */
		if (type == TRX_UNDO_DEL_MARK_REC) {

			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		/* Records with externally stored fields require purge. */
		if (trx_undo_rec_get_extern_storage(rec2)) {
			break;
		}

		/* Updates of existing records require purge unless no
		ordering field changed. */
		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			break;
		}
	}

	if (rec2 == NULL) {
		/* This undo log is exhausted: advance to the next history
		log and choose the next position. The mtr is committed
		first because those calls take their own latches. */
		mtr_commit(&mtr);

		trx_purge_rseg_get_next_history_log(
			purge_sys->rseg, n_pages_handled);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		/* Re-latch the original page so that the current record
		(at `offset`) can still be copied below. */
		mtr_start(&mtr);

		undo_page = trx_undo_page_get_s_latched(
			page_id_t(space, page_no), page_size, &mtr);

	} else {
		page = page_align(rec2);

		/* Store the position of the next record to purge. */
		purge_sys->offset = rec2 - page;
		purge_sys->page_no = page_get_page_no(page);
		purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->iter.undo_rseg_space = space;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			(*n_pages_handled)++;
		}
	}

	/* Copy the current record before releasing the page latch. */
	rec_copy = trx_undo_rec_copy(undo_page, offset, heap);
	mtr_commit(&mtr);
	return(rec_copy);
}
1583
1584 /********************************************************************//**
1585 Fetches the next undo log record from the history list to purge. It must be
1586 released with the corresponding release function.
1587 @return copy of an undo log record or pointer to trx_purge_dummy_rec,
1588 if the whole undo log can skipped in purge; NULL if none left */
1589 static MY_ATTRIBUTE((warn_unused_result))
1590 trx_undo_rec_t*
trx_purge_fetch_next_rec(roll_ptr_t * roll_ptr,ulint * n_pages_handled,mem_heap_t * heap)1591 trx_purge_fetch_next_rec(
1592 /*=====================*/
1593 roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */
1594 ulint* n_pages_handled,/*!< in/out: number of UNDO log pages
1595 handled */
1596 mem_heap_t* heap) /*!< in: memory heap where copied */
1597 {
1598 if (!purge_sys->next_stored) {
1599 trx_purge_choose_next_log();
1600
1601 if (!purge_sys->next_stored) {
1602 DBUG_PRINT("ib_purge",
1603 ("no logs left in the history list"));
1604 return(NULL);
1605 }
1606 }
1607
1608 if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) {
1609
1610 return(NULL);
1611 }
1612
1613 /* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1614 os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1615
1616 *roll_ptr = trx_undo_build_roll_ptr(
1617 FALSE, purge_sys->rseg->id,
1618 purge_sys->page_no, purge_sys->offset);
1619
1620 /* The following call will advance the stored values of the
1621 purge iterator. */
1622
1623 return(trx_purge_get_next_rec(n_pages_handled, heap));
1624 }
1625
/*******************************************************************//**
This function runs a purge batch.
Fetches undo records and distributes them round-robin over the first
n_purge_threads purge nodes of purge_sys->query, until either batch_size
pages have been handled or the history list is exhausted.
@return number of undo log pages handled in the batch */
static
ulint
trx_purge_attach_undo_recs(
/*=======================*/
	ulint		n_purge_threads,/*!< in: number of purge threads */
	trx_purge_t*	purge_sys,	/*!< in/out: purge instance */
	ulint		batch_size)	/*!< in: no. of pages to purge */
{
	que_thr_t*	thr;
	ulint		i = 0;
	ulint		n_pages_handled = 0;
	ulint		n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);

	ut_a(n_purge_threads > 0);

	purge_sys->limit = purge_sys->iter;

	/* Debug code to validate some pre-requisites and reset done flag. */
	for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	     thr != NULL && i < n_purge_threads;
	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {

		purge_node_t*		node;

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;

		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
		ut_a(node->undo_recs == NULL);
		ut_a(node->done);

		node->done = FALSE;
	}

	/* There should never be fewer nodes than threads, the inverse
	however is allowed because we only use purge threads as needed. */
	ut_a(i == n_purge_threads);

	/* Fetch and parse the UNDO records. The UNDO records are added
	to a per purge node vector. */
	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	ut_a(n_thrs > 0 && thr != NULL);

	ut_ad(trx_purge_check_limit());

	i = 0;

	for (;;) {
		purge_node_t*		node;
		trx_purge_rec_t*	purge_rec;

		ut_a(!thr->is_active);

		/* Get the purge node. */
		node = (purge_node_t*) thr->child;
		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);

		/* Allocate the record holder from the node's own heap so
		its lifetime matches the node's batch. */
		purge_rec = static_cast<trx_purge_rec_t*>(
			mem_heap_zalloc(node->heap, sizeof(*purge_rec)));

		/* Track the max {trx_id, undo_no} for truncating the
		UNDO logs once we have purged the records. */

		if (trx_purge_check_limit()) {
			purge_sys->limit = purge_sys->iter;
		}

		/* Fetch the next record, and advance the purge_sys->iter. */
		purge_rec->undo_rec = trx_purge_fetch_next_rec(
			&purge_rec->roll_ptr, &n_pages_handled, node->heap);

		if (purge_rec->undo_rec != NULL) {

			/* Lazily create the node's record vector on the
			first record assigned to it. */
			if (node->undo_recs == NULL) {
				node->undo_recs = ib_vector_create(
					ib_heap_allocator_create(node->heap),
					sizeof(trx_purge_rec_t),
					batch_size);
			} else {
				ut_a(!ib_vector_is_empty(node->undo_recs));
			}

			ib_vector_push(node->undo_recs, purge_rec);

			if (n_pages_handled >= batch_size) {

				break;
			}
		} else {
			/* History list exhausted. */
			break;
		}

		/* Round-robin: move to the next purge node, wrapping back
		to the first one after n_purge_threads nodes. */
		thr = UT_LIST_GET_NEXT(thrs, thr);

		if (!(++i % n_purge_threads)) {
			thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
		}

		ut_a(thr != NULL);
	}

	ut_ad(trx_purge_check_limit());

	return(n_pages_handled);
}
1734
1735 /*******************************************************************//**
1736 Calculate the DML delay required.
1737 @return delay in microseconds or ULINT_MAX */
1738 static
1739 ulint
trx_purge_dml_delay(void)1740 trx_purge_dml_delay(void)
1741 /*=====================*/
1742 {
1743 /* Determine how much data manipulation language (DML) statements
1744 need to be delayed in order to reduce the lagging of the purge
1745 thread. */
1746 ulint delay = 0; /* in microseconds; default: no delay */
1747
1748 /* If purge lag is set (ie. > 0) then calculate the new DML delay.
1749 Note: we do a dirty read of the trx_sys_t data structure here,
1750 without holding trx_sys->mutex. */
1751
1752 if (srv_max_purge_lag > 0
1753 && trx_sys->rseg_history_len
1754 > srv_n_purge_threads * srv_purge_batch_size) {
1755 float ratio;
1756
1757 ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1758
1759 if (ratio > 1.0) {
1760 /* If the history list length exceeds the
1761 srv_max_purge_lag, the data manipulation
1762 statements are delayed by at least 5000
1763 microseconds. */
1764 delay = (ulint) ((ratio - .5) * 10000);
1765 }
1766
1767 if (delay > srv_max_purge_lag_delay) {
1768 delay = srv_max_purge_lag_delay;
1769 }
1770
1771 MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1772 }
1773
1774 return(delay);
1775 }
1776
/*******************************************************************//**
Wait for pending purge jobs to complete.
Busy-waits (with yields) until n_completed catches up with the number of
tasks submitted, kicking worker threads while tasks remain queued. */
static
void
trx_purge_wait_for_workers_to_complete(
/*===================================*/
	trx_purge_t*	purge_sys)	/*!< in: purge instance */
{
	ulint		n_submitted = purge_sys->n_submitted;

	/* Ensure that the work queue empties out.
	The CAS is used as an atomic equality check on n_completed: swapping
	n_submitted for n_submitted writes nothing new, so it succeeds
	exactly when n_completed == n_submitted. */
	while (!os_compare_and_swap_ulint(
			&purge_sys->n_completed, n_submitted, n_submitted)) {

		/* Wake a worker while tasks are still queued, in case the
		workers are suspended. */
		if (srv_get_task_queue_length() > 0) {
			srv_release_threads(SRV_WORKER, 1);
		}

		os_thread_yield();
	}

	/* None of the worker threads should be doing any work. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* There should be no outstanding tasks as long
	as the worker threads are active. */
	ut_a(srv_get_task_queue_length() == 0);
}
1805
1806 /******************************************************************//**
1807 Remove old historical changes from the rollback segments. */
1808 static
1809 void
trx_purge_truncate(void)1810 trx_purge_truncate(void)
1811 /*====================*/
1812 {
1813 ut_ad(trx_purge_check_limit());
1814
1815 if (purge_sys->limit.trx_no == 0) {
1816 trx_purge_truncate_history(&purge_sys->iter, &purge_sys->view);
1817 } else {
1818 trx_purge_truncate_history(&purge_sys->limit, &purge_sys->view);
1819 }
1820 }
1821
/*******************************************************************//**
This function runs a purge batch.
Clones the oldest read view, attaches a batch of undo records to the purge
nodes, runs them (farming out all but one task to worker threads when
n_purge_threads > 1), and optionally truncates the history afterwards.
@return number of undo log pages handled in the batch */
ulint
trx_purge(
/*======*/
	ulint	n_purge_threads,	/*!< in: number of purge tasks
					to submit to the work queue */
	ulint	batch_size,		/*!< in: the maximum number of records
					to purge in one batch */
	bool	truncate)		/*!< in: truncate history if true */
{
	que_thr_t*	thr = NULL;
	ulint		n_pages_handled;

	ut_a(n_purge_threads > 0);

	srv_dml_needed_delay = trx_purge_dml_delay();

	/* The number of tasks submitted should be completed. */
	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

	/* Clone the oldest view as the purge view, under the purge latch;
	view_active is toggled off around the clone. */
	rw_lock_x_lock(&purge_sys->latch);

	purge_sys->view_active = false;

	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	rw_lock_x_unlock(&purge_sys->latch);

#ifdef UNIV_DEBUG
	if (srv_purge_view_update_only_debug) {
		return(0);
	}
#endif /* UNIV_DEBUG */

	/* Fetch the UNDO recs that need to be purged. */
	n_pages_handled = trx_purge_attach_undo_recs(
		n_purge_threads, purge_sys, batch_size);

	/* Do we do an asynchronous purge or not ? */
	if (n_purge_threads > 1) {
		ulint	i = 0;

		/* Submit the tasks to the work queue.
		All but the last task go to worker threads; the coordinator
		runs the last one itself via run_synchronously below. */
		for (i = 0; i < n_purge_threads - 1; ++i) {
			thr = que_fork_scheduler_round_robin(
				purge_sys->query, thr);

			ut_a(thr != NULL);

			srv_que_task_enqueue_low(thr);
		}

		thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
		ut_a(thr != NULL);

		/* The remaining +1 for this thread's own task is added
		at run_synchronously. */
		purge_sys->n_submitted += n_purge_threads - 1;

		goto run_synchronously;

	/* Do it synchronously. */
	} else {
		thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
		ut_ad(thr);

run_synchronously:
		++purge_sys->n_submitted;

		que_run_threads(thr);

		os_atomic_inc_ulint(
			&purge_sys->pq_mutex, &purge_sys->n_completed, 1);

		/* Wait for the farmed-out tasks to finish too. */
		if (n_purge_threads > 1) {
			trx_purge_wait_for_workers_to_complete(purge_sys);
		}
	}

	ut_a(purge_sys->n_submitted == purge_sys->n_completed);

#ifdef UNIV_DEBUG
	rw_lock_x_lock(&purge_sys->latch);
	if (purge_sys->limit.trx_no == 0) {
		purge_sys->done = purge_sys->iter;
	} else {
		purge_sys->done = purge_sys->limit;
	}
	rw_lock_x_unlock(&purge_sys->latch);
#endif /* UNIV_DEBUG */

	if (truncate) {
		trx_purge_truncate();
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);

	return(n_pages_handled);
}
1924
1925 /*******************************************************************//**
1926 Get the purge state.
1927 @return purge state. */
1928 purge_state_t
trx_purge_state(void)1929 trx_purge_state(void)
1930 /*=================*/
1931 {
1932 purge_state_t state;
1933
1934 rw_lock_x_lock(&purge_sys->latch);
1935
1936 state = purge_sys->state;
1937
1938 rw_lock_x_unlock(&purge_sys->latch);
1939
1940 return(state);
1941 }
1942
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP.
Stop requests nest: n_stop counts them, and trx_purge_run() must be called
an equal number of times before purge resumes. */
void
trx_purge_stop(void)
/*================*/
{
	purge_state_t	state;
	/* Reset the event before changing state so a wakeup signalled
	after this point is not missed by os_event_wait_low() below. */
	int64_t		sig_count = os_event_reset(purge_sys->event);

	ut_a(srv_n_purge_threads > 0);

	rw_lock_x_lock(&purge_sys->latch);

	ut_a(purge_sys->state != PURGE_STATE_INIT);
	ut_a(purge_sys->state != PURGE_STATE_EXIT);
	ut_a(purge_sys->state != PURGE_STATE_DISABLED);

	/* Count this stop request; trx_purge_run() decrements it. */
	++purge_sys->n_stop;

	state = purge_sys->state;

	if (state == PURGE_STATE_RUN) {
		ib::info() << "Stopping purge";

		/* We need to wakeup the purge thread in case it is suspended,
		so that it can acknowledge the state change. */

		srv_purge_wakeup();
	}

	purge_sys->state = PURGE_STATE_STOP;

	rw_lock_x_unlock(&purge_sys->latch);

	if (state != PURGE_STATE_STOP) {

		/* Wait for purge coordinator to signal that it
		is suspended. */
		os_event_wait_low(purge_sys->event, sig_count);
	} else {
		bool	once = true;

		rw_lock_x_lock(&purge_sys->latch);

		/* Wait for purge to signal that it has actually stopped.
		The latch is dropped around each sleep so the coordinator
		can update purge_sys->running. */
		while (purge_sys->running) {

			if (once) {
				ib::info() << "Waiting for purge to stop";
				once = false;
			}

			rw_lock_x_unlock(&purge_sys->latch);

			os_thread_sleep(10000);

			rw_lock_x_lock(&purge_sys->latch);
		}

		rw_lock_x_unlock(&purge_sys->latch);
	}

	MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
}
2007
2008 /*******************************************************************//**
2009 Resume purge, move to PURGE_STATE_RUN. */
2010 void
trx_purge_run(void)2011 trx_purge_run(void)
2012 /*===============*/
2013 {
2014 rw_lock_x_lock(&purge_sys->latch);
2015
2016 switch (purge_sys->state) {
2017 case PURGE_STATE_INIT:
2018 case PURGE_STATE_EXIT:
2019 case PURGE_STATE_DISABLED:
2020 ut_error;
2021
2022 case PURGE_STATE_RUN:
2023 case PURGE_STATE_STOP:
2024 break;
2025 }
2026
2027 if (purge_sys->n_stop > 0) {
2028
2029 ut_a(purge_sys->state == PURGE_STATE_STOP);
2030
2031 --purge_sys->n_stop;
2032
2033 if (purge_sys->n_stop == 0) {
2034
2035 ib::info() << "Resuming purge";
2036
2037 purge_sys->state = PURGE_STATE_RUN;
2038 }
2039
2040 MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
2041 } else {
2042 ut_a(purge_sys->state == PURGE_STATE_RUN);
2043 }
2044
2045 rw_lock_x_unlock(&purge_sys->latch);
2046
2047 srv_purge_wakeup();
2048 }
2049