/*****************************************************************************

Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, 2022, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file lock/lock0lock.cc
The transaction lock system

Created 5/7/1996 Heikki Tuuri
*******************************************************/

#define LOCK_MODULE_IMPLEMENTATION

#include "univ.i"

#include <mysql/service_thd_error_context.h>
#include <sql_class.h>

#include "lock0lock.h"
#include "lock0priv.h"
#include "dict0mem.h"
#include "trx0purge.h"
#include "trx0sys.h"
#include "ut0vec.h"
#include "btr0cur.h"
#include "row0sel.h"
#include "row0mysql.h"
#include "row0vers.h"
#include "pars0pars.h"

#include <set>

#ifdef WITH_WSREP
#include <mysql/service_wsrep.h>
#endif /* WITH_WSREP */

/** Lock scheduling algorithm */
ulong innodb_lock_schedule_algorithm;

/** The value of innodb_deadlock_detect */
my_bool	innobase_deadlock_detect;

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(
/*==========================*/
	const lock_t*	wait_lock);	/*!< in: waiting record lock */

/** Grant a lock to a waiting lock request and release the waiting transaction
after lock_reset_lock_and_trx_wait() has been called. */
static void lock_grant_after_reset(lock_t* lock);

extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);

/** Pretty-print a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static void lock_table_print(FILE* file, const lock_t* lock);

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);

/** Deadlock checker. */
class DeadlockChecker {
public:
	/** Check if a joining lock request results in a deadlock.
	If a deadlock is found, we will resolve the deadlock by
	choosing a victim transaction and rolling it back.
	We will attempt to resolve all deadlocks.

	@param[in]	lock	the lock request
	@param[in,out]	trx	transaction requesting the lock

	@return trx if it was chosen as victim
	@retval	NULL if another victim was chosen,
	or there is no deadlock (any more) */
	static const trx_t* check_and_resolve(const lock_t* lock, trx_t* trx);

private:
	/** Do a shallow copy. Default destructor OK.
	@param trx the start transaction (start node)
	@param wait_lock lock that a transaction wants
	@param mark_start visited node counter
	@param report_waiters whether to call thd_rpl_deadlock_check() */
	DeadlockChecker(
		const trx_t*	trx,
		const lock_t*	wait_lock,
		ib_uint64_t	mark_start,
		bool report_waiters)
		:
		m_cost(),
		m_start(trx),
		m_too_deep(),
		m_wait_lock(wait_lock),
		m_mark_start(mark_start),
		m_n_elems(),
		m_report_waiters(report_waiters)
	{
	}

	/** Check if the search is too deep. */
	bool is_too_deep() const
	{
		return(m_n_elems > LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK
		       || m_cost > LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK);
	}

	/** Save current state.
	@param lock lock to push on the stack.
	@param heap_no the heap number to push on the stack.
	@return false if stack is full. */
	bool push(const lock_t*	lock, ulint heap_no)
	{
		ut_ad((lock_get_type_low(lock) & LOCK_REC)
		      || (lock_get_type_low(lock) & LOCK_TABLE));

		ut_ad(((lock_get_type_low(lock) & LOCK_TABLE) != 0)
		      == (heap_no == ULINT_UNDEFINED));

		/* Ensure that the stack is bounded. */
		if (m_n_elems >= UT_ARR_SIZE(s_states)) {
			return(false);
		}

		state_t&	state = s_states[m_n_elems++];

		state.m_lock = lock;
		state.m_wait_lock = m_wait_lock;
		state.m_heap_no = heap_no;

		return(true);
	}

	/** Restore state.
	@param[out] lock current lock
	@param[out] heap_no current heap_no */
	void pop(const lock_t*& lock, ulint& heap_no)
	{
		ut_a(m_n_elems > 0);

		const state_t&	state = s_states[--m_n_elems];

		lock = state.m_lock;
		heap_no = state.m_heap_no;
		m_wait_lock = state.m_wait_lock;
	}

	/** Check whether the node has been visited.
	@param lock lock to check
	@return true if the node has been visited */
	bool is_visited(const lock_t* lock) const
	{
		return(lock->trx->lock.deadlock_mark > m_mark_start);
	}

	/** Get the next lock in the queue that is owned by a transaction
	whose sub-tree has not already been searched.
	Note: "next" here means PREV for table locks.
	@param lock Lock in queue
	@param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
	@return next lock or NULL if at end of queue */
	const lock_t* get_next_lock(const lock_t* lock, ulint heap_no) const;

	/** Get the first lock to search. The search starts from the current
	wait_lock. What we are really interested in is an edge from the
	current wait_lock's owning transaction to another transaction that has
	a lock ahead in the queue. We skip locks where the owning transaction's
	sub-tree has already been searched.

	Note: The record locks are traversed from the oldest lock to the
	latest. For table locks we go from latest to oldest.

	For record locks, we first position the iterator on the first lock on
	the page and then reposition on the actual heap_no. This is required
	due to the way the record lock hash is implemented.

	@param[out] heap_no if rec lock, else ULINT_UNDEFINED.

	@return first lock or NULL */
	const lock_t* get_first_lock(ulint* heap_no) const;

	/** Notify that a deadlock has been detected and print the conflicting
	transaction info.
	@param lock lock causing deadlock */
	void notify(const lock_t* lock) const;

	/** Select the victim transaction that should be rolled back.
	@return victim transaction */
	const trx_t* select_victim() const;

	/** Rollback transaction selected as the victim. */
	void trx_rollback();

	/** Looks iteratively for a deadlock. Note: the joining transaction
	may have been granted its lock by the deadlock checks.

	@return 0 if no deadlock else the victim transaction.*/
	const trx_t* search();

	/** Print transaction data to the deadlock file and possibly to stderr.
	@param trx transaction
	@param max_query_len max query length to print */
	static void print(const trx_t* trx, ulint max_query_len);

	/** rewind(3) the file used for storing the latest detected deadlock
	and print a heading message to stderr if printing of all deadlocks to
	stderr is enabled. */
	static void start_print();

	/** Print lock data to the deadlock file and possibly to stderr.
	@param lock record or table type lock */
	static void print(const lock_t* lock);

	/** Print a message to the deadlock file and possibly to stderr.
	@param msg message to print */
	static void print(const char* msg);

	/** Print info about transaction that was rolled back.
	@param trx transaction rolled back
	@param lock lock trx wants */
	static void rollback_print(const trx_t* trx, const lock_t* lock);

private:
	/** DFS state information, used during deadlock checking. */
	struct state_t {
		const lock_t*	m_lock;		/*!< Current lock */
		const lock_t*	m_wait_lock;	/*!< Waiting for lock */
		ulint		m_heap_no;	/*!< heap number if rec lock */
	};

	/** Used in deadlock tracking. Protected by lock_sys.mutex. */
	static ib_uint64_t	s_lock_mark_counter;

	/** Calculation steps thus far. It is the count of the nodes visited. */
	ulint			m_cost;

	/** Joining transaction that is requesting a lock in an
	incompatible mode */
	const trx_t*		m_start;

	/** TRUE if search was too deep and was aborted */
	bool			m_too_deep;

	/** Lock that trx wants */
	const lock_t*		m_wait_lock;

	/** Value of lock_mark_count at the start of the deadlock check. */
	ib_uint64_t		m_mark_start;

	/** Number of states pushed onto the stack */
	size_t			m_n_elems;

	/** This is to avoid malloc/free calls. */
	static state_t		s_states[MAX_STACK_SIZE];

	/** Set if thd_rpl_deadlock_check() should be called for waits. */
	const bool m_report_waiters;
};

/** Counter to mark visited nodes during deadlock search. */
ib_uint64_t	DeadlockChecker::s_lock_mark_counter = 0;

/** The stack used for deadlock searches. */
DeadlockChecker::state_t	DeadlockChecker::s_states[MAX_STACK_SIZE];
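
/* Illustrative sketch (not part of the algorithm itself): the deadlock
search walks the wait-for graph iteratively using the s_states[] stack.
Suppose T1 waits for a record lock held by T2, and T2 in turn waits for
a lock held by T1. Starting from T1's wait_lock, search() follows the
edge to T2, push()es the position, follows T2's wait_lock back to
T1 == m_start, and reports a cycle; check_and_resolve() then picks a
victim via select_victim(). Visited transactions are marked by raising
trx->lock.deadlock_mark above m_mark_start, so no sub-tree is searched
twice within one invocation. */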

#ifdef UNIV_DEBUG
/*********************************************************************//**
Validates the lock system.
@return TRUE if ok */
static
bool
lock_validate();
/*============*/

/*********************************************************************//**
Validates the record lock queues on a page.
@return TRUE if ok */
static
ibool
lock_rec_validate_page(
/*===================*/
	const buf_block_t*	block)	/*!< in: buffer block */
	MY_ATTRIBUTE((warn_unused_result));
#endif /* UNIV_DEBUG */

/* The lock system */
lock_sys_t lock_sys;

/** We store info on the latest deadlock error to this buffer. InnoDB
Monitor will then fetch it and print */
static bool	lock_deadlock_found = false;

/** Only created if !srv_read_only_mode */
static FILE*		lock_latest_err_file;

/*********************************************************************//**
Reports that a transaction id is insensible, i.e., in the future. */
ATTRIBUTE_COLD
void
lock_report_trx_id_insanity(
/*========================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
{
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	ib::error()
		<< "Transaction id " << ib::hex(trx_id)
		<< " associated with record" << rec_offsets_print(rec, offsets)
		<< " in index " << index->name
		<< " of table " << index->table->name
		<< " is greater than the global counter " << max_trx_id
		<< "! The table is corrupted.";
}

/*********************************************************************//**
Checks that a transaction id is sensible, i.e., not in the future.
@return true if ok */
bool
lock_check_trx_id_sanity(
/*=====================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
{
  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(!rec_is_metadata(rec, *index));

  trx_id_t max_trx_id= trx_sys.get_max_trx_id();
  ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);

  if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
  {
    lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
    return false;
  }
  return true;
}
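
/* Example (illustrative): if a record carries DB_TRX_ID 0x500 while
trx_sys.get_max_trx_id() is only 0x400, no transaction with that id can
ever have been assigned, so the page must be corrupted (or the hidden
trx id column was garbled); the check above reports this instead of
letting the server crash later on the impossible id. */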

/*********************************************************************//**
Checks that a record is seen in a consistent read.
@return true if sees, or false if an earlier version of the record
should be retrieved */
bool
lock_clust_rec_cons_read_sees(
/*==========================*/
	const rec_t*	rec,	/*!< in: user record which should be read or
				passed over by a read cursor */
	dict_index_t*	index,	/*!< in: clustered index */
	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec, index) */
	ReadView*	view)	/*!< in: consistent read view */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	/* Temp-tables are not shared across connections and multiple
	transactions from different connections cannot simultaneously
	operate on the same temp-table, so a read of a temp-table is
	always a consistent read. */
	if (index->table->is_temporary()) {
		return(true);
	}

	/* NOTE that we call this function while holding the search
	system latch. */

	trx_id_t	trx_id = row_get_rec_trx_id(rec, index, offsets);

	return(view->changes_visible(trx_id, index->table->name));
}

/*********************************************************************//**
Checks that a non-clustered index record is seen in a consistent read.

NOTE that a non-clustered index page contains so little information on
its modifications that even in the false case, the present version of
rec may be the right one, but we must check this from the clustered index
record.

@return true if certainly sees, or false if an earlier version of the
clustered index record might be needed */
bool
lock_sec_rec_cons_read_sees(
/*========================*/
	const rec_t*		rec,	/*!< in: user record which
					should be read or passed over
					by a read cursor */
	const dict_index_t*	index,	/*!< in: index */
	const ReadView*	view)	/*!< in: consistent read view */
{
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(!index->is_primary());
	ut_ad(!rec_is_metadata(rec, *index));

	/* NOTE that we might call this function while holding the search
	system latch. */

	if (index->table->is_temporary()) {

		/* Temp-tables are not shared across connections and multiple
		transactions from different connections cannot simultaneously
		operate on the same temp-table, so a read of a temp-table is
		always a consistent read. */

		return(true);
	}

	trx_id_t	max_trx_id = page_get_max_trx_id(page_align(rec));

	ut_ad(max_trx_id > 0);

	return(view->sees(max_trx_id));
}
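
/* Illustrative example: PAGE_MAX_TRX_ID is the highest trx id that has
modified any record on this secondary index page, and a secondary index
record carries no per-record trx id of its own. If the read view was
created after, say, every transaction below id 100 had committed and the
page's max trx id is 90, then view->sees() returns true and the record
can be used as is; otherwise the clustered index record (and possibly
its older versions) must be consulted. */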


/**
  Creates the lock system at database start.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::create(ulint n_cells)
{
	ut_ad(this == &lock_sys);

	m_initialised= true;

	waiting_threads = static_cast<srv_slot_t*>
		(ut_zalloc_nokey(srv_max_n_threads * sizeof *waiting_threads));
	last_slot = waiting_threads;

	mutex_create(LATCH_ID_LOCK_SYS, &mutex);

	mutex_create(LATCH_ID_LOCK_SYS_WAIT, &wait_mutex);

	rec_hash.create(n_cells);
	prdt_hash.create(n_cells);
	prdt_page_hash.create(n_cells);

	if (!srv_read_only_mode) {
		lock_latest_err_file = os_file_create_tmpfile();
		ut_a(lock_latest_err_file);
	}
	timeout_timer_active = false;
}

/** Calculates the fold value of a lock: used in migrating the hash table.
@param[in]	lock	record lock object
@return	folded value */
static ulint lock_rec_lock_fold(const lock_t *lock)
{
  return lock->un_member.rec_lock.page_id.fold();
}


/**
  Resize the lock hash table.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::resize(ulint n_cells)
{
	ut_ad(this == &lock_sys);

	mutex_enter(&mutex);

	hash_table_t old_hash(rec_hash);
	rec_hash.create(n_cells);
	HASH_MIGRATE(&old_hash, &rec_hash, lock_t, hash,
		     lock_rec_lock_fold);
	old_hash.free();

	old_hash = prdt_hash;
	prdt_hash.create(n_cells);
	HASH_MIGRATE(&old_hash, &prdt_hash, lock_t, hash,
		     lock_rec_lock_fold);
	old_hash.free();

	old_hash = prdt_page_hash;
	prdt_page_hash.create(n_cells);
	HASH_MIGRATE(&old_hash, &prdt_page_hash, lock_t, hash,
		     lock_rec_lock_fold);
	old_hash.free();
	mutex_exit(&mutex);
}
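
/* Usage note: the lock hash tables are sized relative to the buffer
pool, so resize() is typically invoked when the buffer pool is resized
at runtime. The whole migration runs under lock_sys.mutex: the new
array is created and every lock_t is re-folded into it by page_id with
HASH_MIGRATE, so all lock queue activity is blocked for the duration. */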


/** Closes the lock system at database shutdown. */
void lock_sys_t::close()
{
	ut_ad(this == &lock_sys);

	if (!m_initialised) return;

	if (lock_latest_err_file != NULL) {
		my_fclose(lock_latest_err_file, MYF(MY_WME));
		lock_latest_err_file = NULL;
	}

	rec_hash.free();
	prdt_hash.free();
	prdt_page_hash.free();

	mutex_destroy(&mutex);
	mutex_destroy(&wait_mutex);

	for (ulint i = srv_max_n_threads; i--; ) {
		if (os_event_t& event = waiting_threads[i].event) {
			os_event_destroy(event);
		}
	}

	ut_free(waiting_threads);
	m_initialised= false;
}

/*********************************************************************//**
Gets the size of a lock struct.
@return size in bytes */
ulint
lock_get_size(void)
/*===============*/
{
	return((ulint) sizeof(lock_t));
}

static inline void lock_grant_have_trx_mutex(lock_t* lock)
{
	lock_reset_lock_and_trx_wait(lock);
	lock_grant_after_reset(lock);
}

/*********************************************************************//**
Gets the gap flag of a record lock.
@return LOCK_GAP or 0 */
UNIV_INLINE
ulint
lock_rec_get_gap(
/*=============*/
	const lock_t*	lock)	/*!< in: record lock */
{
	ut_ad(lock);
	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	return(lock->type_mode & LOCK_GAP);
}

/*********************************************************************//**
Gets the LOCK_REC_NOT_GAP flag of a record lock.
@return LOCK_REC_NOT_GAP or 0 */
UNIV_INLINE
ulint
lock_rec_get_rec_not_gap(
/*=====================*/
	const lock_t*	lock)	/*!< in: record lock */
{
	ut_ad(lock);
	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	return(lock->type_mode & LOCK_REC_NOT_GAP);
}

/*********************************************************************//**
Gets the waiting insert flag of a record lock.
@return LOCK_INSERT_INTENTION or 0 */
UNIV_INLINE
ulint
lock_rec_get_insert_intention(
/*==========================*/
	const lock_t*	lock)	/*!< in: record lock */
{
	ut_ad(lock);
	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	return(lock->type_mode & LOCK_INSERT_INTENTION);
}

#ifdef UNIV_DEBUG
#ifdef WITH_WSREP
/** Check if both the conflicting lock's transaction and the other
transaction requesting the record lock are brute force (BF). If they are,
check whether this BF-BF wait is correct, and if not, report the BF wait
and assert.

@param[in]	lock	other waiting record lock
@param[in]	trx	trx requesting the conflicting record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(lock_get_type_low(lock) == LOCK_REC);
	ut_ad(lock_mutex_own());
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys->mutex, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid mutexing
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	trx_mutex_enter(lock_trx);
	const trx_state_t trx2_state= lock_trx->state;
	trx_mutex_exit(lock_trx);

	/* If the transaction is already committed in memory or
	prepared, we should wait. When a transaction is committed in
	memory we hold the trx mutex, but not lock_sys->mutex. Therefore,
	we could end up here before the transaction has had time to do
	the lock_release() that is protected with lock_sys->mutex. */
	switch (trx2_state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
	case TRX_STATE_PREPARED:
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If BF - BF order is honored, i.e. the trx already holding the
	record lock is ordered before this new lock request,
	we can keep trx waiting for the lock. If the conflicting
	transaction is already aborting or rolling back for replaying,
	we can also let the new transaction wait. */
	if (wsrep_thd_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_thd_is_aborting(lock_trx->mysql_thd)) {
		return;
	}

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}
#endif /* WITH_WSREP */
#endif /* UNIV_DEBUG */

/*********************************************************************//**
Checks if a lock request for a new lock has to wait for request lock2.
@return TRUE if new lock has to wait for lock2 to be removed */
UNIV_INLINE
bool
lock_rec_has_to_wait(
/*=================*/
	bool		for_locking,
				/*!< in: whether this is called while
				acquiring a lock (true) or while
				releasing one (false) */
	const trx_t*	trx,	/*!< in: trx of new lock */
	unsigned	type_mode,/*!< in: precise mode of the new lock
				to set: LOCK_S or LOCK_X, possibly
				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
				LOCK_INSERT_INTENTION */
	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
				it is assumed that this has a lock bit
				set on the same record as in the new
				lock we are setting */
	bool		lock_is_on_supremum)
				/*!< in: TRUE if we are setting the
				lock on the 'supremum' record of an
				index page: we know then that the lock
				request is really for a 'gap' type lock */
{
	ut_ad(trx && lock2);
	ut_ad(lock_get_type_low(lock2) == LOCK_REC);
	ut_ad(lock_mutex_own());

	if (trx == lock2->trx
	    || lock_mode_compatible(
		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
		       lock_get_mode(lock2))) {
		return false;
	}

	/* We have somewhat complex rules when gap type record locks
	cause waits */

	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
	    && !(type_mode & LOCK_INSERT_INTENTION)) {

		/* Gap type locks without LOCK_INSERT_INTENTION flag
		do not need to wait for anything. This is because
		different users can have conflicting lock types
		on gaps. */

		return false;
	}

	if (!(type_mode & LOCK_INSERT_INTENTION) && lock_rec_get_gap(lock2)) {

		/* Record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP)
		does not need to wait for a gap type lock */

		return false;
	}

	if ((type_mode & LOCK_GAP) && lock_rec_get_rec_not_gap(lock2)) {

		/* Lock on gap does not need to wait for
		a LOCK_REC_NOT_GAP type lock */

		return false;
	}

	if (lock_rec_get_insert_intention(lock2)) {

		/* No lock request needs to wait for an insert
		intention lock to be removed. This is ok since our
		rules allow conflicting locks on gaps. This eliminates
		a spurious deadlock caused by a next-key lock waiting
		for an insert intention lock; when the insert
		intention lock was granted, the insert deadlocked on
		the waiting next-key lock.

		Also, insert intention locks do not disturb each
		other. */

		return false;
	}

	if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2))
	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
		/* If the upper server layer has already decided on the
		commit order between the transaction requesting the
		lock and the transaction owning the lock, we do not
		need to wait for gap locks. Such ordering by the upper
		server layer happens in parallel replication, where the
		commit order is fixed to match the original order on the
		master.

		Such gap locks are mainly needed to get serialisability
		between transactions so that they will be binlogged in
		the correct order so that statement-based replication
		will give the correct results. Since the right order
		was already determined on the master, we do not need
		to enforce it again here.

		Skipping the locks is not essential for correctness,
		since in case of deadlock we will just kill the later
		transaction and retry it. But it can save some
		unnecessary rollbacks and retries. */

		return false;
	}

#ifdef WITH_WSREP
	/* The new lock request is from a transaction using a unique key
	scan and this transaction is a wsrep high priority transaction
	(brute force). If the conflicting transaction is also a wsrep high
	priority transaction, we should avoid a lock conflict because
	the ordering of these transactions is already decided and the
	conflicting transaction will be replayed later. Note
	that the thread holding the conflicting lock can't be
	committed or rolled back while we hold
	lock_sys->mutex. */
	if (trx->is_wsrep_UK_scan()
	    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
		return false;
	}

	/* We may very well let a BF transaction wait normally, as the
	other BF will be replayed in case of conflict. For debug
	builds we do additional sanity checks to catch
	unsupported BF waits, if any. */
	ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
#endif /* WITH_WSREP */

	return true;
}
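
/* Summary of the rules above, with a small example (illustrative):

   - Two plain gap locks never conflict, whatever their modes: T1
     holding LOCK_X | LOCK_GAP on the gap before record R does not
     block T2 requesting LOCK_S | LOCK_GAP on the same gap.
   - An insert intention lock (LOCK_INSERT_INTENTION, always a gap
     lock) must wait for a conflicting-mode gap or next-key lock:
     T2's INSERT into that gap waits while T1 holds LOCK_X on it.
   - Nothing ever waits for an insert intention lock, and record-only
     locks (LOCK_REC_NOT_GAP) and pure gap locks ignore each other. */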

/*********************************************************************//**
Checks if a lock request lock1 has to wait for request lock2.
@return TRUE if lock1 has to wait for lock2 to be removed */
bool
lock_has_to_wait(
/*=============*/
	const lock_t*	lock1,	/*!< in: waiting lock */
	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
				assumed that this has a lock bit set
				on the same record as in lock1 if the
				locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx == lock2->trx
	    || lock_mode_compatible(lock_get_mode(lock1),
				    lock_get_mode(lock2))) {
		return false;
	}

	if (lock_get_type_low(lock1) != LOCK_REC) {
		return true;
	}

	ut_ad(lock_get_type_low(lock2) == LOCK_REC);

	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
					     lock_get_prdt_from_lock(lock1),
					     lock2);
	}

	return lock_rec_has_to_wait(
		false, lock1->trx, lock1->type_mode, lock2,
		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
}

/*============== RECORD LOCK BASIC FUNCTIONS ============================*/

/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.
@return bit index == heap number of the record, or ULINT_UNDEFINED if
none found */
ulint
lock_rec_find_set_bit(
/*==================*/
	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
{
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (lock_rec_get_nth_bit(lock, i)) {

			return(i);
		}
	}

	return(ULINT_UNDEFINED);
}

/*********************************************************************//**
Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
pointer in the transaction! This function is used in lock object creation
and resetting. */
static
void
lock_rec_bitmap_reset(
/*==================*/
	lock_t*	lock)	/*!< in: record lock */
{
	ulint	n_bytes;

	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

	n_bytes = lock_rec_get_n_bits(lock) / 8;

	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);

	memset(reinterpret_cast<void*>(&lock[1]), 0, n_bytes);
}
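
/* Layout reminder (illustrative): one lock_t covers a whole page, and
the bitmap stored immediately after the struct has one bit per record
heap number. A page whose header says n_heap = 100 gets
n_bits = 100 + LOCK_PAGE_BITMAP_MARGIN, rounded up to whole bytes, and
"trx T holds this lock on heap_no 5" simply means that bit 5 of the
bitmap (byte 0, mask 0x20) is set. */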

/*********************************************************************//**
Copies a record lock to heap.
@return copy of lock */
static
lock_t*
lock_rec_copy(
/*==========*/
	const lock_t*	lock,	/*!< in: record lock */
	mem_heap_t*	heap)	/*!< in: memory heap */
{
	ulint	size;

	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;

	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
}

/*********************************************************************//**
Gets the previous record lock set on a record.
@return previous lock on the same record, NULL if none exists */
const lock_t*
lock_rec_get_prev(
/*==============*/
	const lock_t*	in_lock,/*!< in: record lock */
	ulint		heap_no)/*!< in: heap number of the record */
{
	lock_t*		lock;
	lock_t*		found_lock	= NULL;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);

	for (lock = lock_sys.get_first(*lock_hash_get(in_lock->type_mode),
				       in_lock->un_member.rec_lock.page_id);
	     lock != in_lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		if (lock_rec_get_nth_bit(lock, heap_no)) {
			found_lock = lock;
		}
	}

	return found_lock;
}

/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/

/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());
	ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	      || (precise_mode & LOCK_MODE_MASK) == LOCK_X);
	ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));

	for (lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
	      heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (lock->trx == trx
		    && !lock_rec_get_insert_intention(lock)
		    && lock_mode_stronger_or_eq(
			    lock_get_mode(lock),
			    static_cast<lock_mode>(
				    precise_mode & LOCK_MODE_MASK))
		    && !lock_get_wait(lock)
		    && (!lock_rec_get_rec_not_gap(lock)
			|| (precise_mode & LOCK_REC_NOT_GAP)
			|| heap_no == PAGE_HEAP_NO_SUPREMUM)
		    && (!lock_rec_get_gap(lock)
			|| (precise_mode & LOCK_GAP)
			|| heap_no == PAGE_HEAP_NO_SUPREMUM)) {

			return(lock);
		}
	}

	return(NULL);
}
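
/* Example (illustrative): if trx already holds a granted next-key lock
LOCK_X (neither LOCK_GAP nor LOCK_REC_NOT_GAP set) on heap_no 7, then a
request for precise_mode LOCK_S | LOCK_REC_NOT_GAP on the same record
is satisfied by it: X is stronger than S, and an ordinary next-key lock
covers both the record and the gap. The converse does not hold: a held
LOCK_S | LOCK_GAP cannot satisfy a request for
LOCK_S | LOCK_REC_NOT_GAP. */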

#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	bool			wait,	/*!< in: whether also waiting locks
					are taken into account */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction, or NULL if
					requests by all transactions
					are taken into account */
{

	ut_ad(lock_mutex_own());
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	/* Only a GAP lock can be on the supremum, and we are not looking
	for GAP locks */
	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		return(NULL);
	}

	for (lock_t* lock = lock_rec_get_first(&lock_sys.rec_hash,
					       block->page.id(), heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (lock->trx != trx
		    && !lock_rec_get_gap(lock)
		    && (wait || !lock_get_wait(lock))
		    && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {

			return(lock);
		}
	}

	return(NULL);
}
#endif /* UNIV_DEBUG */

#ifdef WITH_WSREP
static void wsrep_kill_victim(const trx_t * const trx, const lock_t *lock)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx->is_wsrep());
	trx_t* lock_trx = lock->trx;
	ut_ad(trx_mutex_own(lock_trx));
	ut_ad(lock_trx != trx);

	if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
		return;

	if (lock_trx->state == TRX_STATE_COMMITTED_IN_MEMORY
	    || lock_trx->lock.was_chosen_as_deadlock_victim)
		return;

	if (!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)
	    || wsrep_thd_order_before(trx->mysql_thd, lock_trx->mysql_thd)) {
		if (lock_trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
			if (UNIV_UNLIKELY(wsrep_debug))
				WSREP_INFO("BF victim waiting");
			/* cannot release the lock until our lock
			is in the queue */
		} else {
			wsrep_innobase_kill_one_trx(trx->mysql_thd,
						    lock_trx, true);
		}
	}
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@param[in] mode LOCK_S or LOCK_X, possibly ORed to LOCK_GAP or
LOCK_REC_NOT_GAP, LOCK_INSERT_INTENTION
@param[in] block buffer block containing the record
@param[in] heap_no heap number of the record
@param[in] trx our transaction
@param[out] was_ignored true if conflicting locks waiting for the current
transaction were ignored
@return lock or NULL */
static lock_t *lock_rec_other_has_conflicting(unsigned mode,
                                              const buf_block_t *block,
                                              ulint heap_no, const trx_t *trx,
                                              bool *was_ignored= nullptr)
{
	lock_t*		lock;

	ut_ad(lock_mutex_own());

	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
	      heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		/* There can't be lock loops for one record, because
		all waiting locks of the record will always wait for the same
		lock of the record in a cell array, and the check for a
		conflicting lock will always start with the first lock for the
		heap_no, and go ahead in the same order (the order of the
		locks in the cell array) */
		if (lock_get_wait(lock) && lock->trx->lock.wait_trx == trx) {
			if (was_ignored) *was_ignored= true;
			continue;
		}
		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
#ifdef WITH_WSREP
			if (trx->is_wsrep()) {
				trx_mutex_enter(lock->trx);
				/* The function below will roll back either
				trx or lock->trx depending on the priority
				of the transaction. */
				wsrep_kill_victim(const_cast<trx_t*>(trx), lock);
				trx_mutex_exit(lock->trx);
			}
#endif /* WITH_WSREP */
			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return transaction id of the transaction which has the x-lock, or 0;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	ut_ad(!lock_mutex_own());
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}
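
/* Example of the false-positive case mentioned above (illustrative):
suppose trx 90 updated a record on this page and committed long ago,
but trx 85 is still active, so trx_sys.get_min_trx_id() == 85. Then
PAGE_MAX_TRX_ID == 90 >= 85, and the page alone cannot rule out an
implicit x-lock; row_vers_impl_x_locked() must inspect the clustered
index record (and its undo history) to find out whether the modifying
transaction is in fact still active. */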

/*********************************************************************//**
Return approximate number of record locks (bits set in the bitmap) for
this transaction. Since delete-marked records may be removed, the
record count will not be precise.
The caller must be holding lock_sys.mutex. */
ulint
lock_number_of_rows_locked(
/*=======================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	ut_ad(lock_mutex_own());

	return(trx_lock->n_rec_locks);
}

/*********************************************************************//**
Return the number of table locks for a transaction.
The caller must be holding lock_sys.mutex. */
ulint
lock_number_of_tables_locked(
/*=========================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	const lock_t*	lock;
	ulint		n_tables = 0;

	ut_ad(lock_mutex_own());

	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (lock_get_type_low(lock) == LOCK_TABLE) {
			n_tables++;
		}
	}

	return(n_tables);
}

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

#ifdef WITH_WSREP
ATTRIBUTE_COLD
static
void
wsrep_print_wait_locks(
/*===================*/
	lock_t*		c_lock) /* conflicting lock to print */
{
	if (c_lock->trx->lock.wait_lock != c_lock) {
		mtr_t mtr;
		ib::info() << "WSREP: c_lock != wait lock";
		ib::info() << " SQL: "
			   << wsrep_thd_query(c_lock->trx->mysql_thd);

		if (lock_get_type_low(c_lock) & LOCK_TABLE) {
			lock_table_print(stderr, c_lock);
		} else {
			lock_rec_print(stderr, c_lock, mtr);
		}

		if (lock_get_type_low(c_lock->trx->lock.wait_lock) & LOCK_TABLE) {
			lock_table_print(stderr, c_lock->trx->lock.wait_lock);
		} else {
			lock_rec_print(stderr, c_lock->trx->lock.wait_lock,
				       mtr);
		}
	}
}
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
/** Check transaction state */
static void check_trx_state(const trx_t *trx)
{
  ut_ad(!trx->auto_commit || trx->will_lock);
  const auto state= trx->state;
  ut_ad(state == TRX_STATE_ACTIVE ||
        state == TRX_STATE_PREPARED_RECOVERED ||
        state == TRX_STATE_PREPARED ||
        state == TRX_STATE_COMMITTED_IN_MEMORY);
}
#endif

/** Create a new record lock and insert it to the lock queue,
without checking for deadlocks or conflicts.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	lock mode and wait flag; type will be replaced
				with LOCK_REC
@param[in]	page_id		index page number
@param[in]	page		R-tree index page, or NULL
@param[in]	heap_no		record heap number in the index page
@param[in]	index		the index tree
@param[in,out]	trx		transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@param[in]	insert_before_waiting if true, inserts the new B-tree record
lock just after the last non-waiting lock of the current transaction which is
located before the first lock waiting for the current transaction, otherwise
the lock is inserted at the end of the queue
@return created lock */
lock_t*
lock_rec_create_low(
	lock_t*		c_lock,
#ifdef WITH_WSREP
	que_thr_t*	thr,	/*!< thread owning trx */
#endif
	unsigned	type_mode,
	const page_id_t	page_id,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex,
	bool		insert_before_waiting)
{
	lock_t*		lock;
	ulint		n_bits;
	ulint		n_bytes;

	ut_ad(lock_mutex_own());
	ut_ad(holds_trx_mutex == trx_mutex_own(trx));
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

#ifdef UNIV_DEBUG
	/* Non-locking autocommit read-only transactions should not set
	any locks. See comment in trx_set_rw_mode explaining why this
	conditional check is required in debug code. */
	if (holds_trx_mutex) {
		check_trx_state(trx);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */

	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		/* Make lock bitmap bigger by a safety margin */
		n_bits = page_dir_get_n_heap(page) + LOCK_PAGE_BITMAP_MARGIN;
		n_bytes = 1 + n_bits / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which rounds up to 1 byte) for
		lock bit setting */
		n_bytes = 1;

		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;

			/* We will attach the predicate structure after lock.
			Make sure the memory is aligned on 8 bytes,
			the mem_heap_alloc will align it with
			MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}

	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = (type_mode & unsigned(~LOCK_TYPE_MASK)) | LOCK_REC;
	lock->index = index;
	lock->un_member.rec_lock.page_id = page_id;

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() > 0 || !index->table->can_be_evicted);

#ifdef WITH_WSREP
	if (c_lock && trx->is_wsrep()
	    && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
		lock_t *hash	= (lock_t *)c_lock->hash;
		lock_t *prev	= NULL;

		while (hash && wsrep_thd_is_BF(hash->trx->mysql_thd, FALSE)
		       && wsrep_thd_order_before(hash->trx->mysql_thd,
						 trx->mysql_thd)) {
			prev = hash;
			hash = (lock_t *)hash->hash;
		}
		lock->hash = hash;
		if (prev) {
			prev->hash = lock;
		} else {
			c_lock->hash = lock;
		}
		/*
		 * delayed conflict resolution '...kill_one_trx' was not called,
		 * if victim was waiting for some other lock
		 */
		trx_mutex_enter(c_lock->trx);
		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {

			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;

			if (UNIV_UNLIKELY(wsrep_debug)) {
				wsrep_print_wait_locks(c_lock);
			}

			trx->lock.que_state = TRX_QUE_LOCK_WAIT;
			lock_set_lock_and_trx_wait(lock, trx, c_lock);
			UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

			trx->lock.wait_thr = thr;
			thr->state = QUE_THR_LOCK_WAIT;

			/* have to release trx mutex for the duration of
			   victim lock release. This will eventually call
			   lock_grant, which wants to grant trx mutex again
			*/
			if (holds_trx_mutex) {
				trx_mutex_exit(trx);
			}
			lock_cancel_waiting_and_release(
				c_lock->trx->lock.wait_lock);

			if (holds_trx_mutex) {
				trx_mutex_enter(trx);
			}

			trx_mutex_exit(c_lock->trx);

			/* have to bail out here to avoid lock_set_lock... */
			return(lock);
		}
		trx_mutex_exit(c_lock->trx);
	} else
#endif /* WITH_WSREP */
	if (insert_before_waiting
	    && !(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))) {
		/* Try to insert the lock just after the last non-waiting
		lock of the current transaction which immediately
		precedes the first waiting lock request. */
		hash_table_t *lock_hash = lock_hash_get(type_mode);
		hash_cell_t& cell = lock_hash->array[lock_hash->calc_hash(
		    page_id.fold())];

		lock_t* last_non_waiting = nullptr;

		for (lock_t* l = lock_rec_get_first(lock_hash, page_id,
		      heap_no); l; l = lock_rec_get_next(heap_no, l)) {
			if (lock_get_wait(l)
			    && l->trx->lock.wait_trx == trx) {
				break;
			}
			if (l->trx == trx) {
				last_non_waiting = l;
			}
		}

		if (!last_non_waiting) {
			goto append_last;
		}

		cell.insert_after(*last_non_waiting, *lock, &lock_t::hash);
	}
	else {
append_last:
		if (!(type_mode & (LOCK_WAIT | LOCK_PREDICATE | LOCK_PRDT_PAGE))
		    && innodb_lock_schedule_algorithm
		    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
		    && !thd_is_replication_slave_thread(trx->mysql_thd)) {
			HASH_PREPEND(lock_t, hash, &lock_sys.rec_hash,
				     page_id.fold(), lock);
		} else {
			HASH_INSERT(lock_t, hash, lock_hash_get(type_mode),
				    page_id.fold(), lock);
		}
	}

	if (!holds_trx_mutex) {
		trx_mutex_enter(trx);
	}
	ut_ad(trx_mutex_own(trx));
	if (type_mode & LOCK_WAIT) {
		lock_set_lock_and_trx_wait(lock, trx, c_lock);
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx_mutex_exit(trx);
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}
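
/* Queue placement example (illustrative): assume the hash cell for
this page holds, in order, G1 (T1, granted), G2 (T2, granted), W1 (T3,
waiting for T2). If T2 creates another lock with
insert_before_waiting=true, the scan above stops at W1 (a waiter whose
wait_trx is T2) and the new lock is linked right after G2, i.e. before
W1. This keeps the new lock ahead of the very waiters whose conflicts
lock_rec_other_has_conflicting() deliberately ignored. */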

/*********************************************************************//**
Check if lock1 has higher priority than lock2.
NULL has lowest priority.
If neither of them is a wait lock, the first one has higher priority.
If only one of them is a wait lock, it has lower priority.
If either is a high priority transaction, the lock has higher priority.
Otherwise, the one with an older transaction has higher priority.
@returns true if lock1 has higher priority, false otherwise. */
static bool has_higher_priority(lock_t *lock1, lock_t *lock2)
{
	if (lock1 == NULL) {
		return false;
	} else if (lock2 == NULL) {
		return true;
	}
	// Granted locks have higher priority.
	if (!lock_get_wait(lock1)) {
		return true;
	} else if (!lock_get_wait(lock2)) {
		return false;
	}
	return lock1->trx->start_time_micro <= lock2->trx->start_time_micro;
}
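
/* Example (illustrative): under
innodb_lock_schedule_algorithm=VATS, two waiters W_a (trx started at
t=100us) and W_b (trx started at t=250us) queued behind a granted lock
sort as W_a before W_b, because the older transaction (smaller
start_time_micro) is considered more urgent. Granted locks always sort
ahead of any waiter, regardless of transaction age. */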

/*********************************************************************//**
Insert a lock to the hash list according to the mode (whether it is a wait
lock) and the age of the transaction that it is associated with.
If the lock is not a wait lock, insert it to the head of the hash list.
Otherwise, insert it to the middle of the wait locks according to the age of
the transaction. */
static
dberr_t
lock_rec_insert_by_trx_age(
	lock_t	*in_lock) /*!< in: lock to be inserted */
{
	lock_t*				node;
	lock_t*				next;
	hash_table_t*		hash;
	hash_cell_t*		cell;

	ut_ad(!in_lock->trx->is_wsrep());
	const page_id_t page_id(in_lock->un_member.rec_lock.page_id);
	hash = lock_hash_get(in_lock->type_mode);
	cell = &hash->array[hash->calc_hash(page_id.fold())];

	node = (lock_t *) cell->node;
	// If in_lock is not a wait lock, we insert it to the head of the list.
	if (node == NULL || !lock_get_wait(in_lock) || has_higher_priority(in_lock, node)) {
		cell->node = in_lock;
		in_lock->hash = node;
		if (lock_get_wait(in_lock)) {
			lock_grant_have_trx_mutex(in_lock);
			return DB_SUCCESS_LOCKED_REC;
		}
		return DB_SUCCESS;
	}
	while (node != NULL && has_higher_priority((lock_t *) node->hash,
						   in_lock)) {
		node = (lock_t *) node->hash;
	}
	next = (lock_t *) node->hash;
	node->hash = in_lock;
	in_lock->hash = next;

	if (lock_get_wait(in_lock) && !lock_rec_has_to_wait_in_queue(in_lock)) {
		lock_grant_have_trx_mutex(in_lock);
		if (cell->node != in_lock) {
			// Move it to the front of the queue
			node->hash = in_lock->hash;
			next = (lock_t *) cell->node;
			cell->node = in_lock;
			in_lock->hash = next;
		}
		return DB_SUCCESS_LOCKED_REC;
	}

	return DB_SUCCESS;
}
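
/* Resulting order (illustrative): after this insertion the cell list
reads [granted locks][waiters, oldest transaction first]. E.g.
inserting a waiter of a trx started at t=150us into
[G1, W(t=100), W(t=200)] yields [G1, W(100), W(150), W(200)]. If no
lock ahead of the new waiter conflicts (lock_rec_has_to_wait_in_queue()
returns NULL), it is granted at once, moved to the front, and
DB_SUCCESS_LOCKED_REC is returned. */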

#ifdef UNIV_DEBUG
static
bool
lock_queue_validate(
	const lock_t	*in_lock) /*!< in: lock whose hash list is to be validated */
{
	hash_table_t*		hash;
	hash_cell_t*		cell;
	lock_t*				next;
	bool				wait_lock __attribute__((unused))= false;

	if (in_lock == NULL) {
		return true;
	}

	const page_id_t	page_id(in_lock->un_member.rec_lock.page_id);
	hash = lock_hash_get(in_lock->type_mode);
	cell = &hash->array[hash->calc_hash(page_id.fold())];
	next = (lock_t *) cell->node;
	while (next != NULL) {
		// If this is a granted lock, check that there's no wait lock before it.
		if (!lock_get_wait(next)) {
			ut_ad(!wait_lock);
		} else {
			wait_lock = true;
		}
		next = next->hash;
	}
	return true;
}
#endif /* UNIV_DEBUG */

static
void
lock_rec_insert_to_head(
	lock_t *in_lock,   /*!< in: lock to be inserted */
	ulint	rec_fold)  /*!< in: rec_fold of the page */
{
	hash_table_t*		hash;
	hash_cell_t*		cell;
	lock_t*				node;

	if (in_lock == NULL) {
		return;
	}

	hash = lock_hash_get(in_lock->type_mode);
	cell = &hash->array[hash->calc_hash(rec_fold)];
	node = (lock_t *) cell->node;
	if (node != in_lock) {
		cell->node = in_lock;
		in_lock->hash = node;
	}
}

/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	block		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim
@retval	DB_SUCCESS_LOCKED_REC	if the other transaction was chosen as a victim
				(or it happened to commit) */
dberr_t
lock_rec_enqueue_waiting(
	lock_t*			c_lock,
	unsigned		type_mode,
	const buf_block_t*	block,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_ad(lock_mutex_own());
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);

	ut_ad(trx_mutex_own(trx));
	ut_a(!que_thr_stop(thr));

	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		break;
	case TRX_DICT_OP_TABLE:
	case TRX_DICT_OP_INDEX:
		ib::error() << "A record lock wait happens in a dictionary"
			" operation. index "
			<< index->name
			<< " of table "
			<< index->table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create(c_lock,
#ifdef WITH_WSREP
		thr,
#endif
		type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	if (ut_d(const trx_t* victim =)
	    DeadlockChecker::check_and_resolve(lock, trx)) {
		ut_ad(victim == trx);
		lock_reset_lock_and_trx_wait(lock);
		lock_rec_reset_nth_bit(lock, heap_no);
		return DB_DEADLOCK;
	}

	if (!trx->lock.wait_lock) {
		/* If there was a deadlock but we chose another
		transaction as a victim, it is possible that we
		already have the lock now granted! */
#ifdef WITH_WSREP
		if (UNIV_UNLIKELY(wsrep_debug)) {
			ib::info() << "WSREP: BF thread got lock granted early, ID " << ib::hex(trx->id)
				   << " query: " << wsrep_thd_query(trx->mysql_thd);
		}
#endif
		return DB_SUCCESS_LOCKED_REC;
	}

	trx->lock.que_state = TRX_QUE_LOCK_WAIT;

	trx->lock.was_chosen_as_deadlock_victim = false;
	trx->lock.wait_started = time(NULL);

	ut_a(que_thr_stop(thr));

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);

	MONITOR_INC(MONITOR_LOCKREC_WAIT);

	if (innodb_lock_schedule_algorithm
	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
	    && !prdt
	    && !thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
		HASH_DELETE(lock_t, hash, &lock_sys.rec_hash,
			    lock_rec_lock_fold(lock), lock);
		dberr_t res = lock_rec_insert_by_trx_age(lock);
		if (res != DB_SUCCESS) {
			return res;
		}
	}

	return DB_LOCK_WAIT;
}
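
/* Sketch of how a caller may dispatch on the result (illustration only;
see lock_rec_lock() below for a real caller):

	switch (lock_rec_enqueue_waiting(c_lock, mode, block, heap_no,
					 index, thr, NULL)) {
	case DB_LOCK_WAIT:	    // suspend the query thread and wait
		break;
	case DB_DEADLOCK:	    // this trx was chosen as the victim
		break;
	case DB_SUCCESS_LOCKED_REC: // granted after all; proceed
		break;
	default:
		break;
	}
*/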

/*********************************************************************//**
Looks for a suitable type record lock struct by the same trx on the same page.
This can be used to save space when a new record lock should be set on a page:
no new struct is needed if a suitable old one is found.
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
	ulint           type_mode,      /*!< in: lock type_mode field */
	ulint           heap_no,        /*!< in: heap number of the record */
	lock_t*         lock,           /*!< in: lock_sys.get_first() */
	const trx_t*    trx)            /*!< in: transaction */
{
	ut_ad(lock_mutex_own());

	for (/* No op */;
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

			return(lock);
		}
	}

	return(NULL);
}
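
/* Example of the space saving (illustration only): if trx T already owns a
lock struct on page P with exactly the requested type_mode and a bitmap of
n_bits > 5, then granting T a lock on heap_no 5 of P allocates nothing new;
lock_rec_add_to_queue() simply sets bit 5 in the existing bitmap. */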

/*********************************************************************//**
Adds a record lock request in the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility!
@param[in] type_mode lock mode, wait, gap etc. flags; type is ignored and
replaced by LOCK_REC
@param[in] block buffer block containing the record
@param[in] heap_no heap number of the record
@param[in] index index of record
@param[in,out] trx transaction
@param[in] caller_owns_trx_mutex TRUE if caller owns the transaction mutex
@param[in] insert_before_waiting true=insert B-tree record lock right before
a waiting lock request; false=insert the lock at the end of the queue */
static void lock_rec_add_to_queue(unsigned type_mode, const buf_block_t *block,
                                  ulint heap_no, dict_index_t *index,
                                  trx_t *trx, bool caller_owns_trx_mutex,
                                  bool insert_before_waiting= false)
{
#ifdef UNIV_DEBUG
	ut_ad(lock_mutex_own());
	ut_ad(caller_owns_trx_mutex == trx_mutex_own(trx));
	ut_ad(dict_index_is_clust(index)
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, block, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
			/* Only a BF transaction may be granted a lock
			before another conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	type_mode |= LOCK_REC;

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	lock_t*		lock;
	lock_t*		first_lock;

	/* Look for a waiting lock request on the same record or on a gap */

	for (first_lock = lock = lock_sys.get_first(*lock_hash_get(type_mode),
						    block->page.id());
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock_get_wait(lock)
		    && lock_rec_get_nth_bit(lock, heap_no)) {

			break;
		}
	}

	if (lock == NULL && !(type_mode & LOCK_WAIT)) {

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */

		lock = lock_rec_find_similar_on_page(
			type_mode, heap_no, first_lock, trx);

		if (lock != NULL) {

			lock_rec_set_nth_bit(lock, heap_no);

			return;
		}
	}

	/* Note: We will not pass any conflicting lock to lock_rec_create(),
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

	lock_rec_create(NULL,
#ifdef WITH_WSREP
		NULL,
#endif
		type_mode, block, heap_no, index, trx, caller_owns_trx_mutex,
		insert_before_waiting);
}

/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
static
dberr_t
lock_rec_lock(
/*==========*/
	bool			impl,	/*!< in: if true, no lock is set
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
	unsigned		mode,	/*!< in: lock mode: LOCK_X or
					LOCK_S possibly ORed with either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*		index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
  trx_t *trx= thr_get_trx(thr);
  dberr_t err= DB_SUCCESS;

  ut_ad(!srv_read_only_mode);
  ut_ad((LOCK_MODE_MASK & mode) == LOCK_S ||
        (LOCK_MODE_MASK & mode) == LOCK_X);
  ut_ad((mode & LOCK_TYPE_MASK) == LOCK_GAP ||
        (mode & LOCK_TYPE_MASK) == LOCK_REC_NOT_GAP ||
        (mode & LOCK_TYPE_MASK) == 0);
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  lock_mutex_enter();
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
         lock_table_has(trx, index->table, LOCK_IX));

  if (lock_table_has(trx, index->table,
                     static_cast<lock_mode>(LOCK_MODE_MASK & mode)));
  else if (lock_t *lock= lock_sys.get_first(block->page.id()))
  {
    trx_mutex_enter(trx);
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
        lock->type_mode != (ulint(mode) | LOCK_REC) ||
        lock_rec_get_n_bits(lock) <= heap_no)
    {
      /* Do nothing if the trx already has a strong enough lock on rec */
      if (!lock_rec_has_expl(mode, block, heap_no, trx))
      {
        bool was_ignored = false;
        if (lock_t *c_lock= lock_rec_other_has_conflicting(
                mode, block, heap_no, trx, &was_ignored))
        {
          /*
            If another transaction has a non-gap conflicting request in the
            queue, and this transaction does not already have a sufficiently
            strong lock granted on the record, we have to wait.
          */
          err= lock_rec_enqueue_waiting(c_lock, mode, block, heap_no, index,
                                        thr, NULL);
        }
        else if (!impl)
        {
          /* Set the requested lock on the record. */
          lock_rec_add_to_queue(LOCK_REC | mode, block, heap_no, index, trx,
                                true, was_ignored);
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set then we do not set
        a new lock bit, otherwise we set it.
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
    trx_mutex_exit(trx);
  }
  else
  {
    /*
      Simplified and faster path for the most common cases.
      Note that we don't own the trx mutex.
    */
    if (!impl)
      lock_rec_create(NULL,
#ifdef WITH_WSREP
         NULL,
#endif
        mode, block, heap_no, index, trx, false);

    err= DB_SUCCESS_LOCKED_REC;
  }
  lock_mutex_exit();
  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  return err;
}
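
/* Summary of the paths above (illustration only):
 1. trx already holds a table lock of the same mode: nothing to do.
 2. The page carries exactly one lock struct, owned by this trx with the
    same type_mode and a large enough bitmap: just set (or find already
    set) the bit for heap_no.
 3. Otherwise, if no sufficiently strong explicit lock is held and a
    conflicting lock exists, enqueue a waiting request (possibly resolving
    a deadlock); if there is no conflict, add the lock to the queue.
 4. No lock exists on the page at all: create a fresh granted lock struct
    without taking the trx mutex (the fastest path). */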

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(
/*==========================*/
	const lock_t*	wait_lock)	/*!< in: waiting record lock */
{
	const lock_t*	lock;
	ulint		heap_no;
	ulint		bit_mask;
	ulint		bit_offset;

	ut_ad(wait_lock);
	ut_ad(lock_mutex_own());
	ut_ad(lock_get_wait(wait_lock));
	ut_ad(lock_get_type_low(wait_lock) == LOCK_REC);

	heap_no = lock_rec_find_set_bit(wait_lock);

	bit_offset = heap_no / 8;
	bit_mask = static_cast<ulint>(1) << (heap_no % 8);
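
	/* For example, for heap_no == 10 the bit lives in byte
	10 / 8 == 1 of the bitmap that is stored right after the lock
	struct, at bit position 10 % 8 == 2, so bit_offset == 1 and
	bit_mask == 0x04. */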

	for (lock = lock_sys.get_first(*lock_hash_get(wait_lock->type_mode),
				       wait_lock->un_member.rec_lock.page_id);
	     lock != wait_lock;
	     lock = lock_rec_get_next_on_page_const(lock)) {
		const byte*	p = (const byte*) &lock[1];

		if (heap_no < lock_rec_get_n_bits(lock)
		    && (p[bit_offset] & bit_mask)
		    && lock_has_to_wait(wait_lock, lock)) {
			return(lock);
		}
	}

	return(NULL);
}

/** Grant a lock to a waiting lock request and release the waiting transaction
after lock_reset_lock_and_trx_wait() has been called. */
static void lock_grant_after_reset(lock_t* lock)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(lock->trx));

	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
		dict_table_t*	table = lock->un_member.tab_lock.table;

		if (table->autoinc_trx == lock->trx) {
			ib::error() << "Transaction already had an"
				<< " AUTO-INC lock!";
		} else {
			table->autoinc_trx = lock->trx;

			ib_vector_push(lock->trx->autoinc_locks, &lock);
		}
	}

	DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends",
			       trx_get_id_for_print(lock->trx)));

	/* If we are resolving a deadlock by choosing another transaction
	as a victim, then our original transaction may not be in the
	TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait
	for it */

	if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
		que_thr_t*	thr;

		thr = que_thr_end_lock_wait(lock->trx);

		if (thr != NULL) {
			lock_wait_release_thread_if_suspended(thr);
		}
	}
}

/** Grant a lock to a waiting lock request and release the waiting transaction. */
static void lock_grant(lock_t* lock)
{
	lock_reset_lock_and_trx_wait(lock);
	trx_mutex_enter(lock->trx);
	lock_grant_after_reset(lock);
	trx_mutex_exit(lock->trx);
}

/*************************************************************//**
Cancels a waiting record lock request and releases the waiting transaction
that requested it. NOTE: does NOT check if waiting lock requests behind this
one can now be granted! */
static
void
lock_rec_cancel(
/*============*/
	lock_t*	lock)	/*!< in: waiting record lock request */
{
	que_thr_t*	thr;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_type_low(lock) == LOCK_REC);

	/* Reset the bit (there can be only one set bit) in the lock bitmap */
	lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));

	/* Reset the wait flag and the back pointer to lock in trx */

	lock_reset_lock_and_trx_wait(lock);

	/* The following function releases the trx from lock wait */

	trx_mutex_enter(lock->trx);

	thr = que_thr_end_lock_wait(lock->trx);

	if (thr != NULL) {
		lock_wait_release_thread_if_suspended(thr);
	}

	trx_mutex_exit(lock->trx);
}

static void lock_grant_and_move_on_page(ulint rec_fold, const page_id_t id)
{
	lock_t*		lock;
	lock_t*		previous = static_cast<lock_t*>(
		lock_sys.rec_hash.array[lock_sys.rec_hash.calc_hash(rec_fold)].
		node);
	if (previous == NULL) {
		return;
	}
	if (previous->un_member.rec_lock.page_id == id) {
		lock = previous;
	}
	else {
		while (previous->hash &&
		       (previous->hash->un_member.rec_lock.page_id != id)) {
			previous = previous->hash;
		}
		lock = previous->hash;
	}

	ut_ad(previous->hash == lock || previous == lock);
	/* Grant locks if there are no conflicting locks ahead.
	 Move granted locks to the head of the list. */
	while (lock) {
		ut_ad(!lock->trx->is_wsrep());
		/* Grant the lock if it is a waiting lock on this page
		and it no longer has to wait. */
		if (lock_get_wait(lock)
		    && lock->un_member.rec_lock.page_id == id
		    && !lock_rec_has_to_wait_in_queue(lock)) {
			lock_grant(lock);

			if (previous != NULL) {
				/* Move the lock to the head of the list. */
				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
				lock_rec_insert_to_head(lock, rec_fold);
			} else {
				/* Already at the head of the list. */
				previous = lock;
			}
			/* Move on to the next lock. */
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
		} else {
			previous = lock;
			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
		}
	}
}

/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock */
static void lock_rec_dequeue_from_page(lock_t* in_lock)
{
	hash_table_t*	lock_hash;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
	/* We may or may not be holding in_lock->trx->mutex here. */

	const page_id_t page_id(in_lock->un_member.rec_lock.page_id);

	in_lock->index->table->n_rec_locks--;

	lock_hash = lock_hash_get(in_lock->type_mode);

	const ulint rec_fold = page_id.fold();

	HASH_DELETE(lock_t, hash, lock_hash, rec_fold, in_lock);
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	if (innodb_lock_schedule_algorithm
	    == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS
	    || lock_hash != &lock_sys.rec_hash
	    || thd_is_replication_slave_thread(in_lock->trx->mysql_thd)) {
		/* Check if waiting locks in the queue can now be granted:
		grant locks if there are no conflicting locks ahead. Stop at
		the first X lock that is waiting or has been granted. */

		for (lock_t* lock = lock_sys.get_first(*lock_hash, page_id);
		     lock != NULL;
		     lock = lock_rec_get_next_on_page(lock)) {

			if (!lock_get_wait(lock)) {
				continue;
			}

			ut_ad(lock->trx->lock.wait_trx);
			ut_ad(lock->trx->lock.wait_lock);

			if (const lock_t* c = lock_rec_has_to_wait_in_queue(
				    lock)) {
				trx_mutex_enter(lock->trx);
				lock->trx->lock.wait_trx = c->trx;
				trx_mutex_exit(lock->trx);
			}
			else {
				/* Grant the lock */
				ut_ad(lock->trx != in_lock->trx);
				lock_grant(lock);
			}
		}
	} else {
		lock_grant_and_move_on_page(rec_fold, page_id);
	}
}
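
/* Example (illustration only): suppose the queue on a page is
	[granted X by T1] -> [wait S by T2] -> [wait S by T3]
and T1's lock is dequeued. Under FCFS, the loop above finds that neither T2
nor T3 has to wait any longer (S locks are compatible with each other), so
both are granted in queue order. Under VATS, lock_grant_and_move_on_page()
additionally moves the newly granted locks to the head of the hash list. */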

/*************************************************************//**
Removes a record lock request, waiting or granted, from the queue. */
void
lock_rec_discard(
/*=============*/
	lock_t*		in_lock)	/*!< in: record lock object: all
					record locks which are contained
					in this lock object are removed */
{
	trx_lock_t*	trx_lock;

	ut_ad(lock_mutex_own());
	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);

	trx_lock = &in_lock->trx->lock;

	in_lock->index->table->n_rec_locks--;

	HASH_DELETE(lock_t, hash, lock_hash_get(in_lock->type_mode),
		    in_lock->un_member.rec_lock.page_id.fold(), in_lock);

	UT_LIST_REMOVE(trx_lock->trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void lock_rec_free_all_from_discard_page_low(const page_id_t id,
                                                    hash_table_t *lock_hash)
{
  lock_t *lock= lock_sys.get_first(*lock_hash, id);

  while (lock)
  {
    ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    ut_ad(!lock_get_wait(lock));
    lock_t *next_lock= lock_rec_get_next_on_page(lock);
    lock_rec_discard(lock);
    lock= next_lock;
  }
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
void
lock_rec_free_all_from_discard_page(
/*================================*/
	const buf_block_t*	block)	/*!< in: page to be discarded */
{
  const page_id_t page_id(block->page.id());
  lock_rec_free_all_from_discard_page_low(page_id, &lock_sys.rec_hash);
  lock_rec_free_all_from_discard_page_low(page_id, &lock_sys.prdt_hash);
  lock_rec_free_all_from_discard_page_low(page_id, &lock_sys.prdt_page_hash);
}

/*============= RECORD LOCK MOVING AND INHERITING ===================*/

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
static
void
lock_rec_reset_and_release_wait_low(
/*================================*/
	hash_table_t*		hash,	/*!< in: hash table */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no)/*!< in: heap number of record */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	for (lock = lock_rec_get_first(hash, block->page.id(), heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (lock_get_wait(lock)) {
			lock_rec_cancel(lock);
		} else {
			lock_rec_reset_nth_bit(lock, heap_no);
		}
	}
}

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
static
void
lock_rec_reset_and_release_wait(
/*============================*/
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no)/*!< in: heap number of record */
{
	lock_rec_reset_and_release_wait_low(
		&lock_sys.rec_hash, block, heap_no);

	lock_rec_reset_and_release_wait_low(
		&lock_sys.prdt_hash, block, PAGE_HEAP_NO_INFIMUM);
	lock_rec_reset_and_release_wait_low(
		&lock_sys.prdt_page_hash, block, PAGE_HEAP_NO_INFIMUM);
}

/*************************************************************//**
Makes a record inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	const buf_block_t*	heir_block,	/*!< in: block containing the
						record which inherits */
	const buf_block_t*	block,		/*!< in: block containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	/* At READ UNCOMMITTED or READ COMMITTED isolation level,
	we do not want locks set
	by an UPDATE or a DELETE to be inherited as gap type locks. But we
	DO want S-locks/X-locks (taken for REPLACE) set by a consistency
	constraint to be inherited also then. */

	for (lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
	      heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (!lock_rec_get_insert_intention(lock)
		    && (lock->trx->isolation_level > TRX_ISO_READ_COMMITTED
			|| lock_get_mode(lock) !=
			(lock->trx->duplicates ? LOCK_S : LOCK_X))) {
			lock_rec_add_to_queue(
				LOCK_REC | LOCK_GAP | lock_get_mode(lock),
				heir_block, heir_heap_no, lock->index,
				lock->trx, FALSE);
		}
	}
}
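
/* Worked example of the condition above: for a transaction with
trx->duplicates == 0 at READ COMMITTED or below, LOCK_X locks (set by
UPDATE or DELETE) are not inherited, while LOCK_S locks (set for
constraint checking) are. When trx->duplicates is set (e.g. REPLACE),
the roles flip and the transaction's LOCK_S locks are the ones that are
not inherited. At REPEATABLE READ and above, every lock except
insert-intention locks is inherited as a gap lock. */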

/*************************************************************//**
Makes a record inherit the gap locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of the
other record. Also waiting lock requests are inherited as GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap_if_gap_lock(
/*================================*/
	const buf_block_t*	block,		/*!< in: buffer block */
	ulint			heir_heap_no,	/*!< in: heap_no of
						record which inherits */
	ulint			heap_no)	/*!< in: heap_no of record
						from which inherited;
						does NOT reset the locks
						on this record */
{
	lock_t*	lock;

	lock_mutex_enter();

	for (lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
	      heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {

		if (!lock_rec_get_insert_intention(lock)
		    && (heap_no == PAGE_HEAP_NO_SUPREMUM
			|| !lock_rec_get_rec_not_gap(lock))) {

			lock_rec_add_to_queue(
				LOCK_REC | LOCK_GAP | lock_get_mode(lock),
				block, heir_heap_no, lock->index,
				lock->trx, FALSE);
		}
	}

	lock_mutex_exit();
}

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
static
void
lock_rec_move_low(
/*==============*/
	hash_table_t*		lock_hash,	/*!< in: hash table to use */
	const buf_block_t*	receiver,	/*!< in: buffer block containing
						the receiving record */
	const buf_block_t*	donator,	/*!< in: buffer block containing
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	lock_t*	lock;

	ut_ad(lock_mutex_own());

	/* If the lock is predicate lock, it resides on INFIMUM record */
	ut_ad(lock_rec_get_first(
		lock_hash, receiver->page.id(), receiver_heap_no) == NULL
	      || lock_hash == &lock_sys.prdt_hash
	      || lock_hash == &lock_sys.prdt_page_hash);

	for (lock = lock_rec_get_first(lock_hash,
				       donator->page.id(), donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {

		const auto type_mode = lock->type_mode;

		lock_rec_reset_nth_bit(lock, donator_heap_no);

		if (type_mode & LOCK_WAIT) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode &= ~LOCK_WAIT;
		}

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator == receiver */

		lock_rec_add_to_queue(
			type_mode, receiver, receiver_heap_no,
			lock->index, lock->trx, FALSE);
	}

	ut_ad(!lock_rec_get_first(&lock_sys.rec_hash,
				  donator->page.id(), donator_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}
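
/* Example (illustration only): the list
	[G1] -> [W1] -> [G2] -> [W2] -> [G3]
(G = granted, W = waiting) becomes
	[G3] -> [G2] -> [G1] -> [W1] -> [W2]
Every granted lock found after the first waiting lock is moved to the
front, so all waiting locks end up at the tail of the list. */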

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
UNIV_INLINE
void
lock_rec_move(
/*==========*/
	const buf_block_t*	receiver,       /*!< in: buffer block containing
						the receiving record */
	const buf_block_t*	donator,        /*!< in: buffer block containing
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	lock_rec_move_low(&lock_sys.rec_hash, receiver, donator,
			  receiver_heap_no, donator_heap_no);
}

/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we also copy
the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
	lock_t*		lock;
	UT_LIST_BASE_NODE_T(lock_t)	old_locks;
	mem_heap_t*	heap		= NULL;
	ulint		comp;

	lock_mutex_enter();

	/* FIXME: This needs to deal with predicate lock too */
	lock = lock_sys.get_first(block->page.id());

	if (lock == NULL) {
		lock_mutex_exit();

		return;
	}

	heap = mem_heap_create(256);

	/* First copy all the locks on the page to the heap and reset the
	bitmaps in the original locks; chain the copies of the locks
	using the trx_locks field in them. */

	UT_LIST_INIT(old_locks, &lock_t::trx_locks);

	do {
		/* Make a copy of the lock */
		lock_t*	old_lock = lock_rec_copy(lock, heap);

		UT_LIST_ADD_LAST(old_locks, old_lock);

		/* Reset bitmap of lock */
		lock_rec_bitmap_reset(lock);

		if (lock_get_wait(lock)) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode&= ~LOCK_WAIT;
		}

		lock = lock_rec_get_next_on_page(lock);
	} while (lock != NULL);

	comp = page_is_comp(block->frame);
	ut_ad(comp == page_is_comp(oblock->frame));

	lock_move_granted_locks_to_front(old_locks);

	DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
			ut_list_reverse(old_locks););

	for (lock = UT_LIST_GET_FIRST(old_locks); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		/* NOTE: we also copy the locks set on the infimum and
		supremum of the page; the infimum may carry locks if an
		update of a record is occurring on the page, and its locks
		were temporarily stored on the infimum */
		const rec_t*	rec1 = page_get_infimum_rec(
			buf_block_get_frame(block));
		const rec_t*	rec2 = page_get_infimum_rec(
			buf_block_get_frame(oblock));

		/* Set locks according to old locks */
		for (;;) {
			ulint	old_heap_no;
			ulint	new_heap_no;
			ut_d(const rec_t* const orec = rec1);
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));

			if (comp) {
				old_heap_no = rec_get_heap_no_new(rec2);
				new_heap_no = rec_get_heap_no_new(rec1);

				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				old_heap_no = rec_get_heap_no_old(rec2);
				new_heap_no = rec_get_heap_no_old(rec1);
				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			/* Clear the bit in old_lock. */
			if (old_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, old_heap_no)) {
				ut_ad(!page_rec_is_metadata(orec));

				/* NOTE that the old lock bitmap could be too
				small for the new heap number! */

				lock_rec_add_to_queue(
					lock->type_mode, block, new_heap_no,
					lock->index, lock->trx, FALSE);
			}

			if (new_heap_no == PAGE_HEAP_NO_SUPREMUM) {
				ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
				break;
			}
		}

		ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
	}

	lock_mutex_exit();

	mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
	lock_t*		lock;
	const ulint	comp	= page_rec_is_comp(rec);

	ut_ad(buf_block_get_frame(block) == page_align(rec));
	ut_ad(comp == page_is_comp(buf_block_get_frame(new_block)));

	lock_mutex_enter();

	/* Note: when we move locks from record to record, waiting locks
	and possible granted gap type locks behind them are enqueued in
	the original order, because new elements are inserted to a hash
	table to the end of the hash chain, and lock_rec_add_to_queue
	does not reuse locks if there are waiters in the queue. */

	for (lock = lock_sys.get_first(block->page.id());
	     lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		const rec_t*	rec1	= rec;
		const rec_t*	rec2;
		const auto	type_mode = lock->type_mode;

		if (comp) {
			if (page_offset(rec1) == PAGE_NEW_INFIMUM) {
				rec1 = page_rec_get_next_low(rec1, TRUE);
			}

			rec2 = page_rec_get_next_low(
				buf_block_get_frame(new_block)
				+ PAGE_NEW_INFIMUM, TRUE);
		} else {
			if (page_offset(rec1) == PAGE_OLD_INFIMUM) {
				rec1 = page_rec_get_next_low(rec1, FALSE);
			}

			rec2 = page_rec_get_next_low(
				buf_block_get_frame(new_block)
				+ PAGE_OLD_INFIMUM, FALSE);
		}

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		for (;;) {
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));
			ut_d(const rec_t* const orec = rec1);

			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);

				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
					break;
				}

				rec2_heap_no = rec_get_heap_no_new(rec2);
				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);

				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
					break;
				}

				rec2_heap_no = rec_get_heap_no_old(rec2);

				ut_ad(rec_get_data_size_old(rec1)
				      == rec_get_data_size_old(rec2));

				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec1)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				ut_ad(!page_rec_is_metadata(orec));

				if (type_mode & LOCK_WAIT) {
					ut_ad(lock->trx->lock.wait_lock ==
					    lock);
					lock->type_mode&= ~LOCK_WAIT;
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);
			}
		}
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
	ut_ad(lock_rec_validate_page(new_block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
	lock_t*		lock;
	const ulint	comp	= page_rec_is_comp(rec);

	ut_ad(block->frame == page_align(rec));
	ut_ad(new_block->frame == page_align(old_end));
	ut_ad(comp == page_rec_is_comp(old_end));
	ut_ad(!page_rec_is_metadata(rec));

	lock_mutex_enter();

	for (lock = lock_sys.get_first(block->page.id());
	     lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		const rec_t*	rec1;
		const rec_t*	rec2;
		const auto	type_mode = lock->type_mode;

		if (comp) {
			rec1 = page_rec_get_next_low(
				buf_block_get_frame(block)
				+ PAGE_NEW_INFIMUM, TRUE);
			rec2 = page_rec_get_next_low(old_end, TRUE);
		} else {
			rec1 = page_rec_get_next_low(
				buf_block_get_frame(block)
				+ PAGE_OLD_INFIMUM, FALSE);
			rec2 = page_rec_get_next_low(old_end, FALSE);
		}

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		while (rec1 != rec) {
			ut_ad(page_rec_is_metadata(rec1)
			      == page_rec_is_metadata(rec2));
			ut_d(const rec_t* const prev = rec1);

			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);
				rec2_heap_no = rec_get_heap_no_new(rec2);

				rec1 = page_rec_get_next_low(rec1, TRUE);
				rec2 = page_rec_get_next_low(rec2, TRUE);
			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);
				rec2_heap_no = rec_get_heap_no_old(rec2);

				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));

				rec1 = page_rec_get_next_low(rec1, FALSE);
				rec2 = page_rec_get_next_low(rec2, FALSE);
			}

			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				ut_ad(!page_rec_is_metadata(prev));

				if (type_mode & LOCK_WAIT) {
					ut_ad(lock->trx->lock.wait_lock
					    == lock);
					lock->type_mode&= ~LOCK_WAIT;
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);
			}
		}

#ifdef UNIV_DEBUG
		if (page_rec_is_supremum(rec)) {
			ulint	i;

			for (i = PAGE_HEAP_NO_USER_LOW;
			     i < lock_rec_get_n_bits(lock); i++) {
				if (lock_rec_get_nth_bit(lock, i)) {
					ib::fatal()
						<< "lock_move_rec_list_start():"
						<< i << " not moved in "
						<<  (void*) lock;
				}
			}
		}
#endif /* UNIV_DEBUG */
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page when records
are moved in an R-tree index. */
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,       /*!< in: array recording the
						records moved */
	ulint			num_move)       /*!< in: number of records
						to move */
{
	lock_t*		lock;
	ulint		comp;

	if (!num_move) {
		return;
	}

	comp = page_rec_is_comp(rec_move[0].old_rec);

	ut_ad(block->frame == page_align(rec_move[0].old_rec));
	ut_ad(new_block->frame == page_align(rec_move[0].new_rec));
	ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));

	lock_mutex_enter();

	for (lock = lock_sys.get_first(block->page.id());
	     lock;
	     lock = lock_rec_get_next_on_page(lock)) {
		ulint		moved = 0;
		const rec_t*	rec1;
		const rec_t*	rec2;
		const auto	type_mode = lock->type_mode;

		/* Copy lock requests on user records to new page and
		reset the lock bits on the old */

		while (moved < num_move) {
			ulint	rec1_heap_no;
			ulint	rec2_heap_no;

			rec1 = rec_move[moved].old_rec;
			rec2 = rec_move[moved].new_rec;
			ut_ad(!page_rec_is_metadata(rec1));
			ut_ad(!page_rec_is_metadata(rec2));

			if (comp) {
				rec1_heap_no = rec_get_heap_no_new(rec1);
				rec2_heap_no = rec_get_heap_no_new(rec2);

			} else {
				rec1_heap_no = rec_get_heap_no_old(rec1);
				rec2_heap_no = rec_get_heap_no_old(rec2);

				ut_ad(!memcmp(rec1, rec2,
					      rec_get_data_size_old(rec2)));
			}

			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
				if (type_mode & LOCK_WAIT) {
					ut_ad(lock->trx->lock.wait_lock
					    == lock);
					lock->type_mode&= ~LOCK_WAIT;
				}

				lock_rec_add_to_queue(
					type_mode, new_block, rec2_heap_no,
					lock->index, lock->trx, FALSE);

				rec_move[moved].moved = true;
			}

			moved++;
		}
	}

	lock_mutex_exit();

#ifdef UNIV_DEBUG_LOCK_VALIDATE
	ut_ad(lock_rec_validate_page(block));
#endif
}
/*************************************************************//**
Updates the lock table when a page is split to the right. */
void
lock_update_split_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
	ulint	heap_no = lock_get_min_heap_no(right_block);

	lock_mutex_enter();

	/* Move the locks on the supremum of the left page to the supremum
	of the right page */

	lock_rec_move(right_block, left_block,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

	/* Inherit the locks to the supremum of left page from the successor
	of the infimum on right page */

	lock_rec_inherit_to_gap(left_block, right_block,
				PAGE_HEAP_NO_SUPREMUM, heap_no);

	lock_mutex_exit();
}
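
/* Illustration of a right split: a page holding (r1 r2 | r3 r4) is split
at r3, so r3 and r4 move to the new right page. Locks that protected the
gap before the old supremum now belong to the right page's supremum, and
the left page's supremum inherits, as gap locks, the locks attached to r3,
so that the gap between r2 and r3 stays protected across the new page
boundary. */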

/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
	ut_ad(!page_rec_is_metadata(orig_succ));

	lock_mutex_enter();

	/* Inherit the locks from the supremum of the left page to the
	original successor of infimum on the right page, to which the left
	page was merged */

	lock_rec_inherit_to_gap(right_block, left_block,
				page_rec_get_heap_no(orig_succ),
				PAGE_HEAP_NO_SUPREMUM);

	/* Reset the locks on the supremum of the left page, releasing
	waiting transactions */

	lock_rec_reset_and_release_wait_low(
		&lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);

	/* There should exist no page lock on the left page; otherwise,
	it would have been blocked from being merged */
	ut_ad(!lock_sys.get_first_prdt_page(left_block->page.id()));

	lock_rec_free_all_from_discard_page(left_block);

	lock_mutex_exit();
}

/*************************************************************//**
Updates the lock table when the root page is copied to another in
btr_root_raise_and_insert. Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void
lock_update_root_raise(
/*===================*/
	const buf_block_t*	block,	/*!< in: index page to which copied */
	const buf_block_t*	root)	/*!< in: root page */
{
	lock_mutex_enter();

	/* Move the locks on the supremum of the root to the supremum
	of block */

	lock_rec_move(block, root,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
	lock_mutex_exit();
}

/*************************************************************//**
Updates the lock table when a page is copied to another and the original page
is removed from the chain of leaf pages, except if page is the root! */
void
lock_update_copy_and_discard(
/*=========================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						which copied */
	const buf_block_t*	block)		/*!< in: index page;
						NOT the root! */
{
	lock_mutex_enter();

	/* Move the locks on the supremum of the old page to the supremum
	of new_page */

	lock_rec_move(new_block, block,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
	lock_rec_free_all_from_discard_page(block);

	lock_mutex_exit();
}

/*************************************************************//**
Updates the lock table when a page is split to the left. */
void
lock_update_split_left(
/*===================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
	ulint	heap_no = lock_get_min_heap_no(right_block);

	lock_mutex_enter();

	/* Inherit the locks to the supremum of the left page from the
	successor of the infimum on the right page */

	lock_rec_inherit_to_gap(left_block, right_block,
				PAGE_HEAP_NO_SUPREMUM, heap_no);

	lock_mutex_exit();
}

/*************************************************************//**
Updates the lock table when a page is merged to the left. */
void
lock_update_merge_left(
/*===================*/
	const buf_block_t*	left_block,	/*!< in: left page to
						which merged */
	const rec_t*		orig_pred,	/*!< in: original predecessor
						of supremum on the left page
						before merge */
	const buf_block_t*	right_block)	/*!< in: merged index page
						which will be discarded */
{
	const rec_t*	left_next_rec;

	ut_ad(left_block->frame == page_align(orig_pred));

	lock_mutex_enter();

	left_next_rec = page_rec_get_next_const(orig_pred);

	if (!page_rec_is_supremum(left_next_rec)) {

		/* Inherit the locks on the supremum of the left page to the
		first record which was moved from the right page */

		lock_rec_inherit_to_gap(left_block, left_block,
					page_rec_get_heap_no(left_next_rec),
					PAGE_HEAP_NO_SUPREMUM);

		/* Reset the locks on the supremum of the left page,
		releasing waiting transactions */

		lock_rec_reset_and_release_wait_low(
			&lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);
	}

	/* Move the locks from the supremum of right page to the supremum
	of the left page */

	lock_rec_move(left_block, right_block,
		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

	/* There should exist no page lock on the right page; otherwise,
	it would have been blocked from being merged */
	ut_ad(!lock_sys.get_first_prdt_page(right_block->page.id()));

	lock_rec_free_all_from_discard_page(right_block);

	lock_mutex_exit();
}

/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t*	heir_block,	/*!< in: block containing the
						record which inherits */
	const buf_block_t*	block,		/*!< in: block containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	lock_mutex_enter();

	lock_rec_reset_and_release_wait(heir_block, heir_heap_no);

	lock_rec_inherit_to_gap(heir_block, block, heir_heap_no, heap_no);

	lock_mutex_exit();
}
3201 
3202 /*************************************************************//**
3203 Updates the lock table when a page is discarded. */
3204 void
3205 lock_update_discard(
3206 /*================*/
3207 	const buf_block_t*	heir_block,	/*!< in: index page
3208 						which will inherit the locks */
3209 	ulint			heir_heap_no,	/*!< in: heap_no of the record
3210 						which will inherit the locks */
3211 	const buf_block_t*	block)		/*!< in: index page
3212 						which will be discarded */
3213 {
3214 	const page_t*	page = block->frame;
3215 	const rec_t*	rec;
3216 	ulint		heap_no;
3217 	const page_id_t	page_id(block->page.id());
3218 
3219 	lock_mutex_enter();
3220 
3221 	if (lock_sys.get_first(page_id)) {
3222 		ut_ad(!lock_sys.get_first_prdt(page_id));
3223 		ut_ad(!lock_sys.get_first_prdt_page(page_id));
3224 		/* Inherit all the locks on the page to the record and
3225 		reset all the locks on the page */
3226 
3227 		if (page_is_comp(page)) {
3228 			rec = page + PAGE_NEW_INFIMUM;
3229 
3230 			do {
3231 				heap_no = rec_get_heap_no_new(rec);
3232 
3233 				lock_rec_inherit_to_gap(heir_block, block,
3234 							heir_heap_no, heap_no);
3235 
3236 				lock_rec_reset_and_release_wait(
3237 					block, heap_no);
3238 
3239 				rec = page + rec_get_next_offs(rec, TRUE);
3240 			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
3241 		} else {
3242 			rec = page + PAGE_OLD_INFIMUM;
3243 
3244 			do {
3245 				heap_no = rec_get_heap_no_old(rec);
3246 
3247 				lock_rec_inherit_to_gap(heir_block, block,
3248 							heir_heap_no, heap_no);
3249 
3250 				lock_rec_reset_and_release_wait(
3251 					block, heap_no);
3252 
3253 				rec = page + rec_get_next_offs(rec, FALSE);
3254 			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
3255 		}
3256 
3257 		lock_rec_free_all_from_discard_page_low(page_id,
3258 							&lock_sys.rec_hash);
3259 	} else {
3260 		lock_rec_free_all_from_discard_page_low(page_id,
3261 							&lock_sys.prdt_hash);
3262 		lock_rec_free_all_from_discard_page_low(
3263 			page_id, &lock_sys.prdt_page_hash);
3264 	}
3265 
3266 	lock_mutex_exit();
3267 }
3268 
3269 /*************************************************************//**
3270 Updates the lock table when a new user record is inserted. */
3271 void
3272 lock_update_insert(
3273 /*===============*/
3274 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3275 	const rec_t*		rec)	/*!< in: the inserted record */
3276 {
3277 	ulint	receiver_heap_no;
3278 	ulint	donator_heap_no;
3279 
3280 	ut_ad(block->frame == page_align(rec));
3281 	ut_ad(!page_rec_is_metadata(rec));
3282 
3283 	/* Inherit the gap-locking locks for rec, in gap mode, from the next
3284 	record */
3285 
3286 	if (page_rec_is_comp(rec)) {
3287 		receiver_heap_no = rec_get_heap_no_new(rec);
3288 		donator_heap_no = rec_get_heap_no_new(
3289 			page_rec_get_next_low(rec, TRUE));
3290 	} else {
3291 		receiver_heap_no = rec_get_heap_no_old(rec);
3292 		donator_heap_no = rec_get_heap_no_old(
3293 			page_rec_get_next_low(rec, FALSE));
3294 	}
3295 
3296 	lock_rec_inherit_to_gap_if_gap_lock(
3297 		block, receiver_heap_no, donator_heap_no);
3298 }
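
/* Worked example (illustrative only): suppose user records 10 and 30
exist and some transaction holds a gap lock on the gap before 30.
Inserting record 20 splits that gap in two. Here the next record (30)
is the donator and the new record (20) the receiver: 30's gap-type
locks are inherited by 20 in gap mode, so both gaps (10,20) and
(20,30) remain locked. */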
3299 
3300 /*************************************************************//**
3301 Updates the lock table when a record is removed. */
3302 void
3303 lock_update_delete(
3304 /*===============*/
3305 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3306 	const rec_t*		rec)	/*!< in: the record to be removed */
3307 {
3308 	const page_t*	page = block->frame;
3309 	ulint		heap_no;
3310 	ulint		next_heap_no;
3311 
3312 	ut_ad(page == page_align(rec));
3313 	ut_ad(!page_rec_is_metadata(rec));
3314 
3315 	if (page_is_comp(page)) {
3316 		heap_no = rec_get_heap_no_new(rec);
3317 		next_heap_no = rec_get_heap_no_new(page
3318 						   + rec_get_next_offs(rec,
3319 								       TRUE));
3320 	} else {
3321 		heap_no = rec_get_heap_no_old(rec);
3322 		next_heap_no = rec_get_heap_no_old(page
3323 						   + rec_get_next_offs(rec,
3324 								       FALSE));
3325 	}
3326 
3327 	lock_mutex_enter();
3328 
3329 	/* Let the next record inherit the locks from rec, in gap mode */
3330 
3331 	lock_rec_inherit_to_gap(block, block, next_heap_no, heap_no);
3332 
3333 	/* Reset the lock bits on rec and release waiting transactions */
3334 
3335 	lock_rec_reset_and_release_wait(block, heap_no);
3336 
3337 	lock_mutex_exit();
3338 }
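
/* Illustrative counterpart to the insert case: deleting record 20
from the sequence 10 < 20 < 30 merges the gaps (10,20) and (20,30).
The locks of 20 are inherited by its successor 30 in gap mode, so the
combined gap (10,30) stays covered; after that the lock bits of 20
are reset and any transactions waiting on 20 are released. */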
3339 
3340 /*********************************************************************//**
3341 Stores on the page infimum record the explicit locks of another record.
3342 This function is used to store the lock state of a record when it is
3343 updated and the size of the record changes in the update. The record
3344 is moved in such an update, perhaps to another page. The infimum record
3345 acts as a dummy carrier record, taking care of lock releases while the
3346 actual record is being moved. */
3347 void
3348 lock_rec_store_on_page_infimum(
3349 /*===========================*/
3350 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3351 	const rec_t*		rec)	/*!< in: record whose lock state
3352 					is stored on the infimum
3353 					record of the same page; lock
3354 					bits are reset on the
3355 					record */
3356 {
3357 	ulint	heap_no = page_rec_get_heap_no(rec);
3358 
3359 	ut_ad(block->frame == page_align(rec));
3360 
3361 	lock_mutex_enter();
3362 
3363 	lock_rec_move(block, block, PAGE_HEAP_NO_INFIMUM, heap_no);
3364 
3365 	lock_mutex_exit();
3366 }
3367 
3368 /*********************************************************************//**
3369 Restores the state of explicit lock requests on a single record, where the
3370 state was stored on the infimum of the page. */
3371 void
3372 lock_rec_restore_from_page_infimum(
3373 /*===============================*/
3374 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3375 	const rec_t*		rec,	/*!< in: record whose lock state
3376 					is restored */
3377 	const buf_block_t*	donator)/*!< in: page (rec is not
3378 					necessarily on this page)
3379 					whose infimum stored the lock
3380 					state; lock bits are reset on
3381 					the infimum */
3382 {
3383 	ulint	heap_no = page_rec_get_heap_no(rec);
3384 
3385 	lock_mutex_enter();
3386 
3387 	lock_rec_move(block, donator, heap_no, PAGE_HEAP_NO_INFIMUM);
3388 
3389 	lock_mutex_exit();
3390 }
3391 
3392 /*========================= TABLE LOCKS ==============================*/
3393 
3394 /** Functor for accessing the embedded node within a table lock. */
3395 struct TableLockGetNode {
3396 	ut_list_node<lock_t>& operator() (lock_t& elem)
3397 	{
3398 		return(elem.un_member.tab_lock.locks);
3399 	}
3400 };
3401 
3402 /*********************************************************************//**
3403 Creates a table lock object and adds it as the last in the lock queue
3404 of the table. Does NOT check for deadlocks or lock compatibility.
3405 @return own: new lock object */
3406 UNIV_INLINE
3407 lock_t*
3408 lock_table_create(
3409 /*==============*/
3410 	dict_table_t*	table,	/*!< in/out: database table
3411 				in dictionary cache */
3412 	unsigned	type_mode,/*!< in: lock mode possibly ORed with
3413 				LOCK_WAIT */
3414 	trx_t*		trx,	/*!< in: trx */
3415 	lock_t*	c_lock = NULL	/*!< in: conflicting lock */
3416 	)
3417 {
3418 	lock_t*		lock;
3419 
3420 	ut_ad(table && trx);
3421 	ut_ad(lock_mutex_own());
3422 	ut_ad(trx_mutex_own(trx));
3423 	ut_ad(trx->is_recovered || trx->state == TRX_STATE_ACTIVE);
3424 	ut_ad(!trx->auto_commit || trx->will_lock);
3425 
3426 	if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) {
3427 		++table->n_waiting_or_granted_auto_inc_locks;
3428 	}
3429 
3430 	/* For AUTOINC locking we reuse the lock instance only if
3431 	there is no wait involved; otherwise we allocate the waiting
3432 	lock from the transaction lock heap. */
3433 	if (type_mode == LOCK_AUTO_INC) {
3434 
3435 		lock = table->autoinc_lock;
3436 
3437 		table->autoinc_trx = trx;
3438 
3439 		ib_vector_push(trx->autoinc_locks, &lock);
3440 
3441 	} else if (trx->lock.table_cached
3442 		   < UT_ARR_SIZE(trx->lock.table_pool)) {
3443 		lock = &trx->lock.table_pool[trx->lock.table_cached++];
3444 	} else {
3445 
3446 		lock = static_cast<lock_t*>(
3447 			mem_heap_alloc(trx->lock.lock_heap, sizeof(*lock)));
3448 
3449 	}
3450 
3451 	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
3452 	lock->trx = trx;
3453 
3454 	lock->un_member.tab_lock.table = table;
3455 
3456 	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);
3457 
3458 	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
3459 
3460 #ifdef WITH_WSREP
3461 	if (c_lock && trx->is_wsrep()) {
3462 		if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
3463 			ut_list_insert(table->locks, c_lock, lock,
3464 				       TableLockGetNode());
3465 			if (UNIV_UNLIKELY(wsrep_debug)) {
3466 				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
3467 				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
3468 			}
3469 		} else {
3470 			ut_list_append(table->locks, lock, TableLockGetNode());
3471 		}
3472 
3473 		trx_mutex_enter(c_lock->trx);
3474 
3475 		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
3476 			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
3477 
3478 			if (UNIV_UNLIKELY(wsrep_debug)) {
3479 				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
3480 				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
3481 				wsrep_print_wait_locks(c_lock);
3482 			}
3483 
3484 			/* The lock release will call lock_grant(),
3485 			which would acquire trx->mutex again. */
3486 			trx_mutex_exit(trx);
3487 			lock_cancel_waiting_and_release(
3488 				c_lock->trx->lock.wait_lock);
3489 			trx_mutex_enter(trx);
3490 		}
3491 
3492 		trx_mutex_exit(c_lock->trx);
3493 	} else
3494 #endif /* WITH_WSREP */
3495 	ut_list_append(table->locks, lock, TableLockGetNode());
3496 
3497 	if (type_mode & LOCK_WAIT) {
3498 		lock_set_lock_and_trx_wait(lock, trx, c_lock);
3499 	}
3500 
3501 	lock->trx->lock.table_locks.push_back(lock);
3502 
3503 	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
3504 	MONITOR_INC(MONITOR_NUM_TABLELOCK);
3505 
3506 	return(lock);
3507 }
3508 
3509 /*************************************************************//**
3510 Pops autoinc lock requests from the transaction's autoinc_locks. We
3511 handle the case where there are gaps in the vector, which also need
3512 to be popped off the stack. */
3513 UNIV_INLINE
3514 void
3515 lock_table_pop_autoinc_locks(
3516 /*=========================*/
3517 	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
3518 {
3519 	ut_ad(lock_mutex_own());
3520 	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));
3521 
3522 	/* Skip any gaps; gaps are NULL lock entries in the
3523 	trx->autoinc_locks vector. */
3524 
3525 	do {
3526 		ib_vector_pop(trx->autoinc_locks);
3527 
3528 		if (ib_vector_is_empty(trx->autoinc_locks)) {
3529 			return;
3530 		}
3531 
3532 	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
3533 }
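
/* Example (hypothetical vector contents): if trx->autoinc_locks holds
[A, NULL, B] and B is popped, the trailing NULL left behind by an
earlier out-of-order removal is popped as well, leaving [A]. The loop
stops as soon as the last element is a real lock, or the vector
becomes empty. */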
3534 
3535 /*************************************************************//**
3536 Removes an autoinc lock request from the transaction's autoinc_locks. */
3537 UNIV_INLINE
3538 void
3539 lock_table_remove_autoinc_lock(
3540 /*===========================*/
3541 	lock_t*	lock,	/*!< in: table lock */
3542 	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
3543 {
3544 	lock_t*	autoinc_lock;
3545 	lint	i = ib_vector_size(trx->autoinc_locks) - 1;
3546 
3547 	ut_ad(lock_mutex_own());
3548 	ut_ad(lock_get_mode(lock) == LOCK_AUTO_INC);
3549 	ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
3550 	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));
3551 
3552 	/* With stored functions and procedures the user may drop
3553 	a table within the same "statement". This special case has
3554 	to be handled by deleting only those AUTOINC locks that were
3555 	held by the table being dropped. */
3556 
3557 	autoinc_lock = *static_cast<lock_t**>(
3558 		ib_vector_get(trx->autoinc_locks, i));
3559 
3560 	/* This is the default fast case. */
3561 
3562 	if (autoinc_lock == lock) {
3563 		lock_table_pop_autoinc_locks(trx);
3564 	} else {
3565 		/* The last element should never be NULL */
3566 		ut_a(autoinc_lock != NULL);
3567 
3568 		/* Handle freeing the locks from within the stack. */
3569 
3570 		while (--i >= 0) {
3571 			autoinc_lock = *static_cast<lock_t**>(
3572 				ib_vector_get(trx->autoinc_locks, i));
3573 
3574 			if (autoinc_lock == lock) {
3575 				void*	null_var = NULL;
3576 				ib_vector_set(trx->autoinc_locks, i, &null_var);
3577 				return;
3578 			}
3579 		}
3580 
3581 		/* Must find the autoinc lock. */
3582 		ut_error;
3583 	}
3584 }
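
/* Continuing the example above: if the vector holds [A, B] and A must
be removed while B is still on top, A cannot simply be popped, so its
slot is overwritten with NULL, yielding [NULL, B]. The NULL gap is
later skipped by lock_table_pop_autoinc_locks() when B is released. */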
3585 
3586 /*************************************************************//**
3587 Removes a table lock request from the queue and the trx list of locks;
3588 this is a low-level function which does NOT check if waiting requests
3589 can now be granted. */
3590 UNIV_INLINE
3591 void
3592 lock_table_remove_low(
3593 /*==================*/
3594 	lock_t*	lock)	/*!< in/out: table lock */
3595 {
3596 	trx_t*		trx;
3597 	dict_table_t*	table;
3598 
3599 	ut_ad(lock_mutex_own());
3600 
3601 	trx = lock->trx;
3602 	table = lock->un_member.tab_lock.table;
3603 
3604 	/* Remove the table from the transaction's AUTOINC vector, if
3605 	the lock that is being released is an AUTOINC lock. */
3606 	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
3607 
3608 		/* The table's AUTOINC lock can get transferred to
3609 		another transaction before we get here. */
3610 		if (table->autoinc_trx == trx) {
3611 			table->autoinc_trx = NULL;
3612 		}
3613 
3614 		/* The locks must be freed in the reverse order from
3615 		the one in which they were acquired. This is to avoid
3616 		traversing the AUTOINC lock vector unnecessarily.
3617 
3618 		We only store locks that were granted in the
3619 		trx->autoinc_locks vector (see lock_table_create()
3620 		and lock_grant()). Therefore it can be empty and we
3621 		need to check for that. */
3622 
3623 		if (!lock_get_wait(lock)
3624 		    && !ib_vector_is_empty(trx->autoinc_locks)) {
3625 
3626 			lock_table_remove_autoinc_lock(lock, trx);
3627 		}
3628 
3629 		ut_a(table->n_waiting_or_granted_auto_inc_locks > 0);
3630 		table->n_waiting_or_granted_auto_inc_locks--;
3631 	}
3632 
3633 	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
3634 	ut_list_remove(table->locks, lock, TableLockGetNode());
3635 
3636 	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
3637 	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
3638 }
3639 
3640 /*********************************************************************//**
3641 Enqueues a waiting request for a table lock which cannot be granted
3642 immediately. Checks for deadlocks.
3643 @retval	DB_LOCK_WAIT	if the waiting lock was enqueued
3644 @retval	DB_DEADLOCK	if this transaction was chosen as the victim
3645 @retval	DB_SUCCESS	if the other transaction committed or aborted */
3646 static
3647 dberr_t
3648 lock_table_enqueue_waiting(
3649 /*=======================*/
3650 	unsigned	mode,	/*!< in: lock mode this transaction is
3651 				requesting */
3652 	dict_table_t*	table,	/*!< in/out: table */
3653 	que_thr_t*	thr,	/*!< in: query thread */
3654 	lock_t*	c_lock	/*!< in: conflicting lock or NULL */
3655 )
3656 {
3657 	trx_t*		trx;
3658 	lock_t*		lock;
3659 
3660 	ut_ad(lock_mutex_own());
3661 	ut_ad(!srv_read_only_mode);
3662 
3663 	trx = thr_get_trx(thr);
3664 	ut_ad(trx_mutex_own(trx));
3665 	ut_a(!que_thr_stop(thr));
3666 
3667 	switch (trx_get_dict_operation(trx)) {
3668 	case TRX_DICT_OP_NONE:
3669 		break;
3670 	case TRX_DICT_OP_TABLE:
3671 	case TRX_DICT_OP_INDEX:
3672 		ib::error() << "A table lock wait happens in a dictionary"
3673 			" operation. Table " << table->name
3674 			<< ". " << BUG_REPORT_MSG;
3675 		ut_ad(0);
3676 	}
3677 
3678 #ifdef WITH_WSREP
3679 	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
3680 		return(DB_DEADLOCK);
3681 	}
3682 #endif /* WITH_WSREP */
3683 
3684 	/* Enqueue the lock request that will wait to be granted */
3685 	lock = lock_table_create(table, mode | LOCK_WAIT, trx, c_lock);
3686 
3687 	const trx_t*	victim_trx =
3688 		DeadlockChecker::check_and_resolve(lock, trx);
3689 
3690 	if (victim_trx != 0) {
3691 		ut_ad(victim_trx == trx);
3692 
3693 		/* The order here is important: we don't want to
3694 		lose the state of the lock before calling remove. */
3695 		lock_table_remove_low(lock);
3696 		lock_reset_lock_and_trx_wait(lock);
3697 
3698 		return(DB_DEADLOCK);
3699 
3700 	} else if (trx->lock.wait_lock == NULL) {
3701 		/* Deadlock resolution chose another transaction as a victim,
3702 		and we accidentally got our lock granted! */
3703 
3704 		return(DB_SUCCESS);
3705 	}
3706 
3707 	trx->lock.que_state = TRX_QUE_LOCK_WAIT;
3708 
3709 	trx->lock.wait_started = time(NULL);
3710 	trx->lock.was_chosen_as_deadlock_victim = false;
3711 
3712 	ut_a(que_thr_stop(thr));
3713 
3714 	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
3715 
3716 	return(DB_LOCK_WAIT);
3717 }
3718 
3719 /*********************************************************************//**
3720 Checks if other transactions have an incompatible mode lock request in
3721 the lock queue.
3722 @return lock or NULL */
3723 UNIV_INLINE
3724 lock_t*
3725 lock_table_other_has_incompatible(
3726 /*==============================*/
3727 	const trx_t*		trx,	/*!< in: transaction, or NULL if all
3728 					transactions should be included */
3729 	ulint			wait,	/*!< in: LOCK_WAIT if also
3730 					waiting locks are taken into
3731 					account, or 0 if not */
3732 	const dict_table_t*	table,	/*!< in: table */
3733 	lock_mode		mode)	/*!< in: lock mode */
3734 {
3735 	lock_t*	lock;
3736 
3737 	ut_ad(lock_mutex_own());
3738 
3739 	for (lock = UT_LIST_GET_LAST(table->locks);
3740 	     lock != NULL;
3741 	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {
3742 
3743 		if (lock->trx != trx
3744 		    && !lock_mode_compatible(lock_get_mode(lock), mode)
3745 		    && (wait || !lock_get_wait(lock))) {
3746 
3747 #ifdef WITH_WSREP
3748 			if (lock->trx->is_wsrep()) {
3749 				if (UNIV_UNLIKELY(wsrep_debug)) {
3750 					ib::info() << "WSREP: table lock abort for table:"
3751 						   << table->name;
3752 					ib::info() << " SQL: "
3753 					   << wsrep_thd_query(lock->trx->mysql_thd);
3754 				}
3755 				trx_mutex_enter(lock->trx);
3756 				wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
3757 				trx_mutex_exit(lock->trx);
3758 			}
3759 #endif /* WITH_WSREP */
3760 
3761 			return(lock);
3762 		}
3763 	}
3764 
3765 	return(NULL);
3766 }
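
/* Example of a conflict this scan detects (standard InnoDB table lock
semantics): trx 1 holds IX on a table because it is updating rows;
trx 2 then requests S on the same table (e.g. for LOCK TABLES ... READ).
IX and S are incompatible, so the scan returns trx 1's IX lock and
trx 2 has to enqueue a waiting request. */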
3767 
3768 /*********************************************************************//**
3769 Locks the specified database table in the mode given. If the lock cannot
3770 be granted immediately, the query thread is put to wait.
3771 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
3772 dberr_t
3773 lock_table(
3774 /*=======*/
3775 	unsigned	flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is set,
3776 				does nothing */
3777 	dict_table_t*	table,	/*!< in/out: database table
3778 				in dictionary cache */
3779 	lock_mode	mode,	/*!< in: lock mode */
3780 	que_thr_t*	thr)	/*!< in: query thread */
3781 {
3782 	trx_t*		trx;
3783 	dberr_t		err;
3784 	lock_t*		wait_for;
3785 
3786 	ut_ad(table && thr);
3787 
3788 	/* Given the limited visibility of temporary tables we can
3789 	avoid the locking overhead */
3790 	if ((flags & BTR_NO_LOCKING_FLAG)
3791 	    || srv_read_only_mode
3792 	    || table->is_temporary()) {
3793 
3794 		return(DB_SUCCESS);
3795 	}
3796 
3797 	ut_a(flags == 0);
3798 
3799 	trx = thr_get_trx(thr);
3800 
3801 	/* Look for equal or stronger locks the same trx already
3802 	has on the table. No need to acquire the lock mutex here
3803 	because only this transaction can add/access table locks
3804 	to/from trx_t::table_locks. */
3805 
3806 	if (lock_table_has(trx, table, mode)) {
3807 
3808 		return(DB_SUCCESS);
3809 	}
3810 
3811 	/* Read-only transactions can write to temp tables; we don't want
3812 	to promote them to RW transactions. Their updates cannot be visible
3813 	to other transactions. Therefore we can keep them out
3814 	of the read views. */
3815 
3816 	if ((mode == LOCK_IX || mode == LOCK_X)
3817 	    && !trx->read_only
3818 	    && trx->rsegs.m_redo.rseg == 0) {
3819 
3820 		trx_set_rw_mode(trx);
3821 	}
3822 
3823 	lock_mutex_enter();
3824 
3825 	DBUG_EXECUTE_IF("fatal-semaphore-timeout",
3826 		{ os_thread_sleep(3600000000LL); });
3827 
3828 	/* We have to check if the new lock is compatible with any locks
3829 	other transactions have in the table lock queue. */
3830 
3831 	wait_for = lock_table_other_has_incompatible(
3832 		trx, LOCK_WAIT, table, mode);
3833 
3834 	trx_mutex_enter(trx);
3835 
3836 	/* Another trx has a request on the table in an incompatible
3837 	mode: this trx may have to wait */
3838 
3839 	if (wait_for != NULL) {
3840 		err = lock_table_enqueue_waiting(flags | mode, table,
3841 						 thr, wait_for);
3842 	} else {
3843 		lock_table_create(table, flags | mode, trx);
3844 
3845 		ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
3846 
3847 		err = DB_SUCCESS;
3848 	}
3849 
3850 	lock_mutex_exit();
3851 
3852 	trx_mutex_exit(trx);
3853 
3854 	return(err);
3855 }
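
/* Typical usage (sketch): row-level operations first acquire an
intention lock at table level, e.g.

	dberr_t err = lock_table(0, table, LOCK_IX, thr);

before setting record locks for an UPDATE. If another transaction
holds an incompatible table lock (say LOCK_S), DB_LOCK_WAIT is
returned and the query thread is suspended until the conflicting lock
is released or deadlock detection picks a victim. */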
3856 
3857 /*********************************************************************//**
3858 Creates a table IX lock object for a resurrected transaction. */
3859 void
3860 lock_table_ix_resurrect(
3861 /*====================*/
3862 	dict_table_t*	table,	/*!< in/out: table */
3863 	trx_t*		trx)	/*!< in/out: transaction */
3864 {
3865 	ut_ad(trx->is_recovered);
3866 
3867 	if (lock_table_has(trx, table, LOCK_IX)) {
3868 		return;
3869 	}
3870 
3871 	lock_mutex_enter();
3872 
3873 	/* We have to check if the new lock is compatible with any locks
3874 	other transactions have in the table lock queue. */
3875 
3876 	ut_ad(!lock_table_other_has_incompatible(
3877 		      trx, LOCK_WAIT, table, LOCK_IX));
3878 
3879 	trx_mutex_enter(trx);
3880 	lock_table_create(table, LOCK_IX, trx);
3881 	lock_mutex_exit();
3882 	trx_mutex_exit(trx);
3883 }
3884 
3885 /*********************************************************************//**
3886 Checks if a waiting table lock request still has to wait in a queue.
3887 @return TRUE if still has to wait */
3888 static
3889 const lock_t*
3890 lock_table_has_to_wait_in_queue(
3891 /*============================*/
3892 	const lock_t*	wait_lock)	/*!< in: waiting table lock */
3893 {
3894 	const dict_table_t*	table;
3895 	const lock_t*		lock;
3896 
3897 	ut_ad(lock_mutex_own());
3898 	ut_ad(lock_get_wait(wait_lock));
3899 
3900 	table = wait_lock->un_member.tab_lock.table;
3901 
3902 	for (lock = UT_LIST_GET_FIRST(table->locks);
3903 	     lock != wait_lock;
3904 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3905 
3906 		if (lock_has_to_wait(wait_lock, lock)) {
3907 
3908 			return(lock);
3909 		}
3910 	}
3911 
3912 	return(NULL);
3913 }
3914 
3915 /*************************************************************//**
3916 Removes a table lock request, waiting or granted, from the queue and grants
3917 locks to other transactions in the queue, if they now are entitled to a
3918 lock. */
3919 static
3920 void
3921 lock_table_dequeue(
3922 /*===============*/
3923 	lock_t*	in_lock)/*!< in/out: table lock object; transactions waiting
3924 			behind will get their lock requests granted, if
3925 			they are now qualified to it */
3926 {
3927 	ut_ad(lock_mutex_own());
3928 	ut_a(lock_get_type_low(in_lock) == LOCK_TABLE);
3929 
3930 	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);
3931 
3932 	lock_table_remove_low(in_lock);
3933 
3934 	/* Check if waiting locks in the queue can now be granted: grant
3935 	locks if there are no conflicting locks ahead. */
3936 
3937 	for (/* No op */;
3938 	     lock != NULL;
3939 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3940 
3941 		if (!lock_get_wait(lock))
3942 			continue;
3943 
3944 		ut_ad(lock->trx->lock.wait_trx);
3945 		ut_ad(lock->trx->lock.wait_lock);
3946 
3947 		if (const lock_t *c = lock_table_has_to_wait_in_queue(lock)) {
3948 			trx_mutex_enter(lock->trx);
3949 			lock->trx->lock.wait_trx = c->trx;
3950 			trx_mutex_exit(lock->trx);
3951 		} else {
3952 			/* Grant the lock */
3953 			ut_ad(in_lock->trx != lock->trx);
3954 			lock_grant(lock);
3955 		}
3956 	}
3957 }
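
/* Example: consider the queue S(granted, trx1), X(waiting, trx2),
S(waiting, trx3). When trx1's lock is dequeued, the scan first reaches
trx2's X request; nothing incompatible remains ahead of it, so it is
granted. trx3's S request still conflicts with the now-granted X, so
only its wait_trx pointer is updated to point to trx2. */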
3958 
3959 /** Sets a lock on a table based on the given mode.
3960 @param[in]	table	table to lock
3961 @param[in,out]	trx	transaction
3962 @param[in]	mode	LOCK_X or LOCK_S
3963 @return error code or DB_SUCCESS. */
3964 dberr_t
3965 lock_table_for_trx(
3966 	dict_table_t*	table,
3967 	trx_t*		trx,
3968 	enum lock_mode	mode)
3969 {
3970 	mem_heap_t*	heap;
3971 	que_thr_t*	thr;
3972 	dberr_t		err;
3973 	sel_node_t*	node;
3974 	heap = mem_heap_create(512);
3975 
3976 	node = sel_node_create(heap);
3977 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
3978 	thr->graph->state = QUE_FORK_ACTIVE;
3979 
3980 	/* We use the select query graph as the dummy graph needed
3981 	in the lock module call */
3982 
3983 	thr = static_cast<que_thr_t*>(
3984 		que_fork_get_first_thr(
3985 			static_cast<que_fork_t*>(que_node_get_parent(thr))));
3986 
3987 	thr->start_running();
3988 
3989 run_again:
3990 	thr->run_node = thr;
3991 	thr->prev_node = thr->common.parent;
3992 
3993 	err = lock_table(0, table, mode, thr);
3994 
3995 	trx->error_state = err;
3996 
3997 	if (UNIV_LIKELY(err == DB_SUCCESS)) {
3998 		thr->stop_no_error();
3999 	} else {
4000 		que_thr_stop_for_mysql(thr);
4001 
4002 		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
4003 			goto run_again;
4004 		}
4005 	}
4006 
4007 	que_graph_free(thr->graph);
4008 	trx->op_info = "";
4009 
4010 	return(err);
4011 }
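
/* Usage sketch: this wrapper exists for internal callers that have no
real query graph of their own. The select node built above serves
purely as the dummy que_thr_t that lock_table() requires, and the
run_again loop retries the request after a lock wait is resolved. */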
4012 
4013 /*=========================== LOCK RELEASE ==============================*/
4014 static
4015 void
4016 lock_grant_and_move_on_rec(
4017 	lock_t*			first_lock,
4018 	ulint			heap_no)
4019 {
4020 	lock_t*		lock;
4021 	const page_id_t	page_id(first_lock->un_member.rec_lock.page_id);
4022 	const ulint	rec_fold= page_id.fold();
4023 	lock_t*		previous = static_cast<lock_t*>(
4024 		lock_sys.rec_hash.array[lock_sys.hash(page_id)]
4025 		.node);
4026 	if (previous == NULL) {
4027 		return;
4028 	}
4029 	if (previous == first_lock) {
4030 		lock = previous;
4031 	} else {
4032 		while (previous->hash &&
4033 		       previous->hash != first_lock) {
4034 			previous = previous->hash;
4035 		}
4036 		lock = previous->hash;
4037 	}
4038 	/* Grant locks if there are no conflicting locks ahead.
4039 	 Move granted locks to the head of the list. */
4040 	while (lock) {
4041 		ut_ad(!lock->trx->is_wsrep());
4042 		/* Grant the lock if it is a waiting lock on this page that no longer needs to wait. */
4043 		if (lock->un_member.rec_lock.page_id == page_id
4044 			&& lock_rec_get_nth_bit(lock, heap_no)
4045 			&& lock_get_wait(lock)
4046 			&& !lock_rec_has_to_wait_in_queue(lock)) {
4047 
4048 			lock_grant(lock);
4049 
4050 			if (previous != NULL) {
4051 				/* Move the lock to the head of the list. */
4052 				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
4053 				lock_rec_insert_to_head(lock, rec_fold);
4054 			} else {
4055 				/* Already at the head of the list. */
4056 				previous = lock;
4057 			}
4058 			/* Move on to the next lock. */
4059 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
4060 		} else {
4061 			previous = lock;
4062 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
4063 		}
4064 	}
4065 }
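
/* Scheduling note (sketch): this path serves the non-FCFS setting of
innodb_lock_schedule_algorithm. Moving each newly granted lock to the
head of its hash chain keeps granted locks ahead of waiting ones in
the list, so later scans over the queue encounter granted locks
first. */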
4066 
4067 /*************************************************************//**
4068 Removes a granted record lock of a transaction from the queue and grants
4069 locks to other transactions waiting in the queue if they now are entitled
4070 to a lock. */
4071 void
4072 lock_rec_unlock(
4073 /*============*/
4074 	trx_t*			trx,	/*!< in/out: transaction that has
4075 					set a record lock */
4076 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
4077 	const rec_t*		rec,	/*!< in: record */
4078 	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
4079 {
4080 	lock_t*		first_lock;
4081 	lock_t*		lock;
4082 	ulint		heap_no;
4083 
4084 	ut_ad(trx);
4085 	ut_ad(rec);
4086 	ut_ad(block->frame == page_align(rec));
4087 	ut_ad(!trx->lock.wait_lock);
4088 	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
4089 	ut_ad(!page_rec_is_metadata(rec));
4090 
4091 	heap_no = page_rec_get_heap_no(rec);
4092 
4093 	lock_mutex_enter();
4094 	trx_mutex_enter(trx);
4095 
4096 	first_lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
4097 	    heap_no);
4098 
4099 	/* Find the last lock with the same lock_mode and transaction
4100 	on the record. */
4101 
4102 	for (lock = first_lock; lock != NULL;
4103 	     lock = lock_rec_get_next(heap_no, lock)) {
4104 		if (lock->trx == trx && lock_get_mode(lock) == lock_mode) {
4105 			goto released;
4106 		}
4107 	}
4108 
4109 	lock_mutex_exit();
4110 	trx_mutex_exit(trx);
4111 
4112 	{
4113 		ib::error	err;
4114 		err << "Unlock row could not find a " << lock_mode
4115 			<< " mode lock on the record. Current statement: ";
4116 		size_t		stmt_len;
4117 		if (const char* stmt = innobase_get_stmt_unsafe(
4118 			    trx->mysql_thd, &stmt_len)) {
4119 			err.write(stmt, stmt_len);
4120 		}
4121 	}
4122 
4123 	return;
4124 
4125 released:
4126 	ut_a(!lock_get_wait(lock));
4127 	lock_rec_reset_nth_bit(lock, heap_no);
4128 
4129 	if (innodb_lock_schedule_algorithm
4130 		== INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4131 		thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
4132 
4133 		/* Check if we can now grant waiting lock requests */
4134 
4135 		for (lock = first_lock; lock != NULL;
4136 			 lock = lock_rec_get_next(heap_no, lock)) {
4137 			if (!lock_get_wait(lock)) {
4138 				continue;
4139 			}
4140 			ut_ad(lock->trx->lock.wait_trx);
4141 			ut_ad(lock->trx->lock.wait_lock);
4142 			if (const lock_t* c = lock_rec_has_to_wait_in_queue(
4143 			      lock)) {
4144 				if (lock->trx != trx)
4145 					trx_mutex_enter(lock->trx);
4146 				lock->trx->lock.wait_trx = c->trx;
4147 				if (lock->trx != trx)
4148 					trx_mutex_exit(lock->trx);
4149 			} else {
4150 				/* Grant the lock */
4151 				ut_ad(trx != lock->trx);
4152 				lock_grant(lock);
4153 			}
4154 		}
4155 	} else {
4156 		lock_grant_and_move_on_rec(first_lock, heap_no);
4157 	}
4158 
4159 	lock_mutex_exit();
4160 	trx_mutex_exit(trx);
4161 }
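
/* Usage sketch: this is reached via row_unlock_for_mysql(), e.g. when
a READ COMMITTED transaction releases a record lock on a row that did
not match the WHERE condition. Only a granted lock may be released
here; a waiting request never reaches this function, as asserted by
ut_a(!lock_get_wait(lock)) at the released: label above. */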
4162 
4163 #ifdef UNIV_DEBUG
4164 /*********************************************************************//**
4165 Check if a transaction that has X or IX locks has set the dict_op
4166 code correctly. */
4167 static
4168 void
4169 lock_check_dict_lock(
4170 /*==================*/
4171 	const lock_t*	lock)	/*!< in: lock to check */
4172 {
4173 	if (lock_get_type_low(lock) == LOCK_REC) {
4174 		ut_ad(!lock->index->table->is_temporary());
4175 
4176 		/* Check if the transaction locked a record
4177 		in a system table in X mode. It should have set
4178 		the dict_op code correctly if it did. */
4179 		if (lock->index->table->id < DICT_HDR_FIRST_ID
4180 		    && lock_get_mode(lock) == LOCK_X) {
4181 
4182 			ut_ad(lock_get_mode(lock) != LOCK_IX);
4183 			ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4184 		}
4185 	} else {
4186 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4187 
4188 		const dict_table_t* table = lock->un_member.tab_lock.table;
4189 		ut_ad(!table->is_temporary());
4190 
4191 		/* Check if the transaction locked a system table
4192 		in IX mode. It should have set the dict_op code
4193 		correctly if it did. */
4194 		if (table->id < DICT_HDR_FIRST_ID
4195 		    && (lock_get_mode(lock) == LOCK_X
4196 			|| lock_get_mode(lock) == LOCK_IX)) {
4197 
4198 			ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4199 		}
4200 	}
4201 }
4202 #endif /* UNIV_DEBUG */
4203 
4204 /** Release the explicit locks of a committing transaction,
4205 and release possible other transactions waiting because of these locks. */
4206 void lock_release(trx_t* trx)
4207 {
4208 #ifdef UNIV_DEBUG
4209 	std::set<table_id_t> to_evict;
4210 	if (innodb_evict_tables_on_commit_debug && !trx->is_recovered)
4211 # if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
4212 	if (!mutex_own(&dict_sys.mutex))
4213 # else /* this would be more proper way to do it */
4214 	if (!trx->dict_operation_lock_mode && !trx->dict_operation)
4215 # endif
4216 	for (const auto& p : trx->mod_tables)
4217 		if (!p.first->is_temporary())
4218 			to_evict.emplace(p.first->id);
4219 #endif
4220 	ulint		count = 0;
4221 	trx_id_t	max_trx_id = trx_sys.get_max_trx_id();
4222 
4223 	lock_mutex_enter();
4224 	ut_ad(!trx_mutex_own(trx));
4225 
4226 	for (lock_t* lock = UT_LIST_GET_LAST(trx->lock.trx_locks);
4227 	     lock != NULL;
4228 	     lock = UT_LIST_GET_LAST(trx->lock.trx_locks)) {
4229 
4230 		ut_d(lock_check_dict_lock(lock));
4231 
4232 		if (lock_get_type_low(lock) == LOCK_REC) {
4233 
4234 			lock_rec_dequeue_from_page(lock);
4235 		} else {
4236 			dict_table_t*	table;
4237 
4238 			table = lock->un_member.tab_lock.table;
4239 
4240 			if (lock_get_mode(lock) != LOCK_IS
4241 			    && trx->undo_no != 0) {
4242 
4243 				/* The trx may have modified the table. We
4244 				block the use of the MySQL query cache for
4245 				all currently active transactions. */
4246 
4247 				table->query_cache_inv_trx_id = max_trx_id;
4248 			}
4249 
4250 			lock_table_dequeue(lock);
4251 		}
4252 
4253 		if (count == LOCK_RELEASE_INTERVAL) {
4254 			/* Release the mutex for a while, so that we
4255 			do not monopolize it */
4256 
4257 			lock_mutex_exit();
4258 
4259 			lock_mutex_enter();
4260 
4261 			count = 0;
4262 		}
4263 
4264 		++count;
4265 	}
4266 
4267 	lock_mutex_exit();
4268 
4269 #ifdef UNIV_DEBUG
4270 	if (to_evict.empty()) {
4271 		return;
4272 	}
4273 	mutex_enter(&dict_sys.mutex);
4274 	lock_mutex_enter();
4275 	for (table_id_t id : to_evict) {
4276 		if (dict_table_t *table = dict_table_open_on_id(
4277 			    id, TRUE, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)) {
4278 			if (!table->get_ref_count()
4279 			    && !UT_LIST_GET_LEN(table->locks)) {
4280 				dict_sys.remove(table, true);
4281 			}
4282 		}
4283 	}
4284 	lock_mutex_exit();
4285 	mutex_exit(&dict_sys.mutex);
4286 #endif
4287 }
4288 
4289 /** Release non-exclusive locks on XA PREPARE,
4290 and release possible other transactions waiting because of these locks. */
4291 void lock_release_on_prepare(trx_t *trx)
4292 {
4293   ulint count= 0;
4294   lock_mutex_enter();
4295   ut_ad(!trx_mutex_own(trx));
4296 
4297   for (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; )
4298   {
4299     ut_ad(lock->trx == trx);
4300 
4301     if (lock_get_type_low(lock) == LOCK_REC)
4302     {
4303       ut_ad(!lock->index->table->is_temporary());
4304       if (lock_rec_get_gap(lock) || lock_get_mode(lock) != LOCK_X)
4305         lock_rec_dequeue_from_page(lock);
4306       else
4307       {
4308         ut_ad(trx->dict_operation ||
4309               lock->index->table->id >= DICT_HDR_FIRST_ID);
4310 retain_lock:
4311         lock= UT_LIST_GET_PREV(trx_locks, lock);
4312         continue;
4313       }
4314     }
4315     else
4316     {
4317       ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4318       ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
4319       ut_ad(!table->is_temporary());
4320 
4321       switch (lock_get_mode(lock)) {
4322       case LOCK_IS:
4323       case LOCK_S:
4324         lock_table_dequeue(lock);
4325         break;
4326       case LOCK_IX:
4327       case LOCK_X:
4328         ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
4329         /* fall through */
4330       default:
4331         goto retain_lock;
4332       }
4333     }
4334 
4335     if (++count == LOCK_RELEASE_INTERVAL)
4336     {
4337       lock_mutex_exit();
4338       count= 0;
4339       lock_mutex_enter();
4340     }
4341 
4342     lock= UT_LIST_GET_LAST(trx->lock.trx_locks);
4343   }
4344 
4345   lock_mutex_exit();
4346 }
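
/* Example of the retention rule implemented above: after XA PREPARE a
transaction keeps its LOCK_X record locks (unless they are gap-only)
and its LOCK_IX/LOCK_X table locks, because the prepared transaction
may still commit and its modifications must remain protected. Shared
and gap locks only serve statements that are still executing, so they
can be released at this point. */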
4347 
4348 /* True if a lock mode is S or X */
4349 #define IS_LOCK_S_OR_X(lock) \
4350 	(lock_get_mode(lock) == LOCK_S \
4351 	 || lock_get_mode(lock) == LOCK_X)
4352 
4353 /*********************************************************************//**
4354 Removes table locks of the transaction on a table to be dropped. */
4355 static
4356 void
4357 lock_trx_table_locks_remove(
4358 /*========================*/
4359 	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
4360 {
4361 	trx_t*		trx = lock_to_remove->trx;
4362 
4363 	ut_ad(lock_mutex_own());
4364 
4365 	/* It is safe to read this because we are holding the lock mutex */
4366 	if (!trx->lock.cancel) {
4367 		trx_mutex_enter(trx);
4368 	} else {
4369 		ut_ad(trx_mutex_own(trx));
4370 	}
4371 
4372 	for (lock_list::iterator it = trx->lock.table_locks.begin(),
4373              end = trx->lock.table_locks.end(); it != end; ++it) {
4374 		const lock_t*	lock = *it;
4375 
4376 		ut_ad(!lock || trx == lock->trx);
4377 		ut_ad(!lock || lock_get_type_low(lock) & LOCK_TABLE);
4378 		ut_ad(!lock || lock->un_member.tab_lock.table);
4379 
4380 		if (lock == lock_to_remove) {
4381 			*it = NULL;
4382 
4383 			if (!trx->lock.cancel) {
4384 				trx_mutex_exit(trx);
4385 			}
4386 
4387 			return;
4388 		}
4389 	}
4390 
4391 	if (!trx->lock.cancel) {
4392 		trx_mutex_exit(trx);
4393 	}
4394 
4395 	/* Lock must exist in the vector. */
4396 	ut_error;
4397 }
4398 
4399 /*===================== VALIDATION AND DEBUGGING ====================*/
4400 
4401 /** Print info of a table lock.
4402 @param[in,out]	file	output stream
4403 @param[in]	lock	table lock */
4404 static
4405 void
4406 lock_table_print(FILE* file, const lock_t* lock)
4407 {
4408 	ut_ad(lock_mutex_own());
4409 	ut_a(lock_get_type_low(lock) == LOCK_TABLE);
4410 
4411 	fputs("TABLE LOCK table ", file);
4412 	ut_print_name(file, lock->trx,
4413 		      lock->un_member.tab_lock.table->name.m_name);
4414 	fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));
4415 
4416 	if (lock_get_mode(lock) == LOCK_S) {
4417 		fputs(" lock mode S", file);
4418 	} else if (lock_get_mode(lock) == LOCK_X) {
4419 		ut_ad(lock->trx->id != 0);
4420 		fputs(" lock mode X", file);
4421 	} else if (lock_get_mode(lock) == LOCK_IS) {
4422 		fputs(" lock mode IS", file);
4423 	} else if (lock_get_mode(lock) == LOCK_IX) {
4424 		ut_ad(lock->trx->id != 0);
4425 		fputs(" lock mode IX", file);
4426 	} else if (lock_get_mode(lock) == LOCK_AUTO_INC) {
4427 		fputs(" lock mode AUTO-INC", file);
4428 	} else {
4429 		fprintf(file, " unknown lock mode %lu",
4430 			(ulong) lock_get_mode(lock));
4431 	}
4432 
4433 	if (lock_get_wait(lock)) {
4434 		fputs(" waiting", file);
4435 	}
4436 
4437 	putc('\n', file);
4438 }
4439 
4440 /** Pretty-print a record lock.
4441 @param[in,out]	file	output stream
4442 @param[in]	lock	record lock
4443 @param[in,out]	mtr	mini-transaction for accessing the record */
4444 static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
4445 {
4446 	ut_ad(lock_mutex_own());
4447 	ut_a(lock_get_type_low(lock) == LOCK_REC);
4448 
4449 	const page_id_t page_id(lock->un_member.rec_lock.page_id);
4450 
4451 	fprintf(file, "RECORD LOCKS space id %u page no %u n bits " ULINTPF
4452 		" index %s of table ",
4453 		page_id.space(), page_id.page_no(),
4454 		lock_rec_get_n_bits(lock),
4455 		lock->index->name());
4456 	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
4457 	fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));
4458 
4459 	if (lock_get_mode(lock) == LOCK_S) {
4460 		fputs(" lock mode S", file);
4461 	} else if (lock_get_mode(lock) == LOCK_X) {
4462 		fputs(" lock_mode X", file);
4463 	} else {
4464 		ut_error;
4465 	}
4466 
4467 	if (lock_rec_get_gap(lock)) {
4468 		fputs(" locks gap before rec", file);
4469 	}
4470 
4471 	if (lock_rec_get_rec_not_gap(lock)) {
4472 		fputs(" locks rec but not gap", file);
4473 	}
4474 
4475 	if (lock_rec_get_insert_intention(lock)) {
4476 		fputs(" insert intention", file);
4477 	}
4478 
4479 	if (lock_get_wait(lock)) {
4480 		fputs(" waiting", file);
4481 	}
4482 
4483 	putc('\n', file);
4484 
4485 	mem_heap_t*		heap		= NULL;
4486 	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
4487 	rec_offs*		offsets		= offsets_;
4488 	rec_offs_init(offsets_);
4489 
4490 	mtr.start();
4491 	const buf_block_t* block = buf_page_try_get(page_id, &mtr);
4492 
4493 	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {
4494 
4495 		if (!lock_rec_get_nth_bit(lock, i)) {
4496 			continue;
4497 		}
4498 
4499 		fprintf(file, "Record lock, heap no %lu", (ulong) i);
4500 
4501 		if (block) {
4502 			ut_ad(page_is_leaf(block->frame));
4503 			const rec_t*	rec;
4504 
4505 			rec = page_find_rec_with_heap_no(
4506 				buf_block_get_frame(block), i);
4507 			ut_ad(!page_rec_is_metadata(rec));
4508 
4509 			offsets = rec_get_offsets(
4510 				rec, lock->index, offsets,
4511 				lock->index->n_core_fields,
4512 				ULINT_UNDEFINED, &heap);
4513 
4514 			putc(' ', file);
4515 			rec_print_new(file, rec, offsets);
4516 		}
4517 
4518 		putc('\n', file);
4519 	}
4520 
4521 	mtr.commit();
4522 
4523 	if (UNIV_LIKELY_NULL(heap)) {
4524 		mem_heap_free(heap);
4525 	}
4526 }
4527 
4528 #ifdef UNIV_DEBUG
4529 /* Print the number of lock structs from lock_print_info_summary() only
4530 in non-production builds for performance reasons, see
4531 http://bugs.mysql.com/36942 */
4532 #define PRINT_NUM_OF_LOCK_STRUCTS
4533 #endif /* UNIV_DEBUG */
4534 
4535 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4536 /*********************************************************************//**
4537 Calculates the number of record lock structs in the record lock hash table.
4538 @return number of record locks */
4539 static ulint lock_get_n_rec_locks()
4540 {
4541 	ulint	n_locks	= 0;
4542 	ulint	i;
4543 
4544 	ut_ad(lock_mutex_own());
4545 
4546 	for (i = 0; i < lock_sys.rec_hash.n_cells; i++) {
4547 		const lock_t*	lock;
4548 
4549 		for (lock = static_cast<const lock_t*>(
4550 			     HASH_GET_FIRST(&lock_sys.rec_hash, i));
4551 		     lock != 0;
4552 		     lock = static_cast<const lock_t*>(
4553 				HASH_GET_NEXT(hash, lock))) {
4554 
4555 			n_locks++;
4556 		}
4557 	}
4558 
4559 	return(n_locks);
4560 }
4561 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4562 
4563 /*********************************************************************//**
4564 Prints info of locks for all transactions.
4565 @return FALSE if unable to obtain the lock mutex; in that case the
4566 function exits without printing lock info */
4567 ibool
4568 lock_print_info_summary(
4569 /*====================*/
4570 	FILE*	file,	/*!< in: file where to print */
4571 	ibool	nowait)	/*!< in: TRUE=do not wait for the lock mutex */
4572 {
4573 	/* if nowait is FALSE, wait on the lock mutex;
4574 	otherwise return immediately if we fail to obtain
4575 	the mutex. */
4576 	if (!nowait) {
4577 		lock_mutex_enter();
4578 	} else if (lock_mutex_enter_nowait()) {
4579 		fputs("FAIL TO OBTAIN LOCK MUTEX,"
4580 		      " SKIP LOCK INFO PRINTING\n", file);
4581 		return(FALSE);
4582 	}
4583 
4584 	if (lock_deadlock_found) {
4585 		fputs("------------------------\n"
4586 		      "LATEST DETECTED DEADLOCK\n"
4587 		      "------------------------\n", file);
4588 
4589 		if (!srv_read_only_mode) {
4590 			ut_copy_file(file, lock_latest_err_file);
4591 		}
4592 	}
4593 
4594 	fputs("------------\n"
4595 	      "TRANSACTIONS\n"
4596 	      "------------\n", file);
4597 
4598 	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
4599 		trx_sys.get_max_trx_id());
4600 
4601 	fprintf(file,
4602 		"Purge done for trx's n:o < " TRX_ID_FMT
4603 		" undo n:o < " TRX_ID_FMT " state: %s\n"
4604 		"History list length %u\n",
4605 		purge_sys.tail.trx_no,
4606 		purge_sys.tail.undo_no,
4607 		purge_sys.enabled()
4608 		? (purge_sys.running() ? "running"
4609 		   : purge_sys.paused() ? "stopped" : "running but idle")
4610 		: "disabled",
4611 		uint32_t{trx_sys.rseg_history_len});
4612 
4613 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4614 	fprintf(file,
4615 		"Total number of lock structs in row lock hash table %lu\n",
4616 		(ulong) lock_get_n_rec_locks());
4617 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4618 	return(TRUE);
4619 }
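
/* Sample of the output produced above (illustrative values only):

	------------
	TRANSACTIONS
	------------
	Trx id counter 1234
	Purge done for trx's n:o < 1230 undo n:o < 0 state: running but idle
	History list length 3
*/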
4620 
4621 /** Prints transaction lock wait and MVCC state.
4622 @param[in,out]	file	file where to print
4623 @param[in]	trx	transaction
4624 @param[in]	now	current time */
4625 void
4626 lock_trx_print_wait_and_mvcc_state(FILE* file, const trx_t* trx, time_t now)
4627 {
4628 	fprintf(file, "---");
4629 
4630 	trx_print_latched(file, trx, 600);
4631 	trx->read_view.print_limits(file);
4632 
4633 	if (trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
4634 
4635 		fprintf(file,
4636 			"------- TRX HAS BEEN WAITING %lu SEC"
4637 			" FOR THIS LOCK TO BE GRANTED:\n",
4638 			(ulong) difftime(now, trx->lock.wait_started));
4639 
4640 		if (lock_get_type_low(trx->lock.wait_lock) == LOCK_REC) {
4641 			mtr_t mtr;
4642 			lock_rec_print(file, trx->lock.wait_lock, mtr);
4643 		} else {
4644 			lock_table_print(file, trx->lock.wait_lock);
4645 		}
4646 
4647 		fprintf(file, "------------------\n");
4648 	}
4649 }
4650 
4651 /*********************************************************************//**
4652 Prints info of locks for a transaction. */
4653 static
4654 void
4655 lock_trx_print_locks(
4656 /*=================*/
4657 	FILE*		file,		/*!< in/out: File to write */
4658 	const trx_t*	trx)		/*!< in: current transaction */
4659 {
4660 	mtr_t mtr;
4661 	uint32_t i= 0;
4662 	/* Iterate over the transaction's locks. */
4663 	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
4664 	     lock != NULL;
4665 	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
4666 		if (lock_get_type_low(lock) == LOCK_REC) {
4667 
4668 			lock_rec_print(file, lock, mtr);
4669 		} else {
4670 			ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4671 
4672 			lock_table_print(file, lock);
4673 		}
4674 
4675 		if (++i == 10) {
4676 
4677 			fprintf(file,
4678 				"10 LOCKS PRINTED FOR THIS TRX:"
4679 				" SUPPRESSING FURTHER PRINTS\n");
4680 
4681 			break;
4682 		}
4683 	}
4684 }
4685 
4686 /** Functor to display all transactions */
4687 struct lock_print_info
4688 {
4689   lock_print_info(FILE* file, time_t now) :
4690     file(file), now(now),
4691     purge_trx(purge_sys.query ? purge_sys.query->trx : NULL)
4692   {}
4693 
4694   void operator()(const trx_t &trx) const
4695   {
4696     if (UNIV_UNLIKELY(&trx == purge_trx))
4697       return;
4698     lock_trx_print_wait_and_mvcc_state(file, &trx, now);
4699 
4700     if (trx.will_lock && srv_print_innodb_lock_monitor)
4701       lock_trx_print_locks(file, &trx);
4702   }
4703 
4704   FILE* const file;
4705   const time_t now;
4706   const trx_t* const purge_trx;
4707 };
4708 
4709 /*********************************************************************//**
4710 Prints info of locks for each transaction. This function assumes that the
4711 caller holds the lock mutex and, more importantly, that it will release the
4712 lock mutex on behalf of the caller. (This should be fixed in the future.) */
4713 void
4714 lock_print_info_all_transactions(
4715 /*=============================*/
4716 	FILE*		file)	/*!< in/out: file where to print */
4717 {
4718 	ut_ad(lock_mutex_own());
4719 
4720 	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");
4721 
4722 	trx_sys.trx_list.for_each(lock_print_info(file, time(nullptr)));
4723 	lock_mutex_exit();
4724 
4725 	ut_ad(lock_validate());
4726 }
4727 
4728 #ifdef UNIV_DEBUG
4729 /*********************************************************************//**
4730 Find the lock in the trx_t::trx_lock_t::table_locks vector.
4731 @return true if found */
4732 static
4733 bool
4734 lock_trx_table_locks_find(
4735 /*======================*/
4736 	trx_t*		trx,		/*!< in: trx to validate */
4737 	const lock_t*	find_lock)	/*!< in: lock to find */
4738 {
4739 	bool		found = false;
4740 
4741 	ut_ad(trx_mutex_own(trx));
4742 
4743 	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
4744              end = trx->lock.table_locks.end(); it != end; ++it) {
4745 
4746 		const lock_t*	lock = *it;
4747 
4748 		if (lock == NULL) {
4749 
4750 			continue;
4751 
4752 		} else if (lock == find_lock) {
4753 
4754 			/* Can't be duplicates. */
4755 			ut_a(!found);
4756 			found = true;
4757 		}
4758 
4759 		ut_a(trx == lock->trx);
4760 		ut_a(lock_get_type_low(lock) & LOCK_TABLE);
4761 		ut_a(lock->un_member.tab_lock.table != NULL);
4762 	}
4763 
4764 	return(found);
4765 }
4766 
4767 /*********************************************************************//**
4768 Validates the lock queue on a table.
4769 @return TRUE if ok */
4770 static
4771 ibool
4772 lock_table_queue_validate(
4773 /*======================*/
4774 	const dict_table_t*	table)	/*!< in: table */
4775 {
4776 	const lock_t*	lock;
4777 
4778 	ut_ad(lock_mutex_own());
4779 
4780 	for (lock = UT_LIST_GET_FIRST(table->locks);
4781 	     lock != NULL;
4782 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
4783 
4784 		/* lock->trx->state cannot change from or to NOT_STARTED
4785 		while we are holding the lock_sys.mutex. It may change
4786 		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
4787 		trx_mutex_enter(lock->trx);
4788 		check_trx_state(lock->trx);
4789 
4790 		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4791 		} else if (!lock_get_wait(lock)) {
4792 			ut_a(!lock_table_other_has_incompatible(
4793 				     lock->trx, 0, table,
4794 				     lock_get_mode(lock)));
4795 		} else {
4796 			ut_a(lock_table_has_to_wait_in_queue(lock));
4797 		}
4798 
4799 		ut_a(lock_trx_table_locks_find(lock->trx, lock));
4800 		trx_mutex_exit(lock->trx);
4801 	}
4802 
4803 	return(TRUE);
4804 }
4805 
4806 /*********************************************************************//**
4807 Validates the lock queue on a single record.
4808 @return TRUE if ok */
4809 static
4810 bool
4811 lock_rec_queue_validate(
4812 /*====================*/
4813 	bool			locked_lock_trx_sys,
4814 					/*!< in: if the caller holds
4815 					both the lock mutex and
4816 					trx_sys_t->lock. */
4817 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
4818 	const rec_t*		rec,	/*!< in: record to look at */
4819 	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
4820 	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
4821 {
4822 	const lock_t*	lock;
4823 	ulint		heap_no;
4824 
4825 	ut_a(rec);
4826 	ut_a(block->frame == page_align(rec));
4827 	ut_ad(rec_offs_validate(rec, index, offsets));
4828 	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
4829 	ut_ad(page_rec_is_leaf(rec));
4830 	ut_ad(lock_mutex_own() == locked_lock_trx_sys);
4831 	ut_ad(!index || dict_index_is_clust(index)
4832 	      || !dict_index_is_online_ddl(index));
4833 
4834 	heap_no = page_rec_get_heap_no(rec);
4835 
4836 	if (!locked_lock_trx_sys) {
4837 		lock_mutex_enter();
4838 	}
4839 
4840 	if (!page_rec_is_user_rec(rec)) {
4841 
4842 		for (lock = lock_rec_get_first(&lock_sys.rec_hash,
4843 					       block->page.id(), heap_no);
4844 		     lock != NULL;
4845 		     lock = lock_rec_get_next_const(heap_no, lock)) {
4846 
4847 			ut_ad(!index || lock->index == index);
4848 
4849 			trx_mutex_enter(lock->trx);
4850 			ut_ad(!lock->trx->read_only
4851 			      || !lock->trx->is_autocommit_non_locking());
4852 			ut_ad(trx_state_eq(lock->trx,
4853 					   TRX_STATE_COMMITTED_IN_MEMORY)
4854 			      || !lock_get_wait(lock)
4855 			      || lock_rec_has_to_wait_in_queue(lock));
4856 			trx_mutex_exit(lock->trx);
4857 		}
4858 
4859 func_exit:
4860 		if (!locked_lock_trx_sys) {
4861 			lock_mutex_exit();
4862 		}
4863 
4864 		return true;
4865 	}
4866 
4867 	ut_ad(page_rec_is_leaf(rec));
4868 	ut_ad(lock_mutex_own());
4869 
4870 	const trx_id_t impl_trx_id = index && index->is_primary()
4871 		? lock_clust_rec_some_has_impl(rec, index, offsets)
4872 		: 0;
4873 
4874 	if (trx_t *impl_trx = impl_trx_id
4875 	    ? trx_sys.find(current_trx(), impl_trx_id, false)
4876 	    : 0) {
4877 		/* impl_trx could have been committed before we
4878 		acquire its mutex, but not thereafter. */
4879 
4880 		mutex_enter(&impl_trx->mutex);
4881 		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
4882 		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4883 		} else if (const lock_t* other_lock
4884 			   = lock_rec_other_has_expl_req(
4885 				   LOCK_S, block, true, heap_no,
4886 				   impl_trx)) {
4887 			/* The impl_trx is holding an implicit lock on the
4888 			given record 'rec'. So there cannot be another
4889 			explicit granted lock.  Also, there can be another
4890 			explicit waiting lock only if the impl_trx has an
4891 			explicit granted lock. */
4892 
4893 #ifdef WITH_WSREP
4894 			/** Galera record locking rules:
4895 			* If there is no other record lock on the same record, we may
4896 			grant the lock request.
4897 			* If there is another record lock but the requested record lock
4898 			is compatible, we may grant the lock request.
4899 			* If there is another record lock and it is not compatible with
4900 			the requested lock, all normal transactions must wait.
4901 			* BF (brute force) additional exceptions:
4902 			** If BF already holds a record lock on the requested record, we
4903 			may grant the new record lock even if conflicting record lock(s)
4904 			are waiting in the queue.
4905 			** If a conflicting transaction holds the requested record lock,
4906 			we will cancel that record lock and select the conflicting
4907 			transaction as the BF abort or kill victim.
4908 			** If a conflicting transaction is waiting for the requested
4909 			record lock, we will cancel that wait and select the conflicting
4910 			transaction as the BF abort or kill victim.
4911 			** There should never be two BF transactions waiting for the
4912 			same record lock. */
4913 			if (other_lock->trx->is_wsrep() && !lock_get_wait(other_lock)) {
4914 				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
4915 				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4916 
4917 				if (!lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
4918 						       block, heap_no,
4919 						       impl_trx)) {
4920 					ib::info() << "WSREP impl BF lock conflict";
4921 				}
4922 			} else
4923 #endif /* WITH_WSREP */
4924 			{
4925 				ut_ad(lock_get_wait(other_lock));
4926 				/* After the MDEV-27025 fix the following
4927 				case is possible:
4928 				1. trx 1 acquires an S-lock;
4929 				2. trx 2 creates an X-lock waiting for trx 1;
4930 				3. trx 1 creates an implicit lock; because
4931 				lock_rec_other_has_conflicting() finds no
4932 				conflicting trx 2 X-lock, the explicit lock
4933 				will not be created;
4934 				4. trx 3 creates a waiting X-lock,
4935 				which will wait for the S-lock of trx 1.
4936 				That is why we relax the condition here and
4937 				check only for the S-lock.
4938 				*/
4939 				ut_ad(lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
4940 						        block, heap_no, impl_trx));
4941 			}
4942 		}
4943 
4944 		mutex_exit(&impl_trx->mutex);
4945 	}
4946 
4947 	for (lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
4948 	      heap_no);
4949 	     lock != NULL;
4950 	     lock = lock_rec_get_next_const(heap_no, lock)) {
4951 		ut_ad(!lock->trx->read_only
4952 		      || !lock->trx->is_autocommit_non_locking());
4953 		ut_ad(!page_rec_is_metadata(rec));
4954 
4955 		if (index) {
4956 			ut_a(lock->index == index);
4957 		}
4958 
4959 		if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
4960 
4961 			lock_mode	mode;
4962 
4963 			if (lock_get_mode(lock) == LOCK_S) {
4964 				mode = LOCK_X;
4965 			} else {
4966 				mode = LOCK_S;
4967 			}
4968 
4969 			const lock_t*	other_lock
4970 				= lock_rec_other_has_expl_req(
4971 					mode, block, false, heap_no,
4972 					lock->trx);
4973 #ifdef WITH_WSREP
4974 			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
4975 				/* Only BF transaction may be granted
4976 				lock before other conflicting lock
4977 				request. */
4978 				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
4979 				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
4980 					/* If no BF, this case is a bug. */
4981 					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
4982 					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4983 					ut_error;
4984 				}
4985 			} else
4986 #endif /* WITH_WSREP */
4987 			ut_ad(!other_lock);
4988 		} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
4989 
4990 			ut_a(lock_rec_has_to_wait_in_queue(lock));
4991 		}
4992 	}
4993 
4994 	ut_ad(innodb_lock_schedule_algorithm == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4995 		  lock_queue_validate(lock));
4996 
4997 	goto func_exit;
4998 }
4999 
5000 /*********************************************************************//**
5001 Validates the record lock queues on a page.
5002 @return TRUE if ok */
5003 static
5004 ibool
5005 lock_rec_validate_page(
5006 /*===================*/
5007 	const buf_block_t*	block)	/*!< in: buffer block */
5008 {
5009 	const lock_t*	lock;
5010 	const rec_t*	rec;
5011 	ulint		nth_lock	= 0;
5012 	ulint		nth_bit		= 0;
5013 	ulint		i;
5014 	mem_heap_t*	heap		= NULL;
5015 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5016 	rec_offs*	offsets		= offsets_;
5017 	rec_offs_init(offsets_);
5018 
5019 	lock_mutex_enter();
5020 loop:
5021 	lock = lock_sys.get_first(block->page.id());
5022 
5023 	if (!lock) {
5024 		goto function_exit;
5025 	}
5026 
5027 	DBUG_ASSERT(block->page.status != buf_page_t::FREED);
5028 
5029 	for (i = 0; i < nth_lock; i++) {
5030 
5031 		lock = lock_rec_get_next_on_page_const(lock);
5032 
5033 		if (!lock) {
5034 			goto function_exit;
5035 		}
5036 	}
5037 
5038 	ut_ad(!lock->trx->read_only
5039 	      || !lock->trx->is_autocommit_non_locking());
5040 
5041 	/* Only validate the record queues when this thread is not
5042 	holding a space->latch. */
5043 	if (!sync_check_find(SYNC_FSP))
5044 	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {
5045 
5046 		if (i == PAGE_HEAP_NO_SUPREMUM
5047 		    || lock_rec_get_nth_bit(lock, i)) {
5048 
5049 			rec = page_find_rec_with_heap_no(block->frame, i);
5050 			ut_a(rec);
5051 			ut_ad(!lock_rec_get_nth_bit(lock, i)
5052 			      || page_rec_is_leaf(rec));
5053 			offsets = rec_get_offsets(rec, lock->index, offsets,
5054 						  lock->index->n_core_fields,
5055 						  ULINT_UNDEFINED, &heap);
5056 
5057 			/* If this thread is holding the file space
5058 			latch (fil_space_t::latch), the following
5059 			check WILL break the latching order and may
5060 			cause a deadlock of threads. */
5061 
5062 			lock_rec_queue_validate(
5063 				TRUE, block, rec, lock->index, offsets);
5064 
5065 			nth_bit = i + 1;
5066 
5067 			goto loop;
5068 		}
5069 	}
5070 
5071 	nth_bit = 0;
5072 	nth_lock++;
5073 
5074 	goto loop;
5075 
5076 function_exit:
5077 	lock_mutex_exit();
5078 
5079 	if (heap != NULL) {
5080 		mem_heap_free(heap);
5081 	}
5082 	return(TRUE);
5083 }
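
/* Debug usage sketch (illustrative; the real caller is
lock_rec_block_validate() below): the function is only ever invoked as

	ut_ad(lock_rec_validate_page(block));

It always returns TRUE; inconsistencies in the per-record lock queues
are reported through the assertions above instead. */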
5084 
5085 /*********************************************************************//**
5086 Validate record locks up to a limit.
5087 @return lock at limit or NULL if no more locks in the hash bucket */
5088 static MY_ATTRIBUTE((warn_unused_result))
5089 const lock_t*
5090 lock_rec_validate(
5091 /*==============*/
5092 	ulint		start,		/*!< in: lock_sys.rec_hash
5093 					bucket */
5094 	page_id_t*	limit)		/*!< in/out: upper limit of
5095 					(space, page_no) */
5096 {
5097 	ut_ad(lock_mutex_own());
5098 
5099 	for (const lock_t* lock = static_cast<const lock_t*>(
5100 		     HASH_GET_FIRST(&lock_sys.rec_hash, start));
5101 	     lock != NULL;
5102 	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {
5103 
5104 		ut_ad(!lock->trx->read_only
5105 		      || !lock->trx->is_autocommit_non_locking());
5106 		ut_ad(lock_get_type(lock) == LOCK_REC);
5107 
5108 		page_id_t current(lock->un_member.rec_lock.page_id);
5109 
5110 		if (current > *limit) {
5111 			*limit = current + 1;
5112 			return(lock);
5113 		}
5114 	}
5115 
5116 	return(0);
5117 }
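
/* Illustrative example of the resume cursor above (hypothetical page
numbers): if the hash bucket holds locks for pages (5,1), (5,7) and
(6,3), and *limit is (5,2), the call returns the lock on page (5,7)
and advances *limit to (5,8), so a subsequent call resumes after that
page. This lets the caller release lock_sys.mutex between calls
without revisiting locks it has already handled. */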
5118 
5119 /*********************************************************************//**
5120 Validate a record lock's block */
5121 static void lock_rec_block_validate(const page_id_t page_id)
5122 {
5123 	/* The lock and the block that it is referring to may be freed at
5124 	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
5125 	If the lock exists in lock_rec_validate_page() we assert
5126 	block->page.status != FREED. */
5127 
5128 	buf_block_t*	block;
5129 	mtr_t		mtr;
5130 
5131 	/* Transactional locks should never refer to dropped
5132 	tablespaces, because all DDL operations that would drop or
5133 	discard or rebuild a tablespace do hold an exclusive table
5134 	lock, which would conflict with any locks referring to the
5135 	tablespace from other transactions. */
5136 	if (fil_space_t* space = fil_space_t::get(page_id.space())) {
5137 		dberr_t err = DB_SUCCESS;
5138 		mtr_start(&mtr);
5139 
5140 		block = buf_page_get_gen(
5141 			page_id,
5142 			space->zip_size(),
5143 			RW_X_LATCH, NULL,
5144 			BUF_GET_POSSIBLY_FREED,
5145 			__FILE__, __LINE__, &mtr, &err);
5146 
5147 		if (err != DB_SUCCESS) {
5148 			ib::error() << "Lock rec block validate failed for tablespace "
5149 				   << space->name << " page "
5150 				   << page_id << " err " << err;
5151 		}
5152 
5153 		if (block && block->page.status != buf_page_t::FREED) {
5154 			buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
5155 
5156 			ut_ad(lock_rec_validate_page(block));
5157 		}
5158 
5159 		mtr_commit(&mtr);
5160 
5161 		space->release();
5162 	}
5163 }
5164 
5165 
5166 static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
5167 {
5168   ut_ad(lock_mutex_own());
5169   mutex_enter(&element->mutex);
5170   if (element->trx)
5171   {
5172     check_trx_state(element->trx);
5173     for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
5174          lock != NULL;
5175          lock= UT_LIST_GET_NEXT(trx_locks, lock))
5176     {
5177       if (lock_get_type_low(lock) & LOCK_TABLE)
5178         lock_table_queue_validate(lock->un_member.tab_lock.table);
5179     }
5180   }
5181   mutex_exit(&element->mutex);
5182   return 0;
5183 }
5184 
5185 
5186 /*********************************************************************//**
5187 Validates the lock system.
5188 @return TRUE if ok */
5189 static
5190 bool
5191 lock_validate()
5192 /*===========*/
5193 {
5194 	std::set<page_id_t> pages;
5195 
5196 	lock_mutex_enter();
5197 
5198 	/* Validate table locks */
5199 	trx_sys.rw_trx_hash.iterate(lock_validate_table_locks);
5200 
5201 	/* Iterate over all the record locks and validate the locks. We
5202 	don't want to hog the lock_sys_t::mutex. Release it during the
5203 	validation check. */
5204 
5205 	for (ulint i = 0; i < lock_sys.rec_hash.n_cells; i++) {
5206 		page_id_t limit(0, 0);
5207 
5208 		while (const lock_t* lock = lock_rec_validate(i, &limit)) {
5209 			if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED) {
5210 				/* The lock bitmap is empty; ignore it. */
5211 				continue;
5212 			}
5213 			pages.insert(lock->un_member.rec_lock.page_id);
5214 		}
5215 	}
5216 
5217 	lock_mutex_exit();
5218 
5219 	for (page_id_t page_id : pages) {
5220 		lock_rec_block_validate(page_id);
5221 	}
5222 
5223 	return(true);
5224 }
5225 #endif /* UNIV_DEBUG */
5226 /*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/
5227 
5228 /*********************************************************************//**
5229 Checks if locks of other transactions prevent an immediate insert of
5230 a record. If they do, first tests if the query thread should anyway
5231 be suspended for some reason; if not, then puts the transaction and
5232 the query thread to the lock wait state and inserts a waiting request
5233 for a gap x-lock to the lock queue.
5234 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5235 dberr_t
5236 lock_rec_insert_check_and_lock(
5237 /*===========================*/
5238 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is
5239 				set, does nothing */
5240 	const rec_t*	rec,	/*!< in: record after which to insert */
5241 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
5242 	dict_index_t*	index,	/*!< in: index */
5243 	que_thr_t*	thr,	/*!< in: query thread */
5244 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
5245 	bool*		inherit)/*!< out: set to true if the new
5246 				inserted record maybe should inherit
5247 				LOCK_GAP type locks from the successor
5248 				record */
5249 {
5250 	ut_ad(block->frame == page_align(rec));
5251 	ut_ad(!dict_index_is_online_ddl(index)
5252 	      || index->is_primary()
5253 	      || (flags & BTR_CREATE_FLAG));
5254 	ut_ad(mtr->is_named_space(index->table->space));
5255 	ut_ad(page_rec_is_leaf(rec));
5256 
5257 	if (flags & BTR_NO_LOCKING_FLAG) {
5258 
5259 		return(DB_SUCCESS);
5260 	}
5261 
5262 	ut_ad(!index->table->is_temporary());
5263 	ut_ad(page_is_leaf(block->frame));
5264 
5265 	dberr_t		err;
5266 	lock_t*		lock;
5267 	bool		inherit_in = *inherit;
5268 	trx_t*		trx = thr_get_trx(thr);
5269 	const rec_t*	next_rec = page_rec_get_next_const(rec);
5270 	ulint		heap_no = page_rec_get_heap_no(next_rec);
5271 	ut_ad(!rec_is_metadata(next_rec, *index));
5272 
5273 	lock_mutex_enter();
5274 	/* Because this code is invoked for a running transaction by
5275 	the thread that is serving the transaction, it is not necessary
5276 	to hold trx->mutex here. */
5277 
5278 	/* When inserting a record into an index, the table must be at
5279 	least IX-locked. When we are building an index, we would pass
5280 	BTR_NO_LOCKING_FLAG and skip the locking altogether. */
5281 	ut_ad(lock_table_has(trx, index->table, LOCK_IX));
5282 
5283 	lock = lock_rec_get_first(&lock_sys.rec_hash, block->page.id(),
5284 	    heap_no);
5285 
5286 	if (lock == NULL) {
5287 		/* We optimize CPU time usage in the simplest case */
5288 
5289 		lock_mutex_exit();
5290 
5291 		if (inherit_in && !dict_index_is_clust(index)) {
5292 			/* Update the page max trx id field */
5293 			page_update_max_trx_id(block,
5294 					       buf_block_get_page_zip(block),
5295 					       trx->id, mtr);
5296 		}
5297 
5298 		*inherit = false;
5299 
5300 		return(DB_SUCCESS);
5301 	}
5302 
5303 	/* A spatial index does not use GAP lock protection; it uses
5304 	"predicate locks" to protect the "range". */
5305 	if (dict_index_is_spatial(index)) {
5306 		lock_mutex_exit();	/* still held from above */
5307 		return(DB_SUCCESS);
5308 	}
5309 	*inherit = true;
5310 
5311 	/* If another transaction has an explicit lock request which locks
5312 	the gap, waiting or granted, on the successor, the insert has to wait.
5313 
5314 	An exception is the case where the lock by the other transaction
5315 	is a gap type lock which it placed to wait for its turn to insert. We
5316 	do not consider that kind of lock as conflicting with our insert. This
5317 	eliminates an unnecessary deadlock which would arise when two
5318 	transactions both had waiting gap type lock requests on the successor:
5319 	each would have had to wait for the other's insert. */
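
	/* Illustrative schedule of the deadlock that this exception
	avoids (assuming two sessions inserting into the same gap):

		trx A: INSERT -> enqueues a waiting
		       LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION request
		trx B: INSERT into the same gap

	Because trx A's waiting gap lock is not treated as conflicting,
	trx B does not wait for trx A, and no spurious deadlock between
	the two inserting transactions is reported. */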
5320 
5321 	const unsigned	type_mode = LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;
5322 
5323 	if (lock_t* c_lock =
5324 	    lock_rec_other_has_conflicting(type_mode, block, heap_no, trx)) {
5325 		/* Note that we may get DB_SUCCESS also here! */
5326 		trx_mutex_enter(trx);
5327 
5328 		err = lock_rec_enqueue_waiting(c_lock, type_mode, block,
5329 		    heap_no, index, thr, NULL);
5330 
5331 		trx_mutex_exit(trx);
5332 	} else {
5333 		err = DB_SUCCESS;
5334 	}
5335 
5336 	lock_mutex_exit();
5337 
5338 	switch (err) {
5339 	case DB_SUCCESS_LOCKED_REC:
5340 		err = DB_SUCCESS;
5341 		/* fall through */
5342 	case DB_SUCCESS:
5343 		if (!inherit_in || dict_index_is_clust(index)) {
5344 			break;
5345 		}
5346 
5347 		/* Update the page max trx id field */
5348 		page_update_max_trx_id(
5349 			block, buf_block_get_page_zip(block), trx->id, mtr);
5350 	default:
5351 		/* We only care about the two return values. */
5352 		break;
5353 	}
5354 
5355 #ifdef UNIV_DEBUG
5356 	{
5357 		mem_heap_t*	heap		= NULL;
5358 		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5359 		const rec_offs*	offsets;
5360 		rec_offs_init(offsets_);
5361 
5362 		offsets = rec_get_offsets(next_rec, index, offsets_,
5363 					  index->n_core_fields,
5364 					  ULINT_UNDEFINED, &heap);
5365 
5366 		ut_ad(lock_rec_queue_validate(
5367 				FALSE, block, next_rec, index, offsets));
5368 
5369 		if (heap != NULL) {
5370 			mem_heap_free(heap);
5371 		}
5372 	}
5373 #endif /* UNIV_DEBUG */
5374 
5375 	return(err);
5376 }
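
/* Usage sketch (hypothetical, simplified from the row insert path):

	bool inherit = false;
	dberr_t err = lock_rec_insert_check_and_lock(
		0, rec, block, index, thr, mtr, &inherit);
	if (err == DB_LOCK_WAIT) {
		// a waiting LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION
		// request was enqueued; the caller suspends this thread
	}
*/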
5377 
5378 /*********************************************************************//**
5379 Creates an explicit record lock for a running transaction that currently only
5380 has an implicit lock on the record. The transaction instance must have a
5381 reference count > 0 so that it can't be committed and freed before this
5382 function has completed. */
5383 static
5384 void
5385 lock_rec_convert_impl_to_expl_for_trx(
5386 /*==================================*/
5387 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5388 	const rec_t*		rec,	/*!< in: user record on page */
5389 	dict_index_t*		index,	/*!< in: index of record */
5390 	trx_t*			trx,	/*!< in/out: active transaction */
5391 	ulint			heap_no)/*!< in: rec heap number to lock */
5392 {
5393 	ut_ad(trx->is_referenced());
5394 	ut_ad(page_rec_is_leaf(rec));
5395 	ut_ad(!rec_is_metadata(rec, *index));
5396 
5397 	DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
5398 	lock_mutex_enter();
5399 	trx_mutex_enter(trx);
5400 	ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));
5401 
5402 	if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)
5403 	    && !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
5404 				  block, heap_no, trx)) {
5405 		lock_rec_add_to_queue(LOCK_REC | LOCK_X | LOCK_REC_NOT_GAP,
5406 				      block, heap_no, index, trx, true, true);
5407 	}
5408 
5409 	lock_mutex_exit();
5410 	trx_mutex_exit(trx);
5411 	trx->release_reference();
5412 
5413 	DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
5414 }
5415 
5416 
5417 #ifdef UNIV_DEBUG
5418 struct lock_rec_other_trx_holds_expl_arg
5419 {
5420   const ulint heap_no;
5421   const buf_block_t * const block;
5422   const trx_t *impl_trx;
5423 };
5424 
5425 
5426 static my_bool lock_rec_other_trx_holds_expl_callback(
5427   rw_trx_hash_element_t *element,
5428   lock_rec_other_trx_holds_expl_arg *arg)
5429 {
5430   mutex_enter(&element->mutex);
5431   if (element->trx)
5432   {
5433     trx_mutex_enter(element->trx);
5434     ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
5435     lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
5436       ? NULL : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP, arg->block,
5437                                  arg->heap_no, element->trx);
5438     /*
5439       An explicit lock is held by trx other than the trx holding the implicit
5440       lock.
5441     */
5442     ut_ad(!expl_lock || expl_lock->trx == arg->impl_trx);
5443     trx_mutex_exit(element->trx);
5444   }
5445   mutex_exit(&element->mutex);
5446   return 0;
5447 }
5448 
5449 
5450 /**
5451   Checks if some transaction, other than the given trx, has an explicit
5452   lock on the given rec.
5453 
5454   FIXME: if the current transaction holds implicit lock from INSERT, a
5455   subsequent locking read should not convert it to explicit. See also
5456   MDEV-11215.
5457 
5458   @param      caller_trx  trx of current thread
5459   @param[in]  trx         trx holding implicit lock on rec
5460   @param[in]  rec         user record
5461   @param[in]  block       buffer block containing the record
5462 */
5463 
5464 static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
5465                                           const rec_t *rec,
5466                                           const buf_block_t *block)
5467 {
5468   if (trx)
5469   {
5470     ut_ad(!page_rec_is_metadata(rec));
5471     lock_mutex_enter();
5472     ut_ad(trx->is_referenced());
5473     trx_mutex_enter(trx);
5474     const trx_state_t state = trx->state;
5475     trx_mutex_exit(trx);
5476     ut_ad(state != TRX_STATE_NOT_STARTED);
5477     if (state == TRX_STATE_COMMITTED_IN_MEMORY)
5478     {
5479       /* The transaction was committed before our lock_mutex_enter(). */
5480       lock_mutex_exit();
5481       return;
5482     }
5483     lock_rec_other_trx_holds_expl_arg arg= { page_rec_get_heap_no(rec), block,
5484                                              trx };
5485     trx_sys.rw_trx_hash.iterate(caller_trx,
5486                                 lock_rec_other_trx_holds_expl_callback, &arg);
5487     lock_mutex_exit();
5488   }
5489 }
5490 #endif /* UNIV_DEBUG */
5491 
5492 
5493 /** If an implicit x-lock exists on a record, convert it to an explicit one.
5494 
5495 Often, this is called by a transaction that is about to enter a lock wait
5496 due to the lock conflict. Two explicit locks would be created: first the
5497 exclusive lock on behalf of the lock-holder transaction in this function,
5498 and then a wait request on behalf of caller_trx, in the calling function.
5499 
5500 This may also be called by the same transaction that is already holding
5501 an implicit exclusive lock on the record. In this case, no explicit lock
5502 should be created.
5503 
5504 @param[in,out]	caller_trx	current transaction
5505 @param[in]	block		index tree leaf page
5506 @param[in]	rec		record on the leaf page
5507 @param[in]	index		the index of the record
5508 @param[in]	offsets		rec_get_offsets(rec,index)
5509 @return	whether caller_trx already holds an exclusive lock on rec */
5510 static
5511 bool
5512 lock_rec_convert_impl_to_expl(
5513 	trx_t*			caller_trx,
5514 	const buf_block_t*	block,
5515 	const rec_t*		rec,
5516 	dict_index_t*		index,
5517 	const rec_offs*		offsets)
5518 {
5519 	trx_t*		trx;
5520 
5521 	ut_ad(!lock_mutex_own());
5522 	ut_ad(page_rec_is_user_rec(rec));
5523 	ut_ad(rec_offs_validate(rec, index, offsets));
5524 	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
5525 	ut_ad(page_rec_is_leaf(rec));
5526 	ut_ad(!rec_is_metadata(rec, *index));
5527 
5528 	if (dict_index_is_clust(index)) {
5529 		trx_id_t	trx_id;
5530 
5531 		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);
5532 
5533 		if (trx_id == 0) {
5534 			return false;
5535 		}
5536 		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
5537 			return true;
5538 		}
5539 
5540 		trx = trx_sys.find(caller_trx, trx_id);
5541 	} else {
5542 		ut_ad(!dict_index_is_online_ddl(index));
5543 
5544 		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
5545 						 offsets);
5546 		if (trx == caller_trx) {
5547 			trx->release_reference();
5548 			return true;
5549 		}
5550 
5551 		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec,
5552 						   block));
5553 	}
5554 
5555 	if (trx != 0) {
5556 		ulint	heap_no = page_rec_get_heap_no(rec);
5557 
5558 		ut_ad(trx->is_referenced());
5559 
5560 		/* If the transaction is still active and has no
5561 		explicit x-lock set on the record, set one for it.
5562 		trx cannot be committed until the ref count is zero. */
5563 
5564 		lock_rec_convert_impl_to_expl_for_trx(
5565 			block, rec, index, trx, heap_no);
5566 	}
5567 
5568 	return false;
5569 }
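
/* Illustrative schedule (assuming two sessions and a clustered index):

	trx 1: INSERT INTO t VALUES (1);	-- implicit lock only
	trx 2: SELECT * FROM t WHERE pk = 1 FOR UPDATE;

trx 2 reads the trx id stamped on the record, finds that trx 1 is still
active, and this function creates an explicit LOCK_X | LOCK_REC_NOT_GAP
lock on behalf of trx 1; trx 2 then enqueues its own waiting request
behind it. */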
5570 
5571 /*********************************************************************//**
5572 Checks if locks of other transactions prevent an immediate modify (update,
5573 delete mark, or delete unmark) of a clustered index record. If they do,
5574 first tests if the query thread should anyway be suspended for some
5575 reason; if not, then puts the transaction and the query thread to the
5576 lock wait state and inserts a waiting request for a record x-lock to the
5577 lock queue.
5578 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5579 dberr_t
5580 lock_clust_rec_modify_check_and_lock(
5581 /*=================================*/
5582 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5583 					bit is set, does nothing */
5584 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5585 	const rec_t*		rec,	/*!< in: record which should be
5586 					modified */
5587 	dict_index_t*		index,	/*!< in: clustered index */
5588 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5589 	que_thr_t*		thr)	/*!< in: query thread */
5590 {
5591 	dberr_t	err;
5592 	ulint	heap_no;
5593 
5594 	ut_ad(rec_offs_validate(rec, index, offsets));
5595 	ut_ad(page_rec_is_leaf(rec));
5596 	ut_ad(dict_index_is_clust(index));
5597 	ut_ad(block->frame == page_align(rec));
5598 
5599 	if (flags & BTR_NO_LOCKING_FLAG) {
5600 
5601 		return(DB_SUCCESS);
5602 	}
5603 	ut_ad(!rec_is_metadata(rec, *index));
5604 	ut_ad(!index->table->is_temporary());
5605 
5606 	heap_no = rec_offs_comp(offsets)
5607 		? rec_get_heap_no_new(rec)
5608 		: rec_get_heap_no_old(rec);
5609 
5610 	/* If a transaction has no explicit x-lock set on the record, set one
5611 	for it */
5612 
5613 	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index,
5614 					  offsets)) {
5615 		/* We already hold an implicit exclusive lock. */
5616 		return DB_SUCCESS;
5617 	}
5618 
5619 	err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
5620 			    block, heap_no, index, thr);
5621 
5622 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5623 
5624 	if (err == DB_SUCCESS_LOCKED_REC) {
5625 		err = DB_SUCCESS;
5626 	}
5627 
5628 	return(err);
5629 }
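
/* Example (illustrative): an UPDATE of a clustered index record, such
as UPDATE t SET val = val + 1 WHERE pk = 1, reaches this function and
acquires LOCK_X | LOCK_REC_NOT_GAP on the record; no gap lock is needed
because no new record is being inserted. */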
5630 
5631 /*********************************************************************//**
5632 Checks if locks of other transactions prevent an immediate modify (delete
5633 mark or delete unmark) of a secondary index record.
5634 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5635 dberr_t
5636 lock_sec_rec_modify_check_and_lock(
5637 /*===============================*/
5638 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5639 				bit is set, does nothing */
5640 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
5641 	const rec_t*	rec,	/*!< in: record which should be
5642 				modified; NOTE: as this is a secondary
5643 				index, we always have to modify the
5644 				clustered index record first: see the
5645 				comment below */
5646 	dict_index_t*	index,	/*!< in: secondary index */
5647 	que_thr_t*	thr,	/*!< in: query thread
5648 				(can be NULL if BTR_NO_LOCKING_FLAG) */
5649 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
5650 {
5651 	dberr_t	err;
5652 	ulint	heap_no;
5653 
5654 	ut_ad(!dict_index_is_clust(index));
5655 	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
5656 	ut_ad(block->frame == page_align(rec));
5657 	ut_ad(mtr->is_named_space(index->table->space));
5658 	ut_ad(page_rec_is_leaf(rec));
5659 	ut_ad(!rec_is_metadata(rec, *index));
5660 
5661 	if (flags & BTR_NO_LOCKING_FLAG) {
5662 
5663 		return(DB_SUCCESS);
5664 	}
5665 	ut_ad(!index->table->is_temporary());
5666 
5667 	heap_no = page_rec_get_heap_no(rec);
5668 
5669 #ifdef WITH_WSREP
5670 	trx_t *trx= thr_get_trx(thr);
5671 	/* If a transaction scanning a unique secondary key is a wsrep
5672 	high-priority thread (brute force), the scan may involve
5673 	GAP-locking in the index. Because such locking also happens when
5674 	applying replication events in high-priority applier threads,
5675 	lock conflicts between two wsrep high-priority threads are
5676 	possible. To avoid this GAP-locking, we mark here that this
5677 	transaction is using a unique key scan. */
5678 	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
5679 		trx->wsrep_UK_scan= true;
5680 #endif /* WITH_WSREP */
5681 
5682 	/* Another transaction cannot have an implicit lock on the record,
5683 	because when we come here, we already have modified the clustered
5684 	index record, and this would not have been possible if another active
5685 	transaction had modified this secondary index record. */
5686 
5687 	err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
5688 			    block, heap_no, index, thr);
5689 
5690 #ifdef WITH_WSREP
5691 	trx->wsrep_UK_scan= false;
5692 #endif /* WITH_WSREP */
5693 
5694 #ifdef UNIV_DEBUG
5695 	{
5696 		mem_heap_t*	heap		= NULL;
5697 		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5698 		const rec_offs*	offsets;
5699 		rec_offs_init(offsets_);
5700 
5701 		offsets = rec_get_offsets(rec, index, offsets_,
5702 					  index->n_core_fields,
5703 					  ULINT_UNDEFINED, &heap);
5704 
5705 		ut_ad(lock_rec_queue_validate(
5706 			FALSE, block, rec, index, offsets));
5707 
5708 		if (heap != NULL) {
5709 			mem_heap_free(heap);
5710 		}
5711 	}
5712 #endif /* UNIV_DEBUG */
5713 
5714 	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
5715 		/* Update the page max trx id field */
5716 		/* It might not be necessary to do this if
5717 		err == DB_SUCCESS (no new lock created),
5718 		but it should not cost too much performance. */
5719 		page_update_max_trx_id(block,
5720 				       buf_block_get_page_zip(block),
5721 				       thr_get_trx(thr)->id, mtr);
5722 		err = DB_SUCCESS;
5723 	}
5724 
5725 	return(err);
5726 }
5727 
5728 /*********************************************************************//**
5729 Like lock_clust_rec_read_check_and_lock(), but reads a
5730 secondary index record.
5731 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
5732 dberr_t
5733 lock_sec_rec_read_check_and_lock(
5734 /*=============================*/
5735 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5736 					bit is set, does nothing */
5737 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5738 	const rec_t*		rec,	/*!< in: user record or page
5739 					supremum record which should
5740 					be read or passed over by a
5741 					read cursor */
5742 	dict_index_t*		index,	/*!< in: secondary index */
5743 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5744 	lock_mode		mode,	/*!< in: mode of the lock which
5745 					the read cursor should set on
5746 					records: LOCK_S or LOCK_X; the
5747 					latter is possible in
5748 					SELECT FOR UPDATE */
5749 	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5750 					LOCK_REC_NOT_GAP */
5751 	que_thr_t*		thr)	/*!< in: query thread */
5752 {
5753 	dberr_t	err;
5754 	ulint	heap_no;
5755 
5756 	ut_ad(!dict_index_is_clust(index));
5757 	ut_ad(!dict_index_is_online_ddl(index));
5758 	ut_ad(block->frame == page_align(rec));
5759 	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
5760 	ut_ad(rec_offs_validate(rec, index, offsets));
5761 	ut_ad(page_rec_is_leaf(rec));
5762 	ut_ad(mode == LOCK_X || mode == LOCK_S);
5763 
5764 	if ((flags & BTR_NO_LOCKING_FLAG)
5765 	    || srv_read_only_mode
5766 	    || index->table->is_temporary()) {
5767 
5768 		return(DB_SUCCESS);
5769 	}
5770 
5771 	ut_ad(!rec_is_metadata(rec, *index));
5772 	heap_no = page_rec_get_heap_no(rec);
5773 
5774 	/* Some transaction may have an implicit x-lock on the record only
5775 	if the max trx id for the page >= min trx id for the trx list or a
5776 	database recovery is running. */
5777 
5778 	trx_t *trx = thr_get_trx(thr);
5779 	if (!lock_table_has(trx, index->table, LOCK_X)
5780 	    && !page_rec_is_supremum(rec)
5781 	    && page_get_max_trx_id(block->frame) >= trx_sys.get_min_trx_id()
5782 	    && lock_rec_convert_impl_to_expl(trx, block, rec,
5783 					     index, offsets)
5784 	    && gap_mode == LOCK_REC_NOT_GAP) {
5785 		/* We already hold an implicit exclusive lock. */
5786 		return DB_SUCCESS;
5787 	}
5788 
5789 #ifdef WITH_WSREP
5790 	/* If a transaction scanning a unique secondary key is a wsrep
5791 	high-priority thread (brute force), the scan may involve
5792 	GAP-locking in the index. Because such locking also happens when
5793 	applying replication events in high-priority applier threads,
5794 	lock conflicts between two wsrep high-priority threads are
5795 	possible. To avoid this GAP-locking, we mark here that this
5796 	transaction is using a unique key scan. */
5797 	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
5798 		trx->wsrep_UK_scan= true;
5799 #endif /* WITH_WSREP */
5800 
5801 	err = lock_rec_lock(FALSE, gap_mode | mode,
5802 			    block, heap_no, index, thr);
5803 
5804 #ifdef WITH_WSREP
5805 	trx->wsrep_UK_scan= false;
5806 #endif /* WITH_WSREP */
5807 
5808 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5809 
5810 	return(err);
5811 }
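
/* Example of the gap_mode parameter (illustrative): a locking range
scan such as SELECT ... WHERE key BETWEEN 10 AND 20 LOCK IN SHARE MODE
would typically pass LOCK_ORDINARY (next-key locking), while a point
lookup on a unique secondary key could pass LOCK_REC_NOT_GAP to lock
only the matching record. */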
5812 
5813 /*********************************************************************//**
5814 Checks if locks of other transactions prevent an immediate read, or passing
5815 over by a read cursor, of a clustered index record. If they do, first tests
5816 if the query thread should anyway be suspended for some reason; if not, then
5817 puts the transaction and the query thread to the lock wait state and inserts a
5818 waiting request for a record lock to the lock queue. Sets the requested mode
5819 lock on the record.
5820 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
5821 dberr_t
5822 lock_clust_rec_read_check_and_lock(
5823 /*===============================*/
5824 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5825 					bit is set, does nothing */
5826 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5827 	const rec_t*		rec,	/*!< in: user record or page
5828 					supremum record which should
5829 					be read or passed over by a
5830 					read cursor */
5831 	dict_index_t*		index,	/*!< in: clustered index */
5832 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5833 	lock_mode		mode,	/*!< in: mode of the lock which
5834 					the read cursor should set on
5835 					records: LOCK_S or LOCK_X; the
5836 					latter is possible in
5837 					SELECT FOR UPDATE */
5838 	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5839 					LOCK_REC_NOT_GAP */
5840 	que_thr_t*		thr)	/*!< in: query thread */
5841 {
5842 	dberr_t	err;
5843 	ulint	heap_no;
5844 
5845 	ut_ad(dict_index_is_clust(index));
5846 	ut_ad(block->frame == page_align(rec));
5847 	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
5848 	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
5849 	      || gap_mode == LOCK_REC_NOT_GAP);
5850 	ut_ad(rec_offs_validate(rec, index, offsets));
5851 	ut_ad(page_rec_is_leaf(rec));
5852 	ut_ad(!rec_is_metadata(rec, *index));
5853 
5854 	if ((flags & BTR_NO_LOCKING_FLAG)
5855 	    || srv_read_only_mode
5856 	    || index->table->is_temporary()) {
5857 
5858 		return(DB_SUCCESS);
5859 	}
5860 
5861 	heap_no = page_rec_get_heap_no(rec);
5862 
5863 	trx_t *trx = thr_get_trx(thr);
5864 	if (!lock_table_has(trx, index->table, LOCK_X)
5865 	    && heap_no != PAGE_HEAP_NO_SUPREMUM
5866 	    && lock_rec_convert_impl_to_expl(trx, block, rec,
5867 					     index, offsets)
5868 	    && gap_mode == LOCK_REC_NOT_GAP) {
5869 		/* We already hold an implicit exclusive lock. */
5870 		return DB_SUCCESS;
5871 	}
5872 
5873 	err = lock_rec_lock(FALSE, gap_mode | mode,
5874 			    block, heap_no, index, thr);
5875 
5876 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5877 
5878 	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");
5879 
5880 	return(err);
5881 }
5882 /*********************************************************************//**
5883 Checks if locks of other transactions prevent an immediate read, or passing
5884 over by a read cursor, of a clustered index record. If they do, first tests
5885 if the query thread should anyway be suspended for some reason; if not, then
5886 puts the transaction and the query thread to the lock wait state and inserts a
5887 waiting request for a record lock to the lock queue. Sets the requested mode
5888 lock on the record. This is an alternative version of
5889 lock_clust_rec_read_check_and_lock() that does not require the parameter
5890 "offsets".
5891 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5892 dberr_t
5893 lock_clust_rec_read_check_and_lock_alt(
5894 /*===================================*/
5895 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5896 					bit is set, does nothing */
5897 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5898 	const rec_t*		rec,	/*!< in: user record or page
5899 					supremum record which should
5900 					be read or passed over by a
5901 					read cursor */
5902 	dict_index_t*		index,	/*!< in: clustered index */
5903 	lock_mode		mode,	/*!< in: mode of the lock which
5904 					the read cursor should set on
5905 					records: LOCK_S or LOCK_X; the
5906 					latter is possible in
5907 					SELECT FOR UPDATE */
5908 	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5909 					LOCK_REC_NOT_GAP */
5910 	que_thr_t*		thr)	/*!< in: query thread */
5911 {
5912 	mem_heap_t*	tmp_heap	= NULL;
5913 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5914 	rec_offs*	offsets		= offsets_;
5915 	dberr_t		err;
5916 	rec_offs_init(offsets_);
5917 
5918 	ut_ad(page_rec_is_leaf(rec));
5919 	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
5920 				  ULINT_UNDEFINED, &tmp_heap);
5921 	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
5922 						 offsets, mode, gap_mode, thr);
5923 	if (tmp_heap) {
5924 		mem_heap_free(tmp_heap);
5925 	}
5926 
5927 	if (err == DB_SUCCESS_LOCKED_REC) {
5928 		err = DB_SUCCESS;
5929 	}
5930 
5931 	return(err);
5932 }
5933 
5934 /*******************************************************************//**
5935 Release the last lock from the transaction's autoinc locks. */
5936 UNIV_INLINE
5937 void
5938 lock_release_autoinc_last_lock(
5939 /*===========================*/
5940 	ib_vector_t*	autoinc_locks)	/*!< in/out: vector of AUTOINC locks */
5941 {
5942 	ulint		last;
5943 	lock_t*		lock;
5944 
5945 	ut_ad(lock_mutex_own());
5946 	ut_a(!ib_vector_is_empty(autoinc_locks));
5947 
5948 	/* The lock to be released must be the last lock acquired. */
5949 	last = ib_vector_size(autoinc_locks) - 1;
5950 	lock = *static_cast<lock_t**>(ib_vector_get(autoinc_locks, last));
5951 
5952 	/* Should have only AUTOINC locks in the vector. */
5953 	ut_a(lock_get_mode(lock) == LOCK_AUTO_INC);
5954 	ut_a(lock_get_type(lock) == LOCK_TABLE);
5955 
5956 	ut_a(lock->un_member.tab_lock.table != NULL);
5957 
5958 	/* This will remove the lock from the trx autoinc_locks too. */
5959 	lock_table_dequeue(lock);
5960 
5961 	/* Remove from the table vector too. */
5962 	lock_trx_table_locks_remove(lock);
5963 }
5964 
5965 /*******************************************************************//**
5966 Check if a transaction holds any autoinc locks.
5967 @return TRUE if the transaction holds any AUTOINC locks. */
5968 static
5969 ibool
5970 lock_trx_holds_autoinc_locks(
5971 /*=========================*/
5972 	const trx_t*	trx)		/*!< in: transaction */
5973 {
5974 	ut_a(trx->autoinc_locks != NULL);
5975 
5976 	return(!ib_vector_is_empty(trx->autoinc_locks));
5977 }
5978 
5979 /*******************************************************************//**
5980 Release all the transaction's autoinc locks. */
5981 static
5982 void
5983 lock_release_autoinc_locks(
5984 /*=======================*/
5985 	trx_t*		trx)		/*!< in/out: transaction */
5986 {
5987 	ut_ad(lock_mutex_own());
5988 	/* If this is invoked for a running transaction by the thread
5989 	that is serving the transaction, then it is not necessary to
5990 	hold trx->mutex here. */
5991 
5992 	ut_a(trx->autoinc_locks != NULL);
5993 
5994 	/* We release the locks in the reverse order. This is to
5995 	avoid searching the vector for the element to delete at
5996 	the lower level. See lock_table_remove_low() for details. */
5997 	while (!ib_vector_is_empty(trx->autoinc_locks)) {
5998 
5999 		/* lock_table_remove_low() will also remove the lock from
6000 		the transaction's autoinc_locks vector. */
6001 		lock_release_autoinc_last_lock(trx->autoinc_locks);
6002 	}
6003 
6004 	/* Should release all locks. */
6005 	ut_a(ib_vector_is_empty(trx->autoinc_locks));
6006 }
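
/* Example (illustrative): if a statement acquired AUTO-INC locks on t1
and then on t2, trx->autoinc_locks holds [t1, t2] and the loop above
releases the t2 lock first, then the t1 lock. Releasing from the back
avoids the linear search that removing an arbitrary element in
lock_table_remove_low() would otherwise require. */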
6007 
6008 /*******************************************************************//**
6009 Gets the type of a lock. Non-inline version for using outside of the
6010 lock module.
6011 @return LOCK_TABLE or LOCK_REC */
6012 ulint
6013 lock_get_type(
6014 /*==========*/
6015 	const lock_t*	lock)	/*!< in: lock */
6016 {
6017 	return(lock_get_type_low(lock));
6018 }
6019 
6020 /*******************************************************************//**
6021 Gets the id of the transaction owning a lock.
6022 @return transaction id */
6023 trx_id_t
6024 lock_get_trx_id(
6025 /*============*/
6026 	const lock_t*	lock)	/*!< in: lock */
6027 {
6028 	return(trx_get_id_for_print(lock->trx));
6029 }
6030 
6031 /*******************************************************************//**
6032 Gets the table on which the lock is.
6033 @return table */
6034 UNIV_INLINE
6035 dict_table_t*
6036 lock_get_table(
6037 /*===========*/
6038 	const lock_t*	lock)	/*!< in: lock */
6039 {
6040 	switch (lock_get_type_low(lock)) {
6041 	case LOCK_REC:
6042 		ut_ad(dict_index_is_clust(lock->index)
6043 		      || !dict_index_is_online_ddl(lock->index));
6044 		return(lock->index->table);
6045 	case LOCK_TABLE:
6046 		return(lock->un_member.tab_lock.table);
6047 	default:
6048 		ut_error;
6049 		return(NULL);
6050 	}
6051 }
6052 
6053 /*******************************************************************//**
6054 Gets the id of the table on which the lock is.
6055 @return id of the table */
6056 table_id_t
6057 lock_get_table_id(
6058 /*==============*/
6059 	const lock_t*	lock)	/*!< in: lock */
6060 {
6061 	dict_table_t* table = lock_get_table(lock);
6062 	ut_ad(!table->is_temporary());
6063 	return(table->id);
6064 }
6065 
6066 /** Determine which table a lock is associated with.
6067 @param[in]	lock	the lock
6068 @return name of the table */
6069 const table_name_t&
6070 lock_get_table_name(
6071 	const lock_t*	lock)
6072 {
6073 	return(lock_get_table(lock)->name);
6074 }
6075 
6076 /*******************************************************************//**
6077 For a record lock, gets the index on which the lock is.
6078 @return index */
6079 const dict_index_t*
6080 lock_rec_get_index(
6081 /*===============*/
6082 	const lock_t*	lock)	/*!< in: lock */
6083 {
6084 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6085 	ut_ad(dict_index_is_clust(lock->index)
6086 	      || !dict_index_is_online_ddl(lock->index));
6087 
6088 	return(lock->index);
6089 }
6090 
6091 /*******************************************************************//**
6092 For a record lock, gets the name of the index on which the lock is.
6093 The string should not be free()'d or modified.
6094 @return name of the index */
6095 const char*
6096 lock_rec_get_index_name(
6097 /*====================*/
6098 	const lock_t*	lock)	/*!< in: lock */
6099 {
6100 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6101 	ut_ad(dict_index_is_clust(lock->index)
6102 	      || !dict_index_is_online_ddl(lock->index));
6103 
6104 	return(lock->index->name);
6105 }
6106 
6107 /*********************************************************************//**
6108 Cancels a waiting lock request and releases other transactions that
6109 may be waiting behind it. */
6110 void
6111 lock_cancel_waiting_and_release(
6112 /*============================*/
6113 	lock_t*	lock)	/*!< in/out: waiting lock request */
6114 {
6115 	que_thr_t*	thr;
6116 
6117 	ut_ad(lock_mutex_own());
6118 	ut_ad(trx_mutex_own(lock->trx));
6119 	ut_ad(lock->trx->state == TRX_STATE_ACTIVE);
6120 
6121 	lock->trx->lock.cancel = true;
6122 
6123 	if (lock_get_type_low(lock) == LOCK_REC) {
6124 
6125 		lock_rec_dequeue_from_page(lock);
6126 	} else {
6127 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
6128 
6129 		if (lock->trx->autoinc_locks != NULL) {
6130 			/* Release the transaction's AUTOINC locks. */
6131 			lock_release_autoinc_locks(lock->trx);
6132 		}
6133 
6134 		lock_table_dequeue(lock);
6135 		/* Remove the lock from table lock vector too. */
6136 		lock_trx_table_locks_remove(lock);
6137 	}
6138 
6139 	/* Reset the wait flag and the back pointer to lock in trx. */
6140 
6141 	lock_reset_lock_and_trx_wait(lock);
6142 
6143 	/* The following function releases the trx from lock wait. */
6144 
6145 	thr = que_thr_end_lock_wait(lock->trx);
6146 
6147 	if (thr != NULL) {
6148 		lock_wait_release_thread_if_suspended(thr);
6149 	}
6150 
6151 	lock->trx->lock.cancel = false;
6152 }
6153 
6154 /*********************************************************************//**
6155 Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
6156 function should be called at the end of an SQL statement, by the
6157 connection thread that owns the transaction (trx->mysql_thd). */
6158 void
6159 lock_unlock_table_autoinc(
6160 /*======================*/
6161 	trx_t*	trx)	/*!< in/out: transaction */
6162 {
6163 	ut_ad(!lock_mutex_own());
6164 	ut_ad(!trx_mutex_own(trx));
6165 	ut_ad(!trx->lock.wait_lock);
6166 
6167 	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
6168 	but not COMMITTED transactions. */
6169 
6170 	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
6171 	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));
6172 
6173 	/* This function is invoked for a running transaction by the
6174 	thread that is serving the transaction. Therefore it is not
6175 	necessary to hold trx->mutex here. */
6176 
6177 	if (lock_trx_holds_autoinc_locks(trx)) {
6178 		lock_mutex_enter();
6179 
6180 		lock_release_autoinc_locks(trx);
6181 
6182 		lock_mutex_exit();
6183 	}
6184 }
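
/* Example (illustrative): with the traditional setting
innodb_autoinc_lock_mode=0, a statement such as
INSERT INTO t1 SELECT * FROM t2 holds the table-level AUTO-INC lock for
the duration of the statement; the connection thread calls this
function when the statement ends, so the lock is not retained until
commit. */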
6185 
6186 static inline dberr_t lock_trx_handle_wait_low(trx_t* trx)
6187 {
6188 	ut_ad(lock_mutex_own());
6189 	ut_ad(trx_mutex_own(trx));
6190 
6191 	if (trx->lock.was_chosen_as_deadlock_victim) {
6192 		return DB_DEADLOCK;
6193 	}
6194 	if (!trx->lock.wait_lock) {
6195 		/* The lock was probably granted before we got here. */
6196 		return DB_SUCCESS;
6197 	}
6198 
6199 	lock_cancel_waiting_and_release(trx->lock.wait_lock);
6200 	return DB_LOCK_WAIT;
6201 }
6202 
6203 /*********************************************************************//**
6204 Check whether the transaction has already been rolled back because it
6205 was selected as a deadlock victim, or if it has to wait then cancel
6206 the wait lock.
6207 @return DB_DEADLOCK, DB_LOCK_WAIT or DB_SUCCESS */
6208 dberr_t
6209 lock_trx_handle_wait(
6210 /*=================*/
6211 	trx_t*	trx)	/*!< in/out: trx lock state */
6212 {
6213 #ifdef WITH_WSREP
6214 	/* We already own mutexes */
6215 	if (trx->lock.was_chosen_as_wsrep_victim) {
6216 		return lock_trx_handle_wait_low(trx);
6217 	}
6218 #endif /* WITH_WSREP */
6219 	lock_mutex_enter();
6220 	trx_mutex_enter(trx);
6221 	dberr_t err = lock_trx_handle_wait_low(trx);
6222 	lock_mutex_exit();
6223 	trx_mutex_exit(trx);
6224 	return err;
6225 }
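
/* Usage sketch (hypothetical caller, e.g. handling a lock wait timeout
or a KILL of a waiting transaction):

	switch (lock_trx_handle_wait(trx)) {
	case DB_DEADLOCK:	// trx was chosen as a deadlock victim
		break;
	case DB_LOCK_WAIT:	// the wait lock was cancelled above
		break;
	default:		// DB_SUCCESS: granted in the meantime
		break;
	}
*/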
6226 
6227 /*********************************************************************//**
6228 Get the number of locks on a table.
6229 @return number of locks */
6230 ulint
6231 lock_table_get_n_locks(
6232 /*===================*/
6233 	const dict_table_t*	table)	/*!< in: table */
6234 {
6235 	ulint		n_table_locks;
6236 
6237 	lock_mutex_enter();
6238 
6239 	n_table_locks = UT_LIST_GET_LEN(table->locks);
6240 
6241 	lock_mutex_exit();
6242 
6243 	return(n_table_locks);
6244 }
6245 
6246 #ifdef UNIV_DEBUG
6247 /**
6248   Do an exhaustive check for any locks (table or rec) against the table.
6249 
6250   @param[in]  table  check if there are any locks held on records in this table
6251                      or on the table itself
6252 */
6253 
6254 static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
6255                                        const dict_table_t *table)
6256 {
6257   ut_ad(lock_mutex_own());
6258   mutex_enter(&element->mutex);
6259   if (element->trx)
6260   {
6261     trx_mutex_enter(element->trx);
6262     check_trx_state(element->trx);
6263     if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
6264     {
6265       for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
6266            lock != NULL;
6267            lock= UT_LIST_GET_NEXT(trx_locks, lock))
6268       {
6269         ut_ad(lock->trx == element->trx);
6270         if (lock_get_type_low(lock) == LOCK_REC)
6271         {
6272           ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
6273                 lock->index->is_primary());
6274           ut_ad(lock->index->table != table);
6275         }
6276         else
6277           ut_ad(lock->un_member.tab_lock.table != table);
6278       }
6279     }
6280     trx_mutex_exit(element->trx);
6281   }
6282   mutex_exit(&element->mutex);
6283   return 0;
6284 }
6285 #endif /* UNIV_DEBUG */
6286 
6287 /*******************************************************************//**
6288 Check if there are any locks (table or rec) against table.
6289 @return true if table has either table or record locks. */
6290 bool
6291 lock_table_has_locks(
6292 /*=================*/
6293 	const dict_table_t*	table)	/*!< in: check if there are any locks
6294 					held on records in this table or on the
6295 					table itself */
6296 {
6297 	ibool			has_locks;
6298 
6299 	ut_ad(table != NULL);
6300 	lock_mutex_enter();
6301 
6302 	has_locks = UT_LIST_GET_LEN(table->locks) > 0 || table->n_rec_locks > 0;
6303 
6304 #ifdef UNIV_DEBUG
6305 	if (!has_locks) {
6306 		trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup, table);
6307 	}
6308 #endif /* UNIV_DEBUG */
6309 
6310 	lock_mutex_exit();
6311 
6312 	return(has_locks);
6313 }
6314 
6315 /*******************************************************************//**
6316 Initialise the table lock list. */
6317 void
6318 lock_table_lock_list_init(
6319 /*======================*/
6320 	table_lock_list_t*	lock_list)	/*!< List to initialise */
6321 {
6322 	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
6323 }
6324 
6325 /*******************************************************************//**
6326 Initialise the trx lock list. */
6327 void
6328 lock_trx_lock_list_init(
6329 /*====================*/
6330 	trx_lock_list_t*	lock_list)	/*!< List to initialise */
6331 {
6332 	UT_LIST_INIT(*lock_list, &lock_t::trx_locks);
6333 }
6334 
6335 
6336 #ifdef UNIV_DEBUG
6337 /*******************************************************************//**
6338 Check if the transaction holds any locks on the sys tables
6339 or its records.
6340 @return the strongest lock found on any sys table or 0 for none */
6341 const lock_t*
6342 lock_trx_has_sys_table_locks(
6343 /*=========================*/
6344 	const trx_t*	trx)	/*!< in: transaction to check */
6345 {
6346 	const lock_t*	strongest_lock = 0;
6347 	lock_mode	strongest = LOCK_NONE;
6348 
6349 	lock_mutex_enter();
6350 
6351 	const lock_list::const_iterator end = trx->lock.table_locks.end();
6352 	lock_list::const_iterator it = trx->lock.table_locks.begin();
6353 
6354 	/* Find a valid mode. Note: the table_locks list can be empty. */
6355 
6356 	for (/* No op */; it != end; ++it) {
6357 		const lock_t*	lock = *it;
6358 
6359 		if (lock != NULL
6360 		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {
6361 
6362 			strongest = lock_get_mode(lock);
6363 			ut_ad(strongest != LOCK_NONE);
6364 			strongest_lock = lock;
6365 			break;
6366 		}
6367 	}
6368 
6369 	if (strongest == LOCK_NONE) {
6370 		lock_mutex_exit();
6371 		return(NULL);
6372 	}
6373 
6374 	for (/* No op */; it != end; ++it) {
6375 		const lock_t*	lock = *it;
6376 
6377 		if (lock == NULL) {
6378 			continue;
6379 		}
6380 
6381 		ut_ad(trx == lock->trx);
6382 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
6383 		ut_ad(lock->un_member.tab_lock.table != NULL);
6384 
6385 		lock_mode	mode = lock_get_mode(lock);
6386 
6387 		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
6388 		    && lock_mode_stronger_or_eq(mode, strongest)) {
6389 
6390 			strongest = mode;
6391 			strongest_lock = lock;
6392 		}
6393 	}
6394 
6395 	lock_mutex_exit();
6396 
6397 	return(strongest_lock);
6398 }
6399 
6400 /** Check if the transaction holds an explicit exclusive lock on a record.
6401 @param[in]	trx	transaction
6402 @param[in]	table	table
6403 @param[in]	block	leaf page
6404 @param[in]	heap_no	heap number identifying the record
6405 @return whether an explicit X-lock is held */
6406 bool
6407 lock_trx_has_expl_x_lock(
6408 	const trx_t*		trx,	/*!< in: transaction to check */
6409 	const dict_table_t*	table,	/*!< in: table to check */
6410 	const buf_block_t*	block,	/*!< in: buffer block of the record */
6411 	ulint			heap_no)/*!< in: record heap number */
6412 {
6413 	ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
6414 
6415 	lock_mutex_enter();
6416 	ut_ad(lock_table_has(trx, table, LOCK_IX));
6417 	ut_ad(lock_table_has(trx, table, LOCK_X)
6418 	      || lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no,
6419 				   trx));
6420 	lock_mutex_exit();
6421 	return(true);
6422 }
6423 #endif /* UNIV_DEBUG */
6424 
6425 /** rewind(3) the file used for storing the latest detected deadlock and
6426 print a heading message to stderr if printing of all deadlocks to stderr
6427 is enabled. */
6428 void
6429 DeadlockChecker::start_print()
6430 {
6431 	ut_ad(lock_mutex_own());
6432 
6433 	rewind(lock_latest_err_file);
6434 	ut_print_timestamp(lock_latest_err_file);
6435 
6436 	if (srv_print_all_deadlocks) {
6437 		ib::info() << "Transactions deadlock detected, dumping"
6438 			" detailed information.";
6439 	}
6440 }
6441 
6442 /** Print a message to the deadlock file and possibly to stderr.
6443 @param msg message to print */
6444 void
6445 DeadlockChecker::print(const char* msg)
6446 {
6447 	fputs(msg, lock_latest_err_file);
6448 
6449 	if (srv_print_all_deadlocks) {
6450 		ib::info() << msg;
6451 	}
6452 }
6453 
6454 /** Print transaction data to the deadlock file and possibly to stderr.
6455 @param trx transaction
6456 @param max_query_len max query length to print */
6457 void
6458 DeadlockChecker::print(const trx_t* trx, ulint max_query_len)
6459 {
6460 	ut_ad(lock_mutex_own());
6461 
6462 	ulint	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
6463 	ulint	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
6464 	ulint	heap_size = mem_heap_get_size(trx->lock.lock_heap);
6465 
6466 	trx_print_low(lock_latest_err_file, trx, max_query_len,
6467 		      n_rec_locks, n_trx_locks, heap_size);
6468 
6469 	if (srv_print_all_deadlocks) {
6470 		trx_print_low(stderr, trx, max_query_len,
6471 			      n_rec_locks, n_trx_locks, heap_size);
6472 	}
6473 }
6474 
6475 /** Print lock data to the deadlock file and possibly to stderr.
6476 @param lock record or table type lock */
6477 void
6478 DeadlockChecker::print(const lock_t* lock)
6479 {
6480 	ut_ad(lock_mutex_own());
6481 
6482 	if (lock_get_type_low(lock) == LOCK_REC) {
6483 		mtr_t mtr;
6484 		lock_rec_print(lock_latest_err_file, lock, mtr);
6485 
6486 		if (srv_print_all_deadlocks) {
6487 			lock_rec_print(stderr, lock, mtr);
6488 		}
6489 	} else {
6490 		lock_table_print(lock_latest_err_file, lock);
6491 
6492 		if (srv_print_all_deadlocks) {
6493 			lock_table_print(stderr, lock);
6494 		}
6495 	}
6496 }
6497 
6498 /** Get the next lock in the queue that is owned by a transaction whose
6499 sub-tree has not already been searched.
6500 Note: "next" here means PREV for table locks.
6501 
6502 @param lock Lock in queue
6503 @param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
6504 
6505 @return next lock or NULL if at end of queue */
6506 const lock_t*
6507 DeadlockChecker::get_next_lock(const lock_t* lock, ulint heap_no) const
6508 {
6509 	ut_ad(lock_mutex_own());
6510 
6511 	do {
6512 		if (lock_get_type_low(lock) == LOCK_REC) {
6513 			ut_ad(heap_no != ULINT_UNDEFINED);
6514 			lock = lock_rec_get_next_const(heap_no, lock);
6515 		} else {
6516 			ut_ad(heap_no == ULINT_UNDEFINED);
6517 			ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
6518 
6519 			lock = UT_LIST_GET_NEXT(
6520 				un_member.tab_lock.locks, lock);
6521 		}
6522 
6523 	} while (lock != NULL && is_visited(lock));
6524 
6525 	ut_ad(lock == NULL
6526 	      || lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));
6527 
6528 	return(lock);
6529 }
6530 
6531 /** Get the first lock to search. The search starts from the current
6532 wait_lock. What we are really interested in is an edge from the
6533 current wait_lock's owning transaction to another transaction that has
6534 a lock ahead in the queue. We skip locks where the owning transaction's
6535 sub-tree has already been searched.
6536 
6537 Note: The record locks are traversed from the oldest lock to the
6538 latest. For table locks we go from latest to oldest.
6539 
6540 For record locks, we first position the "iterator" on the first lock on
6541 the page and then reposition on the actual heap_no. This is required
6542 due to the way the record lock hash is implemented.
6543 
6544 @param[out] heap_no if rec lock, else ULINT_UNDEFINED.
6545 @return first lock or NULL */
6546 const lock_t*
6547 DeadlockChecker::get_first_lock(ulint* heap_no) const
6548 {
6549 	ut_ad(lock_mutex_own());
6550 
6551 	const lock_t*	lock = m_wait_lock;
6552 
6553 	if (lock_get_type_low(lock) == LOCK_REC) {
6554 		/* We are only interested in records that match the heap_no. */
6555 		*heap_no = lock_rec_find_set_bit(lock);
6556 
6557 		ut_ad(*heap_no <= 0xffff);
6558 		ut_ad(*heap_no != ULINT_UNDEFINED);
6559 
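		/* Note: predicate locks (used for SPATIAL indexes) live
		in lock_sys.prdt_hash, separate from the ordinary record
		locks in lock_sys.rec_hash. */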
		/* Find the locks on the page. */
		lock = lock_sys.get_first(
			lock->type_mode & LOCK_PREDICATE
			? lock_sys.prdt_hash
			: lock_sys.rec_hash,
			lock->un_member.rec_lock.page_id);

		/* Position on the first lock on the physical record. */
		if (!lock_rec_get_nth_bit(lock, *heap_no)) {
			lock = lock_rec_get_next_const(*heap_no, lock);
		}

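		/* The first lock on the record cannot itself be a
		waiting lock: for m_wait_lock to be waiting at all, some
		other transaction must already hold a granted lock
		earlier in this queue. */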
		ut_a(!lock_get_wait(lock));
	} else {
		/* Table locks don't care about the heap_no. */
		*heap_no = ULINT_UNDEFINED;
		ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
		dict_table_t*	table = lock->un_member.tab_lock.table;
		lock = UT_LIST_GET_FIRST(table->locks);
	}

	/* The queue must contain at least two locks, otherwise there
	could be no waiting lock. Moreover, the first lock cannot be
	the wait_lock itself, except under the VATS scheduling
	algorithm, which may reorder the queue. */
	ut_a(lock != NULL);
	ut_a(lock != m_wait_lock ||
	     (innodb_lock_schedule_algorithm
		== INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
	      && !thd_is_replication_slave_thread(lock->trx->mysql_thd)));

	/* Check that the lock type doesn't change. */
	ut_ad(lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));

	return(lock);
}

/** Notify that a deadlock has been detected and print the conflicting
transaction info.
@param lock lock causing deadlock */
void
DeadlockChecker::notify(const lock_t* lock) const
{
	ut_ad(lock_mutex_own());

	start_print();

	print("\n*** (1) TRANSACTION:\n");

	print(m_wait_lock->trx, 3000);

	print("*** (1) WAITING FOR THIS LOCK TO BE GRANTED:\n");

	print(m_wait_lock);

	print("*** (2) TRANSACTION:\n");

	print(lock->trx, 3000);

	print("*** (2) HOLDS THE LOCK(S):\n");

	print(lock);

	/* It is possible that the joining transaction was granted its
	lock when we rolled back some other waiting transaction. */

	if (m_start->lock.wait_lock != 0) {
		print("*** (2) WAITING FOR THIS LOCK TO BE GRANTED:\n");

		print(m_start->lock.wait_lock);
	}

	DBUG_PRINT("ib_lock", ("deadlock detected"));
}

/** Select the victim transaction that should be rolled back.
@return victim transaction */
const trx_t*
DeadlockChecker::select_victim() const
{
	ut_ad(lock_mutex_own());
	ut_ad(m_start->lock.wait_lock != 0);
	ut_ad(m_wait_lock->trx != m_start);

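	/* trx_weight_ge() compares the transactions roughly by the
	amount of work they have done (locks held and undo log records);
	the "lighter" transaction is preferred as the victim. */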
	if (trx_weight_ge(m_wait_lock->trx, m_start)) {
		/* The joining transaction is 'smaller',
		choose it as the victim and roll it back. */
#ifdef WITH_WSREP
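		/* A Galera brute-force (BF) transaction must never be
		chosen as the deadlock victim; sacrifice the other
		transaction instead. */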
		if (wsrep_thd_is_BF(m_start->mysql_thd, FALSE)) {
			return(m_wait_lock->trx);
		}
#endif /* WITH_WSREP */
		return(m_start);
	}

#ifdef WITH_WSREP
	if (wsrep_thd_is_BF(m_wait_lock->trx->mysql_thd, FALSE)) {
		return(m_start);
	}
#endif /* WITH_WSREP */

	return(m_wait_lock->trx);
}

/** Looks iteratively for a deadlock. Note: the joining transaction
may already have been granted its lock by the deadlock checks.
@return 0 if no deadlock, else the victim transaction instance. */
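/* Example: suppose T1 waits for a row lock held by T2 while T2 waits
for a row lock held by T1. Starting from T1's wait_lock, the search
follows the edge T1 -> T2; since T2 is itself waiting, the current
position is pushed and the scan resumes from T2's wait_lock, where a
conflicting lock owned by T1 == m_start is found: a cycle, so a
victim is selected. */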
const trx_t*
DeadlockChecker::search()
{
	ut_ad(lock_mutex_own());
	ut_ad(!trx_mutex_own(m_start));

	ut_ad(m_start != NULL);
	ut_ad(m_wait_lock != NULL);
	ut_ad(!m_wait_lock->trx->auto_commit || m_wait_lock->trx->will_lock);
	ut_d(check_trx_state(m_wait_lock->trx));
	ut_ad(m_mark_start <= s_lock_mark_counter);

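	/* This is an iterative depth-first search over the wait-for
	graph: the explicit push()/pop() stack replaces recursion so
	that the search can be aborted when it grows too deep. */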
	/* Look at the locks ahead of wait_lock in the lock queue. */
	ulint		heap_no;
	const lock_t*	lock = get_first_lock(&heap_no);

	for (;;) {
		/* We should never visit the same sub-tree more than once. */
		ut_ad(lock == NULL || !is_visited(lock));

		while (m_n_elems > 0 && lock == NULL) {

			/* Restore previous search state. */

			pop(lock, heap_no);

			lock = get_next_lock(lock, heap_no);
		}

		if (lock == NULL) {
			break;
		}

		if (lock == m_wait_lock) {

			/* We can mark this subtree as searched */
			ut_ad(lock->trx->lock.deadlock_mark <= m_mark_start);

			lock->trx->lock.deadlock_mark = ++s_lock_mark_counter;

			/* We are not prepared for an overflow. This 64-bit
			counter should never wrap around. At 10^9 increments
			per second, it would take about 585 years of uptime
			to do so. */

			ut_ad(s_lock_mark_counter > 0);

			/* Backtrack */
			lock = NULL;
			continue;
		}

		if (!lock_has_to_wait(m_wait_lock, lock)) {
			/* No conflict, next lock */
			lock = get_next_lock(lock, heap_no);
			continue;
		}

		if (lock->trx == m_start) {
			/* Found a cycle. */
			notify(lock);
			return select_victim();
		}

		if (is_too_deep()) {
			/* Search too deep to continue. */
			m_too_deep = true;
			return m_start;
		}

		/* We do not need to report autoinc locks to the upper
		layer. These locks are released before commit, so they
		cannot cause deadlocks with binlog-fixed commit
		order. */
		if (m_report_waiters
		    && (lock_get_type_low(lock) != LOCK_TABLE
			|| lock_get_mode(lock) != LOCK_AUTO_INC)) {
			thd_rpl_deadlock_check(m_start->mysql_thd,
					       lock->trx->mysql_thd);
		}

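		/* If the transaction ahead of us is itself waiting,
		follow the edge: save the current position and restart
		the scan from that transaction's wait_lock. Otherwise
		it has no outgoing wait-for edge, so the path ends here
		and we simply advance in the current queue. */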
		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
			/* Another trx ahead has requested a lock in an
			incompatible mode, and is itself waiting for a lock. */

			++m_cost;

			if (!push(lock, heap_no)) {
				m_too_deep = true;
				return m_start;
			}

			m_wait_lock = lock->trx->lock.wait_lock;

			lock = get_first_lock(&heap_no);

			if (is_visited(lock)) {
				lock = get_next_lock(lock, heap_no);
			}
		} else {
			lock = get_next_lock(lock, heap_no);
		}
	}

	ut_a(lock == NULL && m_n_elems == 0);

	/* No deadlock found. */
	return(0);
}

/** Print info about the transaction that was rolled back.
@param trx transaction rolled back
@param lock lock trx wants */
void
DeadlockChecker::rollback_print(const trx_t* trx, const lock_t* lock)
{
	ut_ad(lock_mutex_own());

	/* If the lock search exceeds the max step
	or the max depth, the current trx will be
	the victim. Print its information. */
	start_print();

	print("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
	      " WAITS-FOR GRAPH, WE WILL ROLL BACK"
	      " FOLLOWING TRANSACTION \n\n"
	      "*** TRANSACTION:\n");

	print(trx, 3000);

	print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");

	print(lock);
}

/** Roll back the transaction selected as the victim. */
void
DeadlockChecker::trx_rollback()
{
	ut_ad(lock_mutex_own());

	trx_t*	trx = m_wait_lock->trx;

	print("*** WE ROLL BACK TRANSACTION (1)\n");
#ifdef WITH_WSREP
	if (trx->is_wsrep() && wsrep_thd_is_SR(trx->mysql_thd)) {
		wsrep_handle_SR_rollback(m_start->mysql_thd, trx->mysql_thd);
	}
#endif

	trx_mutex_enter(trx);

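	/* Flag the victim so that, when its pending lock wait is
	cancelled below, the wait ends with DB_DEADLOCK rather than an
	ordinary lock wait timeout. */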
	trx->lock.was_chosen_as_deadlock_victim = true;

	lock_cancel_waiting_and_release(trx->lock.wait_lock);

	trx_mutex_exit(trx);
}

/** Check if a joining lock request results in a deadlock.
If a deadlock is found, we will resolve the deadlock by
choosing a victim transaction and rolling it back.
We will attempt to resolve all deadlocks.

@param[in]	lock	the lock request
@param[in,out]	trx	transaction requesting the lock

@return trx if it was chosen as victim
@retval	NULL if another victim was chosen,
or there is no deadlock (any more) */
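/* Typical call path (sketch): when lock_rec_enqueue_waiting() or
lock_table_enqueue_waiting() enqueues a waiting lock request, it
invokes check_and_resolve(); a return value equal to trx tells the
caller that it was itself chosen as the deadlock victim and should
fail with DB_DEADLOCK. */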
const trx_t*
DeadlockChecker::check_and_resolve(const lock_t* lock, trx_t* trx)
{
	ut_ad(lock_mutex_own());
	ut_ad(trx_mutex_own(trx));
	ut_ad(trx->state == TRX_STATE_ACTIVE);
	ut_ad(!trx->auto_commit || trx->will_lock);
	ut_ad(!srv_read_only_mode);

	if (!innobase_deadlock_detect) {
		return(NULL);
	}

	/* Release the transaction mutex to obey the latching order.
	This is safe, because DeadlockChecker::check_and_resolve()
	is invoked when a lock wait is enqueued for the currently
	running transaction. Because trx is a running transaction
	(it is not currently suspended because of a lock wait),
	its state can only be changed by this thread, which is
	currently associated with the transaction. */

	trx_mutex_exit(trx);

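	/* thd_need_wait_reports() holds e.g. for replication threads
	that must be informed of lock waits so that the server can
	preserve the binlog commit order. */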
	const trx_t*	victim_trx;
	const bool	report_waiters = trx->mysql_thd
		&& thd_need_wait_reports(trx->mysql_thd);

	/* Try to resolve as many deadlocks as possible. */
	do {
		DeadlockChecker	checker(trx, lock, s_lock_mark_counter,
					report_waiters);

		victim_trx = checker.search();

		/* The search was too deep: the joining transaction is
		chosen as the victim and will be rolled back. */
		if (checker.is_too_deep()) {

			ut_ad(trx == checker.m_start);
			ut_ad(trx == victim_trx);

			rollback_print(victim_trx, lock);

			MONITOR_INC(MONITOR_DEADLOCK);
			srv_stats.lock_deadlock_count.inc();

			break;

		} else if (victim_trx != NULL && victim_trx != trx) {

			ut_ad(victim_trx == checker.m_wait_lock->trx);

			checker.trx_rollback();

			lock_deadlock_found = true;

			MONITOR_INC(MONITOR_DEADLOCK);
			srv_stats.lock_deadlock_count.inc();
		}

	} while (victim_trx != NULL && victim_trx != trx);

	/* If the joining transaction was selected as the victim. */
	if (victim_trx != NULL) {

		print("*** WE ROLL BACK TRANSACTION (2)\n");
#ifdef WITH_WSREP
		if (trx->is_wsrep() && wsrep_thd_is_SR(trx->mysql_thd)) {
			wsrep_handle_SR_rollback(trx->mysql_thd,
						 victim_trx->mysql_thd);
		}
#endif

		lock_deadlock_found = true;
	}

	trx_mutex_enter(trx);

	return(victim_trx);
}

/*************************************************************//**
Updates the lock table when a page split is followed by a merge
of the two pages. */
UNIV_INTERN
void
lock_update_split_and_merge(
	const buf_block_t* left_block,	/*!< in: left page to which merged */
	const rec_t* orig_pred,		/*!< in: original predecessor of
					supremum on the left page before merge*/
	const buf_block_t* right_block)	/*!< in: right page from which merged */
{
	const rec_t* left_next_rec;

	ut_ad(page_is_leaf(left_block->frame));
	ut_ad(page_is_leaf(right_block->frame));
	ut_ad(page_align(orig_pred) == left_block->frame);

	lock_mutex_enter();

	left_next_rec = page_rec_get_next_const(orig_pred);
	ut_ad(!page_rec_is_metadata(left_next_rec));

	/* Inherit the locks on the supremum of the left page to the
	first record which was moved from the right page */
	lock_rec_inherit_to_gap(
		left_block, left_block,
		page_rec_get_heap_no(left_next_rec),
		PAGE_HEAP_NO_SUPREMUM);

	/* Reset the locks on the supremum of the left page,
	releasing waiting transactions */
	lock_rec_reset_and_release_wait(left_block,
					PAGE_HEAP_NO_SUPREMUM);

	/* Inherit the locks to the supremum of the left page from the
	successor of the infimum on the right page */
	lock_rec_inherit_to_gap(left_block, right_block,
				PAGE_HEAP_NO_SUPREMUM,
				lock_get_min_heap_no(right_block));

	lock_mutex_exit();
}