xref: /minix/external/bsd/bind/dist/lib/isc/rwlock.c (revision 00b67f09)
1 /*	$NetBSD: rwlock.c,v 1.8 2014/12/10 04:37:59 christos Exp $	*/
2 
3 /*
4  * Copyright (C) 2004, 2005, 2007, 2009, 2011, 2012  Internet Systems Consortium, Inc. ("ISC")
5  * Copyright (C) 1998-2001, 2003  Internet Software Consortium.
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
12  * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
13  * AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
14  * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
15  * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
16  * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
17  * PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 /* Id */
21 
22 /*! \file */
23 
24 #include <config.h>
25 
26 #include <stddef.h>
27 #include <stdlib.h>
28 
29 #include <isc/atomic.h>
30 #include <isc/magic.h>
31 #include <isc/msgs.h>
32 #include <isc/platform.h>
33 #include <isc/rwlock.h>
34 #include <isc/util.h>
35 
36 #define RWLOCK_MAGIC		ISC_MAGIC('R', 'W', 'L', 'k')
37 #define VALID_RWLOCK(rwl)	ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
38 
39 #ifdef ISC_PLATFORM_USETHREADS
40 #ifdef ISC_PLATFORM_USE_NATIVE_RWLOCKS
41 
42 isc_result_t
isc_rwlock_init(isc_rwlock_t * rwl,unsigned int read_quota,unsigned int write_quota)43 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
44                unsigned int write_quota)
45 {
46 	REQUIRE(rwl != NULL);
47 
48 	UNUSED(read_quota);
49 	UNUSED(write_quota);
50 
51 	return pthread_rwlock_init(rwl, NULL) == 0 ?
52 	    ISC_R_SUCCESS : ISC_R_FAILURE;
53 }
54 
55 isc_result_t
isc_rwlock_lock(isc_rwlock_t * rwl,isc_rwlocktype_t type)56 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type)
57 {
58 	REQUIRE(rwl != NULL);
59 
60 	switch (type) {
61 	case isc_rwlocktype_none:
62 		return ISC_R_SUCCESS;
63 
64 	case isc_rwlocktype_read:
65 		return pthread_rwlock_rdlock(rwl) == 0 ?
66 		    ISC_R_SUCCESS : ISC_R_LOCKBUSY;
67 
68 	case isc_rwlocktype_write:
69 		return pthread_rwlock_wrlock(rwl) == 0 ?
70 		    ISC_R_SUCCESS : ISC_R_LOCKBUSY;
71 
72 	default:
73 		abort();
74 		return (ISC_R_FAILURE);
75 	}
76 }
77 
78 isc_result_t
isc_rwlock_trylock(isc_rwlock_t * rwl,isc_rwlocktype_t type)79 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type)
80 {
81 	REQUIRE(rwl != NULL);
82 
83 	switch (type) {
84 	case isc_rwlocktype_none:
85 		return ISC_R_SUCCESS;
86 
87 	case isc_rwlocktype_read:
88 		return pthread_rwlock_tryrdlock(rwl) == 0 ?
89 		    ISC_R_SUCCESS : ISC_R_LOCKBUSY;
90 
91 	case isc_rwlocktype_write:
92 		return pthread_rwlock_trywrlock(rwl) == 0 ?
93 		    ISC_R_SUCCESS : ISC_R_LOCKBUSY;
94 
95 	default:
96 		abort();
97 		return (ISC_R_FAILURE);
98 	}
99 }
100 
101 isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t * rwl)102 isc_rwlock_tryupgrade(isc_rwlock_t *rwl)
103 {
104 	REQUIRE(rwl != NULL);
105 
106 	/*
107 	* XXX: we need to make sure we are holding a read lock here
108 	* but how to do it atomically?
109 	*/
110 	return pthread_rwlock_trywrlock(rwl) == 0 ?
111 	    ISC_R_SUCCESS : ISC_R_LOCKBUSY;
112 }
113 
114 void
isc_rwlock_downgrade(isc_rwlock_t * rwl)115 isc_rwlock_downgrade(isc_rwlock_t *rwl)
116 {
117 	REQUIRE(rwl != NULL);
118 
119 	/*
120 	* XXX: we need to make sure we are holding a write lock here
121 	* and then give it up and get a read lock but how to do it atomically?
122 	*/
123 	pthread_rwlock_unlock(rwl);
124 	REQUIRE(pthread_rwlock_tryrdlock(rwl) == 0);
125 }
126 
127 isc_result_t
isc_rwlock_unlock(isc_rwlock_t * rwl,isc_rwlocktype_t type)128 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type)
129 {
130 	REQUIRE(rwl != NULL);
131 	UNUSED(type);
132 
133 	pthread_rwlock_unlock(rwl);
134 
135 	return (ISC_R_SUCCESS);
136 }
137 
138 void
isc_rwlock_destroy(isc_rwlock_t * rwl)139 isc_rwlock_destroy(isc_rwlock_t *rwl)
140 {
141 	REQUIRE(rwl != NULL);
142 }
143 
144 #else /* !ISC_PLATFORM_USE_NATIVE_RWLOCKS */
145 
146 
147 #ifndef RWLOCK_DEFAULT_READ_QUOTA
148 #define RWLOCK_DEFAULT_READ_QUOTA 4
149 #endif
150 
151 #ifndef RWLOCK_DEFAULT_WRITE_QUOTA
152 #define RWLOCK_DEFAULT_WRITE_QUOTA 4
153 #endif
154 
155 #ifdef ISC_RWLOCK_TRACE
156 #include <stdio.h>		/* Required for fprintf/stderr. */
157 #include <isc/thread.h>		/* Required for isc_thread_self(). */
158 
/*
 * Dump the state of 'rwl' to stderr for tracing, tagged with
 * 'operation' (e.g. "prelock"/"postlock") and the lock type the
 * caller requested.  All fixed strings are routed through the ISC
 * message catalog so they can be localized.
 */
static void
print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	fprintf(stderr,
		isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
			       ISC_MSG_PRINTLOCK,
			       "rwlock %p thread %lu %s(%s): %s, %u active, "
			       "%u granted, %u rwaiting, %u wwaiting\n"),
		rwl, isc_thread_self(), operation,
		(type == isc_rwlocktype_read ?
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_READ, "read") :
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_WRITE, "write")),
		(rwl->type == isc_rwlocktype_read ?
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_READING, "reading") :
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_WRITING, "writing")),
		rwl->active, rwl->granted, rwl->readers_waiting,
		rwl->writers_waiting);
}
180 #endif
181 
/*
 * Initialize 'rwl'.  A quota of 0 selects the compiled-in default.
 * On failure every resource acquired so far is torn down again via
 * the goto-cleanup chain, and the magic number is left cleared so
 * the lock is never mistaken for a valid one.
 */
isc_result_t
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
		unsigned int write_quota)
{
	isc_result_t result;

	REQUIRE(rwl != NULL);

	/*
	 * In case there's trouble initializing, we zero magic now.  If all
	 * goes well, we'll set it to RWLOCK_MAGIC.
	 */
	rwl->magic = 0;

#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
	/* Atomic implementation: writer FIFO tickets + cnt_and_flag. */
	rwl->write_requests = 0;
	rwl->write_completions = 0;
	rwl->cnt_and_flag = 0;
	rwl->readers_waiting = 0;
	rwl->write_granted = 0;
	if (read_quota != 0) {
		/* The atomic algorithm has no reader quota. */
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "read quota is not supported");
	}
	if (write_quota == 0)
		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
	rwl->write_quota = write_quota;
#else
	/* Generic implementation: all state guarded by rwl->lock. */
	rwl->type = isc_rwlocktype_read;
	rwl->original = isc_rwlocktype_none;
	rwl->active = 0;
	rwl->granted = 0;
	rwl->readers_waiting = 0;
	rwl->writers_waiting = 0;
	if (read_quota == 0)
		read_quota = RWLOCK_DEFAULT_READ_QUOTA;
	rwl->read_quota = read_quota;
	if (write_quota == 0)
		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
	rwl->write_quota = write_quota;
#endif

	result = isc_mutex_init(&rwl->lock);
	if (result != ISC_R_SUCCESS)
		return (result);

	result = isc_condition_init(&rwl->readable);
	if (result != ISC_R_SUCCESS) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "isc_condition_init(readable) %s: %s",
				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"),
				 isc_result_totext(result));
		result = ISC_R_UNEXPECTED;
		goto destroy_lock;
	}
	result = isc_condition_init(&rwl->writeable);
	if (result != ISC_R_SUCCESS) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "isc_condition_init(writeable) %s: %s",
				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"),
				 isc_result_totext(result));
		result = ISC_R_UNEXPECTED;
		goto destroy_rcond;
	}

	rwl->magic = RWLOCK_MAGIC;

	return (ISC_R_SUCCESS);

  destroy_rcond:
	(void)isc_condition_destroy(&rwl->readable);
  destroy_lock:
	DESTROYLOCK(&rwl->lock);

	return (result);
}
260 
/*
 * Destroy 'rwl'.  The lock must be completely idle: no holders and
 * no waiters, which is asserted before the underlying mutex and
 * condition variables are torn down.
 */
void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	REQUIRE(VALID_RWLOCK(rwl));

#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
	/* Atomic variant: ticket queue drained and cnt_and_flag clear. */
	REQUIRE(rwl->write_requests == rwl->write_completions &&
		rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
#else
	/* Generic variant: check the mutex-protected counters. */
	LOCK(&rwl->lock);
	REQUIRE(rwl->active == 0 &&
		rwl->readers_waiting == 0 &&
		rwl->writers_waiting == 0);
	UNLOCK(&rwl->lock);
#endif

	rwl->magic = 0;
	(void)isc_condition_destroy(&rwl->readable);
	(void)isc_condition_destroy(&rwl->writeable);
	DESTROYLOCK(&rwl->lock);
}
281 
282 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
283 
284 /*
285  * When some architecture-dependent atomic operations are available,
286  * rwlock can be more efficient than the generic algorithm defined below.
287  * The basic algorithm is described in the following URL:
288  *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
289  *
290  * The key is to use the following integer variables modified atomically:
291  *   write_requests, write_completions, and cnt_and_flag.
292  *
293  * write_requests and write_completions act as a waiting queue for writers
294  * in order to ensure the FIFO order.  Both variables begin with the initial
295  * value of 0.  When a new writer tries to get a write lock, it increments
296  * write_requests and gets the previous value of the variable as a "ticket".
297  * When write_completions reaches the ticket number, the new writer can start
298  * writing.  When the writer completes its work, it increments
299  * write_completions so that another new writer can start working.  If the
300  * write_requests is not equal to write_completions, it means a writer is now
301  * working or waiting.  In this case, a new readers cannot start reading, or
302  * in other words, this algorithm basically prefers writers.
303  *
304  * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
305  * variable is a kind of structure with two members: writer_flag (1 bit) and
306  * reader_count (31 bits).  The writer_flag shows whether a writer is working,
307  * and the reader_count shows the number of readers currently working or almost
308  * ready for working.  A writer who has the current "ticket" tries to get the
309  * lock by exclusively setting the writer_flag to 1, provided that the whole
310  * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
311  * a new reader tries to increment the "reader_count" field provided that
312  * the writer_flag is 0 (meaning there is no writer working).
313  *
314  * If some of the above operations fail, the reader or the writer sleeps
315  * until the related condition changes.  When a working reader or writer
316  * completes its work, some readers or writers are sleeping, and the condition
317  * that suspended the reader or writer has changed, it wakes up the sleeping
318  * readers or writers.
319  *
320  * As already noted, this algorithm basically prefers writers.  In order to
321  * prevent readers from starving, however, the algorithm also introduces the
322  * "writer quota" (Q).  When Q consecutive writers have completed their work,
323  * suspending readers, the last writer will wake up the readers, even if a new
324  * writer is waiting.
325  *
326  * Implementation specific note: due to the combination of atomic operations
327  * and a mutex lock, ordering between the atomic operation and locks can be
328  * very sensitive in some cases.  In particular, it is generally very important
329  * to check the atomic variable that requires a reader or writer to sleep after
330  * locking the mutex and before actually sleeping; otherwise, it could be very
331  * likely to cause a deadlock.  For example, assume "var" is a variable
332  * atomically modified, then the corresponding code would be:
333  *	if (var == need_sleep) {
334  *		LOCK(lock);
335  *		if (var == need_sleep)
336  *			WAIT(cond, lock);
337  *		UNLOCK(lock);
338  *	}
339  * The second check is important, since "var" is protected by the atomic
340  * operation, not by the mutex, and can be changed just before sleeping.
341  * (The first "if" could be omitted, but this is also important in order to
342  * make the code efficient by avoiding the use of the mutex unless it is
343  * really necessary.)
344  */
345 
346 #define WRITER_ACTIVE	0x1
347 #define READER_INCR	0x2
348 
/*
 * Blocking lock using the atomic algorithm described above.
 * Readers first wait out any queued/active writers, then announce
 * themselves in cnt_and_flag; writers take a FIFO ticket from
 * write_requests and then contend for cnt_and_flag.  Note the
 * re-check of each atomic variable after LOCK() and before WAIT()
 * -- see the ordering note in the algorithm comment; dropping it
 * would invite missed wakeups.
 */
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		if (rwl->write_requests != rwl->write_completions) {
			/* there is a waiting or active writer */
			LOCK(&rwl->lock);
			if (rwl->write_requests != rwl->write_completions) {
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
			UNLOCK(&rwl->lock);
		}

		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
		POST(cntflag);
		while (1) {
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
				break;

			/* A writer is still working */
			LOCK(&rwl->lock);
			rwl->readers_waiting++;
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
				WAIT(&rwl->readable, &rwl->lock);
			rwl->readers_waiting--;
			UNLOCK(&rwl->lock);

			/*
			 * Typically, the reader should be able to get a lock
			 * at this stage:
			 *   (1) there should have been no pending writer when
			 *       the reader was trying to increment the
			 *       counter; otherwise, the writer should be in
			 *       the waiting queue, preventing the reader from
			 *       proceeding to this point.
			 *   (2) once the reader increments the counter, no
			 *       more writer can get a lock.
			 * Still, it is possible another writer can work at
			 * this point, e.g. in the following scenario:
			 *   A previous writer unlocks the writer lock.
			 *   This reader proceeds to point (1).
			 *   A new writer appears, and gets a new lock before
			 *   the reader increments the counter.
			 *   The reader then increments the counter.
			 *   The previous writer notices there is a waiting
			 *   reader who is almost ready, and wakes it up.
			 * So, the reader needs to confirm whether it can now
			 * read explicitly (thus we loop).  Note that this is
			 * not an infinite process, since the reader has
			 * incremented the counter at this point.
			 */
		}

		/*
		 * If we are temporarily preferred to writers due to the writer
		 * quota, reset the condition (race among readers doesn't
		 * matter).
		 */
		rwl->write_granted = 0;
	} else {
		isc_int32_t prev_writer;

		/* enter the waiting queue, and wait for our turn */
		prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
		while (rwl->write_completions != prev_writer) {
			LOCK(&rwl->lock);
			if (rwl->write_completions != prev_writer) {
				WAIT(&rwl->writeable, &rwl->lock);
				UNLOCK(&rwl->lock);
				continue;
			}
			UNLOCK(&rwl->lock);
			break;
		}

		/* Our ticket is up; now win cnt_and_flag exclusively. */
		while (1) {
			cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
						     WRITER_ACTIVE);
			if (cntflag == 0)
				break;

			/* Another active reader or writer is working. */
			LOCK(&rwl->lock);
			if (rwl->cnt_and_flag != 0)
				WAIT(&rwl->writeable, &rwl->lock);
			UNLOCK(&rwl->lock);
		}

		INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
		rwl->write_granted++;
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
458 
/*
 * Non-blocking lock for the atomic algorithm.  Returns
 * ISC_R_LOCKBUSY instead of sleeping; a failed read attempt must
 * undo its optimistic READER_INCR, and a successful write attempt
 * jumps the FIFO queue by decrementing write_completions.
 */
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		/* If a writer is waiting or working, we fail. */
		if (rwl->write_requests != rwl->write_completions)
			return (ISC_R_LOCKBUSY);

		/* Otherwise, be ready for reading. */
		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
		if ((cntflag & WRITER_ACTIVE) != 0) {
			/*
			 * A writer is working.  We lose, and cancel the read
			 * request.
			 */
			cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
						  -READER_INCR);
			/*
			 * If no other readers are waiting and we've suspended
			 * new writers in this short period, wake them up.
			 */
			if (cntflag == READER_INCR &&
			    rwl->write_completions != rwl->write_requests) {
				LOCK(&rwl->lock);
				BROADCAST(&rwl->writeable);
				UNLOCK(&rwl->lock);
			}

			return (ISC_R_LOCKBUSY);
		}
	} else {
		/* Try locking without entering the waiting queue. */
		cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
					     WRITER_ACTIVE);
		if (cntflag != 0)
			return (ISC_R_LOCKBUSY);

		/*
		 * XXXJT: jump into the queue, possibly breaking the writer
		 * order.
		 */
		(void)isc_atomic_xadd(&rwl->write_completions, -1);

		rwl->write_granted++;
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
520 
/*
 * Try to upgrade a held read lock to a write lock without blocking.
 * Succeeds only if the caller is the sole reader (cnt_and_flag is
 * exactly one READER_INCR); on success the caller jumps to the head
 * of the writer queue by decrementing write_completions.
 */
isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
	isc_int32_t prevcnt;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Try to acquire write access. */
	prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
				     READER_INCR, WRITER_ACTIVE);
	/*
	 * There must have been no writer, and there must have been at least
	 * one reader.
	 */
	INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
	       (prevcnt & ~WRITER_ACTIVE) != 0);

	if (prevcnt == READER_INCR) {
		/*
		 * We are the only reader and have been upgraded.
		 * Now jump into the head of the writer waiting queue.
		 */
		(void)isc_atomic_xadd(&rwl->write_completions, -1);
	} else
		return (ISC_R_LOCKBUSY);

	return (ISC_R_SUCCESS);

}
549 
/*
 * Convert a held write lock to a read lock: register as a reader
 * first (so no new writer can sneak in), then retire the write by
 * clearing WRITER_ACTIVE and completing the FIFO ticket, and
 * finally wake any readers who were blocked behind the writer.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
	isc_int32_t prev_readers;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Become an active reader. */
	prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
	/* We must have been a writer. */
	INSIST((prev_readers & WRITER_ACTIVE) != 0);

	/* Complete write */
	(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
	(void)isc_atomic_xadd(&rwl->write_completions, 1);

	/* Resume other readers */
	LOCK(&rwl->lock);
	if (rwl->readers_waiting > 0)
		BROADCAST(&rwl->readable);
	UNLOCK(&rwl->lock);
}
571 
/*
 * Release a lock held in mode 'type'.  The last departing reader
 * wakes the writer queue; a departing writer decides -- based on the
 * write quota, pending writers, and almost-ready readers -- whether
 * to wake readers, writers, or both.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t prev_cnt;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);

		/*
		 * If we're the last reader and any writers are waiting, wake
		 * them up.  We need to wake up all of them to ensure the
		 * FIFO order.
		 */
		if (prev_cnt == READER_INCR &&
		    rwl->write_completions != rwl->write_requests) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	} else {
		isc_boolean_t wakeup_writers = ISC_TRUE;

		/*
		 * Reset the flag, and (implicitly) tell other writers
		 * we are done.
		 */
		(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
		(void)isc_atomic_xadd(&rwl->write_completions, 1);

		if (rwl->write_granted >= rwl->write_quota ||
		    rwl->write_requests == rwl->write_completions ||
		    (rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
			/*
			 * We have passed the write quota, no writer is
			 * waiting, or some readers are almost ready, pending
			 * possible writers.  Note that the last case can
			 * happen even if write_requests != write_completions
			 * (which means a new writer in the queue), so we need
			 * to catch the case explicitly.
			 */
			LOCK(&rwl->lock);
			if (rwl->readers_waiting > 0) {
				wakeup_writers = ISC_FALSE;
				BROADCAST(&rwl->readable);
			}
			UNLOCK(&rwl->lock);
		}

		if (rwl->write_requests != rwl->write_completions &&
		    wakeup_writers) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTUNLOCK, "postunlock"),
		   rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
642 
643 #else /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
644 
/*
 * Common acquisition path for the generic (mutex + condvar)
 * implementation.  Acquire 'rwl' in mode 'type'; when 'nonblock' is
 * true, return ISC_R_LOCKBUSY instead of waiting.  'skip' forces a
 * newcomer to queue behind already-waiting peers once, preserving
 * rough fairness; the read/write quotas bound how many consecutive
 * grants one side gets while the other side waits.
 */
static isc_result_t
doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, isc_boolean_t nonblock) {
	isc_boolean_t skip = ISC_FALSE;
	isc_boolean_t done = ISC_FALSE;
	isc_result_t result = ISC_R_SUCCESS;

	REQUIRE(VALID_RWLOCK(rwl));

	LOCK(&rwl->lock);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		/* Queue behind readers that are already waiting. */
		if (rwl->readers_waiting != 0)
			skip = ISC_TRUE;
		while (!done) {
			if (!skip &&
			    ((rwl->active == 0 ||
			      (rwl->type == isc_rwlocktype_read &&
			       (rwl->writers_waiting == 0 ||
				rwl->granted < rwl->read_quota)))))
			{
				rwl->type = isc_rwlocktype_read;
				rwl->active++;
				rwl->granted++;
				done = ISC_TRUE;
			} else if (nonblock) {
				result = ISC_R_LOCKBUSY;
				done = ISC_TRUE;
			} else {
				skip = ISC_FALSE;
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
		}
	} else {
		/* Queue behind writers that are already waiting. */
		if (rwl->writers_waiting != 0)
			skip = ISC_TRUE;
		while (!done) {
			if (!skip && rwl->active == 0) {
				rwl->type = isc_rwlocktype_write;
				rwl->active = 1;
				rwl->granted++;
				done = ISC_TRUE;
			} else if (nonblock) {
				result = ISC_R_LOCKBUSY;
				done = ISC_TRUE;
			} else {
				skip = ISC_FALSE;
				rwl->writers_waiting++;
				WAIT(&rwl->writeable, &rwl->lock);
				rwl->writers_waiting--;
			}
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	UNLOCK(&rwl->lock);

	return (result);
}
714 
715 isc_result_t
isc_rwlock_lock(isc_rwlock_t * rwl,isc_rwlocktype_t type)716 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
717 	return (doit(rwl, type, ISC_FALSE));
718 }
719 
720 isc_result_t
isc_rwlock_trylock(isc_rwlock_t * rwl,isc_rwlocktype_t type)721 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
722 	return (doit(rwl, type, ISC_TRUE));
723 }
724 
725 isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t * rwl)726 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
727 	isc_result_t result = ISC_R_SUCCESS;
728 
729 	REQUIRE(VALID_RWLOCK(rwl));
730 	LOCK(&rwl->lock);
731 	REQUIRE(rwl->type == isc_rwlocktype_read);
732 	REQUIRE(rwl->active != 0);
733 
734 	/* If we are the only reader then succeed. */
735 	if (rwl->active == 1) {
736 		rwl->original = (rwl->original == isc_rwlocktype_none) ?
737 				isc_rwlocktype_read : isc_rwlocktype_none;
738 		rwl->type = isc_rwlocktype_write;
739 	} else
740 		result = ISC_R_LOCKBUSY;
741 
742 	UNLOCK(&rwl->lock);
743 	return (result);
744 }
745 
/*
 * Convert the caller's sole write lock into a read lock.  The
 * 'original' toggle tracks whether this write lock came from an
 * earlier tryupgrade; blocked readers are only woken when this is a
 * plain downgrade and the reader quota still permits new grants.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {

	REQUIRE(VALID_RWLOCK(rwl));
	LOCK(&rwl->lock);
	REQUIRE(rwl->type == isc_rwlocktype_write);
	REQUIRE(rwl->active == 1);

	rwl->type = isc_rwlocktype_read;
	rwl->original = (rwl->original == isc_rwlocktype_none) ?
			isc_rwlocktype_write : isc_rwlocktype_none;
	/*
	 * Resume processing any read request that were blocked when
	 * we upgraded.
	 */
	if (rwl->original == isc_rwlocktype_none &&
	    (rwl->writers_waiting == 0 || rwl->granted < rwl->read_quota) &&
	    rwl->readers_waiting > 0)
		BROADCAST(&rwl->readable);

	UNLOCK(&rwl->lock);
}
768 
/*
 * Release one hold on 'rwl' (generic implementation).  When the
 * last holder leaves, first restore any mode saved by an upgrade/
 * downgrade, then decide whom to wake: queued writers get priority
 * after a read phase, while after a write phase the quotas arbitrate
 * between waiting readers and writers.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {

	REQUIRE(VALID_RWLOCK(rwl));
	LOCK(&rwl->lock);
	REQUIRE(rwl->type == type);

	UNUSED(type);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
#endif

	INSIST(rwl->active > 0);
	rwl->active--;
	if (rwl->active == 0) {
		/* Undo a pending upgrade/downgrade mode switch. */
		if (rwl->original != isc_rwlocktype_none) {
			rwl->type = rwl->original;
			rwl->original = isc_rwlocktype_none;
		}
		if (rwl->type == isc_rwlocktype_read) {
			rwl->granted = 0;
			if (rwl->writers_waiting > 0) {
				rwl->type = isc_rwlocktype_write;
				SIGNAL(&rwl->writeable);
			} else if (rwl->readers_waiting > 0) {
				/* Does this case ever happen? */
				BROADCAST(&rwl->readable);
			}
		} else {
			if (rwl->readers_waiting > 0) {
				if (rwl->writers_waiting > 0 &&
				    rwl->granted < rwl->write_quota) {
					SIGNAL(&rwl->writeable);
				} else {
					rwl->granted = 0;
					rwl->type = isc_rwlocktype_read;
					BROADCAST(&rwl->readable);
				}
			} else if (rwl->writers_waiting > 0) {
				rwl->granted = 0;
				SIGNAL(&rwl->writeable);
			} else {
				rwl->granted = 0;
			}
		}
	}
	INSIST(rwl->original == isc_rwlocktype_none);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTUNLOCK, "postunlock"),
		   rwl, type);
#endif

	UNLOCK(&rwl->lock);

	return (ISC_R_SUCCESS);
}
829 
830 #endif /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
831 #endif /* !ISC_PLATFORM_USE_NATIVE_RWLOCKS */
832 #else /* ISC_PLATFORM_USETHREADS */
833 
834 isc_result_t
isc_rwlock_init(isc_rwlock_t * rwl,unsigned int read_quota,unsigned int write_quota)835 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
836 		unsigned int write_quota)
837 {
838 	REQUIRE(rwl != NULL);
839 
840 	UNUSED(read_quota);
841 	UNUSED(write_quota);
842 
843 	rwl->type = isc_rwlocktype_read;
844 	rwl->active = 0;
845 	rwl->magic = RWLOCK_MAGIC;
846 
847 	return (ISC_R_SUCCESS);
848 }
849 
850 isc_result_t
isc_rwlock_lock(isc_rwlock_t * rwl,isc_rwlocktype_t type)851 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
852 	REQUIRE(VALID_RWLOCK(rwl));
853 
854 	if (type == isc_rwlocktype_read) {
855 		if (rwl->type != isc_rwlocktype_read && rwl->active != 0)
856 			return (ISC_R_LOCKBUSY);
857 		rwl->type = isc_rwlocktype_read;
858 		rwl->active++;
859 	} else {
860 		if (rwl->active != 0)
861 			return (ISC_R_LOCKBUSY);
862 		rwl->type = isc_rwlocktype_write;
863 		rwl->active = 1;
864 	}
865 	return (ISC_R_SUCCESS);
866 }
867 
868 isc_result_t
isc_rwlock_trylock(isc_rwlock_t * rwl,isc_rwlocktype_t type)869 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
870 	return (isc_rwlock_lock(rwl, type));
871 }
872 
873 isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t * rwl)874 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
875 	isc_result_t result = ISC_R_SUCCESS;
876 
877 	REQUIRE(VALID_RWLOCK(rwl));
878 	REQUIRE(rwl->type == isc_rwlocktype_read);
879 	REQUIRE(rwl->active != 0);
880 
881 	/* If we are the only reader then succeed. */
882 	if (rwl->active == 1)
883 		rwl->type = isc_rwlocktype_write;
884 	else
885 		result = ISC_R_LOCKBUSY;
886 	return (result);
887 }
888 
889 void
isc_rwlock_downgrade(isc_rwlock_t * rwl)890 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
891 
892 	REQUIRE(VALID_RWLOCK(rwl));
893 	REQUIRE(rwl->type == isc_rwlocktype_write);
894 	REQUIRE(rwl->active == 1);
895 
896 	rwl->type = isc_rwlocktype_read;
897 }
898 
899 isc_result_t
isc_rwlock_unlock(isc_rwlock_t * rwl,isc_rwlocktype_t type)900 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
901 	REQUIRE(VALID_RWLOCK(rwl));
902 	REQUIRE(rwl->type == type);
903 
904 	UNUSED(type);
905 
906 	INSIST(rwl->active > 0);
907 	rwl->active--;
908 
909 	return (ISC_R_SUCCESS);
910 }
911 
912 void
isc_rwlock_destroy(isc_rwlock_t * rwl)913 isc_rwlock_destroy(isc_rwlock_t *rwl) {
914 	REQUIRE(rwl != NULL);
915 	REQUIRE(rwl->active == 0);
916 	rwl->magic = 0;
917 }
918 
919 #endif /* ISC_PLATFORM_USETHREADS */
920