1 /*****************************************************************************
2
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file trx/trx0purge.cc
29 Purge old versions
30
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "ha_prototypes.h"
35
36 #include "trx0purge.h"
37
38 #ifdef UNIV_NONINL
39 #include "trx0purge.ic"
40 #endif
41
42 #include "fsp0fsp.h"
43 #include "fut0fut.h"
44 #include "mach0data.h"
45 #include "mtr0log.h"
46 #include "os0thread.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0mysql.h"
50 #include "row0purge.h"
51 #include "row0upd.h"
52 #include "srv0mon.h"
53 #include "fsp0sysspace.h"
54 #include "srv0srv.h"
55 #include "srv0start.h"
56 #include "sync0sync.h"
57 #include "trx0rec.h"
58 #include "trx0roll.h"
59 #include "trx0rseg.h"
60 #include "trx0trx.h"
61
/** Maximum allowable purge history length. 0 means 'infinite'
(the variable is unsigned, so it cannot actually be negative). */
ulong	srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong	srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

#ifdef UNIV_DEBUG
/** NOTE(review): debug-only switch; name suggests purge only updates
its read view without doing work — confirm at the usage site. */
my_bool		srv_purge_view_update_only_debug;

/** Debug flag: when set, adding an undo log to the history list
(i.e. committing) fires the ut_ad() in
trx_purge_add_update_undo_to_history(). */
bool		trx_commit_disallowed = false;
#endif /* UNIV_DEBUG */

/** Sentinel value */
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);

/** A sentinel undo record used as a return value when we have a whole
undo log which can be skipped by purge */
static trx_undo_rec_t	trx_purge_ignore_rec;
82
/** Constructor: bind the iterator to the given purge system and start
in the "empty" state — m_trx_undo_rsegs is the NullElement sentinel and
m_iter already points at end(), so the first set_next() call must pull
an element from the priority queue. */
TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
	:
	m_purge_sys(purge_sys),
	m_trx_undo_rsegs(NullElement),
	m_iter(m_trx_undo_rsegs.end())
{
}
91
92 /** Sets the next rseg to purge in m_purge_sys.
93 @return page size of the table for which the log is.
94 NOTE: if rseg is NULL when this function returns this means that
95 there are no rollback segments to purge and then the returned page
96 size object should not be used. */
97 const page_size_t
set_next()98 TrxUndoRsegsIterator::set_next()
99 {
100 mutex_enter(&m_purge_sys->pq_mutex);
101
102 /* Only purge consumes events from the priority queue, user
103 threads only produce the events. */
104
105 /* Check if there are more rsegs to process in the
106 current element. */
107 if (m_iter != m_trx_undo_rsegs.end()) {
108
109 /* We are still processing rollback segment from
110 the same transaction and so expected transaction
111 number shouldn't increase. Undo increment of
112 expected trx_no done by caller assuming rollback
113 segments from given transaction are done. */
114 m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
115
116 } else if (!m_purge_sys->purge_queue->empty()) {
117
118 /* Read the next element from the queue.
119 Combine elements if they have same transaction number.
120 This can happen if a transaction shares redo rollback segment
121 with another transaction that has already added it to purge
122 queue and former transaction also needs to schedule non-redo
123 rollback segment for purge. */
124 m_trx_undo_rsegs = NullElement;
125
126 while (!m_purge_sys->purge_queue->empty()) {
127
128 if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
129 m_trx_undo_rsegs =
130 purge_sys->purge_queue->top();
131 } else if (purge_sys->purge_queue->top().get_trx_no() ==
132 m_trx_undo_rsegs.get_trx_no()) {
133 m_trx_undo_rsegs.append(
134 purge_sys->purge_queue->top());
135 } else {
136 break;
137 }
138
139 m_purge_sys->purge_queue->pop();
140 }
141
142 m_iter = m_trx_undo_rsegs.begin();
143
144 } else {
145 /* Queue is empty, reset iterator. */
146 m_trx_undo_rsegs = NullElement;
147 m_iter = m_trx_undo_rsegs.end();
148
149 mutex_exit(&m_purge_sys->pq_mutex);
150
151 m_purge_sys->rseg = NULL;
152
153 /* return a dummy object, not going to be used by the caller */
154 return(univ_page_size);
155 }
156
157 m_purge_sys->rseg = *m_iter++;
158
159 mutex_exit(&m_purge_sys->pq_mutex);
160
161 ut_a(m_purge_sys->rseg != NULL);
162
163 mutex_enter(&m_purge_sys->rseg->mutex);
164
165 ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
166 ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
167
168 /* We assume in purge of externally stored fields that
169 space id is in the range of UNDO tablespace space ids
170 unless space is system tablespace */
171 ut_a(srv_is_undo_tablespace(m_purge_sys->rseg->space)
172 || is_system_tablespace(
173 m_purge_sys->rseg->space));
174
175 const page_size_t page_size(m_purge_sys->rseg->page_size);
176
177 ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
178
179 m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
180 m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
181 m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
182
183 mutex_exit(&m_purge_sys->rseg->mutex);
184
185 return(page_size);
186 }
187
188 /****************************************************************//**
189 Builds a purge 'query' graph. The actual purge is performed by executing
190 this query graph.
191 @return own: the query graph */
192 static
193 que_t*
trx_purge_graph_build(trx_t * trx,ulint n_purge_threads)194 trx_purge_graph_build(
195 /*==================*/
196 trx_t* trx, /*!< in: transaction */
197 ulint n_purge_threads) /*!< in: number of purge
198 threads */
199 {
200 ulint i;
201 mem_heap_t* heap;
202 que_fork_t* fork;
203
204 heap = mem_heap_create(512);
205 fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
206 fork->trx = trx;
207
208 for (i = 0; i < n_purge_threads; ++i) {
209 que_thr_t* thr;
210 row_prebuilt_t* prebuilt;
211
212 prebuilt = static_cast<row_prebuilt_t*>(
213 mem_heap_zalloc(heap, sizeof(*prebuilt)));
214
215 thr = que_thr_create(fork, heap, prebuilt);
216
217 thr->child = row_purge_node_create(thr, heap);
218 }
219
220 return(fork);
221 }
222
/********************************************************************//**
Creates the global purge system control structure and inits the history
mutex. */
void
trx_purge_sys_create(
/*=================*/
	ulint		n_purge_threads,	/*!< in: number of purge
						threads */
	purge_pq_t*	purge_queue)		/*!< in, own: UNDO log min
						binary heap */
{
	purge_sys = static_cast<trx_purge_t*>(
		ut_zalloc_nokey(sizeof(*purge_sys)));

	purge_sys->state = PURGE_STATE_INIT;
	purge_sys->event = os_event_create(0);

	/* purge_sys was zero-allocated above; placement-new the
	non-POD members into that raw storage. */
	new (&purge_sys->iter) purge_iter_t;
	new (&purge_sys->limit) purge_iter_t;
	new (&purge_sys->undo_trunc) undo::Truncate;
#ifdef UNIV_DEBUG
	new (&purge_sys->done) purge_iter_t;
#endif /* UNIV_DEBUG */

	/* Take ownership of purge_queue, we are responsible for freeing it. */
	purge_sys->purge_queue = purge_queue;

	rw_lock_create(trx_purge_latch_key,
		       &purge_sys->latch, SYNC_PURGE_LATCH);

	mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);

	ut_a(n_purge_threads > 0);

	purge_sys->sess = sess_open();

	purge_sys->trx = purge_sys->sess->trx;

	ut_a(purge_sys->trx->sess == purge_sys->sess);

	/* A purge transaction is not a real transaction, we use a transaction
	here only because the query threads code requires it. It is otherwise
	quite unnecessary. We should get rid of it eventually. */
	purge_sys->trx->id = 0;
	purge_sys->trx->start_time = ut_time_monotonic();
	purge_sys->trx->state = TRX_STATE_ACTIVE;
	purge_sys->trx->op_info = "purge trx";

	purge_sys->query = trx_purge_graph_build(
		purge_sys->trx, n_purge_threads);

	/* Clone the oldest read view so that purge only removes
	versions that no open read view can still see. */
	new(&purge_sys->view) ReadView();

	trx_sys->mvcc->clone_oldest_view(&purge_sys->view);

	purge_sys->view_active = true;

	purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));

	/* Allocate 8K bytes for the initial heap. */
	purge_sys->heap = mem_heap_create(8 * 1024);
}
285
/************************************************************************
Frees the global purge system control structure.  Tears down, in order:
query graph (after releasing any compress heaps its threads hold),
pseudo-transaction/session, read view, latches, purge queue, event,
heap, rseg iterator, and finally the purge_sys struct itself. */
void
trx_purge_sys_close(void)
/*======================*/
{
	/* Release compress heaps still attached to the purge threads'
	prebuilt structs before the graph (and the heaps' owner) goes. */
	for (que_thr_t* thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
	     thr != NULL;
	     thr = UT_LIST_GET_NEXT(thrs, thr)) {
		if (thr->prebuilt != 0 &&
		    thr->prebuilt->compress_heap != 0) {
			row_mysql_prebuilt_free_compress_heap(thr->prebuilt);
		}
	}

	que_graph_free(purge_sys->query);

	ut_a(purge_sys->trx->id == 0);
	ut_a(purge_sys->sess->trx == purge_sys->trx);

	/* The purge trx was only ever a pseudo-transaction (see
	trx_purge_sys_create()); reset its state before closing the
	session that owns it. */
	purge_sys->trx->state = TRX_STATE_NOT_STARTED;

	sess_close(purge_sys->sess);

	purge_sys->sess = NULL;

	/* The view was placement-new'ed into purge_sys, so destroy it
	explicitly. */
	purge_sys->view.close();
	purge_sys->view.~ReadView();

	rw_lock_free(&purge_sys->latch);
	mutex_free(&purge_sys->pq_mutex);

	if (purge_sys->purge_queue != NULL) {
		UT_DELETE(purge_sys->purge_queue);
		purge_sys->purge_queue = NULL;
	}

	os_event_destroy(purge_sys->event);

	purge_sys->event = NULL;

	mem_heap_free(purge_sys->heap);

	purge_sys->heap = NULL;

	UT_DELETE(purge_sys->rseg_iter);

	ut_free(purge_sys);

	purge_sys = NULL;
}
337
338 /*================ UNDO LOG HISTORY LIST =============================*/
339
/********************************************************************//**
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse. */
void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*		trx,		/*!< in: transaction */
	trx_undo_ptr_t*	undo_ptr,	/*!< in/out: update undo log. */
	page_t*		undo_page,	/*!< in: update undo log header page,
					x-latched */
	bool		update_rseg_history_len,
					/*!< in: if true: update rseg history
					len else skip updating it. */
	ulint		n_added_logs,	/*!< in: number of logs added */
	mtr_t*		mtr)		/*!< in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_ulogf_t*	undo_header;

	undo = undo_ptr->update_undo;
	rseg = undo->rseg;

	rseg_header = trx_rsegf_get(
		undo->rseg->space, undo->rseg->page_no, undo->rseg->page_size,
		mtr);

	undo_header = undo_page + undo->hdr_offset;

	if (undo->state != TRX_UNDO_CACHED) {
		ulint		hist_size;
#ifdef UNIV_DEBUG
		trx_usegf_t*	seg_header = undo_page + TRX_UNDO_SEG_HDR;
#endif /* UNIV_DEBUG */

		/* The undo log segment will not be reused */

		if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
			ib::fatal() << "undo->id is " << undo->id;
		}

		/* Detach the segment from its rseg slot so the slot
		can be handed to a new undo log. */
		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);

		hist_size = mtr_read_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);

		ut_ad(undo->size == flst_get_len(
			      seg_header + TRX_UNDO_PAGE_LIST));

		/* Account the whole segment size to the on-page
		history size field. */
		mlog_write_ulint(
			rseg_header + TRX_RSEG_HISTORY_SIZE,
			hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	ut_ad(!trx_commit_disallowed);

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);

	if (update_rseg_history_len) {
		os_atomic_increment_ulint(
			&trx_sys->rseg_history_len, n_added_logs);
		/* Wake the purge coordinator once enough history has
		accumulated for a full purge batch. */
		if (trx_sys->rseg_history_len
		    > srv_n_purge_threads * srv_purge_batch_size) {
			srv_wake_purge_thread_if_not_active();
		}
	}

	/* Write the trx number to the undo log header */
	mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);

	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	/* If the rseg currently has no pending history, cache this
	log's location in the rseg's last_* fields (these are the
	fields TrxUndoRsegsIterator/purge read to find the next log). */
	if (rseg->last_page_no == FIL_NULL) {
		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}
429
/** Remove undo log header from the history list and decrement the
global count of history-list entries accordingly.
@param[in,out]	rseg_hdr	rollback segment header
@param[in]	log_hdr		undo log segment header
@param[in,out]	mtr		mini transaction. */
static
void
trx_purge_remove_log_hdr(
	trx_rsegf_t*	rseg_hdr,
	trx_ulogf_t*	log_hdr,
	mtr_t*		mtr)
{
	flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
		    log_hdr + TRX_UNDO_HISTORY_NODE, mtr);

	/* Keep the in-memory counter in sync with the on-page list. */
	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, 1);
}
446
/** Frees an undo log segment which is in the history list. Removes the
undo log hdr from the history list.
@param[in,out]	rseg		rollback segment
@param[in]	hdr_addr	file address of log_hdr
@param[in]	noredo		skip redo logging. */
static
void
trx_purge_free_segment(
	trx_rseg_t*	rseg,
	fil_addr_t	hdr_addr,
	bool		noredo)
{
	mtr_t		mtr;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ulint		seg_size;
	ulint		hist_size;
	/* For no-redo segments there is no need to write the
	del-marks flag, so treat it as already marked. */
	bool		marked = noredo;

	/* Free the segment in steps, committing the mtr and releasing
	the rseg mutex between steps so other threads are not blocked
	for long.  NOTE: the loop is exited via break with the rseg
	mutex held and the mtr still open — the remaining work below
	must happen in that same mtr. */
	for (;;) {
		page_t*	undo_page;

		mtr_start(&mtr);
		if (noredo) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}
		ut_ad(noredo == trx_sys_is_noredo_rseg_slot(rseg->id));

		mutex_enter(&rseg->mutex);

		rseg_hdr = trx_rsegf_get(
			rseg->space, rseg->page_no, rseg->page_size, &mtr);

		undo_page = trx_undo_page_get(
			page_id_t(rseg->space, hdr_addr.page), rseg->page_size,
			&mtr);

		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
		log_hdr = undo_page + hdr_addr.boffset;

		/* Mark the last undo log totally purged, so that if the
		system crashes, the tail of the undo log will not get accessed
		again. The list of pages in the undo log tail gets inconsistent
		during the freeing of the segment, and therefore purge should
		not try to access them again. */

		if (!marked) {
			marked = true;
			mlog_write_ulint(
				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				MLOG_2BYTES, &mtr);
		}

		/* Returns TRUE once only the header page remains. */
		if (fseg_free_step_not_header(
			    seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr)) {

			break;
		}

		mutex_exit(&rseg->mutex);

		mtr_commit(&mtr);
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

	do {

		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.cc. */

	} while (!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, false, &mtr));

	/* Subtract the freed segment's size from the rseg's on-page
	history-size bookkeeping and its in-memory current size. */
	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}
549
/********************************************************************//**
Removes unnecessary history data from a rollback segment.  Walks the
rseg's history list from its oldest end, freeing whole undo log
segments (or just unlinking their headers) until it reaches a log
whose trx_no is >= the purge limit. */
static
void
trx_purge_truncate_rseg_history(
/*============================*/
	trx_rseg_t*		rseg,		/*!< in: rollback segment */
	const purge_iter_t*	limit)		/*!< in: truncate offset */
{
	fil_addr_t	hdr_addr;
	fil_addr_t	prev_hdr_addr;
	trx_rsegf_t*	rseg_hdr;
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	mtr_t		mtr;
	trx_id_t	undo_trx_no;
	const bool	noredo = trx_sys_is_noredo_rseg_slot(
		rseg->id);

	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	/* The history list is ordered newest-first, so the last node
	is the oldest log header. */
	hdr_addr = trx_purge_get_log_from_hist(
		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
	if (hdr_addr.page == FIL_NULL) {

		/* History list exhausted: nothing (more) to truncate. */
		mutex_exit(&(rseg->mutex));

		mtr_commit(&mtr);

		return;
	}

	undo_page = trx_undo_page_get(page_id_t(rseg->space, hdr_addr.page),
				      rseg->page_size, &mtr);

	log_hdr = undo_page + hdr_addr.boffset;

	undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);

	if (undo_trx_no >= limit->trx_no) {

		/* limit space_id should match the rollback segment
		space id to avoid freeing of the page belongs to
		different rollback segment for the same trx_no. */
		if (undo_trx_no == limit->trx_no
		    && rseg->space == limit->undo_rseg_space) {

			/* Partially purged log: trim only its start,
			up to limit->undo_no. */
			trx_undo_truncate_start(
				rseg, hdr_addr.page,
				hdr_addr.boffset, limit->undo_no);
		}

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		return;
	}

	/* Remember the next-older log before this one is unlinked. */
	prev_hdr_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {

		/* We can free the whole log segment */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		/* calls the trx_purge_remove_log_hdr()
		inside trx_purge_free_segment(). */
		trx_purge_free_segment(rseg, hdr_addr, noredo);

	} else {
		/* Remove the log hdr from the rseg history. */
		trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);
	}

	/* Start a fresh mtr and re-latch the rseg for the next
	iteration over the history list. */
	mtr_start(&mtr);
	if (noredo) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no,
				 rseg->page_size, &mtr);

	hdr_addr = prev_hdr_addr;

	goto loop;
}
655
656 /** UNDO log truncate logger. Needed to track state of truncate during crash.
657 An auxiliary redo log file undo_<space_id>_trunc.log will created while the
658 truncate of the UNDO is in progress. This file is required during recovery
659 to complete the truncate. */
660
661 namespace undo {
662
663 /** Populate log file name based on space_id
664 @param[in] space_id id of the undo tablespace.
665 @return DB_SUCCESS or error code */
populate_log_file_name(ulint space_id,char * & log_file_name)666 dberr_t populate_log_file_name(
667 ulint space_id,
668 char*& log_file_name)
669 {
670 ulint log_file_name_sz =
671 strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
672 + strlen(undo::s_log_prefix)
673 + strlen(undo::s_log_ext);
674
675 log_file_name = new (std::nothrow) char[log_file_name_sz];
676 if (log_file_name == 0) {
677 return(DB_OUT_OF_MEMORY);
678 }
679
680 memset(log_file_name, 0, log_file_name_sz);
681
682 strcpy(log_file_name, srv_log_group_home_dir);
683 ulint log_file_name_len = strlen(log_file_name);
684
685 if (log_file_name[log_file_name_len - 1]
686 != OS_PATH_SEPARATOR) {
687
688 log_file_name[log_file_name_len]
689 = OS_PATH_SEPARATOR;
690 log_file_name_len = strlen(log_file_name);
691 }
692
693 ut_snprintf(log_file_name + log_file_name_len,
694 log_file_name_sz - log_file_name_len,
695 "%s%lu_%s", undo::s_log_prefix,
696 (ulong) space_id, s_log_ext);
697
698 return(DB_SUCCESS);
699 }
700
/** Create the truncate log file.  Writes one zero-filled page to mark
the "init" phase of an undo truncate; undo::done() later overwrites it
with a magic number and deletes it.
@param[in]	space_id	id of the undo tablespace to truncate.
@return DB_SUCCESS or error code. */
dberr_t init(ulint space_id)
{
	dberr_t	err;
	char*	log_file_name;

	/* Step-1: Create the log file name using the pre-decided
	prefix/suffix and table id of undo tablepsace to truncate. */
	err = populate_log_file_name(space_id, log_file_name);
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* Step-2: Create the log file, open it and write 0 to
	indicate init phase. */
	bool            ret;
	pfs_os_file_t	handle = os_file_create(
		innodb_log_file_key, log_file_name, OS_FILE_CREATE,
		OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
	if (!ret) {
		delete[] log_file_name;
		return(DB_IO_ERROR);
	}

	/* Over-allocate so the buffer can be aligned to the page
	size, as required for the file write below. */
	ulint	sz = UNIV_PAGE_SIZE;
	void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
	if (buf == NULL) {
		os_file_close(handle);
		delete[] log_file_name;
		return(DB_OUT_OF_MEMORY);
	}

	byte*	log_buf = static_cast<byte*>(
		ut_align(buf, UNIV_PAGE_SIZE));

	IORequest	request(IORequest::WRITE);

	request.disable_compression();

	err = os_file_write(
		request, log_file_name, handle, log_buf, 0, sz);

	/* Make sure the init marker is durably on disk before the
	actual truncate begins. */
	os_file_flush(handle);
	os_file_close(handle);

	ut_free(buf);
	delete[] log_file_name;

	return(err);
}
753
/** Mark completion of undo truncate action by writing magic number to
the log file and then removing it from the disk.
If we are going to remove it from disk then why write magic number ?
This is to safeguard from unlink (file-system) anomalies that will keep
the link to the file even after unlink action is successfull and
ref-count = 0.
@param[in]	space_id	id of the undo tablespace to truncate.*/
void done(
	ulint	space_id)
{
	dberr_t	err;
	char*	log_file_name;

	/* Step-1: Create the log file name using the pre-decided
	prefix/suffix and table id of undo tablepsace to truncate. */
	err = populate_log_file_name(space_id, log_file_name);
	if (err != DB_SUCCESS) {
		return;
	}

	/* Step-2: Open log file and write magic number to
	indicate done phase. */
	bool	ret;
	pfs_os_file_t	handle =
		os_file_create_simple_no_error_handling(
			innodb_log_file_key, log_file_name,
			OS_FILE_OPEN, OS_FILE_READ_WRITE,
			srv_read_only_mode, &ret);

	if (!ret) {
		/* Could not open: best effort — just remove the file,
		which is the final goal anyway. */
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
		return;
	}

	/* Over-allocate so the write buffer can be page-aligned. */
	ulint	sz = UNIV_PAGE_SIZE;
	void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
	if (buf == NULL) {
		os_file_close(handle);
		os_file_delete(innodb_log_file_key, log_file_name);
		delete[] log_file_name;
		return;
	}

	byte*	log_buf = static_cast<byte*>(
		ut_align(buf, UNIV_PAGE_SIZE));

	/* The magic number lets is_log_present() distinguish a
	completed truncate from an interrupted one. */
	mach_write_to_4(log_buf, undo::s_magic);

	IORequest	request(IORequest::WRITE);

	request.disable_compression();

	err = os_file_write(
		request, log_file_name, handle, log_buf, 0, sz);

	ut_ad(err == DB_SUCCESS);

	os_file_flush(handle);
	os_file_close(handle);

	ut_free(buf);
	os_file_delete(innodb_log_file_key, log_file_name);
	delete[] log_file_name;
}
819
/** Check if TRUNCATE_DDL_LOG file exist.
A file containing the magic number counts as "not present" (the
truncate it recorded already completed) and is deleted here.
@param[in]	space_id	id of the undo tablespace.
@return true if exist else false. */
bool is_log_present(
	ulint	space_id)
{
	dberr_t	err;
	char*	log_file_name;

	/* Step-1: Populate log file name. */
	err = populate_log_file_name(space_id, log_file_name);
	if (err != DB_SUCCESS) {
		return(false);
	}

	/* Step-2: Check for existence of the file. */
	bool		exist;
	os_file_type_t	type;
	os_file_status(log_file_name, &exist, &type);

	/* Step-3: If file exists, check it for presence of magic
	number.  If found, then delete the file and report file
	doesn't exist as presence of magic number suggest that
	truncate action was complete. */

	if (exist) {
		bool	ret;
		pfs_os_file_t	handle =
			os_file_create_simple_no_error_handling(
				innodb_log_file_key, log_file_name,
				OS_FILE_OPEN, OS_FILE_READ_WRITE,
				srv_read_only_mode, &ret);
		if (!ret) {
			/* Unreadable file: treat as complete and
			remove it. */
			os_file_delete(innodb_log_file_key,
				       log_file_name);
			delete[] log_file_name;
			return(false);
		}

		/* Over-allocate so the read buffer can be
		page-aligned. */
		ulint	sz = UNIV_PAGE_SIZE;
		void*	buf = ut_zalloc_nokey(sz + UNIV_PAGE_SIZE);
		if (buf == NULL) {
			os_file_close(handle);
			os_file_delete(innodb_log_file_key,
				       log_file_name);
			delete[] log_file_name;
			return(false);
		}

		byte*	log_buf = static_cast<byte*>(
			ut_align(buf, UNIV_PAGE_SIZE));

		IORequest	request(IORequest::READ);

		request.disable_compression();

		/* NOTE: this local err shadows the outer one;
		harmless, as the outer value is not used again. */
		dberr_t	err;

		err = os_file_read(request, handle, log_buf, 0, sz);

		os_file_close(handle);

		if (err != DB_SUCCESS) {

			ib::info()
				<< "Unable to read '"
				<< log_file_name << "' : "
				<< ut_strerr(err);

			os_file_delete(
				innodb_log_file_key, log_file_name);

			ut_free(buf);

			delete[] log_file_name;

			return(false);
		}

		ulint	magic_no = mach_read_from_4(log_buf);

		ut_free(buf);

		if (magic_no == undo::s_magic) {
			/* Found magic number. */
			os_file_delete(innodb_log_file_key,
				       log_file_name);
			delete[] log_file_name;
			return(false);
		}
	}

	delete[] log_file_name;

	return(exist);
}
916 };
917
/** Iterate over all the UNDO tablespaces and check if any of the UNDO
tablespace qualifies for TRUNCATE (size > threshold).
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_mark_undo_for_truncate(
	undo::Truncate*	undo_trunc)
{
	/* Step-1: If UNDO Tablespace
		- already marked for truncate (OR)
		- truncate disabled
	return immediately else search for qualifying tablespace. */
	if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
		return;
	}

	/* Step-2: Validation/Qualification checks
	a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
	   is being truncated server can continue to operate.
	b. At-least 2 UNDO redo rseg/undo logs (besides the default rseg-0)
	c. At-least 1 UNDO tablespace size > threshold. */
	if (srv_undo_tablespaces_active < 2
	    || (srv_rollback_segments < (1 + srv_tmp_undo_logs + 2))) {
		return;
	}

	/* Avoid bias selection and so start the scan from immediate next
	of last selected UNDO tablespace for truncate. */
	ulint	space_id = undo_trunc->get_scan_start();

	for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {

		ut_ad(srv_undo_space_id_start != 0);

		if (fil_space_get_size(space_id)
		    > (srv_max_undo_log_size / srv_page_size)) {
			/* Tablespace qualifies for truncate. */
			undo_trunc->mark(space_id);
			undo::Truncate::add_space_to_trunc_list(space_id);
			break;
		}

		space_id++;

		/* Wrap the scan around to the first undo space id. */
		if (space_id >= (srv_undo_space_id_start
				 + srv_undo_tablespaces_active)) {
			/* Note: UNDO tablespace ids starts from 1. */
			space_id = srv_undo_space_id_start;
		}

		/* NOTE(review): this check looks unreachable after a
		mark() because that path breaks immediately above —
		kept as-is (defensive). */
		if (undo_trunc->is_marked()) {
			break;
		}
	}

	/* Remember where to resume scanning on the next call. */
	undo_trunc->set_scan_start(space_id);

	/* Couldn't make any selection. */
	if (!undo_trunc->is_marked()) {
		return;
	}

#ifdef UNIV_DEBUG
	ib::info() << "UNDO tablespace with space identifier "
		<< undo_trunc->get_marked_space_id() << " marked for truncate";
#endif /* UNIV_DEBUG */

	/* Step-3: Iterate over all the rsegs of selected UNDO tablespace
	and mark them temporarily unavailable for allocation.*/
	for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
		trx_rseg_t*	rseg = trx_sys->rseg_array[i];

		if (rseg != NULL && !trx_sys_is_noredo_rseg_slot(rseg->id)) {
			if (rseg->space
			    == undo_trunc->get_marked_space_id()) {

				/* Once set this rseg will not be allocated
				to new booting transaction but we will wait
				for existing active transaction to finish. */
				rseg->skip_allocation = true;
				undo_trunc->add_rseg_to_trunc(rseg);
			}
		}
	}
}
1003
/** Undo tablespace ids registered for truncation via
add_space_to_trunc_list(); a class-level (static) list shared by all
undo::Truncate instances. */
undo::undo_spaces_t	undo::Truncate::s_spaces_to_truncate;
1005
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
marked for truncate.  Drains the whole priority queue, strips out rsegs
belonging to the marked tablespace, and pushes back any elements that
still have other rsegs to process.
@param[in,out]	undo_trunc	undo truncate tracker */
static
void
trx_purge_cleanse_purge_queue(
	undo::Truncate*	undo_trunc)
{
	mutex_enter(&purge_sys->pq_mutex);
	typedef	std::vector<TrxUndoRsegs>	purge_elem_list_t;
	purge_elem_list_t			purge_elem_list;

	/* Remove rseg instances that are in the purge queue before we start
	truncate of corresponding UNDO truncate. */
	while (!purge_sys->purge_queue->empty()) {
		purge_elem_list.push_back(purge_sys->purge_queue->top());
		purge_sys->purge_queue->pop();
	}
	ut_ad(purge_sys->purge_queue->empty());

	for (purge_elem_list_t::iterator it = purge_elem_list.begin();
	     it != purge_elem_list.end();
	     ++it) {

		/* Each element can reference at most one rseg from the
		marked space, so stop after the first erase. */
		for (TrxUndoRsegs::iterator it2 = it->begin();
		     it2 != it->end();
		     ++it2) {

			if ((*it2)->space
			    == undo_trunc->get_marked_space_id()) {
				it->erase(it2);
				break;
			}
		}

		const ulint	size = it->size();
		if (size != 0) {
			/* size != 0 suggest that there exist other rsegs that
			needs processing so add this element to purge queue.
			Note: Other rseg could be non-redo rsegs. */
			purge_sys->purge_queue->push(*it);
		}
	}
	mutex_exit(&purge_sys->pq_mutex);
}
1051
1052 /** Iterate over selected UNDO tablespace and check if all the rsegs
1053 that resides in the tablespace are free.
1054 @param[in] limit truncate_limit
1055 @param[in,out] undo_trunc undo truncate tracker */
1056 static
1057 void
trx_purge_initiate_truncate(purge_iter_t * limit,undo::Truncate * undo_trunc)1058 trx_purge_initiate_truncate(
1059 purge_iter_t* limit,
1060 undo::Truncate* undo_trunc)
1061 {
1062 /* Step-1: Early check to findout if any of the the UNDO tablespace
1063 is marked for truncate. */
1064 if (!undo_trunc->is_marked()) {
1065 /* No tablespace marked for truncate yet. */
1066 return;
1067 }
1068
1069 /* Step-2: Scan over each rseg and ensure that it doesn't hold any
1070 active undo records. */
1071 bool all_free = true;
1072
1073 for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {
1074
1075 trx_rseg_t* rseg = undo_trunc->get_ith_rseg(i);
1076
1077 mutex_enter(&rseg->mutex);
1078
1079 if (rseg->trx_ref_count > 0) {
1080 /* This rseg is still being held by an active
1081 transaction. */
1082 all_free = false;
1083 mutex_exit(&rseg->mutex);
1084 continue;
1085 }
1086
1087 ut_ad(rseg->trx_ref_count == 0);
1088 ut_ad(rseg->skip_allocation);
1089
1090 ulint size_of_rsegs = rseg->curr_size;
1091
1092 if (size_of_rsegs == 1) {
1093 mutex_exit(&rseg->mutex);
1094 continue;
1095 } else {
1096
1097 /* There could be cached undo segment. Check if records
1098 in these segments can be purged. Normal purge history
1099 will not touch these cached segment. */
1100 ulint cached_undo_size = 0;
1101
1102 for (trx_undo_t* undo =
1103 UT_LIST_GET_FIRST(rseg->update_undo_cached);
1104 undo != NULL && all_free;
1105 undo = UT_LIST_GET_NEXT(undo_list, undo)) {
1106
1107 if (limit->trx_no < undo->trx_id) {
1108 all_free = false;
1109 } else {
1110 cached_undo_size += undo->size;
1111 }
1112 }
1113
1114 for (trx_undo_t* undo =
1115 UT_LIST_GET_FIRST(rseg->insert_undo_cached);
1116 undo != NULL && all_free;
1117 undo = UT_LIST_GET_NEXT(undo_list, undo)) {
1118
1119 if (limit->trx_no < undo->trx_id) {
1120 all_free = false;
1121 } else {
1122 cached_undo_size += undo->size;
1123 }
1124 }
1125
1126 ut_ad(size_of_rsegs >= (cached_undo_size + 1));
1127
1128 if (size_of_rsegs > (cached_undo_size + 1)) {
1129 /* There are pages besides cached pages that
1130 still hold active data. */
1131 all_free = false;
1132 }
1133 }
1134
1135 mutex_exit(&rseg->mutex);
1136 }
1137
1138 if (!all_free) {
1139 /* rseg still holds active data.*/
1140 return;
1141 }
1142
1143
1144 /* Step-3: Start the actual truncate.
1145 a. log-checkpoint
1146 b. Write the DDL log to protect truncate action from CRASH
1147 c. Remove rseg instance if added to purge queue before we
1148 initiate truncate.
1149 d. Execute actual truncate
1150 e. Remove the DDL log. */
1151 DBUG_EXECUTE_IF("ib_undo_trunc_before_checkpoint",
1152 ib::info() << "ib_undo_trunc_before_checkpoint";
1153 DBUG_SUICIDE(););
1154
1155 /* After truncate if server crashes then redo logging done for this
1156 undo tablespace might not stand valid as tablespace has been
1157 truncated. */
1158 log_make_checkpoint_at(LSN_MAX, TRUE);
1159
1160 ib::info() << "Truncating UNDO tablespace with space identifier "
1161 << undo_trunc->get_marked_space_id();
1162
1163 DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_start",
1164 ib::info() << "ib_undo_trunc_before_ddl_log_start";
1165 DBUG_SUICIDE(););
1166
1167 #ifdef UNIV_DEBUG
1168 dberr_t err =
1169 #endif /* UNIV_DEBUG */
1170 undo_trunc->start_logging(
1171 undo_trunc->get_marked_space_id());
1172 ut_ad(err == DB_SUCCESS);
1173
1174 DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
1175 ib::info() << "ib_undo_trunc_before_truncate";
1176 DBUG_SUICIDE(););
1177
1178 trx_purge_cleanse_purge_queue(undo_trunc);
1179
1180 bool success = trx_undo_truncate_tablespace(undo_trunc);
1181 if (!success) {
1182 /* Note: In case of error we don't enable the rsegs
1183 and neither unmark the tablespace so the tablespace
1184 continue to remain inactive. */
1185 ib::error() << "Failed to truncate UNDO tablespace with"
1186 " space identifier "
1187 << undo_trunc->get_marked_space_id();
1188 return;
1189 }
1190
1191 if (purge_sys->rseg != NULL
1192 && purge_sys->rseg->last_page_no == FIL_NULL) {
1193 /* If purge_sys->rseg is pointing to rseg that was recently
1194 truncated then move to next rseg element.
1195 Note: Ideally purge_sys->rseg should be NULL because purge
1196 should complete processing of all the records but there is
1197 purge_batch_size that can force the purge loop to exit before
1198 all the records are purged and in this case purge_sys->rseg
1199 could point to a valid rseg waiting for next purge cycle. */
1200 purge_sys->next_stored = FALSE;
1201 purge_sys->rseg = NULL;
1202 }
1203
1204 DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
1205 ib::info() << "ib_undo_trunc_before_ddl_log_end";
1206 DBUG_SUICIDE(););
1207
1208 log_make_checkpoint_at(LSN_MAX, TRUE);
1209
1210 undo_trunc->done_logging(undo_trunc->get_marked_space_id());
1211
1212 /* Completed truncate. Now it is safe to re-use the tablespace. */
1213 for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
1214 trx_rseg_t* rseg = undo_trunc->get_ith_rseg(i);
1215 rseg->skip_allocation = false;
1216 }
1217
1218 ib::info() << "Completed truncate of UNDO tablespace with space"
1219 " identifier " << undo_trunc->get_marked_space_id();
1220
1221 undo_trunc->reset();
1222 undo::Truncate::clear_trunc_list();
1223
1224 DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
1225 ib::info() << "ib_undo_trunc_trunc_done";
1226 DBUG_SUICIDE(););
1227 }
1228
1229 /********************************************************************//**
1230 Removes unnecessary history data from rollback segments. NOTE that when this
1231 function is called, the caller must not have any latches on undo log pages! */
1232 static
1233 void
trx_purge_truncate_history(purge_iter_t * limit,const ReadView * view)1234 trx_purge_truncate_history(
1235 /*========================*/
1236 purge_iter_t* limit, /*!< in: truncate limit */
1237 const ReadView* view) /*!< in: purge view */
1238 {
1239 ulint i;
1240
1241 /* We play safe and set the truncate limit at most to the purge view
1242 low_limit number, though this is not necessary */
1243
1244 if (limit->trx_no >= view->low_limit_no()) {
1245 limit->trx_no = view->low_limit_no();
1246 limit->undo_no = 0;
1247 limit->undo_rseg_space = ULINT_UNDEFINED;
1248 }
1249
1250 ut_ad(limit->trx_no <= purge_sys->view.low_limit_no());
1251
1252 for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1253 trx_rseg_t* rseg = trx_sys->rseg_array[i];
1254
1255 if (rseg != NULL) {
1256 ut_a(rseg->id == i);
1257 trx_purge_truncate_rseg_history(rseg, limit);
1258 }
1259 }
1260
1261 for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1262 trx_rseg_t* rseg = trx_sys->pending_purge_rseg_array[i];
1263
1264 if (rseg != NULL) {
1265 ut_a(rseg->id == i);
1266 trx_purge_truncate_rseg_history(rseg, limit);
1267 }
1268 }
1269
1270 /* UNDO tablespace truncate. We will try to truncate as much as we
1271 can (greedy approach). This will ensure when the server is idle we
1272 try and truncate all the UNDO tablespaces. */
1273 ulint nchances = srv_undo_tablespaces_active;
1274 for (i = 0; i < nchances; i++) {
1275 trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
1276 trx_purge_initiate_truncate(limit, &purge_sys->undo_trunc);
1277 }
1278 }
1279
1280 /***********************************************************************//**
1281 Updates the last not yet purged history log info in rseg when we have purged
1282 a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
1283 static
1284 void
trx_purge_rseg_get_next_history_log(trx_rseg_t * rseg,ulint * n_pages_handled)1285 trx_purge_rseg_get_next_history_log(
1286 /*================================*/
1287 trx_rseg_t* rseg, /*!< in: rollback segment */
1288 ulint* n_pages_handled)/*!< in/out: number of UNDO pages
1289 handled */
1290 {
1291 page_t* undo_page;
1292 trx_ulogf_t* log_hdr;
1293 fil_addr_t prev_log_addr;
1294 trx_id_t trx_no;
1295 ibool del_marks;
1296 mtr_t mtr;
1297
1298 mutex_enter(&(rseg->mutex));
1299
1300 ut_a(rseg->last_page_no != FIL_NULL);
1301
1302 purge_sys->iter.trx_no = rseg->last_trx_no + 1;
1303 purge_sys->iter.undo_no = 0;
1304 purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
1305 purge_sys->next_stored = FALSE;
1306
1307 mtr_start(&mtr);
1308
1309 undo_page = trx_undo_page_get_s_latched(
1310 page_id_t(rseg->space, rseg->last_page_no),
1311 rseg->page_size, &mtr);
1312
1313 log_hdr = undo_page + rseg->last_offset;
1314
1315 /* Increase the purge page count by one for every handled log */
1316
1317 (*n_pages_handled)++;
1318
1319 prev_log_addr = trx_purge_get_log_from_hist(
1320 flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
1321
1322 if (prev_log_addr.page == FIL_NULL) {
1323 /* No logs left in the history list */
1324
1325 rseg->last_page_no = FIL_NULL;
1326
1327 mutex_exit(&(rseg->mutex));
1328 mtr_commit(&mtr);
1329
1330 #ifdef UNIV_DEBUG
1331 trx_sys_mutex_enter();
1332
1333 /* Add debug code to track history list corruption reported
1334 on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
1335 file-based list was corrupt. The prev node pointer was
1336 FIL_NULL, even though the list length was over 8 million nodes!
1337 We assume that purge truncates the history list in large
1338 size pieces, and if we here reach the head of the list, the
1339 list cannot be longer than 2000 000 undo logs now. */
1340
1341 if (trx_sys->rseg_history_len > 2000000) {
1342 ib::warn() << "Purge reached the head of the history"
1343 " list, but its length is still reported as "
1344 << trx_sys->rseg_history_len << " which is"
1345 " unusually high.";
1346 ib::info() << "This can happen for multiple reasons";
1347 ib::info() << "1. A long running transaction is"
1348 " withholding purging of undo logs or a read"
1349 " view is open. Please try to commit the long"
1350 " running transaction.";
1351 ib::info() << "2. Try increasing the number of purge"
1352 " threads to expedite purging of undo logs.";
1353 }
1354
1355 trx_sys_mutex_exit();
1356 #endif
1357 return;
1358 }
1359
1360 mutex_exit(&rseg->mutex);
1361
1362 mtr_commit(&mtr);
1363
1364 /* Read the trx number and del marks from the previous log header */
1365 mtr_start(&mtr);
1366
1367 log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space,
1368 prev_log_addr.page),
1369 rseg->page_size, &mtr)
1370 + prev_log_addr.boffset;
1371
1372 trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
1373
1374 del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);
1375
1376 mtr_commit(&mtr);
1377
1378 mutex_enter(&(rseg->mutex));
1379
1380 rseg->last_page_no = prev_log_addr.page;
1381 rseg->last_offset = prev_log_addr.boffset;
1382 rseg->last_trx_no = trx_no;
1383 rseg->last_del_marks = del_marks;
1384
1385 TrxUndoRsegs elem(rseg->last_trx_no);
1386 elem.push_back(rseg);
1387
1388 /* Purge can also produce events, however these are already ordered
1389 in the rollback segment and any user generated event will be greater
1390 than the events that Purge produces. ie. Purge can never produce
1391 events from an empty rollback segment. */
1392
1393 mutex_enter(&purge_sys->pq_mutex);
1394
1395 purge_sys->purge_queue->push(elem);
1396
1397 mutex_exit(&purge_sys->pq_mutex);
1398
1399 mutex_exit(&rseg->mutex);
1400 }
1401
1402 /** Position the purge sys "iterator" on the undo record to use for purging.
1403 @param[in,out] purge_sys purge instance
1404 @param[in] page_size page size */
1405 static
1406 void
trx_purge_read_undo_rec(trx_purge_t * purge_sys,const page_size_t & page_size)1407 trx_purge_read_undo_rec(
1408 trx_purge_t* purge_sys,
1409 const page_size_t& page_size)
1410 {
1411 ulint offset;
1412 ulint page_no;
1413 ib_uint64_t undo_no;
1414 ulint undo_rseg_space;
1415
1416 purge_sys->hdr_offset = purge_sys->rseg->last_offset;
1417 page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
1418
1419 if (purge_sys->rseg->last_del_marks) {
1420 mtr_t mtr;
1421 trx_undo_rec_t* undo_rec = NULL;
1422
1423 mtr_start(&mtr);
1424
1425 undo_rec = trx_undo_get_first_rec(
1426 purge_sys->rseg->space,
1427 page_size,
1428 purge_sys->hdr_page_no,
1429 purge_sys->hdr_offset, RW_S_LATCH, &mtr);
1430
1431 if (undo_rec != NULL) {
1432 offset = page_offset(undo_rec);
1433 undo_no = trx_undo_rec_get_undo_no(undo_rec);
1434 undo_rseg_space = purge_sys->rseg->space;
1435 page_no = page_get_page_no(page_align(undo_rec));
1436 } else {
1437 offset = 0;
1438 undo_no = 0;
1439 undo_rseg_space = ULINT_UNDEFINED;
1440 }
1441
1442 mtr_commit(&mtr);
1443 } else {
1444 offset = 0;
1445 undo_no = 0;
1446 undo_rseg_space = ULINT_UNDEFINED;
1447 }
1448
1449 purge_sys->offset = offset;
1450 purge_sys->page_no = page_no;
1451 purge_sys->iter.undo_no = undo_no;
1452 purge_sys->iter.undo_rseg_space = undo_rseg_space;
1453
1454 purge_sys->next_stored = TRUE;
1455 }
1456
1457 /***********************************************************************//**
1458 Chooses the next undo log to purge and updates the info in purge_sys. This
1459 function is used to initialize purge_sys when the next record to purge is
1460 not known, and also to update the purge system info on the next record when
1461 purge has handled the whole undo log for a transaction. */
1462 static
1463 void
trx_purge_choose_next_log(void)1464 trx_purge_choose_next_log(void)
1465 /*===========================*/
1466 {
1467 ut_ad(purge_sys->next_stored == FALSE);
1468
1469 const page_size_t& page_size = purge_sys->rseg_iter->set_next();
1470
1471 if (purge_sys->rseg != NULL) {
1472 trx_purge_read_undo_rec(purge_sys, page_size);
1473 } else {
1474 /* There is nothing to do yet. */
1475 os_thread_yield();
1476 }
1477 }
1478
1479 /***********************************************************************//**
1480 Gets the next record to purge and updates the info in the purge system.
1481 @return copy of an undo log record or pointer to the dummy undo log record */
1482 static
1483 trx_undo_rec_t*
trx_purge_get_next_rec(ulint * n_pages_handled,mem_heap_t * heap)1484 trx_purge_get_next_rec(
1485 /*===================*/
1486 ulint* n_pages_handled,/*!< in/out: number of UNDO pages
1487 handled */
1488 mem_heap_t* heap) /*!< in: memory heap where copied */
1489 {
1490 trx_undo_rec_t* rec;
1491 trx_undo_rec_t* rec_copy;
1492 trx_undo_rec_t* rec2;
1493 page_t* undo_page;
1494 page_t* page;
1495 ulint offset;
1496 ulint page_no;
1497 ulint space;
1498 mtr_t mtr;
1499
1500 ut_ad(purge_sys->next_stored);
1501 ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no());
1502
1503 space = purge_sys->rseg->space;
1504 page_no = purge_sys->page_no;
1505 offset = purge_sys->offset;
1506
1507 const page_size_t page_size(purge_sys->rseg->page_size);
1508
1509 if (offset == 0) {
1510
1511 /* It is the dummy undo log record, which means that there is
1512 no need to purge this undo log */
1513
1514 trx_purge_rseg_get_next_history_log(
1515 purge_sys->rseg, n_pages_handled);
1516
1517 /* Look for the next undo log and record to purge */
1518
1519 trx_purge_choose_next_log();
1520
1521 return(&trx_purge_ignore_rec);
1522 }
1523
1524 mtr_start(&mtr);
1525
1526 undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
1527 page_size, &mtr);
1528
1529 rec = undo_page + offset;
1530
1531 rec2 = rec;
1532
1533 for (;;) {
1534 ulint type;
1535 trx_undo_rec_t* next_rec;
1536 ulint cmpl_info;
1537
1538 /* Try first to find the next record which requires a purge
1539 operation from the same page of the same undo log */
1540
1541 next_rec = trx_undo_page_get_next_rec(
1542 rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);
1543
1544 if (next_rec == NULL) {
1545 rec2 = trx_undo_get_next_rec(
1546 rec2, purge_sys->hdr_page_no,
1547 purge_sys->hdr_offset, &mtr);
1548 break;
1549 }
1550
1551 rec2 = next_rec;
1552
1553 type = trx_undo_rec_get_type(rec2);
1554
1555 if (type == TRX_UNDO_DEL_MARK_REC) {
1556
1557 break;
1558 }
1559
1560 cmpl_info = trx_undo_rec_get_cmpl_info(rec2);
1561
1562 if (trx_undo_rec_get_extern_storage(rec2)) {
1563 break;
1564 }
1565
1566 if ((type == TRX_UNDO_UPD_EXIST_REC)
1567 && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
1568 break;
1569 }
1570 }
1571
1572 if (rec2 == NULL) {
1573 mtr_commit(&mtr);
1574
1575 trx_purge_rseg_get_next_history_log(
1576 purge_sys->rseg, n_pages_handled);
1577
1578 /* Look for the next undo log and record to purge */
1579
1580 trx_purge_choose_next_log();
1581
1582 mtr_start(&mtr);
1583
1584 undo_page = trx_undo_page_get_s_latched(
1585 page_id_t(space, page_no), page_size, &mtr);
1586
1587 } else {
1588 page = page_align(rec2);
1589
1590 purge_sys->offset = rec2 - page;
1591 purge_sys->page_no = page_get_page_no(page);
1592 purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
1593 purge_sys->iter.undo_rseg_space = space;
1594
1595 if (undo_page != page) {
1596 /* We advance to a new page of the undo log: */
1597 (*n_pages_handled)++;
1598 }
1599 }
1600
1601 rec_copy = trx_undo_rec_copy(undo_page, offset, heap);
1602 mtr_commit(&mtr);
1603 return(rec_copy);
1604 }
1605
1606 /********************************************************************//**
1607 Fetches the next undo log record from the history list to purge. It must be
1608 released with the corresponding release function.
1609 @return copy of an undo log record or pointer to trx_purge_ignore_rec,
1610 if the whole undo log can skipped in purge; NULL if none left */
1611 static MY_ATTRIBUTE((warn_unused_result))
1612 trx_undo_rec_t*
trx_purge_fetch_next_rec(roll_ptr_t * roll_ptr,ulint * n_pages_handled,mem_heap_t * heap)1613 trx_purge_fetch_next_rec(
1614 /*=====================*/
1615 roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */
1616 ulint* n_pages_handled,/*!< in/out: number of UNDO log pages
1617 handled */
1618 mem_heap_t* heap) /*!< in: memory heap where copied */
1619 {
1620 if (!purge_sys->next_stored) {
1621 trx_purge_choose_next_log();
1622
1623 if (!purge_sys->next_stored) {
1624 DBUG_PRINT("ib_purge",
1625 ("no logs left in the history list"));
1626 return(NULL);
1627 }
1628 }
1629
1630 if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) {
1631
1632 return(NULL);
1633 }
1634
1635 /* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1636 os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1637
1638 *roll_ptr = trx_undo_build_roll_ptr(
1639 FALSE, purge_sys->rseg->id,
1640 purge_sys->page_no, purge_sys->offset);
1641
1642 /* The following call will advance the stored values of the
1643 purge iterator. */
1644
1645 return(trx_purge_get_next_rec(n_pages_handled, heap));
1646 }
1647
1648 /** This function runs a purge batch.
1649 @param[in] n_purge_threads number of purge threads
1650 @param[in,out] purge_sys purge instance
1651 @param[in] batch_size no. of pages to purge
1652 @return number of undo log pages handled in the batch */
1653 static
1654 ulint
trx_purge_attach_undo_recs(const ulint n_purge_threads,trx_purge_t * purge_sys,ulint batch_size)1655 trx_purge_attach_undo_recs(
1656 const ulint n_purge_threads,
1657 trx_purge_t* purge_sys,
1658 ulint batch_size)
1659 {
1660 que_thr_t* thr;
1661 ulint n_pages_handled = 0;
1662
1663 ut_a(n_purge_threads > 0);
1664 ut_a(n_purge_threads <= SRV_MAX_N_PURGE_THREADS);
1665
1666 purge_sys->limit = purge_sys->iter;
1667
1668 que_thr_t* run_thrs[SRV_MAX_N_PURGE_THREADS];
1669
1670 /* Validate some pre-requisites and reset done flag. */
1671 ulint i = 0;
1672
1673 for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
1674 thr != NULL && i < n_purge_threads;
1675 thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
1676
1677 purge_node_t* node;
1678
1679 /* Get the purge node. */
1680 node = static_cast<purge_node_t*>(thr->child);
1681
1682 ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1683 ut_a(node->recs == NULL);
1684 ut_a(node->done);
1685
1686 node->done = false;
1687
1688 ut_a(!thr->is_active);
1689
1690 run_thrs[i] = thr;
1691 }
1692
1693 /* There should never be fewer nodes than threads, the inverse
1694 however is allowed because we only use purge threads as needed. */
1695 ut_a(i == n_purge_threads);
1696 ut_ad(trx_purge_check_limit());
1697
1698 mem_heap_t* heap = purge_sys->heap;
1699
1700 mem_heap_empty(heap);
1701
1702 typedef std::map<
1703 table_id_t, purge_node_t::Recs *, std::less<table_id_t>,
1704 mem_heap_allocator<std::pair<const table_id_t, purge_node_t::Recs *> > >
1705 GroupBy;
1706
1707 GroupBy group_by((GroupBy::key_compare()), mem_heap_allocator<GroupBy::value_type>(heap));
1708
1709 for (ulint i = 0; n_pages_handled < batch_size; ++i) {
1710
1711 /* Track the max {trx_id, undo_no} for truncating the
1712 UNDO logs once we have purged the records. */
1713
1714 if (trx_purge_check_limit()) {
1715 purge_sys->limit = purge_sys->iter;
1716 }
1717
1718 purge_node_t::rec_t rec;
1719
1720 /* Fetch the next record, and advance the purge_sys->iter. */
1721 rec.undo_rec = trx_purge_fetch_next_rec(
1722 &rec.roll_ptr, &n_pages_handled, heap);
1723
1724 if (rec.undo_rec == &trx_purge_ignore_rec) {
1725
1726 continue;
1727
1728 } else if (rec.undo_rec == NULL) {
1729
1730 break;
1731 }
1732
1733 table_id_t table_id;
1734
1735 table_id = trx_undo_rec_get_table_id(rec.undo_rec);
1736
1737 GroupBy::iterator lb = group_by.lower_bound(table_id);
1738
1739 if (lb != group_by.end()
1740 && !(group_by.key_comp()(table_id, lb->first))) {
1741
1742 lb->second->push_back(rec);
1743
1744 } else {
1745 typedef GroupBy::value_type value_type;
1746
1747 void* ptr;
1748 purge_node_t::Recs* recs;
1749
1750 ptr = mem_heap_alloc(heap, sizeof(purge_node_t::Recs));
1751
1752 /* Call the destructor explicitly in row_purge_end() */
1753 recs = new (ptr) purge_node_t::Recs(mem_heap_allocator<purge_node_t::rec_t>(heap));
1754
1755 recs->push_back(rec);
1756
1757 group_by.insert(lb, value_type(table_id, recs));
1758 }
1759 }
1760
1761 /* Objective is to ensure that all the table entries in one
1762 batch are handled by the same thread. Ths is to avoid contention
1763 on the dict_index_t::lock */
1764
1765 const GroupBy& group_by_const = group_by;
1766 GroupBy::const_iterator end = group_by_const.end();
1767
1768 for (GroupBy::const_iterator it = group_by_const.begin(); it != end; ) {
1769
1770 for (ulint i = 0; i < n_purge_threads && it != end; ++i, ++it) {
1771
1772 purge_node_t* node;
1773
1774 node = static_cast<purge_node_t*>(run_thrs[i]->child);
1775
1776 ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1777
1778 if (node->recs == NULL) {
1779 node->recs = it->second;
1780 } else {
1781 node->recs->insert(
1782 node->recs->end(),
1783 it->second->begin(),
1784 it->second->end());
1785 }
1786 }
1787 }
1788
1789 ut_ad(trx_purge_check_limit());
1790
1791 return(n_pages_handled);
1792 }
1793
1794 /*******************************************************************//**
1795 Calculate the DML delay required.
1796 @return delay in microseconds or ULINT_MAX */
1797 static
1798 ulint
trx_purge_dml_delay(void)1799 trx_purge_dml_delay(void)
1800 /*=====================*/
1801 {
1802 /* Determine how much data manipulation language (DML) statements
1803 need to be delayed in order to reduce the lagging of the purge
1804 thread. */
1805 ulint delay = 0; /* in microseconds; default: no delay */
1806
1807 /* If purge lag is set (ie. > 0) then calculate the new DML delay.
1808 Note: we do a dirty read of the trx_sys_t data structure here,
1809 without holding trx_sys->mutex. */
1810
1811 if (srv_max_purge_lag > 0
1812 && trx_sys->rseg_history_len
1813 > srv_n_purge_threads * srv_purge_batch_size) {
1814 float ratio;
1815
1816 ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1817
1818 if (ratio > 1.0) {
1819 /* If the history list length exceeds the
1820 srv_max_purge_lag, the data manipulation
1821 statements are delayed by at least 5000
1822 microseconds. */
1823 delay = (ulint) ((ratio - .5) * 10000);
1824 }
1825
1826 if (delay > srv_max_purge_lag_delay) {
1827 delay = srv_max_purge_lag_delay;
1828 }
1829
1830 MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1831 }
1832
1833 return(delay);
1834 }
1835
1836 /*******************************************************************//**
1837 Wait for pending purge jobs to complete. */
1838 static
1839 void
trx_purge_wait_for_workers_to_complete(trx_purge_t * purge_sys)1840 trx_purge_wait_for_workers_to_complete(
1841 /*===================================*/
1842 trx_purge_t* purge_sys) /*!< in: purge instance */
1843 {
1844 ulint i = 0;
1845 ulint n_submitted = purge_sys->n_submitted;
1846
1847 /* Ensure that the work queue empties out. */
1848 while (!os_compare_and_swap_ulint(
1849 &purge_sys->n_completed, n_submitted, n_submitted)) {
1850
1851 if (++i < 10) {
1852 os_thread_yield();
1853 } else {
1854
1855 if (srv_get_task_queue_length() > 0) {
1856 srv_release_threads(SRV_WORKER, 1);
1857 }
1858
1859 os_thread_sleep(20);
1860 i = 0;
1861 }
1862 }
1863
1864 /* None of the worker threads should be doing any work. */
1865 ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1866
1867 /* There should be no outstanding tasks as long
1868 as the worker threads are active. */
1869 ut_a(srv_get_task_queue_length() == 0);
1870 }
1871
1872 /******************************************************************//**
1873 Remove old historical changes from the rollback segments. */
1874 static
1875 void
trx_purge_truncate(void)1876 trx_purge_truncate(void)
1877 /*====================*/
1878 {
1879 ut_ad(trx_purge_check_limit());
1880
1881 if (purge_sys->limit.trx_no == 0) {
1882 trx_purge_truncate_history(&purge_sys->iter, &purge_sys->view);
1883 } else {
1884 trx_purge_truncate_history(&purge_sys->limit, &purge_sys->view);
1885 }
1886 }
1887
1888 /*******************************************************************//**
1889 This function runs a purge batch.
1890 @return number of undo log pages handled in the batch */
1891 ulint
trx_purge(ulint n_purge_threads,ulint batch_size,bool truncate)1892 trx_purge(
1893 /*======*/
1894 ulint n_purge_threads, /*!< in: number of purge tasks
1895 to submit to the work queue */
1896 ulint batch_size, /*!< in: the maximum number of records
1897 to purge in one batch */
1898 bool truncate) /*!< in: truncate history if true */
1899 {
1900 que_thr_t* thr = NULL;
1901 ulint n_pages_handled;
1902
1903 ut_a(n_purge_threads > 0);
1904
1905 srv_dml_needed_delay = trx_purge_dml_delay();
1906
1907 /* The number of tasks submitted should be completed. */
1908 ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1909
1910 rw_lock_x_lock(&purge_sys->latch);
1911
1912 purge_sys->view_active = false;
1913
1914 trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
1915
1916 purge_sys->view_active = true;
1917
1918 rw_lock_x_unlock(&purge_sys->latch);
1919
1920 #ifdef UNIV_DEBUG
1921 if (srv_purge_view_update_only_debug) {
1922 return(0);
1923 }
1924 #endif /* UNIV_DEBUG */
1925
1926 /* Fetch the UNDO recs that need to be purged. */
1927 n_pages_handled = trx_purge_attach_undo_recs(
1928 n_purge_threads, purge_sys, batch_size);
1929
1930 /* Do we do an asynchronous purge or not ? */
1931 if (n_purge_threads > 1) {
1932
1933 /* Submit the tasks to the work queue. */
1934 for (ulint i = 0; i < n_purge_threads - 1; ++i) {
1935 thr = que_fork_scheduler_round_robin(
1936 purge_sys->query, thr);
1937
1938 ut_a(thr != NULL);
1939
1940 srv_que_task_enqueue_low(thr);
1941 }
1942
1943 thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
1944 ut_a(thr != NULL);
1945
1946 purge_sys->n_submitted += n_purge_threads - 1;
1947
1948 goto run_synchronously;
1949
1950 /* Do it synchronously. */
1951 } else {
1952 thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
1953 ut_ad(thr);
1954
1955 run_synchronously:
1956 ++purge_sys->n_submitted;
1957
1958 que_run_threads(thr);
1959
1960 os_atomic_inc_ulint(
1961 &purge_sys->pq_mutex, &purge_sys->n_completed, 1);
1962
1963 if (n_purge_threads > 1) {
1964 trx_purge_wait_for_workers_to_complete(purge_sys);
1965 }
1966 }
1967
1968 ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1969
1970 #ifdef UNIV_DEBUG
1971 rw_lock_x_lock(&purge_sys->latch);
1972 if (purge_sys->limit.trx_no == 0) {
1973 purge_sys->done = purge_sys->iter;
1974 } else {
1975 purge_sys->done = purge_sys->limit;
1976 }
1977 rw_lock_x_unlock(&purge_sys->latch);
1978 #endif /* UNIV_DEBUG */
1979
1980 if (truncate) {
1981 trx_purge_truncate();
1982 }
1983
1984 MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
1985 MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);
1986
1987 return(n_pages_handled);
1988 }
1989
1990 /*******************************************************************//**
1991 Get the purge state.
1992 @return purge state. */
1993 purge_state_t
trx_purge_state(void)1994 trx_purge_state(void)
1995 /*=================*/
1996 {
1997 purge_state_t state;
1998
1999 rw_lock_x_lock(&purge_sys->latch);
2000
2001 state = purge_sys->state;
2002
2003 rw_lock_x_unlock(&purge_sys->latch);
2004
2005 return(state);
2006 }
2007
2008 /*******************************************************************//**
2009 Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
2010 void
trx_purge_stop(void)2011 trx_purge_stop(void)
2012 /*================*/
2013 {
2014 purge_state_t state;
2015 int64_t sig_count = os_event_reset(purge_sys->event);
2016
2017 ut_a(srv_n_purge_threads > 0);
2018
2019 rw_lock_x_lock(&purge_sys->latch);
2020
2021 ut_a(purge_sys->state != PURGE_STATE_INIT);
2022 ut_a(purge_sys->state != PURGE_STATE_EXIT);
2023 ut_a(purge_sys->state != PURGE_STATE_DISABLED);
2024
2025 ++purge_sys->n_stop;
2026
2027 state = purge_sys->state;
2028
2029 if (state == PURGE_STATE_RUN) {
2030 ib::info() << "Stopping purge";
2031
2032 /* We need to wakeup the purge thread in case it is suspended,
2033 so that it can acknowledge the state change. */
2034
2035 srv_purge_wakeup();
2036 }
2037
2038 purge_sys->state = PURGE_STATE_STOP;
2039
2040 rw_lock_x_unlock(&purge_sys->latch);
2041
2042 if (state != PURGE_STATE_STOP) {
2043
2044 /* Wait for purge coordinator to signal that it
2045 is suspended. */
2046 os_event_wait_low(purge_sys->event, sig_count);
2047 } else {
2048 bool once = true;
2049
2050 rw_lock_x_lock(&purge_sys->latch);
2051
2052 /* Wait for purge to signal that it has actually stopped. */
2053 while (purge_sys->running) {
2054
2055 if (once) {
2056 ib::info() << "Waiting for purge to stop";
2057 once = false;
2058 }
2059
2060 rw_lock_x_unlock(&purge_sys->latch);
2061
2062 os_thread_sleep(10000);
2063
2064 rw_lock_x_lock(&purge_sys->latch);
2065 }
2066
2067 rw_lock_x_unlock(&purge_sys->latch);
2068 }
2069
2070 MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
2071 }
2072
2073 /*******************************************************************//**
2074 Resume purge, move to PURGE_STATE_RUN. */
2075 void
trx_purge_run(void)2076 trx_purge_run(void)
2077 /*===============*/
2078 {
2079 rw_lock_x_lock(&purge_sys->latch);
2080
2081 switch (purge_sys->state) {
2082 case PURGE_STATE_INIT:
2083 case PURGE_STATE_EXIT:
2084 case PURGE_STATE_DISABLED:
2085 ut_error;
2086
2087 case PURGE_STATE_RUN:
2088 case PURGE_STATE_STOP:
2089 break;
2090 }
2091
2092 if (purge_sys->n_stop > 0) {
2093
2094 ut_a(purge_sys->state == PURGE_STATE_STOP);
2095
2096 --purge_sys->n_stop;
2097
2098 if (purge_sys->n_stop == 0) {
2099
2100 ib::info() << "Resuming purge";
2101
2102 purge_sys->state = PURGE_STATE_RUN;
2103 }
2104
2105 MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
2106 } else {
2107 ut_a(purge_sys->state == PURGE_STATE_RUN);
2108 }
2109
2110 rw_lock_x_unlock(&purge_sys->latch);
2111
2112 srv_purge_wakeup();
2113 }
2114