1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2014, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file lock/lock0lock.cc
22 The transaction lock system
23 
24 Created 5/7/1996 Heikki Tuuri
25 *******************************************************/
26 
27 #define LOCK_MODULE_IMPLEMENTATION
28 
29 #include "univ.i"
30 
31 #include <mysql/service_thd_error_context.h>
32 #include <sql_class.h>
33 
34 #include "lock0lock.h"
35 #include "lock0priv.h"
36 #include "dict0mem.h"
37 #include "trx0purge.h"
38 #include "trx0sys.h"
39 #include "ut0vec.h"
40 #include "btr0cur.h"
41 #include "row0sel.h"
42 #include "row0mysql.h"
43 #include "row0vers.h"
44 #include "pars0pars.h"
45 
46 #include <set>
47 
48 #ifdef WITH_WSREP
49 #include <mysql/service_wsrep.h>
50 #endif /* WITH_WSREP */
51 
52 /** Lock scheduling algorithm */
53 ulong innodb_lock_schedule_algorithm;
54 
55 /** The value of innodb_deadlock_detect */
56 my_bool	innobase_deadlock_detect;
57 
58 /*********************************************************************//**
59 Checks if a waiting record lock request still has to wait in a queue.
60 @return lock that is causing the wait */
61 static
62 const lock_t*
63 lock_rec_has_to_wait_in_queue(
64 /*==========================*/
65 	const lock_t*	wait_lock);	/*!< in: waiting record lock */
66 
67 /** Grant a lock to a waiting lock request and release the waiting transaction
68 after lock_reset_lock_and_trx_wait() has been called. */
69 static void lock_grant_after_reset(lock_t* lock);
70 
71 extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
72 extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
73 extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
74 
75 /** Pretty-print a table lock.
76 @param[in,out]	file	output stream
77 @param[in]	lock	table lock */
78 static void lock_table_print(FILE* file, const lock_t* lock);
79 
80 /** Pretty-print a record lock.
81 @param[in,out]	file	output stream
82 @param[in]	lock	record lock
83 @param[in,out]	mtr	mini-transaction for accessing the record */
84 static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);
85 
86 /** Deadlock checker. */
87 class DeadlockChecker {
88 public:
89 	/** Check if a joining lock request results in a deadlock.
90 	If a deadlock is found, we will resolve the deadlock by
91 	choosing a victim transaction and rolling it back.
92 	We will attempt to resolve all deadlocks.
93 
94 	@param[in]	lock	the lock request
95 	@param[in,out]	trx	transaction requesting the lock
96 
97 	@return trx if it was chosen as victim
98 	@retval	NULL if another victim was chosen,
99 	or there is no deadlock (any more) */
100 	static const trx_t* check_and_resolve(const lock_t* lock, trx_t* trx);
101 
102 private:
103 	/** Do a shallow copy. Default destructor OK.
104 	@param trx the start transaction (start node)
105 	@param wait_lock lock that a transaction wants
106 	@param mark_start visited node counter
107 	@param report_waiters whether to call thd_rpl_deadlock_check() */
DeadlockChecker(const trx_t * trx,const lock_t * wait_lock,ib_uint64_t mark_start,bool report_waiters)108 	DeadlockChecker(
109 		const trx_t*	trx,
110 		const lock_t*	wait_lock,
111 		ib_uint64_t	mark_start,
112 		bool report_waiters)
113 		:
114 		m_cost(),
115 		m_start(trx),
116 		m_too_deep(),
117 		m_wait_lock(wait_lock),
118 		m_mark_start(mark_start),
119 		m_n_elems(),
120 		m_report_waiters(report_waiters)
121 	{
122 	}
123 
124 	/** Check if the search is too deep. */
is_too_deep() const125 	bool is_too_deep() const
126 	{
127 		return(m_n_elems > LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK
128 		       || m_cost > LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK);
129 	}
130 
131 	/** Save current state.
132 	@param lock lock to push on the stack.
133 	@param heap_no the heap number to push on the stack.
134 	@return false if stack is full. */
push(const lock_t * lock,ulint heap_no)135 	bool push(const lock_t*	lock, ulint heap_no)
136 	{
137 		ut_ad((lock_get_type_low(lock) & LOCK_REC)
138 		      || (lock_get_type_low(lock) & LOCK_TABLE));
139 
140 		ut_ad(((lock_get_type_low(lock) & LOCK_TABLE) != 0)
141 		      == (heap_no == ULINT_UNDEFINED));
142 
143 		/* Ensure that the stack is bounded. */
144 		if (m_n_elems >= UT_ARR_SIZE(s_states)) {
145 			return(false);
146 		}
147 
148 		state_t&	state = s_states[m_n_elems++];
149 
150 		state.m_lock = lock;
151 		state.m_wait_lock = m_wait_lock;
152 		state.m_heap_no =heap_no;
153 
154 		return(true);
155 	}
156 
157 	/** Restore state.
158 	@param[out] lock current lock
159 	@param[out] heap_no current heap_no */
pop(const lock_t * & lock,ulint & heap_no)160 	void pop(const lock_t*& lock, ulint& heap_no)
161 	{
162 		ut_a(m_n_elems > 0);
163 
164 		const state_t&	state = s_states[--m_n_elems];
165 
166 		lock = state.m_lock;
167 		heap_no = state.m_heap_no;
168 		m_wait_lock = state.m_wait_lock;
169 	}
170 
171 	/** Check whether the node has been visited.
172 	@param lock lock to check
173 	@return true if the node has been visited */
is_visited(const lock_t * lock) const174 	bool is_visited(const lock_t* lock) const
175 	{
176 		return(lock->trx->lock.deadlock_mark > m_mark_start);
177 	}
178 
179 	/** Get the next lock in the queue that is owned by a transaction
180 	whose sub-tree has not already been searched.
181 	Note: "next" here means PREV for table locks.
182 	@param lock Lock in queue
183 	@param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
184 	@return next lock or NULL if at end of queue */
185 	const lock_t* get_next_lock(const lock_t* lock, ulint heap_no) const;
186 
187 	/** Get the first lock to search. The search starts from the current
188 	wait_lock. What we are really interested in is an edge from the
189 	current wait_lock's owning transaction to another transaction that has
190 	a lock ahead in the queue. We skip locks where the owning transaction's
191 	sub-tree has already been searched.
192 
193 	Note: The record locks are traversed from the oldest lock to the
194 	latest. For table locks we go from latest to oldest.
195 
196 	For record locks, we first position the iterator on first lock on
197 	the page and then reposition on the actual heap_no. This is required
198 	due to the way the record lock has is implemented.
199 
200 	@param[out] heap_no if rec lock, else ULINT_UNDEFINED.
201 
202 	@return first lock or NULL */
203 	const lock_t* get_first_lock(ulint* heap_no) const;
204 
205 	/** Notify that a deadlock has been detected and print the conflicting
206 	transaction info.
207 	@param lock lock causing deadlock */
208 	void notify(const lock_t* lock) const;
209 
210 	/** Select the victim transaction that should be rolledback.
211 	@return victim transaction */
212 	const trx_t* select_victim() const;
213 
214 	/** Rollback transaction selected as the victim. */
215 	void trx_rollback();
216 
217 	/** Looks iteratively for a deadlock. Note: the joining transaction
218 	may have been granted its lock by the deadlock checks.
219 
220 	@return 0 if no deadlock else the victim transaction.*/
221 	const trx_t* search();
222 
223 	/** Print transaction data to the deadlock file and possibly to stderr.
224 	@param trx transaction
225 	@param max_query_len max query length to print */
226 	static void print(const trx_t* trx, ulint max_query_len);
227 
228 	/** rewind(3) the file used for storing the latest detected deadlock
229 	and print a heading message to stderr if printing of all deadlocks to
230 	stderr is enabled. */
231 	static void start_print();
232 
233 	/** Print lock data to the deadlock file and possibly to stderr.
234 	@param lock record or table type lock */
235 	static void print(const lock_t* lock);
236 
237 	/** Print a message to the deadlock file and possibly to stderr.
238 	@param msg message to print */
239 	static void print(const char* msg);
240 
241 	/** Print info about transaction that was rolled back.
242 	@param trx transaction rolled back
243 	@param lock lock trx wants */
244 	static void rollback_print(const trx_t* trx, const lock_t* lock);
245 
246 private:
247 	/** DFS state information, used during deadlock checking. */
248 	struct state_t {
249 		const lock_t*	m_lock;		/*!< Current lock */
250 		const lock_t*	m_wait_lock;	/*!< Waiting for lock */
251 		ulint		m_heap_no;	/*!< heap number if rec lock */
252 	};
253 
254 	/** Used in deadlock tracking. Protected by lock_sys.mutex. */
255 	static ib_uint64_t	s_lock_mark_counter;
256 
257 	/** Calculation steps thus far. It is the count of the nodes visited. */
258 	ulint			m_cost;
259 
260 	/** Joining transaction that is requesting a lock in an
261 	incompatible mode */
262 	const trx_t*		m_start;
263 
264 	/** TRUE if search was too deep and was aborted */
265 	bool			m_too_deep;
266 
267 	/** Lock that trx wants */
268 	const lock_t*		m_wait_lock;
269 
270 	/**  Value of lock_mark_count at the start of the deadlock check. */
271 	ib_uint64_t		m_mark_start;
272 
273 	/** Number of states pushed onto the stack */
274 	size_t			m_n_elems;
275 
276 	/** This is to avoid malloc/free calls. */
277 	static state_t		s_states[MAX_STACK_SIZE];
278 
279 	/** Set if thd_rpl_deadlock_check() should be called for waits. */
280 	const bool m_report_waiters;
281 };
282 
283 /** Counter to mark visited nodes during deadlock search. */
284 ib_uint64_t	DeadlockChecker::s_lock_mark_counter = 0;
285 
286 /** The stack used for deadlock searches. */
287 DeadlockChecker::state_t	DeadlockChecker::s_states[MAX_STACK_SIZE];
288 
289 #ifdef UNIV_DEBUG
290 /*********************************************************************//**
291 Validates the lock system.
292 @return TRUE if ok */
293 static
294 bool
295 lock_validate();
296 /*============*/
297 
298 /*********************************************************************//**
299 Validates the record lock queues on a page.
300 @return TRUE if ok */
301 static
302 ibool
303 lock_rec_validate_page(
304 /*===================*/
305 	const buf_block_t*	block)	/*!< in: buffer block */
306 	MY_ATTRIBUTE((warn_unused_result));
307 #endif /* UNIV_DEBUG */
308 
309 /* The lock system */
310 lock_sys_t lock_sys;
311 
312 /** We store info on the latest deadlock error to this buffer. InnoDB
313 Monitor will then fetch it and print */
314 static bool	lock_deadlock_found = false;
315 
316 /** Only created if !srv_read_only_mode */
317 static FILE*		lock_latest_err_file;
318 
319 /*********************************************************************//**
320 Reports that a transaction id is insensible, i.e., in the future. */
321 ATTRIBUTE_COLD
322 void
lock_report_trx_id_insanity(trx_id_t trx_id,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,trx_id_t max_trx_id)323 lock_report_trx_id_insanity(
324 /*========================*/
325 	trx_id_t	trx_id,		/*!< in: trx id */
326 	const rec_t*	rec,		/*!< in: user record */
327 	dict_index_t*	index,		/*!< in: index */
328 	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
329 	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
330 {
331 	ut_ad(rec_offs_validate(rec, index, offsets));
332 	ut_ad(!rec_is_metadata(rec, index));
333 
334 	ib::error()
335 		<< "Transaction id " << ib::hex(trx_id)
336 		<< " associated with record" << rec_offsets_print(rec, offsets)
337 		<< " in index " << index->name
338 		<< " of table " << index->table->name
339 		<< " is greater than the global counter " << max_trx_id
340 		<< "! The table is corrupted.";
341 }
342 
343 /*********************************************************************//**
344 Checks that a transaction id is sensible, i.e., not in the future.
345 @return true if ok */
346 bool
lock_check_trx_id_sanity(trx_id_t trx_id,const rec_t * rec,dict_index_t * index,const rec_offs * offsets)347 lock_check_trx_id_sanity(
348 /*=====================*/
349 	trx_id_t	trx_id,		/*!< in: trx id */
350 	const rec_t*	rec,		/*!< in: user record */
351 	dict_index_t*	index,		/*!< in: index */
352 	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
353 {
354   ut_ad(rec_offs_validate(rec, index, offsets));
355   ut_ad(!rec_is_metadata(rec, index));
356 
357   trx_id_t max_trx_id= trx_sys.get_max_trx_id();
358   ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
359 
360   if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
361   {
362     lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
363     return false;
364   }
365   return true;
366 }
367 
368 /*********************************************************************//**
369 Checks that a record is seen in a consistent read.
370 @return true if sees, or false if an earlier version of the record
371 should be retrieved */
372 bool
lock_clust_rec_cons_read_sees(const rec_t * rec,dict_index_t * index,const rec_offs * offsets,ReadView * view)373 lock_clust_rec_cons_read_sees(
374 /*==========================*/
375 	const rec_t*	rec,	/*!< in: user record which should be read or
376 				passed over by a read cursor */
377 	dict_index_t*	index,	/*!< in: clustered index */
378 	const rec_offs*	offsets,/*!< in: rec_get_offsets(rec, index) */
379 	ReadView*	view)	/*!< in: consistent read view */
380 {
381 	ut_ad(dict_index_is_clust(index));
382 	ut_ad(page_rec_is_user_rec(rec));
383 	ut_ad(rec_offs_validate(rec, index, offsets));
384 	ut_ad(!rec_is_metadata(rec, index));
385 
386 	/* Temp-tables are not shared across connections and multiple
387 	transactions from different connections cannot simultaneously
388 	operate on same temp-table and so read of temp-table is
389 	always consistent read. */
390 	if (index->table->is_temporary()) {
391 		return(true);
392 	}
393 
394 	/* NOTE that we call this function while holding the search
395 	system latch. */
396 
397 	trx_id_t	trx_id = row_get_rec_trx_id(rec, index, offsets);
398 
399 	return(view->changes_visible(trx_id, index->table->name));
400 }
401 
402 /*********************************************************************//**
403 Checks that a non-clustered index record is seen in a consistent read.
404 
405 NOTE that a non-clustered index page contains so little information on
406 its modifications that also in the case false, the present version of
407 rec may be the right, but we must check this from the clustered index
408 record.
409 
410 @return true if certainly sees, or false if an earlier version of the
411 clustered index record might be needed */
412 bool
lock_sec_rec_cons_read_sees(const rec_t * rec,const dict_index_t * index,const ReadView * view)413 lock_sec_rec_cons_read_sees(
414 /*========================*/
415 	const rec_t*		rec,	/*!< in: user record which
416 					should be read or passed over
417 					by a read cursor */
418 	const dict_index_t*	index,	/*!< in: index */
419 	const ReadView*	view)	/*!< in: consistent read view */
420 {
421 	ut_ad(page_rec_is_user_rec(rec));
422 	ut_ad(!index->is_primary());
423 	ut_ad(!rec_is_metadata(rec, index));
424 
425 	/* NOTE that we might call this function while holding the search
426 	system latch. */
427 
428 	if (index->table->is_temporary()) {
429 
430 		/* Temp-tables are not shared across connections and multiple
431 		transactions from different connections cannot simultaneously
432 		operate on same temp-table and so read of temp-table is
433 		always consistent read. */
434 
435 		return(true);
436 	}
437 
438 	trx_id_t	max_trx_id = page_get_max_trx_id(page_align(rec));
439 
440 	ut_ad(max_trx_id > 0);
441 
442 	return(view->sees(max_trx_id));
443 }
444 
445 
446 /**
447   Creates the lock system at database start.
448 
449   @param[in] n_cells number of slots in lock hash table
450 */
create(ulint n_cells)451 void lock_sys_t::create(ulint n_cells)
452 {
453 	ut_ad(this == &lock_sys);
454 
455 	m_initialised= true;
456 
457 	waiting_threads = static_cast<srv_slot_t*>
458 		(ut_zalloc_nokey(srv_max_n_threads * sizeof *waiting_threads));
459 	last_slot = waiting_threads;
460 
461 	mutex_create(LATCH_ID_LOCK_SYS, &mutex);
462 
463 	mutex_create(LATCH_ID_LOCK_SYS_WAIT, &wait_mutex);
464 
465 	timeout_event = os_event_create(0);
466 
467 	rec_hash = hash_create(n_cells);
468 	prdt_hash = hash_create(n_cells);
469 	prdt_page_hash = hash_create(n_cells);
470 
471 	if (!srv_read_only_mode) {
472 		lock_latest_err_file = os_file_create_tmpfile();
473 		ut_a(lock_latest_err_file);
474 	}
475 }
476 
477 /** Calculates the fold value of a lock: used in migrating the hash table.
478 @param[in]	lock	record lock object
479 @return	folded value */
480 static
481 ulint
lock_rec_lock_fold(const lock_t * lock)482 lock_rec_lock_fold(
483 	const lock_t*	lock)
484 {
485 	return(lock_rec_fold(lock->un_member.rec_lock.space,
486 			     lock->un_member.rec_lock.page_no));
487 }
488 
489 
490 /**
491   Resize the lock hash table.
492 
493   @param[in] n_cells number of slots in lock hash table
494 */
resize(ulint n_cells)495 void lock_sys_t::resize(ulint n_cells)
496 {
497 	ut_ad(this == &lock_sys);
498 
499 	mutex_enter(&mutex);
500 
501 	hash_table_t* old_hash = rec_hash;
502 	rec_hash = hash_create(n_cells);
503 	HASH_MIGRATE(old_hash, rec_hash, lock_t, hash,
504 		     lock_rec_lock_fold);
505 	hash_table_free(old_hash);
506 
507 	old_hash = prdt_hash;
508 	prdt_hash = hash_create(n_cells);
509 	HASH_MIGRATE(old_hash, prdt_hash, lock_t, hash,
510 		     lock_rec_lock_fold);
511 	hash_table_free(old_hash);
512 
513 	old_hash = prdt_page_hash;
514 	prdt_page_hash = hash_create(n_cells);
515 	HASH_MIGRATE(old_hash, prdt_page_hash, lock_t, hash,
516 		     lock_rec_lock_fold);
517 	hash_table_free(old_hash);
518 
519 	/* need to update block->lock_hash_val */
520 	for (ulint i = 0; i < srv_buf_pool_instances; ++i) {
521 		buf_pool_t*	buf_pool = buf_pool_from_array(i);
522 
523 		buf_pool_mutex_enter(buf_pool);
524 		buf_page_t*	bpage;
525 		bpage = UT_LIST_GET_FIRST(buf_pool->LRU);
526 
527 		while (bpage != NULL) {
528 			if (buf_page_get_state(bpage)
529 			    == BUF_BLOCK_FILE_PAGE) {
530 				buf_block_t*	block;
531 				block = reinterpret_cast<buf_block_t*>(
532 					bpage);
533 
534 				block->lock_hash_val
535 					= lock_rec_hash(
536 						bpage->id.space(),
537 						bpage->id.page_no());
538 			}
539 			bpage = UT_LIST_GET_NEXT(LRU, bpage);
540 		}
541 		buf_pool_mutex_exit(buf_pool);
542 	}
543 
544 	mutex_exit(&mutex);
545 }
546 
547 
548 /** Closes the lock system at database shutdown. */
close()549 void lock_sys_t::close()
550 {
551 	ut_ad(this == &lock_sys);
552 
553 	if (!m_initialised) return;
554 
555 	if (lock_latest_err_file != NULL) {
556 		fclose(lock_latest_err_file);
557 		lock_latest_err_file = NULL;
558 	}
559 
560 	hash_table_free(rec_hash);
561 	hash_table_free(prdt_hash);
562 	hash_table_free(prdt_page_hash);
563 
564 	os_event_destroy(timeout_event);
565 
566 	mutex_destroy(&mutex);
567 	mutex_destroy(&wait_mutex);
568 
569 	for (ulint i = srv_max_n_threads; i--; ) {
570 		if (os_event_t& event = waiting_threads[i].event) {
571 			os_event_destroy(event);
572 		}
573 	}
574 
575 	ut_free(waiting_threads);
576 	m_initialised= false;
577 }
578 
579 /*********************************************************************//**
580 Gets the size of a lock struct.
581 @return size in bytes */
582 ulint
lock_get_size(void)583 lock_get_size(void)
584 /*===============*/
585 {
586 	return((ulint) sizeof(lock_t));
587 }
588 
lock_grant_have_trx_mutex(lock_t * lock)589 static inline void lock_grant_have_trx_mutex(lock_t* lock)
590 {
591 	lock_reset_lock_and_trx_wait(lock);
592 	lock_grant_after_reset(lock);
593 }
594 
595 /*********************************************************************//**
596 Gets the gap flag of a record lock.
597 @return LOCK_GAP or 0 */
598 UNIV_INLINE
599 ulint
lock_rec_get_gap(const lock_t * lock)600 lock_rec_get_gap(
601 /*=============*/
602 	const lock_t*	lock)	/*!< in: record lock */
603 {
604 	ut_ad(lock);
605 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
606 
607 	return(lock->type_mode & LOCK_GAP);
608 }
609 
610 /*********************************************************************//**
611 Gets the LOCK_REC_NOT_GAP flag of a record lock.
612 @return LOCK_REC_NOT_GAP or 0 */
613 UNIV_INLINE
614 ulint
lock_rec_get_rec_not_gap(const lock_t * lock)615 lock_rec_get_rec_not_gap(
616 /*=====================*/
617 	const lock_t*	lock)	/*!< in: record lock */
618 {
619 	ut_ad(lock);
620 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
621 
622 	return(lock->type_mode & LOCK_REC_NOT_GAP);
623 }
624 
625 /*********************************************************************//**
626 Gets the waiting insert flag of a record lock.
627 @return LOCK_INSERT_INTENTION or 0 */
628 UNIV_INLINE
629 ulint
lock_rec_get_insert_intention(const lock_t * lock)630 lock_rec_get_insert_intention(
631 /*==========================*/
632 	const lock_t*	lock)	/*!< in: record lock */
633 {
634 	ut_ad(lock);
635 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
636 
637 	return(lock->type_mode & LOCK_INSERT_INTENTION);
638 }
639 
640 #ifdef UNIV_DEBUG
641 #ifdef WITH_WSREP
642 /** Check if both conflicting lock transaction and other transaction
643 requesting record lock are brute force (BF). If they are check is
644 this BF-BF wait correct and if not report BF wait and assert.
645 
646 @param[in]	lock_rec	other waiting record lock
647 @param[in]	trx		trx requesting conflicting record lock
648 */
wsrep_assert_no_bf_bf_wait(const lock_t * lock,const trx_t * trx)649 static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
650 {
651 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
652 	ut_ad(lock_mutex_own());
653 	trx_t* lock_trx= lock->trx;
654 
655 	/* Note that we are holding lock_sys->mutex, thus we should
656 	not acquire THD::LOCK_thd_data mutex below to avoid mutexing
657 	order violation. */
658 
659 	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
660 		return;
661 	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
662 	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
663 		return;
664 
665 	ut_ad(trx->state == TRX_STATE_ACTIVE);
666 
667 	trx_mutex_enter(lock_trx);
668 	const trx_state_t trx2_state= lock_trx->state;
669 	trx_mutex_exit(lock_trx);
670 
671 	/* If transaction is already committed in memory or
672 	prepared we should wait. When transaction is committed in
673 	memory we held trx mutex, but not lock_sys->mutex. Therefore,
674 	we could end here before transaction has time to do
675 	lock_release() that is protected with lock_sys->mutex. */
676 	switch (trx2_state) {
677 	case TRX_STATE_COMMITTED_IN_MEMORY:
678 	case TRX_STATE_PREPARED:
679 		return;
680 	case TRX_STATE_ACTIVE:
681 		break;
682 	default:
683 		ut_ad("invalid state" == 0);
684 	}
685 
686 	/* If BF - BF order is honored, i.e. trx already holding
687 	record lock should be ordered before this new lock request
688 	we can keep trx waiting for the lock. If conflicting
689 	transaction is already aborting or rolling back for replaying
690 	we can also let new transaction waiting. */
691 	if (wsrep_trx_order_before(lock_trx->mysql_thd, trx->mysql_thd)
692 	    || wsrep_trx_is_aborting(lock_trx->mysql_thd))
693 		return;
694 
695 	mtr_t mtr;
696 
697 	ib::error() << "Conflicting lock on table: "
698 		    << lock->index->table->name
699 		    << " index: "
700 		    << lock->index->name()
701 		    << " that has lock ";
702 	lock_rec_print(stderr, lock, mtr);
703 
704 	ib::error() << "WSREP state: ";
705 
706 	wsrep_report_bf_lock_wait(trx->mysql_thd,
707 				  trx->id);
708 	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
709 				  lock_trx->id);
710 	/* BF-BF wait is a bug */
711 	ut_error;
712 }
713 #endif /* WITH_WSREP */
714 #endif /* UNIV_DEBUG */
715 
716 /*********************************************************************//**
717 Checks if a lock request for a new lock has to wait for request lock2.
718 @return TRUE if new lock has to wait for lock2 to be removed */
719 UNIV_INLINE
720 bool
lock_rec_has_to_wait(bool for_locking,const trx_t * trx,ulint type_mode,const lock_t * lock2,bool lock_is_on_supremum)721 lock_rec_has_to_wait(
722 /*=================*/
723 	bool		for_locking,
724 				/*!< in is called locking or releasing */
725 	const trx_t*	trx,	/*!< in: trx of new lock */
726 	ulint		type_mode,/*!< in: precise mode of the new lock
727 				to set: LOCK_S or LOCK_X, possibly
728 				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
729 				LOCK_INSERT_INTENTION */
730 	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
731 				it is assumed that this has a lock bit
732 				set on the same record as in the new
733 				lock we are setting */
734 	bool		lock_is_on_supremum)
735 				/*!< in: TRUE if we are setting the
736 				lock on the 'supremum' record of an
737 				index page: we know then that the lock
738 				request is really for a 'gap' type lock */
739 {
740 	ut_ad(trx && lock2);
741 	ut_ad(lock_get_type_low(lock2) == LOCK_REC);
742 	ut_ad(lock_mutex_own());
743 
744 	if (trx == lock2->trx
745 	    || lock_mode_compatible(
746 		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
747 		       lock_get_mode(lock2))) {
748 		return false;
749 	}
750 
751 	/* We have somewhat complex rules when gap type record locks
752 	cause waits */
753 
754 	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
755 	    && !(type_mode & LOCK_INSERT_INTENTION)) {
756 
757 		/* Gap type locks without LOCK_INSERT_INTENTION flag
758 		do not need to wait for anything. This is because
759 		different users can have conflicting lock types
760 		on gaps. */
761 
762 		return false;
763 	}
764 
765 	if (!(type_mode & LOCK_INSERT_INTENTION) && lock_rec_get_gap(lock2)) {
766 
767 		/* Record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP
768 		does not need to wait for a gap type lock */
769 
770 		return false;
771 	}
772 
773 	if ((type_mode & LOCK_GAP) && lock_rec_get_rec_not_gap(lock2)) {
774 
775 		/* Lock on gap does not need to wait for
776 		a LOCK_REC_NOT_GAP type lock */
777 
778 		return false;
779 	}
780 
781 	if (lock_rec_get_insert_intention(lock2)) {
782 
783 		/* No lock request needs to wait for an insert
784 		intention lock to be removed. This is ok since our
785 		rules allow conflicting locks on gaps. This eliminates
786 		a spurious deadlock caused by a next-key lock waiting
787 		for an insert intention lock; when the insert
788 		intention lock was granted, the insert deadlocked on
789 		the waiting next-key lock.
790 
791 		Also, insert intention locks do not disturb each
792 		other. */
793 
794 		return false;
795 	}
796 
797 	if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2))
798 	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
799 		/* If the upper server layer has already decided on the
800 		commit order between the transaction requesting the
801 		lock and the transaction owning the lock, we do not
802 		need to wait for gap locks. Such ordeering by the upper
803 		server layer happens in parallel replication, where the
804 		commit order is fixed to match the original order on the
805 		master.
806 
807 		Such gap locks are mainly needed to get serialisability
808 		between transactions so that they will be binlogged in
809 		the correct order so that statement-based replication
810 		will give the correct results. Since the right order
811 		was already determined on the master, we do not need
812 		to enforce it again here.
813 
814 		Skipping the locks is not essential for correctness,
815 		since in case of deadlock we will just kill the later
816 		transaction and retry it. But it can save some
817 		unnecessary rollbacks and retries. */
818 
819 		return false;
820 	}
821 
822 #ifdef WITH_WSREP
823 		/* New lock request from a transaction is using unique key
824 		scan and this transaction is a wsrep high priority transaction
825 		(brute force). If conflicting transaction is also wsrep high
826 		priority transaction we should avoid lock conflict because
827 		ordering of these transactions is already decided and
828 		conflicting transaction will be later replayed. Note
829 		that thread holding conflicting lock can't be
830 		committed or rolled back while we hold
831 		lock_sys->mutex. */
832 		if (trx->is_wsrep_UK_scan()
833 		    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
834 			return false;
835 		}
836 
837 		/* We very well can let bf to wait normally as other
838 		BF will be replayed in case of conflict. For debug
839 		builds we will do additional sanity checks to catch
840 		unsupported bf wait if any. */
841 		ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
842 #endif /* WITH_WSREP */
843 
844 	return true;
845 }
846 
847 /*********************************************************************//**
848 Checks if a lock request lock1 has to wait for request lock2.
849 @return TRUE if lock1 has to wait for lock2 to be removed */
850 bool
lock_has_to_wait(const lock_t * lock1,const lock_t * lock2)851 lock_has_to_wait(
852 /*=============*/
853 	const lock_t*	lock1,	/*!< in: waiting lock */
854 	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
855 				assumed that this has a lock bit set
856 				on the same record as in lock1 if the
857 				locks are record locks */
858 {
859 	ut_ad(lock1 && lock2);
860 
861 	if (lock1->trx == lock2->trx
862 	    || lock_mode_compatible(lock_get_mode(lock1),
863 				    lock_get_mode(lock2))) {
864 		return false;
865 	}
866 
867 	if (lock_get_type_low(lock1) != LOCK_REC) {
868 		return true;
869 	}
870 
871 	ut_ad(lock_get_type_low(lock2) == LOCK_REC);
872 
873 	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
874 		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
875 					     lock_get_prdt_from_lock(lock1),
876 					     lock2);
877 	}
878 
879 	return lock_rec_has_to_wait(
880 		false, lock1->trx, lock1->type_mode, lock2,
881 		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
882 }
883 
884 /*============== RECORD LOCK BASIC FUNCTIONS ============================*/
885 
886 /**********************************************************************//**
887 Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
888 if none found.
889 @return bit index == heap number of the record, or ULINT_UNDEFINED if
890 none found */
891 ulint
lock_rec_find_set_bit(const lock_t * lock)892 lock_rec_find_set_bit(
893 /*==================*/
894 	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
895 {
896 	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {
897 
898 		if (lock_rec_get_nth_bit(lock, i)) {
899 
900 			return(i);
901 		}
902 	}
903 
904 	return(ULINT_UNDEFINED);
905 }
906 
907 /*********************************************************************//**
908 Determines if there are explicit record locks on a page.
909 @return an explicit record lock on the page, or NULL if there are none */
910 lock_t*
lock_rec_expl_exist_on_page(ulint space,ulint page_no)911 lock_rec_expl_exist_on_page(
912 /*========================*/
913 	ulint	space,	/*!< in: space id */
914 	ulint	page_no)/*!< in: page number */
915 {
916 	lock_t*	lock;
917 
918 	lock_mutex_enter();
919 	/* Only used in ibuf pages, so rec_hash is good enough */
920 	lock = lock_rec_get_first_on_page_addr(lock_sys.rec_hash,
921 					       space, page_no);
922 	lock_mutex_exit();
923 
924 	return(lock);
925 }
926 
927 /*********************************************************************//**
928 Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
929 pointer in the transaction! This function is used in lock object creation
930 and resetting. */
931 static
932 void
lock_rec_bitmap_reset(lock_t * lock)933 lock_rec_bitmap_reset(
934 /*==================*/
935 	lock_t*	lock)	/*!< in: record lock */
936 {
937 	ulint	n_bytes;
938 
939 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
940 
941 	/* Reset to zero the bitmap which resides immediately after the lock
942 	struct */
943 
944 	n_bytes = lock_rec_get_n_bits(lock) / 8;
945 
946 	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);
947 
948 	memset(&lock[1], 0, n_bytes);
949 }
950 
951 /*********************************************************************//**
952 Copies a record lock to heap.
953 @return copy of lock */
954 static
955 lock_t*
lock_rec_copy(const lock_t * lock,mem_heap_t * heap)956 lock_rec_copy(
957 /*==========*/
958 	const lock_t*	lock,	/*!< in: record lock */
959 	mem_heap_t*	heap)	/*!< in: memory heap */
960 {
961 	ulint	size;
962 
963 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
964 
965 	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;
966 
967 	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
968 }
969 
970 /*********************************************************************//**
971 Gets the previous record lock set on a record.
972 @return previous lock on the same record, NULL if none exists */
973 const lock_t*
lock_rec_get_prev(const lock_t * in_lock,ulint heap_no)974 lock_rec_get_prev(
975 /*==============*/
976 	const lock_t*	in_lock,/*!< in: record lock */
977 	ulint		heap_no)/*!< in: heap number of the record */
978 {
979 	lock_t*		lock;
980 	ulint		space;
981 	ulint		page_no;
982 	lock_t*		found_lock	= NULL;
983 	hash_table_t*	hash;
984 
985 	ut_ad(lock_mutex_own());
986 	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
987 
988 	space = in_lock->un_member.rec_lock.space;
989 	page_no = in_lock->un_member.rec_lock.page_no;
990 
991 	hash = lock_hash_get(in_lock->type_mode);
992 
993 	for (lock = lock_rec_get_first_on_page_addr(hash, space, page_no);
994 	     /* No op */;
995 	     lock = lock_rec_get_next_on_page(lock)) {
996 
997 		ut_ad(lock);
998 
999 		if (lock == in_lock) {
1000 
1001 			return(found_lock);
1002 		}
1003 
1004 		if (lock_rec_get_nth_bit(lock, heap_no)) {
1005 
1006 			found_lock = lock;
1007 		}
1008 	}
1009 }
1010 
1011 /*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/
1012 
1013 /*********************************************************************//**
1014 Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
1015 to precise_mode.
1016 @return lock or NULL */
1017 UNIV_INLINE
1018 lock_t*
lock_rec_has_expl(ulint precise_mode,const buf_block_t * block,ulint heap_no,const trx_t * trx)1019 lock_rec_has_expl(
1020 /*==============*/
1021 	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
1022 					possibly ORed to LOCK_GAP or
1023 					LOCK_REC_NOT_GAP, for a
1024 					supremum record we regard this
1025 					always a gap type request */
1026 	const buf_block_t*	block,	/*!< in: buffer block containing
1027 					the record */
1028 	ulint			heap_no,/*!< in: heap number of the record */
1029 	const trx_t*		trx)	/*!< in: transaction */
1030 {
1031 	lock_t*	lock;
1032 
1033 	ut_ad(lock_mutex_own());
1034 	ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
1035 	      || (precise_mode & LOCK_MODE_MASK) == LOCK_X);
1036 	ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
1037 
1038 	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
1039 	     lock != NULL;
1040 	     lock = lock_rec_get_next(heap_no, lock)) {
1041 
1042 		if (lock->trx == trx
1043 		    && !lock_rec_get_insert_intention(lock)
1044 		    && lock_mode_stronger_or_eq(
1045 			    lock_get_mode(lock),
1046 			    static_cast<lock_mode>(
1047 				    precise_mode & LOCK_MODE_MASK))
1048 		    && !lock_get_wait(lock)
1049 		    && (!lock_rec_get_rec_not_gap(lock)
1050 			|| (precise_mode & LOCK_REC_NOT_GAP)
1051 			|| heap_no == PAGE_HEAP_NO_SUPREMUM)
1052 		    && (!lock_rec_get_gap(lock)
1053 			|| (precise_mode & LOCK_GAP)
1054 			|| heap_no == PAGE_HEAP_NO_SUPREMUM)) {
1055 
1056 			return(lock);
1057 		}
1058 	}
1059 
1060 	return(NULL);
1061 }
1062 
1063 #ifdef UNIV_DEBUG
1064 /*********************************************************************//**
1065 Checks if some other transaction has a lock request in the queue.
1066 @return lock or NULL */
1067 static
1068 lock_t*
lock_rec_other_has_expl_req(lock_mode mode,const buf_block_t * block,bool wait,ulint heap_no,const trx_t * trx)1069 lock_rec_other_has_expl_req(
1070 /*========================*/
1071 	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
1072 	const buf_block_t*	block,	/*!< in: buffer block containing
1073 					the record */
1074 	bool			wait,	/*!< in: whether also waiting locks
1075 					are taken into account */
1076 	ulint			heap_no,/*!< in: heap number of the record */
1077 	const trx_t*		trx)	/*!< in: transaction, or NULL if
1078 					requests by all transactions
1079 					are taken into account */
1080 {
1081 
1082 	ut_ad(lock_mutex_own());
1083 	ut_ad(mode == LOCK_X || mode == LOCK_S);
1084 
1085 	/* Only GAP lock can be on SUPREMUM, and we are not looking for
1086 	GAP lock */
1087 	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
1088 		return(NULL);
1089 	}
1090 
1091 	for (lock_t* lock = lock_rec_get_first(lock_sys.rec_hash,
1092 						     block, heap_no);
1093 	     lock != NULL;
1094 	     lock = lock_rec_get_next(heap_no, lock)) {
1095 
1096 		if (lock->trx != trx
1097 		    && !lock_rec_get_gap(lock)
1098 		    && (wait || !lock_get_wait(lock))
1099 		    && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {
1100 
1101 			return(lock);
1102 		}
1103 	}
1104 
1105 	return(NULL);
1106 }
1107 #endif /* UNIV_DEBUG */
1108 
1109 #ifdef WITH_WSREP
wsrep_kill_victim(const trx_t * const trx,const lock_t * lock)1110 static void wsrep_kill_victim(const trx_t * const trx, const lock_t *lock)
1111 {
1112 	ut_ad(lock_mutex_own());
1113 	ut_ad(trx->is_wsrep());
1114 	trx_t* lock_trx = lock->trx;
1115 	ut_ad(trx_mutex_own(lock_trx));
1116 	ut_ad(lock_trx != trx);
1117 
1118 	if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
1119 		return;
1120 
1121 	if (lock_trx->state == TRX_STATE_COMMITTED_IN_MEMORY
1122 	    || lock_trx->lock.was_chosen_as_deadlock_victim)
1123               return;
1124 
1125 	my_bool bf_other = wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE);
1126 
1127 	if (!bf_other
1128 	    || wsrep_trx_order_before(trx->mysql_thd,
1129 				      lock_trx->mysql_thd)) {
1130 
1131 		if (lock_trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
1132 			if (UNIV_UNLIKELY(wsrep_debug))
1133 				WSREP_INFO("BF victim waiting");
1134 			/* cannot release lock, until our lock
1135 			is in the queue*/
1136 		} else {
1137 			wsrep_innobase_kill_one_trx(trx->mysql_thd, trx,
1138 						    lock_trx, true);
1139 		}
1140 	}
1141 }
1142 #endif /* WITH_WSREP */
1143 
1144 /*********************************************************************//**
1145 Checks if some other transaction has a conflicting explicit lock request
1146 in the queue, so that we have to wait.
1147 @return lock or NULL */
1148 static
1149 lock_t*
lock_rec_other_has_conflicting(ulint mode,const buf_block_t * block,ulint heap_no,const trx_t * trx)1150 lock_rec_other_has_conflicting(
1151 /*===========================*/
1152 	ulint			mode,	/*!< in: LOCK_S or LOCK_X,
1153 					possibly ORed to LOCK_GAP or
1154 					LOC_REC_NOT_GAP,
1155 					LOCK_INSERT_INTENTION */
1156 	const buf_block_t*	block,	/*!< in: buffer block containing
1157 					the record */
1158 	ulint			heap_no,/*!< in: heap number of the record */
1159 	const trx_t*		trx)	/*!< in: our transaction */
1160 {
1161 	lock_t*		lock;
1162 
1163 	ut_ad(lock_mutex_own());
1164 
1165 	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);
1166 
1167 	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
1168 	     lock != NULL;
1169 	     lock = lock_rec_get_next(heap_no, lock)) {
1170 
1171 		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
1172 #ifdef WITH_WSREP
1173 			if (trx->is_wsrep()) {
1174 				trx_mutex_enter(lock->trx);
1175 				/* Below function will roll back either trx
1176 				or lock->trx depending on priority of the
1177 				transaction. */
1178 				wsrep_kill_victim(const_cast<trx_t*>(trx), lock);
1179 				trx_mutex_exit(lock->trx);
1180 			}
1181 #endif /* WITH_WSREP */
1182 			return(lock);
1183 		}
1184 	}
1185 
1186 	return(NULL);
1187 }
1188 
1189 /*********************************************************************//**
1190 Checks if some transaction has an implicit x-lock on a record in a secondary
1191 index.
1192 @return transaction id of the transaction which has the x-lock, or 0;
1193 NOTE that this function can return false positives but never false
1194 negatives. The caller must confirm all positive results by calling
1195 trx_is_active(). */
1196 static
1197 trx_t*
lock_sec_rec_some_has_impl(trx_t * caller_trx,const rec_t * rec,dict_index_t * index,const rec_offs * offsets)1198 lock_sec_rec_some_has_impl(
1199 /*=======================*/
1200 	trx_t*		caller_trx,/*!<in/out: trx of current thread */
1201 	const rec_t*	rec,	/*!< in: user record */
1202 	dict_index_t*	index,	/*!< in: secondary index */
1203 	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
1204 {
1205 	trx_t*		trx;
1206 	trx_id_t	max_trx_id;
1207 	const page_t*	page = page_align(rec);
1208 
1209 	ut_ad(!lock_mutex_own());
1210 	ut_ad(!dict_index_is_clust(index));
1211 	ut_ad(page_rec_is_user_rec(rec));
1212 	ut_ad(rec_offs_validate(rec, index, offsets));
1213 	ut_ad(!rec_is_metadata(rec, index));
1214 
1215 	max_trx_id = page_get_max_trx_id(page);
1216 
1217 	/* Some transaction may have an implicit x-lock on the record only
1218 	if the max trx id for the page >= min trx id for the trx list, or
1219 	database recovery is running. */
1220 
1221 	if (max_trx_id < trx_sys.get_min_trx_id()) {
1222 
1223 		trx = 0;
1224 
1225 	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {
1226 
1227 		/* The page is corrupt: try to avoid a crash by returning 0 */
1228 		trx = 0;
1229 
1230 	/* In this case it is possible that some transaction has an implicit
1231 	x-lock. We have to look in the clustered index. */
1232 
1233 	} else {
1234 		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
1235 	}
1236 
1237 	return(trx);
1238 }
1239 
1240 /*********************************************************************//**
1241 Return approximate number or record locks (bits set in the bitmap) for
1242 this transaction. Since delete-marked records may be removed, the
1243 record count will not be precise.
1244 The caller must be holding lock_sys.mutex. */
1245 ulint
lock_number_of_rows_locked(const trx_lock_t * trx_lock)1246 lock_number_of_rows_locked(
1247 /*=======================*/
1248 	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
1249 {
1250 	ut_ad(lock_mutex_own());
1251 
1252 	return(trx_lock->n_rec_locks);
1253 }
1254 
1255 /*********************************************************************//**
1256 Return the number of table locks for a transaction.
1257 The caller must be holding lock_sys.mutex. */
1258 ulint
lock_number_of_tables_locked(const trx_lock_t * trx_lock)1259 lock_number_of_tables_locked(
1260 /*=========================*/
1261 	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
1262 {
1263 	const lock_t*	lock;
1264 	ulint		n_tables = 0;
1265 
1266 	ut_ad(lock_mutex_own());
1267 
1268 	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
1269 	     lock != NULL;
1270 	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
1271 
1272 		if (lock_get_type_low(lock) == LOCK_TABLE) {
1273 			n_tables++;
1274 		}
1275 	}
1276 
1277 	return(n_tables);
1278 }
1279 
1280 /*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/
1281 
1282 #ifdef WITH_WSREP
1283 ATTRIBUTE_COLD
1284 static
1285 void
wsrep_print_wait_locks(lock_t * c_lock)1286 wsrep_print_wait_locks(
1287 /*===================*/
1288 	lock_t*		c_lock) /* conflicting lock to print */
1289 {
1290 	if (c_lock->trx->lock.wait_lock != c_lock) {
1291 		mtr_t mtr;
1292 		ib::info() << "WSREP: c_lock != wait lock";
1293 		ib::info() << " SQL: "
1294 			   << wsrep_thd_query(c_lock->trx->mysql_thd);
1295 
1296 		if (lock_get_type_low(c_lock) & LOCK_TABLE) {
1297 			lock_table_print(stderr, c_lock);
1298 		} else {
1299 			lock_rec_print(stderr, c_lock, mtr);
1300 		}
1301 
1302 		if (lock_get_type_low(c_lock->trx->lock.wait_lock) & LOCK_TABLE) {
1303 			lock_table_print(stderr, c_lock->trx->lock.wait_lock);
1304 		} else {
1305 			lock_rec_print(stderr, c_lock->trx->lock.wait_lock,
1306 				       mtr);
1307 		}
1308 	}
1309 }
1310 #endif /* WITH_WSREP */
1311 
1312 #ifdef UNIV_DEBUG
1313 /** Check transaction state */
check_trx_state(const trx_t * trx)1314 static void check_trx_state(const trx_t *trx)
1315 {
1316   ut_ad(!trx->auto_commit || trx->will_lock);
1317   const trx_state_t state= trx->state;
1318   ut_ad(state == TRX_STATE_ACTIVE ||
1319         state == TRX_STATE_PREPARED_RECOVERED ||
1320         state == TRX_STATE_PREPARED ||
1321         state == TRX_STATE_COMMITTED_IN_MEMORY);
1322 }
1323 #endif
1324 
1325 /** Create a new record lock and inserts it to the lock queue,
1326 without checking for deadlocks or conflicts.
1327 @param[in]	type_mode	lock mode and wait flag; type will be replaced
1328 				with LOCK_REC
1329 @param[in]	space		tablespace id
1330 @param[in]	page_no		index page number
1331 @param[in]	page		R-tree index page, or NULL
1332 @param[in]	heap_no		record heap number in the index page
1333 @param[in]	index		the index tree
1334 @param[in,out]	trx		transaction
1335 @param[in]	holds_trx_mutex	whether the caller holds trx->mutex
1336 @return created lock */
1337 lock_t*
lock_rec_create_low(lock_t * c_lock,que_thr_t * thr,ulint type_mode,ulint space,ulint page_no,const page_t * page,ulint heap_no,dict_index_t * index,trx_t * trx,bool holds_trx_mutex)1338 lock_rec_create_low(
1339 #ifdef WITH_WSREP
1340 	lock_t*		c_lock,	/*!< conflicting lock */
1341 	que_thr_t*	thr,	/*!< thread owning trx */
1342 #endif
1343 	ulint		type_mode,
1344 	ulint		space,
1345 	ulint		page_no,
1346 	const page_t*	page,
1347 	ulint		heap_no,
1348 	dict_index_t*	index,
1349 	trx_t*		trx,
1350 	bool		holds_trx_mutex)
1351 {
1352 	lock_t*		lock;
1353 	ulint		n_bits;
1354 	ulint		n_bytes;
1355 
1356 	ut_ad(lock_mutex_own());
1357 	ut_ad(holds_trx_mutex == trx_mutex_own(trx));
1358 	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
1359 
1360 #ifdef UNIV_DEBUG
1361 	/* Non-locking autocommit read-only transactions should not set
1362 	any locks. See comment in trx_set_rw_mode explaining why this
1363 	conditional check is required in debug code. */
1364 	if (holds_trx_mutex) {
1365 		check_trx_state(trx);
1366 	}
1367 #endif /* UNIV_DEBUG */
1368 
1369 	/* If rec is the supremum record, then we reset the gap and
1370 	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
1371 	automatically of the gap type */
1372 
1373 	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
1374 		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
1375 		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
1376 	}
1377 
1378 	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
1379 		/* Make lock bitmap bigger by a safety margin */
1380 		n_bits = page_dir_get_n_heap(page) + LOCK_PAGE_BITMAP_MARGIN;
1381 		n_bytes = 1 + n_bits / 8;
1382 	} else {
1383 		ut_ad(heap_no == PRDT_HEAPNO);
1384 
1385 		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
1386 		we only need 1 bit (which round up to 1 byte) for
1387 		lock bit setting */
1388 		n_bytes = 1;
1389 
1390 		if (type_mode & LOCK_PREDICATE) {
1391 			ulint	tmp = UNIV_WORD_SIZE - 1;
1392 
1393 			/* We will attach predicate structure after lock.
1394 			Make sure the memory is aligned on 8 bytes,
1395 			the mem_heap_alloc will align it with
1396 			MEM_SPACE_NEEDED anyway. */
1397 			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
1398 			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
1399 		}
1400 	}
1401 
1402 	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
1403 	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
1404 		lock = static_cast<lock_t*>(
1405 			mem_heap_alloc(trx->lock.lock_heap,
1406 				       sizeof *lock + n_bytes));
1407 	} else {
1408 		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
1409 	}
1410 
1411 	lock->trx = trx;
1412 	lock->type_mode = (type_mode & ~LOCK_TYPE_MASK) | LOCK_REC;
1413 	lock->index = index;
1414 	lock->un_member.rec_lock.space = uint32_t(space);
1415 	lock->un_member.rec_lock.page_no = uint32_t(page_no);
1416 
1417 	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
1418 		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
1419 	} else {
1420 		/* Predicate lock always on INFIMUM (0) */
1421 		lock->un_member.rec_lock.n_bits = 8;
1422  	}
1423 	lock_rec_bitmap_reset(lock);
1424 	lock_rec_set_nth_bit(lock, heap_no);
1425 	index->table->n_rec_locks++;
1426 	ut_ad(index->table->get_ref_count() > 0 || !index->table->can_be_evicted);
1427 
1428 #ifdef WITH_WSREP
1429 	if (c_lock && trx->is_wsrep()
1430 	    && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
1431 		lock_t *hash	= (lock_t *)c_lock->hash;
1432 		lock_t *prev	= NULL;
1433 
1434 		while (hash && wsrep_thd_is_BF(hash->trx->mysql_thd, FALSE)
1435 		       && wsrep_trx_order_before(hash->trx->mysql_thd,
1436 						 trx->mysql_thd)) {
1437 			prev = hash;
1438 			hash = (lock_t *)hash->hash;
1439 		}
1440 		lock->hash = hash;
1441 		if (prev) {
1442 			prev->hash = lock;
1443 		} else {
1444 			c_lock->hash = lock;
1445 		}
1446 		/*
1447 		 * delayed conflict resolution '...kill_one_trx' was not called,
1448 		 * if victim was waiting for some other lock
1449 		 */
1450 		trx_mutex_enter(c_lock->trx);
1451 		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
1452 
1453 			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
1454 
1455 			if (UNIV_UNLIKELY(wsrep_debug)) {
1456 				wsrep_print_wait_locks(c_lock);
1457 			}
1458 
1459 			trx->lock.que_state = TRX_QUE_LOCK_WAIT;
1460 			lock_set_lock_and_trx_wait(lock, trx);
1461 			UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
1462 
1463 			trx->lock.wait_thr = thr;
1464 			thr->state = QUE_THR_LOCK_WAIT;
1465 
1466 			/* have to release trx mutex for the duration of
1467 			   victim lock release. This will eventually call
1468 			   lock_grant, which wants to grant trx mutex again
1469 			*/
1470 			if (holds_trx_mutex) {
1471 				trx_mutex_exit(trx);
1472 			}
1473 			lock_cancel_waiting_and_release(
1474 				c_lock->trx->lock.wait_lock);
1475 
1476 			if (holds_trx_mutex) {
1477 				trx_mutex_enter(trx);
1478 			}
1479 
1480 			trx_mutex_exit(c_lock->trx);
1481 
1482 			/* have to bail out here to avoid lock_set_lock... */
1483 			return(lock);
1484 		}
1485 		trx_mutex_exit(c_lock->trx);
1486 	} else
1487 #endif /* WITH_WSREP */
1488 	if (!(type_mode & (LOCK_WAIT | LOCK_PREDICATE | LOCK_PRDT_PAGE))
1489 	    && innodb_lock_schedule_algorithm
1490 	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
1491 	    && !thd_is_replication_slave_thread(trx->mysql_thd)) {
1492 		HASH_PREPEND(lock_t, hash, lock_sys.rec_hash,
1493 			     lock_rec_fold(space, page_no), lock);
1494 	} else {
1495 		HASH_INSERT(lock_t, hash, lock_hash_get(type_mode),
1496 			    lock_rec_fold(space, page_no), lock);
1497 	}
1498 
1499 	if (!holds_trx_mutex) {
1500 		trx_mutex_enter(trx);
1501 	}
1502 	ut_ad(trx_mutex_own(trx));
1503 	if (type_mode & LOCK_WAIT) {
1504 		lock_set_lock_and_trx_wait(lock, trx);
1505 	}
1506 	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
1507 	if (!holds_trx_mutex) {
1508 		trx_mutex_exit(trx);
1509 	}
1510 	MONITOR_INC(MONITOR_RECLOCK_CREATED);
1511 	MONITOR_INC(MONITOR_NUM_RECLOCK);
1512 
1513 	return lock;
1514 }
1515 
1516 /*********************************************************************//**
1517 Check if lock1 has higher priority than lock2.
1518 NULL has lowest priority.
1519 If neither of them is wait lock, the first one has higher priority.
1520 If only one of them is a wait lock, it has lower priority.
1521 If either is a high priority transaction, the lock has higher priority.
1522 Otherwise, the one with an older transaction has higher priority.
1523 @returns true if lock1 has higher priority, false otherwise. */
has_higher_priority(lock_t * lock1,lock_t * lock2)1524 static bool has_higher_priority(lock_t *lock1, lock_t *lock2)
1525 {
1526 	if (lock1 == NULL) {
1527 		return false;
1528 	} else if (lock2 == NULL) {
1529 		return true;
1530 	}
1531 	// Granted locks has higher priority.
1532 	if (!lock_get_wait(lock1)) {
1533 		return true;
1534 	} else if (!lock_get_wait(lock2)) {
1535 		return false;
1536 	}
1537 	return lock1->trx->start_time_micro <= lock2->trx->start_time_micro;
1538 }
1539 
1540 /*********************************************************************//**
1541 Insert a lock to the hash list according to the mode (whether it is a wait
1542 lock) and the age of the transaction the it is associated with.
1543 If the lock is not a wait lock, insert it to the head of the hash list.
1544 Otherwise, insert it to the middle of the wait locks according to the age of
1545 the transaciton. */
1546 static
1547 dberr_t
lock_rec_insert_by_trx_age(lock_t * in_lock)1548 lock_rec_insert_by_trx_age(
1549 	lock_t	*in_lock) /*!< in: lock to be insert */{
1550 	ulint				space;
1551 	ulint				page_no;
1552 	ulint				rec_fold;
1553 	lock_t*				node;
1554 	lock_t*				next;
1555 	hash_table_t*		hash;
1556 	hash_cell_t*		cell;
1557 
1558 	ut_ad(!in_lock->trx->is_wsrep());
1559 	space = in_lock->un_member.rec_lock.space;
1560 	page_no = in_lock->un_member.rec_lock.page_no;
1561 	rec_fold = lock_rec_fold(space, page_no);
1562 	hash = lock_hash_get(in_lock->type_mode);
1563 	cell = hash_get_nth_cell(hash,
1564 				 hash_calc_hash(rec_fold, hash));
1565 
1566 	node = (lock_t *) cell->node;
1567 	// If in_lock is not a wait lock, we insert it to the head of the list.
1568 	if (node == NULL || !lock_get_wait(in_lock) || has_higher_priority(in_lock, node)) {
1569 		cell->node = in_lock;
1570 		in_lock->hash = node;
1571 		if (lock_get_wait(in_lock)) {
1572 			lock_grant_have_trx_mutex(in_lock);
1573 			return DB_SUCCESS_LOCKED_REC;
1574 		}
1575 		return DB_SUCCESS;
1576 	}
1577 	while (node != NULL && has_higher_priority((lock_t *) node->hash,
1578 						   in_lock)) {
1579 		node = (lock_t *) node->hash;
1580 	}
1581 	next = (lock_t *) node->hash;
1582 	node->hash = in_lock;
1583 	in_lock->hash = next;
1584 
1585 	if (lock_get_wait(in_lock) && !lock_rec_has_to_wait_in_queue(in_lock)) {
1586 		lock_grant_have_trx_mutex(in_lock);
1587 		if (cell->node != in_lock) {
1588 			// Move it to the front of the queue
1589 			node->hash = in_lock->hash;
1590 			next = (lock_t *) cell->node;
1591 			cell->node = in_lock;
1592 			in_lock->hash = next;
1593 		}
1594 		return DB_SUCCESS_LOCKED_REC;
1595 	}
1596 
1597 	return DB_SUCCESS;
1598 }
1599 
1600 #ifdef UNIV_DEBUG
1601 static
1602 bool
lock_queue_validate(const lock_t * in_lock)1603 lock_queue_validate(
1604 	const lock_t	*in_lock) /*!< in: lock whose hash list is to be validated */
1605 {
1606 	ulint				space;
1607 	ulint				page_no;
1608 	ulint				rec_fold;
1609 	hash_table_t*		hash;
1610 	hash_cell_t*		cell;
1611 	lock_t*				next;
1612 	bool				wait_lock __attribute__((unused))= false;
1613 
1614 	if (in_lock == NULL) {
1615 		return true;
1616 	}
1617 
1618 	space = in_lock->un_member.rec_lock.space;
1619 	page_no = in_lock->un_member.rec_lock.page_no;
1620 	rec_fold = lock_rec_fold(space, page_no);
1621 	hash = lock_hash_get(in_lock->type_mode);
1622 	cell = hash_get_nth_cell(hash,
1623 			hash_calc_hash(rec_fold, hash));
1624 	next = (lock_t *) cell->node;
1625 	while (next != NULL) {
1626 		// If this is a granted lock, check that there's no wait lock before it.
1627 		if (!lock_get_wait(next)) {
1628 			ut_ad(!wait_lock);
1629 		} else {
1630 			wait_lock = true;
1631 		}
1632 		next = next->hash;
1633 	}
1634 	return true;
1635 }
1636 #endif /* UNIV_DEBUG */
1637 
1638 static
1639 void
lock_rec_insert_to_head(lock_t * in_lock,ulint rec_fold)1640 lock_rec_insert_to_head(
1641 	lock_t *in_lock,   /*!< in: lock to be insert */
1642 	ulint	rec_fold)  /*!< in: rec_fold of the page */
1643 {
1644 	hash_table_t*		hash;
1645 	hash_cell_t*		cell;
1646 	lock_t*				node;
1647 
1648 	if (in_lock == NULL) {
1649 		return;
1650 	}
1651 
1652 	hash = lock_hash_get(in_lock->type_mode);
1653 	cell = hash_get_nth_cell(hash,
1654 			hash_calc_hash(rec_fold, hash));
1655 	node = (lock_t *) cell->node;
1656 	if (node != in_lock) {
1657 		cell->node = in_lock;
1658 		in_lock->hash = node;
1659 	}
1660 }
1661 
1662 /** Enqueue a waiting request for a lock which cannot be granted immediately.
1663 Check for deadlocks.
1664 @param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
1665 				possibly ORed with LOCK_GAP or
1666 				LOCK_REC_NOT_GAP, ORed with
1667 				LOCK_INSERT_INTENTION if this
1668 				waiting lock request is set
1669 				when performing an insert of
1670 				an index record
1671 @param[in]	block		leaf page in the index
1672 @param[in]	heap_no		record heap number in the block
1673 @param[in]	index		index tree
1674 @param[in,out]	thr		query thread
1675 @param[in]	prdt		minimum bounding box (spatial index)
1676 @retval	DB_LOCK_WAIT		if the waiting lock was enqueued
1677 @retval	DB_DEADLOCK		if this transaction was chosen as the victim
1678 @retval	DB_SUCCESS_LOCKED_REC	if the other transaction was chosen as a victim
1679 				(or it happened to commit) */
1680 dberr_t
lock_rec_enqueue_waiting(lock_t * c_lock,ulint type_mode,const buf_block_t * block,ulint heap_no,dict_index_t * index,que_thr_t * thr,lock_prdt_t * prdt)1681 lock_rec_enqueue_waiting(
1682 #ifdef WITH_WSREP
1683 	lock_t*			c_lock,	/*!< conflicting lock */
1684 #endif
1685 	ulint			type_mode,
1686 	const buf_block_t*	block,
1687 	ulint			heap_no,
1688 	dict_index_t*		index,
1689 	que_thr_t*		thr,
1690 	lock_prdt_t*		prdt)
1691 {
1692 	ut_ad(lock_mutex_own());
1693 	ut_ad(!srv_read_only_mode);
1694 	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
1695 
1696 	trx_t* trx = thr_get_trx(thr);
1697 
1698 	ut_ad(trx_mutex_own(trx));
1699 	ut_a(!que_thr_stop(thr));
1700 
1701 	switch (trx_get_dict_operation(trx)) {
1702 	case TRX_DICT_OP_NONE:
1703 		break;
1704 	case TRX_DICT_OP_TABLE:
1705 	case TRX_DICT_OP_INDEX:
1706 		ib::error() << "A record lock wait happens in a dictionary"
1707 			" operation. index "
1708 			<< index->name
1709 			<< " of table "
1710 			<< index->table->name
1711 			<< ". " << BUG_REPORT_MSG;
1712 		ut_ad(0);
1713 	}
1714 
1715 	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
1716 		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
1717 		return DB_LOCK_WAIT_TIMEOUT;
1718 	}
1719 
1720 	/* Enqueue the lock request that will wait to be granted, note that
1721 	we already own the trx mutex. */
1722 	lock_t* lock = lock_rec_create(
1723 #ifdef WITH_WSREP
1724 		c_lock, thr,
1725 #endif
1726 		type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);
1727 
1728 	if (prdt && type_mode & LOCK_PREDICATE) {
1729 		lock_prdt_set_prdt(lock, prdt);
1730 	}
1731 
1732 	if (ut_d(const trx_t* victim =)
1733 	    DeadlockChecker::check_and_resolve(lock, trx)) {
1734 		ut_ad(victim == trx);
1735 		lock_reset_lock_and_trx_wait(lock);
1736 		lock_rec_reset_nth_bit(lock, heap_no);
1737 		return DB_DEADLOCK;
1738 	}
1739 
1740 	if (!trx->lock.wait_lock) {
1741 		/* If there was a deadlock but we chose another
1742 		transaction as a victim, it is possible that we
1743 		already have the lock now granted! */
1744 #ifdef WITH_WSREP
1745 		if (UNIV_UNLIKELY(wsrep_debug)) {
1746 			ib::info() << "WSREP: BF thread got lock granted early, ID " << ib::hex(trx->id)
1747 				   << " query: " << wsrep_thd_query(trx->mysql_thd);
1748 		}
1749 #endif
1750 		return DB_SUCCESS_LOCKED_REC;
1751 	}
1752 
1753 	trx->lock.que_state = TRX_QUE_LOCK_WAIT;
1754 
1755 	trx->lock.was_chosen_as_deadlock_victim = false;
1756 	trx->lock.wait_started = time(NULL);
1757 
1758 	ut_a(que_thr_stop(thr));
1759 
1760 	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
1761 		 << " waits for lock in index " << index->name
1762 		 << " of table " << index->table->name);
1763 
1764 	MONITOR_INC(MONITOR_LOCKREC_WAIT);
1765 
1766 	if (innodb_lock_schedule_algorithm
1767 	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
1768 	    && !prdt
1769 	    && !thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
1770 		HASH_DELETE(lock_t, hash, lock_sys.rec_hash,
1771 			    lock_rec_lock_fold(lock), lock);
1772 		dberr_t res = lock_rec_insert_by_trx_age(lock);
1773 		if (res != DB_SUCCESS) {
1774 			return res;
1775 		}
1776 	}
1777 
1778 	return DB_LOCK_WAIT;
1779 }
1780 
1781 /*********************************************************************//**
1782 Adds a record lock request in the record queue. The request is normally
1783 added as the last in the queue, but if there are no waiting lock requests
1784 on the record, and the request to be added is not a waiting request, we
1785 can reuse a suitable record lock object already existing on the same page,
1786 just setting the appropriate bit in its bitmap. This is a low-level function
1787 which does NOT check for deadlocks or lock compatibility!
1788 @return lock where the bit was set */
1789 static
1790 void
lock_rec_add_to_queue(ulint type_mode,const buf_block_t * block,ulint heap_no,dict_index_t * index,trx_t * trx,bool caller_owns_trx_mutex)1791 lock_rec_add_to_queue(
1792 /*==================*/
1793 	ulint			type_mode,/*!< in: lock mode, wait, gap
1794 					etc. flags; type is ignored
1795 					and replaced by LOCK_REC */
1796 	const buf_block_t*	block,	/*!< in: buffer block containing
1797 					the record */
1798 	ulint			heap_no,/*!< in: heap number of the record */
1799 	dict_index_t*		index,	/*!< in: index of record */
1800 	trx_t*			trx,	/*!< in/out: transaction */
1801 	bool			caller_owns_trx_mutex)
1802 					/*!< in: TRUE if caller owns the
1803 					transaction mutex */
1804 {
1805 #ifdef UNIV_DEBUG
1806 	ut_ad(lock_mutex_own());
1807 	ut_ad(caller_owns_trx_mutex == trx_mutex_own(trx));
1808 	ut_ad(dict_index_is_clust(index)
1809 	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
1810 	switch (type_mode & LOCK_MODE_MASK) {
1811 	case LOCK_X:
1812 	case LOCK_S:
1813 		break;
1814 	default:
1815 		ut_error;
1816 	}
1817 
1818 	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
1819 		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
1820 			? LOCK_X
1821 			: LOCK_S;
1822 		const lock_t*	other_lock
1823 			= lock_rec_other_has_expl_req(
1824 				mode, block, false, heap_no, trx);
1825 #ifdef WITH_WSREP
1826 		if (UNIV_UNLIKELY(other_lock && trx->is_wsrep())) {
1827 			/* Only BF transaction may be granted lock
1828 			before other conflicting lock request. */
1829 			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
1830 			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
1831 				/* If it is not BF, this case is a bug. */
1832 				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
1833 				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
1834 				ut_error;
1835 			}
1836 		} else
1837 #endif /* WITH_WSREP */
1838 		ut_ad(!other_lock);
1839 	}
1840 #endif /* UNIV_DEBUG */
1841 
1842 	type_mode |= LOCK_REC;
1843 
1844 	/* If rec is the supremum record, then we can reset the gap bit, as
1845 	all locks on the supremum are automatically of the gap type, and we
1846 	try to avoid unnecessary memory consumption of a new record lock
1847 	struct for a gap type lock */
1848 
1849 	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
1850 		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
1851 
1852 		/* There should never be LOCK_REC_NOT_GAP on a supremum
1853 		record, but let us play safe */
1854 
1855 		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
1856 	}
1857 
1858 	lock_t*		lock;
1859 	lock_t*		first_lock;
1860 	hash_table_t*	hash = lock_hash_get(type_mode);
1861 
1862 	/* Look for a waiting lock request on the same record or on a gap */
1863 
1864 	for (first_lock = lock = lock_rec_get_first_on_page(hash, block);
1865 	     lock != NULL;
1866 	     lock = lock_rec_get_next_on_page(lock)) {
1867 
1868 		if (lock_get_wait(lock)
1869 		    && lock_rec_get_nth_bit(lock, heap_no)) {
1870 
1871 			break;
1872 		}
1873 	}
1874 
1875 	if (lock == NULL && !(type_mode & LOCK_WAIT)) {
1876 
1877 		/* Look for a similar record lock on the same page:
1878 		if one is found and there are no waiting lock requests,
1879 		we can just set the bit */
1880 
1881 		lock = lock_rec_find_similar_on_page(
1882 			type_mode, heap_no, first_lock, trx);
1883 
1884 		if (lock != NULL) {
1885 
1886 			lock_rec_set_nth_bit(lock, heap_no);
1887 
1888 			return;
1889 		}
1890 	}
1891 
1892 	lock_rec_create(
1893 #ifdef WITH_WSREP
1894 		NULL, NULL,
1895 #endif
1896 		type_mode, block, heap_no, index, trx, caller_owns_trx_mutex);
1897 }
1898 
1899 /*********************************************************************//**
1900 Tries to lock the specified record in the mode requested. If not immediately
1901 possible, enqueues a waiting lock request. This is a low-level function
1902 which does NOT look at implicit locks! Checks lock compatibility within
1903 explicit locks. This function sets a normal next-key lock, or in the case
1904 of a page supremum record, a gap type lock.
1905 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
1906 static
1907 dberr_t
lock_rec_lock(bool impl,ulint mode,const buf_block_t * block,ulint heap_no,dict_index_t * index,que_thr_t * thr)1908 lock_rec_lock(
1909 /*==========*/
1910 	bool			impl,	/*!< in: if true, no lock is set
1911 					if no wait is necessary: we
1912 					assume that the caller will
1913 					set an implicit lock */
1914 	ulint			mode,	/*!< in: lock mode: LOCK_X or
1915 					LOCK_S possibly ORed to either
1916 					LOCK_GAP or LOCK_REC_NOT_GAP */
1917 	const buf_block_t*	block,	/*!< in: buffer block containing
1918 					the record */
1919 	ulint			heap_no,/*!< in: heap number of record */
1920 	dict_index_t*		index,	/*!< in: index of record */
1921 	que_thr_t*		thr)	/*!< in: query thread */
1922 {
1923   trx_t *trx= thr_get_trx(thr);
1924   dberr_t err= DB_SUCCESS;
1925 
1926   ut_ad(!srv_read_only_mode);
1927   ut_ad((LOCK_MODE_MASK & mode) == LOCK_S ||
1928         (LOCK_MODE_MASK & mode) == LOCK_X);
1929   ut_ad((mode & LOCK_TYPE_MASK) == LOCK_GAP ||
1930         (mode & LOCK_TYPE_MASK) == LOCK_REC_NOT_GAP ||
1931         (mode & LOCK_TYPE_MASK) == 0);
1932   ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
1933   DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);
1934 
1935   lock_mutex_enter();
1936   ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
1937         lock_table_has(trx, index->table, LOCK_IS));
1938   ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
1939          lock_table_has(trx, index->table, LOCK_IX));
1940 
1941   if (lock_t *lock= lock_rec_get_first_on_page(lock_sys.rec_hash, block))
1942   {
1943     trx_mutex_enter(trx);
1944     if (lock_rec_get_next_on_page(lock) ||
1945         lock->trx != trx ||
1946         lock->type_mode != (ulint(mode) | LOCK_REC) ||
1947         lock_rec_get_n_bits(lock) <= heap_no)
1948     {
1949       /* Do nothing if the trx already has a strong enough lock on rec */
1950       if (!lock_rec_has_expl(mode, block, heap_no, trx))
1951       {
1952         if (
1953 #ifdef WITH_WSREP
1954 	    lock_t *c_lock=
1955 #endif
1956 	    lock_rec_other_has_conflicting(mode, block, heap_no, trx))
1957         {
1958           /*
1959             If another transaction has a non-gap conflicting
1960             request in the queue, as this transaction does not
1961             have a lock strong enough already granted on the
1962 	    record, we have to wait. */
1963 	    err = lock_rec_enqueue_waiting(
1964 #ifdef WITH_WSREP
1965 			c_lock,
1966 #endif /* WITH_WSREP */
1967 			mode, block, heap_no, index, thr, NULL);
1968         }
1969         else if (!impl)
1970         {
1971           /* Set the requested lock on the record. */
1972           lock_rec_add_to_queue(LOCK_REC | mode, block, heap_no, index, trx,
1973                                 true);
1974           err= DB_SUCCESS_LOCKED_REC;
1975         }
1976       }
1977     }
1978     else if (!impl)
1979     {
1980       /*
1981         If the nth bit of the record lock is already set then we do not set
1982         a new lock bit, otherwise we do set
1983       */
1984       if (!lock_rec_get_nth_bit(lock, heap_no))
1985       {
1986         lock_rec_set_nth_bit(lock, heap_no);
1987         err= DB_SUCCESS_LOCKED_REC;
1988       }
1989     }
1990     trx_mutex_exit(trx);
1991   }
1992   else
1993   {
1994     /*
1995       Simplified and faster path for the most common cases
1996       Note that we don't own the trx mutex.
1997     */
1998     if (!impl)
1999       lock_rec_create(
2000 #ifdef WITH_WSREP
2001          NULL, NULL,
2002 #endif
2003         mode, block, heap_no, index, trx, false);
2004 
2005     err= DB_SUCCESS_LOCKED_REC;
2006   }
2007   lock_mutex_exit();
2008   MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
2009   return err;
2010 }
2011 
2012 /*********************************************************************//**
2013 Checks if a waiting record lock request still has to wait in a queue.
2014 @return lock that is causing the wait */
2015 static
2016 const lock_t*
lock_rec_has_to_wait_in_queue(const lock_t * wait_lock)2017 lock_rec_has_to_wait_in_queue(
2018 /*==========================*/
2019 	const lock_t*	wait_lock)	/*!< in: waiting record lock */
2020 {
2021 	const lock_t*	lock;
2022 	ulint		space;
2023 	ulint		page_no;
2024 	ulint		heap_no;
2025 	ulint		bit_mask;
2026 	ulint		bit_offset;
2027 	hash_table_t*	hash;
2028 
2029 	ut_ad(wait_lock);
2030 	ut_ad(lock_mutex_own());
2031 	ut_ad(lock_get_wait(wait_lock));
2032 	ut_ad(lock_get_type_low(wait_lock) == LOCK_REC);
2033 
2034 	space = wait_lock->un_member.rec_lock.space;
2035 	page_no = wait_lock->un_member.rec_lock.page_no;
2036 	heap_no = lock_rec_find_set_bit(wait_lock);
2037 
2038 	bit_offset = heap_no / 8;
2039 	bit_mask = static_cast<ulint>(1) << (heap_no % 8);
2040 
2041 	hash = lock_hash_get(wait_lock->type_mode);
2042 
2043 	for (lock = lock_rec_get_first_on_page_addr(hash, space, page_no);
2044 	     lock != wait_lock;
2045 	     lock = lock_rec_get_next_on_page_const(lock)) {
2046 		const byte*	p = (const byte*) &lock[1];
2047 
2048 		if (heap_no < lock_rec_get_n_bits(lock)
2049 		    && (p[bit_offset] & bit_mask)
2050 		    && lock_has_to_wait(wait_lock, lock)) {
2051 			return(lock);
2052 		}
2053 	}
2054 
2055 	return(NULL);
2056 }
2057 
2058 /** Grant a lock to a waiting lock request and release the waiting transaction
2059 after lock_reset_lock_and_trx_wait() has been called. */
lock_grant_after_reset(lock_t * lock)2060 static void lock_grant_after_reset(lock_t* lock)
2061 {
2062 	ut_ad(lock_mutex_own());
2063 	ut_ad(trx_mutex_own(lock->trx));
2064 
2065 	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
2066 		dict_table_t*	table = lock->un_member.tab_lock.table;
2067 
2068 		if (table->autoinc_trx == lock->trx) {
2069 			ib::error() << "Transaction already had an"
2070 				<< " AUTO-INC lock!";
2071 		} else {
2072 			table->autoinc_trx = lock->trx;
2073 
2074 			ib_vector_push(lock->trx->autoinc_locks, &lock);
2075 		}
2076 	}
2077 
2078 	DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends",
2079 			       trx_get_id_for_print(lock->trx)));
2080 
2081 	/* If we are resolving a deadlock by choosing another transaction
2082 	as a victim, then our original transaction may not be in the
2083 	TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait
2084 	for it */
2085 
2086 	if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
2087 		que_thr_t*	thr;
2088 
2089 		thr = que_thr_end_lock_wait(lock->trx);
2090 
2091 		if (thr != NULL) {
2092 			lock_wait_release_thread_if_suspended(thr);
2093 		}
2094 	}
2095 }
2096 
2097 /** Grant a lock to a waiting lock request and release the waiting transaction. */
lock_grant(lock_t * lock)2098 static void lock_grant(lock_t* lock)
2099 {
2100 	lock_reset_lock_and_trx_wait(lock);
2101 	trx_mutex_enter(lock->trx);
2102 	lock_grant_after_reset(lock);
2103 	trx_mutex_exit(lock->trx);
2104 }
2105 
2106 /*************************************************************//**
2107 Cancels a waiting record lock request and releases the waiting transaction
2108 that requested it. NOTE: does NOT check if waiting lock requests behind this
2109 one can now be granted! */
2110 static
2111 void
lock_rec_cancel(lock_t * lock)2112 lock_rec_cancel(
2113 /*============*/
2114 	lock_t*	lock)	/*!< in: waiting record lock request */
2115 {
2116 	que_thr_t*	thr;
2117 
2118 	ut_ad(lock_mutex_own());
2119 	ut_ad(lock_get_type_low(lock) == LOCK_REC);
2120 
2121 	/* Reset the bit (there can be only one set bit) in the lock bitmap */
2122 	lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
2123 
2124 	/* Reset the wait flag and the back pointer to lock in trx */
2125 
2126 	lock_reset_lock_and_trx_wait(lock);
2127 
2128 	/* The following function releases the trx from lock wait */
2129 
2130 	trx_mutex_enter(lock->trx);
2131 
2132 	thr = que_thr_end_lock_wait(lock->trx);
2133 
2134 	if (thr != NULL) {
2135 		lock_wait_release_thread_if_suspended(thr);
2136 	}
2137 
2138 	trx_mutex_exit(lock->trx);
2139 }
2140 
2141 static
2142 void
lock_grant_and_move_on_page(ulint rec_fold,ulint space,ulint page_no)2143 lock_grant_and_move_on_page(ulint rec_fold, ulint space, ulint page_no)
2144 {
2145 	lock_t*		lock;
2146 	lock_t*		previous = static_cast<lock_t*>(
2147 		hash_get_nth_cell(lock_sys.rec_hash,
2148 				  hash_calc_hash(rec_fold, lock_sys.rec_hash))
2149 		->node);
2150 	if (previous == NULL) {
2151 		return;
2152 	}
2153 	if (previous->un_member.rec_lock.space == space &&
2154 		previous->un_member.rec_lock.page_no == page_no) {
2155 		lock = previous;
2156 	}
2157 	else {
2158 		while (previous->hash &&
2159 				(previous->hash->un_member.rec_lock.space != space ||
2160 				previous->hash->un_member.rec_lock.page_no != page_no)) {
2161 					previous = previous->hash;
2162 		}
2163 		lock = previous->hash;
2164 	}
2165 
2166 	ut_ad(previous->hash == lock || previous == lock);
2167 	/* Grant locks if there are no conflicting locks ahead.
2168 	 Move granted locks to the head of the list. */
2169 	while (lock) {
2170 		/* If the lock is a wait lock on this page, and it does not need to wait. */
2171 		ut_ad(!lock->trx->is_wsrep());
2172 		if (lock_get_wait(lock)
2173 		    && lock->un_member.rec_lock.space == space
2174 		    && lock->un_member.rec_lock.page_no == page_no
2175 		    && !lock_rec_has_to_wait_in_queue(lock)) {
2176 			lock_grant(lock);
2177 
2178 			if (previous != NULL) {
2179 				/* Move the lock to the head of the list. */
2180 				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
2181 				lock_rec_insert_to_head(lock, rec_fold);
2182 			} else {
2183 				/* Already at the head of the list. */
2184 				previous = lock;
2185 			}
2186 			/* Move on to the next lock. */
2187 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
2188 		} else {
2189 			previous = lock;
2190 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
2191 		}
2192 	}
2193 }
2194 
2195 /** Remove a record lock request, waiting or granted, from the queue and
2196 grant locks to other transactions in the queue if they now are entitled
2197 to a lock. NOTE: all record locks contained in in_lock are removed.
2198 @param[in,out]	in_lock		record lock */
lock_rec_dequeue_from_page(lock_t * in_lock)2199 static void lock_rec_dequeue_from_page(lock_t* in_lock)
2200 {
2201 	ulint		space;
2202 	ulint		page_no;
2203 	hash_table_t*	lock_hash;
2204 
2205 	ut_ad(lock_mutex_own());
2206 	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
2207 	/* We may or may not be holding in_lock->trx->mutex here. */
2208 
2209 	space = in_lock->un_member.rec_lock.space;
2210 	page_no = in_lock->un_member.rec_lock.page_no;
2211 
2212 	in_lock->index->table->n_rec_locks--;
2213 
2214 	lock_hash = lock_hash_get(in_lock->type_mode);
2215 
2216 	ulint rec_fold = lock_rec_fold(space, page_no);
2217 
2218 	HASH_DELETE(lock_t, hash, lock_hash, rec_fold, in_lock);
2219 	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);
2220 
2221 	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
2222 	MONITOR_DEC(MONITOR_NUM_RECLOCK);
2223 
2224 	if (innodb_lock_schedule_algorithm
2225 	    == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS
2226 	    || lock_hash != lock_sys.rec_hash
2227 	    || thd_is_replication_slave_thread(in_lock->trx->mysql_thd)) {
2228 		/* Check if waiting locks in the queue can now be granted:
2229 		grant locks if there are no conflicting locks ahead. Stop at
2230 		the first X lock that is waiting or has been granted. */
2231 
2232 		for (lock_t* lock = lock_rec_get_first_on_page_addr(
2233 			     lock_hash, space, page_no);
2234 		     lock != NULL;
2235 		     lock = lock_rec_get_next_on_page(lock)) {
2236 
2237 			if (!lock_get_wait(lock)) {
2238 				continue;
2239 			}
2240 			const lock_t* c = lock_rec_has_to_wait_in_queue(lock);
2241 			if (!c) {
2242 				/* Grant the lock */
2243 				ut_ad(lock->trx != in_lock->trx);
2244 				lock_grant(lock);
2245 			}
2246 		}
2247 	} else {
2248 		lock_grant_and_move_on_page(rec_fold, space, page_no);
2249 	}
2250 }
2251 
2252 /*************************************************************//**
2253 Removes a record lock request, waiting or granted, from the queue. */
2254 void
lock_rec_discard(lock_t * in_lock)2255 lock_rec_discard(
2256 /*=============*/
2257 	lock_t*		in_lock)	/*!< in: record lock object: all
2258 					record locks which are contained
2259 					in this lock object are removed */
2260 {
2261 	ulint		space;
2262 	ulint		page_no;
2263 	trx_lock_t*	trx_lock;
2264 
2265 	ut_ad(lock_mutex_own());
2266 	ut_ad(lock_get_type_low(in_lock) == LOCK_REC);
2267 
2268 	trx_lock = &in_lock->trx->lock;
2269 
2270 	space = in_lock->un_member.rec_lock.space;
2271 	page_no = in_lock->un_member.rec_lock.page_no;
2272 
2273 	in_lock->index->table->n_rec_locks--;
2274 
2275 	HASH_DELETE(lock_t, hash, lock_hash_get(in_lock->type_mode),
2276 			    lock_rec_fold(space, page_no), in_lock);
2277 
2278 	UT_LIST_REMOVE(trx_lock->trx_locks, in_lock);
2279 
2280 	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
2281 	MONITOR_DEC(MONITOR_NUM_RECLOCK);
2282 }
2283 
2284 /*************************************************************//**
2285 Removes record lock objects set on an index page which is discarded. This
2286 function does not move locks, or check for waiting locks, therefore the
2287 lock bitmaps must already be reset when this function is called. */
2288 static
2289 void
lock_rec_free_all_from_discard_page_low(ulint space,ulint page_no,hash_table_t * lock_hash)2290 lock_rec_free_all_from_discard_page_low(
2291 /*====================================*/
2292 	ulint		space,
2293 	ulint		page_no,
2294 	hash_table_t*	lock_hash)
2295 {
2296 	lock_t*	lock;
2297 	lock_t*	next_lock;
2298 
2299 	lock = lock_rec_get_first_on_page_addr(lock_hash, space, page_no);
2300 
2301 	while (lock != NULL) {
2302 		ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
2303 		ut_ad(!lock_get_wait(lock));
2304 
2305 		next_lock = lock_rec_get_next_on_page(lock);
2306 
2307 		lock_rec_discard(lock);
2308 
2309 		lock = next_lock;
2310 	}
2311 }
2312 
2313 /*************************************************************//**
2314 Removes record lock objects set on an index page which is discarded. This
2315 function does not move locks, or check for waiting locks, therefore the
2316 lock bitmaps must already be reset when this function is called. */
2317 void
lock_rec_free_all_from_discard_page(const buf_block_t * block)2318 lock_rec_free_all_from_discard_page(
2319 /*================================*/
2320 	const buf_block_t*	block)	/*!< in: page to be discarded */
2321 {
2322 	ulint	space;
2323 	ulint	page_no;
2324 
2325 	ut_ad(lock_mutex_own());
2326 
2327 	space = block->page.id.space();
2328 	page_no = block->page.id.page_no();
2329 
2330 	lock_rec_free_all_from_discard_page_low(
2331 		space, page_no, lock_sys.rec_hash);
2332 	lock_rec_free_all_from_discard_page_low(
2333 		space, page_no, lock_sys.prdt_hash);
2334 	lock_rec_free_all_from_discard_page_low(
2335 		space, page_no, lock_sys.prdt_page_hash);
2336 }
2337 
2338 /*============= RECORD LOCK MOVING AND INHERITING ===================*/
2339 
2340 /*************************************************************//**
2341 Resets the lock bits for a single record. Releases transactions waiting for
2342 lock requests here. */
2343 static
2344 void
lock_rec_reset_and_release_wait_low(hash_table_t * hash,const buf_block_t * block,ulint heap_no)2345 lock_rec_reset_and_release_wait_low(
2346 /*================================*/
2347 	hash_table_t*		hash,	/*!< in: hash table */
2348 	const buf_block_t*	block,	/*!< in: buffer block containing
2349 					the record */
2350 	ulint			heap_no)/*!< in: heap number of record */
2351 {
2352 	lock_t*	lock;
2353 
2354 	ut_ad(lock_mutex_own());
2355 
2356 	for (lock = lock_rec_get_first(hash, block, heap_no);
2357 	     lock != NULL;
2358 	     lock = lock_rec_get_next(heap_no, lock)) {
2359 
2360 		if (lock_get_wait(lock)) {
2361 			lock_rec_cancel(lock);
2362 		} else {
2363 			lock_rec_reset_nth_bit(lock, heap_no);
2364 		}
2365 	}
2366 }
2367 
2368 /*************************************************************//**
2369 Resets the lock bits for a single record. Releases transactions waiting for
2370 lock requests here. */
2371 static
2372 void
lock_rec_reset_and_release_wait(const buf_block_t * block,ulint heap_no)2373 lock_rec_reset_and_release_wait(
2374 /*============================*/
2375 	const buf_block_t*	block,	/*!< in: buffer block containing
2376 					the record */
2377 	ulint			heap_no)/*!< in: heap number of record */
2378 {
2379 	lock_rec_reset_and_release_wait_low(
2380 		lock_sys.rec_hash, block, heap_no);
2381 
2382 	lock_rec_reset_and_release_wait_low(
2383 		lock_sys.prdt_hash, block, PAGE_HEAP_NO_INFIMUM);
2384 	lock_rec_reset_and_release_wait_low(
2385 		lock_sys.prdt_page_hash, block, PAGE_HEAP_NO_INFIMUM);
2386 }
2387 
2388 /*************************************************************//**
2389 Makes a record to inherit the locks (except LOCK_INSERT_INTENTION type)
2390 of another record as gap type locks, but does not reset the lock bits of
2391 the other record. Also waiting lock requests on rec are inherited as
2392 GRANTED gap locks. */
2393 static
2394 void
lock_rec_inherit_to_gap(const buf_block_t * heir_block,const buf_block_t * block,ulint heir_heap_no,ulint heap_no)2395 lock_rec_inherit_to_gap(
2396 /*====================*/
2397 	const buf_block_t*	heir_block,	/*!< in: block containing the
2398 						record which inherits */
2399 	const buf_block_t*	block,		/*!< in: block containing the
2400 						record from which inherited;
2401 						does NOT reset the locks on
2402 						this record */
2403 	ulint			heir_heap_no,	/*!< in: heap_no of the
2404 						inheriting record */
2405 	ulint			heap_no)	/*!< in: heap_no of the
2406 						donating record */
2407 {
2408 	lock_t*	lock;
2409 
2410 	ut_ad(lock_mutex_own());
2411 
2412 	/* If srv_locks_unsafe_for_binlog is TRUE or session is using
2413 	READ COMMITTED isolation level, we do not want locks set
2414 	by an UPDATE or a DELETE to be inherited as gap type locks. But we
2415 	DO want S-locks/X-locks(taken for replace) set by a consistency
2416 	constraint to be inherited also then. */
2417 
2418 	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
2419 	     lock != NULL;
2420 	     lock = lock_rec_get_next(heap_no, lock)) {
2421 
2422 		if (!lock_rec_get_insert_intention(lock)
2423 		    && !((srv_locks_unsafe_for_binlog
2424 			  || lock->trx->isolation_level
2425 			  <= TRX_ISO_READ_COMMITTED)
2426 			 && lock_get_mode(lock) ==
2427 			 (lock->trx->duplicates ? LOCK_S : LOCK_X))) {
2428 			lock_rec_add_to_queue(
2429 				LOCK_REC | LOCK_GAP
2430 				| ulint(lock_get_mode(lock)),
2431 				heir_block, heir_heap_no, lock->index,
2432 				lock->trx, FALSE);
2433 		}
2434 	}
2435 }
2436 
2437 /*************************************************************//**
2438 Makes a record to inherit the gap locks (except LOCK_INSERT_INTENTION type)
2439 of another record as gap type locks, but does not reset the lock bits of the
2440 other record. Also waiting lock requests are inherited as GRANTED gap locks. */
2441 static
2442 void
lock_rec_inherit_to_gap_if_gap_lock(const buf_block_t * block,ulint heir_heap_no,ulint heap_no)2443 lock_rec_inherit_to_gap_if_gap_lock(
2444 /*================================*/
2445 	const buf_block_t*	block,		/*!< in: buffer block */
2446 	ulint			heir_heap_no,	/*!< in: heap_no of
2447 						record which inherits */
2448 	ulint			heap_no)	/*!< in: heap_no of record
2449 						from which inherited;
2450 						does NOT reset the locks
2451 						on this record */
2452 {
2453 	lock_t*	lock;
2454 
2455 	lock_mutex_enter();
2456 
2457 	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
2458 	     lock != NULL;
2459 	     lock = lock_rec_get_next(heap_no, lock)) {
2460 
2461 		if (!lock_rec_get_insert_intention(lock)
2462 		    && (heap_no == PAGE_HEAP_NO_SUPREMUM
2463 			|| !lock_rec_get_rec_not_gap(lock))) {
2464 
2465 			lock_rec_add_to_queue(
2466 				LOCK_REC | LOCK_GAP
2467 				| ulint(lock_get_mode(lock)),
2468 				block, heir_heap_no, lock->index,
2469 				lock->trx, FALSE);
2470 		}
2471 	}
2472 
2473 	lock_mutex_exit();
2474 }
2475 
2476 /*************************************************************//**
2477 Moves the locks of a record to another record and resets the lock bits of
2478 the donating record. */
2479 static
2480 void
lock_rec_move_low(hash_table_t * lock_hash,const buf_block_t * receiver,const buf_block_t * donator,ulint receiver_heap_no,ulint donator_heap_no)2481 lock_rec_move_low(
2482 /*==============*/
2483 	hash_table_t*		lock_hash,	/*!< in: hash table to use */
2484 	const buf_block_t*	receiver,	/*!< in: buffer block containing
2485 						the receiving record */
2486 	const buf_block_t*	donator,	/*!< in: buffer block containing
2487 						the donating record */
2488 	ulint			receiver_heap_no,/*!< in: heap_no of the record
2489 						which gets the locks; there
2490 						must be no lock requests
2491 						on it! */
2492 	ulint			donator_heap_no)/*!< in: heap_no of the record
2493 						which gives the locks */
2494 {
2495 	lock_t*	lock;
2496 
2497 	ut_ad(lock_mutex_own());
2498 
2499 	/* If the lock is predicate lock, it resides on INFIMUM record */
2500 	ut_ad(lock_rec_get_first(
2501 		lock_hash, receiver, receiver_heap_no) == NULL
2502 	      || lock_hash == lock_sys.prdt_hash
2503 	      || lock_hash == lock_sys.prdt_page_hash);
2504 
2505 	for (lock = lock_rec_get_first(lock_hash,
2506 				       donator, donator_heap_no);
2507 	     lock != NULL;
2508 	     lock = lock_rec_get_next(donator_heap_no, lock)) {
2509 
2510 		const ulint	type_mode = lock->type_mode;
2511 
2512 		lock_rec_reset_nth_bit(lock, donator_heap_no);
2513 
2514 		if (type_mode & LOCK_WAIT) {
2515 			lock_reset_lock_and_trx_wait(lock);
2516 		}
2517 
2518 		/* Note that we FIRST reset the bit, and then set the lock:
2519 		the function works also if donator == receiver */
2520 
2521 		lock_rec_add_to_queue(
2522 			type_mode, receiver, receiver_heap_no,
2523 			lock->index, lock->trx, FALSE);
2524 	}
2525 
2526 	ut_ad(lock_rec_get_first(lock_sys.rec_hash,
2527 				 donator, donator_heap_no) == NULL);
2528 }
2529 
2530 /** Move all the granted locks to the front of the given lock list.
2531 All the waiting locks will be at the end of the list.
2532 @param[in,out]	lock_list	the given lock list.  */
2533 static
2534 void
lock_move_granted_locks_to_front(UT_LIST_BASE_NODE_T (lock_t)& lock_list)2535 lock_move_granted_locks_to_front(
2536 	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
2537 {
2538 	lock_t*	lock;
2539 
2540 	bool seen_waiting_lock = false;
2541 
2542 	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
2543 	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
2544 
2545 		if (!seen_waiting_lock) {
2546 			if (lock->is_waiting()) {
2547 				seen_waiting_lock = true;
2548 			}
2549 			continue;
2550 		}
2551 
2552 		ut_ad(seen_waiting_lock);
2553 
2554 		if (!lock->is_waiting()) {
2555 			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
2556 			ut_a(prev);
2557 			ut_list_move_to_front(lock_list, lock);
2558 			lock = prev;
2559 		}
2560 	}
2561 }
2562 
2563 /*************************************************************//**
2564 Moves the locks of a record to another record and resets the lock bits of
2565 the donating record. */
2566 UNIV_INLINE
2567 void
lock_rec_move(const buf_block_t * receiver,const buf_block_t * donator,ulint receiver_heap_no,ulint donator_heap_no)2568 lock_rec_move(
2569 /*==========*/
2570 	const buf_block_t*	receiver,       /*!< in: buffer block containing
2571 						the receiving record */
2572 	const buf_block_t*	donator,        /*!< in: buffer block containing
2573 						the donating record */
2574 	ulint			receiver_heap_no,/*!< in: heap_no of the record
2575 						which gets the locks; there
2576 						must be no lock requests
2577 						on it! */
2578 	ulint			donator_heap_no)/*!< in: heap_no of the record
2579                                                 which gives the locks */
2580 {
2581 	lock_rec_move_low(lock_sys.rec_hash, receiver, donator,
2582 			  receiver_heap_no, donator_heap_no);
2583 }
2584 
2585 /*************************************************************//**
2586 Updates the lock table when we have reorganized a page. NOTE: we copy
2587 also the locks set on the infimum of the page; the infimum may carry
2588 locks if an update of a record is occurring on the page, and its locks
2589 were temporarily stored on the infimum. */
2590 void
lock_move_reorganize_page(const buf_block_t * block,const buf_block_t * oblock)2591 lock_move_reorganize_page(
2592 /*======================*/
2593 	const buf_block_t*	block,	/*!< in: old index page, now
2594 					reorganized */
2595 	const buf_block_t*	oblock)	/*!< in: copy of the old, not
2596 					reorganized page */
2597 {
2598 	lock_t*		lock;
2599 	UT_LIST_BASE_NODE_T(lock_t)	old_locks;
2600 	mem_heap_t*	heap		= NULL;
2601 	ulint		comp;
2602 
2603 	lock_mutex_enter();
2604 
2605 	/* FIXME: This needs to deal with predicate lock too */
2606 	lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block);
2607 
2608 	if (lock == NULL) {
2609 		lock_mutex_exit();
2610 
2611 		return;
2612 	}
2613 
2614 	heap = mem_heap_create(256);
2615 
2616 	/* Copy first all the locks on the page to heap and reset the
2617 	bitmaps in the original locks; chain the copies of the locks
2618 	using the trx_locks field in them. */
2619 
2620 	UT_LIST_INIT(old_locks, &lock_t::trx_locks);
2621 
2622 	do {
2623 		/* Make a copy of the lock */
2624 		lock_t*	old_lock = lock_rec_copy(lock, heap);
2625 
2626 		UT_LIST_ADD_LAST(old_locks, old_lock);
2627 
2628 		/* Reset bitmap of lock */
2629 		lock_rec_bitmap_reset(lock);
2630 
2631 		if (lock_get_wait(lock)) {
2632 
2633 			lock_reset_lock_and_trx_wait(lock);
2634 		}
2635 
2636 		lock = lock_rec_get_next_on_page(lock);
2637 	} while (lock != NULL);
2638 
2639 	comp = page_is_comp(block->frame);
2640 	ut_ad(comp == page_is_comp(oblock->frame));
2641 
2642 	lock_move_granted_locks_to_front(old_locks);
2643 
2644 	DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
2645 			ut_list_reverse(old_locks););
2646 
2647 	for (lock = UT_LIST_GET_FIRST(old_locks); lock;
2648 	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
2649 
2650 		/* NOTE: we copy also the locks set on the infimum and
2651 		supremum of the page; the infimum may carry locks if an
2652 		update of a record is occurring on the page, and its locks
2653 		were temporarily stored on the infimum */
2654 		const rec_t*	rec1 = page_get_infimum_rec(
2655 			buf_block_get_frame(block));
2656 		const rec_t*	rec2 = page_get_infimum_rec(
2657 			buf_block_get_frame(oblock));
2658 
2659 		/* Set locks according to old locks */
2660 		for (;;) {
2661 			ulint	old_heap_no;
2662 			ulint	new_heap_no;
2663 			ut_d(const rec_t* const orec = rec1);
2664 			ut_ad(page_rec_is_metadata(rec1)
2665 			      == page_rec_is_metadata(rec2));
2666 
2667 			if (comp) {
2668 				old_heap_no = rec_get_heap_no_new(rec2);
2669 				new_heap_no = rec_get_heap_no_new(rec1);
2670 
2671 				rec1 = page_rec_get_next_low(rec1, TRUE);
2672 				rec2 = page_rec_get_next_low(rec2, TRUE);
2673 			} else {
2674 				old_heap_no = rec_get_heap_no_old(rec2);
2675 				new_heap_no = rec_get_heap_no_old(rec1);
2676 				ut_ad(!memcmp(rec1, rec2,
2677 					      rec_get_data_size_old(rec2)));
2678 
2679 				rec1 = page_rec_get_next_low(rec1, FALSE);
2680 				rec2 = page_rec_get_next_low(rec2, FALSE);
2681 			}
2682 
2683 			/* Clear the bit in old_lock. */
2684 			if (old_heap_no < lock->un_member.rec_lock.n_bits
2685 			    && lock_rec_reset_nth_bit(lock, old_heap_no)) {
2686 				ut_ad(!page_rec_is_metadata(orec));
2687 
2688 				/* NOTE that the old lock bitmap could be too
2689 				small for the new heap number! */
2690 
2691 				lock_rec_add_to_queue(
2692 					lock->type_mode, block, new_heap_no,
2693 					lock->index, lock->trx, FALSE);
2694 			}
2695 
2696 			if (new_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2697 				ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
2698 				break;
2699 			}
2700 		}
2701 
2702 		ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
2703 	}
2704 
2705 	lock_mutex_exit();
2706 
2707 	mem_heap_free(heap);
2708 
2709 #ifdef UNIV_DEBUG_LOCK_VALIDATE
2710 	ut_ad(lock_rec_validate_page(block));
2711 #endif
2712 }
2713 
2714 /*************************************************************//**
2715 Moves the explicit locks on user records to another page if a record
2716 list end is moved to another page. */
2717 void
lock_move_rec_list_end(const buf_block_t * new_block,const buf_block_t * block,const rec_t * rec)2718 lock_move_rec_list_end(
2719 /*===================*/
2720 	const buf_block_t*	new_block,	/*!< in: index page to move to */
2721 	const buf_block_t*	block,		/*!< in: index page */
2722 	const rec_t*		rec)		/*!< in: record on page: this
2723 						is the first record moved */
2724 {
2725 	lock_t*		lock;
2726 	const ulint	comp	= page_rec_is_comp(rec);
2727 
2728 	ut_ad(buf_block_get_frame(block) == page_align(rec));
2729 	ut_ad(comp == page_is_comp(buf_block_get_frame(new_block)));
2730 
2731 	lock_mutex_enter();
2732 
2733 	/* Note: when we move locks from record to record, waiting locks
2734 	and possible granted gap type locks behind them are enqueued in
2735 	the original order, because new elements are inserted to a hash
2736 	table to the end of the hash chain, and lock_rec_add_to_queue
2737 	does not reuse locks if there are waiters in the queue. */
2738 
2739 	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
2740 	     lock = lock_rec_get_next_on_page(lock)) {
2741 		const rec_t*	rec1	= rec;
2742 		const rec_t*	rec2;
2743 		const ulint	type_mode = lock->type_mode;
2744 
2745 		if (comp) {
2746 			if (page_offset(rec1) == PAGE_NEW_INFIMUM) {
2747 				rec1 = page_rec_get_next_low(rec1, TRUE);
2748 			}
2749 
2750 			rec2 = page_rec_get_next_low(
2751 				buf_block_get_frame(new_block)
2752 				+ PAGE_NEW_INFIMUM, TRUE);
2753 		} else {
2754 			if (page_offset(rec1) == PAGE_OLD_INFIMUM) {
2755 				rec1 = page_rec_get_next_low(rec1, FALSE);
2756 			}
2757 
2758 			rec2 = page_rec_get_next_low(
2759 				buf_block_get_frame(new_block)
2760 				+ PAGE_OLD_INFIMUM, FALSE);
2761 		}
2762 
2763 		/* Copy lock requests on user records to new page and
2764 		reset the lock bits on the old */
2765 
2766 		for (;;) {
2767 			ut_ad(page_rec_is_metadata(rec1)
2768 			      == page_rec_is_metadata(rec2));
2769 			ut_d(const rec_t* const orec = rec1);
2770 
2771 			ulint	rec1_heap_no;
2772 			ulint	rec2_heap_no;
2773 
2774 			if (comp) {
2775 				rec1_heap_no = rec_get_heap_no_new(rec1);
2776 
2777 				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2778 					break;
2779 				}
2780 
2781 				rec2_heap_no = rec_get_heap_no_new(rec2);
2782 				rec1 = page_rec_get_next_low(rec1, TRUE);
2783 				rec2 = page_rec_get_next_low(rec2, TRUE);
2784 			} else {
2785 				rec1_heap_no = rec_get_heap_no_old(rec1);
2786 
2787 				if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2788 					break;
2789 				}
2790 
2791 				rec2_heap_no = rec_get_heap_no_old(rec2);
2792 
2793 				ut_ad(rec_get_data_size_old(rec1)
2794 				      == rec_get_data_size_old(rec2));
2795 
2796 				ut_ad(!memcmp(rec1, rec2,
2797 					      rec_get_data_size_old(rec1)));
2798 
2799 				rec1 = page_rec_get_next_low(rec1, FALSE);
2800 				rec2 = page_rec_get_next_low(rec2, FALSE);
2801 			}
2802 
2803 			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
2804 			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
2805 				ut_ad(!page_rec_is_metadata(orec));
2806 
2807 				if (type_mode & LOCK_WAIT) {
2808 					lock_reset_lock_and_trx_wait(lock);
2809 				}
2810 
2811 				lock_rec_add_to_queue(
2812 					type_mode, new_block, rec2_heap_no,
2813 					lock->index, lock->trx, FALSE);
2814 			}
2815 		}
2816 	}
2817 
2818 	lock_mutex_exit();
2819 
2820 #ifdef UNIV_DEBUG_LOCK_VALIDATE
2821 	ut_ad(lock_rec_validate_page(block));
2822 	ut_ad(lock_rec_validate_page(new_block));
2823 #endif
2824 }
2825 
2826 /*************************************************************//**
2827 Moves the explicit locks on user records to another page if a record
2828 list start is moved to another page. */
2829 void
lock_move_rec_list_start(const buf_block_t * new_block,const buf_block_t * block,const rec_t * rec,const rec_t * old_end)2830 lock_move_rec_list_start(
2831 /*=====================*/
2832 	const buf_block_t*	new_block,	/*!< in: index page to
2833 						move to */
2834 	const buf_block_t*	block,		/*!< in: index page */
2835 	const rec_t*		rec,		/*!< in: record on page:
2836 						this is the first
2837 						record NOT copied */
2838 	const rec_t*		old_end)	/*!< in: old
2839 						previous-to-last
2840 						record on new_page
2841 						before the records
2842 						were copied */
2843 {
2844 	lock_t*		lock;
2845 	const ulint	comp	= page_rec_is_comp(rec);
2846 
2847 	ut_ad(block->frame == page_align(rec));
2848 	ut_ad(new_block->frame == page_align(old_end));
2849 	ut_ad(comp == page_rec_is_comp(old_end));
2850 	ut_ad(!page_rec_is_metadata(rec));
2851 
2852 	lock_mutex_enter();
2853 
2854 	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
2855 	     lock = lock_rec_get_next_on_page(lock)) {
2856 		const rec_t*	rec1;
2857 		const rec_t*	rec2;
2858 		const ulint	type_mode = lock->type_mode;
2859 
2860 		if (comp) {
2861 			rec1 = page_rec_get_next_low(
2862 				buf_block_get_frame(block)
2863 				+ PAGE_NEW_INFIMUM, TRUE);
2864 			rec2 = page_rec_get_next_low(old_end, TRUE);
2865 		} else {
2866 			rec1 = page_rec_get_next_low(
2867 				buf_block_get_frame(block)
2868 				+ PAGE_OLD_INFIMUM, FALSE);
2869 			rec2 = page_rec_get_next_low(old_end, FALSE);
2870 		}
2871 
2872 		/* Copy lock requests on user records to new page and
2873 		reset the lock bits on the old */
2874 
2875 		while (rec1 != rec) {
2876 			ut_ad(page_rec_is_metadata(rec1)
2877 			      == page_rec_is_metadata(rec2));
2878 			ut_d(const rec_t* const prev = rec1);
2879 
2880 			ulint	rec1_heap_no;
2881 			ulint	rec2_heap_no;
2882 
2883 			if (comp) {
2884 				rec1_heap_no = rec_get_heap_no_new(rec1);
2885 				rec2_heap_no = rec_get_heap_no_new(rec2);
2886 
2887 				rec1 = page_rec_get_next_low(rec1, TRUE);
2888 				rec2 = page_rec_get_next_low(rec2, TRUE);
2889 			} else {
2890 				rec1_heap_no = rec_get_heap_no_old(rec1);
2891 				rec2_heap_no = rec_get_heap_no_old(rec2);
2892 
2893 				ut_ad(!memcmp(rec1, rec2,
2894 					      rec_get_data_size_old(rec2)));
2895 
2896 				rec1 = page_rec_get_next_low(rec1, FALSE);
2897 				rec2 = page_rec_get_next_low(rec2, FALSE);
2898 			}
2899 
2900 			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
2901 			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
2902 				ut_ad(!page_rec_is_metadata(prev));
2903 
2904 				if (type_mode & LOCK_WAIT) {
2905 					lock_reset_lock_and_trx_wait(lock);
2906 				}
2907 
2908 				lock_rec_add_to_queue(
2909 					type_mode, new_block, rec2_heap_no,
2910 					lock->index, lock->trx, FALSE);
2911 			}
2912 		}
2913 
2914 #ifdef UNIV_DEBUG
2915 		if (page_rec_is_supremum(rec)) {
2916 			ulint	i;
2917 
2918 			for (i = PAGE_HEAP_NO_USER_LOW;
2919 			     i < lock_rec_get_n_bits(lock); i++) {
2920 				if (lock_rec_get_nth_bit(lock, i)) {
2921 					ib::fatal()
2922 						<< "lock_move_rec_list_start():"
2923 						<< i << " not moved in "
2924 						<<  (void*) lock;
2925 				}
2926 			}
2927 		}
2928 #endif /* UNIV_DEBUG */
2929 	}
2930 
2931 	lock_mutex_exit();
2932 
2933 #ifdef UNIV_DEBUG_LOCK_VALIDATE
2934 	ut_ad(lock_rec_validate_page(block));
2935 #endif
2936 }
2937 
2938 /*************************************************************//**
2939 Moves the explicit locks on user records to another page if a record
2940 list start is moved to another page. */
2941 void
lock_rtr_move_rec_list(const buf_block_t * new_block,const buf_block_t * block,rtr_rec_move_t * rec_move,ulint num_move)2942 lock_rtr_move_rec_list(
2943 /*===================*/
2944 	const buf_block_t*	new_block,	/*!< in: index page to
2945 						move to */
2946 	const buf_block_t*	block,		/*!< in: index page */
2947 	rtr_rec_move_t*		rec_move,       /*!< in: recording records
2948 						moved */
2949 	ulint			num_move)       /*!< in: num of rec to move */
2950 {
2951 	lock_t*		lock;
2952 	ulint		comp;
2953 
2954 	if (!num_move) {
2955 		return;
2956 	}
2957 
2958 	comp = page_rec_is_comp(rec_move[0].old_rec);
2959 
2960 	ut_ad(block->frame == page_align(rec_move[0].old_rec));
2961 	ut_ad(new_block->frame == page_align(rec_move[0].new_rec));
2962 	ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));
2963 
2964 	lock_mutex_enter();
2965 
2966 	for (lock = lock_rec_get_first_on_page(lock_sys.rec_hash, block); lock;
2967 	     lock = lock_rec_get_next_on_page(lock)) {
2968 		ulint		moved = 0;
2969 		const rec_t*	rec1;
2970 		const rec_t*	rec2;
2971 		const ulint	type_mode = lock->type_mode;
2972 
2973 		/* Copy lock requests on user records to new page and
2974 		reset the lock bits on the old */
2975 
2976 		while (moved < num_move) {
2977 			ulint	rec1_heap_no;
2978 			ulint	rec2_heap_no;
2979 
2980 			rec1 = rec_move[moved].old_rec;
2981 			rec2 = rec_move[moved].new_rec;
2982 			ut_ad(!page_rec_is_metadata(rec1));
2983 			ut_ad(!page_rec_is_metadata(rec2));
2984 
2985 			if (comp) {
2986 				rec1_heap_no = rec_get_heap_no_new(rec1);
2987 				rec2_heap_no = rec_get_heap_no_new(rec2);
2988 
2989 			} else {
2990 				rec1_heap_no = rec_get_heap_no_old(rec1);
2991 				rec2_heap_no = rec_get_heap_no_old(rec2);
2992 
2993 				ut_ad(!memcmp(rec1, rec2,
2994 					      rec_get_data_size_old(rec2)));
2995 			}
2996 
2997 			if (rec1_heap_no < lock->un_member.rec_lock.n_bits
2998 			    && lock_rec_reset_nth_bit(lock, rec1_heap_no)) {
2999 				if (type_mode & LOCK_WAIT) {
3000 					lock_reset_lock_and_trx_wait(lock);
3001 				}
3002 
3003 				lock_rec_add_to_queue(
3004 					type_mode, new_block, rec2_heap_no,
3005 					lock->index, lock->trx, FALSE);
3006 
3007 				rec_move[moved].moved = true;
3008 			}
3009 
3010 			moved++;
3011 		}
3012 	}
3013 
3014 	lock_mutex_exit();
3015 
3016 #ifdef UNIV_DEBUG_LOCK_VALIDATE
3017 	ut_ad(lock_rec_validate_page(block));
3018 #endif
3019 }
3020 /*************************************************************//**
3021 Updates the lock table when a page is split to the right. */
3022 void
lock_update_split_right(const buf_block_t * right_block,const buf_block_t * left_block)3023 lock_update_split_right(
3024 /*====================*/
3025 	const buf_block_t*	right_block,	/*!< in: right page */
3026 	const buf_block_t*	left_block)	/*!< in: left page */
3027 {
3028 	ulint	heap_no = lock_get_min_heap_no(right_block);
3029 
3030 	lock_mutex_enter();
3031 
3032 	/* Move the locks on the supremum of the left page to the supremum
3033 	of the right page */
3034 
3035 	lock_rec_move(right_block, left_block,
3036 		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
3037 
3038 	/* Inherit the locks to the supremum of left page from the successor
3039 	of the infimum on right page */
3040 
3041 	lock_rec_inherit_to_gap(left_block, right_block,
3042 				PAGE_HEAP_NO_SUPREMUM, heap_no);
3043 
3044 	lock_mutex_exit();
3045 }
3046 
3047 /*************************************************************//**
3048 Updates the lock table when a page is merged to the right. */
3049 void
lock_update_merge_right(const buf_block_t * right_block,const rec_t * orig_succ,const buf_block_t * left_block)3050 lock_update_merge_right(
3051 /*====================*/
3052 	const buf_block_t*	right_block,	/*!< in: right page to
3053 						which merged */
3054 	const rec_t*		orig_succ,	/*!< in: original
3055 						successor of infimum
3056 						on the right page
3057 						before merge */
3058 	const buf_block_t*	left_block)	/*!< in: merged index
3059 						page which will be
3060 						discarded */
3061 {
3062 	ut_ad(!page_rec_is_metadata(orig_succ));
3063 
3064 	lock_mutex_enter();
3065 
3066 	/* Inherit the locks from the supremum of the left page to the
3067 	original successor of infimum on the right page, to which the left
3068 	page was merged */
3069 
3070 	lock_rec_inherit_to_gap(right_block, left_block,
3071 				page_rec_get_heap_no(orig_succ),
3072 				PAGE_HEAP_NO_SUPREMUM);
3073 
3074 	/* Reset the locks on the supremum of the left page, releasing
3075 	waiting transactions */
3076 
3077 	lock_rec_reset_and_release_wait_low(
3078 		lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);
3079 
3080 	/* there should exist no page lock on the left page,
3081 	otherwise, it will be blocked from merge */
3082 	ut_ad(!lock_rec_get_first_on_page_addr(lock_sys.prdt_page_hash,
3083 					       left_block->page.id.space(),
3084 					       left_block->page.id.page_no()));
3085 
3086 	lock_rec_free_all_from_discard_page(left_block);
3087 
3088 	lock_mutex_exit();
3089 }
3090 
3091 /*************************************************************//**
3092 Updates the lock table when the root page is copied to another in
3093 btr_root_raise_and_insert. Note that we leave lock structs on the
3094 root page, even though they do not make sense on other than leaf
3095 pages: the reason is that in a pessimistic update the infimum record
3096 of the root page will act as a dummy carrier of the locks of the record
3097 to be updated. */
3098 void
lock_update_root_raise(const buf_block_t * block,const buf_block_t * root)3099 lock_update_root_raise(
3100 /*===================*/
3101 	const buf_block_t*	block,	/*!< in: index page to which copied */
3102 	const buf_block_t*	root)	/*!< in: root page */
3103 {
3104 	lock_mutex_enter();
3105 
3106 	/* Move the locks on the supremum of the root to the supremum
3107 	of block */
3108 
3109 	lock_rec_move(block, root,
3110 		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
3111 	lock_mutex_exit();
3112 }
3113 
3114 /*************************************************************//**
3115 Updates the lock table when a page is copied to another and the original page
3116 is removed from the chain of leaf pages, except if page is the root! */
3117 void
lock_update_copy_and_discard(const buf_block_t * new_block,const buf_block_t * block)3118 lock_update_copy_and_discard(
3119 /*=========================*/
3120 	const buf_block_t*	new_block,	/*!< in: index page to
3121 						which copied */
3122 	const buf_block_t*	block)		/*!< in: index page;
3123 						NOT the root! */
3124 {
3125 	lock_mutex_enter();
3126 
3127 	/* Move the locks on the supremum of the old page to the supremum
3128 	of new_page */
3129 
3130 	lock_rec_move(new_block, block,
3131 		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
3132 	lock_rec_free_all_from_discard_page(block);
3133 
3134 	lock_mutex_exit();
3135 }
3136 
3137 /*************************************************************//**
3138 Updates the lock table when a page is split to the left. */
3139 void
lock_update_split_left(const buf_block_t * right_block,const buf_block_t * left_block)3140 lock_update_split_left(
3141 /*===================*/
3142 	const buf_block_t*	right_block,	/*!< in: right page */
3143 	const buf_block_t*	left_block)	/*!< in: left page */
3144 {
3145 	ulint	heap_no = lock_get_min_heap_no(right_block);
3146 
3147 	lock_mutex_enter();
3148 
3149 	/* Inherit the locks to the supremum of the left page from the
3150 	successor of the infimum on the right page */
3151 
3152 	lock_rec_inherit_to_gap(left_block, right_block,
3153 				PAGE_HEAP_NO_SUPREMUM, heap_no);
3154 
3155 	lock_mutex_exit();
3156 }
3157 
3158 /*************************************************************//**
3159 Updates the lock table when a page is merged to the left. */
3160 void
lock_update_merge_left(const buf_block_t * left_block,const rec_t * orig_pred,const buf_block_t * right_block)3161 lock_update_merge_left(
3162 /*===================*/
3163 	const buf_block_t*	left_block,	/*!< in: left page to
3164 						which merged */
3165 	const rec_t*		orig_pred,	/*!< in: original predecessor
3166 						of supremum on the left page
3167 						before merge */
3168 	const buf_block_t*	right_block)	/*!< in: merged index page
3169 						which will be discarded */
3170 {
3171 	const rec_t*	left_next_rec;
3172 
3173 	ut_ad(left_block->frame == page_align(orig_pred));
3174 
3175 	lock_mutex_enter();
3176 
3177 	left_next_rec = page_rec_get_next_const(orig_pred);
3178 
3179 	if (!page_rec_is_supremum(left_next_rec)) {
3180 
3181 		/* Inherit the locks on the supremum of the left page to the
3182 		first record which was moved from the right page */
3183 
3184 		lock_rec_inherit_to_gap(left_block, left_block,
3185 					page_rec_get_heap_no(left_next_rec),
3186 					PAGE_HEAP_NO_SUPREMUM);
3187 
3188 		/* Reset the locks on the supremum of the left page,
3189 		releasing waiting transactions */
3190 
3191 		lock_rec_reset_and_release_wait_low(
3192 			lock_sys.rec_hash, left_block, PAGE_HEAP_NO_SUPREMUM);
3193 	}
3194 
3195 	/* Move the locks from the supremum of right page to the supremum
3196 	of the left page */
3197 
3198 	lock_rec_move(left_block, right_block,
3199 		      PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
3200 
3201 	/* there should exist no page lock on the right page,
3202 	otherwise, it will be blocked from merge */
3203 	ut_ad(!lock_rec_get_first_on_page_addr(
3204 		      lock_sys.prdt_page_hash,
3205 		      right_block->page.id.space(),
3206 		      right_block->page.id.page_no()));
3207 
3208 	lock_rec_free_all_from_discard_page(right_block);
3209 
3210 	lock_mutex_exit();
3211 }
3212 
3213 /*************************************************************//**
3214 Resets the original locks on heir and replaces them with gap type locks
3215 inherited from rec. */
3216 void
lock_rec_reset_and_inherit_gap_locks(const buf_block_t * heir_block,const buf_block_t * block,ulint heir_heap_no,ulint heap_no)3217 lock_rec_reset_and_inherit_gap_locks(
3218 /*=================================*/
3219 	const buf_block_t*	heir_block,	/*!< in: block containing the
3220 						record which inherits */
3221 	const buf_block_t*	block,		/*!< in: block containing the
3222 						record from which inherited;
3223 						does NOT reset the locks on
3224 						this record */
3225 	ulint			heir_heap_no,	/*!< in: heap_no of the
3226 						inheriting record */
3227 	ulint			heap_no)	/*!< in: heap_no of the
3228 						donating record */
3229 {
3230 	lock_mutex_enter();
3231 
3232 	lock_rec_reset_and_release_wait(heir_block, heir_heap_no);
3233 
3234 	lock_rec_inherit_to_gap(heir_block, block, heir_heap_no, heap_no);
3235 
3236 	lock_mutex_exit();
3237 }
3238 
3239 /*************************************************************//**
3240 Updates the lock table when a page is discarded. */
3241 void
lock_update_discard(const buf_block_t * heir_block,ulint heir_heap_no,const buf_block_t * block)3242 lock_update_discard(
3243 /*================*/
3244 	const buf_block_t*	heir_block,	/*!< in: index page
3245 						which will inherit the locks */
3246 	ulint			heir_heap_no,	/*!< in: heap_no of the record
3247 						which will inherit the locks */
3248 	const buf_block_t*	block)		/*!< in: index page
3249 						which will be discarded */
3250 {
3251 	const page_t*	page = block->frame;
3252 	const rec_t*	rec;
3253 	ulint		heap_no;
3254 
3255 	lock_mutex_enter();
3256 
3257 	if (lock_rec_get_first_on_page(lock_sys.rec_hash, block)) {
3258 		ut_ad(!lock_rec_get_first_on_page(lock_sys.prdt_hash, block));
3259 		ut_ad(!lock_rec_get_first_on_page(lock_sys.prdt_page_hash,
3260 						  block));
3261 		/* Inherit all the locks on the page to the record and
3262 		reset all the locks on the page */
3263 
3264 		if (page_is_comp(page)) {
3265 			rec = page + PAGE_NEW_INFIMUM;
3266 
3267 			do {
3268 				heap_no = rec_get_heap_no_new(rec);
3269 
3270 				lock_rec_inherit_to_gap(heir_block, block,
3271 							heir_heap_no, heap_no);
3272 
3273 				lock_rec_reset_and_release_wait(
3274 					block, heap_no);
3275 
3276 				rec = page + rec_get_next_offs(rec, TRUE);
3277 			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
3278 		} else {
3279 			rec = page + PAGE_OLD_INFIMUM;
3280 
3281 			do {
3282 				heap_no = rec_get_heap_no_old(rec);
3283 
3284 				lock_rec_inherit_to_gap(heir_block, block,
3285 							heir_heap_no, heap_no);
3286 
3287 				lock_rec_reset_and_release_wait(
3288 					block, heap_no);
3289 
3290 				rec = page + rec_get_next_offs(rec, FALSE);
3291 			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
3292 		}
3293 
3294 		lock_rec_free_all_from_discard_page_low(
3295 			block->page.id.space(), block->page.id.page_no(),
3296 			lock_sys.rec_hash);
3297 	} else {
3298 		lock_rec_free_all_from_discard_page_low(
3299 			block->page.id.space(), block->page.id.page_no(),
3300 			lock_sys.prdt_hash);
3301 		lock_rec_free_all_from_discard_page_low(
3302 			block->page.id.space(), block->page.id.page_no(),
3303 			lock_sys.prdt_page_hash);
3304 	}
3305 
3306 	lock_mutex_exit();
3307 }
3308 
3309 /*************************************************************//**
3310 Updates the lock table when a new user record is inserted. */
3311 void
lock_update_insert(const buf_block_t * block,const rec_t * rec)3312 lock_update_insert(
3313 /*===============*/
3314 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3315 	const rec_t*		rec)	/*!< in: the inserted record */
3316 {
3317 	ulint	receiver_heap_no;
3318 	ulint	donator_heap_no;
3319 
3320 	ut_ad(block->frame == page_align(rec));
3321 	ut_ad(!page_rec_is_metadata(rec));
3322 
3323 	/* Inherit the gap-locking locks for rec, in gap mode, from the next
3324 	record */
3325 
3326 	if (page_rec_is_comp(rec)) {
3327 		receiver_heap_no = rec_get_heap_no_new(rec);
3328 		donator_heap_no = rec_get_heap_no_new(
3329 			page_rec_get_next_low(rec, TRUE));
3330 	} else {
3331 		receiver_heap_no = rec_get_heap_no_old(rec);
3332 		donator_heap_no = rec_get_heap_no_old(
3333 			page_rec_get_next_low(rec, FALSE));
3334 	}
3335 
3336 	lock_rec_inherit_to_gap_if_gap_lock(
3337 		block, receiver_heap_no, donator_heap_no);
3338 }
3339 
3340 /*************************************************************//**
3341 Updates the lock table when a record is removed. */
3342 void
lock_update_delete(const buf_block_t * block,const rec_t * rec)3343 lock_update_delete(
3344 /*===============*/
3345 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3346 	const rec_t*		rec)	/*!< in: the record to be removed */
3347 {
3348 	const page_t*	page = block->frame;
3349 	ulint		heap_no;
3350 	ulint		next_heap_no;
3351 
3352 	ut_ad(page == page_align(rec));
3353 	ut_ad(!page_rec_is_metadata(rec));
3354 
3355 	if (page_is_comp(page)) {
3356 		heap_no = rec_get_heap_no_new(rec);
3357 		next_heap_no = rec_get_heap_no_new(page
3358 						   + rec_get_next_offs(rec,
3359 								       TRUE));
3360 	} else {
3361 		heap_no = rec_get_heap_no_old(rec);
3362 		next_heap_no = rec_get_heap_no_old(page
3363 						   + rec_get_next_offs(rec,
3364 								       FALSE));
3365 	}
3366 
3367 	lock_mutex_enter();
3368 
3369 	/* Let the next record inherit the locks from rec, in gap mode */
3370 
3371 	lock_rec_inherit_to_gap(block, block, next_heap_no, heap_no);
3372 
3373 	/* Reset the lock bits on rec and release waiting transactions */
3374 
3375 	lock_rec_reset_and_release_wait(block, heap_no);
3376 
3377 	lock_mutex_exit();
3378 }
3379 
3380 /*********************************************************************//**
3381 Stores on the page infimum record the explicit locks of another record.
3382 This function is used to store the lock state of a record when it is
3383 updated and the size of the record changes in the update. The record
3384 is moved in such an update, perhaps to another page. The infimum record
3385 acts as a dummy carrier record, taking care of lock releases while the
3386 actual record is being moved. */
3387 void
lock_rec_store_on_page_infimum(const buf_block_t * block,const rec_t * rec)3388 lock_rec_store_on_page_infimum(
3389 /*===========================*/
3390 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3391 	const rec_t*		rec)	/*!< in: record whose lock state
3392 					is stored on the infimum
3393 					record of the same page; lock
3394 					bits are reset on the
3395 					record */
3396 {
3397 	ulint	heap_no = page_rec_get_heap_no(rec);
3398 
3399 	ut_ad(block->frame == page_align(rec));
3400 
3401 	lock_mutex_enter();
3402 
3403 	lock_rec_move(block, block, PAGE_HEAP_NO_INFIMUM, heap_no);
3404 
3405 	lock_mutex_exit();
3406 }
3407 
3408 /*********************************************************************//**
3409 Restores the state of explicit lock requests on a single record, where the
3410 state was stored on the infimum of the page. */
3411 void
lock_rec_restore_from_page_infimum(const buf_block_t * block,const rec_t * rec,const buf_block_t * donator)3412 lock_rec_restore_from_page_infimum(
3413 /*===============================*/
3414 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
3415 	const rec_t*		rec,	/*!< in: record whose lock state
3416 					is restored */
3417 	const buf_block_t*	donator)/*!< in: page (rec is not
3418 					necessarily on this page)
3419 					whose infimum stored the lock
3420 					state; lock bits are reset on
3421 					the infimum */
3422 {
3423 	ulint	heap_no = page_rec_get_heap_no(rec);
3424 
3425 	lock_mutex_enter();
3426 
3427 	lock_rec_move(block, donator, heap_no, PAGE_HEAP_NO_INFIMUM);
3428 
3429 	lock_mutex_exit();
3430 }
3431 
3432 /*========================= TABLE LOCKS ==============================*/
3433 
3434 /** Functor for accessing the embedded node within a table lock. */
3435 struct TableLockGetNode {
operator ()TableLockGetNode3436 	ut_list_node<lock_t>& operator() (lock_t& elem)
3437 	{
3438 		return(elem.un_member.tab_lock.locks);
3439 	}
3440 };
3441 
3442 /*********************************************************************//**
3443 Creates a table lock object and adds it as the last in the lock queue
3444 of the table. Does NOT check for deadlocks or lock compatibility.
3445 @return own: new lock object */
3446 UNIV_INLINE
3447 lock_t*
lock_table_create(dict_table_t * table,ulint type_mode,trx_t * trx,lock_t * c_lock=NULL)3448 lock_table_create(
3449 /*==============*/
3450 	dict_table_t*	table,	/*!< in/out: database table
3451 				in dictionary cache */
3452 	ulint		type_mode,/*!< in: lock mode possibly ORed with
3453 				LOCK_WAIT */
3454 	trx_t*		trx	/*!< in: trx */
3455 #ifdef WITH_WSREP
3456 	, lock_t*	c_lock = NULL	/*!< in: conflicting lock */
3457 #endif
3458 	)
3459 {
3460 	lock_t*		lock;
3461 
3462 	ut_ad(table && trx);
3463 	ut_ad(lock_mutex_own());
3464 	ut_ad(trx_mutex_own(trx));
3465 	ut_ad(trx->is_recovered || trx->state == TRX_STATE_ACTIVE);
3466 	ut_ad(!trx->auto_commit || trx->will_lock);
3467 
3468 	if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) {
3469 		++table->n_waiting_or_granted_auto_inc_locks;
3470 	}
3471 
3472 	/* For AUTOINC locking we reuse the lock instance only if
3473 	there is no wait involved else we allocate the waiting lock
3474 	from the transaction lock heap. */
3475 	if (type_mode == LOCK_AUTO_INC) {
3476 
3477 		lock = table->autoinc_lock;
3478 
3479 		table->autoinc_trx = trx;
3480 
3481 		ib_vector_push(trx->autoinc_locks, &lock);
3482 
3483 	} else if (trx->lock.table_cached
3484 		   < UT_ARR_SIZE(trx->lock.table_pool)) {
3485 		lock = &trx->lock.table_pool[trx->lock.table_cached++];
3486 	} else {
3487 
3488 		lock = static_cast<lock_t*>(
3489 			mem_heap_alloc(trx->lock.lock_heap, sizeof(*lock)));
3490 
3491 	}
3492 
3493 	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
3494 	lock->trx = trx;
3495 
3496 	lock->un_member.tab_lock.table = table;
3497 
3498 	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);
3499 
3500 	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
3501 
3502 #ifdef WITH_WSREP
3503 	if (c_lock && trx->is_wsrep()) {
3504 		if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
3505 			ut_list_insert(table->locks, c_lock, lock,
3506 				       TableLockGetNode());
3507 			if (UNIV_UNLIKELY(wsrep_debug)) {
3508 				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
3509 				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
3510 			}
3511 		} else {
3512 			ut_list_append(table->locks, lock, TableLockGetNode());
3513 		}
3514 
3515 		trx_mutex_enter(c_lock->trx);
3516 
3517 		if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
3518 			c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
3519 
3520 			if (UNIV_UNLIKELY(wsrep_debug)) {
3521 				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
3522 				wsrep_report_bf_lock_wait(c_lock->trx->mysql_thd, c_lock->trx->id);
3523 				wsrep_print_wait_locks(c_lock);
3524 			}
3525 
3526 			/* The lock release will call lock_grant(),
3527 			which would acquire trx->mutex again. */
3528 			trx_mutex_exit(trx);
3529 			lock_cancel_waiting_and_release(
3530 				c_lock->trx->lock.wait_lock);
3531 			trx_mutex_enter(trx);
3532 		}
3533 
3534 		trx_mutex_exit(c_lock->trx);
3535 	} else
3536 #endif /* WITH_WSREP */
3537 	ut_list_append(table->locks, lock, TableLockGetNode());
3538 
3539 	if (type_mode & LOCK_WAIT) {
3540 
3541 		lock_set_lock_and_trx_wait(lock, trx);
3542 	}
3543 
3544 	lock->trx->lock.table_locks.push_back(lock);
3545 
3546 	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
3547 	MONITOR_INC(MONITOR_NUM_TABLELOCK);
3548 
3549 	return(lock);
3550 }
3551 
3552 /*************************************************************//**
3553 Pops autoinc lock requests from the transaction's autoinc_locks. We
3554 handle the case where there are gaps in the array and they need to
3555 be popped off the stack. */
3556 UNIV_INLINE
3557 void
lock_table_pop_autoinc_locks(trx_t * trx)3558 lock_table_pop_autoinc_locks(
3559 /*=========================*/
3560 	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
3561 {
3562 	ut_ad(lock_mutex_own());
3563 	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));
3564 
3565 	/* Skip any gaps, gaps are NULL lock entries in the
3566 	trx->autoinc_locks vector. */
3567 
3568 	do {
3569 		ib_vector_pop(trx->autoinc_locks);
3570 
3571 		if (ib_vector_is_empty(trx->autoinc_locks)) {
3572 			return;
3573 		}
3574 
3575 	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
3576 }
3577 
3578 /*************************************************************//**
3579 Removes an autoinc lock request from the transaction's autoinc_locks. */
3580 UNIV_INLINE
3581 void
lock_table_remove_autoinc_lock(lock_t * lock,trx_t * trx)3582 lock_table_remove_autoinc_lock(
3583 /*===========================*/
3584 	lock_t*	lock,	/*!< in: table lock */
3585 	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
3586 {
3587 	lock_t*	autoinc_lock;
3588 	lint	i = ib_vector_size(trx->autoinc_locks) - 1;
3589 
3590 	ut_ad(lock_mutex_own());
3591 	ut_ad(lock_get_mode(lock) == LOCK_AUTO_INC);
3592 	ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
3593 	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));
3594 
3595 	/* With stored functions and procedures the user may drop
3596 	a table within the same "statement". This special case has
3597 	to be handled by deleting only those AUTOINC locks that were
3598 	held by the table being dropped. */
3599 
3600 	autoinc_lock = *static_cast<lock_t**>(
3601 		ib_vector_get(trx->autoinc_locks, i));
3602 
3603 	/* This is the default fast case. */
3604 
3605 	if (autoinc_lock == lock) {
3606 		lock_table_pop_autoinc_locks(trx);
3607 	} else {
3608 		/* The last element should never be NULL */
3609 		ut_a(autoinc_lock != NULL);
3610 
3611 		/* Handle freeing the locks from within the stack. */
3612 
3613 		while (--i >= 0) {
3614 			autoinc_lock = *static_cast<lock_t**>(
3615 				ib_vector_get(trx->autoinc_locks, i));
3616 
3617 			if (autoinc_lock == lock) {
3618 				void*	null_var = NULL;
3619 				ib_vector_set(trx->autoinc_locks, i, &null_var);
3620 				return;
3621 			}
3622 		}
3623 
3624 		/* Must find the autoinc lock. */
3625 		ut_error;
3626 	}
3627 }
3628 
3629 /*************************************************************//**
3630 Removes a table lock request from the queue and the trx list of locks;
3631 this is a low-level function which does NOT check if waiting requests
3632 can now be granted. */
3633 UNIV_INLINE
3634 void
lock_table_remove_low(lock_t * lock)3635 lock_table_remove_low(
3636 /*==================*/
3637 	lock_t*	lock)	/*!< in/out: table lock */
3638 {
3639 	trx_t*		trx;
3640 	dict_table_t*	table;
3641 
3642 	ut_ad(lock_mutex_own());
3643 
3644 	trx = lock->trx;
3645 	table = lock->un_member.tab_lock.table;
3646 
3647 	/* Remove the table from the transaction's AUTOINC vector, if
3648 	the lock that is being released is an AUTOINC lock. */
3649 	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
3650 
3651 		/* The table's AUTOINC lock can get transferred to
3652 		another transaction before we get here. */
3653 		if (table->autoinc_trx == trx) {
3654 			table->autoinc_trx = NULL;
3655 		}
3656 
3657 		/* The locks must be freed in the reverse order from
3658 		the one in which they were acquired. This is to avoid
3659 		traversing the AUTOINC lock vector unnecessarily.
3660 
3661 		We only store locks that were granted in the
3662 		trx->autoinc_locks vector (see lock_table_create()
3663 		and lock_grant()). Therefore it can be empty and we
3664 		need to check for that. */
3665 
3666 		if (!lock_get_wait(lock)
3667 		    && !ib_vector_is_empty(trx->autoinc_locks)) {
3668 
3669 			lock_table_remove_autoinc_lock(lock, trx);
3670 		}
3671 
3672 		ut_a(table->n_waiting_or_granted_auto_inc_locks > 0);
3673 		table->n_waiting_or_granted_auto_inc_locks--;
3674 	}
3675 
3676 	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
3677 	ut_list_remove(table->locks, lock, TableLockGetNode());
3678 
3679 	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
3680 	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
3681 }
3682 
3683 /*********************************************************************//**
3684 Enqueues a waiting request for a table lock which cannot be granted
3685 immediately. Checks for deadlocks.
3686 @retval	DB_LOCK_WAIT	if the waiting lock was enqueued
3687 @retval	DB_DEADLOCK	if this transaction was chosen as the victim
3688 @retval	DB_SUCCESS	if the other transaction committed or aborted */
3689 static
3690 dberr_t
lock_table_enqueue_waiting(ulint mode,dict_table_t * table,que_thr_t * thr,lock_t * c_lock)3691 lock_table_enqueue_waiting(
3692 /*=======================*/
3693 	ulint		mode,	/*!< in: lock mode this transaction is
3694 				requesting */
3695 	dict_table_t*	table,	/*!< in/out: table */
3696 	que_thr_t*	thr	/*!< in: query thread */
3697 #ifdef WITH_WSREP
3698 	, lock_t*	c_lock	/*!< in: conflicting lock or NULL */
3699 #endif
3700 )
3701 {
3702 	trx_t*		trx;
3703 	lock_t*		lock;
3704 
3705 	ut_ad(lock_mutex_own());
3706 	ut_ad(!srv_read_only_mode);
3707 
3708 	trx = thr_get_trx(thr);
3709 	ut_ad(trx_mutex_own(trx));
3710 	ut_a(!que_thr_stop(thr));
3711 
3712 	switch (trx_get_dict_operation(trx)) {
3713 	case TRX_DICT_OP_NONE:
3714 		break;
3715 	case TRX_DICT_OP_TABLE:
3716 	case TRX_DICT_OP_INDEX:
3717 		ib::error() << "A table lock wait happens in a dictionary"
3718 			" operation. Table " << table->name
3719 			<< ". " << BUG_REPORT_MSG;
3720 		ut_ad(0);
3721 	}
3722 
3723 #ifdef WITH_WSREP
3724 	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
3725 		return(DB_DEADLOCK);
3726 	}
3727 #endif /* WITH_WSREP */
3728 
3729 	/* Enqueue the lock request that will wait to be granted */
3730 	lock = lock_table_create(table, ulint(mode) | LOCK_WAIT, trx
3731 #ifdef WITH_WSREP
3732 				 , c_lock
3733 #endif
3734 				 );
3735 
3736 	const trx_t*	victim_trx =
3737 		DeadlockChecker::check_and_resolve(lock, trx);
3738 
3739 	if (victim_trx != 0) {
3740 		ut_ad(victim_trx == trx);
3741 
3742 		/* The order here is important, we don't want to
3743 		lose the state of the lock before calling remove. */
3744 		lock_table_remove_low(lock);
3745 		lock_reset_lock_and_trx_wait(lock);
3746 
3747 		return(DB_DEADLOCK);
3748 
3749 	} else if (trx->lock.wait_lock == NULL) {
3750 		/* Deadlock resolution chose another transaction as a victim,
3751 		and we accidentally got our lock granted! */
3752 
3753 		return(DB_SUCCESS);
3754 	}
3755 
3756 	trx->lock.que_state = TRX_QUE_LOCK_WAIT;
3757 
3758 	trx->lock.wait_started = time(NULL);
3759 	trx->lock.was_chosen_as_deadlock_victim = false;
3760 
3761 	ut_a(que_thr_stop(thr));
3762 
3763 	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
3764 
3765 	return(DB_LOCK_WAIT);
3766 }
3767 
3768 /*********************************************************************//**
3769 Checks if other transactions have an incompatible mode lock request in
3770 the lock queue.
3771 @return lock or NULL */
3772 UNIV_INLINE
3773 lock_t*
lock_table_other_has_incompatible(const trx_t * trx,ulint wait,const dict_table_t * table,lock_mode mode)3774 lock_table_other_has_incompatible(
3775 /*==============================*/
3776 	const trx_t*		trx,	/*!< in: transaction, or NULL if all
3777 					transactions should be included */
3778 	ulint			wait,	/*!< in: LOCK_WAIT if also
3779 					waiting locks are taken into
3780 					account, or 0 if not */
3781 	const dict_table_t*	table,	/*!< in: table */
3782 	lock_mode		mode)	/*!< in: lock mode */
3783 {
3784 	lock_t*	lock;
3785 
3786 	ut_ad(lock_mutex_own());
3787 
3788 	for (lock = UT_LIST_GET_LAST(table->locks);
3789 	     lock != NULL;
3790 	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {
3791 
3792 		if (lock->trx != trx
3793 		    && !lock_mode_compatible(lock_get_mode(lock), mode)
3794 		    && (wait || !lock_get_wait(lock))) {
3795 
3796 #ifdef WITH_WSREP
3797 			if (lock->trx->is_wsrep()) {
3798 				if (UNIV_UNLIKELY(wsrep_debug)) {
3799 					ib::info() << "WSREP: table lock abort for table:"
3800 						   << table->name;
3801 					ib::info() << " SQL: "
3802 					   << wsrep_thd_query(lock->trx->mysql_thd);
3803 				}
3804 				trx_mutex_enter(lock->trx);
3805 				wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
3806 				trx_mutex_exit(lock->trx);
3807 			}
3808 #endif /* WITH_WSREP */
3809 
3810 			return(lock);
3811 		}
3812 	}
3813 
3814 	return(NULL);
3815 }
3816 
3817 /*********************************************************************//**
3818 Locks the specified database table in the mode given. If the lock cannot
3819 be granted immediately, the query thread is put to wait.
3820 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
3821 dberr_t
lock_table(ulint flags,dict_table_t * table,lock_mode mode,que_thr_t * thr)3822 lock_table(
3823 /*=======*/
3824 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is set,
3825 				does nothing */
3826 	dict_table_t*	table,	/*!< in/out: database table
3827 				in dictionary cache */
3828 	lock_mode	mode,	/*!< in: lock mode */
3829 	que_thr_t*	thr)	/*!< in: query thread */
3830 {
3831 	trx_t*		trx;
3832 	dberr_t		err;
3833 	lock_t*		wait_for;
3834 
3835 	ut_ad(table && thr);
3836 
3837 	/* Given limited visibility of temp-table we can avoid
3838 	locking overhead */
3839 	if ((flags & BTR_NO_LOCKING_FLAG)
3840 	    || srv_read_only_mode
3841 	    || table->is_temporary()) {
3842 
3843 		return(DB_SUCCESS);
3844 	}
3845 
3846 	ut_a(flags == 0);
3847 
3848 	trx = thr_get_trx(thr);
3849 
3850 	/* Look for equal or stronger locks the same trx already
3851 	has on the table. No need to acquire the lock mutex here
3852 	because only this transacton can add/access table locks
3853 	to/from trx_t::table_locks. */
3854 
3855 	if (lock_table_has(trx, table, mode)) {
3856 
3857 		return(DB_SUCCESS);
3858 	}
3859 
3860 	/* Read only transactions can write to temp tables, we don't want
3861 	to promote them to RW transactions. Their updates cannot be visible
3862 	to other transactions. Therefore we can keep them out
3863 	of the read views. */
3864 
3865 	if ((mode == LOCK_IX || mode == LOCK_X)
3866 	    && !trx->read_only
3867 	    && trx->rsegs.m_redo.rseg == 0) {
3868 
3869 		trx_set_rw_mode(trx);
3870 	}
3871 
3872 	lock_mutex_enter();
3873 
3874 	DBUG_EXECUTE_IF("fatal-semaphore-timeout",
3875 		{ os_thread_sleep(3600000000LL); });
3876 
3877 	/* We have to check if the new lock is compatible with any locks
3878 	other transactions have in the table lock queue. */
3879 
3880 	wait_for = lock_table_other_has_incompatible(
3881 		trx, LOCK_WAIT, table, mode);
3882 
3883 	trx_mutex_enter(trx);
3884 
3885 	/* Another trx has a request on the table in an incompatible
3886 	mode: this trx may have to wait */
3887 
3888 	if (wait_for != NULL) {
3889 		err = lock_table_enqueue_waiting(ulint(mode) | flags, table,
3890 						 thr
3891 #ifdef WITH_WSREP
3892 						 , wait_for
3893 #endif
3894 						 );
3895 	} else {
3896 		lock_table_create(table, ulint(mode) | flags, trx);
3897 
3898 		ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
3899 
3900 		err = DB_SUCCESS;
3901 	}
3902 
3903 	lock_mutex_exit();
3904 
3905 	trx_mutex_exit(trx);
3906 
3907 	return(err);
3908 }
3909 
3910 /*********************************************************************//**
3911 Creates a table IX lock object for a resurrected transaction. */
3912 void
lock_table_ix_resurrect(dict_table_t * table,trx_t * trx)3913 lock_table_ix_resurrect(
3914 /*====================*/
3915 	dict_table_t*	table,	/*!< in/out: table */
3916 	trx_t*		trx)	/*!< in/out: transaction */
3917 {
3918 	ut_ad(trx->is_recovered);
3919 
3920 	if (lock_table_has(trx, table, LOCK_IX)) {
3921 		return;
3922 	}
3923 
3924 	lock_mutex_enter();
3925 
3926 	/* We have to check if the new lock is compatible with any locks
3927 	other transactions have in the table lock queue. */
3928 
3929 	ut_ad(!lock_table_other_has_incompatible(
3930 		      trx, LOCK_WAIT, table, LOCK_IX));
3931 
3932 	trx_mutex_enter(trx);
3933 	lock_table_create(table, LOCK_IX, trx);
3934 	lock_mutex_exit();
3935 	trx_mutex_exit(trx);
3936 }
3937 
3938 /*********************************************************************//**
3939 Checks if a waiting table lock request still has to wait in a queue.
3940 @return TRUE if still has to wait */
3941 static
3942 bool
lock_table_has_to_wait_in_queue(const lock_t * wait_lock)3943 lock_table_has_to_wait_in_queue(
3944 /*============================*/
3945 	const lock_t*	wait_lock)	/*!< in: waiting table lock */
3946 {
3947 	const dict_table_t*	table;
3948 	const lock_t*		lock;
3949 
3950 	ut_ad(lock_mutex_own());
3951 	ut_ad(lock_get_wait(wait_lock));
3952 
3953 	table = wait_lock->un_member.tab_lock.table;
3954 
3955 	for (lock = UT_LIST_GET_FIRST(table->locks);
3956 	     lock != wait_lock;
3957 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3958 
3959 		if (lock_has_to_wait(wait_lock, lock)) {
3960 
3961 			return(true);
3962 		}
3963 	}
3964 
3965 	return(false);
3966 }
3967 
3968 /*************************************************************//**
3969 Removes a table lock request, waiting or granted, from the queue and grants
3970 locks to other transactions in the queue, if they now are entitled to a
3971 lock. */
3972 static
3973 void
lock_table_dequeue(lock_t * in_lock)3974 lock_table_dequeue(
3975 /*===============*/
3976 	lock_t*	in_lock)/*!< in/out: table lock object; transactions waiting
3977 			behind will get their lock requests granted, if
3978 			they are now qualified to it */
3979 {
3980 	ut_ad(lock_mutex_own());
3981 	ut_a(lock_get_type_low(in_lock) == LOCK_TABLE);
3982 
3983 	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);
3984 
3985 	lock_table_remove_low(in_lock);
3986 
3987 	/* Check if waiting locks in the queue can now be granted: grant
3988 	locks if there are no conflicting locks ahead. */
3989 
3990 	for (/* No op */;
3991 	     lock != NULL;
3992 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
3993 
3994 		if (lock_get_wait(lock)
3995 		    && !lock_table_has_to_wait_in_queue(lock)) {
3996 
3997 			/* Grant the lock */
3998 			ut_ad(in_lock->trx != lock->trx);
3999 			lock_grant(lock);
4000 		}
4001 	}
4002 }
4003 
4004 /** Sets a lock on a table based on the given mode.
4005 @param[in]	table	table to lock
4006 @param[in,out]	trx	transaction
4007 @param[in]	mode	LOCK_X or LOCK_S
4008 @return error code or DB_SUCCESS. */
4009 dberr_t
lock_table_for_trx(dict_table_t * table,trx_t * trx,enum lock_mode mode)4010 lock_table_for_trx(
4011 	dict_table_t*	table,
4012 	trx_t*		trx,
4013 	enum lock_mode	mode)
4014 {
4015 	mem_heap_t*	heap;
4016 	que_thr_t*	thr;
4017 	dberr_t		err;
4018 	sel_node_t*	node;
4019 	heap = mem_heap_create(512);
4020 
4021 	node = sel_node_create(heap);
4022 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
4023 	thr->graph->state = QUE_FORK_ACTIVE;
4024 
4025 	/* We use the select query graph as the dummy graph needed
4026 	in the lock module call */
4027 
4028 	thr = static_cast<que_thr_t*>(
4029 		que_fork_get_first_thr(
4030 			static_cast<que_fork_t*>(que_node_get_parent(thr))));
4031 
4032 	que_thr_move_to_run_state_for_mysql(thr, trx);
4033 
4034 run_again:
4035 	thr->run_node = thr;
4036 	thr->prev_node = thr->common.parent;
4037 
4038 	err = lock_table(0, table, mode, thr);
4039 
4040 	trx->error_state = err;
4041 
4042 	if (UNIV_LIKELY(err == DB_SUCCESS)) {
4043 		que_thr_stop_for_mysql_no_error(thr, trx);
4044 	} else {
4045 		que_thr_stop_for_mysql(thr);
4046 
4047 		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
4048 			goto run_again;
4049 		}
4050 	}
4051 
4052 	que_graph_free(thr->graph);
4053 	trx->op_info = "";
4054 
4055 	return(err);
4056 }
4057 
4058 /*=========================== LOCK RELEASE ==============================*/
4059 static
4060 void
lock_grant_and_move_on_rec(hash_table_t * lock_hash,lock_t * first_lock,ulint heap_no)4061 lock_grant_and_move_on_rec(
4062 	hash_table_t*	lock_hash,
4063 	lock_t*			first_lock,
4064 	ulint			heap_no)
4065 {
4066 	lock_t*		lock;
4067 	lock_t*		previous;
4068 	ulint		space;
4069 	ulint		page_no;
4070 	ulint		rec_fold;
4071 
4072 	space = first_lock->un_member.rec_lock.space;
4073 	page_no = first_lock->un_member.rec_lock.page_no;
4074 	rec_fold = lock_rec_fold(space, page_no);
4075 
4076 	previous = (lock_t *) hash_get_nth_cell(lock_hash,
4077 							hash_calc_hash(rec_fold, lock_hash))->node;
4078 	if (previous == NULL) {
4079 		return;
4080 	}
4081 	if (previous == first_lock) {
4082 		lock = previous;
4083 	} else {
4084 		while (previous->hash &&
4085 				previous->hash != first_lock) {
4086 			previous = previous->hash;
4087 	    }
4088 		lock = previous->hash;
4089 	}
4090 	/* Grant locks if there are no conflicting locks ahead.
4091 	 Move granted locks to the head of the list. */
4092 	while (lock) {
4093 		ut_ad(!lock->trx->is_wsrep());
4094 		/* If the lock is a wait lock on this page, and it does not need to wait. */
4095 		if (lock->un_member.rec_lock.space == space
4096 			&& lock->un_member.rec_lock.page_no == page_no
4097 			&& lock_rec_get_nth_bit(lock, heap_no)
4098 			&& lock_get_wait(lock)
4099 			&& !lock_rec_has_to_wait_in_queue(lock)) {
4100 
4101 			lock_grant(lock);
4102 
4103 			if (previous != NULL) {
4104 				/* Move the lock to the head of the list. */
4105 				HASH_GET_NEXT(hash, previous) = HASH_GET_NEXT(hash, lock);
4106 				lock_rec_insert_to_head(lock, rec_fold);
4107 			} else {
4108 				/* Already at the head of the list. */
4109 				previous = lock;
4110 			}
4111 			/* Move on to the next lock. */
4112 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, previous));
4113 		} else {
4114 			previous = lock;
4115 			lock = static_cast<lock_t *>(HASH_GET_NEXT(hash, lock));
4116 		}
4117 	}
4118 }
4119 
4120 /*************************************************************//**
4121 Removes a granted record lock of a transaction from the queue and grants
4122 locks to other transactions waiting in the queue if they now are entitled
4123 to a lock. */
4124 void
lock_rec_unlock(trx_t * trx,const buf_block_t * block,const rec_t * rec,lock_mode lock_mode)4125 lock_rec_unlock(
4126 /*============*/
4127 	trx_t*			trx,	/*!< in/out: transaction that has
4128 					set a record lock */
4129 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
4130 	const rec_t*		rec,	/*!< in: record */
4131 	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
4132 {
4133 	lock_t*		first_lock;
4134 	lock_t*		lock;
4135 	ulint		heap_no;
4136 
4137 	ut_ad(trx);
4138 	ut_ad(rec);
4139 	ut_ad(block->frame == page_align(rec));
4140 	ut_ad(!trx->lock.wait_lock);
4141 	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
4142 	ut_ad(!page_rec_is_metadata(rec));
4143 
4144 	heap_no = page_rec_get_heap_no(rec);
4145 
4146 	lock_mutex_enter();
4147 	trx_mutex_enter(trx);
4148 
4149 	first_lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
4150 
4151 	/* Find the last lock with the same lock_mode and transaction
4152 	on the record. */
4153 
4154 	for (lock = first_lock; lock != NULL;
4155 	     lock = lock_rec_get_next(heap_no, lock)) {
4156 		if (lock->trx == trx && lock_get_mode(lock) == lock_mode) {
4157 			goto released;
4158 		}
4159 	}
4160 
4161 	lock_mutex_exit();
4162 	trx_mutex_exit(trx);
4163 
4164 	{
4165 		ib::error	err;
4166 		err << "Unlock row could not find a " << lock_mode
4167 			<< " mode lock on the record. Current statement: ";
4168 		size_t		stmt_len;
4169 		if (const char* stmt = innobase_get_stmt_unsafe(
4170 			    trx->mysql_thd, &stmt_len)) {
4171 			err.write(stmt, stmt_len);
4172 		}
4173 	}
4174 
4175 	return;
4176 
4177 released:
4178 	ut_a(!lock_get_wait(lock));
4179 	lock_rec_reset_nth_bit(lock, heap_no);
4180 
4181 	if (innodb_lock_schedule_algorithm
4182 		== INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4183 		thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
4184 
4185 		/* Check if we can now grant waiting lock requests */
4186 
4187 		for (lock = first_lock; lock != NULL;
4188 			 lock = lock_rec_get_next(heap_no, lock)) {
4189 			if (!lock_get_wait(lock)) {
4190 				continue;
4191 			}
4192 			const lock_t* c = lock_rec_has_to_wait_in_queue(lock);
4193 			if (!c) {
4194 				/* Grant the lock */
4195 				ut_ad(trx != lock->trx);
4196 				lock_grant(lock);
4197 			}
4198 		}
4199 	} else {
4200 		lock_grant_and_move_on_rec(lock_sys.rec_hash, first_lock, heap_no);
4201 	}
4202 
4203 	lock_mutex_exit();
4204 	trx_mutex_exit(trx);
4205 }
4206 
4207 #ifdef UNIV_DEBUG
4208 /*********************************************************************//**
4209 Check if a transaction that has X or IX locks has set the dict_op
4210 code correctly. */
4211 static
4212 void
lock_check_dict_lock(const lock_t * lock)4213 lock_check_dict_lock(
4214 /*==================*/
4215 	const lock_t*	lock)	/*!< in: lock to check */
4216 {
4217 	if (lock_get_type_low(lock) == LOCK_REC) {
4218 
4219 		/* Check if the transcation locked a record
4220 		in a system table in X mode. It should have set
4221 		the dict_op code correctly if it did. */
4222 		if (lock->index->table->id < DICT_HDR_FIRST_ID
4223 		    && lock_get_mode(lock) == LOCK_X) {
4224 
4225 			ut_ad(lock_get_mode(lock) != LOCK_IX);
4226 			ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4227 		}
4228 	} else {
4229 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4230 
4231 		const dict_table_t*	table;
4232 
4233 		table = lock->un_member.tab_lock.table;
4234 
4235 		/* Check if the transcation locked a system table
4236 		in IX mode. It should have set the dict_op code
4237 		correctly if it did. */
4238 		if (table->id < DICT_HDR_FIRST_ID
4239 		    && (lock_get_mode(lock) == LOCK_X
4240 			|| lock_get_mode(lock) == LOCK_IX)) {
4241 
4242 			ut_ad(lock->trx->dict_operation != TRX_DICT_OP_NONE);
4243 		}
4244 	}
4245 }
4246 #endif /* UNIV_DEBUG */
4247 
4248 /** Release the explicit locks of a committing transaction,
4249 and release possible other transactions waiting because of these locks. */
lock_release(trx_t * trx)4250 void lock_release(trx_t* trx)
4251 {
4252 #ifdef UNIV_DEBUG
4253 	std::set<table_id_t> to_evict;
4254 	if (innodb_evict_tables_on_commit_debug && !trx->is_recovered)
4255 # if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
4256 	if (!mutex_own(&dict_sys->mutex))
4257 # else /* this would be more proper way to do it */
4258 	if (!trx->dict_operation_lock_mode && !trx->dict_operation)
4259 # endif
4260 	for (trx_mod_tables_t::const_iterator it= trx->mod_tables.begin();
4261 	     it != trx->mod_tables.end(); ++it)
4262 		if (!it->first->is_temporary())
4263 			to_evict.insert(it->first->id);
4264 #endif
4265 	ulint		count = 0;
4266 	trx_id_t	max_trx_id = trx_sys.get_max_trx_id();
4267 
4268 	lock_mutex_enter();
4269 	ut_ad(!trx_mutex_own(trx));
4270 
4271 	for (lock_t* lock = UT_LIST_GET_LAST(trx->lock.trx_locks);
4272 	     lock != NULL;
4273 	     lock = UT_LIST_GET_LAST(trx->lock.trx_locks)) {
4274 
4275 		ut_d(lock_check_dict_lock(lock));
4276 
4277 		if (lock_get_type_low(lock) == LOCK_REC) {
4278 
4279 			lock_rec_dequeue_from_page(lock);
4280 		} else {
4281 			dict_table_t*	table;
4282 
4283 			table = lock->un_member.tab_lock.table;
4284 
4285 			if (lock_get_mode(lock) != LOCK_IS
4286 			    && trx->undo_no != 0) {
4287 
4288 				/* The trx may have modified the table. We
4289 				block the use of the MySQL query cache for
4290 				all currently active transactions. */
4291 
4292 				table->query_cache_inv_trx_id = max_trx_id;
4293 			}
4294 
4295 			lock_table_dequeue(lock);
4296 		}
4297 
4298 		if (count == LOCK_RELEASE_INTERVAL) {
4299 			/* Release the  mutex for a while, so that we
4300 			do not monopolize it */
4301 
4302 			lock_mutex_exit();
4303 
4304 			lock_mutex_enter();
4305 
4306 			count = 0;
4307 		}
4308 
4309 		++count;
4310 	}
4311 
4312 	lock_mutex_exit();
4313 
4314 #ifdef UNIV_DEBUG
4315 	if (to_evict.empty()) {
4316 		return;
4317 	}
4318 	mutex_enter(&dict_sys->mutex);
4319 	lock_mutex_enter();
4320 	for (std::set<table_id_t>::const_iterator i = to_evict.begin();
4321 	     i != to_evict.end(); ++i) {
4322 		if (dict_table_t *table = dict_table_open_on_id(
4323 			    *i, TRUE, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)) {
4324 			if (!table->get_ref_count()
4325 			    && !UT_LIST_GET_LEN(table->locks)) {
4326 				dict_table_remove_from_cache_low(table, true);
4327 			}
4328 		}
4329 	}
4330 	lock_mutex_exit();
4331 	mutex_exit(&dict_sys->mutex);
4332 #endif
4333 }
4334 
4335 /* True if a lock mode is S or X */
4336 #define IS_LOCK_S_OR_X(lock) \
4337 	(lock_get_mode(lock) == LOCK_S \
4338 	 || lock_get_mode(lock) == LOCK_X)
4339 
4340 /*********************************************************************//**
4341 Removes table locks of the transaction on a table to be dropped. */
4342 static
4343 void
lock_trx_table_locks_remove(const lock_t * lock_to_remove)4344 lock_trx_table_locks_remove(
4345 /*========================*/
4346 	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
4347 {
4348 	trx_t*		trx = lock_to_remove->trx;
4349 
4350 	ut_ad(lock_mutex_own());
4351 
4352 	/* It is safe to read this because we are holding the lock mutex */
4353 	if (!trx->lock.cancel) {
4354 		trx_mutex_enter(trx);
4355 	} else {
4356 		ut_ad(trx_mutex_own(trx));
4357 	}
4358 
4359 	for (lock_list::iterator it = trx->lock.table_locks.begin(),
4360              end = trx->lock.table_locks.end(); it != end; ++it) {
4361 		const lock_t*	lock = *it;
4362 
4363 		ut_ad(!lock || trx == lock->trx);
4364 		ut_ad(!lock || lock_get_type_low(lock) & LOCK_TABLE);
4365 		ut_ad(!lock || lock->un_member.tab_lock.table);
4366 
4367 		if (lock == lock_to_remove) {
4368 			*it = NULL;
4369 
4370 			if (!trx->lock.cancel) {
4371 				trx_mutex_exit(trx);
4372 			}
4373 
4374 			return;
4375 		}
4376 	}
4377 
4378 	if (!trx->lock.cancel) {
4379 		trx_mutex_exit(trx);
4380 	}
4381 
4382 	/* Lock must exist in the vector. */
4383 	ut_error;
4384 }
4385 
4386 /*===================== VALIDATION AND DEBUGGING ====================*/
4387 
4388 /** Print info of a table lock.
4389 @param[in,out]	file	output stream
4390 @param[in]	lock	table lock */
4391 static
4392 void
lock_table_print(FILE * file,const lock_t * lock)4393 lock_table_print(FILE* file, const lock_t* lock)
4394 {
4395 	ut_ad(lock_mutex_own());
4396 	ut_a(lock_get_type_low(lock) == LOCK_TABLE);
4397 
4398 	fputs("TABLE LOCK table ", file);
4399 	ut_print_name(file, lock->trx,
4400 		      lock->un_member.tab_lock.table->name.m_name);
4401 	fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));
4402 
4403 	if (lock_get_mode(lock) == LOCK_S) {
4404 		fputs(" lock mode S", file);
4405 	} else if (lock_get_mode(lock) == LOCK_X) {
4406 		ut_ad(lock->trx->id != 0);
4407 		fputs(" lock mode X", file);
4408 	} else if (lock_get_mode(lock) == LOCK_IS) {
4409 		fputs(" lock mode IS", file);
4410 	} else if (lock_get_mode(lock) == LOCK_IX) {
4411 		ut_ad(lock->trx->id != 0);
4412 		fputs(" lock mode IX", file);
4413 	} else if (lock_get_mode(lock) == LOCK_AUTO_INC) {
4414 		fputs(" lock mode AUTO-INC", file);
4415 	} else {
4416 		fprintf(file, " unknown lock mode %lu",
4417 			(ulong) lock_get_mode(lock));
4418 	}
4419 
4420 	if (lock_get_wait(lock)) {
4421 		fputs(" waiting", file);
4422 	}
4423 
4424 	putc('\n', file);
4425 }
4426 
4427 /** Pretty-print a record lock.
4428 @param[in,out]	file	output stream
4429 @param[in]	lock	record lock
4430 @param[in,out]	mtr	mini-transaction for accessing the record */
lock_rec_print(FILE * file,const lock_t * lock,mtr_t & mtr)4431 static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
4432 {
4433 	ulint			space;
4434 	ulint			page_no;
4435 
4436 	ut_ad(lock_mutex_own());
4437 	ut_a(lock_get_type_low(lock) == LOCK_REC);
4438 
4439 	space = lock->un_member.rec_lock.space;
4440 	page_no = lock->un_member.rec_lock.page_no;
4441 
4442 	fprintf(file, "RECORD LOCKS space id %lu page no %lu n bits %lu "
4443 		"index %s of table ",
4444 		(ulong) space, (ulong) page_no,
4445 		(ulong) lock_rec_get_n_bits(lock),
4446 		lock->index->name());
4447 	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
4448 	fprintf(file, " trx id " TRX_ID_FMT, trx_get_id_for_print(lock->trx));
4449 
4450 	if (lock_get_mode(lock) == LOCK_S) {
4451 		fputs(" lock mode S", file);
4452 	} else if (lock_get_mode(lock) == LOCK_X) {
4453 		fputs(" lock_mode X", file);
4454 	} else {
4455 		ut_error;
4456 	}
4457 
4458 	if (lock_rec_get_gap(lock)) {
4459 		fputs(" locks gap before rec", file);
4460 	}
4461 
4462 	if (lock_rec_get_rec_not_gap(lock)) {
4463 		fputs(" locks rec but not gap", file);
4464 	}
4465 
4466 	if (lock_rec_get_insert_intention(lock)) {
4467 		fputs(" insert intention", file);
4468 	}
4469 
4470 	if (lock_get_wait(lock)) {
4471 		fputs(" waiting", file);
4472 	}
4473 
4474 	putc('\n', file);
4475 
4476 	mem_heap_t*		heap		= NULL;
4477 	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
4478 	rec_offs*		offsets		= offsets_;
4479 	rec_offs_init(offsets_);
4480 
4481 	mtr.start();
4482 	const buf_block_t* block = buf_page_try_get(page_id_t(space, page_no),
4483 						    &mtr);
4484 
4485 	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {
4486 
4487 		if (!lock_rec_get_nth_bit(lock, i)) {
4488 			continue;
4489 		}
4490 
4491 		fprintf(file, "Record lock, heap no %lu", (ulong) i);
4492 
4493 		if (block) {
4494 			ut_ad(page_is_leaf(block->frame));
4495 			const rec_t*	rec;
4496 
4497 			rec = page_find_rec_with_heap_no(
4498 				buf_block_get_frame(block), i);
4499 			ut_ad(!page_rec_is_metadata(rec));
4500 
4501 			offsets = rec_get_offsets(
4502 				rec, lock->index, offsets,
4503 				lock->index->n_core_fields,
4504 				ULINT_UNDEFINED, &heap);
4505 
4506 			putc(' ', file);
4507 			rec_print_new(file, rec, offsets);
4508 		}
4509 
4510 		putc('\n', file);
4511 	}
4512 
4513 	mtr.commit();
4514 
4515 	if (UNIV_LIKELY_NULL(heap)) {
4516 		mem_heap_free(heap);
4517 	}
4518 }
4519 
4520 #ifdef UNIV_DEBUG
4521 /* Print the number of lock structs from lock_print_info_summary() only
4522 in non-production builds for performance reasons, see
4523 http://bugs.mysql.com/36942 */
4524 #define PRINT_NUM_OF_LOCK_STRUCTS
4525 #endif /* UNIV_DEBUG */
4526 
4527 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4528 /*********************************************************************//**
4529 Calculates the number of record lock structs in the record lock hash table.
4530 @return number of record locks */
4531 static
4532 ulint
lock_get_n_rec_locks(void)4533 lock_get_n_rec_locks(void)
4534 /*======================*/
4535 {
4536 	ulint	n_locks	= 0;
4537 	ulint	i;
4538 
4539 	ut_ad(lock_mutex_own());
4540 
4541 	for (i = 0; i < hash_get_n_cells(lock_sys.rec_hash); i++) {
4542 		const lock_t*	lock;
4543 
4544 		for (lock = static_cast<const lock_t*>(
4545 				HASH_GET_FIRST(lock_sys.rec_hash, i));
4546 		     lock != 0;
4547 		     lock = static_cast<const lock_t*>(
4548 				HASH_GET_NEXT(hash, lock))) {
4549 
4550 			n_locks++;
4551 		}
4552 	}
4553 
4554 	return(n_locks);
4555 }
4556 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4557 
4558 /*********************************************************************//**
4559 Prints info of locks for all transactions.
4560 @return FALSE if not able to obtain lock mutex
4561 and exits without printing info */
4562 ibool
lock_print_info_summary(FILE * file,ibool nowait)4563 lock_print_info_summary(
4564 /*====================*/
4565 	FILE*	file,	/*!< in: file where to print */
4566 	ibool	nowait)	/*!< in: whether to wait for the lock mutex */
4567 {
4568 	/* if nowait is FALSE, wait on the lock mutex,
4569 	otherwise return immediately if fail to obtain the
4570 	mutex. */
4571 	if (!nowait) {
4572 		lock_mutex_enter();
4573 	} else if (lock_mutex_enter_nowait()) {
4574 		fputs("FAIL TO OBTAIN LOCK MUTEX,"
4575 		      " SKIP LOCK INFO PRINTING\n", file);
4576 		return(FALSE);
4577 	}
4578 
4579 	if (lock_deadlock_found) {
4580 		fputs("------------------------\n"
4581 		      "LATEST DETECTED DEADLOCK\n"
4582 		      "------------------------\n", file);
4583 
4584 		if (!srv_read_only_mode) {
4585 			ut_copy_file(file, lock_latest_err_file);
4586 		}
4587 	}
4588 
4589 	fputs("------------\n"
4590 	      "TRANSACTIONS\n"
4591 	      "------------\n", file);
4592 
4593 	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
4594 		trx_sys.get_max_trx_id());
4595 
4596 	fprintf(file,
4597 		"Purge done for trx's n:o < " TRX_ID_FMT
4598 		" undo n:o < " TRX_ID_FMT " state: %s\n"
4599 		"History list length %u\n",
4600 		purge_sys.tail.trx_no,
4601 		purge_sys.tail.undo_no,
4602 		purge_sys.enabled()
4603 		? (purge_sys.running() ? "running"
4604 		   : purge_sys.paused() ? "stopped" : "running but idle")
4605 		: "disabled",
4606 		trx_sys.history_size());
4607 
4608 #ifdef PRINT_NUM_OF_LOCK_STRUCTS
4609 	fprintf(file,
4610 		"Total number of lock structs in row lock hash table %lu\n",
4611 		(ulong) lock_get_n_rec_locks());
4612 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
4613 	return(TRUE);
4614 }
4615 
4616 /** Prints transaction lock wait and MVCC state.
4617 @param[in,out]	file	file where to print
4618 @param[in]	trx	transaction
4619 @param[in]	now	current time */
4620 void
lock_trx_print_wait_and_mvcc_state(FILE * file,const trx_t * trx,time_t now)4621 lock_trx_print_wait_and_mvcc_state(FILE* file, const trx_t* trx, time_t now)
4622 {
4623 	fprintf(file, "---");
4624 
4625 	trx_print_latched(file, trx, 600);
4626 
4627 	/* Note: read_view->get_state() check is race condition. But it
4628 	should "kind of work" because read_view is freed only at shutdown.
4629 	Worst thing that may happen is that it'll get transferred to
4630 	another thread and print wrong values. */
4631 
4632 	if (trx->read_view.get_state() == READ_VIEW_STATE_OPEN) {
4633 		trx->read_view.print_limits(file);
4634 	}
4635 
4636 	if (trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
4637 
4638 		fprintf(file,
4639 			"------- TRX HAS BEEN WAITING %lu SEC"
4640 			" FOR THIS LOCK TO BE GRANTED:\n",
4641 			(ulong) difftime(now, trx->lock.wait_started));
4642 
4643 		if (lock_get_type_low(trx->lock.wait_lock) == LOCK_REC) {
4644 			mtr_t mtr;
4645 			lock_rec_print(file, trx->lock.wait_lock, mtr);
4646 		} else {
4647 			lock_table_print(file, trx->lock.wait_lock);
4648 		}
4649 
4650 		fprintf(file, "------------------\n");
4651 	}
4652 }
4653 
4654 /*********************************************************************//**
4655 Prints info of locks for a transaction. */
4656 static
4657 void
lock_trx_print_locks(FILE * file,const trx_t * trx)4658 lock_trx_print_locks(
4659 /*=================*/
4660 	FILE*		file,		/*!< in/out: File to write */
4661 	const trx_t*	trx)		/*!< in: current transaction */
4662 {
4663 	mtr_t mtr;
4664 	uint32_t i= 0;
4665 	/* Iterate over the transaction's locks. */
4666 	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
4667 	     lock != NULL;
4668 	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
4669 		if (lock_get_type_low(lock) == LOCK_REC) {
4670 
4671 			lock_rec_print(file, lock, mtr);
4672 		} else {
4673 			ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
4674 
4675 			lock_table_print(file, lock);
4676 		}
4677 
4678 		if (++i == 10) {
4679 
4680 			fprintf(file,
4681 				"10 LOCKS PRINTED FOR THIS TRX:"
4682 				" SUPPRESSING FURTHER PRINTS\n");
4683 
4684 			break;
4685 		}
4686 	}
4687 }
4688 
4689 /** Functor to display all transactions */
4690 struct lock_print_info
4691 {
lock_print_infolock_print_info4692   lock_print_info(FILE* file, time_t now) :
4693     file(file), now(now),
4694     purge_trx(purge_sys.query ? purge_sys.query->trx : NULL)
4695   {}
4696 
operator ()lock_print_info4697   void operator()(const trx_t* trx) const
4698   {
4699     ut_ad(mutex_own(&trx_sys.mutex));
4700     if (UNIV_UNLIKELY(trx == purge_trx))
4701       return;
4702     lock_trx_print_wait_and_mvcc_state(file, trx, now);
4703 
4704     if (trx->will_lock && srv_print_innodb_lock_monitor)
4705       lock_trx_print_locks(file, trx);
4706   }
4707 
4708   FILE* const file;
4709   const time_t now;
4710   const trx_t* const purge_trx;
4711 };
4712 
4713 /*********************************************************************//**
4714 Prints info of locks for each transaction. This function assumes that the
4715 caller holds the lock mutex and more importantly it will release the lock
4716 mutex on behalf of the caller. (This should be fixed in the future). */
4717 void
lock_print_info_all_transactions(FILE * file)4718 lock_print_info_all_transactions(
4719 /*=============================*/
4720 	FILE*		file)	/*!< in/out: file where to print */
4721 {
4722 	ut_ad(lock_mutex_own());
4723 
4724 	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");
4725 	const time_t now = time(NULL);
4726 
4727 	mutex_enter(&trx_sys.mutex);
4728 	ut_list_map(trx_sys.trx_list, lock_print_info(file, now));
4729 	mutex_exit(&trx_sys.mutex);
4730 	lock_mutex_exit();
4731 
4732 	ut_ad(lock_validate());
4733 }
4734 
4735 #ifdef UNIV_DEBUG
4736 /*********************************************************************//**
4737 Find the the lock in the trx_t::trx_lock_t::table_locks vector.
4738 @return true if found */
4739 static
4740 bool
lock_trx_table_locks_find(trx_t * trx,const lock_t * find_lock)4741 lock_trx_table_locks_find(
4742 /*======================*/
4743 	trx_t*		trx,		/*!< in: trx to validate */
4744 	const lock_t*	find_lock)	/*!< in: lock to find */
4745 {
4746 	bool		found = false;
4747 
4748 	ut_ad(trx_mutex_own(trx));
4749 
4750 	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
4751              end = trx->lock.table_locks.end(); it != end; ++it) {
4752 
4753 		const lock_t*	lock = *it;
4754 
4755 		if (lock == NULL) {
4756 
4757 			continue;
4758 
4759 		} else if (lock == find_lock) {
4760 
4761 			/* Can't be duplicates. */
4762 			ut_a(!found);
4763 			found = true;
4764 		}
4765 
4766 		ut_a(trx == lock->trx);
4767 		ut_a(lock_get_type_low(lock) & LOCK_TABLE);
4768 		ut_a(lock->un_member.tab_lock.table != NULL);
4769 	}
4770 
4771 	return(found);
4772 }
4773 
4774 /*********************************************************************//**
4775 Validates the lock queue on a table.
4776 @return TRUE if ok */
4777 static
4778 ibool
lock_table_queue_validate(const dict_table_t * table)4779 lock_table_queue_validate(
4780 /*======================*/
4781 	const dict_table_t*	table)	/*!< in: table */
4782 {
4783 	const lock_t*	lock;
4784 
4785 	ut_ad(lock_mutex_own());
4786 
4787 	for (lock = UT_LIST_GET_FIRST(table->locks);
4788 	     lock != NULL;
4789 	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
4790 
4791 		/* lock->trx->state cannot change from or to NOT_STARTED
4792 		while we are holding the lock_sys.mutex. It may change
4793 		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
4794 		trx_mutex_enter(lock->trx);
4795 		check_trx_state(lock->trx);
4796 
4797 		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4798 		} else if (!lock_get_wait(lock)) {
4799 			ut_a(!lock_table_other_has_incompatible(
4800 				     lock->trx, 0, table,
4801 				     lock_get_mode(lock)));
4802 		} else {
4803 			ut_a(lock_table_has_to_wait_in_queue(lock));
4804 		}
4805 
4806 		ut_a(lock_trx_table_locks_find(lock->trx, lock));
4807 		trx_mutex_exit(lock->trx);
4808 	}
4809 
4810 	return(TRUE);
4811 }
4812 
4813 /*********************************************************************//**
4814 Validates the lock queue on a single record.
4815 @return TRUE if ok */
4816 static
4817 bool
lock_rec_queue_validate(bool locked_lock_trx_sys,const buf_block_t * block,const rec_t * rec,const dict_index_t * index,const rec_offs * offsets)4818 lock_rec_queue_validate(
4819 /*====================*/
4820 	bool			locked_lock_trx_sys,
4821 					/*!< in: if the caller holds
4822 					both the lock mutex and
4823 					trx_sys_t->lock. */
4824 	const buf_block_t*	block,	/*!< in: buffer block containing rec */
4825 	const rec_t*		rec,	/*!< in: record to look at */
4826 	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
4827 	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
4828 {
4829 	const lock_t*	lock;
4830 	ulint		heap_no;
4831 
4832 	ut_a(rec);
4833 	ut_a(block->frame == page_align(rec));
4834 	ut_ad(rec_offs_validate(rec, index, offsets));
4835 	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
4836 	ut_ad(page_rec_is_leaf(rec));
4837 	ut_ad(lock_mutex_own() == locked_lock_trx_sys);
4838 	ut_ad(!index || dict_index_is_clust(index)
4839 	      || !dict_index_is_online_ddl(index));
4840 
4841 	heap_no = page_rec_get_heap_no(rec);
4842 
4843 	if (!locked_lock_trx_sys) {
4844 		lock_mutex_enter();
4845 	}
4846 
4847 	if (!page_rec_is_user_rec(rec)) {
4848 
4849 		for (lock = lock_rec_get_first(lock_sys.rec_hash,
4850 					       block, heap_no);
4851 		     lock != NULL;
4852 		     lock = lock_rec_get_next_const(heap_no, lock)) {
4853 
4854 			ut_ad(!index || lock->index == index);
4855 
4856 			trx_mutex_enter(lock->trx);
4857 			ut_ad(!lock->trx->read_only
4858 			      || !lock->trx->is_autocommit_non_locking());
4859 			ut_ad(trx_state_eq(lock->trx,
4860 					   TRX_STATE_COMMITTED_IN_MEMORY)
4861 			      || !lock_get_wait(lock)
4862 			      || lock_rec_has_to_wait_in_queue(lock));
4863 			trx_mutex_exit(lock->trx);
4864 		}
4865 
4866 func_exit:
4867 		if (!locked_lock_trx_sys) {
4868 			lock_mutex_exit();
4869 		}
4870 
4871 		return true;
4872 	}
4873 
4874 	ut_ad(page_rec_is_leaf(rec));
4875 	ut_ad(lock_mutex_own());
4876 
4877 	const trx_id_t impl_trx_id = index && index->is_primary()
4878 		? lock_clust_rec_some_has_impl(rec, index, offsets)
4879 		: 0;
4880 
4881 	if (trx_t *impl_trx = impl_trx_id
4882 	    ? trx_sys.find(current_trx(), impl_trx_id, false)
4883 	    : 0) {
4884 		/* impl_trx could have been committed before we
4885 		acquire its mutex, but not thereafter. */
4886 
4887 		mutex_enter(&impl_trx->mutex);
4888 		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
4889 		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
4890 		} else if (const lock_t* other_lock
4891 			   = lock_rec_other_has_expl_req(
4892 				   LOCK_S, block, true, heap_no,
4893 				   impl_trx)) {
4894 			/* The impl_trx is holding an implicit lock on the
4895 			given record 'rec'. So there cannot be another
4896 			explicit granted lock.  Also, there can be another
4897 			explicit waiting lock only if the impl_trx has an
4898 			explicit granted lock. */
4899 
4900 #ifdef WITH_WSREP
4901 			/** Galera record locking rules:
4902 			* If there is no other record lock to the same record, we may grant
4903 			the lock request.
4904 			* If there is other record lock but this requested record lock is
4905 			compatible, we may grant the lock request.
4906 			* If there is other record lock and it is not compatible with
4907 			requested lock, all normal transactions must wait.
4908 			* BF (brute force) additional exceptions :
4909 			** If BF already holds record lock for requested record, we may
4910 			grant new record lock even if there is conflicting record lock(s)
4911 			waiting on a queue.
4912 			** If conflicting transaction holds requested record lock,
4913 			we will cancel this record lock and select conflicting transaction
4914 			for BF abort or kill victim.
4915 			** If conflicting transaction is waiting for requested record lock
4916 			we will cancel this wait and select conflicting transaction
4917 			for BF abort or kill victim.
4918 			** There should not be two BF transactions waiting for same record lock
4919 			*/
4920 			if (other_lock->trx->is_wsrep() && !lock_get_wait(other_lock)) {
4921 				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
4922 				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4923 
4924 				if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
4925 						       block, heap_no,
4926 						       impl_trx)) {
4927 					ib::info() << "WSREP impl BF lock conflict";
4928 				}
4929 			} else
4930 #endif /* WITH_WSREP */
4931 			{
4932 				ut_ad(lock_get_wait(other_lock));
4933 				ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
4934 						        block, heap_no, impl_trx));
4935 			}
4936 		}
4937 
4938 		mutex_exit(&impl_trx->mutex);
4939 	}
4940 
4941 	for (lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
4942 	     lock != NULL;
4943 	     lock = lock_rec_get_next_const(heap_no, lock)) {
4944 		ut_ad(!lock->trx->read_only
4945 		      || !lock->trx->is_autocommit_non_locking());
4946 		ut_ad(!page_rec_is_metadata(rec));
4947 
4948 		if (index) {
4949 			ut_a(lock->index == index);
4950 		}
4951 
4952 		if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
4953 
4954 			lock_mode	mode;
4955 
4956 			if (lock_get_mode(lock) == LOCK_S) {
4957 				mode = LOCK_X;
4958 			} else {
4959 				mode = LOCK_S;
4960 			}
4961 
4962 			const lock_t*	other_lock
4963 				= lock_rec_other_has_expl_req(
4964 					mode, block, false, heap_no,
4965 					lock->trx);
4966 #ifdef WITH_WSREP
4967 			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
4968 				/* Only BF transaction may be granted
4969 				lock before other conflicting lock
4970 				request. */
4971 				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
4972 				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
4973 					/* If no BF, this case is a bug. */
4974 					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
4975 					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
4976 					ut_error;
4977 				}
4978 			} else
4979 #endif /* WITH_WSREP */
4980 			ut_ad(!other_lock);
4981 		} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
4982 
4983 			ut_a(lock_rec_has_to_wait_in_queue(lock));
4984 		}
4985 	}
4986 
4987 	ut_ad(innodb_lock_schedule_algorithm == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
4988 		  lock_queue_validate(lock));
4989 
4990 	goto func_exit;
4991 }
4992 
4993 /*********************************************************************//**
4994 Validates the record lock queues on a page.
4995 @return TRUE if ok */
4996 static
4997 ibool
lock_rec_validate_page(const buf_block_t * block)4998 lock_rec_validate_page(
4999 /*===================*/
5000 	const buf_block_t*	block)	/*!< in: buffer block */
5001 {
5002 	const lock_t*	lock;
5003 	const rec_t*	rec;
5004 	ulint		nth_lock	= 0;
5005 	ulint		nth_bit		= 0;
5006 	ulint		i;
5007 	mem_heap_t*	heap		= NULL;
5008 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5009 	rec_offs*	offsets		= offsets_;
5010 	rec_offs_init(offsets_);
5011 
5012 	ut_ad(!lock_mutex_own());
5013 
5014 	lock_mutex_enter();
5015 loop:
5016 	lock = lock_rec_get_first_on_page_addr(
5017 		lock_sys.rec_hash,
5018 		block->page.id.space(), block->page.id.page_no());
5019 
5020 	if (!lock) {
5021 		goto function_exit;
5022 	}
5023 
5024 	ut_ad(!block->page.file_page_was_freed);
5025 
5026 	for (i = 0; i < nth_lock; i++) {
5027 
5028 		lock = lock_rec_get_next_on_page_const(lock);
5029 
5030 		if (!lock) {
5031 			goto function_exit;
5032 		}
5033 	}
5034 
5035 	ut_ad(!lock->trx->read_only
5036 	      || !lock->trx->is_autocommit_non_locking());
5037 
5038 	/* Only validate the record queues when this thread is not
5039 	holding a space->latch. */
5040 	if (!sync_check_find(SYNC_FSP))
5041 	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {
5042 
5043 		if (i == PAGE_HEAP_NO_SUPREMUM
5044 		    || lock_rec_get_nth_bit(lock, i)) {
5045 
5046 			rec = page_find_rec_with_heap_no(block->frame, i);
5047 			ut_a(rec);
5048 			ut_ad(!lock_rec_get_nth_bit(lock, i)
5049 			      || page_rec_is_leaf(rec));
5050 			offsets = rec_get_offsets(rec, lock->index, offsets,
5051 						  lock->index->n_core_fields,
5052 						  ULINT_UNDEFINED, &heap);
5053 
5054 			/* If this thread is holding the file space
5055 			latch (fil_space_t::latch), the following
5056 			check WILL break the latching order and may
5057 			cause a deadlock of threads. */
5058 
5059 			lock_rec_queue_validate(
5060 				TRUE, block, rec, lock->index, offsets);
5061 
5062 			nth_bit = i + 1;
5063 
5064 			goto loop;
5065 		}
5066 	}
5067 
5068 	nth_bit = 0;
5069 	nth_lock++;
5070 
5071 	goto loop;
5072 
5073 function_exit:
5074 	lock_mutex_exit();
5075 
5076 	if (heap != NULL) {
5077 		mem_heap_free(heap);
5078 	}
5079 	return(TRUE);
5080 }
5081 
5082 /*********************************************************************//**
5083 Validate record locks up to a limit.
5084 @return lock at limit or NULL if no more locks in the hash bucket */
5085 static MY_ATTRIBUTE((warn_unused_result))
5086 const lock_t*
lock_rec_validate(ulint start,ib_uint64_t * limit)5087 lock_rec_validate(
5088 /*==============*/
5089 	ulint		start,		/*!< in: lock_sys.rec_hash
5090 					bucket */
5091 	ib_uint64_t*	limit)		/*!< in/out: upper limit of
5092 					(space, page_no) */
5093 {
5094 	ut_ad(lock_mutex_own());
5095 
5096 	for (const lock_t* lock = static_cast<const lock_t*>(
5097 			HASH_GET_FIRST(lock_sys.rec_hash, start));
5098 	     lock != NULL;
5099 	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {
5100 
5101 		ib_uint64_t	current;
5102 
5103 		ut_ad(!lock->trx->read_only
5104 		      || !lock->trx->is_autocommit_non_locking());
5105 		ut_ad(lock_get_type(lock) == LOCK_REC);
5106 
5107 		current = ut_ull_create(
5108 			lock->un_member.rec_lock.space,
5109 			lock->un_member.rec_lock.page_no);
5110 
5111 		if (current > *limit) {
5112 			*limit = current + 1;
5113 			return(lock);
5114 		}
5115 	}
5116 
5117 	return(0);
5118 }
5119 
5120 /*********************************************************************//**
5121 Validate a record lock's block */
5122 static
5123 void
lock_rec_block_validate(ulint space_id,ulint page_no)5124 lock_rec_block_validate(
5125 /*====================*/
5126 	ulint		space_id,
5127 	ulint		page_no)
5128 {
5129 	/* The lock and the block that it is referring to may be freed at
5130 	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
5131 	If the lock exists in lock_rec_validate_page() we assert
5132 	!block->page.file_page_was_freed. */
5133 
5134 	buf_block_t*	block;
5135 	mtr_t		mtr;
5136 
5137 	/* Transactional locks should never refer to dropped
5138 	tablespaces, because all DDL operations that would drop or
5139 	discard or rebuild a tablespace do hold an exclusive table
5140 	lock, which would conflict with any locks referring to the
5141 	tablespace from other transactions. */
5142 	if (fil_space_t* space = fil_space_acquire(space_id)) {
5143 		dberr_t err = DB_SUCCESS;
5144 		mtr_start(&mtr);
5145 
5146 		block = buf_page_get_gen(
5147 			page_id_t(space_id, page_no),
5148 			page_size_t(space->flags),
5149 			RW_X_LATCH, NULL,
5150 			BUF_GET_POSSIBLY_FREED,
5151 			__FILE__, __LINE__, &mtr, &err);
5152 
5153 		if (err != DB_SUCCESS) {
5154 			ib::error() << "Lock rec block validate failed for tablespace "
5155 				   << space->name
5156 				   << " space_id " << space_id
5157 				   << " page_no " << page_no << " err " << err;
5158 		}
5159 
5160 		if (block) {
5161 			buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
5162 
5163 			ut_ad(lock_rec_validate_page(block));
5164 		}
5165 
5166 		mtr_commit(&mtr);
5167 
5168 		space->release();
5169 	}
5170 }
5171 
5172 
lock_validate_table_locks(rw_trx_hash_element_t * element,void *)5173 static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
5174 {
5175   ut_ad(lock_mutex_own());
5176   mutex_enter(&element->mutex);
5177   if (element->trx)
5178   {
5179     check_trx_state(element->trx);
5180     for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
5181          lock != NULL;
5182          lock= UT_LIST_GET_NEXT(trx_locks, lock))
5183     {
5184       if (lock_get_type_low(lock) & LOCK_TABLE)
5185         lock_table_queue_validate(lock->un_member.tab_lock.table);
5186     }
5187   }
5188   mutex_exit(&element->mutex);
5189   return 0;
5190 }
5191 
5192 
5193 /*********************************************************************//**
5194 Validates the lock system.
5195 @return TRUE if ok */
5196 static
5197 bool
lock_validate()5198 lock_validate()
5199 /*===========*/
5200 {
5201 	typedef	std::pair<ulint, ulint>		page_addr_t;
5202 	typedef std::set<
5203 		page_addr_t,
5204 		std::less<page_addr_t>,
5205 		ut_allocator<page_addr_t> >	page_addr_set;
5206 
5207 	page_addr_set	pages;
5208 
5209 	lock_mutex_enter();
5210 
5211 	/* Validate table locks */
5212 	trx_sys.rw_trx_hash.iterate(reinterpret_cast<my_hash_walk_action>
5213 				    (lock_validate_table_locks), 0);
5214 
5215 	/* Iterate over all the record locks and validate the locks. We
5216 	don't want to hog the lock_sys_t::mutex and the trx_sys_t::mutex.
5217 	Release both mutexes during the validation check. */
5218 
5219 	for (ulint i = 0; i < hash_get_n_cells(lock_sys.rec_hash); i++) {
5220 		ib_uint64_t	limit = 0;
5221 
5222 		while (const lock_t* lock = lock_rec_validate(i, &limit)) {
5223 			if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED) {
5224 				/* The lock bitmap is empty; ignore it. */
5225 				continue;
5226 			}
5227 			const lock_rec_t& l = lock->un_member.rec_lock;
5228 			pages.insert(std::make_pair(l.space, l.page_no));
5229 		}
5230 	}
5231 
5232 	lock_mutex_exit();
5233 
5234 	for (page_addr_set::const_iterator it = pages.begin();
5235 	     it != pages.end();
5236 	     ++it) {
5237 		lock_rec_block_validate((*it).first, (*it).second);
5238 	}
5239 
5240 	return(true);
5241 }
5242 #endif /* UNIV_DEBUG */
5243 /*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/
5244 
5245 /*********************************************************************//**
5246 Checks if locks of other transactions prevent an immediate insert of
5247 a record. If they do, first tests if the query thread should anyway
5248 be suspended for some reason; if not, then puts the transaction and
5249 the query thread to the lock wait state and inserts a waiting request
5250 for a gap x-lock to the lock queue.
5251 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5252 dberr_t
lock_rec_insert_check_and_lock(ulint flags,const rec_t * rec,buf_block_t * block,dict_index_t * index,que_thr_t * thr,mtr_t * mtr,bool * inherit)5253 lock_rec_insert_check_and_lock(
5254 /*===========================*/
5255 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is
5256 				set, does nothing */
5257 	const rec_t*	rec,	/*!< in: record after which to insert */
5258 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
5259 	dict_index_t*	index,	/*!< in: index */
5260 	que_thr_t*	thr,	/*!< in: query thread */
5261 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
5262 	bool*		inherit)/*!< out: set to true if the new
5263 				inserted record maybe should inherit
5264 				LOCK_GAP type locks from the successor
5265 				record */
5266 {
5267 	ut_ad(block->frame == page_align(rec));
5268 	ut_ad(!dict_index_is_online_ddl(index)
5269 	      || index->is_primary()
5270 	      || (flags & BTR_CREATE_FLAG));
5271 	ut_ad(mtr->is_named_space(index->table->space));
5272 	ut_ad(page_rec_is_leaf(rec));
5273 
5274 	if (flags & BTR_NO_LOCKING_FLAG) {
5275 
5276 		return(DB_SUCCESS);
5277 	}
5278 
5279 	ut_ad(!index->table->is_temporary());
5280 	ut_ad(page_is_leaf(block->frame));
5281 
5282 	dberr_t		err;
5283 	lock_t*		lock;
5284 	bool		inherit_in = *inherit;
5285 	trx_t*		trx = thr_get_trx(thr);
5286 	const rec_t*	next_rec = page_rec_get_next_const(rec);
5287 	ulint		heap_no = page_rec_get_heap_no(next_rec);
5288 	ut_ad(!rec_is_metadata(next_rec, index));
5289 
5290 	lock_mutex_enter();
5291 	/* Because this code is invoked for a running transaction by
5292 	the thread that is serving the transaction, it is not necessary
5293 	to hold trx->mutex here. */
5294 
5295 	/* When inserting a record into an index, the table must be at
5296 	least IX-locked. When we are building an index, we would pass
5297 	BTR_NO_LOCKING_FLAG and skip the locking altogether. */
5298 	ut_ad(lock_table_has(trx, index->table, LOCK_IX));
5299 
5300 	lock = lock_rec_get_first(lock_sys.rec_hash, block, heap_no);
5301 
5302 	if (lock == NULL) {
5303 		/* We optimize CPU time usage in the simplest case */
5304 
5305 		lock_mutex_exit();
5306 
5307 		if (inherit_in && !dict_index_is_clust(index)) {
5308 			/* Update the page max trx id field */
5309 			page_update_max_trx_id(block,
5310 					       buf_block_get_page_zip(block),
5311 					       trx->id, mtr);
5312 		}
5313 
5314 		*inherit = false;
5315 
5316 		return(DB_SUCCESS);
5317 	}
5318 
5319 	/* Spatial index does not use GAP lock protection. It uses
5320 	"predicate lock" to protect the "range" */
5321 	if (dict_index_is_spatial(index)) {
5322 		return(DB_SUCCESS);
5323 	}
5324 
5325 	*inherit = true;
5326 
5327 	/* If another transaction has an explicit lock request which locks
5328 	the gap, waiting or granted, on the successor, the insert has to wait.
5329 
5330 	An exception is the case where the lock by the another transaction
5331 	is a gap type lock which it placed to wait for its turn to insert. We
5332 	do not consider that kind of a lock conflicting with our insert. This
5333 	eliminates an unnecessary deadlock which resulted when 2 transactions
5334 	had to wait for their insert. Both had waiting gap type lock requests
5335 	on the successor, which produced an unnecessary deadlock. */
5336 
5337 	const ulint	type_mode = LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;
5338 
5339 	if (
5340 #ifdef WITH_WSREP
5341 	    lock_t* c_lock =
5342 #endif /* WITH_WSREP */
5343 	    lock_rec_other_has_conflicting(type_mode, block, heap_no, trx)) {
5344 		/* Note that we may get DB_SUCCESS also here! */
5345 		trx_mutex_enter(trx);
5346 
5347 		err = lock_rec_enqueue_waiting(
5348 #ifdef WITH_WSREP
5349 			c_lock,
5350 #endif /* WITH_WSREP */
5351 			type_mode, block, heap_no, index, thr, NULL);
5352 
5353 		trx_mutex_exit(trx);
5354 	} else {
5355 		err = DB_SUCCESS;
5356 	}
5357 
5358 	lock_mutex_exit();
5359 
5360 	switch (err) {
5361 	case DB_SUCCESS_LOCKED_REC:
5362 		err = DB_SUCCESS;
5363 		/* fall through */
5364 	case DB_SUCCESS:
5365 		if (!inherit_in || dict_index_is_clust(index)) {
5366 			break;
5367 		}
5368 
5369 		/* Update the page max trx id field */
5370 		page_update_max_trx_id(
5371 			block, buf_block_get_page_zip(block), trx->id, mtr);
5372 	default:
5373 		/* We only care about the two return values. */
5374 		break;
5375 	}
5376 
5377 #ifdef UNIV_DEBUG
5378 	{
5379 		mem_heap_t*	heap		= NULL;
5380 		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5381 		const rec_offs*	offsets;
5382 		rec_offs_init(offsets_);
5383 
5384 		offsets = rec_get_offsets(next_rec, index, offsets_,
5385 					  index->n_core_fields,
5386 					  ULINT_UNDEFINED, &heap);
5387 
5388 		ut_ad(lock_rec_queue_validate(
5389 				FALSE, block, next_rec, index, offsets));
5390 
5391 		if (heap != NULL) {
5392 			mem_heap_free(heap);
5393 		}
5394 	}
5395 #endif /* UNIV_DEBUG */
5396 
5397 	return(err);
5398 }
5399 
5400 /*********************************************************************//**
5401 Creates an explicit record lock for a running transaction that currently only
5402 has an implicit lock on the record. The transaction instance must have a
5403 reference count > 0 so that it can't be committed and freed before this
5404 function has completed. */
5405 static
5406 void
lock_rec_convert_impl_to_expl_for_trx(const buf_block_t * block,const rec_t * rec,dict_index_t * index,trx_t * trx,ulint heap_no)5407 lock_rec_convert_impl_to_expl_for_trx(
5408 /*==================================*/
5409 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5410 	const rec_t*		rec,	/*!< in: user record on page */
5411 	dict_index_t*		index,	/*!< in: index of record */
5412 	trx_t*			trx,	/*!< in/out: active transaction */
5413 	ulint			heap_no)/*!< in: rec heap number to lock */
5414 {
5415 	ut_ad(trx->is_referenced());
5416 	ut_ad(page_rec_is_leaf(rec));
5417 	ut_ad(!rec_is_metadata(rec, index));
5418 
5419 	DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
5420 	lock_mutex_enter();
5421 	trx_mutex_enter(trx);
5422 	ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));
5423 
5424 	if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)
5425 	    && !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
5426 				  block, heap_no, trx)) {
5427 		lock_rec_add_to_queue(LOCK_REC | LOCK_X | LOCK_REC_NOT_GAP,
5428 				      block, heap_no, index, trx, true);
5429 	}
5430 
5431 	lock_mutex_exit();
5432 	trx_mutex_exit(trx);
5433 	trx->release_reference();
5434 
5435 	DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
5436 }
5437 
5438 
5439 #ifdef UNIV_DEBUG
5440 struct lock_rec_other_trx_holds_expl_arg
5441 {
5442   const ulint heap_no;
5443   const buf_block_t * const block;
5444   const trx_t *impl_trx;
5445 };
5446 
5447 
lock_rec_other_trx_holds_expl_callback(rw_trx_hash_element_t * element,lock_rec_other_trx_holds_expl_arg * arg)5448 static my_bool lock_rec_other_trx_holds_expl_callback(
5449   rw_trx_hash_element_t *element,
5450   lock_rec_other_trx_holds_expl_arg *arg)
5451 {
5452   mutex_enter(&element->mutex);
5453   if (element->trx)
5454   {
5455     trx_mutex_enter(element->trx);
5456     ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
5457     lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
5458       ? NULL : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP, arg->block,
5459                                  arg->heap_no, element->trx);
5460     /*
5461       An explicit lock is held by trx other than the trx holding the implicit
5462       lock.
5463     */
5464     ut_ad(!expl_lock || expl_lock->trx == arg->impl_trx);
5465     trx_mutex_exit(element->trx);
5466   }
5467   mutex_exit(&element->mutex);
5468   return 0;
5469 }
5470 
5471 
5472 /**
5473   Checks if some transaction, other than given trx_id, has an explicit
5474   lock on the given rec.
5475 
5476   FIXME: if the current transaction holds implicit lock from INSERT, a
5477   subsequent locking read should not convert it to explicit. See also
5478   MDEV-11215.
5479 
5480   @param      caller_trx  trx of current thread
5481   @param[in]  trx         trx holding implicit lock on rec
5482   @param[in]  rec         user record
5483   @param[in]  block       buffer block containing the record
5484 */
5485 
lock_rec_other_trx_holds_expl(trx_t * caller_trx,trx_t * trx,const rec_t * rec,const buf_block_t * block)5486 static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
5487                                           const rec_t *rec,
5488                                           const buf_block_t *block)
5489 {
5490   if (trx)
5491   {
5492     ut_ad(!page_rec_is_metadata(rec));
5493     lock_mutex_enter();
5494     ut_ad(trx->is_referenced());
5495     trx_mutex_enter(trx);
5496     const trx_state_t state = trx->state;
5497     trx_mutex_exit(trx);
5498     ut_ad(state != TRX_STATE_NOT_STARTED);
5499     if (state == TRX_STATE_COMMITTED_IN_MEMORY)
5500     {
5501       /* The transaction was committed before our lock_mutex_enter(). */
5502       lock_mutex_exit();
5503       return;
5504     }
5505     lock_rec_other_trx_holds_expl_arg arg= { page_rec_get_heap_no(rec), block,
5506                                              trx };
5507     trx_sys.rw_trx_hash.iterate(caller_trx,
5508                                 reinterpret_cast<my_hash_walk_action>
5509                                 (lock_rec_other_trx_holds_expl_callback),
5510                                 &arg);
5511     lock_mutex_exit();
5512   }
5513 }
5514 #endif /* UNIV_DEBUG */
5515 
5516 
5517 /** If an implicit x-lock exists on a record, convert it to an explicit one.
5518 
5519 Often, this is called by a transaction that is about to enter a lock wait
5520 due to the lock conflict. Two explicit locks would be created: first the
5521 exclusive lock on behalf of the lock-holder transaction in this function,
5522 and then a wait request on behalf of caller_trx, in the calling function.
5523 
5524 This may also be called by the same transaction that is already holding
5525 an implicit exclusive lock on the record. In this case, no explicit lock
5526 should be created.
5527 
5528 @param[in,out]	caller_trx	current transaction
5529 @param[in]	block		index tree leaf page
5530 @param[in]	rec		record on the leaf page
5531 @param[in]	index		the index of the record
5532 @param[in]	offsets		rec_get_offsets(rec,index)
5533 @return	whether caller_trx already holds an exclusive lock on rec */
5534 static
5535 bool
lock_rec_convert_impl_to_expl(trx_t * caller_trx,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const rec_offs * offsets)5536 lock_rec_convert_impl_to_expl(
5537 	trx_t*			caller_trx,
5538 	const buf_block_t*	block,
5539 	const rec_t*		rec,
5540 	dict_index_t*		index,
5541 	const rec_offs*		offsets)
5542 {
5543 	trx_t*		trx;
5544 
5545 	ut_ad(!lock_mutex_own());
5546 	ut_ad(page_rec_is_user_rec(rec));
5547 	ut_ad(rec_offs_validate(rec, index, offsets));
5548 	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
5549 	ut_ad(page_rec_is_leaf(rec));
5550 	ut_ad(!rec_is_metadata(rec, index));
5551 
5552 	if (dict_index_is_clust(index)) {
5553 		trx_id_t	trx_id;
5554 
5555 		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);
5556 
5557 		if (trx_id == 0) {
5558 			return false;
5559 		}
5560 		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
5561 			return true;
5562 		}
5563 
5564 		trx = trx_sys.find(caller_trx, trx_id);
5565 	} else {
5566 		ut_ad(!dict_index_is_online_ddl(index));
5567 
5568 		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
5569 						 offsets);
5570 		if (trx == caller_trx) {
5571 			trx->release_reference();
5572 			return true;
5573 		}
5574 
5575 		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec,
5576 						   block));
5577 	}
5578 
5579 	if (trx != 0) {
5580 		ulint	heap_no = page_rec_get_heap_no(rec);
5581 
5582 		ut_ad(trx->is_referenced());
5583 
5584 		/* If the transaction is still active and has no
5585 		explicit x-lock set on the record, set one for it.
5586 		trx cannot be committed until the ref count is zero. */
5587 
5588 		lock_rec_convert_impl_to_expl_for_trx(
5589 			block, rec, index, trx, heap_no);
5590 	}
5591 
5592 	return false;
5593 }
5594 
5595 /*********************************************************************//**
5596 Checks if locks of other transactions prevent an immediate modify (update,
5597 delete mark, or delete unmark) of a clustered index record. If they do,
5598 first tests if the query thread should anyway be suspended for some
5599 reason; if not, then puts the transaction and the query thread to the
5600 lock wait state and inserts a waiting request for a record x-lock to the
5601 lock queue.
5602 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5603 dberr_t
lock_clust_rec_modify_check_and_lock(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,que_thr_t * thr)5604 lock_clust_rec_modify_check_and_lock(
5605 /*=================================*/
5606 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5607 					bit is set, does nothing */
5608 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5609 	const rec_t*		rec,	/*!< in: record which should be
5610 					modified */
5611 	dict_index_t*		index,	/*!< in: clustered index */
5612 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5613 	que_thr_t*		thr)	/*!< in: query thread */
5614 {
5615 	dberr_t	err;
5616 	ulint	heap_no;
5617 
5618 	ut_ad(rec_offs_validate(rec, index, offsets));
5619 	ut_ad(page_rec_is_leaf(rec));
5620 	ut_ad(dict_index_is_clust(index));
5621 	ut_ad(block->frame == page_align(rec));
5622 
5623 	if (flags & BTR_NO_LOCKING_FLAG) {
5624 
5625 		return(DB_SUCCESS);
5626 	}
5627 	ut_ad(!rec_is_metadata(rec, index));
5628 	ut_ad(!index->table->is_temporary());
5629 
5630 	heap_no = rec_offs_comp(offsets)
5631 		? rec_get_heap_no_new(rec)
5632 		: rec_get_heap_no_old(rec);
5633 
5634 	/* If a transaction has no explicit x-lock set on the record, set one
5635 	for it */
5636 
5637 	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index,
5638 					  offsets)) {
5639 		/* We already hold an implicit exclusive lock. */
5640 		return DB_SUCCESS;
5641 	}
5642 
5643 	err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
5644 			    block, heap_no, index, thr);
5645 
5646 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5647 
5648 	if (err == DB_SUCCESS_LOCKED_REC) {
5649 		err = DB_SUCCESS;
5650 	}
5651 
5652 	return(err);
5653 }
5654 
5655 /*********************************************************************//**
5656 Checks if locks of other transactions prevent an immediate modify (delete
5657 mark or delete unmark) of a secondary index record.
5658 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5659 dberr_t
lock_sec_rec_modify_check_and_lock(ulint flags,buf_block_t * block,const rec_t * rec,dict_index_t * index,que_thr_t * thr,mtr_t * mtr)5660 lock_sec_rec_modify_check_and_lock(
5661 /*===============================*/
5662 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5663 				bit is set, does nothing */
5664 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
5665 	const rec_t*	rec,	/*!< in: record which should be
5666 				modified; NOTE: as this is a secondary
5667 				index, we always have to modify the
5668 				clustered index record first: see the
5669 				comment below */
5670 	dict_index_t*	index,	/*!< in: secondary index */
5671 	que_thr_t*	thr,	/*!< in: query thread
5672 				(can be NULL if BTR_NO_LOCKING_FLAG) */
5673 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
5674 {
5675 	dberr_t	err;
5676 	ulint	heap_no;
5677 
5678 	ut_ad(!dict_index_is_clust(index));
5679 	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
5680 	ut_ad(block->frame == page_align(rec));
5681 	ut_ad(mtr->is_named_space(index->table->space));
5682 	ut_ad(page_rec_is_leaf(rec));
5683 	ut_ad(!rec_is_metadata(rec, index));
5684 
5685 	if (flags & BTR_NO_LOCKING_FLAG) {
5686 
5687 		return(DB_SUCCESS);
5688 	}
5689 	ut_ad(!index->table->is_temporary());
5690 
5691 	heap_no = page_rec_get_heap_no(rec);
5692 
5693 #ifdef WITH_WSREP
5694 	trx_t *trx= thr_get_trx(thr);
5695 	/* If transaction scanning an unique secondary key is wsrep
5696 	high priority thread (brute force) this scanning may involve
5697 	GAP-locking in the index. As this locking happens also when
5698 	applying replication events in high priority applier threads,
5699 	there is a probability for lock conflicts between two wsrep
5700 	high priority threads. To avoid this GAP-locking we mark that
5701 	this transaction is using unique key scan here. */
5702 	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
5703 		trx->wsrep_UK_scan= true;
5704 #endif /* WITH_WSREP */
5705 
5706 	/* Another transaction cannot have an implicit lock on the record,
5707 	because when we come here, we already have modified the clustered
5708 	index record, and this would not have been possible if another active
5709 	transaction had modified this secondary index record. */
5710 
5711 	err = lock_rec_lock(TRUE, LOCK_X | LOCK_REC_NOT_GAP,
5712 			    block, heap_no, index, thr);
5713 
5714 #ifdef WITH_WSREP
5715 	trx->wsrep_UK_scan= false;
5716 #endif /* WITH_WSREP */
5717 
5718 #ifdef UNIV_DEBUG
5719 	{
5720 		mem_heap_t*	heap		= NULL;
5721 		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5722 		const rec_offs*	offsets;
5723 		rec_offs_init(offsets_);
5724 
5725 		offsets = rec_get_offsets(rec, index, offsets_,
5726 					  index->n_core_fields,
5727 					  ULINT_UNDEFINED, &heap);
5728 
5729 		ut_ad(lock_rec_queue_validate(
5730 			FALSE, block, rec, index, offsets));
5731 
5732 		if (heap != NULL) {
5733 			mem_heap_free(heap);
5734 		}
5735 	}
5736 #endif /* UNIV_DEBUG */
5737 
5738 	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
5739 		/* Update the page max trx id field */
5740 		/* It might not be necessary to do this if
5741 		err == DB_SUCCESS (no new lock created),
5742 		but it should not cost too much performance. */
5743 		page_update_max_trx_id(block,
5744 				       buf_block_get_page_zip(block),
5745 				       thr_get_trx(thr)->id, mtr);
5746 		err = DB_SUCCESS;
5747 	}
5748 
5749 	return(err);
5750 }
5751 
5752 /*********************************************************************//**
5753 Like lock_clust_rec_read_check_and_lock(), but reads a
5754 secondary index record.
5755 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
5756 dberr_t
lock_sec_rec_read_check_and_lock(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,lock_mode mode,ulint gap_mode,que_thr_t * thr)5757 lock_sec_rec_read_check_and_lock(
5758 /*=============================*/
5759 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5760 					bit is set, does nothing */
5761 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5762 	const rec_t*		rec,	/*!< in: user record or page
5763 					supremum record which should
5764 					be read or passed over by a
5765 					read cursor */
5766 	dict_index_t*		index,	/*!< in: secondary index */
5767 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5768 	lock_mode		mode,	/*!< in: mode of the lock which
5769 					the read cursor should set on
5770 					records: LOCK_S or LOCK_X; the
5771 					latter is possible in
5772 					SELECT FOR UPDATE */
5773 	ulint			gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5774 					LOCK_REC_NOT_GAP */
5775 	que_thr_t*		thr)	/*!< in: query thread */
5776 {
5777 	dberr_t	err;
5778 	ulint	heap_no;
5779 
5780 	ut_ad(!dict_index_is_clust(index));
5781 	ut_ad(!dict_index_is_online_ddl(index));
5782 	ut_ad(block->frame == page_align(rec));
5783 	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
5784 	ut_ad(rec_offs_validate(rec, index, offsets));
5785 	ut_ad(page_rec_is_leaf(rec));
5786 	ut_ad(mode == LOCK_X || mode == LOCK_S);
5787 
5788 	if ((flags & BTR_NO_LOCKING_FLAG)
5789 	    || srv_read_only_mode
5790 	    || index->table->is_temporary()) {
5791 
5792 		return(DB_SUCCESS);
5793 	}
5794 
5795 	ut_ad(!rec_is_metadata(rec, index));
5796 	heap_no = page_rec_get_heap_no(rec);
5797 
5798 	/* Some transaction may have an implicit x-lock on the record only
5799 	if the max trx id for the page >= min trx id for the trx list or a
5800 	database recovery is running. */
5801 
5802 	if (!page_rec_is_supremum(rec)
5803 	    && page_get_max_trx_id(block->frame) >= trx_sys.get_min_trx_id()
5804 	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
5805 					     index, offsets)
5806 	    && gap_mode == LOCK_REC_NOT_GAP) {
5807 		/* We already hold an implicit exclusive lock. */
5808 		return DB_SUCCESS;
5809 	}
5810 
5811 #ifdef WITH_WSREP
5812 	trx_t *trx= thr_get_trx(thr);
5813 	/* If transaction scanning an unique secondary key is wsrep
5814 	high priority thread (brute force) this scanning may involve
5815 	GAP-locking in the index. As this locking happens also when
5816 	applying replication events in high priority applier threads,
5817 	there is a probability for lock conflicts between two wsrep
5818 	high priority threads. To avoid this GAP-locking we mark that
5819 	this transaction is using unique key scan here. */
5820 	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
5821 		trx->wsrep_UK_scan= true;
5822 #endif /* WITH_WSREP */
5823 
5824 	err = lock_rec_lock(FALSE, ulint(mode) | gap_mode,
5825 			    block, heap_no, index, thr);
5826 
5827 #ifdef WITH_WSREP
5828 	trx->wsrep_UK_scan= false;
5829 #endif /* WITH_WSREP */
5830 
5831 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5832 
5833 	return(err);
5834 }
5835 
5836 /*********************************************************************//**
5837 Checks if locks of other transactions prevent an immediate read, or passing
5838 over by a read cursor, of a clustered index record. If they do, first tests
5839 if the query thread should anyway be suspended for some reason; if not, then
5840 puts the transaction and the query thread to the lock wait state and inserts a
5841 waiting request for a record lock to the lock queue. Sets the requested mode
5842 lock on the record.
5843 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
5844 dberr_t
lock_clust_rec_read_check_and_lock(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const rec_offs * offsets,lock_mode mode,ulint gap_mode,que_thr_t * thr)5845 lock_clust_rec_read_check_and_lock(
5846 /*===============================*/
5847 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5848 					bit is set, does nothing */
5849 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5850 	const rec_t*		rec,	/*!< in: user record or page
5851 					supremum record which should
5852 					be read or passed over by a
5853 					read cursor */
5854 	dict_index_t*		index,	/*!< in: clustered index */
5855 	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
5856 	lock_mode		mode,	/*!< in: mode of the lock which
5857 					the read cursor should set on
5858 					records: LOCK_S or LOCK_X; the
5859 					latter is possible in
5860 					SELECT FOR UPDATE */
5861 	ulint			gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5862 					LOCK_REC_NOT_GAP */
5863 	que_thr_t*		thr)	/*!< in: query thread */
5864 {
5865 	dberr_t	err;
5866 	ulint	heap_no;
5867 
5868 	ut_ad(dict_index_is_clust(index));
5869 	ut_ad(block->frame == page_align(rec));
5870 	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
5871 	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
5872 	      || gap_mode == LOCK_REC_NOT_GAP);
5873 	ut_ad(rec_offs_validate(rec, index, offsets));
5874 	ut_ad(page_rec_is_leaf(rec));
5875 	ut_ad(!rec_is_metadata(rec, index));
5876 
5877 	if ((flags & BTR_NO_LOCKING_FLAG)
5878 	    || srv_read_only_mode
5879 	    || index->table->is_temporary()) {
5880 
5881 		return(DB_SUCCESS);
5882 	}
5883 
5884 	heap_no = page_rec_get_heap_no(rec);
5885 
5886 	if (heap_no != PAGE_HEAP_NO_SUPREMUM
5887 	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
5888 					     index, offsets)
5889 	    && gap_mode == LOCK_REC_NOT_GAP) {
5890 		/* We already hold an implicit exclusive lock. */
5891 		return DB_SUCCESS;
5892 	}
5893 
5894 	err = lock_rec_lock(FALSE, ulint(mode) | gap_mode,
5895 			    block, heap_no, index, thr);
5896 
5897 	ut_ad(lock_rec_queue_validate(FALSE, block, rec, index, offsets));
5898 
5899 	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");
5900 
5901 	return(err);
5902 }
5903 /*********************************************************************//**
5904 Checks if locks of other transactions prevent an immediate read, or passing
5905 over by a read cursor, of a clustered index record. If they do, first tests
5906 if the query thread should anyway be suspended for some reason; if not, then
5907 puts the transaction and the query thread to the lock wait state and inserts a
5908 waiting request for a record lock to the lock queue. Sets the requested mode
5909 lock on the record. This is an alternative version of
5910 lock_clust_rec_read_check_and_lock() that does not require the parameter
5911 "offsets".
5912 @return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
5913 dberr_t
lock_clust_rec_read_check_and_lock_alt(ulint flags,const buf_block_t * block,const rec_t * rec,dict_index_t * index,lock_mode mode,ulint gap_mode,que_thr_t * thr)5914 lock_clust_rec_read_check_and_lock_alt(
5915 /*===================================*/
5916 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
5917 					bit is set, does nothing */
5918 	const buf_block_t*	block,	/*!< in: buffer block of rec */
5919 	const rec_t*		rec,	/*!< in: user record or page
5920 					supremum record which should
5921 					be read or passed over by a
5922 					read cursor */
5923 	dict_index_t*		index,	/*!< in: clustered index */
5924 	lock_mode		mode,	/*!< in: mode of the lock which
5925 					the read cursor should set on
5926 					records: LOCK_S or LOCK_X; the
5927 					latter is possible in
5928 					SELECT FOR UPDATE */
5929 	ulint			gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
5930 					LOCK_REC_NOT_GAP */
5931 	que_thr_t*		thr)	/*!< in: query thread */
5932 {
5933 	mem_heap_t*	tmp_heap	= NULL;
5934 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
5935 	rec_offs*	offsets		= offsets_;
5936 	dberr_t		err;
5937 	rec_offs_init(offsets_);
5938 
5939 	ut_ad(page_rec_is_leaf(rec));
5940 	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
5941 				  ULINT_UNDEFINED, &tmp_heap);
5942 	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
5943 						 offsets, mode, gap_mode, thr);
5944 	if (tmp_heap) {
5945 		mem_heap_free(tmp_heap);
5946 	}
5947 
5948 	if (err == DB_SUCCESS_LOCKED_REC) {
5949 		err = DB_SUCCESS;
5950 	}
5951 
5952 	return(err);
5953 }
5954 
5955 /*******************************************************************//**
5956 Release the last lock from the transaction's autoinc locks. */
5957 UNIV_INLINE
5958 void
lock_release_autoinc_last_lock(ib_vector_t * autoinc_locks)5959 lock_release_autoinc_last_lock(
5960 /*===========================*/
5961 	ib_vector_t*	autoinc_locks)	/*!< in/out: vector of AUTOINC locks */
5962 {
5963 	ulint		last;
5964 	lock_t*		lock;
5965 
5966 	ut_ad(lock_mutex_own());
5967 	ut_a(!ib_vector_is_empty(autoinc_locks));
5968 
5969 	/* The lock to be release must be the last lock acquired. */
5970 	last = ib_vector_size(autoinc_locks) - 1;
5971 	lock = *static_cast<lock_t**>(ib_vector_get(autoinc_locks, last));
5972 
5973 	/* Should have only AUTOINC locks in the vector. */
5974 	ut_a(lock_get_mode(lock) == LOCK_AUTO_INC);
5975 	ut_a(lock_get_type(lock) == LOCK_TABLE);
5976 
5977 	ut_a(lock->un_member.tab_lock.table != NULL);
5978 
5979 	/* This will remove the lock from the trx autoinc_locks too. */
5980 	lock_table_dequeue(lock);
5981 
5982 	/* Remove from the table vector too. */
5983 	lock_trx_table_locks_remove(lock);
5984 }
5985 
5986 /*******************************************************************//**
5987 Check if a transaction holds any autoinc locks.
5988 @return TRUE if the transaction holds any AUTOINC locks. */
5989 static
5990 ibool
lock_trx_holds_autoinc_locks(const trx_t * trx)5991 lock_trx_holds_autoinc_locks(
5992 /*=========================*/
5993 	const trx_t*	trx)		/*!< in: transaction */
5994 {
5995 	ut_a(trx->autoinc_locks != NULL);
5996 
5997 	return(!ib_vector_is_empty(trx->autoinc_locks));
5998 }
5999 
6000 /*******************************************************************//**
6001 Release all the transaction's autoinc locks. */
6002 static
6003 void
lock_release_autoinc_locks(trx_t * trx)6004 lock_release_autoinc_locks(
6005 /*=======================*/
6006 	trx_t*		trx)		/*!< in/out: transaction */
6007 {
6008 	ut_ad(lock_mutex_own());
6009 	/* If this is invoked for a running transaction by the thread
6010 	that is serving the transaction, then it is not necessary to
6011 	hold trx->mutex here. */
6012 
6013 	ut_a(trx->autoinc_locks != NULL);
6014 
6015 	/* We release the locks in the reverse order. This is to
6016 	avoid searching the vector for the element to delete at
6017 	the lower level. See (lock_table_remove_low()) for details. */
6018 	while (!ib_vector_is_empty(trx->autoinc_locks)) {
6019 
6020 		/* lock_table_remove_low() will also remove the lock from
6021 		the transaction's autoinc_locks vector. */
6022 		lock_release_autoinc_last_lock(trx->autoinc_locks);
6023 	}
6024 
6025 	/* Should release all locks. */
6026 	ut_a(ib_vector_is_empty(trx->autoinc_locks));
6027 }
6028 
6029 /*******************************************************************//**
6030 Gets the type of a lock. Non-inline version for using outside of the
6031 lock module.
6032 @return LOCK_TABLE or LOCK_REC */
6033 ulint
lock_get_type(const lock_t * lock)6034 lock_get_type(
6035 /*==========*/
6036 	const lock_t*	lock)	/*!< in: lock */
6037 {
6038 	return(lock_get_type_low(lock));
6039 }
6040 
6041 /*******************************************************************//**
6042 Gets the id of the transaction owning a lock.
6043 @return transaction id */
6044 trx_id_t
lock_get_trx_id(const lock_t * lock)6045 lock_get_trx_id(
6046 /*============*/
6047 	const lock_t*	lock)	/*!< in: lock */
6048 {
6049 	return(trx_get_id_for_print(lock->trx));
6050 }
6051 
6052 /*******************************************************************//**
6053 Gets the mode of a lock in a human readable string.
6054 The string should not be free()'d or modified.
6055 @return lock mode */
6056 const char*
lock_get_mode_str(const lock_t * lock)6057 lock_get_mode_str(
6058 /*==============*/
6059 	const lock_t*	lock)	/*!< in: lock */
6060 {
6061 	ibool	is_gap_lock;
6062 
6063 	is_gap_lock = lock_get_type_low(lock) == LOCK_REC
6064 		&& lock_rec_get_gap(lock);
6065 
6066 	switch (lock_get_mode(lock)) {
6067 	case LOCK_S:
6068 		if (is_gap_lock) {
6069 			return("S,GAP");
6070 		} else {
6071 			return("S");
6072 		}
6073 	case LOCK_X:
6074 		if (is_gap_lock) {
6075 			return("X,GAP");
6076 		} else {
6077 			return("X");
6078 		}
6079 	case LOCK_IS:
6080 		if (is_gap_lock) {
6081 			return("IS,GAP");
6082 		} else {
6083 			return("IS");
6084 		}
6085 	case LOCK_IX:
6086 		if (is_gap_lock) {
6087 			return("IX,GAP");
6088 		} else {
6089 			return("IX");
6090 		}
6091 	case LOCK_AUTO_INC:
6092 		return("AUTO_INC");
6093 	default:
6094 		return("UNKNOWN");
6095 	}
6096 }
6097 
6098 /*******************************************************************//**
6099 Gets the type of a lock in a human readable string.
6100 The string should not be free()'d or modified.
6101 @return lock type */
6102 const char*
lock_get_type_str(const lock_t * lock)6103 lock_get_type_str(
6104 /*==============*/
6105 	const lock_t*	lock)	/*!< in: lock */
6106 {
6107 	switch (lock_get_type_low(lock)) {
6108 	case LOCK_REC:
6109 		return("RECORD");
6110 	case LOCK_TABLE:
6111 		return("TABLE");
6112 	default:
6113 		return("UNKNOWN");
6114 	}
6115 }
6116 
6117 /*******************************************************************//**
6118 Gets the table on which the lock is.
6119 @return table */
6120 UNIV_INLINE
6121 dict_table_t*
lock_get_table(const lock_t * lock)6122 lock_get_table(
6123 /*===========*/
6124 	const lock_t*	lock)	/*!< in: lock */
6125 {
6126 	switch (lock_get_type_low(lock)) {
6127 	case LOCK_REC:
6128 		ut_ad(dict_index_is_clust(lock->index)
6129 		      || !dict_index_is_online_ddl(lock->index));
6130 		return(lock->index->table);
6131 	case LOCK_TABLE:
6132 		return(lock->un_member.tab_lock.table);
6133 	default:
6134 		ut_error;
6135 		return(NULL);
6136 	}
6137 }
6138 
6139 /*******************************************************************//**
6140 Gets the id of the table on which the lock is.
6141 @return id of the table */
6142 table_id_t
lock_get_table_id(const lock_t * lock)6143 lock_get_table_id(
6144 /*==============*/
6145 	const lock_t*	lock)	/*!< in: lock */
6146 {
6147 	dict_table_t*	table;
6148 
6149 	table = lock_get_table(lock);
6150 
6151 	return(table->id);
6152 }
6153 
6154 /** Determine which table a lock is associated with.
6155 @param[in]	lock	the lock
6156 @return name of the table */
6157 const table_name_t&
lock_get_table_name(const lock_t * lock)6158 lock_get_table_name(
6159 	const lock_t*	lock)
6160 {
6161 	return(lock_get_table(lock)->name);
6162 }
6163 
6164 /*******************************************************************//**
6165 For a record lock, gets the index on which the lock is.
6166 @return index */
6167 const dict_index_t*
lock_rec_get_index(const lock_t * lock)6168 lock_rec_get_index(
6169 /*===============*/
6170 	const lock_t*	lock)	/*!< in: lock */
6171 {
6172 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6173 	ut_ad(dict_index_is_clust(lock->index)
6174 	      || !dict_index_is_online_ddl(lock->index));
6175 
6176 	return(lock->index);
6177 }
6178 
6179 /*******************************************************************//**
6180 For a record lock, gets the name of the index on which the lock is.
6181 The string should not be free()'d or modified.
6182 @return name of the index */
6183 const char*
lock_rec_get_index_name(const lock_t * lock)6184 lock_rec_get_index_name(
6185 /*====================*/
6186 	const lock_t*	lock)	/*!< in: lock */
6187 {
6188 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6189 	ut_ad(dict_index_is_clust(lock->index)
6190 	      || !dict_index_is_online_ddl(lock->index));
6191 
6192 	return(lock->index->name);
6193 }
6194 
6195 /*******************************************************************//**
6196 For a record lock, gets the tablespace number on which the lock is.
6197 @return tablespace number */
6198 ulint
lock_rec_get_space_id(const lock_t * lock)6199 lock_rec_get_space_id(
6200 /*==================*/
6201 	const lock_t*	lock)	/*!< in: lock */
6202 {
6203 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6204 
6205 	return(lock->un_member.rec_lock.space);
6206 }
6207 
6208 /*******************************************************************//**
6209 For a record lock, gets the page number on which the lock is.
6210 @return page number */
6211 ulint
lock_rec_get_page_no(const lock_t * lock)6212 lock_rec_get_page_no(
6213 /*=================*/
6214 	const lock_t*	lock)	/*!< in: lock */
6215 {
6216 	ut_a(lock_get_type_low(lock) == LOCK_REC);
6217 
6218 	return(lock->un_member.rec_lock.page_no);
6219 }
6220 
6221 /*********************************************************************//**
6222 Cancels a waiting lock request and releases possible other transactions
6223 waiting behind it. */
6224 void
lock_cancel_waiting_and_release(lock_t * lock)6225 lock_cancel_waiting_and_release(
6226 /*============================*/
6227 	lock_t*	lock)	/*!< in/out: waiting lock request */
6228 {
6229 	que_thr_t*	thr;
6230 
6231 	ut_ad(lock_mutex_own());
6232 	ut_ad(trx_mutex_own(lock->trx));
6233 	ut_ad(lock->trx->state == TRX_STATE_ACTIVE);
6234 
6235 	lock->trx->lock.cancel = true;
6236 
6237 	if (lock_get_type_low(lock) == LOCK_REC) {
6238 
6239 		lock_rec_dequeue_from_page(lock);
6240 	} else {
6241 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
6242 
6243 		if (lock->trx->autoinc_locks != NULL) {
6244 			/* Release the transaction's AUTOINC locks. */
6245 			lock_release_autoinc_locks(lock->trx);
6246 		}
6247 
6248 		lock_table_dequeue(lock);
6249 		/* Remove the lock from table lock vector too. */
6250 		lock_trx_table_locks_remove(lock);
6251 	}
6252 
6253 	/* Reset the wait flag and the back pointer to lock in trx. */
6254 
6255 	lock_reset_lock_and_trx_wait(lock);
6256 
6257 	/* The following function releases the trx from lock wait. */
6258 
6259 	thr = que_thr_end_lock_wait(lock->trx);
6260 
6261 	if (thr != NULL) {
6262 		lock_wait_release_thread_if_suspended(thr);
6263 	}
6264 
6265 	lock->trx->lock.cancel = false;
6266 }
6267 
6268 /*********************************************************************//**
6269 Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
6270 function should be called at the the end of an SQL statement, by the
6271 connection thread that owns the transaction (trx->mysql_thd). */
6272 void
lock_unlock_table_autoinc(trx_t * trx)6273 lock_unlock_table_autoinc(
6274 /*======================*/
6275 	trx_t*	trx)	/*!< in/out: transaction */
6276 {
6277 	ut_ad(!lock_mutex_own());
6278 	ut_ad(!trx_mutex_own(trx));
6279 	ut_ad(!trx->lock.wait_lock);
6280 
6281 	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
6282 	but not COMMITTED transactions. */
6283 
6284 	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
6285 	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));
6286 
6287 	/* This function is invoked for a running transaction by the
6288 	thread that is serving the transaction. Therefore it is not
6289 	necessary to hold trx->mutex here. */
6290 
6291 	if (lock_trx_holds_autoinc_locks(trx)) {
6292 		lock_mutex_enter();
6293 
6294 		lock_release_autoinc_locks(trx);
6295 
6296 		lock_mutex_exit();
6297 	}
6298 }
6299 
lock_trx_handle_wait_low(trx_t * trx)6300 static inline dberr_t lock_trx_handle_wait_low(trx_t* trx)
6301 {
6302 	ut_ad(lock_mutex_own());
6303 	ut_ad(trx_mutex_own(trx));
6304 
6305 	if (trx->lock.was_chosen_as_deadlock_victim) {
6306 		return DB_DEADLOCK;
6307 	}
6308 	if (!trx->lock.wait_lock) {
6309 		/* The lock was probably granted before we got here. */
6310 		return DB_SUCCESS;
6311 	}
6312 
6313 	lock_cancel_waiting_and_release(trx->lock.wait_lock);
6314 	return DB_LOCK_WAIT;
6315 }
6316 
6317 /*********************************************************************//**
6318 Check whether the transaction has already been rolled back because it
6319 was selected as a deadlock victim, or if it has to wait then cancel
6320 the wait lock.
6321 @return DB_DEADLOCK, DB_LOCK_WAIT or DB_SUCCESS */
6322 dberr_t
lock_trx_handle_wait(trx_t * trx)6323 lock_trx_handle_wait(
6324 /*=================*/
6325 	trx_t*	trx)	/*!< in/out: trx lock state */
6326 {
6327 	lock_mutex_enter();
6328 	trx_mutex_enter(trx);
6329 	dberr_t err = lock_trx_handle_wait_low(trx);
6330 	lock_mutex_exit();
6331 	trx_mutex_exit(trx);
6332 	return err;
6333 }
6334 
6335 /*********************************************************************//**
6336 Get the number of locks on a table.
6337 @return number of locks */
6338 ulint
lock_table_get_n_locks(const dict_table_t * table)6339 lock_table_get_n_locks(
6340 /*===================*/
6341 	const dict_table_t*	table)	/*!< in: table */
6342 {
6343 	ulint		n_table_locks;
6344 
6345 	lock_mutex_enter();
6346 
6347 	n_table_locks = UT_LIST_GET_LEN(table->locks);
6348 
6349 	lock_mutex_exit();
6350 
6351 	return(n_table_locks);
6352 }
6353 
6354 #ifdef UNIV_DEBUG
6355 /**
6356   Do an exhaustive check for any locks (table or rec) against the table.
6357 
6358   @param[in]  table  check if there are any locks held on records in this table
6359                      or on the table itself
6360 */
6361 
lock_table_locks_lookup(rw_trx_hash_element_t * element,const dict_table_t * table)6362 static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
6363                                        const dict_table_t *table)
6364 {
6365   ut_ad(lock_mutex_own());
6366   mutex_enter(&element->mutex);
6367   if (element->trx)
6368   {
6369     trx_mutex_enter(element->trx);
6370     check_trx_state(element->trx);
6371     if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
6372     {
6373       for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
6374            lock != NULL;
6375            lock= UT_LIST_GET_NEXT(trx_locks, lock))
6376       {
6377         ut_ad(lock->trx == element->trx);
6378         if (lock_get_type_low(lock) == LOCK_REC)
6379         {
6380           ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
6381                 lock->index->is_primary());
6382           ut_ad(lock->index->table != table);
6383         }
6384         else
6385           ut_ad(lock->un_member.tab_lock.table != table);
6386       }
6387     }
6388     trx_mutex_exit(element->trx);
6389   }
6390   mutex_exit(&element->mutex);
6391   return 0;
6392 }
6393 #endif /* UNIV_DEBUG */
6394 
6395 /*******************************************************************//**
6396 Check if there are any locks (table or rec) against table.
6397 @return true if table has either table or record locks. */
6398 bool
lock_table_has_locks(const dict_table_t * table)6399 lock_table_has_locks(
6400 /*=================*/
6401 	const dict_table_t*	table)	/*!< in: check if there are any locks
6402 					held on records in this table or on the
6403 					table itself */
6404 {
6405 	ibool			has_locks;
6406 
6407 	ut_ad(table != NULL);
6408 	lock_mutex_enter();
6409 
6410 	has_locks = UT_LIST_GET_LEN(table->locks) > 0 || table->n_rec_locks > 0;
6411 
6412 #ifdef UNIV_DEBUG
6413 	if (!has_locks) {
6414 		trx_sys.rw_trx_hash.iterate(
6415 			reinterpret_cast<my_hash_walk_action>
6416 			(lock_table_locks_lookup),
6417 			const_cast<dict_table_t*>(table));
6418 	}
6419 #endif /* UNIV_DEBUG */
6420 
6421 	lock_mutex_exit();
6422 
6423 	return(has_locks);
6424 }
6425 
6426 /*******************************************************************//**
6427 Initialise the table lock list. */
6428 void
lock_table_lock_list_init(table_lock_list_t * lock_list)6429 lock_table_lock_list_init(
6430 /*======================*/
6431 	table_lock_list_t*	lock_list)	/*!< List to initialise */
6432 {
6433 	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
6434 }
6435 
6436 /*******************************************************************//**
6437 Initialise the trx lock list. */
6438 void
lock_trx_lock_list_init(trx_lock_list_t * lock_list)6439 lock_trx_lock_list_init(
6440 /*====================*/
6441 	trx_lock_list_t*	lock_list)	/*!< List to initialise */
6442 {
6443 	UT_LIST_INIT(*lock_list, &lock_t::trx_locks);
6444 }
6445 
6446 /*******************************************************************//**
6447 Set the lock system timeout event. */
6448 void
lock_set_timeout_event()6449 lock_set_timeout_event()
6450 /*====================*/
6451 {
6452 	os_event_set(lock_sys.timeout_event);
6453 }
6454 
6455 #ifdef UNIV_DEBUG
6456 /*******************************************************************//**
6457 Check if the transaction holds any locks on the sys tables
6458 or its records.
6459 @return the strongest lock found on any sys table or 0 for none */
6460 const lock_t*
lock_trx_has_sys_table_locks(const trx_t * trx)6461 lock_trx_has_sys_table_locks(
6462 /*=========================*/
6463 	const trx_t*	trx)	/*!< in: transaction to check */
6464 {
6465 	const lock_t*	strongest_lock = 0;
6466 	lock_mode	strongest = LOCK_NONE;
6467 
6468 	lock_mutex_enter();
6469 
6470 	const lock_list::const_iterator end = trx->lock.table_locks.end();
6471 	lock_list::const_iterator it = trx->lock.table_locks.begin();
6472 
6473 	/* Find a valid mode. Note: ib_vector_size() can be 0. */
6474 
6475 	for (/* No op */; it != end; ++it) {
6476 		const lock_t*	lock = *it;
6477 
6478 		if (lock != NULL
6479 		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {
6480 
6481 			strongest = lock_get_mode(lock);
6482 			ut_ad(strongest != LOCK_NONE);
6483 			strongest_lock = lock;
6484 			break;
6485 		}
6486 	}
6487 
6488 	if (strongest == LOCK_NONE) {
6489 		lock_mutex_exit();
6490 		return(NULL);
6491 	}
6492 
6493 	for (/* No op */; it != end; ++it) {
6494 		const lock_t*	lock = *it;
6495 
6496 		if (lock == NULL) {
6497 			continue;
6498 		}
6499 
6500 		ut_ad(trx == lock->trx);
6501 		ut_ad(lock_get_type_low(lock) & LOCK_TABLE);
6502 		ut_ad(lock->un_member.tab_lock.table != NULL);
6503 
6504 		lock_mode	mode = lock_get_mode(lock);
6505 
6506 		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
6507 		    && lock_mode_stronger_or_eq(mode, strongest)) {
6508 
6509 			strongest = mode;
6510 			strongest_lock = lock;
6511 		}
6512 	}
6513 
6514 	lock_mutex_exit();
6515 
6516 	return(strongest_lock);
6517 }
6518 
6519 /** Check if the transaction holds an explicit exclusive lock on a record.
6520 @param[in]	trx	transaction
6521 @param[in]	table	table
6522 @param[in]	block	leaf page
6523 @param[in]	heap_no	heap number identifying the record
6524 @return whether an explicit X-lock is held */
6525 bool
lock_trx_has_expl_x_lock(const trx_t * trx,const dict_table_t * table,const buf_block_t * block,ulint heap_no)6526 lock_trx_has_expl_x_lock(
6527 	const trx_t*		trx,	/*!< in: transaction to check */
6528 	const dict_table_t*	table,	/*!< in: table to check */
6529 	const buf_block_t*	block,	/*!< in: buffer block of the record */
6530 	ulint			heap_no)/*!< in: record heap number */
6531 {
6532 	ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
6533 
6534 	lock_mutex_enter();
6535 	ut_ad(lock_table_has(trx, table, LOCK_IX));
6536 	ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no,
6537 				trx));
6538 	lock_mutex_exit();
6539 	return(true);
6540 }
6541 #endif /* UNIV_DEBUG */
6542 
6543 /** rewind(3) the file used for storing the latest detected deadlock and
6544 print a heading message to stderr if printing of all deadlocks to stderr
6545 is enabled. */
6546 void
start_print()6547 DeadlockChecker::start_print()
6548 {
6549 	ut_ad(lock_mutex_own());
6550 
6551 	rewind(lock_latest_err_file);
6552 	ut_print_timestamp(lock_latest_err_file);
6553 
6554 	if (srv_print_all_deadlocks) {
6555 		ib::info() << "Transactions deadlock detected, dumping"
6556 			" detailed information.";
6557 	}
6558 }
6559 
6560 /** Print a message to the deadlock file and possibly to stderr.
6561 @param msg message to print */
6562 void
print(const char * msg)6563 DeadlockChecker::print(const char* msg)
6564 {
6565 	fputs(msg, lock_latest_err_file);
6566 
6567 	if (srv_print_all_deadlocks) {
6568 		ib::info() << msg;
6569 	}
6570 }
6571 
6572 /** Print transaction data to the deadlock file and possibly to stderr.
6573 @param trx transaction
6574 @param max_query_len max query length to print */
6575 void
print(const trx_t * trx,ulint max_query_len)6576 DeadlockChecker::print(const trx_t* trx, ulint max_query_len)
6577 {
6578 	ut_ad(lock_mutex_own());
6579 
6580 	ulint	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
6581 	ulint	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
6582 	ulint	heap_size = mem_heap_get_size(trx->lock.lock_heap);
6583 
6584 	trx_print_low(lock_latest_err_file, trx, max_query_len,
6585 		      n_rec_locks, n_trx_locks, heap_size);
6586 
6587 	if (srv_print_all_deadlocks) {
6588 		trx_print_low(stderr, trx, max_query_len,
6589 			      n_rec_locks, n_trx_locks, heap_size);
6590 	}
6591 }
6592 
6593 /** Print lock data to the deadlock file and possibly to stderr.
6594 @param lock record or table type lock */
6595 void
print(const lock_t * lock)6596 DeadlockChecker::print(const lock_t* lock)
6597 {
6598 	ut_ad(lock_mutex_own());
6599 
6600 	if (lock_get_type_low(lock) == LOCK_REC) {
6601 		mtr_t mtr;
6602 		lock_rec_print(lock_latest_err_file, lock, mtr);
6603 
6604 		if (srv_print_all_deadlocks) {
6605 			lock_rec_print(stderr, lock, mtr);
6606 		}
6607 	} else {
6608 		lock_table_print(lock_latest_err_file, lock);
6609 
6610 		if (srv_print_all_deadlocks) {
6611 			lock_table_print(stderr, lock);
6612 		}
6613 	}
6614 }
6615 
6616 /** Get the next lock in the queue that is owned by a transaction whose
6617 sub-tree has not already been searched.
6618 Note: "next" here means PREV for table locks.
6619 
6620 @param lock Lock in queue
6621 @param heap_no heap_no if lock is a record lock else ULINT_UNDEFINED
6622 
6623 @return next lock or NULL if at end of queue */
6624 const lock_t*
get_next_lock(const lock_t * lock,ulint heap_no) const6625 DeadlockChecker::get_next_lock(const lock_t* lock, ulint heap_no) const
6626 {
6627 	ut_ad(lock_mutex_own());
6628 
6629 	do {
6630 		if (lock_get_type_low(lock) == LOCK_REC) {
6631 			ut_ad(heap_no != ULINT_UNDEFINED);
6632 			lock = lock_rec_get_next_const(heap_no, lock);
6633 		} else {
6634 			ut_ad(heap_no == ULINT_UNDEFINED);
6635 			ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
6636 
6637 			lock = UT_LIST_GET_NEXT(
6638 				un_member.tab_lock.locks, lock);
6639 		}
6640 
6641 	} while (lock != NULL && is_visited(lock));
6642 
6643 	ut_ad(lock == NULL
6644 	      || lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));
6645 
6646 	return(lock);
6647 }
6648 
6649 /** Get the first lock to search. The search starts from the current
6650 wait_lock. What we are really interested in is an edge from the
6651 current wait_lock's owning transaction to another transaction that has
6652 a lock ahead in the queue. We skip locks where the owning transaction's
6653 sub-tree has already been searched.
6654 
6655 Note: The record locks are traversed from the oldest lock to the
6656 latest. For table locks we go from latest to oldest.
6657 
6658 For record locks, we first position the "iterator" on the first lock on
6659 the page and then reposition on the actual heap_no. This is required
6660 due to the way the record lock has is implemented.
6661 
6662 @param[out] heap_no if rec lock, else ULINT_UNDEFINED.
6663 @return first lock or NULL */
6664 const lock_t*
get_first_lock(ulint * heap_no) const6665 DeadlockChecker::get_first_lock(ulint* heap_no) const
6666 {
6667 	ut_ad(lock_mutex_own());
6668 
6669 	const lock_t*	lock = m_wait_lock;
6670 
6671 	if (lock_get_type_low(lock) == LOCK_REC) {
6672 		hash_table_t*	lock_hash;
6673 
6674 		lock_hash = lock->type_mode & LOCK_PREDICATE
6675 			? lock_sys.prdt_hash
6676 			: lock_sys.rec_hash;
6677 
6678 		/* We are only interested in records that match the heap_no. */
6679 		*heap_no = lock_rec_find_set_bit(lock);
6680 
6681 		ut_ad(*heap_no <= 0xffff);
6682 		ut_ad(*heap_no != ULINT_UNDEFINED);
6683 
6684 		/* Find the locks on the page. */
6685 		lock = lock_rec_get_first_on_page_addr(
6686 			lock_hash,
6687 			lock->un_member.rec_lock.space,
6688 			lock->un_member.rec_lock.page_no);
6689 
6690 		/* Position on the first lock on the physical record.*/
6691 		if (!lock_rec_get_nth_bit(lock, *heap_no)) {
6692 			lock = lock_rec_get_next_const(*heap_no, lock);
6693 		}
6694 
6695 		ut_a(!lock_get_wait(lock));
6696 	} else {
6697 		/* Table locks don't care about the heap_no. */
6698 		*heap_no = ULINT_UNDEFINED;
6699 		ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
6700 		dict_table_t*	table = lock->un_member.tab_lock.table;
6701 		lock = UT_LIST_GET_FIRST(table->locks);
6702 	}
6703 
6704 	/* Must find at least two locks, otherwise there cannot be a
6705 	waiting lock, secondly the first lock cannot be the wait_lock. */
6706 	ut_a(lock != NULL);
6707 	ut_a(lock != m_wait_lock ||
6708 	     (innodb_lock_schedule_algorithm
6709 	      	== INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
6710 	      && !thd_is_replication_slave_thread(lock->trx->mysql_thd)));
6711 
6712 	/* Check that the lock type doesn't change. */
6713 	ut_ad(lock_get_type_low(lock) == lock_get_type_low(m_wait_lock));
6714 
6715 	return(lock);
6716 }
6717 
6718 /** Notify that a deadlock has been detected and print the conflicting
6719 transaction info.
6720 @param lock lock causing deadlock */
6721 void
notify(const lock_t * lock) const6722 DeadlockChecker::notify(const lock_t* lock) const
6723 {
6724 	ut_ad(lock_mutex_own());
6725 
6726 	start_print();
6727 
6728 	print("\n*** (1) TRANSACTION:\n");
6729 
6730 	print(m_wait_lock->trx, 3000);
6731 
6732 	print("*** (1) WAITING FOR THIS LOCK TO BE GRANTED:\n");
6733 
6734 	print(m_wait_lock);
6735 
6736 	print("*** (2) TRANSACTION:\n");
6737 
6738 	print(lock->trx, 3000);
6739 
6740 	print("*** (2) HOLDS THE LOCK(S):\n");
6741 
6742 	print(lock);
6743 
6744 	/* It is possible that the joining transaction was granted its
6745 	lock when we rolled back some other waiting transaction. */
6746 
6747 	if (m_start->lock.wait_lock != 0) {
6748 		print("*** (2) WAITING FOR THIS LOCK TO BE GRANTED:\n");
6749 
6750 		print(m_start->lock.wait_lock);
6751 	}
6752 
6753 	DBUG_PRINT("ib_lock", ("deadlock detected"));
6754 }
6755 
6756 /** Select the victim transaction that should be rolledback.
6757 @return victim transaction */
6758 const trx_t*
select_victim() const6759 DeadlockChecker::select_victim() const
6760 {
6761 	ut_ad(lock_mutex_own());
6762 	ut_ad(m_start->lock.wait_lock != 0);
6763 	ut_ad(m_wait_lock->trx != m_start);
6764 
6765 	if (trx_weight_ge(m_wait_lock->trx, m_start)) {
6766 		/* The joining transaction is 'smaller',
6767 		choose it as the victim and roll it back. */
6768 #ifdef WITH_WSREP
6769 		if (wsrep_thd_is_BF(m_start->mysql_thd, FALSE)) {
6770 			return(m_wait_lock->trx);
6771 		}
6772 #endif /* WITH_WSREP */
6773 		return(m_start);
6774 	}
6775 
6776 #ifdef WITH_WSREP
6777 	if (wsrep_thd_is_BF(m_wait_lock->trx->mysql_thd, FALSE)) {
6778 		return(m_start);
6779 	}
6780 #endif /* WITH_WSREP */
6781 
6782 	return(m_wait_lock->trx);
6783 }
6784 
6785 /** Looks iteratively for a deadlock. Note: the joining transaction may
6786 have been granted its lock by the deadlock checks.
6787 @return 0 if no deadlock else the victim transaction instance.*/
6788 const trx_t*
search()6789 DeadlockChecker::search()
6790 {
6791 	ut_ad(lock_mutex_own());
6792 	ut_ad(!trx_mutex_own(m_start));
6793 
6794 	ut_ad(m_start != NULL);
6795 	ut_ad(m_wait_lock != NULL);
6796 	ut_ad(!m_wait_lock->trx->auto_commit || m_wait_lock->trx->will_lock);
6797 	ut_d(check_trx_state(m_wait_lock->trx));
6798 	ut_ad(m_mark_start <= s_lock_mark_counter);
6799 
6800 	/* Look at the locks ahead of wait_lock in the lock queue. */
6801 	ulint		heap_no;
6802 	const lock_t*	lock = get_first_lock(&heap_no);
6803 
6804 	for (;;) {
6805 		/* We should never visit the same sub-tree more than once. */
6806 		ut_ad(lock == NULL || !is_visited(lock));
6807 
6808 		while (m_n_elems > 0 && lock == NULL) {
6809 
6810 			/* Restore previous search state. */
6811 
6812 			pop(lock, heap_no);
6813 
6814 			lock = get_next_lock(lock, heap_no);
6815 		}
6816 
6817 		if (lock == NULL) {
6818 			break;
6819 		}
6820 
6821 		if (lock == m_wait_lock) {
6822 
6823 			/* We can mark this subtree as searched */
6824 			ut_ad(lock->trx->lock.deadlock_mark <= m_mark_start);
6825 
6826 			lock->trx->lock.deadlock_mark = ++s_lock_mark_counter;
6827 
6828 			/* We are not prepared for an overflow. This 64-bit
6829 			counter should never wrap around. At 10^9 increments
6830 			per second, it would take 10^3 years of uptime. */
6831 
6832 			ut_ad(s_lock_mark_counter > 0);
6833 
6834 			/* Backtrack */
6835 			lock = NULL;
6836 			continue;
6837 		}
6838 
6839 		if (!lock_has_to_wait(m_wait_lock, lock)) {
6840 			/* No conflict, next lock */
6841 			lock = get_next_lock(lock, heap_no);
6842 			continue;
6843 		}
6844 
6845 		if (lock->trx == m_start) {
6846 			/* Found a cycle. */
6847 			notify(lock);
6848 			return select_victim();
6849 		}
6850 
6851 		if (is_too_deep()) {
6852 			/* Search too deep to continue. */
6853 			m_too_deep = true;
6854 			return m_start;
6855 		}
6856 
6857 		/* We do not need to report autoinc locks to the upper
6858 		layer. These locks are released before commit, so they
6859 		can not cause deadlocks with binlog-fixed commit
6860 		order. */
6861 		if (m_report_waiters
6862 		    && (lock_get_type_low(lock) != LOCK_TABLE
6863 			|| lock_get_mode(lock) != LOCK_AUTO_INC)) {
6864 			thd_rpl_deadlock_check(m_start->mysql_thd,
6865 					       lock->trx->mysql_thd);
6866 		}
6867 
6868 		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
6869 			/* Another trx ahead has requested a lock in an
6870 			incompatible mode, and is itself waiting for a lock. */
6871 
6872 			++m_cost;
6873 
6874 			if (!push(lock, heap_no)) {
6875 				m_too_deep = true;
6876 				return m_start;
6877 			}
6878 
6879 			m_wait_lock = lock->trx->lock.wait_lock;
6880 
6881 			lock = get_first_lock(&heap_no);
6882 
6883 			if (is_visited(lock)) {
6884 				lock = get_next_lock(lock, heap_no);
6885 			}
6886 		} else {
6887 			lock = get_next_lock(lock, heap_no);
6888 		}
6889 	}
6890 
6891 	ut_a(lock == NULL && m_n_elems == 0);
6892 
6893 	/* No deadlock found. */
6894 	return(0);
6895 }
6896 
6897 /** Print info about transaction that was rolled back.
6898 @param trx transaction rolled back
6899 @param lock lock trx wants */
6900 void
rollback_print(const trx_t * trx,const lock_t * lock)6901 DeadlockChecker::rollback_print(const trx_t* trx, const lock_t* lock)
6902 {
6903 	ut_ad(lock_mutex_own());
6904 
6905 	/* If the lock search exceeds the max step
6906 	or the max depth, the current trx will be
6907 	the victim. Print its information. */
6908 	start_print();
6909 
6910 	print("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
6911 	      " WAITS-FOR GRAPH, WE WILL ROLL BACK"
6912 	      " FOLLOWING TRANSACTION \n\n"
6913 	      "*** TRANSACTION:\n");
6914 
6915 	print(trx, 3000);
6916 
6917 	print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
6918 
6919 	print(lock);
6920 }
6921 
6922 /** Rollback transaction selected as the victim. */
6923 void
trx_rollback()6924 DeadlockChecker::trx_rollback()
6925 {
6926 	ut_ad(lock_mutex_own());
6927 
6928 	trx_t*	trx = m_wait_lock->trx;
6929 
6930 	print("*** WE ROLL BACK TRANSACTION (1)\n");
6931 
6932 	trx_mutex_enter(trx);
6933 
6934 	trx->lock.was_chosen_as_deadlock_victim = true;
6935 
6936 	lock_cancel_waiting_and_release(trx->lock.wait_lock);
6937 
6938 	trx_mutex_exit(trx);
6939 }
6940 
6941 /** Check if a joining lock request results in a deadlock.
6942 If a deadlock is found, we will resolve the deadlock by
6943 choosing a victim transaction and rolling it back.
6944 We will attempt to resolve all deadlocks.
6945 
6946 @param[in]	lock	the lock request
6947 @param[in,out]	trx	transaction requesting the lock
6948 
6949 @return trx if it was chosen as victim
6950 @retval	NULL if another victim was chosen,
6951 or there is no deadlock (any more) */
6952 const trx_t*
check_and_resolve(const lock_t * lock,trx_t * trx)6953 DeadlockChecker::check_and_resolve(const lock_t* lock, trx_t* trx)
6954 {
6955 	ut_ad(lock_mutex_own());
6956 	ut_ad(trx_mutex_own(trx));
6957 	ut_ad(trx->state == TRX_STATE_ACTIVE);
6958 	ut_ad(!trx->auto_commit || trx->will_lock);
6959 	ut_ad(!srv_read_only_mode);
6960 
6961 	if (!innobase_deadlock_detect) {
6962 		return(NULL);
6963 	}
6964 
6965 	/*  Release the mutex to obey the latching order.
6966 	This is safe, because DeadlockChecker::check_and_resolve()
6967 	is invoked when a lock wait is enqueued for the currently
6968 	running transaction. Because m_trx is a running transaction
6969 	(it is not currently suspended because of a lock wait),
6970 	its state can only be changed by this thread, which is
6971 	currently associated with the transaction. */
6972 
6973 	trx_mutex_exit(trx);
6974 
6975 	const trx_t*	victim_trx;
6976 	const bool	report_waiters = trx->mysql_thd
6977 		&& thd_need_wait_reports(trx->mysql_thd);
6978 
6979 	/* Try and resolve as many deadlocks as possible. */
6980 	do {
6981 		DeadlockChecker	checker(trx, lock, s_lock_mark_counter,
6982 					report_waiters);
6983 
6984 		victim_trx = checker.search();
6985 
6986 		/* Search too deep, we rollback the joining transaction only
6987 		if it is possible to rollback. Otherwise we rollback the
6988 		transaction that is holding the lock that the joining
6989 		transaction wants. */
6990 		if (checker.is_too_deep()) {
6991 
6992 			ut_ad(trx == checker.m_start);
6993 			ut_ad(trx == victim_trx);
6994 
6995 			rollback_print(victim_trx, lock);
6996 
6997 			MONITOR_INC(MONITOR_DEADLOCK);
6998 
6999 			break;
7000 
7001 		} else if (victim_trx != NULL && victim_trx != trx) {
7002 
7003 			ut_ad(victim_trx == checker.m_wait_lock->trx);
7004 
7005 			checker.trx_rollback();
7006 
7007 			lock_deadlock_found = true;
7008 
7009 			MONITOR_INC(MONITOR_DEADLOCK);
7010 		}
7011 
7012 	} while (victim_trx != NULL && victim_trx != trx);
7013 
7014 	/* If the joining transaction was selected as the victim. */
7015 	if (victim_trx != NULL) {
7016 
7017 		print("*** WE ROLL BACK TRANSACTION (2)\n");
7018 
7019 		lock_deadlock_found = true;
7020 	}
7021 
7022 	trx_mutex_enter(trx);
7023 
7024 	return(victim_trx);
7025 }
7026 
7027 /*************************************************************//**
7028 Updates the lock table when a page is split and merged to
7029 two pages. */
7030 UNIV_INTERN
7031 void
lock_update_split_and_merge(const buf_block_t * left_block,const rec_t * orig_pred,const buf_block_t * right_block)7032 lock_update_split_and_merge(
7033 	const buf_block_t* left_block,	/*!< in: left page to which merged */
7034 	const rec_t* orig_pred,		/*!< in: original predecessor of
7035 					supremum on the left page before merge*/
7036 	const buf_block_t* right_block)	/*!< in: right page from which merged */
7037 {
7038 	const rec_t* left_next_rec;
7039 
7040 	ut_ad(page_is_leaf(left_block->frame));
7041 	ut_ad(page_is_leaf(right_block->frame));
7042 	ut_ad(page_align(orig_pred) == left_block->frame);
7043 
7044 	lock_mutex_enter();
7045 
7046 	left_next_rec = page_rec_get_next_const(orig_pred);
7047 	ut_ad(!page_rec_is_metadata(left_next_rec));
7048 
7049 	/* Inherit the locks on the supremum of the left page to the
7050 	first record which was moved from the right page */
7051 	lock_rec_inherit_to_gap(
7052 		left_block, left_block,
7053 		page_rec_get_heap_no(left_next_rec),
7054 		PAGE_HEAP_NO_SUPREMUM);
7055 
7056 	/* Reset the locks on the supremum of the left page,
7057 	releasing waiting transactions */
7058 	lock_rec_reset_and_release_wait(left_block,
7059 					PAGE_HEAP_NO_SUPREMUM);
7060 
7061 	/* Inherit the locks to the supremum of the left page from the
7062 	successor of the infimum on the right page */
7063 	lock_rec_inherit_to_gap(left_block, right_block,
7064 				PAGE_HEAP_NO_SUPREMUM,
7065 				lock_get_min_heap_no(right_block));
7066 
7067 	lock_mutex_exit();
7068 }
7069