1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2008, Google Inc.
5 
6 Portions of this file contain modifications contributed and copyrighted by
7 Google, Inc. Those modifications are gratefully acknowledged and are described
8 briefly in the InnoDB documentation. The contributions by Google are
9 incorporated with their permission, and subject to the conditions contained in
10 the file COPYING.Google.
11 
12 This program is free software; you can redistribute it and/or modify it under
13 the terms of the GNU General Public License as published by the Free Software
14 Foundation; version 2 of the License.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 
20 You should have received a copy of the GNU General Public License along with
21 this program; if not, write to the Free Software Foundation, Inc.,
22 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
23 
24 *****************************************************************************/
25 
26 /**************************************************//**
27 @file sync/sync0rw.c
28 The read-write lock (for thread synchronization)
29 
30 Created 9/11/1995 Heikki Tuuri
31 *******************************************************/
32 
33 #include "sync0rw.h"
34 #ifdef UNIV_NONINL
35 #include "sync0rw.ic"
36 #endif
37 
38 #include "os0thread.h"
39 #include "mem0mem.h"
40 #include "srv0srv.h"
41 #include "os0sync.h" /* for INNODB_RW_LOCKS_USE_ATOMICS */
42 #include "ha_prototypes.h"
43 
44 /*
45 	IMPLEMENTATION OF THE RW_LOCK
46 	=============================
47 The status of a rw_lock is held in lock_word. The initial value of lock_word is
48 X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
49 for each x-lock. This describes the lock state for each value of lock_word:
50 
51 lock_word == X_LOCK_DECR:      Unlocked.
52 0 < lock_word < X_LOCK_DECR:   Read locked, no waiting writers.
53 			       (X_LOCK_DECR - lock_word) is the
54 			       number of readers that hold the lock.
55 lock_word == 0:		       Write locked
56 -X_LOCK_DECR < lock_word < 0:  Read locked, with a waiting writer.
57 			       (-lock_word) is the number of readers
58 			       that hold the lock.
59 lock_word <= -X_LOCK_DECR:     Recursively write locked. lock_word has been
60 			       decremented by X_LOCK_DECR once for each lock,
61 			       so the number of locks is:
62 			       ((-lock_word) / X_LOCK_DECR) + 1
63 When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0:
64 other values of lock_word are invalid.
65 
66 The lock_word is always read and updated atomically and consistently, so that
67 it always represents the state of the lock, and the state of the lock changes
68 with a single atomic operation. This lock_word holds all of the information
69 that a thread needs in order to determine if it is eligible to gain the lock
70 or if it must spin or sleep. The one exception to this is that writer_thread
71 must be verified before recursive write locks: to solve this scenario, we make
72 writer_thread readable by all threads, but only writeable by the x-lock holder.
73 
74 The other members of the lock obey the following rules to remain consistent:
75 
76 recursive:	This and the writer_thread field together control the
77 		behaviour of recursive x-locking.
78 		lock->recursive must be FALSE in following states:
79 			1) The writer_thread contains garbage i.e.: the
80 			lock has just been initialized.
81 			2) The lock is not x-held and there is no
82 			x-waiter waiting on WAIT_EX event.
83 			3) The lock is x-held or there is an x-waiter
84 			waiting on WAIT_EX event but the 'pass' value
85 			is non-zero.
86 		lock->recursive is TRUE iff:
87 			1) The lock is x-held or there is an x-waiter
88 			waiting on WAIT_EX event and the 'pass' value
89 			is zero.
90 		This flag must be set after the writer_thread field
91 		has been updated with a memory ordering barrier.
92 		It is unset before the lock_word has been incremented.
93 writer_thread:	Is used only in recursive x-locking. Can only be safely
94 		read iff lock->recursive flag is TRUE.
95 		This field is uninitialized at lock creation time and
96 		is updated atomically when x-lock is acquired or when
97 		move_ownership is called. A thread is only allowed to
		set the value of this field to its thread_id i.e.: a
99 		thread cannot set writer_thread to some other thread's
100 		id.
101 waiters:	May be set to 1 anytime, but to avoid unnecessary wake-up
102 		signals, it should only be set to 1 when there are threads
103 		waiting on event. Must be 1 when a writer starts waiting to
104 		ensure the current x-locking thread sends a wake-up signal
		during unlock. May only be reset to 0 immediately before
		a wake-up signal is sent to event. On most platforms, a
107 		memory barrier is required after waiters is set, and before
108 		verifying lock_word is still held, to ensure some unlocker
		really does see the flag's new value.
110 event:		Threads wait on event for read or writer lock when another
111 		thread has an x-lock or an x-lock reservation (wait_ex). A
112 		thread may only	wait on event after performing the following
113 		actions in order:
114 		   (1) Record the counter value of event (with os_event_reset).
115 		   (2) Set waiters to 1.
116 		   (3) Verify lock_word <= 0.
117 		(1) must come before (2) to ensure signal is not missed.
118 		(2) must come before (3) to ensure a signal is sent.
119 		These restrictions force the above ordering.
120 		Immediately before sending the wake-up signal, we should:
121 		   (1) Verify lock_word == X_LOCK_DECR (unlocked)
122 		   (2) Reset waiters to 0.
123 wait_ex_event:	A thread may only wait on the wait_ex_event after it has
124 		performed the following actions in order:
125 		   (1) Decrement lock_word by X_LOCK_DECR.
126 		   (2) Record counter value of wait_ex_event (os_event_reset,
127                        called from sync_array_reserve_cell).
128 		   (3) Verify that lock_word < 0.
		(1) must come first to ensure no other threads become reader
130                 or next writer, and notifies unlocker that signal must be sent.
131                 (2) must come before (3) to ensure the signal is not missed.
132 		These restrictions force the above ordering.
133 		Immediately before sending the wake-up signal, we should:
134 		   Verify lock_word == 0 (waiting thread holds x_lock)
135 */
136 
137 
138 /** number of spin waits on rw-latches,
139 resulted during shared (read) locks */
140 UNIV_INTERN ib_int64_t	rw_s_spin_wait_count	= 0;
141 /** number of spin loop rounds on rw-latches,
142 resulted during shared (read) locks */
143 UNIV_INTERN ib_int64_t	rw_s_spin_round_count	= 0;
144 
145 /** number of OS waits on rw-latches,
146 resulted during shared (read) locks */
147 UNIV_INTERN ib_int64_t	rw_s_os_wait_count	= 0;
148 
149 /** number of unlocks (that unlock shared locks),
150 set only when UNIV_SYNC_PERF_STAT is defined */
151 UNIV_INTERN ib_int64_t	rw_s_exit_count		= 0;
152 
153 /** number of spin waits on rw-latches,
154 resulted during exclusive (write) locks */
155 UNIV_INTERN ib_int64_t	rw_x_spin_wait_count	= 0;
156 /** number of spin loop rounds on rw-latches,
157 resulted during exclusive (write) locks */
158 UNIV_INTERN ib_int64_t	rw_x_spin_round_count	= 0;
159 
160 /** number of OS waits on rw-latches,
161 resulted during exclusive (write) locks */
162 UNIV_INTERN ib_int64_t	rw_x_os_wait_count	= 0;
163 
164 /** number of unlocks (that unlock exclusive locks),
165 set only when UNIV_SYNC_PERF_STAT is defined */
166 UNIV_INTERN ib_int64_t	rw_x_exit_count		= 0;
167 
168 /* The global list of rw-locks */
169 UNIV_INTERN rw_lock_list_t	rw_lock_list;
170 UNIV_INTERN mutex_t		rw_lock_list_mutex;
171 
172 #ifdef UNIV_PFS_MUTEX
173 UNIV_INTERN mysql_pfs_key_t	rw_lock_list_mutex_key;
174 UNIV_INTERN mysql_pfs_key_t	rw_lock_mutex_key;
175 #endif /* UNIV_PFS_MUTEX */
176 
177 #ifdef UNIV_SYNC_DEBUG
178 /* The global mutex which protects debug info lists of all rw-locks.
179 To modify the debug info list of an rw-lock, this mutex has to be
180 acquired in addition to the mutex protecting the lock. */
181 
182 UNIV_INTERN mutex_t		rw_lock_debug_mutex;
183 
184 # ifdef UNIV_PFS_MUTEX
185 UNIV_INTERN mysql_pfs_key_t	rw_lock_debug_mutex_key;
186 # endif
187 
188 /* If deadlock detection does not get immediately the mutex,
189 it may wait for this event */
190 UNIV_INTERN os_event_t		rw_lock_debug_event;
191 /* This is set to TRUE, if there may be waiters for the event */
192 UNIV_INTERN ibool		rw_lock_debug_waiters;
193 
194 /******************************************************************//**
195 Creates a debug info struct. */
196 static
197 rw_lock_debug_t*
198 rw_lock_debug_create(void);
199 /*======================*/
200 /******************************************************************//**
201 Frees a debug info struct. */
202 static
203 void
204 rw_lock_debug_free(
205 /*===============*/
206 	rw_lock_debug_t* info);
207 
208 /******************************************************************//**
209 Creates a debug info struct.
210 @return	own: debug info struct */
211 static
212 rw_lock_debug_t*
rw_lock_debug_create(void)213 rw_lock_debug_create(void)
214 /*======================*/
215 {
216 	return((rw_lock_debug_t*) mem_alloc(sizeof(rw_lock_debug_t)));
217 }
218 
/******************************************************************//**
Frees a debug info struct. The struct must have been allocated with
rw_lock_debug_create(); ownership of the memory passes to this function. */
static
void
rw_lock_debug_free(
/*===============*/
	rw_lock_debug_t* info)
{
	mem_free(info);
}
229 #endif /* UNIV_SYNC_DEBUG */
230 
/******************************************************************//**
Creates, or rather, initializes an rw-lock object in a specified memory
location (which must be appropriately aligned). The rw-lock is initialized
to the non-locked state. Explicit freeing of the rw-lock with rw_lock_free
is necessary only if the memory block containing it is freed. */
UNIV_INTERN
void
rw_lock_create_func(
/*================*/
	rw_lock_t*	lock,		/*!< in: pointer to memory */
#ifdef UNIV_DEBUG
# ifdef UNIV_SYNC_DEBUG
	ulint		level,		/*!< in: level */
# endif /* UNIV_SYNC_DEBUG */
	const char*	cmutex_name,	/*!< in: mutex name */
#endif /* UNIV_DEBUG */
	const char*	cfile_name,	/*!< in: file name where created */
	ulint		cline)		/*!< in: file line where created */
{
	/* If this is the very first time a synchronization object is
	created, then the following call initializes the sync system. */

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
	/* Without atomic builtins the lock state is protected by an
	ordinary mutex embedded in the rw-lock object. */
	mutex_create(rw_lock_mutex_key, rw_lock_get_mutex(lock),
		     SYNC_NO_ORDER_CHECK);

	lock->mutex.cfile_name = cfile_name;
	lock->mutex.cline = cline;

	ut_d(lock->mutex.cmutex_name = cmutex_name);
	ut_d(lock->mutex.mutex_type = 1);
#else /* INNODB_RW_LOCKS_USE_ATOMICS */
# ifdef UNIV_DEBUG
	UT_NOT_USED(cmutex_name);
# endif
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */

	/* X_LOCK_DECR is the unlocked state of lock_word; see the state
	table in the comment at the top of this file. */
	lock->lock_word = X_LOCK_DECR;
	lock->waiters = 0;

	/* We set this value to signify that lock->writer_thread
	contains garbage at initialization and cannot be used for
	recursive x-locking. */
	lock->recursive = FALSE;
	/* Silence Valgrind when UNIV_DEBUG_VALGRIND is not enabled. */
	memset((void*) &lock->writer_thread, 0, sizeof lock->writer_thread);
	UNIV_MEM_INVALID(&lock->writer_thread, sizeof lock->writer_thread);

#ifdef UNIV_SYNC_DEBUG
	UT_LIST_INIT(lock->debug_list);

	lock->level = level;
#endif /* UNIV_SYNC_DEBUG */

	ut_d(lock->magic_n = RW_LOCK_MAGIC_N);

	lock->cfile_name = cfile_name;
	lock->cline = (unsigned int) cline;

	lock->count_os_wait = 0;
	lock->last_s_file_name = "not yet reserved";
	lock->last_x_file_name = "not yet reserved";
	lock->last_s_line = 0;
	lock->last_x_line = 0;
	/* Waiting s-lockers and x-lockers block on lock->event; the next
	writer waiting for remaining readers blocks on wait_ex_event. */
	lock->event = os_event_create(NULL);
	lock->wait_ex_event = os_event_create(NULL);

	/* Publish the new lock on the global list of rw-locks. */
	mutex_enter(&rw_lock_list_mutex);

	ut_ad(UT_LIST_GET_FIRST(rw_lock_list) == NULL
	      || UT_LIST_GET_FIRST(rw_lock_list)->magic_n == RW_LOCK_MAGIC_N);

	UT_LIST_ADD_FIRST(list, rw_lock_list, lock);

	mutex_exit(&rw_lock_list_mutex);
}
307 
/******************************************************************//**
Calling this function is obligatory only if the memory buffer containing
the rw-lock is freed. Removes an rw-lock object from the global list. The
rw-lock is checked to be in the non-locked state. */
UNIV_INTERN
void
rw_lock_free_func(
/*==============*/
	rw_lock_t*	lock)	/*!< in: rw-lock */
{
	ut_ad(rw_lock_validate(lock));
	/* The lock must be fully unlocked; otherwise freeing it would
	leave holders or waiters with a dangling lock. */
	ut_a(lock->lock_word == X_LOCK_DECR);

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
	mutex_free(rw_lock_get_mutex(lock));
#endif /* INNODB_RW_LOCKS_USE_ATOMICS */

	mutex_enter(&rw_lock_list_mutex);
	os_event_free(lock->event);

	os_event_free(lock->wait_ex_event);

	/* Sanity-check the list neighbours before unlinking. */
	ut_ad(UT_LIST_GET_PREV(list, lock) == NULL
	      || UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N);
	ut_ad(UT_LIST_GET_NEXT(list, lock) == NULL
	      || UT_LIST_GET_NEXT(list, lock)->magic_n == RW_LOCK_MAGIC_N);

	UT_LIST_REMOVE(list, rw_lock_list, lock);

	mutex_exit(&rw_lock_list_mutex);

	/* Poison the magic number so that any later use of the freed
	lock is caught in debug builds. */
	ut_d(lock->magic_n = 0);
}
341 
#ifdef UNIV_DEBUG
/******************************************************************//**
Checks that the rw-lock has been initialized and that there are no
simultaneous shared and exclusive locks.
@return	TRUE */
UNIV_INTERN
ibool
rw_lock_validate(
/*=============*/
	rw_lock_t*	lock)	/*!< in: rw-lock */
{
	lint	word;
	ulint	waiter_flag;

	ut_a(lock);

	/* Unprotected snapshot reads: this is a best-effort sanity
	check only. */
	word = lock->lock_word;
	waiter_flag = rw_lock_get_waiters(lock);

	ut_ad(lock->magic_n == RW_LOCK_MAGIC_N);

	/* The waiters field is a boolean flag. */
	ut_a(waiter_flag == 0 || waiter_flag == 1);

	/* A recursively x-locked word must be a whole multiple of
	X_LOCK_DECR; see the state table at the top of this file. */
	ut_a(word > -X_LOCK_DECR || (-word) % X_LOCK_DECR == 0);

	return(TRUE);
}
#endif /* UNIV_DEBUG */
368 
/******************************************************************//**
Lock an rw-lock in shared mode for the current thread. If the rw-lock is
locked in exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
for the lock, before suspending the thread. */
UNIV_INTERN
void
rw_lock_s_lock_spin(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock
				will be passed to another thread to unlock */
	const char*	file_name, /*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint	 index;	/* index of the reserved wait cell */
	ulint	 i = 0;	/* spin round count */

	ut_ad(rw_lock_validate(lock));

	rw_s_spin_wait_count++;	/*!< Count calls to this function */
lock_loop:

	/* Spin waiting for the writer field to become free: lock_word <= 0
	means an x-lock is held or an x-lock request is pending. */
	while (i < SYNC_SPIN_ROUNDS && lock->lock_word <= 0) {
		if (srv_spin_wait_delay) {
			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
		}

		i++;
	}

	if (i == SYNC_SPIN_ROUNDS) {
		/* Spin budget exhausted: yield the time slice before the
		retry below. */
		os_thread_yield();
	}

	if (srv_print_latch_waits) {
		fprintf(stderr,
			"Thread %lu spin wait rw-s-lock at %p"
			" cfile %s cline %lu rnds %lu\n",
			(ulong) os_thread_pf(os_thread_get_curr_id()),
			(void*) lock,
			innobase_basename(lock->cfile_name),
			(ulong) lock->cline, (ulong) i);
	}

	/* We try once again to obtain the lock */
	if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
		rw_s_spin_round_count += i;

		return; /* Success */
	} else {

		if (i < SYNC_SPIN_ROUNDS) {
			goto lock_loop;
		}

		rw_s_spin_round_count += i;

		/* Reserving the wait cell records the signal count of
		lock->event (os_event_reset), so a wake-up sent after this
		point cannot be missed. */
		sync_array_reserve_cell(sync_primary_wait_array,
					lock, RW_LOCK_SHARED,
					file_name, line,
					&index);

		/* Set waiters before checking lock_word to ensure wake-up
                signal is sent. This may lead to some unnecessary signals. */
		rw_lock_set_waiter_flag(lock);

		/* Final attempt before sleeping; on success the reserved
		cell must be released. */
		if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
			sync_array_free_cell(sync_primary_wait_array, index);
			return; /* Success */
		}

		if (srv_print_latch_waits) {
			fprintf(stderr,
				"Thread %lu OS wait rw-s-lock at %p"
				" cfile %s cline %lu\n",
				os_thread_pf(os_thread_get_curr_id()),
				(void*) lock,
				innobase_basename(lock->cfile_name),
				(ulong) lock->cline);
		}

		/* these stats may not be accurate */
		lock->count_os_wait++;
		rw_s_os_wait_count++;

		/* Sleep until an unlocking thread signals lock->event. */
		sync_array_wait_event(sync_primary_wait_array, index);

		i = 0;
		goto lock_loop;
	}
}
462 
/******************************************************************//**
This function is used in the insert buffer to move the ownership of an
x-latch on a buffer frame to the current thread. The x-latch was set by
the buffer read operation and it protected the buffer frame while the
read was done. The ownership is moved because we want that the current
thread is able to acquire a second x-latch which is stored in an mtr.
This, in turn, is needed to pass the debug checks of index page
operations. */
UNIV_INTERN
void
rw_lock_x_lock_move_ownership(
/*==========================*/
	rw_lock_t*	lock)	/*!< in: lock which was x-locked in the
				buffer read */
{
	ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));

	/* Stamp the current thread as the writer and enable recursive
	x-locking for it on this latch. */
	rw_lock_set_writer_id_and_recursion_flag(lock, TRUE);
}
482 
/******************************************************************//**
Function for the next writer to call. Waits for readers to exit.
The caller must have already decremented lock_word by X_LOCK_DECR. */
UNIV_INLINE
void
rw_lock_x_lock_wait(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
#ifdef UNIV_SYNC_DEBUG
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
#endif
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint index;
	ulint i = 0;

	/* The caller has claimed the x-lock slot, so lock_word cannot be
	positive here. */
	ut_ad(lock->lock_word <= 0);

	/* lock_word < 0 means (-lock_word) readers still hold the latch;
	spin, and eventually sleep, until they have all released it. */
	while (lock->lock_word < 0) {
		if (srv_spin_wait_delay) {
			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
		}
		if(i < SYNC_SPIN_ROUNDS) {
			i++;
			continue;
		}

		/* If there is still a reader, then go to sleep.*/
		rw_x_spin_round_count += i;
		i = 0;
		/* Reserving the cell records the signal count of
		wait_ex_event (os_event_reset), so the last reader's
		wake-up signal cannot be missed. */
		sync_array_reserve_cell(sync_primary_wait_array,
					lock,
					RW_LOCK_WAIT_EX,
					file_name, line,
					&index);
		/* Check lock_word to ensure wake-up isn't missed.*/
		if(lock->lock_word < 0) {

			/* these stats may not be accurate */
			lock->count_os_wait++;
			rw_x_os_wait_count++;

                        /* Add debug info as it is needed to detect possible
                        deadlock. We must add info for WAIT_EX thread for
                        deadlock detection to work properly. */
#ifdef UNIV_SYNC_DEBUG
			rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX,
					       file_name, line);
#endif

			sync_array_wait_event(sync_primary_wait_array,
					      index);
#ifdef UNIV_SYNC_DEBUG
			rw_lock_remove_debug_info(lock, pass,
					       RW_LOCK_WAIT_EX);
#endif
                        /* It is possible to wake when lock_word < 0.
                        We must pass the while-loop check to proceed.*/
		} else {
			/* All readers left between the reservation and the
			re-check: no need to sleep. */
			sync_array_free_cell(sync_primary_wait_array,
					     index);
		}
	}
	rw_x_spin_round_count += i;
}
550 
/******************************************************************//**
Low-level function for acquiring an exclusive lock.
@return	RW_LOCK_NOT_LOCKED if did not succeed, RW_LOCK_EX if success. */
UNIV_INLINE
ibool
rw_lock_x_lock_low(
/*===============*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	os_thread_id_t	curr_thread	= os_thread_get_curr_id();

	/* Try to claim the x-lock slot by subtracting X_LOCK_DECR from
	lock_word; this can succeed for only one thread at a time. */
	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {

		/* lock->recursive also tells us if the writer_thread
		field is stale or active. As we are going to write
		our own thread id in that field it must be that the
		current writer_thread value is not active. */
		ut_a(!lock->recursive);

		/* Decrement occurred: we are writer or next-writer. */
		rw_lock_set_writer_id_and_recursion_flag(lock,
						pass ? FALSE : TRUE);

		/* Wait until any remaining readers release the latch. */
		rw_lock_x_lock_wait(lock,
#ifdef UNIV_SYNC_DEBUG
				    pass,
#endif
                                    file_name, line);

	} else {
		/* Decrement failed: relock or failed lock */
		if (!pass && lock->recursive
		    && os_thread_eq(lock->writer_thread, curr_thread)) {
			/* Relock: we already hold the x-lock, so while it is
			held no other thread modifies lock_word and a plain
			(non-atomic) update suffices. */
                        lock->lock_word -= X_LOCK_DECR;
		} else {
			/* Another thread locked before us */
			return(FALSE);
		}
	}
#ifdef UNIV_SYNC_DEBUG
	rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
			       file_name, line);
#endif
	lock->last_x_file_name = file_name;
	lock->last_x_line = (unsigned int) line;

	return(TRUE);
}
604 
/******************************************************************//**
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in exclusive mode for the current thread. If the rw-lock is locked
in shared or exclusive mode, or there is an exclusive lock request waiting,
the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
for the lock before suspending the thread. If the same thread has an x-lock
on the rw-lock, locking succeed, with the following exception: if pass != 0,
only a single x-lock may be taken on the lock. NOTE: If the same thread has
an s-lock, locking does not succeed! */
UNIV_INTERN
void
rw_lock_x_lock_func(
/*================*/
	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
	ulint		pass,	/*!< in: pass value; != 0, if the lock will
				be passed to another thread to unlock */
	const char*	file_name,/*!< in: file name where lock requested */
	ulint		line)	/*!< in: line where requested */
{
	ulint	index;	/*!< index of the reserved wait cell */
	ulint	i;	/*!< spin round count */
	ibool	spinning = FALSE;	/*!< TRUE once this call has been
					counted in rw_x_spin_wait_count */

	ut_ad(rw_lock_validate(lock));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */

	i = 0;

lock_loop:

	if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
		rw_x_spin_round_count += i;

		return;	/* Locking succeeded */

	} else {

		/* Count each spinning call only once. */
                if (!spinning) {
                        spinning = TRUE;
                        rw_x_spin_wait_count++;
		}

		/* Spin waiting for the lock_word to become free */
		while (i < SYNC_SPIN_ROUNDS
		       && lock->lock_word <= 0) {
			if (srv_spin_wait_delay) {
				ut_delay(ut_rnd_interval(0,
							 srv_spin_wait_delay));
			}

			i++;
		}
		if (i == SYNC_SPIN_ROUNDS) {
			os_thread_yield();
		} else {
			/* lock_word became free before the spin budget was
			spent: retry the lock immediately. */
			goto lock_loop;
		}
	}

	rw_x_spin_round_count += i;

	if (srv_print_latch_waits) {
		fprintf(stderr,
			"Thread %lu spin wait rw-x-lock at %p"
			" cfile %s cline %lu rnds %lu\n",
			os_thread_pf(os_thread_get_curr_id()), (void*) lock,
			innobase_basename(lock->cfile_name),
			(ulong) lock->cline, (ulong) i);
	}

	/* Reserving the cell records the signal count of lock->event
	(os_event_reset), so a subsequent wake-up cannot be missed. */
	sync_array_reserve_cell(sync_primary_wait_array,
				lock,
				RW_LOCK_EX,
				file_name, line,
				&index);

	/* Waiters must be set before checking lock_word, to ensure signal
	is sent. This could lead to a few unnecessary wake-up signals. */
	rw_lock_set_waiter_flag(lock);

	if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
		sync_array_free_cell(sync_primary_wait_array, index);
		return; /* Locking succeeded */
	}

	if (srv_print_latch_waits) {
		fprintf(stderr,
			"Thread %lu OS wait for rw-x-lock at %p"
			" cfile %s cline %lu\n",
			os_thread_pf(os_thread_get_curr_id()), (void*) lock,
			innobase_basename(lock->cfile_name),
			(ulong) lock->cline);
	}

	/* these stats may not be accurate */
	lock->count_os_wait++;
	rw_x_os_wait_count++;

	/* Sleep until an unlocking thread signals lock->event. */
	sync_array_wait_event(sync_primary_wait_array, index);

	i = 0;
	goto lock_loop;
}
710 
711 #ifdef UNIV_SYNC_DEBUG
/******************************************************************//**
Acquires the debug mutex. We cannot use the mutex defined in sync0sync,
because the debug mutex is also acquired in sync0arr while holding the OS
mutex protecting the sync array, and the ordinary mutex_enter might
recursively call routines in sync0arr, leading to a deadlock on the OS
mutex. */
UNIV_INTERN
void
rw_lock_debug_mutex_enter(void)
/*===========================*/
{
loop:
	if (0 == mutex_enter_nowait(&rw_lock_debug_mutex)) {
		return;
	}

	/* Record the event's signal count before announcing ourselves as
	a waiter, so a signal sent after this point is not missed. */
	os_event_reset(rw_lock_debug_event);

	rw_lock_debug_waiters = TRUE;

	/* Retry once more before sleeping: the holder may have released
	the mutex before it saw the waiters flag. */
	if (0 == mutex_enter_nowait(&rw_lock_debug_mutex)) {
		return;
	}

	os_event_wait(rw_lock_debug_event);

	goto loop;
}
740 
/******************************************************************//**
Releases the debug mutex. */
UNIV_INTERN
void
rw_lock_debug_mutex_exit(void)
/*==========================*/
{
	mutex_exit(&rw_lock_debug_mutex);

	/* Wake up any threads sleeping in rw_lock_debug_mutex_enter().
	Clearing the flag first may cause spurious wake-ups, which the
	enter loop tolerates. */
	if (rw_lock_debug_waiters) {
		rw_lock_debug_waiters = FALSE;
		os_event_set(rw_lock_debug_event);
	}
}
755 
756 /******************************************************************//**
757 Inserts the debug information for an rw-lock. */
758 UNIV_INTERN
759 void
rw_lock_add_debug_info(rw_lock_t * lock,ulint pass,ulint lock_type,const char * file_name,ulint line)760 rw_lock_add_debug_info(
761 /*===================*/
762 	rw_lock_t*	lock,		/*!< in: rw-lock */
763 	ulint		pass,		/*!< in: pass value */
764 	ulint		lock_type,	/*!< in: lock type */
765 	const char*	file_name,	/*!< in: file where requested */
766 	ulint		line)		/*!< in: line where requested */
767 {
768 	rw_lock_debug_t*	info;
769 
770 	ut_ad(lock);
771 	ut_ad(file_name);
772 
773 	info = rw_lock_debug_create();
774 
775 	rw_lock_debug_mutex_enter();
776 
777 	info->file_name = file_name;
778 	info->line	= line;
779 	info->lock_type = lock_type;
780 	info->thread_id = os_thread_get_curr_id();
781 	info->pass	= pass;
782 
783 	UT_LIST_ADD_FIRST(list, lock->debug_list, info);
784 
785 	rw_lock_debug_mutex_exit();
786 
787 	if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
788 		sync_thread_add_level(lock, lock->level,
789 				      lock_type == RW_LOCK_EX
790 				      && lock->lock_word < 0);
791 	}
792 }
793 
794 /******************************************************************//**
795 Removes a debug information struct for an rw-lock. */
796 UNIV_INTERN
797 void
rw_lock_remove_debug_info(rw_lock_t * lock,ulint pass,ulint lock_type)798 rw_lock_remove_debug_info(
799 /*======================*/
800 	rw_lock_t*	lock,		/*!< in: rw-lock */
801 	ulint		pass,		/*!< in: pass value */
802 	ulint		lock_type)	/*!< in: lock type */
803 {
804 	rw_lock_debug_t*	info;
805 
806 	ut_ad(lock);
807 
808 	if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
809 		sync_thread_reset_level(lock);
810 	}
811 
812 	rw_lock_debug_mutex_enter();
813 
814 	info = UT_LIST_GET_FIRST(lock->debug_list);
815 
816 	while (info != NULL) {
817 		if ((pass == info->pass)
818 		    && ((pass != 0)
819 			|| os_thread_eq(info->thread_id,
820 					os_thread_get_curr_id()))
821 		    && (info->lock_type == lock_type)) {
822 
823 			/* Found! */
824 			UT_LIST_REMOVE(list, lock->debug_list, info);
825 			rw_lock_debug_mutex_exit();
826 
827 			rw_lock_debug_free(info);
828 
829 			return;
830 		}
831 
832 		info = UT_LIST_GET_NEXT(list, info);
833 	}
834 
835 	ut_error;
836 }
837 #endif /* UNIV_SYNC_DEBUG */
838 
839 #ifdef UNIV_SYNC_DEBUG
840 /******************************************************************//**
841 Checks if the thread has locked the rw-lock in the specified mode, with
842 the pass value == 0.
843 @return	TRUE if locked */
844 UNIV_INTERN
845 ibool
rw_lock_own(rw_lock_t * lock,ulint lock_type)846 rw_lock_own(
847 /*========*/
848 	rw_lock_t*	lock,		/*!< in: rw-lock */
849 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
850 					RW_LOCK_EX */
851 {
852 	rw_lock_debug_t*	info;
853 
854 	ut_ad(lock);
855 	ut_ad(rw_lock_validate(lock));
856 
857 	rw_lock_debug_mutex_enter();
858 
859 	info = UT_LIST_GET_FIRST(lock->debug_list);
860 
861 	while (info != NULL) {
862 
863 		if (os_thread_eq(info->thread_id, os_thread_get_curr_id())
864 		    && (info->pass == 0)
865 		    && (info->lock_type == lock_type)) {
866 
867 			rw_lock_debug_mutex_exit();
868 			/* Found! */
869 
870 			return(TRUE);
871 		}
872 
873 		info = UT_LIST_GET_NEXT(list, info);
874 	}
875 	rw_lock_debug_mutex_exit();
876 
877 	return(FALSE);
878 }
879 #endif /* UNIV_SYNC_DEBUG */
880 
881 /******************************************************************//**
882 Checks if somebody has locked the rw-lock in the specified mode.
883 @return	TRUE if locked */
884 UNIV_INTERN
885 ibool
rw_lock_is_locked(rw_lock_t * lock,ulint lock_type)886 rw_lock_is_locked(
887 /*==============*/
888 	rw_lock_t*	lock,		/*!< in: rw-lock */
889 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
890 					RW_LOCK_EX */
891 {
892 	ibool	ret	= FALSE;
893 
894 	ut_ad(lock);
895 	ut_ad(rw_lock_validate(lock));
896 
897 	if (lock_type == RW_LOCK_SHARED) {
898 		if (rw_lock_get_reader_count(lock) > 0) {
899 			ret = TRUE;
900 		}
901 	} else if (lock_type == RW_LOCK_EX) {
902 		if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
903 			ret = TRUE;
904 		}
905 	} else {
906 		ut_error;
907 	}
908 
909 	return(ret);
910 }
911 
912 #ifdef UNIV_SYNC_DEBUG
/***************************************************************//**
Prints debug info of currently locked rw-locks. */
UNIV_INTERN
void
rw_lock_list_print_info(
/*====================*/
	FILE*	file)		/*!< in: file where to print */
{
	rw_lock_t*	lock;
	ulint		count		= 0;	/* number of rw-locks seen */
	rw_lock_debug_t* info;

	mutex_enter(&rw_lock_list_mutex);

	fputs("-------------\n"
	      "RW-LATCH INFO\n"
	      "-------------\n", file);

	lock = UT_LIST_GET_FIRST(rw_lock_list);

	/* Walk the global list; details are printed only for locks that
	are not in the unlocked state. */
	while (lock != NULL) {

		count++;

#ifndef INNODB_RW_LOCKS_USE_ATOMICS
		/* Without atomic builtins, freeze the lock state with the
		embedded mutex while reading it. */
		mutex_enter(&(lock->mutex));
#endif
		if (lock->lock_word != X_LOCK_DECR) {

			fprintf(file, "RW-LOCK: %p ", (void*) lock);

			if (rw_lock_get_waiters(lock)) {
				fputs(" Waiters for the lock exist\n", file);
			} else {
				putc('\n', file);
			}

			rw_lock_debug_mutex_enter();
			info = UT_LIST_GET_FIRST(lock->debug_list);
			while (info != NULL) {
				rw_lock_debug_print(file, info);
				info = UT_LIST_GET_NEXT(list, info);
			}
			rw_lock_debug_mutex_exit();
		}
#ifndef INNODB_RW_LOCKS_USE_ATOMICS
		mutex_exit(&(lock->mutex));
#endif

		lock = UT_LIST_GET_NEXT(list, lock);
	}

	fprintf(file, "Total number of rw-locks %ld\n", count);
	mutex_exit(&rw_lock_list_mutex);
}
968 
969 /***************************************************************//**
970 Prints debug info of an rw-lock. */
971 UNIV_INTERN
972 void
rw_lock_print(rw_lock_t * lock)973 rw_lock_print(
974 /*==========*/
975 	rw_lock_t*	lock)	/*!< in: rw-lock */
976 {
977 	rw_lock_debug_t* info;
978 
979 	fprintf(stderr,
980 		"-------------\n"
981 		"RW-LATCH INFO\n"
982 		"RW-LATCH: %p ", (void*) lock);
983 
984 #ifndef INNODB_RW_LOCKS_USE_ATOMICS
985 	/* We used to acquire lock->mutex here, but it would cause a
986 	recursive call to sync_thread_add_level() if UNIV_SYNC_DEBUG
987 	is defined.  Since this function is only invoked from
988 	sync_thread_levels_g(), let us choose the smaller evil:
989 	performing dirty reads instead of causing bogus deadlocks or
990 	assertion failures. */
991 #endif
992 	if (lock->lock_word != X_LOCK_DECR) {
993 
994 		if (rw_lock_get_waiters(lock)) {
995 			fputs(" Waiters for the lock exist\n", stderr);
996 		} else {
997 			putc('\n', stderr);
998 		}
999 
1000 		rw_lock_debug_mutex_enter();
1001 		info = UT_LIST_GET_FIRST(lock->debug_list);
1002 		while (info != NULL) {
1003 			rw_lock_debug_print(stderr, info);
1004 			info = UT_LIST_GET_NEXT(list, info);
1005 		}
1006 		rw_lock_debug_mutex_exit();
1007 	}
1008 }
1009 
1010 /*********************************************************************//**
1011 Prints info of a debug struct. */
1012 UNIV_INTERN
1013 void
rw_lock_debug_print(FILE * f,rw_lock_debug_t * info)1014 rw_lock_debug_print(
1015 /*================*/
1016 	FILE*			f,	/*!< in: output stream */
1017 	rw_lock_debug_t*	info)	/*!< in: debug struct */
1018 {
1019 	ulint	rwt;
1020 
1021 	rwt	  = info->lock_type;
1022 
1023 	fprintf(f, "Locked: thread %lu file %s line %lu  ",
1024 		(ulong) os_thread_pf(info->thread_id), info->file_name,
1025 		(ulong) info->line);
1026 	if (rwt == RW_LOCK_SHARED) {
1027 		fputs("S-LOCK", f);
1028 	} else if (rwt == RW_LOCK_EX) {
1029 		fputs("X-LOCK", f);
1030 	} else if (rwt == RW_LOCK_WAIT_EX) {
1031 		fputs("WAIT X-LOCK", f);
1032 	} else {
1033 		ut_error;
1034 	}
1035 	if (info->pass != 0) {
1036 		fprintf(f, " pass value %lu", (ulong) info->pass);
1037 	}
1038 	putc('\n', f);
1039 }
1040 
1041 /***************************************************************//**
1042 Returns the number of currently locked rw-locks. Works only in the debug
1043 version.
1044 @return	number of locked rw-locks */
1045 UNIV_INTERN
1046 ulint
rw_lock_n_locked(void)1047 rw_lock_n_locked(void)
1048 /*==================*/
1049 {
1050 	rw_lock_t*	lock;
1051 	ulint		count		= 0;
1052 
1053 	mutex_enter(&rw_lock_list_mutex);
1054 
1055 	lock = UT_LIST_GET_FIRST(rw_lock_list);
1056 
1057 	while (lock != NULL) {
1058 
1059 		if (lock->lock_word != X_LOCK_DECR) {
1060 			count++;
1061 		}
1062 
1063 		lock = UT_LIST_GET_NEXT(list, lock);
1064 	}
1065 
1066 	mutex_exit(&rw_lock_list_mutex);
1067 
1068 	return(count);
1069 }
1070 #endif /* UNIV_SYNC_DEBUG */
1071