1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2008, Google Inc.
5 Copyright (c) 2017, 2020, MariaDB Corporation.
6 
7 Portions of this file contain modifications contributed and copyrighted by
8 Google, Inc. Those modifications are gratefully acknowledged and are described
9 briefly in the InnoDB documentation. The contributions by Google are
10 incorporated with their permission, and subject to the conditions contained in
11 the file COPYING.Google.
12 
13 This program is free software; you can redistribute it and/or modify it under
14 the terms of the GNU General Public License as published by the Free Software
15 Foundation; version 2 of the License.
16 
17 This program is distributed in the hope that it will be useful, but WITHOUT
18 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file sync/sync0rw.cc
29 The read-write lock (for thread synchronization)
30 
31 Created 9/11/1995 Heikki Tuuri
32 *******************************************************/
33 
34 #include "sync0rw.h"
35 #include "my_cpu.h"
36 #include <my_sys.h>
37 
38 /*
39 	IMPLEMENTATION OF THE RW_LOCK
40 	=============================
41 The status of a rw_lock is held in lock_word. The initial value of lock_word is
X_LOCK_DECR. lock_word is decremented by 1 for each s-lock, by X_LOCK_HALF_DECR
for each sx-lock, and by X_LOCK_DECR or 1 for each x-lock. This describes the
lock state for each value of lock_word:
44 
45 lock_word == X_LOCK_DECR:	Unlocked.
46 X_LOCK_HALF_DECR < lock_word < X_LOCK_DECR:
47 				S locked, no waiting writers.
48 				(X_LOCK_DECR - lock_word) is the number
49 				of S locks.
50 lock_word == X_LOCK_HALF_DECR:	SX locked, no waiting writers.
51 0 < lock_word < X_LOCK_HALF_DECR:
52 				SX locked AND S locked, no waiting writers.
53 				(X_LOCK_HALF_DECR - lock_word) is the number
54 				of S locks.
55 lock_word == 0:			X locked, no waiting writers.
56 -X_LOCK_HALF_DECR < lock_word < 0:
57 				S locked, with a waiting writer.
58 				(-lock_word) is the number of S locks.
59 lock_word == -X_LOCK_HALF_DECR:	X locked and SX locked, no waiting writers.
60 -X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR:
61 				S locked, with a waiting writer
62 				which has SX lock.
63 				-(lock_word + X_LOCK_HALF_DECR) is the number
64 				of S locks.
65 lock_word == -X_LOCK_DECR:	X locked with recursive X lock (2 X locks).
66 -(X_LOCK_DECR + X_LOCK_HALF_DECR) < lock_word < -X_LOCK_DECR:
67 				X locked. The number of the X locks is:
68 				2 - (lock_word + X_LOCK_DECR)
69 lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR):
70 				X locked with recursive X lock (2 X locks)
71 				and SX locked.
72 lock_word < -(X_LOCK_DECR + X_LOCK_HALF_DECR):
73 				X locked and SX locked.
74 				The number of the X locks is:
75 				2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR)
76 
77  LOCK COMPATIBILITY MATRIX
78 
79       | S|SX| X|
80     --+--+--+--+
81      S| +| +| -|
82     --+--+--+--+
83     SX| +| -| -|
84     --+--+--+--+
85      X| -| -| -|
86     --+--+--+--+
87 
88 The lock_word is always read and updated atomically and consistently, so that
89 it always represents the state of the lock, and the state of the lock changes
90 with a single atomic operation. This lock_word holds all of the information
91 that a thread needs in order to determine if it is eligible to gain the lock
92 or if it must spin or sleep. The one exception to this is that writer_thread
93 must be verified before recursive write locks: to solve this scenario, we make
94 writer_thread readable by all threads, but only writeable by the x-lock or
95 sx-lock holder.
96 
97 The other members of the lock obey the following rules to remain consistent:
98 
99 writer_thread:	Is used only in recursive x-locking or sx-locking.
100 		This field is 0 at lock creation time and is updated
101 		when x-lock is acquired or when move_ownership is called.
102 		A thread is only allowed to set the value of this field to
		its thread_id, i.e. a thread cannot set writer_thread to
104 		some other thread's id.
105 waiters:	May be set to 1 anytime, but to avoid unnecessary wake-up
106 		signals, it should only be set to 1 when there are threads
107 		waiting on event. Must be 1 when a writer starts waiting to
108 		ensure the current x-locking thread sends a wake-up signal
		during unlock. May only be reset to 0 immediately before
		a wake-up signal is sent to event. On most platforms, a
		memory barrier is required after waiters is set, and before
		verifying lock_word is still held, to ensure some unlocker
		really does see the flag's new value.
114 event:		Threads wait on event for read or writer lock when another
115 		thread has an x-lock or an x-lock reservation (wait_ex). A
116 		thread may only	wait on event after performing the following
117 		actions in order:
118 		   (1) Record the counter value of event (with os_event_reset).
119 		   (2) Set waiters to 1.
120 		   (3) Verify lock_word <= 0.
121 		(1) must come before (2) to ensure signal is not missed.
122 		(2) must come before (3) to ensure a signal is sent.
123 		These restrictions force the above ordering.
124 		Immediately before sending the wake-up signal, we should:
125 		   (1) Verify lock_word == X_LOCK_DECR (unlocked)
126 		   (2) Reset waiters to 0.
127 wait_ex_event:	A thread may only wait on the wait_ex_event after it has
128 		performed the following actions in order:
129 		   (1) Decrement lock_word by X_LOCK_DECR.
130 		   (2) Record counter value of wait_ex_event (os_event_reset,
131 		       called from sync_array_reserve_cell).
132 		   (3) Verify that lock_word < 0.
		(1) must come first to ensure no other thread becomes a reader
		or the next writer, and notifies the unlocker that a signal
		must be sent.
135 		(2) must come before (3) to ensure the signal is not missed.
136 		These restrictions force the above ordering.
137 		Immediately before sending the wake-up signal, we should:
138 		   Verify lock_word == 0 (waiting thread holds x_lock)
139 */
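
/* The state table above can be summarised programmatically. The sketch below
is illustrative only (it is not compiled into the server and is not part of
the rw-lock implementation); it merely restates the lock_word encoding
documented above using the X_LOCK_DECR and X_LOCK_HALF_DECR constants from
sync0rw.h. */
#if 0
/** Decode a lock_word snapshot into a human-readable state string.
@param lock_word	value read from rw_lock_t::lock_word
@return static description of the corresponding lock state */
static const char* rw_lock_word_describe(int32_t lock_word)
{
	if (lock_word == X_LOCK_DECR) {
		return("unlocked");
	} else if (lock_word > X_LOCK_HALF_DECR) {
		return("S locked, no waiting writers");
	} else if (lock_word == X_LOCK_HALF_DECR) {
		return("SX locked, no waiting writers");
	} else if (lock_word > 0) {
		return("SX and S locked, no waiting writers");
	} else if (lock_word == 0) {
		return("X locked, no waiting writers");
	} else if (lock_word > -X_LOCK_HALF_DECR) {
		return("S locked, with a waiting writer");
	} else if (lock_word == -X_LOCK_HALF_DECR) {
		return("X and SX locked, no waiting writers");
	} else if (lock_word > -X_LOCK_DECR) {
		return("S locked, with a waiting writer holding SX");
	} else if (lock_word > -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
		return("X locked (recursively)");
	} else {
		return("X and SX locked (X recursively)");
	}
}
#endif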
140 
141 rw_lock_stats_t		rw_lock_stats;
142 
143 /* The global list of rw-locks */
144 ilist<rw_lock_t> rw_lock_list;
145 ib_mutex_t		rw_lock_list_mutex;
146 
147 #ifdef UNIV_DEBUG
148 /******************************************************************//**
149 Creates a debug info struct. */
150 static
151 rw_lock_debug_t*
152 rw_lock_debug_create(void);
153 /*======================*/
154 /******************************************************************//**
155 Frees a debug info struct. */
156 static
157 void
158 rw_lock_debug_free(
159 /*===============*/
160 	rw_lock_debug_t* info);
161 
162 /******************************************************************//**
163 Creates a debug info struct.
164 @return own: debug info struct */
165 static
166 rw_lock_debug_t*
rw_lock_debug_create(void)
168 /*======================*/
169 {
170 	return((rw_lock_debug_t*) ut_malloc_nokey(sizeof(rw_lock_debug_t)));
171 }
172 
173 /******************************************************************//**
174 Frees a debug info struct. */
175 static
176 void
rw_lock_debug_free(
178 /*===============*/
179 	rw_lock_debug_t* info)
180 {
181 	ut_free(info);
182 }
183 #endif /* UNIV_DEBUG */
184 
185 /******************************************************************//**
186 Creates, or rather, initializes an rw-lock object in a specified memory
187 location (which must be appropriately aligned). The rw-lock is initialized
188 to the non-locked state. Explicit freeing of the rw-lock with rw_lock_free
189 is necessary only if the memory block containing it is freed. */
190 void
rw_lock_create_func(
192 /*================*/
193 	rw_lock_t*	lock,		/*!< in: pointer to memory */
194 #ifdef UNIV_DEBUG
195 	latch_level_t	level,		/*!< in: level */
196 #endif /* UNIV_DEBUG */
197 	const char*	cfile_name,	/*!< in: file name where created */
198 	unsigned	cline)		/*!< in: file line where created */
199 {
200 #if defined(UNIV_DEBUG) && !defined(UNIV_PFS_RWLOCK)
	/* If UNIV_PFS_RWLOCK is defined, the object is constructed
	in pfs_rw_lock_create_func() instead. */
202 	new(lock) rw_lock_t();
203 #endif /* UNIV_DEBUG */
204 
205 	lock->lock_word = X_LOCK_DECR;
206 	lock->waiters = 0;
207 
208 	lock->sx_recursive = 0;
209 	lock->writer_thread= 0;
210 
211 #ifdef UNIV_DEBUG
212 	lock->m_rw_lock = true;
213 
214 	UT_LIST_INIT(lock->debug_list, &rw_lock_debug_t::list);
215 
216 	lock->m_id = sync_latch_get_id(sync_latch_get_name(level));
217 	ut_a(lock->m_id != LATCH_ID_NONE);
218 
219 	lock->level = level;
220 #endif /* UNIV_DEBUG */
221 
222 	lock->cfile_name = cfile_name;
223 
224 	/* This should hold in practice. If it doesn't then we need to
225 	split the source file anyway. Or create the locks on lines
226 	less than 8192. cline is unsigned:13. */
227 	ut_ad(cline <= ((1U << 13) - 1));
228 	lock->cline = cline & ((1U << 13) - 1);
229 	lock->count_os_wait = 0;
230 	lock->last_x_file_name = "not yet reserved";
231 	lock->last_x_line = 0;
232 	lock->event = os_event_create(0);
233 	lock->wait_ex_event = os_event_create(0);
234 
235 	lock->is_block_lock = 0;
236 
237 	ut_d(lock->created = true);
238 
239 	mutex_enter(&rw_lock_list_mutex);
240 	rw_lock_list.push_front(*lock);
241 	mutex_exit(&rw_lock_list_mutex);
242 }
243 
244 /******************************************************************//**
245 Calling this function is obligatory only if the memory buffer containing
246 the rw-lock is freed. Removes an rw-lock object from the global list. The
247 rw-lock is checked to be in the non-locked state. */
248 void
rw_lock_free_func(
250 /*==============*/
251 	rw_lock_t*	lock)	/*!< in/out: rw-lock */
252 {
253 	ut_ad(rw_lock_validate(lock));
254 	ut_a(lock->lock_word == X_LOCK_DECR);
255 
256 	ut_d(lock->created = false);
257 
258 	mutex_enter(&rw_lock_list_mutex);
259 
260 	os_event_destroy(lock->event);
261 
262 	os_event_destroy(lock->wait_ex_event);
263 
264 	rw_lock_list.remove(*lock);
265 
266 	mutex_exit(&rw_lock_list_mutex);
267 }
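
/* Typical lifecycle of an rw_lock_t, matching the two functions above: the
lock is initialized in place inside some containing object, and it is
explicitly freed only when that memory is about to be released. Illustrative
sketch only (not compiled); my_latch_key is a hypothetical PFS key, and
rw_lock_create()/rw_lock_free() are the wrapper macros declared in sync0rw.h. */
#if 0
struct my_cache_t {
	rw_lock_t	latch;	/* protects the contents of the cache */
	/* ... cached data ... */
};

static void my_cache_init(my_cache_t* cache)
{
	/* The rw-lock starts in the unlocked state
	(lock_word == X_LOCK_DECR). */
	rw_lock_create(my_latch_key, &cache->latch, SYNC_NO_ORDER_CHECK);
}

static void my_cache_close(my_cache_t* cache)
{
	/* Must be called before the memory holding the lock is freed;
	the lock must be in the unlocked state at this point. */
	rw_lock_free(&cache->latch);
}
#endif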
268 
269 /******************************************************************//**
270 Lock an rw-lock in shared mode for the current thread. If the rw-lock is
271 locked in exclusive mode, or there is an exclusive lock request waiting,
272 the function spins a preset time (controlled by srv_n_spin_wait_rounds), waiting
273 for the lock, before suspending the thread. */
274 void
rw_lock_s_lock_spin(
276 /*================*/
277 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
278 	ulint		pass,	/*!< in: pass value; != 0, if the lock
279 				will be passed to another thread to unlock */
280 	const char*	file_name, /*!< in: file name where lock requested */
281 	unsigned	line)	/*!< in: line where requested */
282 {
283 	ulint		i = 0;	/* spin round count */
284 	sync_array_t*	sync_arr;
285 	lint		spin_count = 0;
286 	int64_t		count_os_wait = 0;
287 
288 	/* We reuse the thread id to index into the counter, cache
289 	it here for efficiency. */
290 
291 	ut_ad(rw_lock_validate(lock));
292 
293 	rw_lock_stats.rw_s_spin_wait_count.inc();
294 
295 lock_loop:
296 
297 	/* Spin waiting for the writer field to become free */
298 	HMT_low();
299 	ulint j = i;
300 	while (i < srv_n_spin_wait_rounds &&
301 	       lock->lock_word <= 0) {
302 		ut_delay(srv_spin_wait_delay);
303 		i++;
304 	}
305 
306 	HMT_medium();
307 	if (i >= srv_n_spin_wait_rounds) {
308 		os_thread_yield();
309 	}
310 
311 	spin_count += lint(i - j);
312 
313 	/* We try once again to obtain the lock */
314 	if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
315 
316 		if (count_os_wait > 0) {
317 			lock->count_os_wait +=
318 				static_cast<uint32_t>(count_os_wait);
319 			rw_lock_stats.rw_s_os_wait_count.add(count_os_wait);
320 		}
321 
322 		rw_lock_stats.rw_s_spin_round_count.add(spin_count);
323 
324 		return; /* Success */
325 	} else {
326 
327 		if (i < srv_n_spin_wait_rounds) {
328 			goto lock_loop;
329 		}
330 
331 
332 		++count_os_wait;
333 
334 		sync_cell_t*	cell;
335 
336 		sync_arr = sync_array_get_and_reserve_cell(
337 				lock, RW_LOCK_S, file_name, line, &cell);
338 
339 		/* Set waiters before checking lock_word to ensure wake-up
340 		signal is sent. This may lead to some unnecessary signals. */
341 		lock->waiters.exchange(1, std::memory_order_acquire);
342 
343 		if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
344 
345 			sync_array_free_cell(sync_arr, cell);
346 
347 			if (count_os_wait > 0) {
348 
349 				lock->count_os_wait +=
350 					static_cast<uint32_t>(count_os_wait);
351 
352 				rw_lock_stats.rw_s_os_wait_count.add(
353 					count_os_wait);
354 			}
355 
356 			rw_lock_stats.rw_s_spin_round_count.add(spin_count);
357 
358 			return; /* Success */
359 		}
360 
		/* See the comments in trx_commit_low() regarding
		before_trx_state_committed_in_memory, which explain why
		care is needed when invoking the following sync check. */
364 #ifndef DBUG_OFF
365 #ifdef UNIV_DEBUG
366 		if (lock->get_level() != SYNC_DICT_OPERATION) {
367 			DEBUG_SYNC_C("rw_s_lock_waiting");
368 		}
369 #endif
370 #endif
371 		sync_array_wait_event(sync_arr, cell);
372 
373 		i = 0;
374 
375 		goto lock_loop;
376 	}
377 }
378 
379 /******************************************************************//**
380 This function is used in the insert buffer to move the ownership of an
381 x-latch on a buffer frame to the current thread. The x-latch was set by
382 the buffer read operation and it protected the buffer frame while the
read was done. The ownership is moved because we want the current
thread to be able to acquire a second x-latch which is stored in an mtr.
385 This, in turn, is needed to pass the debug checks of index page
386 operations. */
387 void
rw_lock_x_lock_move_ownership(
389 /*==========================*/
390 	rw_lock_t*	lock)	/*!< in: lock which was x-locked in the
391 				buffer read */
392 {
393 	ut_ad(rw_lock_is_locked(lock, RW_LOCK_X));
394 
395 	lock->writer_thread = os_thread_get_curr_id();
396 }
397 
398 /******************************************************************//**
399 Function for the next writer to call. Waits for readers to exit.
400 The caller must have already decremented lock_word by X_LOCK_DECR. */
401 UNIV_INLINE
402 void
rw_lock_x_lock_wait_func(
404 /*=====================*/
405 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
406 #ifdef UNIV_DEBUG
407 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
408 				be passed to another thread to unlock */
409 #endif
410 	lint		threshold,/*!< in: threshold to wait for */
411 	const char*	file_name,/*!< in: file name where lock requested */
412 	unsigned	line)	/*!< in: line where requested */
413 {
414 	ulint		i = 0;
415 	lint		n_spins = 0;
416 	sync_array_t*	sync_arr;
417 	int64_t		count_os_wait = 0;
418 
419 	ut_ad(lock->lock_word <= threshold);
420 
421 	HMT_low();
422 	while (lock->lock_word < threshold) {
423 		ut_delay(srv_spin_wait_delay);
424 
425 		if (i < srv_n_spin_wait_rounds) {
426 			i++;
427 			continue;
428 		}
429 
430 		/* If there is still a reader, then go to sleep.*/
431 		n_spins += i;
432 
433 		sync_cell_t*	cell;
434 
435 		sync_arr = sync_array_get_and_reserve_cell(
436 			lock, RW_LOCK_X_WAIT, file_name, line, &cell);
437 
438 		i = 0;
439 
440 		/* Check lock_word to ensure wake-up isn't missed.*/
441 		if (lock->lock_word < threshold) {
442 			++count_os_wait;
443 
444 			/* Add debug info as it is needed to detect possible
445 			deadlock. We must add info for WAIT_EX thread for
446 			deadlock detection to work properly. */
447 			ut_d(rw_lock_add_debug_info(
448 					lock, pass, RW_LOCK_X_WAIT,
449 					file_name, line));
450 
451 			sync_array_wait_event(sync_arr, cell);
452 
453 			ut_d(rw_lock_remove_debug_info(
454 					lock, pass, RW_LOCK_X_WAIT));
455 
456 			/* It is possible to wake when lock_word < 0.
457 			We must pass the while-loop check to proceed.*/
458 
459 		} else {
460 			sync_array_free_cell(sync_arr, cell);
461 			break;
462 		}
463 	}
464 	HMT_medium();
465 	rw_lock_stats.rw_x_spin_round_count.add(n_spins);
466 
467 	if (count_os_wait > 0) {
468 		lock->count_os_wait += static_cast<uint32_t>(count_os_wait);
469 		rw_lock_stats.rw_x_os_wait_count.add(count_os_wait);
470 	}
471 }
472 
473 #ifdef UNIV_DEBUG
474 # define rw_lock_x_lock_wait(L, P, T, F, O)		\
475 	rw_lock_x_lock_wait_func(L, P, T, F, O)
476 #else
477 # define rw_lock_x_lock_wait(L, P, T, F, O)		\
478 	rw_lock_x_lock_wait_func(L, T, F, O)
#endif /* UNIV_DEBUG */
480 
481 /******************************************************************//**
482 Low-level function for acquiring an exclusive lock.
483 @return FALSE if did not succeed, TRUE if success. */
484 UNIV_INLINE
485 ibool
rw_lock_x_lock_low(
487 /*===============*/
488 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
489 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
490 				be passed to another thread to unlock */
491 	const char*	file_name,/*!< in: file name where lock requested */
492 	unsigned	line)	/*!< in: line where requested */
493 {
494 	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, X_LOCK_HALF_DECR)) {
495 
496 		/* As we are going to write our own thread id in that field it
497 		must be that the current writer_thread value is not active. */
498 		ut_a(!lock->writer_thread);
499 
500 		/* Decrement occurred: we are writer or next-writer. */
501 		if (!pass)
502 		{
503 			lock->writer_thread = os_thread_get_curr_id();
504 		}
505 
506 		rw_lock_x_lock_wait(lock, pass, 0, file_name, line);
507 
508 	} else {
509 		os_thread_id_t	thread_id = os_thread_get_curr_id();
510 
511 		/* Decrement failed: An X or SX lock is held by either
512 		this thread or another. Try to relock. */
513 		if (!pass && os_thread_eq(lock->writer_thread, thread_id)) {
			/* Other s-locks can be allowed. If x is requested
			recursively while holding an sx lock, this x lock
			must follow the latching order. */
517 
518 			/* The existing X or SX lock is from this thread */
519 			if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, 0)) {
520 				/* There is at least one SX-lock from this
521 				thread, but no X-lock. */
522 
				/* Wait for any other S-locks to be
				released. */
525 				rw_lock_x_lock_wait(
526 					lock, pass, -X_LOCK_HALF_DECR,
527 					file_name, line);
528 
529 			} else {
530 				int32_t lock_word = lock->lock_word;
531 				/* At least one X lock by this thread already
532 				exists. Add another. */
533 				if (lock_word == 0
534 				    || lock_word == -X_LOCK_HALF_DECR) {
535 					lock->lock_word.fetch_sub(X_LOCK_DECR);
536 				} else {
537 					ut_ad(lock_word <= -X_LOCK_DECR);
538 					lock->lock_word.fetch_sub(1);
539 				}
540 			}
541 
542 		} else {
543 			/* Another thread locked before us */
544 			return(FALSE);
545 		}
546 	}
547 
548 	ut_d(rw_lock_add_debug_info(lock, pass, RW_LOCK_X, file_name, line));
549 
550 	lock->last_x_file_name = file_name;
551 	lock->last_x_line = line & ((1U << 14) - 1);
552 
553 	return(TRUE);
554 }
555 
556 /******************************************************************//**
557 Low-level function for acquiring an sx lock.
558 @return FALSE if did not succeed, TRUE if success. */
559 ibool
rw_lock_sx_lock_low(
561 /*================*/
562 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
563 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
564 				be passed to another thread to unlock */
565 	const char*	file_name,/*!< in: file name where lock requested */
566 	unsigned	line)	/*!< in: line where requested */
567 {
568 	if (rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR, X_LOCK_HALF_DECR)) {
569 
570 		/* As we are going to write our own thread id in that field it
571 		must be that the current writer_thread value is not active. */
572 		ut_a(!lock->writer_thread);
573 
574 		/* Decrement occurred: we are the SX lock owner. */
575 		if (!pass)
576 		{
577 			lock->writer_thread = os_thread_get_curr_id();
578 		}
579 
580 		lock->sx_recursive = 1;
581 	} else {
582 		os_thread_id_t	thread_id = os_thread_get_curr_id();
583 
584 		/* Decrement failed: It already has an X or SX lock by this
585 		thread or another thread. If it is this thread, relock,
586 		else fail. */
587 		if (!pass && os_thread_eq(lock->writer_thread, thread_id)) {
588 			/* This thread owns an X or SX lock */
589 			if (lock->sx_recursive++ == 0) {
				/* This thread is making its first SX-lock
				request and it must be holding at least one
				X-lock here because:

				* There can't be a WAIT_EX thread, because we
				  are the thread whose thread_id is written in
				  the writer_thread field and we are not waiting.

				* No other X-locking thread can exist, because
				  it must update the recursive flag only after
				  updating the thread_id. Had there been a
				  concurrent X-locking thread which succeeded
				  in decrementing the lock_word, it must have
				  written its thread_id before setting the
				  recursive flag. As we satisfied the if()
				  condition above, we must be the only thread
				  working on this lock and it is safe to read
				  and write the lock_word. */
608 
609 #ifdef UNIV_DEBUG
610 				auto lock_word =
611 #endif
612 				lock->lock_word.fetch_sub(X_LOCK_HALF_DECR,
613 							std::memory_order_relaxed);
614 
615 				ut_ad((lock_word == 0)
616 				      || ((lock_word <= -X_LOCK_DECR)
617 					  && (lock_word
618 					      > -(X_LOCK_DECR
619 						  + X_LOCK_HALF_DECR))));
620 			}
621 		} else {
622 			/* Another thread locked before us */
623 			return(FALSE);
624 		}
625 	}
626 
627 	ut_d(rw_lock_add_debug_info(lock, pass, RW_LOCK_SX, file_name, line));
628 
629 	lock->last_x_file_name = file_name;
630 	lock->last_x_line = line & ((1U << 14) - 1);
631 
632 	return(TRUE);
633 }
634 
635 /******************************************************************//**
636 NOTE! Use the corresponding macro, not directly this function! Lock an
637 rw-lock in exclusive mode for the current thread. If the rw-lock is locked
638 in shared or exclusive mode, or there is an exclusive lock request waiting,
639 the function spins a preset time (controlled by srv_n_spin_wait_rounds), waiting
640 for the lock before suspending the thread. If the same thread has an x-lock
on the rw-lock, locking succeeds, with the following exception: if pass != 0,
642 only a single x-lock may be taken on the lock. NOTE: If the same thread has
643 an s-lock, locking does not succeed! */
644 void
rw_lock_x_lock_func(
646 /*================*/
647 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
648 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
649 				be passed to another thread to unlock */
650 	const char*	file_name,/*!< in: file name where lock requested */
651 	unsigned	line)	/*!< in: line where requested */
652 {
653 	ulint		i = 0;
654 	sync_array_t*	sync_arr;
655 	lint		spin_count = 0;
656 	int64_t		count_os_wait = 0;
657 
658 	ut_ad(rw_lock_validate(lock));
659 	ut_ad(!rw_lock_own(lock, RW_LOCK_S));
660 
661 	if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
662 		/* Locking succeeded */
663 		return;
664 	}
665 	rw_lock_stats.rw_x_spin_wait_count.inc();
666 
667 lock_loop:
668 
669 	if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
670 
671 		if (count_os_wait > 0) {
672 			lock->count_os_wait +=
673 				static_cast<uint32_t>(count_os_wait);
674 			rw_lock_stats.rw_x_os_wait_count.add(count_os_wait);
675 		}
676 
677 		rw_lock_stats.rw_x_spin_round_count.add(spin_count);
678 
679 		/* Locking succeeded */
680 		return;
681 
682 	} else {
683 
684 		/* Spin waiting for the lock_word to become free */
685 		HMT_low();
686 		ulint j = i;
687 		while (i < srv_n_spin_wait_rounds
688 		       && lock->lock_word <= X_LOCK_HALF_DECR) {
689 			ut_delay(srv_spin_wait_delay);
690 			i++;
691 		}
692 
693 		HMT_medium();
694 		spin_count += lint(i - j);
695 
696 		if (i >= srv_n_spin_wait_rounds) {
697 
698 			os_thread_yield();
699 
700 		} else {
701 
702 			goto lock_loop;
703 		}
704 	}
705 
706 	sync_cell_t*	cell;
707 
708 	sync_arr = sync_array_get_and_reserve_cell(
709 			lock, RW_LOCK_X, file_name, line, &cell);
710 
711 	/* Waiters must be set before checking lock_word, to ensure signal
712 	is sent. This could lead to a few unnecessary wake-up signals. */
713 	lock->waiters.exchange(1, std::memory_order_acquire);
714 
715 	if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
716 		sync_array_free_cell(sync_arr, cell);
717 
718 		if (count_os_wait > 0) {
719 			lock->count_os_wait +=
720 				static_cast<uint32_t>(count_os_wait);
721 			rw_lock_stats.rw_x_os_wait_count.add(count_os_wait);
722 		}
723 
724 		rw_lock_stats.rw_x_spin_round_count.add(spin_count);
725 
726 		/* Locking succeeded */
727 		return;
728 	}
729 
730 	++count_os_wait;
731 
732 	sync_array_wait_event(sync_arr, cell);
733 
734 	i = 0;
735 
736 	goto lock_loop;
737 }
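
/* The function above allows the holder of an X-latch to re-acquire it
recursively (when pass == 0), as described in the comment before the function.
The fragment below is an illustrative sketch only (not compiled); it assumes
the usual wrapper macros rw_lock_x_lock()/rw_lock_x_unlock() declared in
sync0rw.h and shows that a second X-lock request by the same thread succeeds
and must be paired with a second unlock. */
#if 0
static void rw_lock_x_recursion_example(rw_lock_t* lock)
{
	rw_lock_x_lock(lock);	/* first X-lock: lock_word -= X_LOCK_DECR */
	rw_lock_x_lock(lock);	/* recursive X-lock by the same thread:
				lock_word -= X_LOCK_DECR again */

	ut_ad(rw_lock_get_x_lock_count(lock) == 2);

	rw_lock_x_unlock(lock);	/* releases the recursive lock */
	rw_lock_x_unlock(lock);	/* releases the lock completely */
}
#endif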
738 
739 /******************************************************************//**
740 NOTE! Use the corresponding macro, not directly this function! Lock an
741 rw-lock in SX mode for the current thread. If the rw-lock is locked
742 in exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by srv_n_spin_wait_rounds), waiting
744 for the lock, before suspending the thread. If the same thread has an x-lock
on the rw-lock, locking succeeds, with the following exception: if pass != 0,
746 only a single sx-lock may be taken on the lock. NOTE: If the same thread has
747 an s-lock, locking does not succeed! */
748 void
rw_lock_sx_lock_func(
750 /*=================*/
751 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
752 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
753 				be passed to another thread to unlock */
754 	const char*	file_name,/*!< in: file name where lock requested */
755 	unsigned	line)	/*!< in: line where requested */
756 
757 {
758 	ulint		i = 0;
759 	sync_array_t*	sync_arr;
760 	lint		spin_count = 0;
761 	int64_t		count_os_wait = 0;
762 
763 	ut_ad(rw_lock_validate(lock));
764 	ut_ad(!rw_lock_own(lock, RW_LOCK_S));
765 
766 	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
767 		/* Locking succeeded */
768 		return;
769 	}
770 
771 	rw_lock_stats.rw_sx_spin_wait_count.inc();
772 
773 lock_loop:
774 
775 	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
776 
777 		if (count_os_wait > 0) {
778 			lock->count_os_wait +=
779 				static_cast<uint32_t>(count_os_wait);
780 			rw_lock_stats.rw_sx_os_wait_count.add(count_os_wait);
781 		}
782 
783 		rw_lock_stats.rw_sx_spin_round_count.add(spin_count);
784 
785 		/* Locking succeeded */
786 		return;
787 
788 	} else {
789 
790 		/* Spin waiting for the lock_word to become free */
791 		ulint j = i;
792 		while (i < srv_n_spin_wait_rounds
793 		       && lock->lock_word <= X_LOCK_HALF_DECR) {
794 			ut_delay(srv_spin_wait_delay);
795 			i++;
796 		}
797 
798 		spin_count += lint(i - j);
799 
800 		if (i >= srv_n_spin_wait_rounds) {
801 
802 			os_thread_yield();
803 
804 		} else {
805 
806 			goto lock_loop;
807 		}
808 	}
809 
810 	sync_cell_t*	cell;
811 
812 	sync_arr = sync_array_get_and_reserve_cell(
813 			lock, RW_LOCK_SX, file_name, line, &cell);
814 
815 	/* Waiters must be set before checking lock_word, to ensure signal
816 	is sent. This could lead to a few unnecessary wake-up signals. */
817 	lock->waiters.exchange(1, std::memory_order_acquire);
818 
819 	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
820 
821 		sync_array_free_cell(sync_arr, cell);
822 
823 		if (count_os_wait > 0) {
824 			lock->count_os_wait +=
825 				static_cast<uint32_t>(count_os_wait);
826 			rw_lock_stats.rw_sx_os_wait_count.add(count_os_wait);
827 		}
828 
829 		rw_lock_stats.rw_sx_spin_round_count.add(spin_count);
830 
831 		/* Locking succeeded */
832 		return;
833 	}
834 
835 	++count_os_wait;
836 
837 	sync_array_wait_event(sync_arr, cell);
838 
839 	i = 0;
840 
841 	goto lock_loop;
842 }
843 
844 #ifdef UNIV_DEBUG
845 
846 /******************************************************************//**
847 Checks that the rw-lock has been initialized and that there are no
848 simultaneous shared and exclusive locks.
849 @return true */
850 bool
rw_lock_validate(
852 /*=============*/
853 	const rw_lock_t*	lock)	/*!< in: rw-lock */
854 {
855 	ut_ad(lock);
856 
857 	ut_ad(lock->created);
858 
859 	int32_t lock_word = lock->lock_word;
860 
861 	ut_ad(lock->waiters < 2);
862 	ut_ad(lock_word > -(2 * X_LOCK_DECR));
863 	ut_ad(lock_word <= X_LOCK_DECR);
864 
865 	return(true);
866 }
867 
868 /******************************************************************//**
869 Checks if somebody has locked the rw-lock in the specified mode.
870 @return true if locked */
871 bool
rw_lock_is_locked(
873 /*==============*/
874 	rw_lock_t*	lock,		/*!< in: rw-lock */
875 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_S,
876 					RW_LOCK_X or RW_LOCK_SX */
877 {
878 	ut_ad(rw_lock_validate(lock));
879 
880 	switch (lock_type) {
881 	case RW_LOCK_S:
882 		return(rw_lock_get_reader_count(lock) > 0);
883 
884 	case RW_LOCK_X:
885 		return(rw_lock_get_writer(lock) == RW_LOCK_X);
886 
887 	case RW_LOCK_SX:
888 		return(rw_lock_get_sx_lock_count(lock) > 0);
889 
890 	default:
891 		ut_error;
892 	}
893 	return(false);	/* avoid compiler warnings */
894 }
895 
896 /******************************************************************//**
897 Inserts the debug information for an rw-lock. */
898 void
rw_lock_add_debug_info(
900 /*===================*/
901 	rw_lock_t*	lock,		/*!< in: rw-lock */
902 	ulint		pass,		/*!< in: pass value */
903 	ulint		lock_type,	/*!< in: lock type */
904 	const char*	file_name,	/*!< in: file where requested */
905 	unsigned	line)		/*!< in: line where requested */
906 {
907 	ut_ad(file_name != NULL);
908 
909 	rw_lock_debug_t*	info = rw_lock_debug_create();
910 
911 	rw_lock_debug_mutex_enter();
912 
913 	info->pass	= pass;
914 	info->line	= line;
915 	info->lock_type = lock_type;
916 	info->file_name = file_name;
917 	info->thread_id = os_thread_get_curr_id();
918 
919 	UT_LIST_ADD_FIRST(lock->debug_list, info);
920 
921 	rw_lock_debug_mutex_exit();
922 
923 	if (pass == 0 && lock_type != RW_LOCK_X_WAIT) {
924 		int32_t lock_word = lock->lock_word;
925 
926 		/* Recursive x while holding SX
927 		(lock_type == RW_LOCK_X && lock_word == -X_LOCK_HALF_DECR)
928 		is treated as not-relock (new lock). */
929 
930 		if ((lock_type == RW_LOCK_X
931 		     && lock_word <  -X_LOCK_HALF_DECR)
932 		    || (lock_type == RW_LOCK_SX
933 		       && (lock_word < 0 || lock->sx_recursive == 1))) {
934 
935 			sync_check_lock_validate(lock);
936 			sync_check_lock_granted(lock);
937 		} else {
938 			sync_check_relock(lock);
939 		}
940 	}
941 }
942 
943 /******************************************************************//**
944 Removes a debug information struct for an rw-lock. */
945 void
rw_lock_remove_debug_info(
947 /*======================*/
948 	rw_lock_t*	lock,		/*!< in: rw-lock */
949 	ulint		pass,		/*!< in: pass value */
950 	ulint		lock_type)	/*!< in: lock type */
951 {
952 	rw_lock_debug_t*	info;
953 
954 	ut_ad(lock);
955 
956 	if (pass == 0 && lock_type != RW_LOCK_X_WAIT) {
957 		sync_check_unlock(lock);
958 	}
959 
960 	rw_lock_debug_mutex_enter();
961 
962 	for (info = UT_LIST_GET_FIRST(lock->debug_list);
963 	     info != 0;
964 	     info = UT_LIST_GET_NEXT(list, info)) {
965 
966 		if (pass == info->pass
967 		    && (pass != 0
968 			|| os_thread_eq(info->thread_id,
969 					os_thread_get_curr_id()))
970 		    && info->lock_type == lock_type) {
971 
972 			/* Found! */
973 			UT_LIST_REMOVE(lock->debug_list, info);
974 
975 			rw_lock_debug_mutex_exit();
976 
977 			rw_lock_debug_free(info);
978 
979 			return;
980 		}
981 	}
982 
983 	ut_error;
984 }
985 
986 /******************************************************************//**
987 Checks if the thread has locked the rw-lock in the specified mode, with
988 the pass value == 0.
@return true if locked */
990 bool
rw_lock_own(
992 /*========*/
993 	const rw_lock_t*lock,		/*!< in: rw-lock */
994 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_S,
995 					RW_LOCK_X */
996 {
997 	ut_ad(lock);
998 	ut_ad(rw_lock_validate(lock));
999 
1000 	const os_thread_id_t thread_id = os_thread_get_curr_id();
1001 
1002 	if (!os_thread_eq(lock->writer_thread, thread_id)) {
1003 	} else if (lock_type == RW_LOCK_X && rw_lock_get_x_lock_count(lock)) {
1004 		return TRUE;
1005 	} else if (lock_type == RW_LOCK_SX && rw_lock_get_sx_lock_count(lock)) {
1006 		return TRUE;
1007 	}
1008 
1009 	rw_lock_debug_mutex_enter();
1010 
1011 	for (const rw_lock_debug_t* info = UT_LIST_GET_FIRST(lock->debug_list);
1012 	     info != NULL;
1013 	     info = UT_LIST_GET_NEXT(list, info)) {
1014 
1015 		if (os_thread_eq(info->thread_id, thread_id)
1016 		    && info->pass == 0
1017 		    && info->lock_type == lock_type) {
1018 
1019 			rw_lock_debug_mutex_exit();
1020 			/* Found! */
1021 
1022 			return(true);
1023 		}
1024 	}
1025 	rw_lock_debug_mutex_exit();
1026 
1027 	return(false);
1028 }
1029 
1030 /** Checks if the thread has locked the rw-lock in the specified mode, with
1031 the pass value == 0.
1032 @param[in]	lock		rw-lock
1033 @param[in]	flags		specify lock types with OR of the
1034 				rw_lock_flag_t values
1035 @return true if locked */
bool rw_lock_own_flagged(const rw_lock_t* lock, rw_lock_flags_t flags)
1037 {
1038 	ut_ad(rw_lock_validate(lock));
1039 
1040 	const os_thread_id_t thread_id = os_thread_get_curr_id();
1041 
1042 	if (!os_thread_eq(lock->writer_thread, thread_id)) {
1043 	} else if ((flags & RW_LOCK_FLAG_X)
1044 		   && rw_lock_get_x_lock_count(lock)) {
1045 		return true;
1046 	} else if ((flags & RW_LOCK_FLAG_SX)
1047 		   && rw_lock_get_sx_lock_count(lock)) {
1048 		return true;
1049 	}
1050 
1051 	rw_lock_debug_mutex_enter();
1052 
1053 	for (rw_lock_debug_t* info = UT_LIST_GET_FIRST(lock->debug_list);
1054 	     info != NULL;
1055 	     info = UT_LIST_GET_NEXT(list, info)) {
1056 		if (!os_thread_eq(info->thread_id, thread_id)
1057 		    || info->pass) {
1058 			continue;
1059 		}
1060 
1061 		switch (info->lock_type) {
1062 		case RW_LOCK_S:
1063 			if (!(flags & RW_LOCK_FLAG_S)) {
1064 				continue;
1065 			}
1066 			break;
1067 
1068 		case RW_LOCK_X:
1069 			if (!(flags & RW_LOCK_FLAG_X)) {
1070 				continue;
1071 			}
1072 			break;
1073 
1074 		case RW_LOCK_SX:
1075 			if (!(flags & RW_LOCK_FLAG_SX)) {
1076 				continue;
1077 			}
1078 			break;
1079 		}
1080 
1081 		rw_lock_debug_mutex_exit();
1082 		return true;
1083 	}
1084 
1085 	rw_lock_debug_mutex_exit();
1086 	return false;
1087 }
1088 
1089 /***************************************************************//**
1090 Prints debug info of currently locked rw-locks. */
1091 void
rw_lock_list_print_info(
1093 /*====================*/
1094 	FILE*	file)		/*!< in: file where to print */
1095 {
1096 	ulint		count = 0;
1097 
1098 	mutex_enter(&rw_lock_list_mutex);
1099 
1100 	fputs("-------------\n"
1101 	      "RW-LATCH INFO\n"
1102 	      "-------------\n", file);
1103 
1104 	for (const rw_lock_t& lock : rw_lock_list) {
1105 
1106 		count++;
1107 
1108 		if (lock.lock_word != X_LOCK_DECR) {
1109 
1110 			fprintf(file, "RW-LOCK: %p ", (void*) &lock);
1111 
1112 			if (int32_t waiters= lock.waiters) {
1113 				fprintf(file, " (%d waiters)\n", waiters);
1114 			} else {
1115 				putc('\n', file);
1116 			}
1117 
1118 			rw_lock_debug_t* info;
1119 
1120 			rw_lock_debug_mutex_enter();
1121 
1122 			for (info = UT_LIST_GET_FIRST(lock.debug_list);
1123 			     info != NULL;
1124 			     info = UT_LIST_GET_NEXT(list, info)) {
1125 
1126 				rw_lock_debug_print(file, info);
1127 			}
1128 
1129 			rw_lock_debug_mutex_exit();
1130 		}
1131 	}
1132 
1133 	fprintf(file, "Total number of rw-locks " ULINTPF "\n", count);
1134 	mutex_exit(&rw_lock_list_mutex);
1135 }
1136 
1137 /*********************************************************************//**
1138 Prints info of a debug struct. */
1139 void
rw_lock_debug_print(
1141 /*================*/
1142 	FILE*			f,	/*!< in: output stream */
1143 	const rw_lock_debug_t*	info)	/*!< in: debug struct */
1144 {
1145 	ulint	rwt = info->lock_type;
1146 
1147 	fprintf(f, "Locked: thread " ULINTPF " file %s line %u  ",
1148 		ulint(info->thread_id),
1149 		sync_basename(info->file_name),
1150 		info->line);
1151 
1152 	switch (rwt) {
1153 	case RW_LOCK_S:
1154 		fputs("S-LOCK", f);
1155 		break;
1156 	case RW_LOCK_X:
1157 		fputs("X-LOCK", f);
1158 		break;
1159 	case RW_LOCK_SX:
1160 		fputs("SX-LOCK", f);
1161 		break;
1162 	case RW_LOCK_X_WAIT:
1163 		fputs("WAIT X-LOCK", f);
1164 		break;
1165 	default:
1166 		ut_error;
1167 	}
1168 
1169 	if (info->pass != 0) {
1170 		fprintf(f, " pass value %lu", (ulong) info->pass);
1171 	}
1172 
1173 	fprintf(f, "\n");
1174 }
1175 
1176 /** Print the rw-lock information.
1177 @return the string representation */
1178 std::string
rw_lock_t::to_string() const
1180 {
	/* Note: For X locks it can be locked from multiple places because
	the same thread can call X lock recursively. */
1183 
1184 	std::ostringstream	msg;
1185 	bool			written = false;
1186 
1187 	ut_ad(rw_lock_validate(this));
1188 
1189 	msg << "RW-LATCH: "
1190 	    << "thread id " << os_thread_get_curr_id()
1191 	    << " addr: " << this
1192 	    << " Locked from: ";
1193 
1194 	rw_lock_debug_mutex_enter();
1195 
1196 	for (rw_lock_debug_t* info = UT_LIST_GET_FIRST(debug_list);
1197 	     info != NULL;
1198 	     info = UT_LIST_GET_NEXT(list, info)) {
1199 		if (!os_thread_eq(info->thread_id, os_thread_get_curr_id())) {
1200 			continue;
1201 		}
1202 
1203 		if (written) {
1204 			msg << ", ";
1205 		}
1206 
1207 		written = true;
1208 
1209 		msg << info->file_name << ":" << info->line;
1210 	}
1211 
1212 	rw_lock_debug_mutex_exit();
1213 
1214 	return(msg.str());
1215 }
1216 #endif /* UNIV_DEBUG */
1217