1 /*****************************************************************************
2
3 Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2014, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file lock/lock0lock.cc
22 The transaction lock system
23
24 Created 5/7/1996 Heikki Tuuri
25 *******************************************************/
26
27 #define LOCK_MODULE_IMPLEMENTATION
28
29 #include "univ.i"
30
31 #include <mysql/service_thd_error_context.h>
32 #include <sql_class.h>
33
34 #include "lock0lock.h"
35 #include "lock0priv.h"
36 #include "dict0mem.h"
37 #include "trx0purge.h"
38 #include "trx0sys.h"
39 #include "ut0vec.h"
40 #include "btr0cur.h"
41 #include "row0sel.h"
42 #include "row0mysql.h"
43 #include "row0vers.h"
44 #include "pars0pars.h"
45
46 #include <set>
47
48 #ifdef WITH_WSREP
49 #include <mysql/service_wsrep.h>
50 #endif /* WITH_WSREP */
51
52 /** Lock scheduling algorithm */
53 ulong innodb_lock_schedule_algorithm;
54
55 /** The value of innodb_deadlock_detect */
56 my_bool innobase_deadlock_detect;
57
58 /*********************************************************************//**
59 Checks if a waiting record lock request still has to wait in a queue.
60 @return lock that is causing the wait */
61 static
62 const lock_t*
63 lock_rec_has_to_wait_in_queue(
64 /*==========================*/
65 const lock_t* wait_lock); /*!< in: waiting record lock */
66
67 /** Grant a lock to a waiting lock request and release the waiting transaction
68 after lock_reset_lock_and_trx_wait() has been called. */
69 static void lock_grant_after_reset(lock_t* lock);
70
71 extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
72 extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
73 extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
74
75 /** Pretty-print a table lock.
76 @param[in,out] file output stream
77 @param[in] lock table lock */
78 static void lock_table_print(FILE* file, const lock_t* lock);
79
80 /** Pretty-print a record lock.
81 @param[in,out] file output stream
82 @param[in] lock record lock
83 @param[in,out] mtr mini-transaction for accessing the record */
84 static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);
85
86 /** Deadlock checker. */
87 class DeadlockChecker {
88 public:
89 /** Check if a joining lock request results in a deadlock.
90 If a deadlock is found, we will resolve the deadlock by
91 choosing a victim transaction and rolling it back.
92 We will attempt to resolve all deadlocks.
93
94 @param[in] lock the lock request
95 @param[in,out] trx transaction requesting the lock
96
97 @return trx if it was chosen as victim
98 @retval NULL if another victim was chosen,
99 or there is no deadlock (any more) */
100 static const trx_t* check_and_resolve(const lock_t* lock, trx_t* trx);
101
102 private:
103 /** Do a shallow copy. Default destructor OK.
104 @param trx the start transaction (start node)
105 @param wait_lock lock that a transaction wants
106 @param mark_start visited node counter
107 @param report_waiters whether to call thd_rpl_deadlock_check() */
DeadlockChecker(const trx_t * trx,const lock_t * wait_lock,ib_uint64_t mark_start,bool report_waiters)108 DeadlockChecker(
109 const trx_t* trx,
110 const lock_t* wait_lock,
111 ib_uint64_t mark_start,
112 bool report_waiters)
113 :
114 m_cost(),
115 m_start(trx),
116 m_too_deep(),
117 m_wait_lock(wait_lock),
118 m_mark_start(mark_start),
119 m_n_elems(),
120 m_report_waiters(report_waiters)
121 {
122 }
123
124 /** Check if the search is too deep. */
is_too_deep() const125 bool is_too_deep() const
126 {
127 return(m_n_elems > LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK
128 || m_cost > LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK);
129 }
130
131 /** Save current state.
132 @param lock lock to push on the stack.
133 @param heap_no the heap number to push on the stack.
134 @return false if stack is full. */
push(const lock_t * lock,ulint heap_no)135 bool push(const lock_t* lock, ulint heap_no)
136 {
137 ut_ad((lock_get_type_low(lock) & LOCK_REC)
138 || (lock_get_type_low(lock) & LOCK_TABLE));
139
140 ut_ad(((lock_get_type_low(lock) & LOCK_TABLE) != 0)
141 == (heap_no == ULINT_UNDEFINED));
142
143 /* Ensure that the stack is bounded. */
144 if (m_n_elems >= UT_ARR_SIZE(s_states)) {
145 return(false);
146 }
147
148 state_t& state = s_states[m_n_elems++];
149
150 state.m_lock = lock;
151 state.m_wait_lock = m_wait_lock;
152 state.m_heap_no =heap_no;
153
154 return(true);
155 }
156
157 /** Restore state.
158 @param[out] lock current lock
159 @param[out] heap_no current heap_no */
pop(const lock_t * & lock,ulint & heap_no)160 void pop(const lock_t*& lock, ulint& heap_no)
161 {
162 ut_a(m_n_elems > 0);
163
164 const state_t& state = s_states[--m_n_elems];
165
166 lock = state.m_lock;
167 heap_no = state.m_heap_no;
168 m_wait_lock = state.m_wait_lock;
169 }
170
171 /** Check whether the node has been visited.
172 @param lock lock to check
173 @return true if the node has been visited */
is_visited(const lock_t * lock) const174 bool is_visited(const lock_t* lock) const
175 {
176 return(lock->trx->lock.deadlock_mark > m_mark_start);
177 }
178
179 /** Get the next lock in the queue that is owned by a transaction
180 whose sub-tree has not already been searched.
181 Note: "next" here means PREV for table locks.
182 @param lock Lock in queue
183 @param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
184 @return next lock or NULL if at end of queue */
185 const lock_t* get_next_lock(const lock_t* lock, ulint heap_no) const;
186
187 /** Get the first lock to search. The search starts from the current
188 wait_lock. What we are really interested in is an edge from the
189 current wait_lock's owning transaction to another transaction that has
190 a lock ahead in the queue. We skip locks where the owning transaction's
191 sub-tree has already been searched.
192
193 Note: The record locks are traversed from the oldest lock to the
194 latest. For table locks we go from latest to oldest.
195
196 For record locks, we first position the iterator on first lock on
197 the page and then reposition on the actual heap_no. This is required
198 due to the way the record lock has is implemented.
199
200 @param[out] heap_no if rec lock, else ULINT_UNDEFINED.
201
202 @return first lock or NULL */
203 const lock_t* get_first_lock(ulint* heap_no) const;
204
205 /** Notify that a deadlock has been detected and print the conflicting
206 transaction info.
207 @param lock lock causing deadlock */
208 void notify(const lock_t* lock) const;
209
210 /** Select the victim transaction that should be rolledback.
211 @return victim transaction */
212 const trx_t* select_victim() const;
213
214 /** Rollback transaction selected as the victim. */
215 void trx_rollback();
216
217 /** Looks iteratively for a deadlock. Note: the joining transaction
218 may have been granted its lock by the deadlock checks.
219
220 @return 0 if no deadlock else the victim transaction.*/
221 const trx_t* search();
222
223 /** Print transaction data to the deadlock file and possibly to stderr.
224 @param trx transaction
225 @param max_query_len max query length to print */
226 static void print(const trx_t* trx, ulint max_query_len);
227
228 /** rewind(3) the file used for storing the latest detected deadlock
229 and print a heading message to stderr if printing of all deadlocks to
230 stderr is enabled. */
231 static void start_print();
232
233 /** Print lock data to the deadlock file and possibly to stderr.
234 @param lock record or table type lock */
235 static void print(const lock_t* lock);
236
237 /** Print a message to the deadlock file and possibly to stderr.
238 @param msg message to print */
239 static void print(const char* msg);
240
241 /** Print info about transaction that was rolled back.
242 @param trx transaction rolled back
243 @param lock lock trx wants */
244 static void rollback_print(const trx_t* trx, const lock_t* lock);
245
246 private:
247 /** DFS state information, used during deadlock checking. */
248 struct state_t {
249 const lock_t* m_lock; /*!< Current lock */
250 const lock_t* m_wait_lock; /*!< Waiting for lock */
251 ulint m_heap_no; /*!< heap number if rec lock */
252 };
253
254 /** Used in deadlock tracking. Protected by lock_sys.mutex. */
255 static ib_uint64_t s_lock_mark_counter;
256
257 /** Calculation steps thus far. It is the count of the nodes visited. */
258 ulint m_cost;
259
260 /** Joining transaction that is requesting a lock in an
261 incompatible mode */
262 const trx_t* m_start;
263
264 /** TRUE if search was too deep and was aborted */
265 bool m_too_deep;
266
267 /** Lock that trx wants */
268 const lock_t* m_wait_lock;
269
270 /** Value of lock_mark_count at the start of the deadlock check. */
271 ib_uint64_t m_mark_start;
272
273 /** Number of states pushed onto the stack */
274 size_t m_n_elems;
275
276 /** This is to avoid malloc/free calls. */
277 static state_t s_states[MAX_STACK_SIZE];
278
279 /** Set if thd_rpl_deadlock_check() should be called for waits. */
280 const bool m_report_waiters;
281 };
282
283 /** Counter to mark visited nodes during deadlock search. */
284 ib_uint64_t DeadlockChecker::s_lock_mark_counter = 0;
285
286 /** The stack used for deadlock searches. */
287 DeadlockChecker::state_t DeadlockChecker::s_states[MAX_STACK_SIZE];
288
289 #ifdef UNIV_DEBUG
290 /*********************************************************************//**
291 Validates the lock system.
292 @return TRUE if ok */
293 static
294 bool
295 lock_validate();
296 /*============*/
297
298 /*********************************************************************//**
299 Validates the record lock queues on a page.
300 @return TRUE if ok */
301 static
302 ibool
303 lock_rec_validate_page(
304 /*===================*/
305 const buf_block_t* block) /*!< in: buffer block */
306 MY_ATTRIBUTE((warn_unused_result));
307 #endif /* UNIV_DEBUG */
308
309 /* The lock system */
310 lock_sys_t lock_sys;
311
312 /** We store info on the latest deadlock error to this buffer. InnoDB
313 Monitor will then fetch it and print */
314 static bool lock_deadlock_found = false;
315
316 /** Only created if !srv_read_only_mode */
317 static FILE* lock_latest_err_file;
318
319 /*********************************************************************//**
320 Reports that a transaction id is insensible, i.e., in the future. */
321 ATTRIBUTE_COLD
322 void
lock_report_trx_id_insanity(trx_id_t trx_id,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,trx_id_t max_trx_id)323 lock_report_trx_id_insanity(
324 /*========================*/
325 trx_id_t trx_id, /*!< in: trx id */
326 const rec_t* rec, /*!< in: user record */
327 dict_index_t* index, /*!< in: index */
328 const rec_offs* offsets, /*!< in: rec_get_offsets(rec, index) */
329 trx_id_t max_trx_id) /*!< in: trx_sys.get_max_trx_id() */
330 {
331 ut_ad(rec_offs_validate(rec, index, offsets));
332 ut_ad(!rec_is_metadata(rec, index));
333
334 ib::error()
335 << "Transaction id " << ib::hex(trx_id)
336 << " associated with record" << rec_offsets_print(rec, offsets)
337 << " in index " << index->name
338 << " of table " << index->table->name
339 << " is greater than the global counter " << max_trx_id
340 << "! The table is corrupted.";
341 }
342
343 /*********************************************************************//**
344 Checks that a transaction id is sensible, i.e., not in the future.
345 @return true if ok */
346 bool
lock_check_trx_id_sanity(trx_id_t trx_id,const rec_t * rec,dict_index_t * index,const rec_offs * offsets)347 lock_check_trx_id_sanity(
348 /*=====================*/
349 trx_id_t trx_id, /*!< in: trx id */
350 const rec_t* rec, /*!< in: user record */
351 dict_index_t* index, /*!< in: index */
352 const rec_offs* offsets) /*!< in: rec_get_offsets(rec, index) */
353 {
354 ut_ad(rec_offs_validate(rec, index, offsets));
355 ut_ad(!rec_is_metadata(rec, index));
356
357 trx_id_t max_trx_id= trx_sys.get_max_trx_id();
358 ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
359
360 if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
361 {
362 lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
363 return false;
364 }
365 return true;
366 }
367
368 /*********************************************************************//**
369 Checks that a record is seen in a consistent read.
370 @return true if sees, or false if an earlier version of the record
371 should be retrieved */
372 bool
lock_clust_rec_cons_read_sees(const rec_t * rec,dict_index_t * index,const rec_offs * offsets,ReadView * view)373 lock_clust_rec_cons_read_sees(
374 /*==========================*/
375 const rec_t* rec, /*!< in: user record which should be read or
376 passed over by a read cursor */
377 dict_index_t* index, /*!< in: clustered index */
378 const rec_offs* offsets,/*!< in: rec_get_offsets(rec, index) */
379 ReadView* view) /*!< in: consistent read view */
380 {
381 ut_ad(dict_index_is_clust(index));
382 ut_ad(page_rec_is_user_rec(rec));
383 ut_ad(rec_offs_validate(rec, index, offsets));
384 ut_ad(!rec_is_metadata(rec, index));
385
386 /* Temp-tables are not shared across connections and multiple
387 transactions from different connections cannot simultaneously
388 operate on same temp-table and so read of temp-table is
389 always consistent read. */
390 if (index->table->is_temporary()) {
391 return(true);
392 }
393
394 /* NOTE that we call this function while holding the search
395 system latch. */
396
397 trx_id_t trx_id = row_get_rec_trx_id(rec, index, offsets);
398
399 return(view->changes_visible(trx_id, index->table->name));
400 }
401
402 /*********************************************************************//**
403 Checks that a non-clustered index record is seen in a consistent read.
404
405 NOTE that a non-clustered index page contains so little information on
406 its modifications that also in the case false, the present version of
407 rec may be the right, but we must check this from the clustered index
408 record.
409
410 @return true if certainly sees, or false if an earlier version of the
411 clustered index record might be needed */
412 bool
lock_sec_rec_cons_read_sees(const rec_t * rec,const dict_index_t * index,const ReadView * view)413 lock_sec_rec_cons_read_sees(
414 /*========================*/
415 const rec_t* rec, /*!< in: user record which
416 should be read or passed over
417 by a read cursor */
418 const dict_index_t* index, /*!< in: index */
419 const ReadView* view) /*!< in: consistent read view */
420 {
421 ut_ad(page_rec_is_user_rec(rec));
422 ut_ad(!index->is_primary());
423 ut_ad(!rec_is_metadata(rec, index));
424
425 /* NOTE that we might call this function while holding the search
426 system latch. */
427
428 if (index->table->is_temporary()) {
429
430 /* Temp-tables are not shared across connections and multiple
431 transactions from different connections cannot simultaneously
432 operate on same temp-table and so read of temp-table is
433 always consistent read. */
434
435 return(true);
436 }
437
438 trx_id_t max_trx_id = page_get_max_trx_id(page_align(rec));
439
440 ut_ad(max_trx_id > 0);
441
442 return(view->sees(max_trx_id));
443 }
444
445
446 /**
447 Creates the lock system at database start.
448
449 @param[in] n_cells number of slots in lock hash table
450 */
create(ulint n_cells)451 void lock_sys_t::create(ulint n_cells)
452 {
453 ut_ad(this == &lock_sys);
454
455 m_initialised= true;
456
457 waiting_threads = static_cast<srv_slot_t*>
458 (ut_zalloc_nokey(srv_max_n_threads * sizeof *waiting_threads));
459 last_slot = waiting_threads;
460
461 mutex_create(LATCH_ID_LOCK_SYS, &mutex);
462
463 mutex_create(LATCH_ID_LOCK_SYS_WAIT, &wait_mutex);
464
465 timeout_event = os_event_create(0);
466
467 rec_hash = hash_create(n_cells);
468 prdt_hash = hash_create(n_cells);
469 prdt_page_hash = hash_create(n_cells);
470
471 if (!srv_read_only_mode) {
472 lock_latest_err_file = os_file_create_tmpfile();
473 ut_a(lock_latest_err_file);
474 }
475 }
476
477 /** Calculates the fold value of a lock: used in migrating the hash table.
478 @param[in] lock record lock object
479 @return folded value */
480 static
481 ulint
lock_rec_lock_fold(const lock_t * lock)482 lock_rec_lock_fold(
483 const lock_t* lock)
484 {
485 return(lock_rec_fold(lock->un_member.rec_lock.space,
486 lock->un_member.rec_lock.page_no));
487 }
488
489
490 /**
491 Resize the lock hash table.
492
493 @param[in] n_cells number of slots in lock hash table
494 */
resize(ulint n_cells)495 void lock_sys_t::resize(ulint n_cells)
496 {
497 ut_ad(this == &lock_sys);
498
499 mutex_enter(&mutex);
500
501 hash_table_t* old_hash = rec_hash;
502 rec_hash = hash_create(n_cells);
503 HASH_MIGRATE(old_hash, rec_hash, lock_t, hash,
504 lock_rec_lock_fold);
505 hash_table_free(old_hash);
506
507 old_hash = prdt_hash;
508 prdt_hash = hash_create(n_cells);
509 HASH_MIGRATE(old_hash, prdt_hash, lock_t, hash,
510 lock_rec_lock_fold);
511 hash_table_free(old_hash);
512
513 old_hash = prdt_page_hash;
514 prdt_page_hash = hash_create(n_cells);
515 HASH_MIGRATE(old_hash, prdt_page_hash, lock_t, hash,
516 lock_rec_lock_fold);
517 hash_table_free(old_hash);
518
519 /* need to update block->lock_hash_val */
520 for (ulint i = 0; i < srv_buf_pool_instances; ++i) {
521 buf_pool_t* buf_pool = buf_pool_from_array(i);
522
523 buf_pool_mutex_enter(buf_pool);
524 buf_page_t* bpage;
525 bpage = UT_LIST_GET_FIRST(buf_pool->LRU);
526
527 while (bpage != NULL) {
528 if (buf_page_get_state(bpage)
529 == BUF_BLOCK_FILE_PAGE) {
530 buf_block_t* block;
531 block = reinterpret_cast<buf_block_t*>(
532 bpage);
533
534 block->lock_hash_val
535 = lock_rec_hash(
536 bpage->id.space(),
537 bpage->id.page_no());
538 }
539 bpage = UT_LIST_GET_NEXT(LRU, bpage);
540 }
541 buf_pool_mutex_exit(buf_pool);
542 }
543
544 mutex_exit(&mutex);
545 }
546
547
548 /** Closes the lock system at database shutdown. */
close()549 void lock_sys_t::close()
550 {
551 ut_ad(this == &lock_sys);
552
553 if (!m_initialised) return;
554
555 if (lock_latest_err_file != NULL) {
556 fclose(lock_latest_err_file);
557 lock_latest_err_file = NULL;
558 }
559
560 hash_table_free(rec_hash);
561 hash_table_free(prdt_hash);
562 hash_table_free(prdt_page_hash);
563
564 os_event_destroy(timeout_event);
565
566 mutex_destroy(&mutex);
567 mutex_destroy(&wait_mutex);
568
569 for (ulint i = srv_max_n_threads; i--; ) {
570 if (os_event_t& event = waiting_threads[i].event) {
571 os_event_destroy(event);
572 }
573 }
574
575 ut_free(waiting_threads);
576 m_initialised= false;
577 }
578
579 /*********************************************************************//**
580 Gets the size of a lock struct.
581 @return size in bytes */
582 ulint
lock_get_size(void)583 lock_get_size(void)
584 /*===============*/
585 {
586 return((ulint) sizeof(lock_t));
587 }
588
lock_grant_have_trx_mutex(lock_t * lock)589 static inline void lock_grant_have_trx_mutex(lock_t* lock)
590 {
591 lock_reset_lock_and_trx_wait(lock);
592 lock_grant_after_reset(lock);
593 }
594
595 /*********************************************************************//**
596 Gets the gap flag of a record lock.
597 @return LOCK_GAP or 0 */
598 UNIV_INLINE
599 ulint
lock_rec_get_gap(const lock_t * lock)600 lock_rec_get_gap(
601 /*=============*/
602 const lock_t* lock) /*!< in: record lock */
603 {
604 ut_ad(lock);
605 ut_ad(lock_get_type_low(lock) == LOCK_REC);
606
607 return(lock->type_mode & LOCK_GAP);
608 }
609
610 /*********************************************************************//**
611 Gets the LOCK_REC_NOT_GAP flag of a record lock.
612 @return LOCK_REC_NOT_GAP or 0 */
613 UNIV_INLINE
614 ulint
lock_rec_get_rec_not_gap(const lock_t * lock)615 lock_rec_get_rec_not_gap(
616 /*=====================*/
617 const lock_t* lock) /*!< in: record lock */
618 {
619 ut_ad(lock);
620 ut_ad(lock_get_type_low(lock) == LOCK_REC);
621
622 return(lock->type_mode & LOCK_REC_NOT_GAP);
623 }
624
625 /*********************************************************************//**
626 Gets the waiting insert flag of a record lock.
627 @return LOCK_INSERT_INTENTION or 0 */
628 UNIV_INLINE
629 ulint
lock_rec_get_insert_intention(const lock_t * lock)630 lock_rec_get_insert_intention(
631 /*==========================*/
632 const lock_t* lock) /*!< in: record lock */
633 {
634 ut_ad(lock);
635 ut_ad(lock_get_type_low(lock) == LOCK_REC);
636
637 return(lock->type_mode & LOCK_INSERT_INTENTION);
638 }
639
#ifdef UNIV_DEBUG
#ifdef WITH_WSREP
/** Check if both conflicting lock transaction and other transaction
requesting record lock are brute force (BF). If they are check is
this BF-BF wait correct and if not report BF wait and assert.

@param[in]	lock_rec	other waiting record lock
@param[in]	trx		trx requesting conflicting record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(lock_get_type_low(lock) == LOCK_REC);
	ut_ad(lock_mutex_own());
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys->mutex, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid mutexing
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	trx_mutex_enter(lock_trx);
	const trx_state_t trx2_state= lock_trx->state;
	trx_mutex_exit(lock_trx);

	/* If transaction is already committed in memory or
	prepared we should wait. When transaction is committed in
	memory we held trx mutex, but not lock_sys->mutex. Therefore,
	we could end here before transaction has time to do
	lock_release() that is protected with lock_sys->mutex. */
	switch (trx2_state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
	case TRX_STATE_PREPARED:
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If BF - BF order is honored, i.e. trx already holding
	record lock should be ordered before this new lock request
	we can keep trx waiting for the lock. If conflicting
	transaction is already aborting or rolling back for replaying
	we can also let new transaction waiting. */
	if (wsrep_trx_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_trx_is_aborting(lock_trx->mysql_thd))
		return;

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}
#endif /* WITH_WSREP */
#endif /* UNIV_DEBUG */
715
716 /*********************************************************************//**
717 Checks if a lock request for a new lock has to wait for request lock2.
718 @return TRUE if new lock has to wait for lock2 to be removed */
719 UNIV_INLINE
720 bool
lock_rec_has_to_wait(bool for_locking,const trx_t * trx,ulint type_mode,const lock_t * lock2,bool lock_is_on_supremum)721 lock_rec_has_to_wait(
722 /*=================*/
723 bool for_locking,
724 /*!< in is called locking or releasing */
725 const trx_t* trx, /*!< in: trx of new lock */
726 ulint type_mode,/*!< in: precise mode of the new lock
727 to set: LOCK_S or LOCK_X, possibly
728 ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
729 LOCK_INSERT_INTENTION */
730 const lock_t* lock2, /*!< in: another record lock; NOTE that
731 it is assumed that this has a lock bit
732 set on the same record as in the new
733 lock we are setting */
734 bool lock_is_on_supremum)
735 /*!< in: TRUE if we are setting the
736 lock on the 'supremum' record of an
737 index page: we know then that the lock
738 request is really for a 'gap' type lock */
739 {
740 ut_ad(trx && lock2);
741 ut_ad(lock_get_type_low(lock2) == LOCK_REC);
742 ut_ad(lock_mutex_own());
743
744 if (trx == lock2->trx
745 || lock_mode_compatible(
746 static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
747 lock_get_mode(lock2))) {
748 return false;
749 }
750
751 /* We have somewhat complex rules when gap type record locks
752 cause waits */
753
754 if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
755 && !(type_mode & LOCK_INSERT_INTENTION)) {
756
757 /* Gap type locks without LOCK_INSERT_INTENTION flag
758 do not need to wait for anything. This is because
759 different users can have conflicting lock types
760 on gaps. */
761
762 return false;
763 }
764
765 if (!(type_mode & LOCK_INSERT_INTENTION) && lock_rec_get_gap(lock2)) {
766
767 /* Record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP
768 does not need to wait for a gap type lock */
769
770 return false;
771 }
772
773 if ((type_mode & LOCK_GAP) && lock_rec_get_rec_not_gap(lock2)) {
774
775 /* Lock on gap does not need to wait for
776 a LOCK_REC_NOT_GAP type lock */
777
778 return false;
779 }
780
781 if (lock_rec_get_insert_intention(lock2)) {
782
783 /* No lock request needs to wait for an insert
784 intention lock to be removed. This is ok since our
785 rules allow conflicting locks on gaps. This eliminates
786 a spurious deadlock caused by a next-key lock waiting
787 for an insert intention lock; when the insert
788 intention lock was granted, the insert deadlocked on
789 the waiting next-key lock.
790
791 Also, insert intention locks do not disturb each
792 other. */
793
794 return false;
795 }
796
797 if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2))
798 && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
799 /* If the upper server layer has already decided on the
800 commit order between the transaction requesting the
801 lock and the transaction owning the lock, we do not
802 need to wait for gap locks. Such ordeering by the upper
803 server layer happens in parallel replication, where the
804 commit order is fixed to match the original order on the
805 master.
806
807 Such gap locks are mainly needed to get serialisability
808 between transactions so that they will be binlogged in
809 the correct order so that statement-based replication
810 will give the correct results. Since the right order
811 was already determined on the master, we do not need
812 to enforce it again here.
813
814 Skipping the locks is not essential for correctness,
815 since in case of deadlock we will just kill the later
816 transaction and retry it. But it can save some
817 unnecessary rollbacks and retries. */
818
819 return false;
820 }
821
822 #ifdef WITH_WSREP
823 /* New lock request from a transaction is using unique key
824 scan and this transaction is a wsrep high priority transaction
825 (brute force). If conflicting transaction is also wsrep high
826 priority transaction we should avoid lock conflict because
827 ordering of these transactions is already decided and
828 conflicting transaction will be later replayed. Note
829 that thread holding conflicting lock can't be
830 committed or rolled back while we hold
831 lock_sys->mutex. */
832 if (trx->is_wsrep_UK_scan()
833 && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
834 return false;
835 }
836
837 /* We very well can let bf to wait normally as other
838 BF will be replayed in case of conflict. For debug
839 builds we will do additional sanity checks to catch
840 unsupported bf wait if any. */
841 ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
842 #endif /* WITH_WSREP */
843
844 return true;
845 }
846
847 /*********************************************************************//**
848 Checks if a lock request lock1 has to wait for request lock2.
849 @return TRUE if lock1 has to wait for lock2 to be removed */
850 bool
lock_has_to_wait(const lock_t * lock1,const lock_t * lock2)851 lock_has_to_wait(
852 /*=============*/
853 const lock_t* lock1, /*!< in: waiting lock */
854 const lock_t* lock2) /*!< in: another lock; NOTE that it is
855 assumed that this has a lock bit set
856 on the same record as in lock1 if the
857 locks are record locks */
858 {
859 ut_ad(lock1 && lock2);
860
861 if (lock1->trx == lock2->trx
862 || lock_mode_compatible(lock_get_mode(lock1),
863 lock_get_mode(lock2))) {
864 return false;
865 }
866
867 if (lock_get_type_low(lock1) != LOCK_REC) {
868 return true;
869 }
870
871 ut_ad(lock_get_type_low(lock2) == LOCK_REC);
872
873 if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
874 return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
875 lock_get_prdt_from_lock(lock1),
876 lock2);
877 }
878
879 return lock_rec_has_to_wait(
880 false, lock1->trx, lock1->type_mode, lock2,
881 lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
882 }
883
884 /*============== RECORD LOCK BASIC FUNCTIONS ============================*/
885
886 /**********************************************************************//**
887 Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
888 if none found.
889 @return bit index == heap number of the record, or ULINT_UNDEFINED if
890 none found */
891 ulint
lock_rec_find_set_bit(const lock_t * lock)892 lock_rec_find_set_bit(
893 /*==================*/
894 const lock_t* lock) /*!< in: record lock with at least one bit set */
895 {
896 for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {
897
898 if (lock_rec_get_nth_bit(lock, i)) {
899
900 return(i);
901 }
902 }
903
904 return(ULINT_UNDEFINED);
905 }
906
907 /*********************************************************************//**
908 Determines if there are explicit record locks on a page.
909 @return an explicit record lock on the page, or NULL if there are none */
910 lock_t*
lock_rec_expl_exist_on_page(ulint space,ulint page_no)911 lock_rec_expl_exist_on_page(
912 /*========================*/
913 ulint space, /*!< in: space id */
914 ulint page_no)/*!< in: page number */
915 {
916 lock_t* lock;
917
918 lock_mutex_enter();
919 /* Only used in ibuf pages, so rec_hash is good enough */
920 lock = lock_rec_get_first_on_page_addr(lock_sys.rec_hash,
921 space, page_no);
922 lock_mutex_exit();
923
924 return(lock);
925 }
926
927 /*********************************************************************//**
928 Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
929 pointer in the transaction! This function is used in lock object creation
930 and resetting. */
931 static
932 void
lock_rec_bitmap_reset(lock_t * lock)933 lock_rec_bitmap_reset(
934 /*==================*/
935 lock_t* lock) /*!< in: record lock */
936 {
937 ulint n_bytes;
938
939 ut_ad(lock_get_type_low(lock) == LOCK_REC);
940
941 /* Reset to zero the bitmap which resides immediately after the lock
942 struct */
943
944 n_bytes = lock_rec_get_n_bits(lock) / 8;
945
946 ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);
947
948 memset(&lock[1], 0, n_bytes);
949 }
950
951 /*********************************************************************//**
952 Copies a record lock to heap.
953 @return copy of lock */
954 static
955 lock_t*
lock_rec_copy(const lock_t * lock,mem_heap_t * heap)956 lock_rec_copy(
957 /*==========*/
958 const lock_t* lock, /*!< in: record lock */
959 mem_heap_t* heap) /*!< in: memory heap */
960 {
961 ulint size;
962
963 ut_ad(lock_get_type_low(lock) == LOCK_REC);
964
965 size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;
966
967 return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
968 }
969
970 /*********************************************************************//**
971 Gets the previous record lock set on a record.
972 @return previous lock on the same record, NULL if none exists */
973 const lock_t*
lock_rec_get_prev(const lock_t * in_lock,ulint heap_no)974 lock_rec_get_prev(
975 /*==============*/
976 const lock_t* in_lock,/*!< in: record lock */
977 ulint heap_no)/*!< in: heap number of the record */
978 {
979 lock_t* lock;
980 ulint space;
981 ulint page_no;
982 lock_t* found_lock = NULL;
983 hash_table_t* hash;
984
985 ut_ad(lock_mutex_own());
986 ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
987
988 space = in_lock->un_member.rec_lock.space;
989 page_no = in_lock->un_member.rec_lock.page_no;
990
991 hash = lock_hash_get(in_lock->type_mode);
992
993 for (lock = lock_rec_get_first_on_page_addr(hash, space, page_no);
994 /* No op */;
995 lock = lock_rec_get_next_on_page(lock)) {
996
997 ut_ad(lock);
998
999 if (lock == in_lock) {
1000
1001 return(found_lock);
1002 }
1003
1004 if (lock_rec_get_nth_bit(lock, heap_no)) {
1005
1006 found_lock = lock;
1007 }
1008 }
1009 }
1010
1011 /*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/
1012
/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());
	ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	      || (precise_mode & LOCK_MODE_MASK) == LOCK_X);
	ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));

	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		/* Accept only a lock that (1) belongs to trx, (2) is not
		an insert-intention lock, (3) is at least as strong as
		the requested mode, (4) has been granted (not waiting),
		and (5) whose gap/not-gap coverage includes what was
		requested. On the supremum record every lock is
		implicitly of gap type, so the gap/not-gap clauses are
		bypassed there. */
		if (lock->trx == trx
		    && !lock_rec_get_insert_intention(lock)
		    && lock_mode_stronger_or_eq(
			    lock_get_mode(lock),
			    static_cast<lock_mode>(
				    precise_mode & LOCK_MODE_MASK))
		    && !lock_get_wait(lock)
		    && (!lock_rec_get_rec_not_gap(lock)
			|| (precise_mode & LOCK_REC_NOT_GAP)
			|| heap_no == PAGE_HEAP_NO_SUPREMUM)
		    && (!lock_rec_get_gap(lock)
			|| (precise_mode & LOCK_GAP)
			|| heap_no == PAGE_HEAP_NO_SUPREMUM)) {

			return(lock);
		}
	}

	return(NULL);
}
1062
1063 #ifdef UNIV_DEBUG
1064 /*********************************************************************//**
1065 Checks if some other transaction has a lock request in the queue.
1066 @return lock or NULL */
1067 static
1068 lock_t*
lock_rec_other_has_expl_req(lock_mode mode,const buf_block_t * block,bool wait,ulint heap_no,const trx_t * trx)1069 lock_rec_other_has_expl_req(
1070 /*========================*/
1071 lock_mode mode, /*!< in: LOCK_S or LOCK_X */
1072 const buf_block_t* block, /*!< in: buffer block containing
1073 the record */
1074 bool wait, /*!< in: whether also waiting locks
1075 are taken into account */
1076 ulint heap_no,/*!< in: heap number of the record */
1077 const trx_t* trx) /*!< in: transaction, or NULL if
1078 requests by all transactions
1079 are taken into account */
1080 {
1081
1082 ut_ad(lock_mutex_own());
1083 ut_ad(mode == LOCK_X || mode == LOCK_S);
1084
1085 /* Only GAP lock can be on SUPREMUM, and we are not looking for
1086 GAP lock */
1087 if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
1088 return(NULL);
1089 }
1090
1091 for (lock_t* lock = lock_rec_get_first(lock_sys.rec_hash,
1092 block, heap_no);
1093 lock != NULL;
1094 lock = lock_rec_get_next(heap_no, lock)) {
1095
1096 if (lock->trx != trx
1097 && !lock_rec_get_gap(lock)
1098 && (wait || !lock_get_wait(lock))
1099 && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {
1100
1101 return(lock);
1102 }
1103 }
1104
1105 return(NULL);
1106 }
1107 #endif /* UNIV_DEBUG */
1108
1109 #ifdef WITH_WSREP
/** Abort the transaction holding a conflicting lock, if the requesting
transaction is a Galera brute-force (BF) applier that must win the conflict.
Both lock_sys.mutex and the lock holder's trx mutex must be held by the
caller. */
static void wsrep_kill_victim(const trx_t * const trx, const lock_t *lock)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx->is_wsrep());
	trx_t* lock_trx = lock->trx;
	ut_ad(trx_mutex_own(lock_trx));
	ut_ad(lock_trx != trx);

	/* Only a BF (brute force) transaction may abort a conflicting
	lock holder. */
	if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
		return;

	/* Nothing to do if the holder is already committing, or has
	already been chosen as a deadlock victim. */
	if (lock_trx->state == TRX_STATE_COMMITTED_IN_MEMORY
	    || lock_trx->lock.was_chosen_as_deadlock_victim)
		return;

	my_bool bf_other = wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE);

	/* Kill the holder if it is not BF itself, or if it is BF but
	ordered after trx in the replication sequence. */
	if (!bf_other
	    || wsrep_trx_order_before(trx->mysql_thd,
				      lock_trx->mysql_thd)) {

		if (lock_trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
			if (UNIV_UNLIKELY(wsrep_debug))
				WSREP_INFO("BF victim waiting");
			/* cannot release lock, until our lock
			is in the queue*/
		} else {
			wsrep_innobase_kill_one_trx(trx->mysql_thd, trx,
						    lock_trx, true);
		}
	}
}
1142 #endif /* WITH_WSREP */
1143
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_conflicting(
/*===========================*/
	ulint			mode,	/*!< in: LOCK_S or LOCK_X,
					possibly ORed to LOCK_GAP or
					LOC_REC_NOT_GAP,
					LOCK_INSERT_INTENTION */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: our transaction */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	/* On the supremum record only gap-type conflicts are possible;
	lock_rec_has_to_wait() uses this flag to relax the check. */
	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
#ifdef WITH_WSREP
			if (trx->is_wsrep()) {
				/* The holder's trx mutex must be held
				while deciding whether to BF-abort it. */
				trx_mutex_enter(lock->trx);
				/* Below function will roll back either trx
				or lock->trx depending on priority of the
				transaction. */
				wsrep_kill_victim(const_cast<trx_t*>(trx), lock);
				trx_mutex_exit(lock->trx);
			}
#endif /* WITH_WSREP */
			/* Return the first conflicting lock found; the
			caller will enqueue a waiting request behind it. */
			return(lock);
		}
	}

	return(NULL);
}
1188
/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return transaction id of the transaction which has the x-lock, or 0;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	ut_ad(!lock_mutex_own());
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, index));

	/* Secondary index records carry no per-record trx id; only the
	page-wide maximum is available, hence the imprecision. */
	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		/* Every transaction that could have modified this page
		has already committed: no implicit lock is possible. */
		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		/* Resolve via the clustered index record's DB_TRX_ID;
		this may return a false positive. */
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}
1239
1240 /*********************************************************************//**
1241 Return approximate number or record locks (bits set in the bitmap) for
1242 this transaction. Since delete-marked records may be removed, the
1243 record count will not be precise.
1244 The caller must be holding lock_sys.mutex. */
1245 ulint
lock_number_of_rows_locked(const trx_lock_t * trx_lock)1246 lock_number_of_rows_locked(
1247 /*=======================*/
1248 const trx_lock_t* trx_lock) /*!< in: transaction locks */
1249 {
1250 ut_ad(lock_mutex_own());
1251
1252 return(trx_lock->n_rec_locks);
1253 }
1254
1255 /*********************************************************************//**
1256 Return the number of table locks for a transaction.
1257 The caller must be holding lock_sys.mutex. */
1258 ulint
lock_number_of_tables_locked(const trx_lock_t * trx_lock)1259 lock_number_of_tables_locked(
1260 /*=========================*/
1261 const trx_lock_t* trx_lock) /*!< in: transaction locks */
1262 {
1263 const lock_t* lock;
1264 ulint n_tables = 0;
1265
1266 ut_ad(lock_mutex_own());
1267
1268 for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
1269 lock != NULL;
1270 lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
1271
1272 if (lock_get_type_low(lock) == LOCK_TABLE) {
1273 n_tables++;
1274 }
1275 }
1276
1277 return(n_tables);
1278 }
1279
1280 /*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/
1281
1282 #ifdef WITH_WSREP
1283 ATTRIBUTE_COLD
1284 static
1285 void
wsrep_print_wait_locks(lock_t * c_lock)1286 wsrep_print_wait_locks(
1287 /*===================*/
1288 lock_t* c_lock) /* conflicting lock to print */
1289 {
1290 if (c_lock->trx->lock.wait_lock != c_lock) {
1291 mtr_t mtr;
1292 ib::info() << "WSREP: c_lock != wait lock";
1293 ib::info() << " SQL: "
1294 << wsrep_thd_query(c_lock->trx->mysql_thd);
1295
1296 if (lock_get_type_low(c_lock) & LOCK_TABLE) {
1297 lock_table_print(stderr, c_lock);
1298 } else {
1299 lock_rec_print(stderr, c_lock, mtr);
1300 }
1301
1302 if (lock_get_type_low(c_lock->trx->lock.wait_lock) & LOCK_TABLE) {
1303 lock_table_print(stderr, c_lock->trx->lock.wait_lock);
1304 } else {
1305 lock_rec_print(stderr, c_lock->trx->lock.wait_lock,
1306 mtr);
1307 }
1308 }
1309 }
1310 #endif /* WITH_WSREP */
1311
1312 #ifdef UNIV_DEBUG
1313 /** Check transaction state */
check_trx_state(const trx_t * trx)1314 static void check_trx_state(const trx_t *trx)
1315 {
1316 ut_ad(!trx->auto_commit || trx->will_lock);
1317 const trx_state_t state= trx->state;
1318 ut_ad(state == TRX_STATE_ACTIVE ||
1319 state == TRX_STATE_PREPARED_RECOVERED ||
1320 state == TRX_STATE_PREPARED ||
1321 state == TRX_STATE_COMMITTED_IN_MEMORY);
1322 }
1323 #endif
1324
/** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts.
@param[in]	type_mode	lock mode and wait flag; type will be replaced
				with LOCK_REC
@param[in]	space	tablespace id
@param[in]	page_no	index page number
@param[in]	page	R-tree index page, or NULL
@param[in]	heap_no	record heap number in the index page
@param[in]	index	the index tree
@param[in,out]	trx	transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
#ifdef WITH_WSREP
	lock_t*		c_lock,	/*!< conflicting lock */
	que_thr_t*	thr,	/*!< thread owning trx */
#endif
	ulint		type_mode,
	ulint		space,
	ulint		page_no,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex)
{
	lock_t*		lock;
	ulint		n_bits;
	ulint		n_bytes;

	ut_ad(lock_mutex_own());
	ut_ad(holds_trx_mutex == trx_mutex_own(trx));
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

#ifdef UNIV_DEBUG
	/* Non-locking autocommit read-only transactions should not set
	any locks. See comment in trx_set_rw_mode explaining why this
	conditional check is required in debug code. */
	if (holds_trx_mutex) {
		check_trx_state(trx);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */

	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		/* Make lock bitmap bigger by a safety margin */
		n_bits = page_dir_get_n_heap(page) + LOCK_PAGE_BITMAP_MARGIN;
		n_bytes = 1 + n_bits / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which round up to 1 byte) for
		lock bit setting */
		n_bytes = 1;

		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;

			/* We will attach predicate structure after lock.
			Make sure the memory is aligned on 8 bytes,
			the mem_heap_alloc will align it with
			MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}

	/* Allocate from the transaction's small preallocated lock pool
	when possible, otherwise from its lock heap. */
	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = (type_mode & ~LOCK_TYPE_MASK) | LOCK_REC;
	lock->index = index;
	lock->un_member.rec_lock.space = uint32_t(space);
	lock->un_member.rec_lock.page_no = uint32_t(page_no);

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() > 0 || !index->table->can_be_evicted);

#ifdef WITH_WSREP
	if (c_lock && trx->is_wsrep()
	    && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
		/* A brute-force transaction jumps the queue: splice the
		new lock after any already-queued BF locks that must be
		ordered before trx, and before everything else that
		follows the conflicting lock. */
		lock_t *hash	= (lock_t *)c_lock->hash;
		lock_t *prev	= NULL;

		while (hash && wsrep_thd_is_BF(hash->trx->mysql_thd, FALSE)
		       && wsrep_trx_order_before(hash->trx->mysql_thd,
						 trx->mysql_thd)) {
			prev = hash;
			hash = (lock_t *)hash->hash;
		}
		lock->hash = hash;
		if (prev) {
			prev->hash = lock;
		} else {
			c_lock->hash = lock;
		}
		/*
		 * delayed conflict resolution '...kill_one_trx' was not called,
		 * if victim was waiting for some other lock
		 */
		trx_mutex_enter(c_lock->trx);
		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {

			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;

			if (UNIV_UNLIKELY(wsrep_debug)) {
				wsrep_print_wait_locks(c_lock);
			}

			/* Register our own lock as waiting before the
			victim's wait is cancelled, so that lock_grant
			can find it. */
			trx->lock.que_state = TRX_QUE_LOCK_WAIT;
			lock_set_lock_and_trx_wait(lock, trx);
			UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

			trx->lock.wait_thr = thr;
			thr->state = QUE_THR_LOCK_WAIT;

			/* have to release trx mutex for the duration of
			victim lock release. This will eventually call
			lock_grant, which wants to grant trx mutex again
			*/
			if (holds_trx_mutex) {
				trx_mutex_exit(trx);
			}
			lock_cancel_waiting_and_release(
				c_lock->trx->lock.wait_lock);

			if (holds_trx_mutex) {
				trx_mutex_enter(trx);
			}

			trx_mutex_exit(c_lock->trx);

			/* have to bail out here to avoid lock_set_lock... */
			return(lock);
		}
		trx_mutex_exit(c_lock->trx);
	} else
#endif /* WITH_WSREP */
	if (!(type_mode & (LOCK_WAIT | LOCK_PREDICATE | LOCK_PRDT_PAGE))
	    && innodb_lock_schedule_algorithm
	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
	    && !thd_is_replication_slave_thread(trx->mysql_thd)) {
		/* Under the VATS scheduling algorithm, a granted lock is
		prepended so that granted locks stay ahead of waiters. */
		HASH_PREPEND(lock_t, hash, lock_sys.rec_hash,
			     lock_rec_fold(space, page_no), lock);
	} else {
		HASH_INSERT(lock_t, hash, lock_hash_get(type_mode),
			    lock_rec_fold(space, page_no), lock);
	}

	if (!holds_trx_mutex) {
		trx_mutex_enter(trx);
	}
	ut_ad(trx_mutex_own(trx));
	if (type_mode & LOCK_WAIT) {
		lock_set_lock_and_trx_wait(lock, trx);
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx_mutex_exit(trx);
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}
1515
1516 /*********************************************************************//**
1517 Check if lock1 has higher priority than lock2.
1518 NULL has lowest priority.
1519 If neither of them is wait lock, the first one has higher priority.
1520 If only one of them is a wait lock, it has lower priority.
1521 If either is a high priority transaction, the lock has higher priority.
1522 Otherwise, the one with an older transaction has higher priority.
1523 @returns true if lock1 has higher priority, false otherwise. */
has_higher_priority(lock_t * lock1,lock_t * lock2)1524 static bool has_higher_priority(lock_t *lock1, lock_t *lock2)
1525 {
1526 if (lock1 == NULL) {
1527 return false;
1528 } else if (lock2 == NULL) {
1529 return true;
1530 }
1531 // Granted locks has higher priority.
1532 if (!lock_get_wait(lock1)) {
1533 return true;
1534 } else if (!lock_get_wait(lock2)) {
1535 return false;
1536 }
1537 return lock1->trx->start_time_micro <= lock2->trx->start_time_micro;
1538 }
1539
/*********************************************************************//**
Insert a lock to the hash list according to the mode (whether it is a wait
lock) and the age of the transaction it is associated with.
If the lock is not a wait lock, insert it to the head of the hash list.
Otherwise, insert it to the middle of the wait locks according to the age of
the transaction. Used only by the VATS lock scheduling algorithm.
@return DB_SUCCESS, or DB_SUCCESS_LOCKED_REC if the lock was granted */
static
dberr_t
lock_rec_insert_by_trx_age(
	lock_t	*in_lock) /*!< in: lock to be insert */
{
	ulint				space;
	ulint				page_no;
	ulint				rec_fold;
	lock_t*				node;
	lock_t*				next;
	hash_table_t*			hash;
	hash_cell_t*			cell;

	ut_ad(!in_lock->trx->is_wsrep());
	space = in_lock->un_member.rec_lock.space;
	page_no = in_lock->un_member.rec_lock.page_no;
	rec_fold = lock_rec_fold(space, page_no);
	hash = lock_hash_get(in_lock->type_mode);
	cell = hash_get_nth_cell(hash,
				 hash_calc_hash(rec_fold, hash));

	node = (lock_t *) cell->node;
	// If in_lock is not a wait lock, we insert it to the head of the list.
	if (node == NULL || !lock_get_wait(in_lock) || has_higher_priority(in_lock, node)) {
		cell->node = in_lock;
		in_lock->hash = node;
		/* A wait lock placed at the very head can be granted
		immediately: nothing precedes it in the queue. */
		if (lock_get_wait(in_lock)) {
			lock_grant_have_trx_mutex(in_lock);
			return DB_SUCCESS_LOCKED_REC;
		}
		return DB_SUCCESS;
	}
	/* Walk past every lock that still outranks in_lock; insert
	after the last such node. */
	while (node != NULL && has_higher_priority((lock_t *) node->hash,
						   in_lock)) {
		node = (lock_t *) node->hash;
	}
	next = (lock_t *) node->hash;
	node->hash = in_lock;
	in_lock->hash = next;

	/* If nothing ahead of in_lock conflicts with it any more, grant
	it now and move it to the front of the queue so that granted
	locks stay ahead of waiters. */
	if (lock_get_wait(in_lock) && !lock_rec_has_to_wait_in_queue(in_lock)) {
		lock_grant_have_trx_mutex(in_lock);
		if (cell->node != in_lock) {
			// Move it to the front of the queue
			node->hash = in_lock->hash;
			next = (lock_t *) cell->node;
			cell->node = in_lock;
			in_lock->hash = next;
		}
		return DB_SUCCESS_LOCKED_REC;
	}

	return DB_SUCCESS;
}
1599
1600 #ifdef UNIV_DEBUG
1601 static
1602 bool
lock_queue_validate(const lock_t * in_lock)1603 lock_queue_validate(
1604 const lock_t *in_lock) /*!< in: lock whose hash list is to be validated */
1605 {
1606 ulint space;
1607 ulint page_no;
1608 ulint rec_fold;
1609 hash_table_t* hash;
1610 hash_cell_t* cell;
1611 lock_t* next;
1612 bool wait_lock __attribute__((unused))= false;
1613
1614 if (in_lock == NULL) {
1615 return true;
1616 }
1617
1618 space = in_lock->un_member.rec_lock.space;
1619 page_no = in_lock->un_member.rec_lock.page_no;
1620 rec_fold = lock_rec_fold(space, page_no);
1621 hash = lock_hash_get(in_lock->type_mode);
1622 cell = hash_get_nth_cell(hash,
1623 hash_calc_hash(rec_fold, hash));
1624 next = (lock_t *) cell->node;
1625 while (next != NULL) {
1626 // If this is a granted lock, check that there's no wait lock before it.
1627 if (!lock_get_wait(next)) {
1628 ut_ad(!wait_lock);
1629 } else {
1630 wait_lock = true;
1631 }
1632 next = next->hash;
1633 }
1634 return true;
1635 }
1636 #endif /* UNIV_DEBUG */
1637
1638 static
1639 void
lock_rec_insert_to_head(lock_t * in_lock,ulint rec_fold)1640 lock_rec_insert_to_head(
1641 lock_t *in_lock, /*!< in: lock to be insert */
1642 ulint rec_fold) /*!< in: rec_fold of the page */
1643 {
1644 hash_table_t* hash;
1645 hash_cell_t* cell;
1646 lock_t* node;
1647
1648 if (in_lock == NULL) {
1649 return;
1650 }
1651
1652 hash = lock_hash_get(in_lock->type_mode);
1653 cell = hash_get_nth_cell(hash,
1654 hash_calc_hash(rec_fold, hash));
1655 node = (lock_t *) cell->node;
1656 if (node != in_lock) {
1657 cell->node = in_lock;
1658 in_lock->hash = node;
1659 }
1660 }
1661
/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	block		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim
@retval	DB_SUCCESS_LOCKED_REC	if the other transaction was chosen as a victim
				(or it happened to commit) */
dberr_t
lock_rec_enqueue_waiting(
#ifdef WITH_WSREP
	lock_t*			c_lock,	/*!< conflicting lock */
#endif
	ulint			type_mode,
	const buf_block_t*	block,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_ad(lock_mutex_own());
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);

	ut_ad(trx_mutex_own(trx));
	ut_a(!que_thr_stop(thr));

	/* A lock wait during an internal dictionary operation would be
	a bug: report it loudly in debug builds. */
	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		break;
	case TRX_DICT_OP_TABLE:
	case TRX_DICT_OP_INDEX:
		ib::error() << "A record lock wait happens in a dictionary"
			" operation. index "
			<< index->name
			<< " of table "
			<< index->table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

	/* With a zero lock wait timeout there is no point in waiting at
	all: fail immediately. */
	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create(
#ifdef WITH_WSREP
		c_lock, thr,
#endif
		type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	/* Run deadlock detection; if we are chosen as the victim, undo
	the lock request we just created. */
	if (ut_d(const trx_t* victim =)
	    DeadlockChecker::check_and_resolve(lock, trx)) {
		ut_ad(victim == trx);
		lock_reset_lock_and_trx_wait(lock);
		lock_rec_reset_nth_bit(lock, heap_no);
		return DB_DEADLOCK;
	}

	if (!trx->lock.wait_lock) {
		/* If there was a deadlock but we chose another
		transaction as a victim, it is possible that we
		already have the lock now granted! */
#ifdef WITH_WSREP
		if (UNIV_UNLIKELY(wsrep_debug)) {
			ib::info() << "WSREP: BF thread got lock granted early, ID " << ib::hex(trx->id)
				   << " query: " << wsrep_thd_query(trx->mysql_thd);
		}
#endif
		return DB_SUCCESS_LOCKED_REC;
	}

	trx->lock.que_state = TRX_QUE_LOCK_WAIT;

	trx->lock.was_chosen_as_deadlock_victim = false;
	trx->lock.wait_started = time(NULL);

	ut_a(que_thr_stop(thr));

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);

	MONITOR_INC(MONITOR_LOCKREC_WAIT);

	/* Under the VATS algorithm the waiting lock must be re-inserted
	at the position dictated by its transaction's age, not at the
	tail where lock_rec_create() put it. */
	if (innodb_lock_schedule_algorithm
	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
	    && !prdt
	    && !thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
		HASH_DELETE(lock_t, hash, lock_sys.rec_hash,
			    lock_rec_lock_fold(lock), lock);
		dberr_t res = lock_rec_insert_by_trx_age(lock);
		if (res != DB_SUCCESS) {
			return res;
		}
	}

	return DB_LOCK_WAIT;
}
1780
/*********************************************************************//**
Adds a record lock request in the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility! */
static
void
lock_rec_add_to_queue(
/*==================*/
	ulint			type_mode,/*!< in: lock mode, wait, gap
					etc. flags; type is ignored
					and replaced by LOCK_REC */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: transaction */
	bool			caller_owns_trx_mutex)
					/*!< in: TRUE if caller owns the
					transaction mutex */
{
#ifdef UNIV_DEBUG
	ut_ad(lock_mutex_own());
	ut_ad(caller_owns_trx_mutex == trx_mutex_own(trx));
	ut_ad(dict_index_is_clust(index)
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	/* Debug check: a granted non-gap lock must not coexist with a
	conflicting granted request from another transaction, except
	for Galera brute-force transactions. */
	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, block, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_UNLIKELY(other_lock && trx->is_wsrep())) {
			/* Only BF transaction may be granted lock
			before other conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	type_mode |= LOCK_REC;

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	lock_t*		lock;
	lock_t*		first_lock;
	hash_table_t*	hash = lock_hash_get(type_mode);

	/* Look for a waiting lock request on the same record or on a gap */

	for (first_lock = lock = lock_rec_get_first_on_page(hash, block);
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock_get_wait(lock)
		    && lock_rec_get_nth_bit(lock, heap_no)) {

			break;
		}
	}

	if (lock == NULL && !(type_mode & LOCK_WAIT)) {

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */

		lock = lock_rec_find_similar_on_page(
			type_mode, heap_no, first_lock, trx);

		if (lock != NULL) {

			lock_rec_set_nth_bit(lock, heap_no);

			return;
		}
	}

	/* No reusable lock object: create a new one at the end of the
	queue (lock_rec_create handles queue position for VATS/wsrep). */
	lock_rec_create(
#ifdef WITH_WSREP
		NULL, NULL,
#endif
		type_mode, block, heap_no, index, trx, caller_owns_trx_mutex);
}
1898
/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
static
dberr_t
lock_rec_lock(
/*==========*/
	bool			impl,	/*!< in: if true, no lock is set
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
	ulint			mode,	/*!< in: lock mode: LOCK_X or
					LOCK_S possibly ORed to either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*	index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
  trx_t *trx= thr_get_trx(thr);
  dberr_t err= DB_SUCCESS;

  ut_ad(!srv_read_only_mode);
  ut_ad((LOCK_MODE_MASK & mode) == LOCK_S ||
        (LOCK_MODE_MASK & mode) == LOCK_X);
  ut_ad((mode & LOCK_TYPE_MASK) == LOCK_GAP ||
        (mode & LOCK_TYPE_MASK) == LOCK_REC_NOT_GAP ||
        (mode & LOCK_TYPE_MASK) == 0);
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  lock_mutex_enter();
  /* The transaction must already hold the matching intention lock
  on the table (IS for an S record lock, IX for an X record lock). */
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
        lock_table_has(trx, index->table, LOCK_IX));

  if (lock_t *lock= lock_rec_get_first_on_page(lock_sys.rec_hash, block))
  {
    trx_mutex_enter(trx);
    /* General case: the page carries other locks, or its only lock is
    not an exact-match lock owned by this transaction with a bitmap
    wide enough for heap_no. */
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
        lock->type_mode != (ulint(mode) | LOCK_REC) ||
        lock_rec_get_n_bits(lock) <= heap_no)
    {
      /* Do nothing if the trx already has a strong enough lock on rec */
      if (!lock_rec_has_expl(mode, block, heap_no, trx))
      {
        if (
#ifdef WITH_WSREP
          lock_t *c_lock=
#endif
          lock_rec_other_has_conflicting(mode, block, heap_no, trx))
        {
          /*
            If another transaction has a non-gap conflicting
            request in the queue, as this transaction does not
            have a lock strong enough already granted on the
            record, we have to wait. */
          err = lock_rec_enqueue_waiting(
#ifdef WITH_WSREP
            c_lock,
#endif /* WITH_WSREP */
            mode, block, heap_no, index, thr, NULL);
        }
        else if (!impl)
        {
          /* Set the requested lock on the record. */
          lock_rec_add_to_queue(LOCK_REC | mode, block, heap_no, index, trx,
                                true);
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set then we do not set
        a new lock bit, otherwise we do set
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
    trx_mutex_exit(trx);
  }
  else
  {
    /*
      Simplified and faster path for the most common cases
      Note that we don't own the trx mutex.
    */
    if (!impl)
      lock_rec_create(
#ifdef WITH_WSREP
        NULL, NULL,
#endif
        mode, block, heap_no, index, trx, false);

    err= DB_SUCCESS_LOCKED_REC;
  }
  lock_mutex_exit();
  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  return err;
}
2011
2012 /*********************************************************************//**
2013 Checks if a waiting record lock request still has to wait in a queue.
2014 @return lock that is causing the wait */
2015 static
2016 const lock_t*
lock_rec_has_to_wait_in_queue(const lock_t * wait_lock)2017 lock_rec_has_to_wait_in_queue(
2018 /*==========================*/
2019 const lock_t* wait_lock) /*!< in: waiting record lock */
2020 {
2021 const lock_t* lock;
2022 ulint space;
2023 ulint page_no;
2024 ulint heap_no;
2025 ulint bit_mask;
2026 ulint bit_offset;
2027 hash_table_t* hash;
2028
2029 ut_ad(wait_lock);
2030 ut_ad(lock_mutex_own());
2031 ut_ad(lock_get_wait(wait_lock));
2032 ut_ad(lock_get_type_low(wait_lock) == LOCK_REC);
2033
2034 space = wait_lock->un_member.rec_lock.space;
2035 page_no = wait_lock->un_member.rec_lock.page_no;
2036 heap_no = lock_rec_find_set_bit(wait_lock);
2037
2038 bit_offset = heap_no / 8;
2039 bit_mask = static_cast<ulint>(1) << (heap_no % 8);
2040
2041 hash = lock_hash_get(wait_lock->type_mode);
2042
2043 for (lock = lock_rec_get_first_on_page_addr(hash, space, page_no);
2044 lock != wait_lock;
2045 lock = lock_rec_get_next_on_page_const(lock)) {
2046 const byte* p = (const byte*) &lock[1];
2047
2048 if (heap_no < lock_rec_get_n_bits(lock)
2049 && (p[bit_offset] & bit_mask)
2050 && lock_has_to_wait(wait_lock, lock)) {
2051 return(lock);
2052 }
2053 }
2054
2055 return(NULL);
2056 }
2057
/** Grant a lock to a waiting lock request and release the waiting transaction
after lock_reset_lock_and_trx_wait() has been called.
Caller must hold lock_sys mutex and the transaction's mutex. */
static void lock_grant_after_reset(lock_t* lock)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(lock->trx));

	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
		dict_table_t*	table = lock->un_member.tab_lock.table;

		if (table->autoinc_trx == lock->trx) {
			/* Should not happen: a transaction must not be
			granted the same table's AUTO-INC lock twice. */
			ib::error() << "Transaction already had an"
				<< " AUTO-INC lock!";
		} else {
			/* Record this transaction as the holder of the
			table's AUTO-INC lock, and remember the lock so
			it can be released later. */
			table->autoinc_trx = lock->trx;

			ib_vector_push(lock->trx->autoinc_locks, &lock);
		}
	}

	DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends",
			       trx_get_id_for_print(lock->trx)));

	/* If we are resolving a deadlock by choosing another transaction
	as a victim, then our original transaction may not be in the
	TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait
	for it */

	if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
		que_thr_t*	thr;

		thr = que_thr_end_lock_wait(lock->trx);

		if (thr != NULL) {
			/* Wake the suspended query thread. */
			lock_wait_release_thread_if_suspended(thr);
		}
	}
}
2096
2097 /** Grant a lock to a waiting lock request and release the waiting transaction. */
lock_grant(lock_t * lock)2098 static void lock_grant(lock_t* lock)
2099 {
2100 lock_reset_lock_and_trx_wait(lock);
2101 trx_mutex_enter(lock->trx);
2102 lock_grant_after_reset(lock);
2103 trx_mutex_exit(lock->trx);
2104 }
2105
2106 /*************************************************************//**
2107 Cancels a waiting record lock request and releases the waiting transaction
2108 that requested it. NOTE: does NOT check if waiting lock requests behind this
2109 one can now be granted! */
2110 static
2111 void
lock_rec_cancel(lock_t * lock)2112 lock_rec_cancel(
2113 /*============*/
2114 lock_t* lock) /*!< in: waiting record lock request */
2115 {
2116 que_thr_t* thr;
2117
2118 ut_ad(lock_mutex_own());
2119 ut_ad(lock_get_type_low(lock) == LOCK_REC);
2120
2121 /* Reset the bit (there can be only one set bit) in the lock bitmap */
2122 lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
2123
2124 /* Reset the wait flag and the back pointer to lock in trx */
2125
2126 lock_reset_lock_and_trx_wait(lock);
2127
2128 /* The following function releases the trx from lock wait */
2129
2130 trx_mutex_enter(lock->trx);
2131
2132 thr = que_thr_end_lock_wait(lock->trx);
2133
2134 if (thr != NULL) {
2135 lock_wait_release_thread_if_suspended(thr);
2136 }
2137
2138 trx_mutex_exit(lock->trx);
2139 }
2140
/** Grant eligible waiting lock requests on one page and move each newly
granted lock to the head of its hash chain, so that granted locks come
before waiting ones. Used by the non-FCFS lock scheduling algorithm
(see lock_rec_dequeue_from_page()).
@param[in]	rec_fold	hash fold of (space, page_no)
@param[in]	space		tablespace id
@param[in]	page_no		page number */
static
void
lock_grant_and_move_on_page(ulint rec_fold, ulint space, ulint page_no)
{
	lock_t*		lock;
	/* Start from the first entry of the hash cell for this fold. */
	lock_t*		previous = static_cast<lock_t*>(
		hash_get_nth_cell(lock_sys.rec_hash,
				  hash_calc_hash(rec_fold, lock_sys.rec_hash))
		->node);
	if (previous == NULL) {
		return;
	}
	if (previous->un_member.rec_lock.space == space &&
	    previous->un_member.rec_lock.page_no == page_no) {
		lock = previous;
	}
	else {
		/* Skip chain entries that belong to other pages which
		happen to hash to the same cell. */
		while (previous->hash &&
		       (previous->hash->un_member.rec_lock.space != space ||
			previous->hash->un_member.rec_lock.page_no != page_no)) {
			previous = previous->hash;
		}
		lock = previous->hash;
	}

	ut_ad(previous->hash == lock || previous == lock);
	/* Grant locks if there are no conflicting locks ahead.
	Move granted locks to the head of the list. */
	while (lock) {
		/* If the lock is a wait lock on this page, and it does not need to wait. */
		ut_ad(!lock->trx->is_wsrep());
		if (lock_get_wait(lock)
		    && lock->un_member.rec_lock.space == space
		    && lock->un_member.rec_lock.page_no == page_no
		    && !lock_rec_has_to_wait_in_queue(lock)) {
			lock_grant(lock);

			if (previous != NULL) {
				/* Move the lock to the head of the list. */
				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
				lock_rec_insert_to_head(lock, rec_fold);
			} else {
				/* Already at the head of the list. */
				previous = lock;
			}
			/* Move on to the next lock. */
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
		} else {
			previous = lock;
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
		}
	}
}
2194
/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock */
static void lock_rec_dequeue_from_page(lock_t* in_lock)
{
	ulint		space;
	ulint		page_no;
	hash_table_t*	lock_hash;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
	/* We may or may not be holding in_lock->trx->mutex here. */

	space = in_lock->un_member.rec_lock.space;
	page_no = in_lock->un_member.rec_lock.page_no;

	in_lock->index->table->n_rec_locks--;

	/* The lock may live in the ordinary, predicate, or
	predicate-page hash, depending on its type_mode. */
	lock_hash = lock_hash_get(in_lock->type_mode);

	ulint rec_fold = lock_rec_fold(space, page_no);

	HASH_DELETE(lock_t, hash, lock_hash, rec_fold, in_lock);
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	if (innodb_lock_schedule_algorithm
	    == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS
	    || lock_hash != lock_sys.rec_hash
	    || thd_is_replication_slave_thread(in_lock->trx->mysql_thd)) {
		/* Check if waiting locks in the queue can now be granted:
		grant locks if there are no conflicting locks ahead. Stop at
		the first X lock that is waiting or has been granted. */

		for (lock_t* lock = lock_rec_get_first_on_page_addr(
			     lock_hash, space, page_no);
		     lock != NULL;
		     lock = lock_rec_get_next_on_page(lock)) {

			if (!lock_get_wait(lock)) {
				continue;
			}
			const lock_t* c = lock_rec_has_to_wait_in_queue(lock);
			if (!c) {
				/* Grant the lock */
				ut_ad(lock->trx != in_lock->trx);
				lock_grant(lock);
			}
		}
	} else {
		/* Non-FCFS scheduling: grant eligible waiters and move
		granted locks to the front of the hash chain. */
		lock_grant_and_move_on_page(rec_fold, space, page_no);
	}
}
2251
2252 /*************************************************************//**
2253 Removes a record lock request, waiting or granted, from the queue. */
2254 void
lock_rec_discard(lock_t * in_lock)2255 lock_rec_discard(
2256 /*=============*/
2257 lock_t* in_lock) /*!< in: record lock object: all
2258 record locks which are contained
2259 in this lock object are removed */
2260 {
2261 ulint space;
2262 ulint page_no;
2263 trx_lock_t* trx_lock;
2264
2265 ut_ad(lock_mutex_own());
2266 ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
2267
2268 trx_lock = &in_lock->trx->lock;
2269
2270 space = in_lock->un_member.rec_lock.space;
2271 page_no = in_lock->un_member.rec_lock.page_no;
2272
2273 in_lock->index->table->n_rec_locks--;
2274
2275 HASH_DELETE(lock_t, hash, lock_hash_get(in_lock->type_mode),
2276 lock_rec_fold(space, page_no), in_lock);
2277
2278 UT_LIST_REMOVE(trx_lock->trx_locks, in_lock);
2279
2280 MONITOR_INC(MONITOR_RECLOCK_REMOVED);
2281 MONITOR_DEC(MONITOR_NUM_RECLOCK);
2282 }
2283
2284 /*************************************************************//**
2285 Removes record lock objects set on an index page which is discarded. This
2286 function does not move locks, or check for waiting locks, therefore the
2287 lock bitmaps must already be reset when this function is called. */
2288 static
2289 void
lock_rec_free_all_from_discard_page_low(ulint space,ulint page_no,hash_table_t * lock_hash)2290 lock_rec_free_all_from_discard_page_low(
2291 /*====================================*/
2292 ulint space,
2293 ulint page_no,
2294 hash_table_t* lock_hash)
2295 {
2296 lock_t* lock;
2297 lock_t* next_lock;
2298
2299 lock = lock_rec_get_first_on_page_addr(lock_hash, space, page_no);
2300
2301 while (lock != NULL) {
2302 ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
2303 ut_ad(!lock_get_wait(lock));
2304
2305 next_lock = lock_rec_get_next_on_page(lock);
2306
2307 lock_rec_discard(lock);
2308
2309 lock = next_lock;
2310 }
2311 }
2312
2313 /*************************************************************//**
2314 Removes record lock objects set on an index page which is discarded. This
2315 function does not move locks, or check for waiting locks, therefore the
2316 lock bitmaps must already be reset when this function is called. */
2317 void
lock_rec_free_all_from_discard_page(const buf_block_t * block)2318 lock_rec_free_all_from_discard_page(
2319 /*================================*/
2320 const buf_block_t* block) /*!< in: page to be discarded */
2321 {
2322 ulint space;
2323 ulint page_no;
2324
2325 ut_ad(lock_mutex_own());
2326
2327 space = block->page.id.space();
2328 page_no = block->page.id.page_no();
2329
2330 lock_rec_free_all_from_discard_page_low(
2331 space, page_no, lock_sys.rec_hash);
2332 lock_rec_free_all_from_discard_page_low(
2333 space, page_no, lock_sys.prdt_hash);
2334 lock_rec_free_all_from_discard_page_low(
2335 space, page_no, lock_sys.prdt_page_hash);
2336 }
2337
2338 /*============= RECORD LOCK MOVING AND INHERITING ===================*/
2339
2340 /*************************************************************//**
2341 Resets the lock bits for a single record. Releases transactions waiting for
2342 lock requests here. */
2343 static
2344 void
lock_rec_reset_and_release_wait_low(hash_table_t * hash,const buf_block_t * block,ulint heap_no)2345 lock_rec_reset_and_release_wait_low(
2346 /*================================*/
2347 hash_table_t* hash, /*!< in: hash table */
2348 const buf_block_t* block, /*!< in: buffer block containing
2349 the record */
2350 ulint heap_no)/*!< in: heap number of record */
2351 {
2352 lock_t* lock;
2353
2354 ut_ad(lock_mutex_own());
2355
2356 for (lock = lock_rec_get_first(hash, block, heap_no);
2357 lock != NULL;
2358 lock = lock_rec_get_next(heap_no, lock)) {
2359
2360 if (lock_get_wait(lock)) {
2361 lock_rec_cancel(lock);
2362 } else {
2363 lock_rec_reset_nth_bit(lock, heap_no);
2364 }
2365 }
2366 }
2367
2368 /*************************************************************//**
2369 Resets the lock bits for a single record. Releases transactions waiting for
2370 lock requests here. */
2371 static
2372 void
lock_rec_reset_and_release_wait(const buf_block_t * block,ulint heap_no)2373 lock_rec_reset_and_release_wait(
2374 /*============================*/
2375 const buf_block_t* block, /*!< in: buffer block containing
2376 the record */
2377 ulint heap_no)/*!< in: heap number of record */
2378 {
2379 lock_rec_reset_and_release_wait_low(
2380 lock_sys.rec_hash, block, heap_no);
2381
2382 lock_rec_reset_and_release_wait_low(
2383 lock_sys.prdt_hash, block, PAGE_HEAP_NO_INFIMUM);
2384 lock_rec_reset_and_release_wait_low(
2385 lock_sys.prdt_page_hash, block, PAGE_HEAP_NO_INFIMUM);
2386 }
2387
/*************************************************************//**
Makes a record to inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	const buf_block_t*	heir_block,	/*!< in: block containing the
						record which inherits */
	const buf_block_t*	block,		/*!< in: block containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	/* If srv_locks_unsafe_for_binlog is TRUE or session is using
	READ COMMITTED isolation level, we do not want locks set
	by an UPDATE or a DELETE to be inherited as gap type locks. But we
	DO want S-locks/X-locks(taken for replace) set by a consistency
	constraint to be inherited also then. */

	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		/* Skip insert-intention locks (never inherited). Under
		READ COMMITTED or locks_unsafe_for_binlog, additionally
		skip the mode LOCK_X, or LOCK_S when the transaction is
		handling duplicates (trx->duplicates set). */
		if (!lock_rec_get_insert_intention(lock)
		    && !((srv_locks_unsafe_for_binlog
			  || lock->trx->isolation_level
			  <= TRX_ISO_READ_COMMITTED)
			 && lock_get_mode(lock) ==
			 (lock->trx->duplicates ? LOCK_S : LOCK_X))) {
			/* Inherit as a GRANTED gap lock of the same
			mode on the heir record. */
			lock_rec_add_to_queue(
				LOCK_REC | LOCK_GAP
				| ulint(lock_get_mode(lock)),
				heir_block, heir_heap_no, lock->index,
				lock->trx, FALSE);
		}
	}
}
2436
2437 /*************************************************************//**
2438 Makes a record to inherit the gap locks (except LOCK_INSERT_INTENTION type)
2439 of another record as gap type locks, but does not reset the lock bits of the
2440 other record. Also waiting lock requests are inherited as GRANTED gap locks. */
2441 static
2442 void
lock_rec_inherit_to_gap_if_gap_lock(const buf_block_t * block,ulint heir_heap_no,ulint heap_no)2443 lock_rec_inherit_to_gap_if_gap_lock(
2444 /*================================*/
2445 const buf_block_t* block, /*!< in: buffer block */
2446 ulint heir_heap_no, /*!< in: heap_no of
2447 record which inherits */
2448 ulint heap_no) /*!< in: heap_no of record
2449 from which inherited;
2450 does NOT reset the locks
2451 on this record */
2452 {
2453 lock_t* lock;
2454
2455 lock_mutex_enter();
2456
2457 for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
2458 lock != NULL;
2459 lock = lock_rec_get_next(heap_no, lock)) {
2460
2461 if (!lock_rec_get_insert_intention(lock)
2462 && (heap_no == PAGE_HEAP_NO_SUPREMUM
2463 || !lock_rec_get_rec_not_gap(lock))) {
2464
2465 lock_rec_add_to_queue(
2466 LOCK_REC | LOCK_GAP
2467 | ulint(lock_get_mode(lock)),
2468 block, heir_heap_no, lock->index,
2469 lock->trx, FALSE);
2470 }
2471 }
2472
2473 lock_mutex_exit();
2474 }
2475
/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
static
void
lock_rec_move_low(
/*==============*/
	hash_table_t*		lock_hash,	/*!< in: hash table to use */
	const buf_block_t*	receiver,	/*!< in: buffer block containing
						the receiving record */
	const buf_block_t*	donator,	/*!< in: buffer block containing
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	/* If the lock is predicate lock, it resides on INFIMUM record */
	ut_ad(lock_rec_get_first(
		      lock_hash, receiver, receiver_heap_no) == NULL
	      || lock_hash == lock_sys.prdt_hash
	      || lock_hash == lock_sys.prdt_page_hash);

	for (lock = lock_rec_get_first(lock_hash,
				       donator, donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {

		const ulint	type_mode = lock->type_mode;

		lock_rec_reset_nth_bit(lock, donator_heap_no);

		if (type_mode & LOCK_WAIT) {
			/* Clear the wait flag and the trx back-pointer
			before re-enqueueing the request below. */
			lock_reset_lock_and_trx_wait(lock);
		}

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator == receiver */

		lock_rec_add_to_queue(
			type_mode, receiver, receiver_heap_no,
			lock->index, lock->trx, FALSE);
	}

	/* The donating record must be free of locks afterwards. */
	ut_ad(lock_rec_get_first(lock_sys.rec_hash,
				 donator, donator_heap_no) == NULL);
}
2529
/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool	seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			/* Until the first waiting lock is found, every
			granted lock is already in the right place. */
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
			/* Move the granted lock to the front; resume the
			scan from the predecessor so that the loop
			increment lands on the original successor. */
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}
2562
/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. Convenience wrapper over lock_rec_move_low() for
the ordinary record lock hash (lock_sys.rec_hash). */
UNIV_INLINE
void
lock_rec_move(
/*==========*/
	const buf_block_t*	receiver,	/*!< in: buffer block containing
						the receiving record */
	const buf_block_t*	donator,	/*!< in: buffer block containing
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	lock_rec_move_low(lock_sys.rec_hash, receiver, donator,
			  receiver_heap_no, donator_heap_no);
}
2584
/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
	lock_t*		lock;
	UT_LIST_BASE_NODE_T(lock_t)	old_locks;
	mem_heap_t*	heap		= NULL;
	ulint		comp;

	lock_mutex_enter();

	/* FIXME: This needs to deal with predicate lock too */
	lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block);

	if (lock == NULL) {
		/* No locks on the page: nothing to move. */
		lock_mutex_exit();

		return;
	}

	heap = mem_heap_create(256);

	/* Copy first all the locks on the page to heap and reset the
	bitmaps in the original locks; chain the copies of the locks
	using the trx_locks field in them. */

	UT_LIST_INIT(old_locks, &lock_t::trx_locks);

	do {
		/* Make a copy of the lock */
		lock_t*	old_lock = lock_rec_copy(lock, heap);

		UT_LIST_ADD_LAST(old_locks, old_lock);

		/* Reset bitmap of lock */
		lock_rec_bitmap_reset(lock);

		if (lock_get_wait(lock)) {

			lock_reset_lock_and_trx_wait(lock);
		}

		lock = lock_rec_get_next_on_page(lock);
	} while (lock != NULL);

	comp = page_is_comp(block->frame);
	ut_ad(comp == page_is_comp(oblock->frame));

	lock_move_granted_locks_to_front(old_locks);

	DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
			ut_list_reverse(old_locks););

	for (lock = UT_LIST_GET_FIRST(old_locks); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		/* NOTE: we copy also the locks set on the infimum and
		supremum of the page; the infimum may carry locks if an
		update of a record is occurring on the page, and its locks
		were temporarily stored on the infimum */
		const rec_t*	rec1 = page_get_infimum_rec(
			buf_block_get_frame(block));
		const rec_t*	rec2 = page_get_infimum_rec(
			buf_block_get_frame(oblock));

		/* Set locks according to old locks. Walk the reorganized
		page (rec1) and the old copy (rec2) in parallel; records
		occur in the same logical order on both, but a record's
		heap number may have changed in the reorganization. */
		for (;;) {
			ulint	old_heap_no;
			ulint	new_heap_no;
			ut_d(const rec_t* const orec = rec1);
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));

			if (comp) {
				old_heap_no = rec_get_heap_no_new(rec2);
				new_heap_no = rec_get_heap_no_new(rec1);

				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				old_heap_no = rec_get_heap_no_old(rec2);
				new_heap_no = rec_get_heap_no_old(rec1);
				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			/* Clear the bit in old_lock. */
			if (old_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, old_heap_no)) {
				ut_ad(!page_rec_is_metadata(orec));

				/* NOTE that the old lock bitmap could be too
				small for the new heap number! */

				lock_rec_add_to_queue(
					lock->type_mode, block, new_heap_no,
					lock->index, lock->trx, FALSE);
			}

			if (new_heap_no == PAGE_HEAP_NO_SUPREMUM) {
				ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
				break;
			}
		}

		/* All bits of the copied lock must now be consumed. */
		ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
	}

	lock_mutex_exit();

	mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}
2713
/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
	lock_t*		lock;
	const ulint	comp	= page_rec_is_comp(rec);

	ut_ad(buf_block_get_frame(block) == page_align(rec));
	ut_ad(comp == page_is_comp(buf_block_get_frame(new_block)));

	lock_mutex_enter();

	/* Note: when we move locks from record to record, waiting locks
	and possible granted gap type locks behind them are enqueued in
	the original order, because new elements are inserted to a hash
	table to the end of the hash chain, and lock_rec_add_to_queue
	does not reuse locks if there are waiters in the queue. */

	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		const rec_t*	rec1	= rec;
		const rec_t*	rec2;
		const ulint	type_mode = lock->type_mode;

		/* Position rec1 on the first moved user record on the
		old page and rec2 on the first user record of the new
		page; the moved records follow the new page's infimum. */
		if (comp) {
			if (page_offset(rec1) == PAGE_NEW_INFIMUM) {
				rec1 = page_rec_get_next_low(rec1, TRUE);
			}

			rec2 = page_rec_get_next_low(
				buf_block_get_frame(new_block)
				+ PAGE_NEW_INFIMUM, TRUE);
		} else {
			if (page_offset(rec1) == PAGE_OLD_INFIMUM) {
				rec1 = page_rec_get_next_low(rec1, FALSE);
			}

			rec2 = page_rec_get_next_low(
				buf_block_get_frame(new_block)
				+ PAGE_OLD_INFIMUM, FALSE);
		}

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		for (;;) {
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));
			ut_d(const rec_t* const orec = rec1);

			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);

				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
					break;
				}

				rec2_heap_no = rec_get_heap_no_new(rec2);
				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);

				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
					break;
				}

				rec2_heap_no = rec_get_heap_no_old(rec2);

				ut_ad(rec_get_data_size_old(rec1)
				      == rec_get_data_size_old(rec2));

				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec1)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				ut_ad(!page_rec_is_metadata(orec));

				if (type_mode & LOCK_WAIT) {
					/* Clear the wait flag before the
					request is re-enqueued on the new
					page. */
					lock_reset_lock_and_trx_wait(lock);
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);
			}
		}
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
	ut_ad(lock_rec_validate_page(new_block));
#endif
}
2825
/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
	lock_t*		lock;
	const ulint	comp	= page_rec_is_comp(rec);

	ut_ad(block->frame == page_align(rec));
	ut_ad(new_block->frame == page_align(old_end));
	ut_ad(comp == page_rec_is_comp(old_end));
	ut_ad(!page_rec_is_metadata(rec));

	lock_mutex_enter();

	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		const rec_t*	rec1;
		const rec_t*	rec2;
		const ulint	type_mode = lock->type_mode;

		/* rec1 starts at the first user record of the old page;
		rec2 at the first copied record on the new page, which
		follows old_end. */
		if (comp) {
			rec1 = page_rec_get_next_low(
				buf_block_get_frame(block)
				+ PAGE_NEW_INFIMUM, TRUE);
			rec2 = page_rec_get_next_low(old_end, TRUE);
		} else {
			rec1 = page_rec_get_next_low(
				buf_block_get_frame(block)
				+ PAGE_OLD_INFIMUM, FALSE);
			rec2 = page_rec_get_next_low(old_end, FALSE);
		}

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		while (rec1 != rec) {
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));
			ut_d(const rec_t* const prev = rec1);

			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);
				rec2_heap_no = rec_get_heap_no_new(rec2);

				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);
				rec2_heap_no = rec_get_heap_no_old(rec2);

				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				ut_ad(!page_rec_is_metadata(prev));

				if (type_mode & LOCK_WAIT) {
					/* Clear the wait flag before the
					request is re-enqueued on the new
					page. */
					lock_reset_lock_and_trx_wait(lock);
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);
			}
		}

#ifdef UNIV_DEBUG
		if (page_rec_is_supremum(rec)) {
			ulint	i;

			/* Everything was moved: no user-record bits may
			remain set in this lock. */
			for (i = PAGE_HEAP_NO_USER_LOW;
			     i < lock_rec_get_n_bits(lock); i++) {
				if (lock_rec_get_nth_bit(lock, i)) {
					ib::fatal()
						<< "lock_move_rec_list_start():"
						<< i << " not moved in "
						<< (void*) lock;
				}
			}
		}
#endif /* UNIV_DEBUG */
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}
2937
/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. Used for R-tree (spatial index)
pages, where the set of moved records is described explicitly by the
rec_move array rather than by a contiguous record range. */
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,	/*!< in: recording records
						moved */
	ulint			num_move)	/*!< in: num of rec to move */
{
	lock_t*		lock;
	ulint		comp;

	if (!num_move) {
		return;
	}

	/* All moved records belong to the same pair of pages, so the
	first entry determines the row format for the whole batch. */
	comp = page_rec_is_comp(rec_move[0].old_rec);

	ut_ad(block->frame == page_align(rec_move[0].old_rec));
	ut_ad(new_block->frame == page_align(rec_move[0].new_rec));
	ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));

	lock_mutex_enter();

	/* Visit every lock struct on the old page; a single struct may
	cover several of the moved records via its lock bitmap. */
	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		ulint		moved = 0;
		const rec_t*	rec1;
		const rec_t*	rec2;
		const ulint	type_mode = lock->type_mode;

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		while (moved < num_move) {
			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			rec1 = rec_move[moved].old_rec;
			rec2 = rec_move[moved].new_rec;
			ut_ad(!page_rec_is_metadata(rec1));
			ut_ad(!page_rec_is_metadata(rec2));

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);
				rec2_heap_no = rec_get_heap_no_new(rec2);

			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);
				rec2_heap_no = rec_get_heap_no_old(rec2);

				/* In ROW_FORMAT=REDUNDANT the copy must
				be byte-identical to the original. */
				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));
			}

			/* If this lock covered the old record, clear the
			bit and enqueue an equivalent lock on the new page.
			A pending wait is reset first because the waiting
			request is re-created by lock_rec_add_to_queue(). */
			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				if (type_mode & LOCK_WAIT) {
					lock_reset_lock_and_trx_wait(lock);
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);

				rec_move[moved].moved = true;
			}

			moved++;
		}
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}
3020 /*************************************************************//**
3021 Updates the lock table when a page is split to the right. */
3022 void
lock_update_split_right(const buf_block_t * right_block,const buf_block_t * left_block)3023 lock_update_split_right(
3024 /*====================*/
3025 const buf_block_t* right_block, /*!< in: right page */
3026 const buf_block_t* left_block) /*!< in: left page */
3027 {
3028 ulint heap_no = lock_get_min_heap_no(right_block);
3029
3030 lock_mutex_enter();
3031
3032 /* Move the locks on the supremum of the left page to the supremum
3033 of the right page */
3034
3035 lock_rec_move(right_block, left_block,
3036 PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
3037
3038 /* Inherit the locks to the supremum of left page from the successor
3039 of the infimum on right page */
3040
3041 lock_rec_inherit_to_gap(left_block, right_block,
3042 PAGE_HEAP_NO_SUPREMUM, heap_no);
3043
3044 lock_mutex_exit();
3045 }
3046
/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
	ut_ad(!page_rec_is_metadata(orig_succ));

	lock_mutex_enter();

	/* Inherit the locks from the supremum of the left page to the
	original successor of infimum on the right page, to which the left
	page was merged */

	lock_rec_inherit_to_gap(right_block, left_block,
				page_rec_get_heap_no(orig_succ),
				PAGE_HEAP_NO_SUPREMUM);

	/* Reset the locks on the supremum of the left page, releasing
	waiting transactions */

	lock_rec_reset_and_release_wait_low(
		lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);

	/* there should exist no page lock on the left page,
	otherwise, it will be blocked from merge */
	ut_ad(!lock_rec_get_first_on_page_addr(lock_sys.prdt_page_hash,
					       left_block->page.id.space(),
					       left_block->page.id.page_no()));

	/* The left page is being discarded: free every remaining lock
	struct that still refers to it. */
	lock_rec_free_all_from_discard_page(left_block);

	lock_mutex_exit();
}
3090
/*************************************************************//**
Updates the lock table when the root page is copied to another in
btr_root_raise_and_insert. Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void
lock_update_root_raise(
/*===================*/
	const buf_block_t*	block,	/*!< in: index page to which copied */
	const buf_block_t*	root)	/*!< in: root page */
{
	lock_mutex_enter();

	/* Move the locks on the supremum of the root to the supremum
	of block; the other lock structs deliberately stay on the root
	(see the function comment above). */

	lock_rec_move(block, root,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
	lock_mutex_exit();
}
3113
/*************************************************************//**
Updates the lock table when a page is copied to another and the original page
is removed from the chain of leaf pages, except if page is the root! */
void
lock_update_copy_and_discard(
/*=========================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						which copied */
	const buf_block_t*	block)		/*!< in: index page;
						NOT the root! */
{
	lock_mutex_enter();

	/* Move the locks on the supremum of the old page to the supremum
	of new_page */

	lock_rec_move(new_block, block,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
	/* The old page is leaving the leaf-page chain: release every
	lock struct that still points to it. */
	lock_rec_free_all_from_discard_page(block);

	lock_mutex_exit();
}
3136
3137 /*************************************************************//**
3138 Updates the lock table when a page is split to the left. */
3139 void
lock_update_split_left(const buf_block_t * right_block,const buf_block_t * left_block)3140 lock_update_split_left(
3141 /*===================*/
3142 const buf_block_t* right_block, /*!< in: right page */
3143 const buf_block_t* left_block) /*!< in: left page */
3144 {
3145 ulint heap_no = lock_get_min_heap_no(right_block);
3146
3147 lock_mutex_enter();
3148
3149 /* Inherit the locks to the supremum of the left page from the
3150 successor of the infimum on the right page */
3151
3152 lock_rec_inherit_to_gap(left_block, right_block,
3153 PAGE_HEAP_NO_SUPREMUM, heap_no);
3154
3155 lock_mutex_exit();
3156 }
3157
/*************************************************************//**
Updates the lock table when a page is merged to the left. */
void
lock_update_merge_left(
/*===================*/
	const buf_block_t*	left_block,	/*!< in: left page to
						which merged */
	const rec_t*		orig_pred,	/*!< in: original predecessor
						of supremum on the left page
						before merge */
	const buf_block_t*	right_block)	/*!< in: merged index page
						which will be discarded */
{
	const rec_t*	left_next_rec;

	ut_ad(left_block->frame == page_align(orig_pred));

	lock_mutex_enter();

	/* First record that was moved in from the right page (the
	successor of the original last record of the left page). */
	left_next_rec = page_rec_get_next_const(orig_pred);

	if (!page_rec_is_supremum(left_next_rec)) {

		/* Inherit the locks on the supremum of the left page to the
		first record which was moved from the right page */

		lock_rec_inherit_to_gap(left_block, left_block,
					page_rec_get_heap_no(left_next_rec),
					PAGE_HEAP_NO_SUPREMUM);

		/* Reset the locks on the supremum of the left page,
		releasing waiting transactions */

		lock_rec_reset_and_release_wait_low(
			lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);
	}

	/* Move the locks from the supremum of right page to the supremum
	of the left page */

	lock_rec_move(left_block, right_block,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

	/* there should exist no page lock on the right page,
	otherwise, it will be blocked from merge */
	ut_ad(!lock_rec_get_first_on_page_addr(
		      lock_sys.prdt_page_hash,
		      right_block->page.id.space(),
		      right_block->page.id.page_no()));

	/* The right page is being discarded: free every remaining lock
	struct that still refers to it. */
	lock_rec_free_all_from_discard_page(right_block);

	lock_mutex_exit();
}
3212
/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t*	heir_block,	/*!< in: block containing the
						record which inherits */
	const buf_block_t*	block,		/*!< in: block containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	lock_mutex_enter();

	/* The order matters: first drop whatever the heir currently
	holds (releasing any transactions waiting on it)... */
	lock_rec_reset_and_release_wait(heir_block, heir_heap_no);

	/* ...then hand it gap-type locks derived from the donating
	record; the donor's own locks are left untouched. */
	lock_rec_inherit_to_gap(heir_block, block, heir_heap_no, heap_no);

	lock_mutex_exit();
}
3238
/*************************************************************//**
Updates the lock table when a page is discarded. */
void
lock_update_discard(
/*================*/
	const buf_block_t*	heir_block,	/*!< in: index page
						which will inherit the locks */
	ulint			heir_heap_no,	/*!< in: heap_no of the record
						which will inherit the locks */
	const buf_block_t*	block)		/*!< in: index page
						which will be discarded */
{
	const page_t*	page = block->frame;
	const rec_t*	rec;
	ulint		heap_no;

	lock_mutex_enter();

	if (lock_rec_get_first_on_page(lock_sys.rec_hash, block)) {
		/* A page carries either ordinary record locks or
		predicate (spatial) locks, never both. */
		ut_ad(!lock_rec_get_first_on_page(lock_sys.prdt_hash, block));
		ut_ad(!lock_rec_get_first_on_page(lock_sys.prdt_page_hash,
						  block));
		/* Inherit all the locks on the page to the record and
		reset all the locks on the page */

		if (page_is_comp(page)) {
			/* Walk the singly-linked record chain from the
			infimum through the supremum (which terminates
			the loop), transferring each record's locks. */
			rec = page + PAGE_NEW_INFIMUM;

			do {
				heap_no = rec_get_heap_no_new(rec);

				lock_rec_inherit_to_gap(heir_block, block,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					block, heap_no);

				rec = page + rec_get_next_offs(rec, TRUE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		} else {
			/* Same walk for ROW_FORMAT=REDUNDANT pages. */
			rec = page + PAGE_OLD_INFIMUM;

			do {
				heap_no = rec_get_heap_no_old(rec);

				lock_rec_inherit_to_gap(heir_block, block,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					block, heap_no);

				rec = page + rec_get_next_offs(rec, FALSE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		}

		lock_rec_free_all_from_discard_page_low(
			block->page.id.space(), block->page.id.page_no(),
			lock_sys.rec_hash);
	} else {
		/* No ordinary record locks: clear any predicate locks
		(spatial index page) instead. */
		lock_rec_free_all_from_discard_page_low(
			block->page.id.space(), block->page.id.page_no(),
			lock_sys.prdt_hash);
		lock_rec_free_all_from_discard_page_low(
			block->page.id.space(), block->page.id.page_no(),
			lock_sys.prdt_page_hash);
	}

	lock_mutex_exit();
}
3308
3309 /*************************************************************//**
3310 Updates the lock table when a new user record is inserted. */
3311 void
lock_update_insert(const buf_block_t * block,const rec_t * rec)3312 lock_update_insert(
3313 /*===============*/
3314 const buf_block_t* block, /*!< in: buffer block containing rec */
3315 const rec_t* rec) /*!< in: the inserted record */
3316 {
3317 ulint receiver_heap_no;
3318 ulint donator_heap_no;
3319
3320 ut_ad(block->frame == page_align(rec));
3321 ut_ad(!page_rec_is_metadata(rec));
3322
3323 /* Inherit the gap-locking locks for rec, in gap mode, from the next
3324 record */
3325
3326 if (page_rec_is_comp(rec)) {
3327 receiver_heap_no = rec_get_heap_no_new(rec);
3328 donator_heap_no = rec_get_heap_no_new(
3329 page_rec_get_next_low(rec, TRUE));
3330 } else {
3331 receiver_heap_no = rec_get_heap_no_old(rec);
3332 donator_heap_no = rec_get_heap_no_old(
3333 page_rec_get_next_low(rec, FALSE));
3334 }
3335
3336 lock_rec_inherit_to_gap_if_gap_lock(
3337 block, receiver_heap_no, donator_heap_no);
3338 }
3339
3340 /*************************************************************//**
3341 Updates the lock table when a record is removed. */
3342 void
lock_update_delete(const buf_block_t * block,const rec_t * rec)3343 lock_update_delete(
3344 /*===============*/
3345 const buf_block_t* block, /*!< in: buffer block containing rec */
3346 const rec_t* rec) /*!< in: the record to be removed */
3347 {
3348 const page_t* page = block->frame;
3349 ulint heap_no;
3350 ulint next_heap_no;
3351
3352 ut_ad(page == page_align(rec));
3353 ut_ad(!page_rec_is_metadata(rec));
3354
3355 if (page_is_comp(page)) {
3356 heap_no = rec_get_heap_no_new(rec);
3357 next_heap_no = rec_get_heap_no_new(page
3358 + rec_get_next_offs(rec,
3359 TRUE));
3360 } else {
3361 heap_no = rec_get_heap_no_old(rec);
3362 next_heap_no = rec_get_heap_no_old(page
3363 + rec_get_next_offs(rec,
3364 FALSE));
3365 }
3366
3367 lock_mutex_enter();
3368
3369 /* Let the next record inherit the locks from rec, in gap mode */
3370
3371 lock_rec_inherit_to_gap(block, block, next_heap_no, heap_no);
3372
3373 /* Reset the lock bits on rec and release waiting transactions */
3374
3375 lock_rec_reset_and_release_wait(block, heap_no);
3376
3377 lock_mutex_exit();
3378 }
3379
3380 /*********************************************************************//**
3381 Stores on the page infimum record the explicit locks of another record.
3382 This function is used to store the lock state of a record when it is
3383 updated and the size of the record changes in the update. The record
3384 is moved in such an update, perhaps to another page. The infimum record
3385 acts as a dummy carrier record, taking care of lock releases while the
3386 actual record is being moved. */
3387 void
lock_rec_store_on_page_infimum(const buf_block_t * block,const rec_t * rec)3388 lock_rec_store_on_page_infimum(
3389 /*===========================*/
3390 const buf_block_t* block, /*!< in: buffer block containing rec */
3391 const rec_t* rec) /*!< in: record whose lock state
3392 is stored on the infimum
3393 record of the same page; lock
3394 bits are reset on the
3395 record */
3396 {
3397 ulint heap_no = page_rec_get_heap_no(rec);
3398
3399 ut_ad(block->frame == page_align(rec));
3400
3401 lock_mutex_enter();
3402
3403 lock_rec_move(block, block, PAGE_HEAP_NO_INFIMUM, heap_no);
3404
3405 lock_mutex_exit();
3406 }
3407
3408 /*********************************************************************//**
3409 Restores the state of explicit lock requests on a single record, where the
3410 state was stored on the infimum of the page. */
3411 void
lock_rec_restore_from_page_infimum(const buf_block_t * block,const rec_t * rec,const buf_block_t * donator)3412 lock_rec_restore_from_page_infimum(
3413 /*===============================*/
3414 const buf_block_t* block, /*!< in: buffer block containing rec */
3415 const rec_t* rec, /*!< in: record whose lock state
3416 is restored */
3417 const buf_block_t* donator)/*!< in: page (rec is not
3418 necessarily on this page)
3419 whose infimum stored the lock
3420 state; lock bits are reset on
3421 the infimum */
3422 {
3423 ulint heap_no = page_rec_get_heap_no(rec);
3424
3425 lock_mutex_enter();
3426
3427 lock_rec_move(block, donator, heap_no, PAGE_HEAP_NO_INFIMUM);
3428
3429 lock_mutex_exit();
3430 }
3431
3432 /*========================= TABLE LOCKS ==============================*/
3433
3434 /** Functor for accessing the embedded node within a table lock. */
3435 struct TableLockGetNode {
operator ()TableLockGetNode3436 ut_list_node<lock_t>& operator() (lock_t& elem)
3437 {
3438 return(elem.un_member.tab_lock.locks);
3439 }
3440 };
3441
/*********************************************************************//**
Creates a table lock object and adds it as the last in the lock queue
of the table. Does NOT check for deadlocks or lock compatibility.
Caller must hold lock_sys.mutex and trx->mutex.
@return own: new lock object */
UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	ulint		type_mode,/*!< in: lock mode possibly ORed with
				LOCK_WAIT */
	trx_t*		trx	/*!< in: trx */
#ifdef WITH_WSREP
	, lock_t*	c_lock = NULL	/*!< in: conflicting lock */
#endif
	)
{
	lock_t*	lock;

	ut_ad(table && trx);
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(trx));
	ut_ad(trx->is_recovered || trx->state == TRX_STATE_ACTIVE);
	ut_ad(!trx->auto_commit || trx->will_lock);

	if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) {
		++table->n_waiting_or_granted_auto_inc_locks;
	}

	/* For AUTOINC locking we reuse the lock instance only if
	there is no wait involved else we allocate the waiting lock
	from the transaction lock heap. */
	if (type_mode == LOCK_AUTO_INC) {

		/* Granted AUTOINC lock: reuse the single lock object
		embedded in the table and record ownership. */
		lock = table->autoinc_lock;

		table->autoinc_trx = trx;

		ib_vector_push(trx->autoinc_locks, &lock);

	} else if (trx->lock.table_cached
		   < UT_ARR_SIZE(trx->lock.table_pool)) {
		/* Use a preallocated slot from the per-transaction pool
		to avoid a heap allocation. */
		lock = &trx->lock.table_pool[trx->lock.table_cached++];
	} else {

		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap, sizeof(*lock)));

	}

	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
	lock->trx = trx;

	lock->un_member.tab_lock.table = table;

	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);

	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

#ifdef WITH_WSREP
	if (c_lock && trx->is_wsrep()) {
		if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
			/* A brute-force (BF) applier transaction jumps
			the queue: place its lock ahead of the
			conflicting one. */
			ut_list_insert(table->locks, c_lock, lock,
				       TableLockGetNode());
			if (UNIV_UNLIKELY(wsrep_debug)) {
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
			}
		} else {
			ut_list_append(table->locks, lock, TableLockGetNode());
		}

		trx_mutex_enter(c_lock->trx);

		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
			/* Abort the conflicting waiter in favour of the
			BF transaction. */
			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;

			if (UNIV_UNLIKELY(wsrep_debug)) {
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
				wsrep_print_wait_locks(c_lock);
			}

			/* The lock release will call lock_grant(),
			which would acquire trx->mutex again. */
			trx_mutex_exit(trx);
			lock_cancel_waiting_and_release(
				c_lock->trx->lock.wait_lock);
			trx_mutex_enter(trx);
		}

		trx_mutex_exit(c_lock->trx);
	} else
#endif /* WITH_WSREP */
	ut_list_append(table->locks, lock, TableLockGetNode());

	if (type_mode & LOCK_WAIT) {

		lock_set_lock_and_trx_wait(lock, trx);
	}

	lock->trx->lock.table_locks.push_back(lock);

	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_TABLELOCK);

	return(lock);
}
3551
3552 /*************************************************************//**
3553 Pops autoinc lock requests from the transaction's autoinc_locks. We
3554 handle the case where there are gaps in the array and they need to
3555 be popped off the stack. */
3556 UNIV_INLINE
3557 void
lock_table_pop_autoinc_locks(trx_t * trx)3558 lock_table_pop_autoinc_locks(
3559 /*=========================*/
3560 trx_t* trx) /*!< in/out: transaction that owns the AUTOINC locks */
3561 {
3562 ut_ad(lock_mutex_own());
3563 ut_ad(!ib_vector_is_empty(trx->autoinc_locks));
3564
3565 /* Skip any gaps, gaps are NULL lock entries in the
3566 trx->autoinc_locks vector. */
3567
3568 do {
3569 ib_vector_pop(trx->autoinc_locks);
3570
3571 if (ib_vector_is_empty(trx->autoinc_locks)) {
3572 return;
3573 }
3574
3575 } while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
3576 }
3577
/*************************************************************//**
Removes an autoinc lock request from the transaction's autoinc_locks.
If the lock is not on top of the stack, its slot is overwritten with
NULL (a "gap") rather than shifting the vector. */
UNIV_INLINE
void
lock_table_remove_autoinc_lock(
/*===========================*/
	lock_t*	lock,	/*!< in: table lock */
	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
{
	lock_t*	autoinc_lock;
	/* Index of the last (most recently pushed) entry. */
	lint	i = ib_vector_size(trx->autoinc_locks) - 1;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_mode(lock) == LOCK_AUTO_INC);
	ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));

	/* With stored functions and procedures the user may drop
	a table within the same "statement". This special case has
	to be handled by deleting only those AUTOINC locks that were
	held by the table being dropped. */

	autoinc_lock = *static_cast<lock_t**>(
		ib_vector_get(trx->autoinc_locks, i));

	/* This is the default fast case. */

	if (autoinc_lock == lock) {
		/* Lock is on top: pop it (and any trailing gaps). */
		lock_table_pop_autoinc_locks(trx);
	} else {
		/* The last element should never be NULL */
		ut_a(autoinc_lock != NULL);

		/* Handle freeing the locks from within the stack. */

		while (--i >= 0) {
			autoinc_lock = *static_cast<lock_t**>(
				ib_vector_get(trx->autoinc_locks, i));

			if (autoinc_lock == lock) {
				/* Leave a NULL gap; it is skipped later
				by lock_table_pop_autoinc_locks(). */
				void*	null_var = NULL;
				ib_vector_set(trx->autoinc_locks, i, &null_var);
				return;
			}
		}

		/* Must find the autoinc lock. */
		ut_error;
	}
}
3628
/*************************************************************//**
Removes a table lock request from the queue and the trx list of locks;
this is a low-level function which does NOT check if waiting requests
can now be granted. */
UNIV_INLINE
void
lock_table_remove_low(
/*==================*/
	lock_t*	lock)	/*!< in/out: table lock */
{
	trx_t*		trx;
	dict_table_t*	table;

	ut_ad(lock_mutex_own());

	trx = lock->trx;
	table = lock->un_member.tab_lock.table;

	/* Remove the table from the transaction's AUTOINC vector, if
	the lock that is being released is an AUTOINC lock. */
	if (lock_get_mode(lock) == LOCK_AUTO_INC) {

		/* The table's AUTOINC lock can get transferred to
		another transaction before we get here. */
		if (table->autoinc_trx == trx) {
			table->autoinc_trx = NULL;
		}

		/* The locks must be freed in the reverse order from
		the one in which they were acquired. This is to avoid
		traversing the AUTOINC lock vector unnecessarily.

		We only store locks that were granted in the
		trx->autoinc_locks vector (see lock_table_create()
		and lock_grant()). Therefore it can be empty and we
		need to check for that. */

		if (!lock_get_wait(lock)
		    && !ib_vector_is_empty(trx->autoinc_locks)) {

			lock_table_remove_autoinc_lock(lock, trx);
		}

		ut_a(table->n_waiting_or_granted_auto_inc_locks > 0);
		table->n_waiting_or_granted_auto_inc_locks--;
	}

	/* Unlink from both owning lists: the transaction's lock list
	and the table's lock queue. */
	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
	ut_list_remove(table->locks, lock, TableLockGetNode());

	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
}
3682
/*********************************************************************//**
Enqueues a waiting request for a table lock which cannot be granted
immediately. Checks for deadlocks.
Caller must hold lock_sys.mutex and the transaction's mutex.
@retval DB_LOCK_WAIT if the waiting lock was enqueued
@retval DB_DEADLOCK if this transaction was chosen as the victim
@retval DB_SUCCESS if the other transaction committed or aborted */
static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
	ulint		mode,	/*!< in: lock mode this transaction is
				requesting */
	dict_table_t*	table,	/*!< in/out: table */
	que_thr_t*	thr	/*!< in: query thread */
#ifdef WITH_WSREP
	, lock_t*	c_lock	/*!< in: conflicting lock or NULL */
#endif
)
{
	trx_t*	trx;
	lock_t*	lock;

	ut_ad(lock_mutex_own());
	ut_ad(!srv_read_only_mode);

	trx = thr_get_trx(thr);
	ut_ad(trx_mutex_own(trx));
	ut_a(!que_thr_stop(thr));

	/* A dictionary operation is never expected to block on a
	table lock; report it loudly if it does. */
	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		break;
	case TRX_DICT_OP_TABLE:
	case TRX_DICT_OP_INDEX:
		ib::error() << "A table lock wait happens in a dictionary"
			" operation. Table " << table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
		return(DB_DEADLOCK);
	}
#endif /* WITH_WSREP */

	/* Enqueue the lock request that will wait to be granted */
	lock = lock_table_create(table, ulint(mode) | LOCK_WAIT, trx
#ifdef WITH_WSREP
				 , c_lock
#endif
				 );

	const trx_t*	victim_trx =
		DeadlockChecker::check_and_resolve(lock, trx);

	if (victim_trx != 0) {
		ut_ad(victim_trx == trx);

		/* The order here is important, we don't want to
		lose the state of the lock before calling remove. */
		lock_table_remove_low(lock);
		lock_reset_lock_and_trx_wait(lock);

		return(DB_DEADLOCK);

	} else if (trx->lock.wait_lock == NULL) {
		/* Deadlock resolution chose another transaction as a victim,
		and we accidentally got our lock granted! */

		return(DB_SUCCESS);
	}

	/* Record that this transaction is now blocked on a lock wait. */
	trx->lock.que_state = TRX_QUE_LOCK_WAIT;

	trx->lock.wait_started = time(NULL);
	trx->lock.was_chosen_as_deadlock_victim = false;

	ut_a(que_thr_stop(thr));

	MONITOR_INC(MONITOR_TABLELOCK_WAIT);

	return(DB_LOCK_WAIT);
}
3767
/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
the lock queue. The queue is scanned from newest to oldest request.
NOTE: under WITH_WSREP this is not a pure predicate: a conflicting
wsrep transaction may be signalled for a brute-force abort here.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_table_other_has_incompatible(
/*==============================*/
	const trx_t*		trx,	/*!< in: transaction, or NULL if all
					transactions should be included */
	ulint			wait,	/*!< in: LOCK_WAIT if also
					waiting locks are taken into
					account, or 0 if not */
	const dict_table_t*	table,	/*!< in: table */
	lock_mode		mode)	/*!< in: lock mode */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	for (lock = UT_LIST_GET_LAST(table->locks);
	     lock != NULL;
	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {

		/* A lock conflicts if it belongs to someone else, its
		mode is incompatible, and (unless "wait" is set) it has
		actually been granted. */
		if (lock->trx != trx
		    && !lock_mode_compatible(lock_get_mode(lock), mode)
		    && (wait || !lock_get_wait(lock))) {

#ifdef WITH_WSREP
			if (lock->trx->is_wsrep()) {
				if (UNIV_UNLIKELY(wsrep_debug)) {
					ib::info() << "WSREP: table lock abort for table:"
						   << table->name;
					ib::info() << " SQL: "
						   << wsrep_thd_query(lock->trx->mysql_thd);
				}
				trx_mutex_enter(lock->trx);
				wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
				trx_mutex_exit(lock->trx);
			}
#endif /* WITH_WSREP */

			return(lock);
		}
	}

	return(NULL);
}
3816
/*********************************************************************//**
Locks the specified database table in the mode given. If the lock cannot
be granted immediately, the query thread is put to wait.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_table(
/*=======*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is set,
				does nothing */
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	lock_mode	mode,	/*!< in: lock mode */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t*		trx;
	dberr_t		err;
	lock_t*		wait_for;

	ut_ad(table && thr);

	/* Given limited visibility of temp-table we can avoid
	locking overhead */
	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || table->is_temporary()) {

		return(DB_SUCCESS);
	}

	ut_a(flags == 0);

	trx = thr_get_trx(thr);

	/* Look for equal or stronger locks the same trx already
	has on the table. No need to acquire the lock mutex here
	because only this transacton can add/access table locks
	to/from trx_t::table_locks. */

	if (lock_table_has(trx, table, mode)) {

		return(DB_SUCCESS);
	}

	/* Read only transactions can write to temp tables, we don't want
	to promote them to RW transactions. Their updates cannot be visible
	to other transactions. Therefore we can keep them out
	of the read views. */

	if ((mode == LOCK_IX || mode == LOCK_X)
	    && !trx->read_only
	    && trx->rsegs.m_redo.rseg == 0) {

		trx_set_rw_mode(trx);
	}

	lock_mutex_enter();

	DBUG_EXECUTE_IF("fatal-semaphore-timeout",
		{ os_thread_sleep(3600000000LL); });

	/* We have to check if the new lock is compatible with any locks
	other transactions have in the table lock queue. */

	wait_for = lock_table_other_has_incompatible(
		trx, LOCK_WAIT, table, mode);

	trx_mutex_enter(trx);

	/* Another trx has a request on the table in an incompatible
	mode: this trx may have to wait */

	if (wait_for != NULL) {
		err = lock_table_enqueue_waiting(ulint(mode) | flags, table,
						 thr
#ifdef WITH_WSREP
						 , wait_for
#endif
						 );
	} else {
		/* No conflict: grant the lock immediately. */
		lock_table_create(table, ulint(mode) | flags, trx);

		ut_a(!flags || mode == LOCK_S || mode == LOCK_X);

		err = DB_SUCCESS;
	}

	lock_mutex_exit();

	trx_mutex_exit(trx);

	return(err);
}
3909
3910 /*********************************************************************//**
3911 Creates a table IX lock object for a resurrected transaction. */
3912 void
lock_table_ix_resurrect(dict_table_t * table,trx_t * trx)3913 lock_table_ix_resurrect(
3914 /*====================*/
3915 dict_table_t* table, /*!< in/out: table */
3916 trx_t* trx) /*!< in/out: transaction */
3917 {
3918 ut_ad(trx->is_recovered);
3919
3920 if (lock_table_has(trx, table, LOCK_IX)) {
3921 return;
3922 }
3923
3924 lock_mutex_enter();
3925
3926 /* We have to check if the new lock is compatible with any locks
3927 other transactions have in the table lock queue. */
3928
3929 ut_ad(!lock_table_other_has_incompatible(
3930 trx, LOCK_WAIT, table, LOCK_IX));
3931
3932 trx_mutex_enter(trx);
3933 lock_table_create(table, LOCK_IX, trx);
3934 lock_mutex_exit();
3935 trx_mutex_exit(trx);
3936 }
3937
3938 /*********************************************************************//**
3939 Checks if a waiting table lock request still has to wait in a queue.
3940 @return TRUE if still has to wait */
3941 static
3942 bool
lock_table_has_to_wait_in_queue(const lock_t * wait_lock)3943 lock_table_has_to_wait_in_queue(
3944 /*============================*/
3945 const lock_t* wait_lock) /*!< in: waiting table lock */
3946 {
3947 const dict_table_t* table;
3948 const lock_t* lock;
3949
3950 ut_ad(lock_mutex_own());
3951 ut_ad(lock_get_wait(wait_lock));
3952
3953 table = wait_lock->un_member.tab_lock.table;
3954
3955 for (lock = UT_LIST_GET_FIRST(table->locks);
3956 lock != wait_lock;
3957 lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3958
3959 if (lock_has_to_wait(wait_lock, lock)) {
3960
3961 return(true);
3962 }
3963 }
3964
3965 return(false);
3966 }
3967
3968 /*************************************************************//**
3969 Removes a table lock request, waiting or granted, from the queue and grants
3970 locks to other transactions in the queue, if they now are entitled to a
3971 lock. */
3972 static
3973 void
lock_table_dequeue(lock_t * in_lock)3974 lock_table_dequeue(
3975 /*===============*/
3976 lock_t* in_lock)/*!< in/out: table lock object; transactions waiting
3977 behind will get their lock requests granted, if
3978 they are now qualified to it */
3979 {
3980 ut_ad(lock_mutex_own());
3981 ut_a(lock_get_type_low(in_lock) == LOCK_TABLE);
3982
3983 lock_t* lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);
3984
3985 lock_table_remove_low(in_lock);
3986
3987 /* Check if waiting locks in the queue can now be granted: grant
3988 locks if there are no conflicting locks ahead. */
3989
3990 for (/* No op */;
3991 lock != NULL;
3992 lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3993
3994 if (lock_get_wait(lock)
3995 && !lock_table_has_to_wait_in_queue(lock)) {
3996
3997 /* Grant the lock */
3998 ut_ad(in_lock->trx != lock->trx);
3999 lock_grant(lock);
4000 }
4001 }
4002 }
4003
/** Sets a lock on a table based on the given mode.
@param[in]	table	table to lock
@param[in,out]	trx	transaction
@param[in]	mode	LOCK_X or LOCK_S
@return error code or DB_SUCCESS. */
dberr_t
lock_table_for_trx(
	dict_table_t*	table,
	trx_t*		trx,
	enum lock_mode	mode)
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;
	sel_node_t*	node;
	heap = mem_heap_create(512);

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = static_cast<que_thr_t*>(
		que_fork_get_first_thr(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

	que_thr_move_to_run_state_for_mysql(thr, trx);

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(0, table, mode, thr);

	trx->error_state = err;

	if (UNIV_LIKELY(err == DB_SUCCESS)) {
		que_thr_stop_for_mysql_no_error(thr, trx);
	} else {
		que_thr_stop_for_mysql(thr);

		/* Retry the lock request if the error handler resolved
		the error (e.g. a lock wait that was later granted);
		otherwise fall through with err set. */
		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
			goto run_again;
		}
	}

	/* The heap was handed to the query graph above; there is no
	separate mem_heap_free() call here. */
	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}
4057
4058 /*=========================== LOCK RELEASE ==============================*/
/** Grant record lock requests that no longer have to wait, and move
each newly granted lock to the head of its hash chain. Used instead of
a plain queue scan when innodb_lock_schedule_algorithm is not FCFS
(see lock_rec_unlock()).
@param[in]	lock_hash	record lock hash table
@param[in]	first_lock	first lock in the record's queue
@param[in]	heap_no		heap number of the record */
static
void
lock_grant_and_move_on_rec(
	hash_table_t*	lock_hash,
	lock_t*		first_lock,
	ulint		heap_no)
{
	lock_t*		lock;
	lock_t*		previous;
	ulint		space;
	ulint		page_no;
	ulint		rec_fold;

	space = first_lock->un_member.rec_lock.space;
	page_no = first_lock->un_member.rec_lock.page_no;
	rec_fold = lock_rec_fold(space, page_no);

	/* Locate the hash-chain node preceding first_lock, so that
	granted locks can be unlinked and reinserted at the head. */
	previous = (lock_t *) hash_get_nth_cell(lock_hash,
					hash_calc_hash(rec_fold, lock_hash))->node;
	if (previous == NULL) {
		return;
	}
	if (previous == first_lock) {
		lock = previous;
	} else {
		while (previous->hash &&
				previous->hash != first_lock) {
			previous = previous->hash;
		}
		lock = previous->hash;
	}
	/* Grant locks if there are no conflicting locks ahead.
	Move granted locks to the head of the list. */
	while (lock) {
		/* wsrep transactions are not expected on this code path. */
		ut_ad(!lock->trx->is_wsrep());
		/* If the lock is a wait lock on this page, and it does not need to wait. */
		if (lock->un_member.rec_lock.space == space
		    && lock->un_member.rec_lock.page_no == page_no
		    && lock_rec_get_nth_bit(lock, heap_no)
		    && lock_get_wait(lock)
		    && !lock_rec_has_to_wait_in_queue(lock)) {

			lock_grant(lock);

			if (previous != NULL) {
				/* Move the lock to the head of the list. */
				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
				lock_rec_insert_to_head(lock, rec_fold);
			} else {
				/* Already at the head of the list. */
				previous = lock;
			}
			/* Move on to the next lock. */
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
		} else {
			previous = lock;
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
		}
	}
}
4119
4120 /*************************************************************//**
4121 Removes a granted record lock of a transaction from the queue and grants
4122 locks to other transactions waiting in the queue if they now are entitled
4123 to a lock. */
4124 void
lock_rec_unlock(trx_t * trx,const buf_block_t * block,const rec_t * rec,lock_mode lock_mode)4125 lock_rec_unlock(
4126 /*============*/
4127 trx_t* trx, /*!< in/out: transaction that has
4128 set a record lock */
4129 const buf_block_t* block, /*!< in: buffer block containing rec */
4130 const rec_t* rec, /*!< in: record */
4131 lock_mode lock_mode)/*!< in: LOCK_S or LOCK_X */
4132 {
4133 lock_t* first_lock;
4134 lock_t* lock;
4135 ulint heap_no;
4136
4137 ut_ad(trx);
4138 ut_ad(rec);
4139 ut_ad(block->frame == page_align(rec));
4140 ut_ad(!trx->lock.wait_lock);
4141 ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
4142 ut_ad(!page_rec_is_metadata(rec));
4143
4144 heap_no = page_rec_get_heap_no(rec);
4145
4146 lock_mutex_enter();
4147 trx_mutex_enter(trx);
4148
4149 first_lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
4150
4151 /* Find the last lock with the same lock_mode and transaction
4152 on the record. */
4153
4154 for (lock = first_lock; lock != NULL;
4155 lock = lock_rec_get_next(heap_no, lock)) {
4156 if (lock->trx == trx && lock_get_mode(lock) == lock_mode) {
4157 goto released;
4158 }
4159 }
4160
4161 lock_mutex_exit();
4162 trx_mutex_exit(trx);
4163
4164 {
4165 ib::error err;
4166 err << "Unlock row could not find a " << lock_mode
4167 << " mode lock on the record. Current statement: ";
4168 size_t stmt_len;
4169 if (const char* stmt = innobase_get_stmt_unsafe(
4170 trx->mysql_thd, &stmt_len)) {
4171 err.write(stmt, stmt_len);
4172 }
4173 }
4174
4175 return;
4176
4177 released:
4178 ut_a(!lock_get_wait(lock));
4179 lock_rec_reset_nth_bit(lock, heap_no);
4180
4181 if (innodb_lock_schedule_algorithm
4182 == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4183 thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
4184
4185 /* Check if we can now grant waiting lock requests */
4186
4187 for (lock = first_lock; lock != NULL;
4188 lock = lock_rec_get_next(heap_no, lock)) {
4189 if (!lock_get_wait(lock)) {
4190 continue;
4191 }
4192 const lock_t* c = lock_rec_has_to_wait_in_queue(lock);
4193 if (!c) {
4194 /* Grant the lock */
4195 ut_ad(trx != lock->trx);
4196 lock_grant(lock);
4197 }
4198 }
4199 } else {
4200 lock_grant_and_move_on_rec(lock_sys.rec_hash, first_lock, heap_no);
4201 }
4202
4203 lock_mutex_exit();
4204 trx_mutex_exit(trx);
4205 }
4206
4207 #ifdef UNIV_DEBUG
4208 /*********************************************************************//**
4209 Check if a transaction that has X or IX locks has set the dict_op
4210 code correctly. */
4211 static
4212 void
lock_check_dict_lock(const lock_t * lock)4213 lock_check_dict_lock(
4214 /*==================*/
4215 const lock_t* lock) /*!< in: lock to check */
4216 {
4217 if (lock_get_type_low(lock) == LOCK_REC) {
4218
4219 /* Check if the transcation locked a record
4220 in a system table in X mode. It should have set
4221 the dict_op code correctly if it did. */
4222 if (lock->index->table->id < DICT_HDR_FIRST_ID
4223 && lock_get_mode(lock) == LOCK_X) {
4224
4225 ut_ad(lock_get_mode(lock) != LOCK_IX);
4226 ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4227 }
4228 } else {
4229 ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4230
4231 const dict_table_t* table;
4232
4233 table = lock->un_member.tab_lock.table;
4234
4235 /* Check if the transcation locked a system table
4236 in IX mode. It should have set the dict_op code
4237 correctly if it did. */
4238 if (table->id < DICT_HDR_FIRST_ID
4239 && (lock_get_mode(lock) == LOCK_X
4240 || lock_get_mode(lock) == LOCK_IX)) {
4241
4242 ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4243 }
4244 }
4245 }
4246 #endif /* UNIV_DEBUG */
4247
/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t* trx)
{
#ifdef UNIV_DEBUG
  /* Collect the ids of the (non-temporary) tables the transaction
  modified, so that they can be evicted from the cache below when
  innodb_evict_tables_on_commit_debug is set. */
  std::set<table_id_t> to_evict;
  if (innodb_evict_tables_on_commit_debug && !trx->is_recovered)
# if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
    if (!mutex_own(&dict_sys->mutex))
# else /* this would be more proper way to do it */
    if (!trx->dict_operation_lock_mode && !trx->dict_operation)
# endif
      for (trx_mod_tables_t::const_iterator it= trx->mod_tables.begin();
           it != trx->mod_tables.end(); ++it)
        if (!it->first->is_temporary())
          to_evict.insert(it->first->id);
#endif
	ulint		count = 0;
	trx_id_t	max_trx_id = trx_sys.get_max_trx_id();

	lock_mutex_enter();
	ut_ad(!trx_mutex_own(trx));

	/* Always re-read the tail of the list: locks are removed from
	it as we go, and the list may change while the mutex is
	temporarily released below. */
	for (lock_t* lock = UT_LIST_GET_LAST(trx->lock.trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_LAST(trx->lock.trx_locks)) {

		ut_d(lock_check_dict_lock(lock));

		if (lock_get_type_low(lock) == LOCK_REC) {

			lock_rec_dequeue_from_page(lock);
		} else {
			dict_table_t*	table;

			table = lock->un_member.tab_lock.table;

			if (lock_get_mode(lock) != LOCK_IS
			    && trx->undo_no != 0) {

				/* The trx may have modified the table. We
				block the use of the MySQL query cache for
				all currently active transactions. */

				table->query_cache_inv_trx_id = max_trx_id;
			}

			lock_table_dequeue(lock);
		}

		if (count == LOCK_RELEASE_INTERVAL) {
			/* Release the mutex for a while, so that we
			do not monopolize it */

			lock_mutex_exit();

			lock_mutex_enter();

			count = 0;
		}

		++count;
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG
	if (to_evict.empty()) {
		return;
	}
	/* Evict the modified tables from the dictionary cache, but only
	those that are no longer referenced or locked. */
	mutex_enter(&dict_sys->mutex);
	lock_mutex_enter();
	for (std::set<table_id_t>::const_iterator i = to_evict.begin();
	     i != to_evict.end(); ++i) {
		if (dict_table_t *table = dict_table_open_on_id(
			    *i, TRUE, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)) {
			if (!table->get_ref_count()
			    && !UT_LIST_GET_LEN(table->locks)) {
				dict_table_remove_from_cache_low(table, true);
			}
		}
	}
	lock_mutex_exit();
	mutex_exit(&dict_sys->mutex);
#endif
}
4334
/* True if a lock mode is S or X. NOTE: evaluates its argument twice. */
#define IS_LOCK_S_OR_X(lock) \
	(lock_get_mode(lock) == LOCK_S \
	 || lock_get_mode(lock) == LOCK_X)
4339
4340 /*********************************************************************//**
4341 Removes table locks of the transaction on a table to be dropped. */
4342 static
4343 void
lock_trx_table_locks_remove(const lock_t * lock_to_remove)4344 lock_trx_table_locks_remove(
4345 /*========================*/
4346 const lock_t* lock_to_remove) /*!< in: lock to remove */
4347 {
4348 trx_t* trx = lock_to_remove->trx;
4349
4350 ut_ad(lock_mutex_own());
4351
4352 /* It is safe to read this because we are holding the lock mutex */
4353 if (!trx->lock.cancel) {
4354 trx_mutex_enter(trx);
4355 } else {
4356 ut_ad(trx_mutex_own(trx));
4357 }
4358
4359 for (lock_list::iterator it = trx->lock.table_locks.begin(),
4360 end = trx->lock.table_locks.end(); it != end; ++it) {
4361 const lock_t* lock = *it;
4362
4363 ut_ad(!lock || trx == lock->trx);
4364 ut_ad(!lock || lock_get_type_low(lock) & LOCK_TABLE);
4365 ut_ad(!lock || lock->un_member.tab_lock.table);
4366
4367 if (lock == lock_to_remove) {
4368 *it = NULL;
4369
4370 if (!trx->lock.cancel) {
4371 trx_mutex_exit(trx);
4372 }
4373
4374 return;
4375 }
4376 }
4377
4378 if (!trx->lock.cancel) {
4379 trx_mutex_exit(trx);
4380 }
4381
4382 /* Lock must exist in the vector. */
4383 ut_error;
4384 }
4385
4386 /*===================== VALIDATION AND DEBUGGING ====================*/
4387
4388 /** Print info of a table lock.
4389 @param[in,out] file output stream
4390 @param[in] lock table lock */
4391 static
4392 void
lock_table_print(FILE * file,const lock_t * lock)4393 lock_table_print(FILE* file, const lock_t* lock)
4394 {
4395 ut_ad(lock_mutex_own());
4396 ut_a(lock_get_type_low(lock) == LOCK_TABLE);
4397
4398 fputs("TABLE LOCK table ", file);
4399 ut_print_name(file, lock->trx,
4400 lock->un_member.tab_lock.table->name.m_name);
4401 fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));
4402
4403 if (lock_get_mode(lock) == LOCK_S) {
4404 fputs(" lock mode S", file);
4405 } else if (lock_get_mode(lock) == LOCK_X) {
4406 ut_ad(lock->trx->id != 0);
4407 fputs(" lock mode X", file);
4408 } else if (lock_get_mode(lock) == LOCK_IS) {
4409 fputs(" lock mode IS", file);
4410 } else if (lock_get_mode(lock) == LOCK_IX) {
4411 ut_ad(lock->trx->id != 0);
4412 fputs(" lock mode IX", file);
4413 } else if (lock_get_mode(lock) == LOCK_AUTO_INC) {
4414 fputs(" lock mode AUTO-INC", file);
4415 } else {
4416 fprintf(file, " unknown lock mode %lu",
4417 (ulong) lock_get_mode(lock));
4418 }
4419
4420 if (lock_get_wait(lock)) {
4421 fputs(" waiting", file);
4422 }
4423
4424 putc('\n', file);
4425 }
4426
/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
{
	ulint			space;
	ulint			page_no;

	ut_ad(lock_mutex_own());
	ut_a(lock_get_type_low(lock) == LOCK_REC);

	space = lock->un_member.rec_lock.space;
	page_no = lock->un_member.rec_lock.page_no;

	fprintf(file, "RECORD LOCKS space id %lu page no %lu n bits %lu "
		"index %s of table ",
		(ulong) space, (ulong) page_no,
		(ulong) lock_rec_get_n_bits(lock),
		lock->index->name());
	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));

	if (lock_get_mode(lock) == LOCK_S) {
		fputs(" lock mode S", file);
	} else if (lock_get_mode(lock) == LOCK_X) {
		fputs(" lock_mode X", file);
	} else {
		/* Record locks are only S or X here. */
		ut_error;
	}

	if (lock_rec_get_gap(lock)) {
		fputs(" locks gap before rec", file);
	}

	if (lock_rec_get_rec_not_gap(lock)) {
		fputs(" locks rec but not gap", file);
	}

	if (lock_rec_get_insert_intention(lock)) {
		fputs(" insert intention", file);
	}

	if (lock_get_wait(lock)) {
		fputs(" waiting", file);
	}

	putc('\n', file);

	mem_heap_t*		heap		= NULL;
	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*		offsets		= offsets_;
	rec_offs_init(offsets_);

	mtr.start();
	/* buf_page_try_get() may return NULL if the page is not in the
	buffer pool; in that case only heap numbers are printed. */
	const buf_block_t* block = buf_page_try_get(page_id_t(space, page_no),
						    &mtr);

	/* Print each record covered by this lock struct: one bit per
	heap number in the lock bitmap. */
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (!lock_rec_get_nth_bit(lock, i)) {
			continue;
		}

		fprintf(file, "Record lock, heap no %lu", (ulong) i);

		if (block) {
			ut_ad(page_is_leaf(block->frame));
			const rec_t*	rec;

			rec = page_find_rec_with_heap_no(
				buf_block_get_frame(block), i);
			ut_ad(!page_rec_is_metadata(rec));

			offsets = rec_get_offsets(
				rec, lock->index, offsets,
				lock->index->n_core_fields,
				ULINT_UNDEFINED, &heap);

			putc(' ', file);
			rec_print_new(file, rec, offsets);
		}

		putc('\n', file);
	}

	mtr.commit();

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
4519
4520 #ifdef UNIV_DEBUG
4521 /* Print the number of lock structs from lock_print_info_summary() only
4522 in non-production builds for performance reasons, see
4523 http://bugs.mysql.com/36942 */
4524 #define PRINT_NUM_OF_LOCK_STRUCTS
4525 #endif /* UNIV_DEBUG */
4526
4527 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4528 /*********************************************************************//**
4529 Calculates the number of record lock structs in the record lock hash table.
4530 @return number of record locks */
4531 static
4532 ulint
lock_get_n_rec_locks(void)4533 lock_get_n_rec_locks(void)
4534 /*======================*/
4535 {
4536 ulint n_locks = 0;
4537 ulint i;
4538
4539 ut_ad(lock_mutex_own());
4540
4541 for (i = 0; i < hash_get_n_cells(lock_sys.rec_hash); i++) {
4542 const lock_t* lock;
4543
4544 for (lock = static_cast<const lock_t*>(
4545 HASH_GET_FIRST(lock_sys.rec_hash, i));
4546 lock != 0;
4547 lock = static_cast<const lock_t*>(
4548 HASH_GET_NEXT(hash, lock))) {
4549
4550 n_locks++;
4551 }
4552 }
4553
4554 return(n_locks);
4555 }
4556 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4557
4558 /*********************************************************************//**
4559 Prints info of locks for all transactions.
4560 @return FALSE if not able to obtain lock mutex
4561 and exits without printing info */
4562 ibool
lock_print_info_summary(FILE * file,ibool nowait)4563 lock_print_info_summary(
4564 /*====================*/
4565 FILE* file, /*!< in: file where to print */
4566 ibool nowait) /*!< in: whether to wait for the lock mutex */
4567 {
4568 /* if nowait is FALSE, wait on the lock mutex,
4569 otherwise return immediately if fail to obtain the
4570 mutex. */
4571 if (!nowait) {
4572 lock_mutex_enter();
4573 } else if (lock_mutex_enter_nowait()) {
4574 fputs("FAIL TO OBTAIN LOCK MUTEX,"
4575 " SKIP LOCK INFO PRINTING\n", file);
4576 return(FALSE);
4577 }
4578
4579 if (lock_deadlock_found) {
4580 fputs("------------------------\n"
4581 "LATEST DETECTED DEADLOCK\n"
4582 "------------------------\n", file);
4583
4584 if (!srv_read_only_mode) {
4585 ut_copy_file(file, lock_latest_err_file);
4586 }
4587 }
4588
4589 fputs("------------\n"
4590 "TRANSACTIONS\n"
4591 "------------\n", file);
4592
4593 fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
4594 trx_sys.get_max_trx_id());
4595
4596 fprintf(file,
4597 "Purge done for trx's n:o < " TRX_ID_FMT
4598 " undo n:o < " TRX_ID_FMT " state: %s\n"
4599 "History list length %u\n",
4600 purge_sys.tail.trx_no,
4601 purge_sys.tail.undo_no,
4602 purge_sys.enabled()
4603 ? (purge_sys.running() ? "running"
4604 : purge_sys.paused() ? "stopped" : "running but idle")
4605 : "disabled",
4606 trx_sys.history_size());
4607
4608 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4609 fprintf(file,
4610 "Total number of lock structs in row lock hash table %lu\n",
4611 (ulong) lock_get_n_rec_locks());
4612 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4613 return(TRUE);
4614 }
4615
/** Prints transaction lock wait and MVCC state.
@param[in,out]	file	file where to print
@param[in]	trx	transaction
@param[in]	now	current time */
void
lock_trx_print_wait_and_mvcc_state(FILE* file, const trx_t* trx, time_t now)
{
	fprintf(file, "---");

	trx_print_latched(file, trx, 600);

	/* Note: read_view->get_state() check is race condition. But it
	should "kind of work" because read_view is freed only at shutdown.
	Worst thing that may happen is that it'll get transferred to
	another thread and print wrong values. */

	if (trx->read_view.get_state() == READ_VIEW_STATE_OPEN) {
		trx->read_view.print_limits(file);
	}

	if (trx->lock.que_state == TRX_QUE_LOCK_WAIT) {

		fprintf(file,
			"------- TRX HAS BEEN WAITING %lu SEC"
			" FOR THIS LOCK TO BE GRANTED:\n",
			(ulong) difftime(now, trx->lock.wait_started));

		/* Print the lock the transaction is waiting for, in
		record- or table-lock format as appropriate. */
		if (lock_get_type_low(trx->lock.wait_lock) == LOCK_REC) {
			mtr_t mtr;
			lock_rec_print(file, trx->lock.wait_lock, mtr);
		} else {
			lock_table_print(file, trx->lock.wait_lock);
		}

		fprintf(file, "------------------\n");
	}
}
4653
4654 /*********************************************************************//**
4655 Prints info of locks for a transaction. */
4656 static
4657 void
lock_trx_print_locks(FILE * file,const trx_t * trx)4658 lock_trx_print_locks(
4659 /*=================*/
4660 FILE* file, /*!< in/out: File to write */
4661 const trx_t* trx) /*!< in: current transaction */
4662 {
4663 mtr_t mtr;
4664 uint32_t i= 0;
4665 /* Iterate over the transaction's locks. */
4666 for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
4667 lock != NULL;
4668 lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
4669 if (lock_get_type_low(lock) == LOCK_REC) {
4670
4671 lock_rec_print(file, lock, mtr);
4672 } else {
4673 ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4674
4675 lock_table_print(file, lock);
4676 }
4677
4678 if (++i == 10) {
4679
4680 fprintf(file,
4681 "10 LOCKS PRINTED FOR THIS TRX:"
4682 " SUPPRESSING FURTHER PRINTS\n");
4683
4684 break;
4685 }
4686 }
4687 }
4688
/** Functor to display all transactions; applied to trx_sys.trx_list
via ut_list_map() while holding trx_sys.mutex. Skips the purge
system's own transaction. */
struct lock_print_info
{
  lock_print_info(FILE* file, time_t now) :
    file(file), now(now),
    purge_trx(purge_sys.query ? purge_sys.query->trx : NULL)
  {}

  void operator()(const trx_t* trx) const
  {
    ut_ad(mutex_own(&trx_sys.mutex));
    if (UNIV_UNLIKELY(trx == purge_trx))
      return;
    lock_trx_print_wait_and_mvcc_state(file, trx, now);

    /* Individual locks are printed only when the InnoDB lock monitor
    is enabled. */
    if (trx->will_lock && srv_print_innodb_lock_monitor)
      lock_trx_print_locks(file, trx);
  }

  /* Output stream for the report. */
  FILE* const file;
  /* Timestamp used for computing lock-wait durations. */
  const time_t now;
  /* The purge system's transaction, excluded from the report. */
  const trx_t* const purge_trx;
};
4712
4713 /*********************************************************************//**
4714 Prints info of locks for each transaction. This function assumes that the
4715 caller holds the lock mutex and more importantly it will release the lock
4716 mutex on behalf of the caller. (This should be fixed in the future). */
4717 void
lock_print_info_all_transactions(FILE * file)4718 lock_print_info_all_transactions(
4719 /*=============================*/
4720 FILE* file) /*!< in/out: file where to print */
4721 {
4722 ut_ad(lock_mutex_own());
4723
4724 fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");
4725 const time_t now = time(NULL);
4726
4727 mutex_enter(&trx_sys.mutex);
4728 ut_list_map(trx_sys.trx_list, lock_print_info(file, now));
4729 mutex_exit(&trx_sys.mutex);
4730 lock_mutex_exit();
4731
4732 ut_ad(lock_validate());
4733 }
4734
4735 #ifdef UNIV_DEBUG
4736 /*********************************************************************//**
4737 Find the the lock in the trx_t::trx_lock_t::table_locks vector.
4738 @return true if found */
4739 static
4740 bool
lock_trx_table_locks_find(trx_t * trx,const lock_t * find_lock)4741 lock_trx_table_locks_find(
4742 /*======================*/
4743 trx_t* trx, /*!< in: trx to validate */
4744 const lock_t* find_lock) /*!< in: lock to find */
4745 {
4746 bool found = false;
4747
4748 ut_ad(trx_mutex_own(trx));
4749
4750 for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
4751 end = trx->lock.table_locks.end(); it != end; ++it) {
4752
4753 const lock_t* lock = *it;
4754
4755 if (lock == NULL) {
4756
4757 continue;
4758
4759 } else if (lock == find_lock) {
4760
4761 /* Can't be duplicates. */
4762 ut_a(!found);
4763 found = true;
4764 }
4765
4766 ut_a(trx == lock->trx);
4767 ut_a(lock_get_type_low(lock) & LOCK_TABLE);
4768 ut_a(lock->un_member.tab_lock.table != NULL);
4769 }
4770
4771 return(found);
4772 }
4773
4774 /*********************************************************************//**
4775 Validates the lock queue on a table.
4776 @return TRUE if ok */
4777 static
4778 ibool
lock_table_queue_validate(const dict_table_t * table)4779 lock_table_queue_validate(
4780 /*======================*/
4781 const dict_table_t* table) /*!< in: table */
4782 {
4783 const lock_t* lock;
4784
4785 ut_ad(lock_mutex_own());
4786
4787 for (lock = UT_LIST_GET_FIRST(table->locks);
4788 lock != NULL;
4789 lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
4790
4791 /* lock->trx->state cannot change from or to NOT_STARTED
4792 while we are holding the lock_sys.mutex. It may change
4793 from ACTIVE or PREPARED to PREPARED or COMMITTED. */
4794 trx_mutex_enter(lock->trx);
4795 check_trx_state(lock->trx);
4796
4797 if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4798 } else if (!lock_get_wait(lock)) {
4799 ut_a(!lock_table_other_has_incompatible(
4800 lock->trx, 0, table,
4801 lock_get_mode(lock)));
4802 } else {
4803 ut_a(lock_table_has_to_wait_in_queue(lock));
4804 }
4805
4806 ut_a(lock_trx_table_locks_find(lock->trx, lock));
4807 trx_mutex_exit(lock->trx);
4808 }
4809
4810 return(TRUE);
4811 }
4812
4813 /*********************************************************************//**
4814 Validates the lock queue on a single record.
4815 @return TRUE if ok */
4816 static
4817 bool
lock_rec_queue_validate(bool locked_lock_trx_sys,const buf_block_t * block,const rec_t * rec,const dict_index_t * index,const rec_offs * offsets)4818 lock_rec_queue_validate(
4819 /*====================*/
4820 bool locked_lock_trx_sys,
4821 /*!< in: if the caller holds
4822 both the lock mutex and
4823 trx_sys_t->lock. */
4824 const buf_block_t* block, /*!< in: buffer block containing rec */
4825 const rec_t* rec, /*!< in: record to look at */
4826 const dict_index_t* index, /*!< in: index, or NULL if not known */
4827 const rec_offs* offsets)/*!< in: rec_get_offsets(rec, index) */
4828 {
4829 const lock_t* lock;
4830 ulint heap_no;
4831
4832 ut_a(rec);
4833 ut_a(block->frame == page_align(rec));
4834 ut_ad(rec_offs_validate(rec, index, offsets));
4835 ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
4836 ut_ad(page_rec_is_leaf(rec));
4837 ut_ad(lock_mutex_own() == locked_lock_trx_sys);
4838 ut_ad(!index || dict_index_is_clust(index)
4839 || !dict_index_is_online_ddl(index));
4840
4841 heap_no = page_rec_get_heap_no(rec);
4842
4843 if (!locked_lock_trx_sys) {
4844 lock_mutex_enter();
4845 }
4846
4847 if (!page_rec_is_user_rec(rec)) {
4848
4849 for (lock = lock_rec_get_first(lock_sys.rec_hash,
4850 block, heap_no);
4851 lock != NULL;
4852 lock = lock_rec_get_next_const(heap_no, lock)) {
4853
4854 ut_ad(!index || lock->index == index);
4855
4856 trx_mutex_enter(lock->trx);
4857 ut_ad(!lock->trx->read_only
4858 || !lock->trx->is_autocommit_non_locking());
4859 ut_ad(trx_state_eq(lock->trx,
4860 TRX_STATE_COMMITTED_IN_MEMORY)
4861 || !lock_get_wait(lock)
4862 || lock_rec_has_to_wait_in_queue(lock));
4863 trx_mutex_exit(lock->trx);
4864 }
4865
4866 func_exit:
4867 if (!locked_lock_trx_sys) {
4868 lock_mutex_exit();
4869 }
4870
4871 return true;
4872 }
4873
4874 ut_ad(page_rec_is_leaf(rec));
4875 ut_ad(lock_mutex_own());
4876
4877 const trx_id_t impl_trx_id = index && index->is_primary()
4878 ? lock_clust_rec_some_has_impl(rec, index, offsets)
4879 : 0;
4880
4881 if (trx_t *impl_trx = impl_trx_id
4882 ? trx_sys.find(current_trx(), impl_trx_id, false)
4883 : 0) {
4884 /* impl_trx could have been committed before we
4885 acquire its mutex, but not thereafter. */
4886
4887 mutex_enter(&impl_trx->mutex);
4888 ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
4889 if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4890 } else if (const lock_t* other_lock
4891 = lock_rec_other_has_expl_req(
4892 LOCK_S, block, true, heap_no,
4893 impl_trx)) {
4894 /* The impl_trx is holding an implicit lock on the
4895 given record 'rec'. So there cannot be another
4896 explicit granted lock. Also, there can be another
4897 explicit waiting lock only if the impl_trx has an
4898 explicit granted lock. */
4899
4900 #ifdef WITH_WSREP
4901 /** Galera record locking rules:
4902 * If there is no other record lock to the same record, we may grant
4903 the lock request.
4904 * If there is other record lock but this requested record lock is
4905 compatible, we may grant the lock request.
4906 * If there is other record lock and it is not compatible with
4907 requested lock, all normal transactions must wait.
4908 * BF (brute force) additional exceptions :
4909 ** If BF already holds record lock for requested record, we may
4910 grant new record lock even if there is conflicting record lock(s)
4911 waiting on a queue.
4912 ** If conflicting transaction holds requested record lock,
4913 we will cancel this record lock and select conflicting transaction
4914 for BF abort or kill victim.
4915 ** If conflicting transaction is waiting for requested record lock
4916 we will cancel this wait and select conflicting transaction
4917 for BF abort or kill victim.
4918 ** There should not be two BF transactions waiting for same record lock
4919 */
4920 if (other_lock->trx->is_wsrep() && !lock_get_wait(other_lock)) {
4921 wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
4922 wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4923
4924 if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
4925 block, heap_no,
4926 impl_trx)) {
4927 ib::info() << "WSREP impl BF lock conflict";
4928 }
4929 } else
4930 #endif /* WITH_WSREP */
4931 {
4932 ut_ad(lock_get_wait(other_lock));
4933 ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
4934 block, heap_no, impl_trx));
4935 }
4936 }
4937
4938 mutex_exit(&impl_trx->mutex);
4939 }
4940
4941 for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
4942 lock != NULL;
4943 lock = lock_rec_get_next_const(heap_no, lock)) {
4944 ut_ad(!lock->trx->read_only
4945 || !lock->trx->is_autocommit_non_locking());
4946 ut_ad(!page_rec_is_metadata(rec));
4947
4948 if (index) {
4949 ut_a(lock->index == index);
4950 }
4951
4952 if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
4953
4954 lock_mode mode;
4955
4956 if (lock_get_mode(lock) == LOCK_S) {
4957 mode = LOCK_X;
4958 } else {
4959 mode = LOCK_S;
4960 }
4961
4962 const lock_t* other_lock
4963 = lock_rec_other_has_expl_req(
4964 mode, block, false, heap_no,
4965 lock->trx);
4966 #ifdef WITH_WSREP
4967 if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
4968 /* Only BF transaction may be granted
4969 lock before other conflicting lock
4970 request. */
4971 if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
4972 && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
4973 /* If no BF, this case is a bug. */
4974 wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
4975 wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4976 ut_error;
4977 }
4978 } else
4979 #endif /* WITH_WSREP */
4980 ut_ad(!other_lock);
4981 } else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
4982
4983 ut_a(lock_rec_has_to_wait_in_queue(lock));
4984 }
4985 }
4986
4987 ut_ad(innodb_lock_schedule_algorithm == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4988 lock_queue_validate(lock));
4989
4990 goto func_exit;
4991 }
4992
/*********************************************************************//**
Validates the record lock queues on a page.
@return TRUE if ok */
static
ibool
lock_rec_validate_page(
/*===================*/
	const buf_block_t*	block)	/*!< in: buffer block */
{
	const lock_t*	lock;
	const rec_t*	rec;
	ulint		nth_lock	= 0;	/* position within the page's
						lock list of the lock being
						processed */
	ulint		nth_bit		= 0;	/* next bitmap bit to examine
						in the current lock */
	ulint		i;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(!lock_mutex_own());

	lock_mutex_enter();
loop:
	/* After validating each record the scan is restarted from the
	first lock on the page, and the position reached so far is
	remembered in (nth_lock, nth_bit). */
	lock = lock_rec_get_first_on_page_addr(
		lock_sys.rec_hash,
		block->page.id.space(), block->page.id.page_no());

	if (!lock) {
		goto function_exit;
	}

	ut_ad(!block->page.file_page_was_freed);

	/* Advance past the nth_lock locks that were already processed. */
	for (i = 0; i < nth_lock; i++) {

		lock = lock_rec_get_next_on_page_const(lock);

		if (!lock) {
			goto function_exit;
		}
	}

	ut_ad(!lock->trx->read_only
	      || !lock->trx->is_autocommit_non_locking());

	/* Only validate the record queues when this thread is not
	holding a space->latch. */
	if (!sync_check_find(SYNC_FSP))
		for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {

			/* The supremum is validated unconditionally;
			other heap numbers only if the lock bitmap
			covers them. */
			if (i == PAGE_HEAP_NO_SUPREMUM
			    || lock_rec_get_nth_bit(lock, i)) {

				rec = page_find_rec_with_heap_no(block->frame, i);
				ut_a(rec);
				ut_ad(!lock_rec_get_nth_bit(lock, i)
				      || page_rec_is_leaf(rec));
				offsets = rec_get_offsets(rec, lock->index, offsets,
							  lock->index->n_core_fields,
							  ULINT_UNDEFINED, &heap);

				/* If this thread is holding the file space
				latch (fil_space_t::latch), the following
				check WILL break the latching order and may
				cause a deadlock of threads. */

				lock_rec_queue_validate(
					TRUE, block, rec, lock->index, offsets);

				nth_bit = i + 1;

				goto loop;
			}
		}

	/* All bits of the current lock checked; move to the next lock. */
	nth_bit = 0;
	nth_lock++;

	goto loop;

function_exit:
	lock_mutex_exit();

	if (heap != NULL) {
		mem_heap_free(heap);
	}
	return(TRUE);
}
5081
5082 /*********************************************************************//**
5083 Validate record locks up to a limit.
5084 @return lock at limit or NULL if no more locks in the hash bucket */
5085 static MY_ATTRIBUTE((warn_unused_result))
5086 const lock_t*
lock_rec_validate(ulint start,ib_uint64_t * limit)5087 lock_rec_validate(
5088 /*==============*/
5089 ulint start, /*!< in: lock_sys.rec_hash
5090 bucket */
5091 ib_uint64_t* limit) /*!< in/out: upper limit of
5092 (space, page_no) */
5093 {
5094 ut_ad(lock_mutex_own());
5095
5096 for (const lock_t* lock = static_cast<const lock_t*>(
5097 HASH_GET_FIRST(lock_sys.rec_hash, start));
5098 lock != NULL;
5099 lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {
5100
5101 ib_uint64_t current;
5102
5103 ut_ad(!lock->trx->read_only
5104 || !lock->trx->is_autocommit_non_locking());
5105 ut_ad(lock_get_type(lock) == LOCK_REC);
5106
5107 current = ut_ull_create(
5108 lock->un_member.rec_lock.space,
5109 lock->un_member.rec_lock.page_no);
5110
5111 if (current > *limit) {
5112 *limit = current + 1;
5113 return(lock);
5114 }
5115 }
5116
5117 return(0);
5118 }
5119
5120 /*********************************************************************//**
5121 Validate a record lock's block */
5122 static
5123 void
lock_rec_block_validate(ulint space_id,ulint page_no)5124 lock_rec_block_validate(
5125 /*====================*/
5126 ulint space_id,
5127 ulint page_no)
5128 {
5129 /* The lock and the block that it is referring to may be freed at
5130 this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
5131 If the lock exists in lock_rec_validate_page() we assert
5132 !block->page.file_page_was_freed. */
5133
5134 buf_block_t* block;
5135 mtr_t mtr;
5136
5137 /* Transactional locks should never refer to dropped
5138 tablespaces, because all DDL operations that would drop or
5139 discard or rebuild a tablespace do hold an exclusive table
5140 lock, which would conflict with any locks referring to the
5141 tablespace from other transactions. */
5142 if (fil_space_t* space = fil_space_acquire(space_id)) {
5143 dberr_t err = DB_SUCCESS;
5144 mtr_start(&mtr);
5145
5146 block = buf_page_get_gen(
5147 page_id_t(space_id, page_no),
5148 page_size_t(space->flags),
5149 RW_X_LATCH, NULL,
5150 BUF_GET_POSSIBLY_FREED,
5151 __FILE__, __LINE__, &mtr, &err);
5152
5153 if (err != DB_SUCCESS) {
5154 ib::error() << "Lock rec block validate failed for tablespace "
5155 << space->name
5156 << " space_id " << space_id
5157 << " page_no " << page_no << " err " << err;
5158 }
5159
5160 if (block) {
5161 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
5162
5163 ut_ad(lock_rec_validate_page(block));
5164 }
5165
5166 mtr_commit(&mtr);
5167
5168 space->release();
5169 }
5170 }
5171
5172
lock_validate_table_locks(rw_trx_hash_element_t * element,void *)5173 static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
5174 {
5175 ut_ad(lock_mutex_own());
5176 mutex_enter(&element->mutex);
5177 if (element->trx)
5178 {
5179 check_trx_state(element->trx);
5180 for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
5181 lock != NULL;
5182 lock= UT_LIST_GET_NEXT(trx_locks, lock))
5183 {
5184 if (lock_get_type_low(lock) & LOCK_TABLE)
5185 lock_table_queue_validate(lock->un_member.tab_lock.table);
5186 }
5187 }
5188 mutex_exit(&element->mutex);
5189 return 0;
5190 }
5191
5192
5193 /*********************************************************************//**
5194 Validates the lock system.
5195 @return TRUE if ok */
5196 static
5197 bool
lock_validate()5198 lock_validate()
5199 /*===========*/
5200 {
5201 typedef std::pair<ulint, ulint> page_addr_t;
5202 typedef std::set<
5203 page_addr_t,
5204 std::less<page_addr_t>,
5205 ut_allocator<page_addr_t> > page_addr_set;
5206
5207 page_addr_set pages;
5208
5209 lock_mutex_enter();
5210
5211 /* Validate table locks */
5212 trx_sys.rw_trx_hash.iterate(reinterpret_cast<my_hash_walk_action>
5213 (lock_validate_table_locks), 0);
5214
5215 /* Iterate over all the record locks and validate the locks. We
5216 don't want to hog the lock_sys_t::mutex and the trx_sys_t::mutex.
5217 Release both mutexes during the validation check. */
5218
5219 for (ulint i = 0; i < hash_get_n_cells(lock_sys.rec_hash); i++) {
5220 ib_uint64_t limit = 0;
5221
5222 while (const lock_t* lock = lock_rec_validate(i, &limit)) {
5223 if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED) {
5224 /* The lock bitmap is empty; ignore it. */
5225 continue;
5226 }
5227 const lock_rec_t& l = lock->un_member.rec_lock;
5228 pages.insert(std::make_pair(l.space, l.page_no));
5229 }
5230 }
5231
5232 lock_mutex_exit();
5233
5234 for (page_addr_set::const_iterator it = pages.begin();
5235 it != pages.end();
5236 ++it) {
5237 lock_rec_block_validate((*it).first, (*it).second);
5238 }
5239
5240 return(true);
5241 }
5242 #endif /* UNIV_DEBUG */
5243 /*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/
5244
5245 /*********************************************************************//**
5246 Checks if locks of other transactions prevent an immediate insert of
5247 a record. If they do, first tests if the query thread should anyway
5248 be suspended for some reason; if not, then puts the transaction and
5249 the query thread to the lock wait state and inserts a waiting request
5250 for a gap x-lock to the lock queue.
5251 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5252 dberr_t
lock_rec_insert_check_and_lock(ulint flags,const rec_t * rec,buf_block_t * block,dict_index_t * index,que_thr_t * thr,mtr_t * mtr,bool * inherit)5253 lock_rec_insert_check_and_lock(
5254 /*===========================*/
5255 ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG bit is
5256 set, does nothing */
5257 const rec_t* rec, /*!< in: record after which to insert */
5258 buf_block_t* block, /*!< in/out: buffer block of rec */
5259 dict_index_t* index, /*!< in: index */
5260 que_thr_t* thr, /*!< in: query thread */
5261 mtr_t* mtr, /*!< in/out: mini-transaction */
5262 bool* inherit)/*!< out: set to true if the new
5263 inserted record maybe should inherit
5264 LOCK_GAP type locks from the successor
5265 record */
5266 {
5267 ut_ad(block->frame == page_align(rec));
5268 ut_ad(!dict_index_is_online_ddl(index)
5269 || index->is_primary()
5270 || (flags & BTR_CREATE_FLAG));
5271 ut_ad(mtr->is_named_space(index->table->space));
5272 ut_ad(page_rec_is_leaf(rec));
5273
5274 if (flags & BTR_NO_LOCKING_FLAG) {
5275
5276 return(DB_SUCCESS);
5277 }
5278
5279 ut_ad(!index->table->is_temporary());
5280 ut_ad(page_is_leaf(block->frame));
5281
5282 dberr_t err;
5283 lock_t* lock;
5284 bool inherit_in = *inherit;
5285 trx_t* trx = thr_get_trx(thr);
5286 const rec_t* next_rec = page_rec_get_next_const(rec);
5287 ulint heap_no = page_rec_get_heap_no(next_rec);
5288 ut_ad(!rec_is_metadata(next_rec, index));
5289
5290 lock_mutex_enter();
5291 /* Because this code is invoked for a running transaction by
5292 the thread that is serving the transaction, it is not necessary
5293 to hold trx->mutex here. */
5294
5295 /* When inserting a record into an index, the table must be at
5296 least IX-locked. When we are building an index, we would pass
5297 BTR_NO_LOCKING_FLAG and skip the locking altogether. */
5298 ut_ad(lock_table_has(trx, index->table, LOCK_IX));
5299
5300 lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
5301
5302 if (lock == NULL) {
5303 /* We optimize CPU time usage in the simplest case */
5304
5305 lock_mutex_exit();
5306
5307 if (inherit_in && !dict_index_is_clust(index)) {
5308 /* Update the page max trx id field */
5309 page_update_max_trx_id(block,
5310 buf_block_get_page_zip(block),
5311 trx->id, mtr);
5312 }
5313
5314 *inherit = false;
5315
5316 return(DB_SUCCESS);
5317 }
5318
5319 /* Spatial index does not use GAP lock protection. It uses
5320 "predicate lock" to protect the "range" */
5321 if (dict_index_is_spatial(index)) {
5322 return(DB_SUCCESS);
5323 }
5324
5325 *inherit = true;
5326
5327 /* If another transaction has an explicit lock request which locks
5328 the gap, waiting or granted, on the successor, the insert has to wait.
5329
5330 An exception is the case where the lock by the another transaction
5331 is a gap type lock which it placed to wait for its turn to insert. We
5332 do not consider that kind of a lock conflicting with our insert. This
5333 eliminates an unnecessary deadlock which resulted when 2 transactions
5334 had to wait for their insert. Both had waiting gap type lock requests
5335 on the successor, which produced an unnecessary deadlock. */
5336
5337 const ulint type_mode = LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;
5338
5339 if (
5340 #ifdef WITH_WSREP
5341 lock_t* c_lock =
5342 #endif /* WITH_WSREP */
5343 lock_rec_other_has_conflicting(type_mode, block, heap_no, trx)) {
5344 /* Note that we may get DB_SUCCESS also here! */
5345 trx_mutex_enter(trx);
5346
5347 err = lock_rec_enqueue_waiting(
5348 #ifdef WITH_WSREP
5349 c_lock,
5350 #endif /* WITH_WSREP */
5351 type_mode, block, heap_no, index, thr, NULL);
5352
5353 trx_mutex_exit(trx);
5354 } else {
5355 err = DB_SUCCESS;
5356 }
5357
5358 lock_mutex_exit();
5359
5360 switch (err) {
5361 case DB_SUCCESS_LOCKED_REC:
5362 err = DB_SUCCESS;
5363 /* fall through */
5364 case DB_SUCCESS:
5365 if (!inherit_in || dict_index_is_clust(index)) {
5366 break;
5367 }
5368
5369 /* Update the page max trx id field */
5370 page_update_max_trx_id(
5371 block, buf_block_get_page_zip(block), trx->id, mtr);
5372 default:
5373 /* We only care about the two return values. */
5374 break;
5375 }
5376
5377 #ifdef UNIV_DEBUG
5378 {
5379 mem_heap_t* heap = NULL;
5380 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
5381 const rec_offs* offsets;
5382 rec_offs_init(offsets_);
5383
5384 offsets = rec_get_offsets(next_rec, index, offsets_,
5385 index->n_core_fields,
5386 ULINT_UNDEFINED, &heap);
5387
5388 ut_ad(lock_rec_queue_validate(
5389 FALSE, block, next_rec, index, offsets));
5390
5391 if (heap != NULL) {
5392 mem_heap_free(heap);
5393 }
5394 }
5395 #endif /* UNIV_DEBUG */
5396
5397 return(err);
5398 }
5399
/*********************************************************************//**
Creates an explicit record lock for a running transaction that currently only
has an implicit lock on the record. The transaction instance must have a
reference count > 0 so that it can't be committed and freed before this
function has completed. */
static
void
lock_rec_convert_impl_to_expl_for_trx(
/*==================================*/
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record on page */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: active transaction */
	ulint			heap_no)/*!< in: rec heap number to lock */
{
	ut_ad(trx->is_referenced());
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, index));

	DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
	/* Latching order: the lock_sys mutex is acquired before the
	transaction mutex. */
	lock_mutex_enter();
	trx_mutex_enter(trx);
	ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));

	/* Add an explicit exclusive record lock only if the transaction
	is still active and does not already hold one on this record. */
	if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)
	    && !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
				  block, heap_no, trx)) {
		lock_rec_add_to_queue(LOCK_REC | LOCK_X | LOCK_REC_NOT_GAP,
				      block, heap_no, index, trx, true);
	}

	lock_mutex_exit();
	trx_mutex_exit(trx);
	/* Drop the reference the caller took to pin trx; after this,
	trx may be committed and freed at any time. */
	trx->release_reference();

	DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
}
5437
5438
5439 #ifdef UNIV_DEBUG
/** Argument bundle for lock_rec_other_trx_holds_expl_callback(),
identifying the record to check and the transaction that is expected
to hold the implicit lock on it. */
struct lock_rec_other_trx_holds_expl_arg
{
  const ulint heap_no;	/*!< heap number of the record within its page */
  const buf_block_t * const block;	/*!< buffer block containing the record */
  const trx_t *impl_trx;	/*!< transaction holding the implicit lock */
};
5446
5447
/** Assert that no transaction other than the implicit-lock holder has
an explicit LOCK_S | LOCK_REC_NOT_GAP lock on the given record.
Invoked for each element of trx_sys.rw_trx_hash.
@param element  rw transaction hash element being visited
@param arg      record coordinates and expected implicit-lock holder
@return 0 (always continue iterating) */
static my_bool lock_rec_other_trx_holds_expl_callback(
  rw_trx_hash_element_t *element,
  lock_rec_other_trx_holds_expl_arg *arg)
{
  mutex_enter(&element->mutex);
  if (element->trx)
  {
    trx_mutex_enter(element->trx);
    ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
    /* A transaction already committed in memory cannot matter here;
    skip the explicit-lock lookup for it. */
    lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
      ? NULL : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP, arg->block,
                                 arg->heap_no, element->trx);
    /*
      An explicit lock is held by trx other than the trx holding the implicit
      lock.
    */
    ut_ad(!expl_lock || expl_lock->trx == arg->impl_trx);
    trx_mutex_exit(element->trx);
  }
  mutex_exit(&element->mutex);
  return 0;
}
5470
5471
/**
  Checks if some transaction, other than given trx_id, has an explicit
  lock on the given rec.

  FIXME: if the current transaction holds implicit lock from INSERT, a
  subsequent locking read should not convert it to explicit. See also
  MDEV-11215.

  @param      caller_trx  trx of current thread
  @param[in]  trx         trx holding implicit lock on rec
  @param[in]  rec         user record
  @param[in]  block       buffer block containing the record
*/

static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
                                          const rec_t *rec,
                                          const buf_block_t *block)
{
  /* trx may be NULL when no transaction holds an implicit lock;
  nothing to check in that case. */
  if (trx)
  {
    ut_ad(!page_rec_is_metadata(rec));
    lock_mutex_enter();
    ut_ad(trx->is_referenced());
    /* Snapshot the state under the trx mutex; it is re-checked below
    because the transaction may have committed concurrently. */
    trx_mutex_enter(trx);
    const trx_state_t state = trx->state;
    trx_mutex_exit(trx);
    ut_ad(state != TRX_STATE_NOT_STARTED);
    if (state == TRX_STATE_COMMITTED_IN_MEMORY)
    {
      /* The transaction was committed before our lock_mutex_enter(). */
      lock_mutex_exit();
      return;
    }
    lock_rec_other_trx_holds_expl_arg arg= { page_rec_get_heap_no(rec), block,
                                             trx };
    /* Visit every rw transaction and assert that only trx holds an
    explicit lock on the record. */
    trx_sys.rw_trx_hash.iterate(caller_trx,
                                reinterpret_cast<my_hash_walk_action>
                                (lock_rec_other_trx_holds_expl_callback),
                                &arg);
    lock_mutex_exit();
  }
}
5514 #endif /* UNIV_DEBUG */
5515
5516
/** If an implicit x-lock exists on a record, convert it to an explicit one.

Often, this is called by a transaction that is about to enter a lock wait
due to the lock conflict. Two explicit locks would be created: first the
exclusive lock on behalf of the lock-holder transaction in this function,
and then a wait request on behalf of caller_trx, in the calling function.

This may also be called by the same transaction that is already holding
an implicit exclusive lock on the record. In this case, no explicit lock
should be created.

@param[in,out]	caller_trx	current transaction
@param[in]	block		index tree leaf page
@param[in]	rec		record on the leaf page
@param[in]	index		the index of the record
@param[in]	offsets		rec_get_offsets(rec,index)
@return	whether caller_trx already holds an exclusive lock on rec */
static
bool
lock_rec_convert_impl_to_expl(
	trx_t*			caller_trx,
	const buf_block_t*	block,
	const rec_t*		rec,
	dict_index_t*		index,
	const rec_offs*		offsets)
{
	trx_t*		trx;

	ut_ad(!lock_mutex_own());
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, index));

	if (dict_index_is_clust(index)) {
		trx_id_t	trx_id;

		/* In the clustered index the implicit lock holder is
		identified by the DB_TRX_ID field of the record. */
		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);

		if (trx_id == 0) {
			return false;
		}
		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
			/* The caller itself holds the implicit lock;
			no explicit lock needs to be created. */
			return true;
		}

		/* Acquire a reference to the transaction, if it is
		still active; trx will be NULL otherwise. */
		trx = trx_sys.find(caller_trx, trx_id);
	} else {
		ut_ad(!dict_index_is_online_ddl(index));

		/* For a secondary index the implicit lock holder must
		be determined via the clustered index record. */
		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
						 offsets);
		if (trx == caller_trx) {
			trx->release_reference();
			return true;
		}

		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec,
						   block));
	}

	if (trx != 0) {
		ulint	heap_no = page_rec_get_heap_no(rec);

		ut_ad(trx->is_referenced());

		/* If the transaction is still active and has no
		explicit x-lock set on the record, set one for it.
		trx cannot be committed until the ref count is zero. */

		/* Note: this also releases the reference on trx. */
		lock_rec_convert_impl_to_expl_for_trx(
			block, rec, index, trx, heap_no);
	}

	return false;
}
5594
5595 /*********************************************************************//**
5596 Checks if locks of other transactions prevent an immediate modify (update,
5597 delete mark, or delete unmark) of a clustered index record. If they do,
5598 first tests if the query thread should anyway be suspended for some
5599 reason; if not, then puts the transaction and the query thread to the
5600 lock wait state and inserts a waiting request for a record x-lock to the
5601 lock queue.
5602 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5603 dberr_t
lock_clust_rec_modify_check_and_lock(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,que_thr_t * thr)5604 lock_clust_rec_modify_check_and_lock(
5605 /*=================================*/
5606 ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
5607 bit is set, does nothing */
5608 const buf_block_t* block, /*!< in: buffer block of rec */
5609 const rec_t* rec, /*!< in: record which should be
5610 modified */
5611 dict_index_t* index, /*!< in: clustered index */
5612 const rec_offs* offsets,/*!< in: rec_get_offsets(rec, index) */
5613 que_thr_t* thr) /*!< in: query thread */
5614 {
5615 dberr_t err;
5616 ulint heap_no;
5617
5618 ut_ad(rec_offs_validate(rec, index, offsets));
5619 ut_ad(page_rec_is_leaf(rec));
5620 ut_ad(dict_index_is_clust(index));
5621 ut_ad(block->frame == page_align(rec));
5622
5623 if (flags & BTR_NO_LOCKING_FLAG) {
5624
5625 return(DB_SUCCESS);
5626 }
5627 ut_ad(!rec_is_metadata(rec, index));
5628 ut_ad(!index->table->is_temporary());
5629
5630 heap_no = rec_offs_comp(offsets)
5631 ? rec_get_heap_no_new(rec)
5632 : rec_get_heap_no_old(rec);
5633
5634 /* If a transaction has no explicit x-lock set on the record, set one
5635 for it */
5636
5637 if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index,
5638 offsets)) {
5639 /* We already hold an implicit exclusive lock. */
5640 return DB_SUCCESS;
5641 }
5642
5643 err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
5644 block, heap_no, index, thr);
5645
5646 ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5647
5648 if (err == DB_SUCCESS_LOCKED_REC) {
5649 err = DB_SUCCESS;
5650 }
5651
5652 return(err);
5653 }
5654
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (delete
mark or delete unmark) of a secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_modify_check_and_lock(
/*===============================*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
				bit is set, does nothing */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	const rec_t*	rec,	/*!< in: record which should be
				modified; NOTE: as this is a secondary
				index, we always have to modify the
				clustered index record first: see the
				comment below */
	dict_index_t*	index,	/*!< in: secondary index */
	que_thr_t*	thr,	/*!< in: query thread
				(can be NULL if BTR_NO_LOCKING_FLAG) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
	ut_ad(block->frame == page_align(rec));
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, index));

	if (flags & BTR_NO_LOCKING_FLAG) {

		return(DB_SUCCESS);
	}
	ut_ad(!index->table->is_temporary());

	heap_no = page_rec_get_heap_no(rec);

#ifdef WITH_WSREP
	/* thr is non-NULL here: the BTR_NO_LOCKING_FLAG case (where thr
	may be NULL) returned above. */
	trx_t *trx= thr_get_trx(thr);
	/* If transaction scanning an unique secondary key is wsrep
	high priority thread (brute force) this scanning may involve
	GAP-locking in the index. As this locking happens also when
	applying replication events in high priority applier threads,
	there is a probability for lock conflicts between two wsrep
	high priority threads. To avoid this GAP-locking we mark that
	this transaction is using unique key scan here. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep_UK_scan= true;
#endif /* WITH_WSREP */

	/* Another transaction cannot have an implicit lock on the record,
	because when we come here, we already have modified the clustered
	index record, and this would not have been possible if another active
	transaction had modified this secondary index record. */

	err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	trx->wsrep_UK_scan= false;
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		const rec_offs*	offsets;
		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets_,
					  index->n_core_fields,
					  ULINT_UNDEFINED, &heap);

		ut_ad(lock_rec_queue_validate(
			      FALSE, block, rec, index, offsets));

		if (heap != NULL) {
			mem_heap_free(heap);
		}
	}
#endif /* UNIV_DEBUG */

	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
		/* Update the page max trx id field */
		/* It might not be necessary to do this if
		err == DB_SUCCESS (no new lock created),
		but it should not cost too much performance. */
		page_update_max_trx_id(block,
				       buf_block_get_page_zip(block),
				       thr_get_trx(thr)->id, mtr);
		err = DB_SUCCESS;
	}

	return(err);
}
5751
/*********************************************************************//**
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_read_check_and_lock(
/*=============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: secondary index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	ulint			gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	ut_ad(!rec_is_metadata(rec, index));
	heap_no = page_rec_get_heap_no(rec);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list or a
	database recovery is running. */

	if (!page_rec_is_supremum(rec)
	    && page_get_max_trx_id(block->frame) >= trx_sys.get_min_trx_id()
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
					     index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If transaction scanning an unique secondary key is wsrep
	high priority thread (brute force) this scanning may involve
	GAP-locking in the index. As this locking happens also when
	applying replication events in high priority applier threads,
	there is a probability for lock conflicts between two wsrep
	high priority threads. To avoid this GAP-locking we mark that
	this transaction is using unique key scan here. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep_UK_scan= true;
#endif /* WITH_WSREP */

	err = lock_rec_lock(FALSE, ulint(mode) | gap_mode,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	trx->wsrep_UK_scan= false;
#endif /* WITH_WSREP */

	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));

	return(err);
}
5835
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock(
/*===============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	ulint			gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(dict_index_is_clust(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
	      || gap_mode == LOCK_REC_NOT_GAP);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, index));

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	heap_no = page_rec_get_heap_no(rec);

	/* The supremum record carries no implicit lock; for user
	records, convert a possible implicit x-lock to an explicit one
	before requesting our own lock. */
	if (heap_no != PAGE_HEAP_NO_SUPREMUM
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
					     index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(FALSE, ulint(mode) | gap_mode,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));

	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");

	return(err);
}
5903 /*********************************************************************//**
5904 Checks if locks of other transactions prevent an immediate read, or passing
5905 over by a read cursor, of a clustered index record. If they do, first tests
5906 if the query thread should anyway be suspended for some reason; if not, then
5907 puts the transaction and the query thread to the lock wait state and inserts a
5908 waiting request for a record lock to the lock queue. Sets the requested mode
5909 lock on the record. This is an alternative version of
5910 lock_clust_rec_read_check_and_lock() that does not require the parameter
5911 "offsets".
5912 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5913 dberr_t
lock_clust_rec_read_check_and_lock_alt(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,lock_mode mode,ulint gap_mode,que_thr_t * thr)5914 lock_clust_rec_read_check_and_lock_alt(
5915 /*===================================*/
5916 ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
5917 bit is set, does nothing */
5918 const buf_block_t* block, /*!< in: buffer block of rec */
5919 const rec_t* rec, /*!< in: user record or page
5920 supremum record which should
5921 be read or passed over by a
5922 read cursor */
5923 dict_index_t* index, /*!< in: clustered index */
5924 lock_mode mode, /*!< in: mode of the lock which
5925 the read cursor should set on
5926 records: LOCK_S or LOCK_X; the
5927 latter is possible in
5928 SELECT FOR UPDATE */
5929 ulint gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5930 LOCK_REC_NOT_GAP */
5931 que_thr_t* thr) /*!< in: query thread */
5932 {
5933 mem_heap_t* tmp_heap = NULL;
5934 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
5935 rec_offs* offsets = offsets_;
5936 dberr_t err;
5937 rec_offs_init(offsets_);
5938
5939 ut_ad(page_rec_is_leaf(rec));
5940 offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
5941 ULINT_UNDEFINED, &tmp_heap);
5942 err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
5943 offsets, mode, gap_mode, thr);
5944 if (tmp_heap) {
5945 mem_heap_free(tmp_heap);
5946 }
5947
5948 if (err == DB_SUCCESS_LOCKED_REC) {
5949 err = DB_SUCCESS;
5950 }
5951
5952 return(err);
5953 }
5954
5955 /*******************************************************************//**
5956 Release the last lock from the transaction's autoinc locks. */
5957 UNIV_INLINE
5958 void
lock_release_autoinc_last_lock(ib_vector_t * autoinc_locks)5959 lock_release_autoinc_last_lock(
5960 /*===========================*/
5961 ib_vector_t* autoinc_locks) /*!< in/out: vector of AUTOINC locks */
5962 {
5963 ulint last;
5964 lock_t* lock;
5965
5966 ut_ad(lock_mutex_own());
5967 ut_a(!ib_vector_is_empty(autoinc_locks));
5968
5969 /* The lock to be release must be the last lock acquired. */
5970 last = ib_vector_size(autoinc_locks) - 1;
5971 lock = *static_cast<lock_t**>(ib_vector_get(autoinc_locks, last));
5972
5973 /* Should have only AUTOINC locks in the vector. */
5974 ut_a(lock_get_mode(lock) == LOCK_AUTO_INC);
5975 ut_a(lock_get_type(lock) == LOCK_TABLE);
5976
5977 ut_a(lock->un_member.tab_lock.table != NULL);
5978
5979 /* This will remove the lock from the trx autoinc_locks too. */
5980 lock_table_dequeue(lock);
5981
5982 /* Remove from the table vector too. */
5983 lock_trx_table_locks_remove(lock);
5984 }
5985
5986 /*******************************************************************//**
5987 Check if a transaction holds any autoinc locks.
5988 @return TRUE if the transaction holds any AUTOINC locks. */
5989 static
5990 ibool
lock_trx_holds_autoinc_locks(const trx_t * trx)5991 lock_trx_holds_autoinc_locks(
5992 /*=========================*/
5993 const trx_t* trx) /*!< in: transaction */
5994 {
5995 ut_a(trx->autoinc_locks != NULL);
5996
5997 return(!ib_vector_is_empty(trx->autoinc_locks));
5998 }
5999
6000 /*******************************************************************//**
6001 Release all the transaction's autoinc locks. */
6002 static
6003 void
lock_release_autoinc_locks(trx_t * trx)6004 lock_release_autoinc_locks(
6005 /*=======================*/
6006 trx_t* trx) /*!< in/out: transaction */
6007 {
6008 ut_ad(lock_mutex_own());
6009 /* If this is invoked for a running transaction by the thread
6010 that is serving the transaction, then it is not necessary to
6011 hold trx->mutex here. */
6012
6013 ut_a(trx->autoinc_locks != NULL);
6014
6015 /* We release the locks in the reverse order. This is to
6016 avoid searching the vector for the element to delete at
6017 the lower level. See (lock_table_remove_low()) for details. */
6018 while (!ib_vector_is_empty(trx->autoinc_locks)) {
6019
6020 /* lock_table_remove_low() will also remove the lock from
6021 the transaction's autoinc_locks vector. */
6022 lock_release_autoinc_last_lock(trx->autoinc_locks);
6023 }
6024
6025 /* Should release all locks. */
6026 ut_a(ib_vector_is_empty(trx->autoinc_locks));
6027 }
6028
6029 /*******************************************************************//**
6030 Gets the type of a lock. Non-inline version for using outside of the
6031 lock module.
6032 @return LOCK_TABLE or LOCK_REC */
6033 ulint
lock_get_type(const lock_t * lock)6034 lock_get_type(
6035 /*==========*/
6036 const lock_t* lock) /*!< in: lock */
6037 {
6038 return(lock_get_type_low(lock));
6039 }
6040
6041 /*******************************************************************//**
6042 Gets the id of the transaction owning a lock.
6043 @return transaction id */
6044 trx_id_t
lock_get_trx_id(const lock_t * lock)6045 lock_get_trx_id(
6046 /*============*/
6047 const lock_t* lock) /*!< in: lock */
6048 {
6049 return(trx_get_id_for_print(lock->trx));
6050 }
6051
6052 /*******************************************************************//**
6053 Gets the mode of a lock in a human readable string.
6054 The string should not be free()'d or modified.
6055 @return lock mode */
6056 const char*
lock_get_mode_str(const lock_t * lock)6057 lock_get_mode_str(
6058 /*==============*/
6059 const lock_t* lock) /*!< in: lock */
6060 {
6061 ibool is_gap_lock;
6062
6063 is_gap_lock = lock_get_type_low(lock) == LOCK_REC
6064 && lock_rec_get_gap(lock);
6065
6066 switch (lock_get_mode(lock)) {
6067 case LOCK_S:
6068 if (is_gap_lock) {
6069 return("S,GAP");
6070 } else {
6071 return("S");
6072 }
6073 case LOCK_X:
6074 if (is_gap_lock) {
6075 return("X,GAP");
6076 } else {
6077 return("X");
6078 }
6079 case LOCK_IS:
6080 if (is_gap_lock) {
6081 return("IS,GAP");
6082 } else {
6083 return("IS");
6084 }
6085 case LOCK_IX:
6086 if (is_gap_lock) {
6087 return("IX,GAP");
6088 } else {
6089 return("IX");
6090 }
6091 case LOCK_AUTO_INC:
6092 return("AUTO_INC");
6093 default:
6094 return("UNKNOWN");
6095 }
6096 }
6097
6098 /*******************************************************************//**
6099 Gets the type of a lock in a human readable string.
6100 The string should not be free()'d or modified.
6101 @return lock type */
6102 const char*
lock_get_type_str(const lock_t * lock)6103 lock_get_type_str(
6104 /*==============*/
6105 const lock_t* lock) /*!< in: lock */
6106 {
6107 switch (lock_get_type_low(lock)) {
6108 case LOCK_REC:
6109 return("RECORD");
6110 case LOCK_TABLE:
6111 return("TABLE");
6112 default:
6113 return("UNKNOWN");
6114 }
6115 }
6116
6117 /*******************************************************************//**
6118 Gets the table on which the lock is.
6119 @return table */
6120 UNIV_INLINE
6121 dict_table_t*
lock_get_table(const lock_t * lock)6122 lock_get_table(
6123 /*===========*/
6124 const lock_t* lock) /*!< in: lock */
6125 {
6126 switch (lock_get_type_low(lock)) {
6127 case LOCK_REC:
6128 ut_ad(dict_index_is_clust(lock->index)
6129 || !dict_index_is_online_ddl(lock->index));
6130 return(lock->index->table);
6131 case LOCK_TABLE:
6132 return(lock->un_member.tab_lock.table);
6133 default:
6134 ut_error;
6135 return(NULL);
6136 }
6137 }
6138
6139 /*******************************************************************//**
6140 Gets the id of the table on which the lock is.
6141 @return id of the table */
6142 table_id_t
lock_get_table_id(const lock_t * lock)6143 lock_get_table_id(
6144 /*==============*/
6145 const lock_t* lock) /*!< in: lock */
6146 {
6147 dict_table_t* table;
6148
6149 table = lock_get_table(lock);
6150
6151 return(table->id);
6152 }
6153
6154 /** Determine which table a lock is associated with.
6155 @param[in] lock the lock
6156 @return name of the table */
6157 const table_name_t&
lock_get_table_name(const lock_t * lock)6158 lock_get_table_name(
6159 const lock_t* lock)
6160 {
6161 return(lock_get_table(lock)->name);
6162 }
6163
6164 /*******************************************************************//**
6165 For a record lock, gets the index on which the lock is.
6166 @return index */
6167 const dict_index_t*
lock_rec_get_index(const lock_t * lock)6168 lock_rec_get_index(
6169 /*===============*/
6170 const lock_t* lock) /*!< in: lock */
6171 {
6172 ut_a(lock_get_type_low(lock) == LOCK_REC);
6173 ut_ad(dict_index_is_clust(lock->index)
6174 || !dict_index_is_online_ddl(lock->index));
6175
6176 return(lock->index);
6177 }
6178
6179 /*******************************************************************//**
6180 For a record lock, gets the name of the index on which the lock is.
6181 The string should not be free()'d or modified.
6182 @return name of the index */
6183 const char*
lock_rec_get_index_name(const lock_t * lock)6184 lock_rec_get_index_name(
6185 /*====================*/
6186 const lock_t* lock) /*!< in: lock */
6187 {
6188 ut_a(lock_get_type_low(lock) == LOCK_REC);
6189 ut_ad(dict_index_is_clust(lock->index)
6190 || !dict_index_is_online_ddl(lock->index));
6191
6192 return(lock->index->name);
6193 }
6194
6195 /*******************************************************************//**
6196 For a record lock, gets the tablespace number on which the lock is.
6197 @return tablespace number */
6198 ulint
lock_rec_get_space_id(const lock_t * lock)6199 lock_rec_get_space_id(
6200 /*==================*/
6201 const lock_t* lock) /*!< in: lock */
6202 {
6203 ut_a(lock_get_type_low(lock) == LOCK_REC);
6204
6205 return(lock->un_member.rec_lock.space);
6206 }
6207
6208 /*******************************************************************//**
6209 For a record lock, gets the page number on which the lock is.
6210 @return page number */
6211 ulint
lock_rec_get_page_no(const lock_t * lock)6212 lock_rec_get_page_no(
6213 /*=================*/
6214 const lock_t* lock) /*!< in: lock */
6215 {
6216 ut_a(lock_get_type_low(lock) == LOCK_REC);
6217
6218 return(lock->un_member.rec_lock.page_no);
6219 }
6220
6221 /*********************************************************************//**
6222 Cancels a waiting lock request and releases possible other transactions
6223 waiting behind it. */
void
lock_cancel_waiting_and_release(
/*============================*/
	lock_t*	lock)	/*!< in/out: waiting lock request */
{
	que_thr_t*	thr;

	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(lock->trx));
	ut_ad(lock->trx->state == TRX_STATE_ACTIVE);

	/* Flag that the wait is being cancelled on purpose, so that
	the wakeup below is not mistaken for a lock grant. */
	lock->trx->lock.cancel = true;

	if (lock_get_type_low(lock) == LOCK_REC) {

		/* Removing the waiting record lock may allow other
		waiters in the same queue to be granted. */
		lock_rec_dequeue_from_page(lock);
	} else {
		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);

		if (lock->trx->autoinc_locks != NULL) {
			/* Release the transaction's AUTOINC locks. */
			lock_release_autoinc_locks(lock->trx);
		}

		lock_table_dequeue(lock);
		/* Remove the lock from table lock vector too. */
		lock_trx_table_locks_remove(lock);
	}

	/* Reset the wait flag and the back pointer to lock in trx. */

	lock_reset_lock_and_trx_wait(lock);

	/* The following function releases the trx from lock wait. */

	thr = que_thr_end_lock_wait(lock->trx);

	if (thr != NULL) {
		/* Wake up the suspended query thread. */
		lock_wait_release_thread_if_suspended(thr);
	}

	lock->trx->lock.cancel = false;
}
6267
6268 /*********************************************************************//**
6269 Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
function should be called at the end of an SQL statement, by the
6271 connection thread that owns the transaction (trx->mysql_thd). */
void
lock_unlock_table_autoinc(
/*======================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	ut_ad(!lock_mutex_own());
	ut_ad(!trx_mutex_own(trx));
	ut_ad(!trx->lock.wait_lock);

	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
	but not COMMITTED transactions. */

	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));

	/* This function is invoked for a running transaction by the
	thread that is serving the transaction. Therefore it is not
	necessary to hold trx->mutex here. */

	/* Only take the (expensive) global lock mutex when the
	transaction actually holds AUTOINC locks. */
	if (lock_trx_holds_autoinc_locks(trx)) {
		lock_mutex_enter();

		lock_release_autoinc_locks(trx);

		lock_mutex_exit();
	}
}
6299
/** Check the lock-wait status of a transaction, cancelling a pending
wait if there is one. The caller must hold both the lock system mutex
and the transaction mutex.
@param[in,out]	trx	transaction whose wait state is checked
@return DB_DEADLOCK if trx was chosen as a deadlock victim,
DB_SUCCESS if there is no wait lock (it was probably granted),
DB_LOCK_WAIT if a pending wait was cancelled */
static inline dberr_t lock_trx_handle_wait_low(trx_t* trx)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(trx));

	if (trx->lock.was_chosen_as_deadlock_victim) {
		return DB_DEADLOCK;
	}
	if (!trx->lock.wait_lock) {
		/* The lock was probably granted before we got here. */
		return DB_SUCCESS;
	}

	/* Cancel the wait and release any waiters behind the lock. */
	lock_cancel_waiting_and_release(trx->lock.wait_lock);
	return DB_LOCK_WAIT;
}
6316
6317 /*********************************************************************//**
6318 Check whether the transaction has already been rolled back because it
6319 was selected as a deadlock victim, or if it has to wait then cancel
6320 the wait lock.
6321 @return DB_DEADLOCK, DB_LOCK_WAIT or DB_SUCCESS */
dberr_t
lock_trx_handle_wait(
/*=================*/
	trx_t*	trx)	/*!< in/out: trx lock state */
{
	/* Acquire both mutexes in the canonical order (lock system
	mutex first, then the transaction mutex) as required by
	lock_trx_handle_wait_low(). */
	lock_mutex_enter();
	trx_mutex_enter(trx);
	dberr_t err = lock_trx_handle_wait_low(trx);
	lock_mutex_exit();
	trx_mutex_exit(trx);
	return err;
}
6334
6335 /*********************************************************************//**
6336 Get the number of locks on a table.
6337 @return number of locks */
6338 ulint
lock_table_get_n_locks(const dict_table_t * table)6339 lock_table_get_n_locks(
6340 /*===================*/
6341 const dict_table_t* table) /*!< in: table */
6342 {
6343 ulint n_table_locks;
6344
6345 lock_mutex_enter();
6346
6347 n_table_locks = UT_LIST_GET_LEN(table->locks);
6348
6349 lock_mutex_exit();
6350
6351 return(n_table_locks);
6352 }
6353
6354 #ifdef UNIV_DEBUG
6355 /**
6356 Do an exhaustive check for any locks (table or rec) against the table.
6357
6358 @param[in] table check if there are any locks held on records in this table
6359 or on the table itself
6360 */
6361
/* Callback for trx_sys.rw_trx_hash.iterate(): assert that the given
active transaction holds no lock (record or table) on "table".
Returning 0 keeps the iteration going over all hash elements. */
static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
                                       const dict_table_t *table)
{
  ut_ad(lock_mutex_own());
  mutex_enter(&element->mutex);
  if (element->trx)
  {
    trx_mutex_enter(element->trx);
    check_trx_state(element->trx);
    if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
    {
      /* Walk every lock held by this transaction. */
      for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
           lock != NULL;
           lock= UT_LIST_GET_NEXT(trx_locks, lock))
      {
        ut_ad(lock->trx == element->trx);
        if (lock_get_type_low(lock) == LOCK_REC)
        {
          /* During online index creation only the primary index may
          carry record locks. */
          ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
                lock->index->is_primary());
          ut_ad(lock->index->table != table);
        }
        else
          ut_ad(lock->un_member.tab_lock.table != table);
      }
    }
    trx_mutex_exit(element->trx);
  }
  mutex_exit(&element->mutex);
  return 0;
}
6393 #endif /* UNIV_DEBUG */
6394
6395 /*******************************************************************//**
6396 Check if there are any locks (table or rec) against table.
6397 @return true if table has either table or record locks. */
bool
lock_table_has_locks(
/*=================*/
	const dict_table_t*	table)	/*!< in: check if there are any locks
					held on records in this table or on the
					table itself */
{
	ibool	has_locks;

	ut_ad(table != NULL);
	lock_mutex_enter();

	/* Fast check: any explicit table locks, or any record locks
	accounted to the table? */
	has_locks = UT_LIST_GET_LEN(table->locks) > 0 || table->n_rec_locks > 0;

#ifdef UNIV_DEBUG
	if (!has_locks) {
		/* Cross-check the fast path by asserting that no
		active transaction holds any lock on this table. */
		trx_sys.rw_trx_hash.iterate(
			reinterpret_cast<my_hash_walk_action>
			(lock_table_locks_lookup),
			const_cast<dict_table_t*>(table));
	}
#endif /* UNIV_DEBUG */

	lock_mutex_exit();

	return(has_locks);
}
6425
6426 /*******************************************************************//**
6427 Initialise the table lock list. */
void
lock_table_lock_list_init(
/*======================*/
	table_lock_list_t*	lock_list)	/*!< List to initialise */
{
	/* Anchor the intrusive list on lock_table_t::locks. */
	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
}
6435
6436 /*******************************************************************//**
6437 Initialise the trx lock list. */
void
lock_trx_lock_list_init(
/*====================*/
	trx_lock_list_t*	lock_list)	/*!< List to initialise */
{
	/* Anchor the intrusive list on lock_t::trx_locks. */
	UT_LIST_INIT(*lock_list, &lock_t::trx_locks);
}
6445
6446 /*******************************************************************//**
6447 Set the lock system timeout event. */
void
lock_set_timeout_event()
/*====================*/
{
	/* Wake up the lock wait timeout monitor. */
	os_event_set(lock_sys.timeout_event);
}
6454
6455 #ifdef UNIV_DEBUG
6456 /*******************************************************************//**
6457 Check if the transaction holds any locks on the sys tables
6458 or its records.
6459 @return the strongest lock found on any sys table or 0 for none */
const lock_t*
lock_trx_has_sys_table_locks(
/*=========================*/
	const trx_t*	trx)	/*!< in: transaction to check */
{
	const lock_t*	strongest_lock = 0;
	lock_mode	strongest = LOCK_NONE;

	lock_mutex_enter();

	const lock_list::const_iterator end = trx->lock.table_locks.end();
	lock_list::const_iterator it = trx->lock.table_locks.begin();

	/* First pass: find any lock on a system table to seed the
	"strongest so far" mode. The list may contain NULL entries. */

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock != NULL
		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {

			strongest = lock_get_mode(lock);
			ut_ad(strongest != LOCK_NONE);
			strongest_lock = lock;
			break;
		}
	}

	if (strongest == LOCK_NONE) {
		lock_mutex_exit();
		return(NULL);
	}

	/* Second pass: continue from the seed position and keep the
	strongest system-table lock seen. */
	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock == NULL) {
			continue;
		}

		ut_ad(trx == lock->trx);
		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
		ut_ad(lock->un_member.tab_lock.table != NULL);

		lock_mode	mode = lock_get_mode(lock);

		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
		    && lock_mode_stronger_or_eq(mode, strongest)) {

			strongest = mode;
			strongest_lock = lock;
		}
	}

	lock_mutex_exit();

	return(strongest_lock);
}
6518
6519 /** Check if the transaction holds an explicit exclusive lock on a record.
6520 @param[in] trx transaction
6521 @param[in] table table
6522 @param[in] block leaf page
6523 @param[in] heap_no heap number identifying the record
6524 @return whether an explicit X-lock is held */
bool
lock_trx_has_expl_x_lock(
	const trx_t*		trx,	/*!< in: transaction to check */
	const dict_table_t*	table,	/*!< in: table to check */
	const buf_block_t*	block,	/*!< in: buffer block of the record */
	ulint			heap_no)/*!< in: record heap number */
{
	ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);

	/* Debug-only consistency check: the transaction must hold an
	IX lock on the table and an explicit X record lock; the
	function unconditionally reports success in release builds. */
	lock_mutex_enter();
	ut_ad(lock_table_has(trx, table, LOCK_IX));
	ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no,
				trx));
	lock_mutex_exit();
	return(true);
}
6541 #endif /* UNIV_DEBUG */
6542
6543 /** rewind(3) the file used for storing the latest detected deadlock and
6544 print a heading message to stderr if printing of all deadlocks to stderr
6545 is enabled. */
6546 void
start_print()6547 DeadlockChecker::start_print()
6548 {
6549 ut_ad(lock_mutex_own());
6550
6551 rewind(lock_latest_err_file);
6552 ut_print_timestamp(lock_latest_err_file);
6553
6554 if (srv_print_all_deadlocks) {
6555 ib::info() << "Transactions deadlock detected, dumping"
6556 " detailed information.";
6557 }
6558 }
6559
6560 /** Print a message to the deadlock file and possibly to stderr.
6561 @param msg message to print */
6562 void
print(const char * msg)6563 DeadlockChecker::print(const char* msg)
6564 {
6565 fputs(msg, lock_latest_err_file);
6566
6567 if (srv_print_all_deadlocks) {
6568 ib::info() << msg;
6569 }
6570 }
6571
6572 /** Print transaction data to the deadlock file and possibly to stderr.
6573 @param trx transaction
6574 @param max_query_len max query length to print */
6575 void
print(const trx_t * trx,ulint max_query_len)6576 DeadlockChecker::print(const trx_t* trx, ulint max_query_len)
6577 {
6578 ut_ad(lock_mutex_own());
6579
6580 ulint n_rec_locks = lock_number_of_rows_locked(&trx->lock);
6581 ulint n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
6582 ulint heap_size = mem_heap_get_size(trx->lock.lock_heap);
6583
6584 trx_print_low(lock_latest_err_file, trx, max_query_len,
6585 n_rec_locks, n_trx_locks, heap_size);
6586
6587 if (srv_print_all_deadlocks) {
6588 trx_print_low(stderr, trx, max_query_len,
6589 n_rec_locks, n_trx_locks, heap_size);
6590 }
6591 }
6592
6593 /** Print lock data to the deadlock file and possibly to stderr.
6594 @param lock record or table type lock */
6595 void
print(const lock_t * lock)6596 DeadlockChecker::print(const lock_t* lock)
6597 {
6598 ut_ad(lock_mutex_own());
6599
6600 if (lock_get_type_low(lock) == LOCK_REC) {
6601 mtr_t mtr;
6602 lock_rec_print(lock_latest_err_file, lock, mtr);
6603
6604 if (srv_print_all_deadlocks) {
6605 lock_rec_print(stderr, lock, mtr);
6606 }
6607 } else {
6608 lock_table_print(lock_latest_err_file, lock);
6609
6610 if (srv_print_all_deadlocks) {
6611 lock_table_print(stderr, lock);
6612 }
6613 }
6614 }
6615
6616 /** Get the next lock in the queue that is owned by a transaction whose
6617 sub-tree has not already been searched.
6618 Note: "next" here means PREV for table locks.
6619
6620 @param lock Lock in queue
6621 @param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
6622
6623 @return next lock or NULL if at end of queue */
const lock_t*
DeadlockChecker::get_next_lock(const lock_t* lock, ulint heap_no) const
{
	ut_ad(lock_mutex_own());

	/* Advance through the queue until a lock owned by a
	not-yet-visited transaction is found, or the queue ends. */
	do {
		if (lock_get_type_low(lock) == LOCK_REC) {
			ut_ad(heap_no != ULINT_UNDEFINED);
			lock = lock_rec_get_next_const(heap_no, lock);
		} else {
			ut_ad(heap_no == ULINT_UNDEFINED);
			ut_ad(lock_get_type_low(lock) == LOCK_TABLE);

			lock = UT_LIST_GET_NEXT(
				un_member.tab_lock.locks, lock);
		}

	} while (lock != NULL && is_visited(lock));

	/* The lock type must never change while walking one queue. */
	ut_ad(lock == NULL
	      || lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));

	return(lock);
}
6648
6649 /** Get the first lock to search. The search starts from the current
6650 wait_lock. What we are really interested in is an edge from the
6651 current wait_lock's owning transaction to another transaction that has
6652 a lock ahead in the queue. We skip locks where the owning transaction's
6653 sub-tree has already been searched.
6654
6655 Note: The record locks are traversed from the oldest lock to the
6656 latest. For table locks we go from latest to oldest.
6657
6658 For record locks, we first position the "iterator" on the first lock on
6659 the page and then reposition on the actual heap_no. This is required
due to the way the record lock hash is implemented.
6661
6662 @param[out] heap_no if rec lock, else ULINT_UNDEFINED.
6663 @return first lock or NULL */
const lock_t*
DeadlockChecker::get_first_lock(ulint* heap_no) const
{
	ut_ad(lock_mutex_own());

	const lock_t*	lock = m_wait_lock;

	if (lock_get_type_low(lock) == LOCK_REC) {
		hash_table_t*	lock_hash;

		/* Predicate (spatial) locks live in their own hash. */
		lock_hash = lock->type_mode & LOCK_PREDICATE
			? lock_sys.prdt_hash
			: lock_sys.rec_hash;

		/* We are only interested in records that match the heap_no. */
		*heap_no = lock_rec_find_set_bit(lock);

		ut_ad(*heap_no <= 0xffff);
		ut_ad(*heap_no != ULINT_UNDEFINED);

		/* Find the locks on the page. */
		lock = lock_rec_get_first_on_page_addr(
			lock_hash,
			lock->un_member.rec_lock.space,
			lock->un_member.rec_lock.page_no);

		/* Position on the first lock on the physical record.*/
		if (!lock_rec_get_nth_bit(lock, *heap_no)) {
			lock = lock_rec_get_next_const(*heap_no, lock);
		}

		/* The first lock on a record can never be a waiting
		lock: there would be nothing ahead of it to wait for. */
		ut_a(!lock_get_wait(lock));
	} else {
		/* Table locks don't care about the heap_no. */
		*heap_no = ULINT_UNDEFINED;
		ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
		dict_table_t*	table = lock->un_member.tab_lock.table;
		lock = UT_LIST_GET_FIRST(table->locks);
	}

	/* Must find at least two locks, otherwise there cannot be a
	waiting lock, secondly the first lock cannot be the wait_lock.
	The VATS scheduling algorithm may reorder the queue, which
	relaxes the second condition for non-replication threads. */
	ut_a(lock != NULL);
	ut_a(lock != m_wait_lock ||
	     (innodb_lock_schedule_algorithm
	      == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
	      && !thd_is_replication_slave_thread(lock->trx->mysql_thd)));

	/* Check that the lock type doesn't change. */
	ut_ad(lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));

	return(lock);
}
6717
6718 /** Notify that a deadlock has been detected and print the conflicting
6719 transaction info.
6720 @param lock lock causing deadlock */
void
DeadlockChecker::notify(const lock_t* lock) const
{
	ut_ad(lock_mutex_own());

	start_print();

	/* Transaction (1) is the one that started the search and is
	waiting; transaction (2) owns the conflicting lock found on
	the cycle. */
	print("\n*** (1) TRANSACTION:\n");

	print(m_wait_lock->trx, 3000);

	print("*** (1) WAITING FOR THIS LOCK TO BE GRANTED:\n");

	print(m_wait_lock);

	print("*** (2) TRANSACTION:\n");

	print(lock->trx, 3000);

	print("*** (2) HOLDS THE LOCK(S):\n");

	print(lock);

	/* It is possible that the joining transaction was granted its
	lock when we rolled back some other waiting transaction. */

	if (m_start->lock.wait_lock != 0) {
		print("*** (2) WAITING FOR THIS LOCK TO BE GRANTED:\n");

		print(m_start->lock.wait_lock);
	}

	DBUG_PRINT("ib_lock", ("deadlock detected"));
}
6755
6756 /** Select the victim transaction that should be rolledback.
6757 @return victim transaction */
const trx_t*
DeadlockChecker::select_victim() const
{
	ut_ad(lock_mutex_own());
	ut_ad(m_start->lock.wait_lock != 0);
	ut_ad(m_wait_lock->trx != m_start);

	if (trx_weight_ge(m_wait_lock->trx, m_start)) {
		/* The joining transaction is 'smaller',
		choose it as the victim and roll it back. */
#ifdef WITH_WSREP
		/* A Galera brute-force (BF) transaction must win;
		pick the other transaction instead. */
		if (wsrep_thd_is_BF(m_start->mysql_thd, FALSE)) {
			return(m_wait_lock->trx);
		}
#endif /* WITH_WSREP */
		return(m_start);
	}

#ifdef WITH_WSREP
	if (wsrep_thd_is_BF(m_wait_lock->trx->mysql_thd, FALSE)) {
		return(m_start);
	}
#endif /* WITH_WSREP */

	return(m_wait_lock->trx);
}
6784
6785 /** Looks iteratively for a deadlock. Note: the joining transaction may
6786 have been granted its lock by the deadlock checks.
6787 @return 0 if no deadlock else the victim transaction instance.*/
const trx_t*
DeadlockChecker::search()
{
	ut_ad(lock_mutex_own());
	ut_ad(!trx_mutex_own(m_start));

	ut_ad(m_start != NULL);
	ut_ad(m_wait_lock != NULL);
	ut_ad(!m_wait_lock->trx->auto_commit || m_wait_lock->trx->will_lock);
	ut_d(check_trx_state(m_wait_lock->trx));
	ut_ad(m_mark_start <= s_lock_mark_counter);

	/* Iterative depth-first search over the waits-for graph:
	the explicit stack (push/pop) replaces recursion. */

	/* Look at the locks ahead of wait_lock in the lock queue. */
	ulint		heap_no;
	const lock_t*	lock = get_first_lock(&heap_no);

	for (;;) {
		/* We should never visit the same sub-tree more than once. */
		ut_ad(lock == NULL || !is_visited(lock));

		/* Queue exhausted at this depth: backtrack until a
		level with remaining locks is found. */
		while (m_n_elems > 0 && lock == NULL) {

			/* Restore previous search state. */

			pop(lock, heap_no);

			lock = get_next_lock(lock, heap_no);
		}

		if (lock == NULL) {
			break;
		}

		if (lock == m_wait_lock) {

			/* We can mark this subtree as searched */
			ut_ad(lock->trx->lock.deadlock_mark <= m_mark_start);

			lock->trx->lock.deadlock_mark = ++s_lock_mark_counter;

			/* We are not prepared for an overflow. This 64-bit
			counter should never wrap around. At 10^9 increments
			per second, it would take 10^3 years of uptime. */

			ut_ad(s_lock_mark_counter > 0);

			/* Backtrack */
			lock = NULL;
			continue;
		}

		if (!lock_has_to_wait(m_wait_lock, lock)) {
			/* No conflict, next lock */
			lock = get_next_lock(lock, heap_no);
			continue;
		}

		if (lock->trx == m_start) {
			/* Found a cycle. */
			notify(lock);
			return select_victim();
		}

		if (is_too_deep()) {
			/* Search too deep to continue. */
			m_too_deep = true;
			return m_start;
		}

		/* We do not need to report autoinc locks to the upper
		layer. These locks are released before commit, so they
		can not cause deadlocks with binlog-fixed commit
		order. */
		if (m_report_waiters
		    && (lock_get_type_low(lock) != LOCK_TABLE
			|| lock_get_mode(lock) != LOCK_AUTO_INC)) {
			thd_rpl_deadlock_check(m_start->mysql_thd,
					       lock->trx->mysql_thd);
		}

		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
			/* Another trx ahead has requested a lock in an
			incompatible mode, and is itself waiting for a lock. */

			++m_cost;

			/* Save the current position and descend into
			the blocking transaction's wait lock. */
			if (!push(lock, heap_no)) {
				m_too_deep = true;
				return m_start;
			}

			m_wait_lock = lock->trx->lock.wait_lock;

			lock = get_first_lock(&heap_no);

			if (is_visited(lock)) {
				lock = get_next_lock(lock, heap_no);
			}
		} else {
			lock = get_next_lock(lock, heap_no);
		}
	}

	ut_a(lock == NULL && m_n_elems == 0);

	/* No deadlock found. */
	return(0);
}
6896
6897 /** Print info about transaction that was rolled back.
6898 @param trx transaction rolled back
6899 @param lock lock trx wants */
void
DeadlockChecker::rollback_print(const trx_t* trx, const lock_t* lock)
{
	ut_ad(lock_mutex_own());

	/* If the lock search exceeds the max step
	or the max depth, the current trx will be
	the victim. Print its information. */
	start_print();

	print("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
	      " WAITS-FOR GRAPH, WE WILL ROLL BACK"
	      " FOLLOWING TRANSACTION \n\n"
	      "*** TRANSACTION:\n");

	print(trx, 3000);

	print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");

	print(lock);
}
6921
/** Rollback transaction selected as the victim.
The victim is the transaction that currently holds m_wait_lock, i.e.
the lock at the head of the cycle found by search(). Called while
holding the global lock mutex; acquires the victim's trx mutex for
the duration of the state change. */
void
DeadlockChecker::trx_rollback()
{
	ut_ad(lock_mutex_own());

	trx_t*	trx = m_wait_lock->trx;

	/* "(1)" distinguishes this message from the "(2)" variant
	printed by check_and_resolve() when the joining transaction
	itself is the victim. */
	print("*** WE ROLL BACK TRANSACTION (1)\n");

	trx_mutex_enter(trx);

	/* Flag the victim so that, when it resumes from its lock
	wait, it knows it was chosen for rollback. NOTE(review):
	exact consumer of this flag is outside this view — it is
	presumably checked by the lock-wait code; verify. */
	trx->lock.was_chosen_as_deadlock_victim = true;

	/* Cancel the victim's pending lock request and release it
	from its wait (per the callee's name; definition is outside
	this view). */
	lock_cancel_waiting_and_release(trx->lock.wait_lock);

	trx_mutex_exit(trx);
}
6940
/** Check if a joining lock request results in a deadlock.
If a deadlock is found, we will resolve the deadlock by
choosing a victim transaction and rolling it back.
We will attempt to resolve all deadlocks.

Entry/exit conditions: called with both the global lock mutex and
trx's mutex held; trx's mutex is temporarily released during the
search (see comment below) and re-acquired before returning.

@param[in]	lock	the lock request
@param[in,out]	trx	transaction requesting the lock

@return trx if it was chosen as victim
@retval	NULL if another victim was chosen,
or there is no deadlock (any more) */
const trx_t*
DeadlockChecker::check_and_resolve(const lock_t* lock, trx_t* trx)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(trx));
	ut_ad(trx->state == TRX_STATE_ACTIVE);
	ut_ad(!trx->auto_commit || trx->will_lock);
	ut_ad(!srv_read_only_mode);

	/* Deadlock detection can be disabled entirely
	(innodb_deadlock_detect = OFF); then lock waits are resolved
	by timeouts only and we report "no deadlock" here. */
	if (!innobase_deadlock_detect) {
		return(NULL);
	}

	/* Release the mutex to obey the latching order.
	This is safe, because DeadlockChecker::check_and_resolve()
	is invoked when a lock wait is enqueued for the currently
	running transaction. Because m_trx is a running transaction
	(it is not currently suspended because of a lock wait),
	its state can only be changed by this thread, which is
	currently associated with the transaction. */

	trx_mutex_exit(trx);

	const trx_t*	victim_trx;
	/* Whether conflicting waits must be reported to the upper
	layer (replication) via thd_rpl_deadlock_check(); decided
	once per call, used by each search pass. */
	const bool	report_waiters = trx->mysql_thd
		&& thd_need_wait_reports(trx->mysql_thd);

	/* Try and resolve as many deadlocks as possible. A fresh
	checker (with a new mark counter) is constructed for each
	pass; the loop repeats while some OTHER transaction was
	rolled back, since removing one cycle may expose another. */
	do {
		DeadlockChecker	checker(trx, lock, s_lock_mark_counter,
					report_waiters);

		victim_trx = checker.search();

		/* Search too deep, we rollback the joining transaction only
		if it is possible to rollback. Otherwise we rollback the
		transaction that is holding the lock that the joining
		transaction wants. */
		if (checker.is_too_deep()) {

			ut_ad(trx == checker.m_start);
			ut_ad(trx == victim_trx);

			rollback_print(victim_trx, lock);

			MONITOR_INC(MONITOR_DEADLOCK);

			break;

		} else if (victim_trx != NULL && victim_trx != trx) {

			ut_ad(victim_trx == checker.m_wait_lock->trx);

			/* Some other transaction was chosen: cancel
			its wait here ("(1)" message) and search again. */
			checker.trx_rollback();

			lock_deadlock_found = true;

			MONITOR_INC(MONITOR_DEADLOCK);
		}

	} while (victim_trx != NULL && victim_trx != trx);

	/* If the joining transaction was selected as the victim. */
	if (victim_trx != NULL) {

		/* The actual rollback of trx is done by the caller;
		here we only announce it. This also covers the
		too-deep case above, where victim_trx == trx. */
		print("*** WE ROLL BACK TRANSACTION (2)\n");

		lock_deadlock_found = true;
	}

	/* Restore the entry condition: trx's mutex held again. */
	trx_mutex_enter(trx);

	return(victim_trx);
}
7026
7027 /*************************************************************//**
7028 Updates the lock table when a page is split and merged to
7029 two pages. */
7030 UNIV_INTERN
7031 void
lock_update_split_and_merge(const buf_block_t * left_block,const rec_t * orig_pred,const buf_block_t * right_block)7032 lock_update_split_and_merge(
7033 const buf_block_t* left_block, /*!< in: left page to which merged */
7034 const rec_t* orig_pred, /*!< in: original predecessor of
7035 supremum on the left page before merge*/
7036 const buf_block_t* right_block) /*!< in: right page from which merged */
7037 {
7038 const rec_t* left_next_rec;
7039
7040 ut_ad(page_is_leaf(left_block->frame));
7041 ut_ad(page_is_leaf(right_block->frame));
7042 ut_ad(page_align(orig_pred) == left_block->frame);
7043
7044 lock_mutex_enter();
7045
7046 left_next_rec = page_rec_get_next_const(orig_pred);
7047 ut_ad(!page_rec_is_metadata(left_next_rec));
7048
7049 /* Inherit the locks on the supremum of the left page to the
7050 first record which was moved from the right page */
7051 lock_rec_inherit_to_gap(
7052 left_block, left_block,
7053 page_rec_get_heap_no(left_next_rec),
7054 PAGE_HEAP_NO_SUPREMUM);
7055
7056 /* Reset the locks on the supremum of the left page,
7057 releasing waiting transactions */
7058 lock_rec_reset_and_release_wait(left_block,
7059 PAGE_HEAP_NO_SUPREMUM);
7060
7061 /* Inherit the locks to the supremum of the left page from the
7062 successor of the infimum on the right page */
7063 lock_rec_inherit_to_gap(left_block, right_block,
7064 PAGE_HEAP_NO_SUPREMUM,
7065 lock_get_min_heap_no(right_block));
7066
7067 lock_mutex_exit();
7068 }
7069