1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2008, Google Inc.
5 
6 Portions of this file contain modifications contributed and copyrighted by
7 Google, Inc. Those modifications are gratefully acknowledged and are described
8 briefly in the InnoDB documentation. The contributions by Google are
9 incorporated with their permission, and subject to the conditions contained in
10 the file COPYING.Google.
11 
12 This program is free software; you can redistribute it and/or modify
13 it under the terms of the GNU General Public License, version 2.0,
14 as published by the Free Software Foundation.
15 
16 This program is also distributed with certain software (including
17 but not limited to OpenSSL) that is licensed under separate terms,
18 as designated in a particular file or component or in included license
19 documentation.  The authors of MySQL hereby grant you an additional
20 permission to link the program and your derivative works with the
21 separately licensed software that they have included with MySQL.
22 
23 This program is distributed in the hope that it will be useful,
24 but WITHOUT ANY WARRANTY; without even the implied warranty of
25 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26 GNU General Public License, version 2.0, for more details.
27 
28 You should have received a copy of the GNU General Public License along with
29 this program; if not, write to the Free Software Foundation, Inc.,
30 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
31 
32 *****************************************************************************/
33 
34 /**************************************************//**
35 @file sync/sync0rw.cc
36 The read-write lock (for thread synchronization)
37 
38 Created 9/11/1995 Heikki Tuuri
39 *******************************************************/
40 
41 #include "sync0rw.h"
42 #ifdef UNIV_NONINL
43 #include "sync0rw.ic"
44 #include "sync0arr.ic"
45 #endif
46 
47 #include "ha_prototypes.h"
48 
49 #include "os0thread.h"
50 #include "mem0mem.h"
51 #include "srv0srv.h"
52 #include "os0event.h"
53 #include "srv0mon.h"
54 #include "sync0debug.h"
55 #include "ha_prototypes.h"
56 #include <my_sys.h>
57 
58 /*
59 	IMPLEMENTATION OF THE RW_LOCK
60 	=============================
61 The status of a rw_lock is held in lock_word. The initial value of lock_word is
62 X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
63 or 1 for each x-lock. This describes the lock state for each value of lock_word:
64 
65 lock_word == X_LOCK_DECR:	Unlocked.
66 X_LOCK_HALF_DECR < lock_word < X_LOCK_DECR:
67 				S locked, no waiting writers.
68 				(X_LOCK_DECR - lock_word) is the number
69 				of S locks.
70 lock_word == X_LOCK_HALF_DECR:	SX locked, no waiting writers.
71 0 < lock_word < X_LOCK_HALF_DECR:
72 				SX locked AND S locked, no waiting writers.
73 				(X_LOCK_HALF_DECR - lock_word) is the number
74 				of S locks.
75 lock_word == 0:			X locked, no waiting writers.
76 -X_LOCK_HALF_DECR < lock_word < 0:
77 				S locked, with a waiting writer.
78 				(-lock_word) is the number of S locks.
79 lock_word == -X_LOCK_HALF_DECR:	X locked and SX locked, no waiting writers.
80 -X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR:
81 				S locked, with a waiting writer
82 				which has SX lock.
83 				-(lock_word + X_LOCK_HALF_DECR) is the number
84 				of S locks.
85 lock_word == -X_LOCK_DECR:	X locked with recursive X lock (2 X locks).
86 -(X_LOCK_DECR + X_LOCK_HALF_DECR) < lock_word < -X_LOCK_DECR:
87 				X locked. The number of the X locks is:
88 				2 - (lock_word + X_LOCK_DECR)
89 lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR):
90 				X locked with recursive X lock (2 X locks)
91 				and SX locked.
92 lock_word < -(X_LOCK_DECR + X_LOCK_HALF_DECR):
93 				X locked and SX locked.
94 				The number of the X locks is:
95 				2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR)
96 
97  LOCK COMPATIBILITY MATRIX
98     S SX  X
99  S  +  +  -
100  SX +  -  -
101  X  -  -  -
102 
103 The lock_word is always read and updated atomically and consistently, so that
104 it always represents the state of the lock, and the state of the lock changes
105 with a single atomic operation. This lock_word holds all of the information
106 that a thread needs in order to determine if it is eligible to gain the lock
107 or if it must spin or sleep. The one exception to this is that writer_thread
108 must be verified before recursive write locks: to solve this scenario, we make
109 writer_thread readable by all threads, but only writeable by the x-lock or
110 sx-lock holder.
111 
112 The other members of the lock obey the following rules to remain consistent:
113 
114 recursive:	This and the writer_thread field together control the
115 		behaviour of recursive x-locking or sx-locking.
116 		lock->recursive must be FALSE in following states:
117 			1) The writer_thread contains garbage i.e.: the
118 			lock has just been initialized.
119 			2) The lock is not x-held and there is no
120 			x-waiter waiting on WAIT_EX event.
121 			3) The lock is x-held or there is an x-waiter
122 			waiting on WAIT_EX event but the 'pass' value
123 			is non-zero.
124 		lock->recursive is TRUE iff:
125 			1) The lock is x-held or there is an x-waiter
126 			waiting on WAIT_EX event and the 'pass' value
127 			is zero.
128 		This flag must be set after the writer_thread field
129 		has been updated with a memory ordering barrier.
130 		It is unset before the lock_word has been incremented.
131 writer_thread:	Is used only in recursive x-locking. Can only be safely
132 		read iff lock->recursive flag is TRUE.
133 		This field is uninitialized at lock creation time and
134 		is updated atomically when x-lock is acquired or when
135 		move_ownership is called. A thread is only allowed to
136 		set the value of this field to it's thread_id i.e.: a
137 		thread cannot set writer_thread to some other thread's
138 		id.
139 waiters:	May be set to 1 anytime, but to avoid unnecessary wake-up
140 		signals, it should only be set to 1 when there are threads
141 		waiting on event. Must be 1 when a writer starts waiting to
142 		ensure the current x-locking thread sends a wake-up signal
		during unlock. May only be reset to 0 immediately before a
		wake-up signal is sent to event. On most platforms, a
		memory barrier is required after waiters is set, and before
		verifying lock_word is still held, to ensure some unlocker
		really does see the flag's new value.
148 event:		Threads wait on event for read or writer lock when another
149 		thread has an x-lock or an x-lock reservation (wait_ex). A
150 		thread may only	wait on event after performing the following
151 		actions in order:
152 		   (1) Record the counter value of event (with os_event_reset).
153 		   (2) Set waiters to 1.
154 		   (3) Verify lock_word <= 0.
155 		(1) must come before (2) to ensure signal is not missed.
156 		(2) must come before (3) to ensure a signal is sent.
157 		These restrictions force the above ordering.
158 		Immediately before sending the wake-up signal, we should:
159 		   (1) Verify lock_word == X_LOCK_DECR (unlocked)
160 		   (2) Reset waiters to 0.
161 wait_ex_event:	A thread may only wait on the wait_ex_event after it has
162 		performed the following actions in order:
163 		   (1) Decrement lock_word by X_LOCK_DECR.
164 		   (2) Record counter value of wait_ex_event (os_event_reset,
165 		       called from sync_array_reserve_cell).
166 		   (3) Verify that lock_word < 0.
		(1) must come first to ensure no other threads become reader
168 		or next writer, and notifies unlocker that signal must be sent.
169 		(2) must come before (3) to ensure the signal is not missed.
170 		These restrictions force the above ordering.
171 		Immediately before sending the wake-up signal, we should:
172 		   Verify lock_word == 0 (waiting thread holds x_lock)
173 */
174 
/* Global statistics counters for rw-locks (spin rounds, OS waits, ...);
updated from the locking functions below. */
rw_lock_stats_t		rw_lock_stats;

/* The global list of rw-locks */
rw_lock_list_t		rw_lock_list;
/* Mutex protecting rw_lock_list. */
ib_mutex_t		rw_lock_list_mutex;
180 
#ifdef UNIV_DEBUG
/******************************************************************//**
Creates a debug info struct. */
static
rw_lock_debug_t*
rw_lock_debug_create(void);
/*======================*/
/******************************************************************//**
Frees a debug info struct. */
static
void
rw_lock_debug_free(
/*===============*/
	rw_lock_debug_t* info);

/******************************************************************//**
Allocates a debug info struct from the heap. The caller owns the
returned struct and must release it with rw_lock_debug_free().
@return own: debug info struct */
static
rw_lock_debug_t*
rw_lock_debug_create(void)
/*======================*/
{
	rw_lock_debug_t*	info;

	info = static_cast<rw_lock_debug_t*>(
		ut_malloc_nokey(sizeof(*info)));

	return(info);
}

/******************************************************************//**
Releases a debug info struct obtained from rw_lock_debug_create(). */
static
void
rw_lock_debug_free(
/*===============*/
	rw_lock_debug_t* info)	/*!< in, own: struct to free */
{
	ut_free(info);
}
#endif /* UNIV_DEBUG */
218 
/******************************************************************//**
Creates, or rather, initializes an rw-lock object in a specified memory
location (which must be appropriately aligned). The rw-lock is initialized
to the non-locked state. Explicit freeing of the rw-lock with rw_lock_free
is necessary only if the memory block containing it is freed. */
void
rw_lock_create_func(
/*================*/
	rw_lock_t*	lock,		/*!< in: pointer to memory */
#ifdef UNIV_DEBUG
	latch_level_t	level,		/*!< in: latching order level */
	const char*	cmutex_name,	/*!< in: rw-lock name */
#endif /* UNIV_DEBUG */
	const char*	cfile_name,	/*!< in: file name where created */
	ulint		cline)		/*!< in: file line where created */
{
#if defined(UNIV_DEBUG) && !defined(UNIV_PFS_RWLOCK)
	/* It should have been created in pfs_rw_lock_create_func() */
	new(lock) rw_lock_t();
#endif /* UNIV_DEBUG && !UNIV_PFS_RWLOCK */

	/* If this is the very first time a synchronization object is
	created, then the following call initializes the sync system. */

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
	mutex_create(LATCH_ID_RW_LOCK_MUTEX, rw_lock_get_mutex(lock));
#else /* INNODB_RW_LOCKS_USE_ATOMICS */
# ifdef UNIV_DEBUG
	UT_NOT_USED(cmutex_name);
# endif
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */

	/* X_LOCK_DECR in lock_word means "unlocked"; see the state
	table in the comment at the top of this file. */
	lock->lock_word = X_LOCK_DECR;
	lock->waiters = 0;

	/* We set this value to signify that lock->writer_thread
	contains garbage at initialization and cannot be used for
	recursive x-locking. */
	lock->recursive = FALSE;
	lock->sx_recursive = 0;
	/* Silence Valgrind when UNIV_DEBUG_VALGRIND is not enabled. */
	memset((void*) &lock->writer_thread, 0, sizeof lock->writer_thread);
	UNIV_MEM_INVALID(&lock->writer_thread, sizeof lock->writer_thread);

#ifdef UNIV_DEBUG
	lock->m_rw_lock = true;

	UT_LIST_INIT(lock->debug_list, &rw_lock_debug_t::list);

	/* Map the latch level to its registered latch id; the lock must
	have been registered with the sync-debug subsystem. */
	lock->m_id = sync_latch_get_id(sync_latch_get_name(level));
	ut_a(lock->m_id != LATCH_ID_NONE);

	lock->level = level;
#endif /* UNIV_DEBUG */

	lock->cfile_name = cfile_name;

	/* This should hold in practice. If it doesn't then we need to
	split the source file anyway. Or create the locks on lines
	less than 8192. cline is unsigned:13. */
	ut_ad(cline <= 8192);
	lock->cline = (unsigned int) cline;

	lock->count_os_wait = 0;
	lock->last_s_file_name = "not yet reserved";
	lock->last_x_file_name = "not yet reserved";
	lock->last_s_line = 0;
	lock->last_x_line = 0;
	lock->event = os_event_create(0);
	lock->wait_ex_event = os_event_create(0);

	lock->is_block_lock = 0;

	/* Register the lock in the global list, protected by
	rw_lock_list_mutex. */
	mutex_enter(&rw_lock_list_mutex);

	ut_ad(UT_LIST_GET_FIRST(rw_lock_list) == NULL
	      || UT_LIST_GET_FIRST(rw_lock_list)->magic_n == RW_LOCK_MAGIC_N);

	UT_LIST_ADD_FIRST(rw_lock_list, lock);

	mutex_exit(&rw_lock_list_mutex);
}
301 
/******************************************************************//**
Calling this function is obligatory only if the memory buffer containing
the rw-lock is freed. Removes an rw-lock object from the global list. The
rw-lock is checked to be in the non-locked state. */
void
rw_lock_free_func(
/*==============*/
	rw_lock_t*	lock)	/*!< in/out: rw-lock */
{
	/* Read barrier so the lock_word check below sees a fresh value. */
	os_rmb;
	ut_ad(rw_lock_validate(lock));
	/* The lock must be completely unlocked before it is destroyed. */
	ut_a(lock->lock_word == X_LOCK_DECR);

	mutex_enter(&rw_lock_list_mutex);

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
	mutex_free(rw_lock_get_mutex(lock));
#endif /* !INNODB_RW_LOCKS_USE_ATOMICS */

	os_event_destroy(lock->event);

	os_event_destroy(lock->wait_ex_event);

	UT_LIST_REMOVE(rw_lock_list, lock);

	mutex_exit(&rw_lock_list_mutex);

	/* We did an in-place new in rw_lock_create_func() */
	ut_d(lock->~rw_lock_t());
}
332 
/******************************************************************//**
Lock an rw-lock in shared mode for the current thread. If the rw-lock is
locked in exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by srv_n_spin_wait_rounds), waiting
for the lock, before suspending the thread. */
void
rw_lock_s_lock_spin(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock
				will be passed to another thread to unlock */
	const char*	file_name, /*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint		i = 0;	/* spin round count */
	sync_array_t*	sync_arr;
	ulint		spin_count = 0;
	uint64_t	count_os_wait = 0;
	const os_thread_id_t curr_thread = os_thread_get_curr_id();

	/* We reuse the thread id to index into the counter, cache
	it here for efficiency. */

	const size_t counter_index = (size_t) ut_rnd_gen_next_ulint(
							(ulint) curr_thread);

	ut_ad(rw_lock_validate(lock));

lock_loop:

	/* Spin waiting for the writer field to become free.
	lock_word <= 0 means an X lock is held or reserved (see the
	state table at the top of this file). */
	os_rmb;
	while (i < srv_n_spin_wait_rounds && lock->lock_word <= 0) {
		if (srv_spin_wait_delay) {
			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
		}

		i++;
	}

	if (i >= srv_n_spin_wait_rounds) {
		os_thread_yield();
	}

	++spin_count;

	/* We try once again to obtain the lock */
	if (rw_lock_s_lock_low(lock, pass, file_name, line)) {

		/* Flush the accumulated statistics before returning. */
		if (count_os_wait > 0) {
			lock->count_os_wait +=
				static_cast<uint32_t>(count_os_wait);
			rw_lock_stats.rw_s_os_wait_count.add(counter_index,
							     count_os_wait);
		}

		rw_lock_stats.rw_s_spin_round_count.add(counter_index,
							spin_count);

		return; /* Success */
	} else {

		if (i < srv_n_spin_wait_rounds) {
			goto lock_loop;
		}


		++count_os_wait;

		sync_cell_t*	cell;

		sync_arr = sync_array_get_and_reserve_cell(
				lock, RW_LOCK_S, file_name, line, &cell);

		/* Set waiters before checking lock_word to ensure wake-up
		signal is sent. This may lead to some unnecessary signals. */
		rw_lock_set_waiter_flag(lock);

		/* Last attempt before sleeping; if it succeeds, the
		reserved cell must be released. */
		if (rw_lock_s_lock_low(lock, pass, file_name, line)) {

			sync_array_free_cell(sync_arr, cell);

			if (count_os_wait > 0) {

				lock->count_os_wait +=
					static_cast<uint32_t>(count_os_wait);

				rw_lock_stats.rw_s_os_wait_count.add(
					counter_index, count_os_wait);
			}

			rw_lock_stats.rw_s_spin_round_count.add(counter_index,
								spin_count);

			return; /* Success */
		}

		/* see comments in trx_commit_low() to
		before_trx_state_committed_in_memory explaining
		this care to invoke the following sync check.*/
#ifndef NDEBUG
#ifdef UNIV_DEBUG
		if (lock->get_level() != SYNC_DICT_OPERATION) {
			DEBUG_SYNC_C("rw_s_lock_waiting");
		}
#endif
#endif
		sync_array_wait_event(sync_arr, cell);

		/* Woken up: reset the spin counter and retry from the top. */
		i = 0;

		goto lock_loop;
	}
}
447 
448 /******************************************************************//**
449 This function is used in the insert buffer to move the ownership of an
450 x-latch on a buffer frame to the current thread. The x-latch was set by
451 the buffer read operation and it protected the buffer frame while the
452 read was done. The ownership is moved because we want that the current
453 thread is able to acquire a second x-latch which is stored in an mtr.
454 This, in turn, is needed to pass the debug checks of index page
455 operations. */
456 void
rw_lock_x_lock_move_ownership(rw_lock_t * lock)457 rw_lock_x_lock_move_ownership(
458 /*==========================*/
459 	rw_lock_t*	lock)	/*!< in: lock which was x-locked in the
460 				buffer read */
461 {
462 	const os_thread_id_t curr_thread = os_thread_get_curr_id();
463 
464 	ut_ad(rw_lock_is_locked(lock, RW_LOCK_X));
465 
466 	rw_lock_set_writer_id_and_recursion_flag(lock, true, curr_thread);
467 }
468 
/******************************************************************//**
Function for the next writer to call. Waits for readers to exit.
The caller must have already decremented lock_word by X_LOCK_DECR. */
UNIV_INLINE
void
rw_lock_x_lock_wait_func(
/*=====================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
#ifdef UNIV_DEBUG
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
#endif
	lint		threshold,/*!< in: threshold to wait for */
	const os_thread_id_t curr_thread,/*!< in: current thread id */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint		i = 0;		/* spin round count */
	ulint		n_spins = 0;	/* sleep attempt count */
	sync_array_t*	sync_arr;
	uint64_t	count_os_wait = 0;
	size_t		counter_index;

	/* We reuse the thread id to index into the counter, cache
	it here for efficiency. */

	counter_index = (size_t) ut_rnd_gen_next_ulint((ulint) curr_thread);

	os_rmb;
	/* lock_word > threshold would mean the caller did not decrement
	lock_word as required before calling this function. */
	ut_ad(lock->lock_word <= threshold);

	while (lock->lock_word < threshold) {

		if (srv_spin_wait_delay) {
			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
		}

		if (i < srv_n_spin_wait_rounds) {
			i++;
			os_rmb;
			continue;
		}

		/* If there is still a reader, then go to sleep.*/
		++n_spins;

		sync_cell_t*	cell;

		sync_arr = sync_array_get_and_reserve_cell(
			lock, RW_LOCK_X_WAIT, file_name, line, &cell);

		i = 0;

		/* Check lock_word to ensure wake-up isn't missed.*/
		if (lock->lock_word < threshold) {

			++count_os_wait;

			/* Add debug info as it is needed to detect possible
			deadlock. We must add info for WAIT_EX thread for
			deadlock detection to work properly. */
			ut_d(rw_lock_add_debug_info(
					lock, pass, RW_LOCK_X_WAIT,
					file_name, line));

			sync_array_wait_event(sync_arr, cell);

			ut_d(rw_lock_remove_debug_info(
					lock, pass, RW_LOCK_X_WAIT));

			/* It is possible to wake when lock_word < 0.
			We must pass the while-loop check to proceed.*/

		} else {
			/* The readers are already gone; no need to sleep. */
			sync_array_free_cell(sync_arr, cell);
			break;
		}
	}

	/* Flush the accumulated statistics. */
	rw_lock_stats.rw_x_spin_round_count.add(counter_index, n_spins);

	if (count_os_wait > 0) {
		lock->count_os_wait +=
			static_cast<uint32_t>(count_os_wait);
		rw_lock_stats.rw_x_os_wait_count.add(counter_index,
						     count_os_wait);
	}
}
557 
#ifdef UNIV_DEBUG
/* In debug builds the 'pass' argument (P) is forwarded so that the
sync-debug deadlock detector can record the waiter; in release builds
rw_lock_x_lock_wait_func() has no 'pass' parameter, so P is dropped. */
# define rw_lock_x_lock_wait(L, P, T, C, F, O)		\
	rw_lock_x_lock_wait_func(L, P, T, C, F, O)
#else
# define rw_lock_x_lock_wait(L, P, T, C, F, O)		\
	rw_lock_x_lock_wait_func(L, T, C, F, O)
#endif /* UNIV_DEBUG */
565 
/******************************************************************//**
Low-level function for acquiring an exclusive lock.
@return FALSE if did not succeed, TRUE if success. */
UNIV_INLINE
ibool
rw_lock_x_lock_low(
/*===============*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const os_thread_id_t curr_thread,/*!< in: current thread id */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, X_LOCK_HALF_DECR)) {

		/* lock->recursive also tells us if the writer_thread
		field is stale or active. As we are going to write
		our own thread id in that field it must be that the
		current writer_thread value is not active. */
		ut_a(!lock->recursive);

		/* Decrement occurred: we are writer or next-writer. */
		rw_lock_set_writer_id_and_recursion_flag(
			lock, !pass, curr_thread);

		/* Wait for any remaining S-lock holders to release
		before this thread becomes the active writer. */
		rw_lock_x_lock_wait(lock, pass, 0, curr_thread,
				    file_name, line);

	} else {
		bool recursive;
		os_thread_id_t writer_thread;

		if (!pass) {
			/* writer_thread may only be trusted while the
			recursive flag is set; read recursive first. */
			recursive = lock->recursive;
			os_rmb;
			writer_thread = lock->writer_thread;
		}

		/* Decrement failed: An X or SX lock is held by either
		this thread or another. Try to relock. */
		if (!pass && recursive
		    && os_thread_eq(writer_thread, curr_thread)) {
			/* Other s-locks can be allowed. If it is request x
			recursively while holding sx lock, this x lock should
			be along with the latching-order. */

			/* The existing X or SX lock is from this thread */
			if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, 0)) {
				/* There is at least one SX-lock from this
				thread, but no X-lock. */

				/* Wait for any the other S-locks to be
				released. */
				rw_lock_x_lock_wait(
					lock, pass, -X_LOCK_HALF_DECR,
					curr_thread, file_name, line);

			} else {
				/* At least one X lock by this thread already
				exists. Add another. */
				if (lock->lock_word == 0
				    || lock->lock_word == -X_LOCK_HALF_DECR) {
					/* First recursion: jump the lock_word
					past the waiting-writer range. */
					lock->lock_word -= X_LOCK_DECR;
				} else {
					ut_ad(lock->lock_word <= -X_LOCK_DECR);
					--lock->lock_word;
				}
			}

		} else {
			/* Another thread locked before us */
			return(FALSE);
		}
	}

	ut_d(rw_lock_add_debug_info(lock, pass, RW_LOCK_X, file_name, line));

	lock->last_x_file_name = file_name;
	lock->last_x_line = (unsigned int) line;

	return(TRUE);
}
649 
/******************************************************************//**
Low-level function for acquiring an sx lock.
@return FALSE if did not succeed, TRUE if success. */
UNIV_INLINE
ibool
rw_lock_sx_lock_low(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const os_thread_id_t curr_thread,/*!< in: current thread id */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	if (rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR, X_LOCK_HALF_DECR)) {

		/* lock->recursive also tells us if the writer_thread
		field is stale or active. As we are going to write
		our own thread id in that field it must be that the
		current writer_thread value is not active. */
		ut_a(!lock->recursive);

		/* Decrement occurred: we are the SX lock owner. */
		rw_lock_set_writer_id_and_recursion_flag(
			lock, !pass, curr_thread);

		lock->sx_recursive = 1;

	} else {
		bool recursive;
		os_thread_id_t writer_thread;

		if (!pass) {
			/* writer_thread may only be trusted while the
			recursive flag is set; read recursive first. */
			recursive = lock->recursive;
			os_rmb;
			writer_thread = lock->writer_thread;
		}

		/* Decrement failed: It already has an X or SX lock by this
		thread or another thread. If it is this thread, relock,
		else fail. */
		if (!pass && recursive
		    && os_thread_eq(writer_thread, curr_thread)) {
			/* This thread owns an X or SX lock */
			if (lock->sx_recursive++ == 0) {
				/* This thread is making first SX-lock request
				and it must be holding at least one X-lock here
				because:

				* There can't be a WAIT_EX thread because we are
				  the thread which has it's thread_id written in
				  the writer_thread field and we are not waiting.

				* Any other X-lock thread cannot exist because
				  it must update recursive flag only after
				  updating the thread_id. Had there been
				  a concurrent X-locking thread which succeeded
				  in decrementing the lock_word it must have
				  written it's thread_id before setting the
				  recursive flag. As we cleared the if()
				  condition above therefore we must be the only
				  thread working on this lock and it is safe to
				  read and write to the lock_word. */

				ut_ad((lock->lock_word == 0)
				      || ((lock->lock_word <= -X_LOCK_DECR)
					  && (lock->lock_word
					      > -(X_LOCK_DECR
						  + X_LOCK_HALF_DECR))));
				lock->lock_word -= X_LOCK_HALF_DECR;
			}
		} else {
			/* Another thread locked before us */
			return(FALSE);
		}
	}

	ut_d(rw_lock_add_debug_info(lock, pass, RW_LOCK_SX, file_name, line));

	/* SX acquisition is recorded in the X-lock bookkeeping fields. */
	lock->last_x_file_name = file_name;
	lock->last_x_line = (unsigned int) line;

	return(TRUE);
}
734 
/******************************************************************//**
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in exclusive mode for the current thread. If the rw-lock is locked
in shared or exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by srv_n_spin_wait_rounds), waiting
for the lock before suspending the thread. If the same thread has an x-lock
on the rw-lock, locking succeed, with the following exception: if pass != 0,
only a single x-lock may be taken on the lock. NOTE: If the same thread has
an s-lock, locking does not succeed! */
void
rw_lock_x_lock_func(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint		i = 0;	/* spin round count */
	sync_array_t*	sync_arr;
	ulint		spin_count = 0;
	uint64_t	count_os_wait = 0;
	const os_thread_id_t curr_thread = os_thread_get_curr_id();

	/* We reuse the thread id to index into the counter, cache
	it here for efficiency. */

	const size_t counter_index = (size_t) ut_rnd_gen_next_ulint(
							(ulint) curr_thread);

	ut_ad(rw_lock_validate(lock));
	ut_ad(!rw_lock_own(lock, RW_LOCK_S));

lock_loop:

	if (rw_lock_x_lock_low(lock, pass, curr_thread, file_name, line)) {

		/* Flush the accumulated statistics before returning. */
		if (count_os_wait > 0) {
			lock->count_os_wait +=
				static_cast<uint32_t>(count_os_wait);
			rw_lock_stats.rw_x_os_wait_count.add(counter_index,
							     count_os_wait);
		}

		rw_lock_stats.rw_x_spin_round_count.add(counter_index,
							spin_count);

		/* Locking succeeded */
		return;

	} else {

		/* Spin waiting for the lock_word to become free */
		os_rmb;
		while (i < srv_n_spin_wait_rounds
		       && lock->lock_word <= X_LOCK_HALF_DECR) {

			if (srv_spin_wait_delay) {
				ut_delay(ut_rnd_interval(
						0, srv_spin_wait_delay));
			}

			i++;
		}

		spin_count += i;

		if (i >= srv_n_spin_wait_rounds) {

			os_thread_yield();

		} else {

			goto lock_loop;
		}
	}

	sync_cell_t*	cell;

	sync_arr = sync_array_get_and_reserve_cell(
			lock, RW_LOCK_X, file_name, line, &cell);

	/* Waiters must be set before checking lock_word, to ensure signal
	is sent. This could lead to a few unnecessary wake-up signals. */
	rw_lock_set_waiter_flag(lock);

	/* Last attempt before sleeping; if it succeeds, release the
	reserved cell. */
	if (rw_lock_x_lock_low(lock, pass, curr_thread, file_name, line)) {
		sync_array_free_cell(sync_arr, cell);

		if (count_os_wait > 0) {
			lock->count_os_wait +=
				static_cast<uint32_t>(count_os_wait);
			rw_lock_stats.rw_x_os_wait_count.add(counter_index,
							     count_os_wait);
		}

		rw_lock_stats.rw_x_spin_round_count.add(counter_index,
							spin_count);

		/* Locking succeeded */
		return;
	}

	++count_os_wait;

	sync_array_wait_event(sync_arr, cell);

	/* Woken up: reset the spin counter and retry from the top. */
	i = 0;

	goto lock_loop;
}
846 
/******************************************************************//**
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in SX mode for the current thread. If the rw-lock is locked
in exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
for the lock, before suspending the thread. If the same thread has an x-lock
on the rw-lock, locking succeed, with the following exception: if pass != 0,
only a single sx-lock may be taken on the lock. NOTE: If the same thread has
an s-lock, locking does not succeed! */
void
rw_lock_sx_lock_func(
/*=================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */

{
	ulint		i = 0;	/* spin round count */
	sync_array_t*	sync_arr;
	ulint		spin_count = 0;
	uint64_t	count_os_wait = 0;
	ulint		spin_wait_count = 0;
	const os_thread_id_t curr_thread = os_thread_get_curr_id();

	/* We reuse the thread id to index into the counter, cache
	it here for efficiency. */

	const size_t counter_index = (size_t) ut_rnd_gen_next_ulint(
							(ulint) curr_thread);

	ut_ad(rw_lock_validate(lock));
	ut_ad(!rw_lock_own(lock, RW_LOCK_S));

lock_loop:

	if (rw_lock_sx_lock_low(lock, pass, curr_thread, file_name, line)) {

		/* Flush the accumulated statistics before returning. */
		if (count_os_wait > 0) {
			lock->count_os_wait +=
				static_cast<uint32_t>(count_os_wait);
			rw_lock_stats.rw_sx_os_wait_count.add(counter_index,
							      count_os_wait);
		}

		rw_lock_stats.rw_sx_spin_round_count.add(counter_index,
							 spin_count);
		rw_lock_stats.rw_sx_spin_wait_count.add(counter_index,
							spin_wait_count);

		/* Locking succeeded */
		return;

	} else {

		++spin_wait_count;

		/* Spin waiting for the lock_word to become free */
		os_rmb;
		while (i < srv_n_spin_wait_rounds
		       && lock->lock_word <= X_LOCK_HALF_DECR) {

			if (srv_spin_wait_delay) {
				ut_delay(ut_rnd_interval(
						0, srv_spin_wait_delay));
			}

			i++;
		}

		spin_count += i;

		if (i >= srv_n_spin_wait_rounds) {

			os_thread_yield();

		} else {

			goto lock_loop;
		}
	}

	sync_cell_t*	cell;

	sync_arr = sync_array_get_and_reserve_cell(
			lock, RW_LOCK_SX, file_name, line, &cell);

	/* Waiters must be set before checking lock_word, to ensure signal
	is sent. This could lead to a few unnecessary wake-up signals. */
	rw_lock_set_waiter_flag(lock);

	/* Last attempt before sleeping; if it succeeds, release the
	reserved cell. */
	if (rw_lock_sx_lock_low(lock, pass, curr_thread, file_name, line)) {

		sync_array_free_cell(sync_arr, cell);

		if (count_os_wait > 0) {
			lock->count_os_wait +=
				static_cast<uint32_t>(count_os_wait);
			rw_lock_stats.rw_sx_os_wait_count.add(counter_index,
							      count_os_wait);
		}

		rw_lock_stats.rw_sx_spin_round_count.add(counter_index,
							 spin_count);
		rw_lock_stats.rw_sx_spin_wait_count.add(counter_index,
							spin_wait_count);

		/* Locking succeeded */
		return;
	}

	++count_os_wait;

	sync_array_wait_event(sync_arr, cell);

	/* Woken up: reset the spin counter and retry from the top. */
	i = 0;

	goto lock_loop;
}
967 
968 /******************************************************************//**
969 NOTE! Use the corresponding macro, not directly this function! Lock an
970 rw-lock in SX mode for the current thread if the lock can be
971 obtained immediately.
972 @return FALSE if did not succeed, TRUE if success. */
973 ibool
rw_lock_sx_lock_func_nowait(rw_lock_t * lock,ulint pass,const char * file_name,ulint line)974 rw_lock_sx_lock_func_nowait(
975 /*========================*/
976 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
977 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
978 				be passed to another thread to unlock */
979 	const char*	file_name,/*!< in: file name where lock requested */
980 	ulint		line)	/*!< in: line where requested */
981 {
982 	const os_thread_id_t curr_thread = os_thread_get_curr_id();
983 
984 	return(rw_lock_sx_lock_low(lock, pass, curr_thread, file_name, line));
985 }
986 
987 #ifdef UNIV_DEBUG
988 
989 /******************************************************************//**
990 Checks that the rw-lock has been initialized and that there are no
991 simultaneous shared and exclusive locks.
992 @return true */
993 bool
rw_lock_validate(const rw_lock_t * lock)994 rw_lock_validate(
995 /*=============*/
996 	const rw_lock_t*	lock)	/*!< in: rw-lock */
997 {
998 	ulint	waiters;
999 	lint	lock_word;
1000 
1001 	ut_ad(lock);
1002 
1003 	waiters = rw_lock_get_waiters(lock);
1004 	lock_word = lock->lock_word;
1005 
1006 	ut_ad(lock->magic_n == RW_LOCK_MAGIC_N);
1007 	ut_ad(waiters == 0 || waiters == 1);
1008 	ut_ad(lock_word > -(2 * X_LOCK_DECR));
1009 	ut_ad(lock_word <= X_LOCK_DECR);
1010 
1011 	return(true);
1012 }
1013 
1014 /******************************************************************//**
1015 Checks if somebody has locked the rw-lock in the specified mode.
1016 @return true if locked */
1017 bool
rw_lock_is_locked(rw_lock_t * lock,ulint lock_type)1018 rw_lock_is_locked(
1019 /*==============*/
1020 	rw_lock_t*	lock,		/*!< in: rw-lock */
1021 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_S,
1022 					RW_LOCK_X or RW_LOCK_SX */
1023 {
1024 	ut_ad(rw_lock_validate(lock));
1025 
1026 	switch (lock_type) {
1027 	case RW_LOCK_S:
1028 		return(rw_lock_get_reader_count(lock) > 0);
1029 
1030 	case RW_LOCK_X:
1031 		return(rw_lock_get_writer(lock) == RW_LOCK_X);
1032 
1033 	case RW_LOCK_SX:
1034 		return(rw_lock_get_sx_lock_count(lock) > 0);
1035 
1036 	default:
1037 		ut_error;
1038 	}
1039 	return(false);	/* avoid compiler warnings */
1040 }
1041 
1042 /******************************************************************//**
1043 Inserts the debug information for an rw-lock. */
void
rw_lock_add_debug_info(
/*===================*/
	rw_lock_t*	lock,		/*!< in: rw-lock */
	ulint		pass,		/*!< in: pass value */
	ulint		lock_type,	/*!< in: lock type */
	const char*	file_name,	/*!< in: file where requested */
	ulint		line)		/*!< in: line where requested */
{
	ut_ad(file_name != NULL);

	rw_lock_debug_t*	info = rw_lock_debug_create();

	/* Fill in the record and link it at the head of the lock's
	debug list, under the global debug mutex. */
	rw_lock_debug_mutex_enter();

	info->pass	= pass;
	info->line	= line;
	info->lock_type = lock_type;
	info->file_name = file_name;
	info->thread_id = os_thread_get_curr_id();

	UT_LIST_ADD_FIRST(lock->debug_list, info);

	rw_lock_debug_mutex_exit();

	/* Only non-passed, granted (not X-wait) locks take part in the
	latching-order checks. */
	if (pass == 0 && lock_type != RW_LOCK_X_WAIT) {

		/* Recursive x while holding SX
		(lock_type == RW_LOCK_X && lock_word == -X_LOCK_HALF_DECR)
		is treated as not-relock (new lock). */

		if ((lock_type == RW_LOCK_X
		     && lock->lock_word <  -X_LOCK_HALF_DECR)
		    || (lock_type == RW_LOCK_SX
		       && (lock->lock_word < 0 || lock->sx_recursive == 1))) {

			/* A new grant: validate the latching order and
			register the granted latch. */
			sync_check_lock_validate(lock);
			sync_check_lock_granted(lock);
		} else {
			/* A re-acquisition by the same thread. */
			sync_check_relock(lock);
		}
	}
}
1087 
1088 /******************************************************************//**
1089 Removes a debug information struct for an rw-lock. */
void
rw_lock_remove_debug_info(
/*======================*/
	rw_lock_t*	lock,		/*!< in: rw-lock */
	ulint		pass,		/*!< in: pass value */
	ulint		lock_type)	/*!< in: lock type */
{
	rw_lock_debug_t*	info;

	ut_ad(lock);

	/* Mirror rw_lock_add_debug_info(): only non-passed, granted
	locks were registered with sync_check. */
	if (pass == 0 && lock_type != RW_LOCK_X_WAIT) {
		sync_check_unlock(lock);
	}

	rw_lock_debug_mutex_enter();

	/* Find the matching record: same pass value and lock type; for
	pass == 0 the record must also belong to the calling thread. */
	for (info = UT_LIST_GET_FIRST(lock->debug_list);
	     info != 0;
	     info = UT_LIST_GET_NEXT(list, info)) {

		if (pass == info->pass
		    && (pass != 0
			|| os_thread_eq(info->thread_id,
					os_thread_get_curr_id()))
		    && info->lock_type == lock_type) {

			/* Found! */
			UT_LIST_REMOVE(lock->debug_list, info);

			rw_lock_debug_mutex_exit();

			rw_lock_debug_free(info);

			return;
		}
	}

	/* No matching record: the unlock does not correspond to any
	recorded lock, which is a caller bug. */
	ut_error;
}
1130 
1131 /******************************************************************//**
1132 Checks if the thread has locked the rw-lock in the specified mode, with
1133 the pass value == 0.
1134 @return TRUE if locked */
1135 ibool
rw_lock_own(rw_lock_t * lock,ulint lock_type)1136 rw_lock_own(
1137 /*========*/
1138 	rw_lock_t*	lock,		/*!< in: rw-lock */
1139 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_S,
1140 					RW_LOCK_X */
1141 {
1142 	ut_ad(lock);
1143 	ut_ad(rw_lock_validate(lock));
1144 
1145 	rw_lock_debug_mutex_enter();
1146 
1147 	for (const rw_lock_debug_t* info = UT_LIST_GET_FIRST(lock->debug_list);
1148 	     info != NULL;
1149 	     info = UT_LIST_GET_NEXT(list, info)) {
1150 
1151 		if (os_thread_eq(info->thread_id, os_thread_get_curr_id())
1152 		    && info->pass == 0
1153 		    && info->lock_type == lock_type) {
1154 
1155 			rw_lock_debug_mutex_exit();
1156 			/* Found! */
1157 
1158 			return(TRUE);
1159 		}
1160 	}
1161 	rw_lock_debug_mutex_exit();
1162 
1163 	return(FALSE);
1164 }
1165 
1166 /** For collecting the debug information for a thread's rw-lock */
1167 typedef std::vector<rw_lock_debug_t*> Infos;
1168 
/** Collect the debug records that the calling thread owns on a rw-lock
@param[in]	lock	rw-lock to check
@param[out]	infos	the calling thread's debug records for the lock;
			appended to by this function */
1173 void
rw_lock_get_debug_info(const rw_lock_t * lock,Infos * infos)1174 rw_lock_get_debug_info(const rw_lock_t* lock, Infos* infos)
1175 {
1176 	rw_lock_debug_t*	info = NULL;
1177 
1178 	ut_ad(rw_lock_validate(lock));
1179 
1180 	rw_lock_debug_mutex_enter();
1181 
1182 	for (info = UT_LIST_GET_FIRST(lock->debug_list);
1183 	     info != NULL;
1184 	     info = UT_LIST_GET_NEXT(list, info)) {
1185 
1186 		if (os_thread_eq(info->thread_id, os_thread_get_curr_id())) {
1187 
1188 			infos->push_back(info);
1189 		}
1190 	}
1191 
1192 	rw_lock_debug_mutex_exit();
1193 }
1194 
1195 /** Checks if the thread has locked the rw-lock in the specified mode, with
1196 the pass value == 0.
1197 @param[in]	lock		rw-lock
1198 @param[in]	flags		specify lock types with OR of the
1199 				rw_lock_flag_t values
1200 @return true if locked */
1201 bool
rw_lock_own_flagged(const rw_lock_t * lock,rw_lock_flags_t flags)1202 rw_lock_own_flagged(
1203 	const rw_lock_t*	lock,
1204 	rw_lock_flags_t		flags)
1205 {
1206 	Infos	infos;
1207 
1208 	rw_lock_get_debug_info(lock, &infos);
1209 
1210 	Infos::const_iterator	end = infos.end();
1211 
1212 	for (Infos::const_iterator it = infos.begin(); it != end; ++it) {
1213 
1214 		const rw_lock_debug_t*	info = *it;
1215 
1216 		ut_ad(os_thread_eq(info->thread_id, os_thread_get_curr_id()));
1217 
1218 		if (info->pass != 0) {
1219 			continue;
1220 		}
1221 
1222 		switch (info->lock_type) {
1223 		case RW_LOCK_S:
1224 
1225 			if (flags & RW_LOCK_FLAG_S) {
1226 				return(true);
1227 			}
1228 			break;
1229 
1230 		case RW_LOCK_X:
1231 
1232 			if (flags & RW_LOCK_FLAG_X) {
1233 				return(true);
1234 			}
1235 			break;
1236 
1237 		case RW_LOCK_SX:
1238 
1239 			if (flags & RW_LOCK_FLAG_SX) {
1240 				return(true);
1241 			}
1242 		}
1243 	}
1244 
1245 	return(false);
1246 }
1247 
1248 /***************************************************************//**
1249 Prints debug info of currently locked rw-locks. */
void
rw_lock_list_print_info(
/*====================*/
	FILE*	file)		/*!< in: file where to print */
{
	ulint		count = 0;

	/* Hold rw_lock_list_mutex for the whole scan so locks cannot
	be added to or removed from the list while printing. */
	mutex_enter(&rw_lock_list_mutex);

	fputs("-------------\n"
	      "RW-LATCH INFO\n"
	      "-------------\n", file);

	for (const rw_lock_t* lock = UT_LIST_GET_FIRST(rw_lock_list);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(list, lock)) {

		count++;

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
		mutex_enter(&lock->mutex);
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */

		/* X_LOCK_DECR means the lock is completely free; only
		locks that are held (or waited on) are printed. */
		if (lock->lock_word != X_LOCK_DECR) {

			fprintf(file, "RW-LOCK: %p ", (void*) lock);

			if (rw_lock_get_waiters(lock)) {
				fputs(" Waiters for the lock exist\n", file);
			} else {
				putc('\n', file);
			}

			rw_lock_debug_t* info;

			/* The debug list is protected by the global
			debug mutex, not by the lock itself. */
			rw_lock_debug_mutex_enter();

			for (info = UT_LIST_GET_FIRST(lock->debug_list);
			     info != NULL;
			     info = UT_LIST_GET_NEXT(list, info)) {

				rw_lock_debug_print(file, info);
			}

			rw_lock_debug_mutex_exit();
		}

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
		mutex_exit(&lock->mutex);
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */
	}

	fprintf(file, "Total number of rw-locks " ULINTPF "\n", count);
	mutex_exit(&rw_lock_list_mutex);
}
1305 
1306 /***************************************************************//**
1307 Prints debug info of an rw-lock. */
void
rw_lock_print(
/*==========*/
	rw_lock_t*	lock)	/*!< in: rw-lock */
{
	rw_lock_debug_t* info;

	fprintf(stderr,
		"-------------\n"
		"RW-LATCH INFO\n"
		"RW-LATCH: %p ", (void*) lock);

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
	/* We used to acquire lock->mutex here, but it would cause a
	recursive call to sync_thread_add_level() if UNIV_DEBUG
	is defined.  Since this function is only invoked from
	sync_thread_levels_g(), let us choose the smaller evil:
	performing dirty reads instead of causing bogus deadlocks or
	assertion failures. */
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */

	/* X_LOCK_DECR means the lock is completely free; print
	details only when it is held or waited on. */
	if (lock->lock_word != X_LOCK_DECR) {

		if (rw_lock_get_waiters(lock)) {
			fputs(" Waiters for the lock exist\n", stderr);
		} else {
			putc('\n', stderr);
		}

		/* The debug list is protected by the global debug
		mutex. */
		rw_lock_debug_mutex_enter();

		for (info = UT_LIST_GET_FIRST(lock->debug_list);
		     info != NULL;
		     info = UT_LIST_GET_NEXT(list, info)) {

			rw_lock_debug_print(stderr, info);
		}

		rw_lock_debug_mutex_exit();
	}
}
1349 
1350 /*********************************************************************//**
1351 Prints info of a debug struct. */
1352 void
rw_lock_debug_print(FILE * f,const rw_lock_debug_t * info)1353 rw_lock_debug_print(
1354 /*================*/
1355 	FILE*			f,	/*!< in: output stream */
1356 	const rw_lock_debug_t*	info)	/*!< in: debug struct */
1357 {
1358 	ulint	rwt = info->lock_type;
1359 
1360 	fprintf(f, "Locked: thread %lu file %s line %lu  ",
1361 		static_cast<ulong>(os_thread_pf(info->thread_id)),
1362 		sync_basename(info->file_name),
1363 		static_cast<ulong>(info->line));
1364 
1365 	switch (rwt) {
1366 	case RW_LOCK_S:
1367 		fputs("S-LOCK", f);
1368 		break;
1369 	case RW_LOCK_X:
1370 		fputs("X-LOCK", f);
1371 		break;
1372 	case RW_LOCK_SX:
1373 		fputs("SX-LOCK", f);
1374 		break;
1375 	case RW_LOCK_X_WAIT:
1376 		fputs("WAIT X-LOCK", f);
1377 		break;
1378 	default:
1379 		ut_error;
1380 	}
1381 
1382 	if (info->pass != 0) {
1383 		fprintf(f, " pass value %lu", (ulong) info->pass);
1384 	}
1385 
1386 	fprintf(f, "\n");
1387 }
1388 
1389 /***************************************************************//**
1390 Returns the number of currently locked rw-locks. Works only in the debug
1391 version.
1392 @return number of locked rw-locks */
1393 ulint
rw_lock_n_locked(void)1394 rw_lock_n_locked(void)
1395 /*==================*/
1396 {
1397 	ulint		count = 0;
1398 
1399 	mutex_enter(&rw_lock_list_mutex);
1400 
1401 	for (const rw_lock_t* lock = UT_LIST_GET_FIRST(rw_lock_list);
1402 	     lock != NULL;
1403 	     lock = UT_LIST_GET_NEXT(list, lock)) {
1404 
1405 		if (lock->lock_word != X_LOCK_DECR) {
1406 			count++;
1407 		}
1408 	}
1409 
1410 	mutex_exit(&rw_lock_list_mutex);
1411 
1412 	return(count);
1413 }
1414 
1415 /** Print where it was locked from
1416 @return the string representation */
1417 std::string
locked_from() const1418 rw_lock_t::locked_from() const
1419 {
1420 	/* Note: For X locks it can be locked form multiple places because
1421 	the same thread can call X lock recursively. */
1422 
1423 	std::ostringstream	msg;
1424 	Infos			infos;
1425 
1426 	rw_lock_get_debug_info(this, &infos);
1427 
1428 	ulint			i = 0;
1429 	Infos::const_iterator	end = infos.end();
1430 
1431 	for (Infos::const_iterator it = infos.begin(); it != end; ++it, ++i) {
1432 
1433 		const rw_lock_debug_t*	info = *it;
1434 
1435 		ut_ad(os_thread_eq(info->thread_id, os_thread_get_curr_id()));
1436 
1437 		if (i > 0) {
1438 			msg << ", ";
1439 		}
1440 
1441 		msg << info->file_name << ":" << info->line;
1442 	}
1443 
1444 	return(msg.str());
1445 
1446 }
1447 
1448 /** Print the rw-lock information.
1449 @return the string representation */
1450 std::string
to_string() const1451 rw_lock_t::to_string() const
1452 {
1453 	std::ostringstream	msg;
1454 
1455 	msg << "RW-LATCH: "
1456 	    << "thread id " << os_thread_pf(os_thread_get_curr_id())
1457 	    << " addr: " << this
1458 	    << " Locked from: " << locked_from().c_str();
1459 
1460 	return(msg.str());
1461 }
1462 #endif /* UNIV_DEBUG */
1463