xref: /netbsd/external/bsd/ntp/dist/lib/isc/rwlock.c (revision 6550d01e)
1 /*	$NetBSD: rwlock.c,v 1.1.1.1 2009/12/13 16:54:17 kardel Exp $	*/
2 
3 /*
4  * Copyright (C) 2004, 2005, 2007, 2009  Internet Systems Consortium, Inc. ("ISC")
5  * Copyright (C) 1998-2001, 2003  Internet Software Consortium.
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
12  * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
13  * AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
14  * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
15  * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
16  * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
17  * PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 /* Id: rwlock.c,v 1.44.332.2 2009/01/18 23:47:41 tbox Exp */
21 
22 /*! \file */
23 
24 #include <config.h>
25 
26 #include <stddef.h>
27 
28 #include <isc/atomic.h>
29 #include <isc/magic.h>
30 #include <isc/msgs.h>
31 #include <isc/platform.h>
32 #include <isc/rwlock.h>
33 #include <isc/util.h>
34 
/*
 * RWLOCK_MAGIC is the value isc_rwlock_init() stores in rwl->magic once
 * initialization has succeeded; isc_rwlock_destroy() resets the field to 0.
 * VALID_RWLOCK() is the validity check the other entry points REQUIRE()
 * before touching the lock.
 */
#define RWLOCK_MAGIC		ISC_MAGIC('R', 'W', 'L', 'k')
#define VALID_RWLOCK(rwl)	ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
37 
38 #ifdef ISC_PLATFORM_USETHREADS
39 
/*
 * Quotas that isc_rwlock_init() substitutes when the caller passes 0.
 * Either value can be overridden by defining the macro before this point
 * (for example on the compiler command line).
 */
#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif

#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
#define RWLOCK_DEFAULT_WRITE_QUOTA 4
#endif
47 
48 #ifdef ISC_RWLOCK_TRACE
49 #include <stdio.h>		/* Required for fprintf/stderr. */
50 #include <isc/thread.h>		/* Required for isc_thread_self(). */
51 
52 static void
53 print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
54 	fprintf(stderr,
55 		isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
56 			       ISC_MSG_PRINTLOCK,
57 			       "rwlock %p thread %lu %s(%s): %s, %u active, "
58 			       "%u granted, %u rwaiting, %u wwaiting\n"),
59 		rwl, isc_thread_self(), operation,
60 		(type == isc_rwlocktype_read ?
61 		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
62 				ISC_MSG_READ, "read") :
63 		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
64 				ISC_MSG_WRITE, "write")),
65 		(rwl->type == isc_rwlocktype_read ?
66 		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
67 				ISC_MSG_READING, "reading") :
68 		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
69 				ISC_MSG_WRITING, "writing")),
70 		rwl->active, rwl->granted, rwl->readers_waiting,
71 		rwl->writers_waiting);
72 }
73 #endif
74 
75 isc_result_t
76 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
77 		unsigned int write_quota)
78 {
79 	isc_result_t result;
80 
81 	REQUIRE(rwl != NULL);
82 
83 	/*
84 	 * In case there's trouble initializing, we zero magic now.  If all
85 	 * goes well, we'll set it to RWLOCK_MAGIC.
86 	 */
87 	rwl->magic = 0;
88 
89 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
90 	rwl->write_requests = 0;
91 	rwl->write_completions = 0;
92 	rwl->cnt_and_flag = 0;
93 	rwl->readers_waiting = 0;
94 	rwl->write_granted = 0;
95 	if (read_quota != 0) {
96 		UNEXPECTED_ERROR(__FILE__, __LINE__,
97 				 "read quota is not supported");
98 	}
99 	if (write_quota == 0)
100 		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
101 	rwl->write_quota = write_quota;
102 #else
103 	rwl->type = isc_rwlocktype_read;
104 	rwl->original = isc_rwlocktype_none;
105 	rwl->active = 0;
106 	rwl->granted = 0;
107 	rwl->readers_waiting = 0;
108 	rwl->writers_waiting = 0;
109 	if (read_quota == 0)
110 		read_quota = RWLOCK_DEFAULT_READ_QUOTA;
111 	rwl->read_quota = read_quota;
112 	if (write_quota == 0)
113 		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
114 	rwl->write_quota = write_quota;
115 #endif
116 
117 	result = isc_mutex_init(&rwl->lock);
118 	if (result != ISC_R_SUCCESS)
119 		return (result);
120 
121 	result = isc_condition_init(&rwl->readable);
122 	if (result != ISC_R_SUCCESS) {
123 		UNEXPECTED_ERROR(__FILE__, __LINE__,
124 				 "isc_condition_init(readable) %s: %s",
125 				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
126 						ISC_MSG_FAILED, "failed"),
127 				 isc_result_totext(result));
128 		result = ISC_R_UNEXPECTED;
129 		goto destroy_lock;
130 	}
131 	result = isc_condition_init(&rwl->writeable);
132 	if (result != ISC_R_SUCCESS) {
133 		UNEXPECTED_ERROR(__FILE__, __LINE__,
134 				 "isc_condition_init(writeable) %s: %s",
135 				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
136 						ISC_MSG_FAILED, "failed"),
137 				 isc_result_totext(result));
138 		result = ISC_R_UNEXPECTED;
139 		goto destroy_rcond;
140 	}
141 
142 	rwl->magic = RWLOCK_MAGIC;
143 
144 	return (ISC_R_SUCCESS);
145 
146   destroy_rcond:
147 	(void)isc_condition_destroy(&rwl->readable);
148   destroy_lock:
149 	DESTROYLOCK(&rwl->lock);
150 
151 	return (result);
152 }
153 
/*
 * Destroy 'rwl' (threaded builds).
 *
 * The lock must be valid and idle: the atomic implementation requires that
 * every write request has completed, that cnt_and_flag is 0 and that no
 * reader is waiting; the generic implementation requires no active holder
 * and no waiting reader or writer (checked while holding rwl->lock).
 * Violations trip REQUIRE().
 */
void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	REQUIRE(VALID_RWLOCK(rwl));

#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
	REQUIRE(rwl->write_requests == rwl->write_completions &&
		rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
#else
	LOCK(&rwl->lock);
	REQUIRE(rwl->active == 0 &&
		rwl->readers_waiting == 0 &&
		rwl->writers_waiting == 0);
	UNLOCK(&rwl->lock);
#endif

	/* Invalidate first so later VALID_RWLOCK() checks on this lock fail. */
	rwl->magic = 0;
	(void)isc_condition_destroy(&rwl->readable);
	(void)isc_condition_destroy(&rwl->writeable);
	DESTROYLOCK(&rwl->lock);
}
174 
175 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
176 
177 /*
178  * When some architecture-dependent atomic operations are available,
179  * rwlock can be more efficient than the generic algorithm defined below.
180  * The basic algorithm is described in the following URL:
181  *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
182  *
183  * The key is to use the following integer variables modified atomically:
184  *   write_requests, write_completions, and cnt_and_flag.
185  *
186  * write_requests and write_completions act as a waiting queue for writers
187  * in order to ensure the FIFO order.  Both variables begin with the initial
188  * value of 0.  When a new writer tries to get a write lock, it increments
189  * write_requests and gets the previous value of the variable as a "ticket".
190  * When write_completions reaches the ticket number, the new writer can start
191  * writing.  When the writer completes its work, it increments
192  * write_completions so that another new writer can start working.  If the
193  * write_requests is not equal to write_completions, it means a writer is now
 * working or waiting.  In this case, new readers cannot start reading, or
195  * in other words, this algorithm basically prefers writers.
196  *
197  * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
198  * variable is a kind of structure with two members: writer_flag (1 bit) and
199  * reader_count (31 bits).  The writer_flag shows whether a writer is working,
200  * and the reader_count shows the number of readers currently working or almost
201  * ready for working.  A writer who has the current "ticket" tries to get the
202  * lock by exclusively setting the writer_flag to 1, provided that the whole
203  * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
204  * a new reader tries to increment the "reader_count" field provided that
205  * the writer_flag is 0 (meaning there is no writer working).
206  *
207  * If some of the above operations fail, the reader or the writer sleeps
208  * until the related condition changes.  When a working reader or writer
209  * completes its work, some readers or writers are sleeping, and the condition
210  * that suspended the reader or writer has changed, it wakes up the sleeping
211  * readers or writers.
212  *
213  * As already noted, this algorithm basically prefers writers.  In order to
214  * prevent readers from starving, however, the algorithm also introduces the
215  * "writer quota" (Q).  When Q consecutive writers have completed their work,
216  * suspending readers, the last writer will wake up the readers, even if a new
217  * writer is waiting.
218  *
219  * Implementation specific note: due to the combination of atomic operations
220  * and a mutex lock, ordering between the atomic operation and locks can be
221  * very sensitive in some cases.  In particular, it is generally very important
222  * to check the atomic variable that requires a reader or writer to sleep after
223  * locking the mutex and before actually sleeping; otherwise, it could be very
224  * likely to cause a deadlock.  For example, assume "var" is a variable
225  * atomically modified, then the corresponding code would be:
226  *	if (var == need_sleep) {
227  *		LOCK(lock);
228  *		if (var == need_sleep)
229  *			WAIT(cond, lock);
230  *		UNLOCK(lock);
231  *	}
232  * The second check is important, since "var" is protected by the atomic
233  * operation, not by the mutex, and can be changed just before sleeping.
234  * (The first "if" could be omitted, but this is also important in order to
235  * make the code efficient by avoiding the use of the mutex unless it is
236  * really necessary.)
237  */
238 
/*
 * Encoding of cnt_and_flag: bit 0 is the "a writer holds the lock" flag and
 * the remaining bits hold the reader count, so one reader is accounted for
 * by adding or subtracting READER_INCR.
 */
#define WRITER_ACTIVE	0x1
#define READER_INCR	0x2
241 
/*
 * Acquire 'rwl' for reading or writing, blocking until the lock is granted
 * (atomic-operation implementation).  Always returns ISC_R_SUCCESS.
 *
 * Reader: waits once on 'readable' if a writer is queued or active
 * (write_requests != write_completions), then registers itself by adding
 * READER_INCR to cnt_and_flag and loops until the WRITER_ACTIVE bit is
 * clear.
 *
 * Writer: draws a ticket from write_requests, waits on 'writeable' until
 * write_completions equals that ticket, and then loops until it can change
 * cnt_and_flag from 0 to WRITER_ACTIVE with a compare-and-exchange.
 */
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		if (rwl->write_requests != rwl->write_completions) {
			/* there is a waiting or active writer */
			LOCK(&rwl->lock);
			/*
			 * Re-check under the mutex before sleeping.  This is
			 * a single WAIT, not a loop: exclusion against an
			 * active writer is enforced by the cnt_and_flag loop
			 * below, not by this wait.
			 */
			if (rwl->write_requests != rwl->write_completions) {
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
			UNLOCK(&rwl->lock);
		}

		/*
		 * Register as a reader.  The value returned by the xadd is
		 * not consulted; the loop re-reads cnt_and_flag instead.
		 */
		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
		while (1) {
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
				break;

			/* A writer is still working */
			LOCK(&rwl->lock);
			rwl->readers_waiting++;
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
				WAIT(&rwl->readable, &rwl->lock);
			rwl->readers_waiting--;
			UNLOCK(&rwl->lock);

			/*
			 * Typically, the reader should be able to get a lock
			 * at this stage:
			 *   (1) there should have been no pending writer when
			 *       the reader was trying to increment the
			 *       counter; otherwise, the writer should be in
			 *       the waiting queue, preventing the reader from
			 *       proceeding to this point.
			 *   (2) once the reader increments the counter, no
			 *       more writer can get a lock.
			 * Still, it is possible another writer can work at
			 * this point, e.g. in the following scenario:
			 *   A previous writer unlocks the writer lock.
			 *   This reader proceeds to point (1).
			 *   A new writer appears, and gets a new lock before
			 *   the reader increments the counter.
			 *   The reader then increments the counter.
			 *   The previous writer notices there is a waiting
			 *   reader who is almost ready, and wakes it up.
			 * So, the reader needs to confirm whether it can now
			 * read explicitly (thus we loop).  Note that this is
			 * not an infinite process, since the reader has
			 * incremented the counter at this point.
			 */
		}

		/*
		 * If we are temporarily preferred to writers due to the writer
		 * quota, reset the condition (race among readers doesn't
		 * matter).
		 */
		rwl->write_granted = 0;
	} else {
		isc_int32_t prev_writer;

		/* enter the waiting queue, and wait for our turn */
		prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
		while (rwl->write_completions != prev_writer) {
			LOCK(&rwl->lock);
			if (rwl->write_completions != prev_writer) {
				WAIT(&rwl->writeable, &rwl->lock);
				UNLOCK(&rwl->lock);
				continue;
			}
			UNLOCK(&rwl->lock);
			break;
		}

		/*
		 * It is our turn among writers; now wait until no reader or
		 * writer holds the lock and claim it.
		 */
		while (1) {
			cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
						     WRITER_ACTIVE);
			if (cntflag == 0)
				break;

			/* Another active reader or writer is working. */
			LOCK(&rwl->lock);
			if (rwl->cnt_and_flag != 0)
				WAIT(&rwl->writeable, &rwl->lock);
			UNLOCK(&rwl->lock);
		}

		INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
		/* Counted against write_quota in isc_rwlock_unlock(). */
		rwl->write_granted++;
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
350 
351 isc_result_t
352 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
353 	isc_int32_t cntflag;
354 
355 	REQUIRE(VALID_RWLOCK(rwl));
356 
357 #ifdef ISC_RWLOCK_TRACE
358 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
359 				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
360 #endif
361 
362 	if (type == isc_rwlocktype_read) {
363 		/* If a writer is waiting or working, we fail. */
364 		if (rwl->write_requests != rwl->write_completions)
365 			return (ISC_R_LOCKBUSY);
366 
367 		/* Otherwise, be ready for reading. */
368 		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
369 		if ((cntflag & WRITER_ACTIVE) != 0) {
370 			/*
371 			 * A writer is working.  We lose, and cancel the read
372 			 * request.
373 			 */
374 			cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
375 						  -READER_INCR);
376 			/*
377 			 * If no other readers are waiting and we've suspended
378 			 * new writers in this short period, wake them up.
379 			 */
380 			if (cntflag == READER_INCR &&
381 			    rwl->write_completions != rwl->write_requests) {
382 				LOCK(&rwl->lock);
383 				BROADCAST(&rwl->writeable);
384 				UNLOCK(&rwl->lock);
385 			}
386 
387 			return (ISC_R_LOCKBUSY);
388 		}
389 	} else {
390 		/* Try locking without entering the waiting queue. */
391 		cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
392 					     WRITER_ACTIVE);
393 		if (cntflag != 0)
394 			return (ISC_R_LOCKBUSY);
395 
396 		/*
397 		 * XXXJT: jump into the queue, possibly breaking the writer
398 		 * order.
399 		 */
400 		(void)isc_atomic_xadd(&rwl->write_completions, -1);
401 
402 		rwl->write_granted++;
403 	}
404 
405 #ifdef ISC_RWLOCK_TRACE
406 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
407 				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
408 #endif
409 
410 	return (ISC_R_SUCCESS);
411 }
412 
413 isc_result_t
414 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
415 	isc_int32_t prevcnt;
416 
417 	REQUIRE(VALID_RWLOCK(rwl));
418 
419 	/* Try to acquire write access. */
420 	prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
421 				     READER_INCR, WRITER_ACTIVE);
422 	/*
423 	 * There must have been no writer, and there must have been at least
424 	 * one reader.
425 	 */
426 	INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
427 	       (prevcnt & ~WRITER_ACTIVE) != 0);
428 
429 	if (prevcnt == READER_INCR) {
430 		/*
431 		 * We are the only reader and have been upgraded.
432 		 * Now jump into the head of the writer waiting queue.
433 		 */
434 		(void)isc_atomic_xadd(&rwl->write_completions, -1);
435 	} else
436 		return (ISC_R_LOCKBUSY);
437 
438 	return (ISC_R_SUCCESS);
439 
440 }
441 
/*
 * Convert the caller's write lock into a read lock (atomic-operation
 * implementation).  The caller must currently hold the write lock; this is
 * checked with INSIST().
 *
 * The reader count is raised before the writer flag is dropped, so
 * cnt_and_flag never passes through 0 during the transition.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
	isc_int32_t prev_readers;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Become an active reader. */
	prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
	/* We must have been a writer. */
	INSIST((prev_readers & WRITER_ACTIVE) != 0);

	/* Complete write */
	(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
	(void)isc_atomic_xadd(&rwl->write_completions, 1);

	/* Resume other readers */
	LOCK(&rwl->lock);
	if (rwl->readers_waiting > 0)
		BROADCAST(&rwl->readable);
	UNLOCK(&rwl->lock);
}
463 
/*
 * Release a lock of the given 'type' on 'rwl' (atomic-operation
 * implementation).  Always returns ISC_R_SUCCESS.
 *
 * Read unlock: drops one reader count and, if that was the last reader and a
 * writer is queued, wakes all waiting writers.
 *
 * Write unlock: clears WRITER_ACTIVE, advances write_completions, and then
 * decides whom to wake.  Waiting readers are woken when the write quota has
 * been reached, when no writer is queued, or when readers have already
 * registered in cnt_and_flag; otherwise (or if no reader was actually
 * waiting) queued writers are woken.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t prev_cnt;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);

		/*
		 * If we're the last reader and any writers are waiting, wake
		 * them up.  We need to wake up all of them to ensure the
		 * FIFO order.
		 */
		if (prev_cnt == READER_INCR &&
		    rwl->write_completions != rwl->write_requests) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	} else {
		/* Cleared below only if waiting readers are woken instead. */
		isc_boolean_t wakeup_writers = ISC_TRUE;

		/*
		 * Reset the flag, and (implicitly) tell other writers
		 * we are done.
		 */
		(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
		(void)isc_atomic_xadd(&rwl->write_completions, 1);

		if (rwl->write_granted >= rwl->write_quota ||
		    rwl->write_requests == rwl->write_completions ||
		    (rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
			/*
			 * We have passed the write quota, no writer is
			 * waiting, or some readers are almost ready, pending
			 * possible writers.  Note that the last case can
			 * happen even if write_requests != write_completions
			 * (which means a new writer in the queue), so we need
			 * to catch the case explicitly.
			 */
			LOCK(&rwl->lock);
			if (rwl->readers_waiting > 0) {
				wakeup_writers = ISC_FALSE;
				BROADCAST(&rwl->readable);
			}
			UNLOCK(&rwl->lock);
		}

		/* Hand over to the next queued writer, if any. */
		if (rwl->write_requests != rwl->write_completions &&
		    wakeup_writers) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTUNLOCK, "postunlock"),
		   rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
534 
535 #else /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
536 
537 static isc_result_t
538 doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, isc_boolean_t nonblock) {
539 	isc_boolean_t skip = ISC_FALSE;
540 	isc_boolean_t done = ISC_FALSE;
541 	isc_result_t result = ISC_R_SUCCESS;
542 
543 	REQUIRE(VALID_RWLOCK(rwl));
544 
545 	LOCK(&rwl->lock);
546 
547 #ifdef ISC_RWLOCK_TRACE
548 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
549 				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
550 #endif
551 
552 	if (type == isc_rwlocktype_read) {
553 		if (rwl->readers_waiting != 0)
554 			skip = ISC_TRUE;
555 		while (!done) {
556 			if (!skip &&
557 			    ((rwl->active == 0 ||
558 			      (rwl->type == isc_rwlocktype_read &&
559 			       (rwl->writers_waiting == 0 ||
560 				rwl->granted < rwl->read_quota)))))
561 			{
562 				rwl->type = isc_rwlocktype_read;
563 				rwl->active++;
564 				rwl->granted++;
565 				done = ISC_TRUE;
566 			} else if (nonblock) {
567 				result = ISC_R_LOCKBUSY;
568 				done = ISC_TRUE;
569 			} else {
570 				skip = ISC_FALSE;
571 				rwl->readers_waiting++;
572 				WAIT(&rwl->readable, &rwl->lock);
573 				rwl->readers_waiting--;
574 			}
575 		}
576 	} else {
577 		if (rwl->writers_waiting != 0)
578 			skip = ISC_TRUE;
579 		while (!done) {
580 			if (!skip && rwl->active == 0) {
581 				rwl->type = isc_rwlocktype_write;
582 				rwl->active = 1;
583 				rwl->granted++;
584 				done = ISC_TRUE;
585 			} else if (nonblock) {
586 				result = ISC_R_LOCKBUSY;
587 				done = ISC_TRUE;
588 			} else {
589 				skip = ISC_FALSE;
590 				rwl->writers_waiting++;
591 				WAIT(&rwl->writeable, &rwl->lock);
592 				rwl->writers_waiting--;
593 			}
594 		}
595 	}
596 
597 #ifdef ISC_RWLOCK_TRACE
598 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
599 				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
600 #endif
601 
602 	UNLOCK(&rwl->lock);
603 
604 	return (result);
605 }
606 
607 isc_result_t
608 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
609 	return (doit(rwl, type, ISC_FALSE));
610 }
611 
612 isc_result_t
613 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
614 	return (doit(rwl, type, ISC_TRUE));
615 }
616 
617 isc_result_t
618 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
619 	isc_result_t result = ISC_R_SUCCESS;
620 
621 	REQUIRE(VALID_RWLOCK(rwl));
622 	LOCK(&rwl->lock);
623 	REQUIRE(rwl->type == isc_rwlocktype_read);
624 	REQUIRE(rwl->active != 0);
625 
626 	/* If we are the only reader then succeed. */
627 	if (rwl->active == 1) {
628 		rwl->original = (rwl->original == isc_rwlocktype_none) ?
629 				isc_rwlocktype_read : isc_rwlocktype_none;
630 		rwl->type = isc_rwlocktype_write;
631 	} else
632 		result = ISC_R_LOCKBUSY;
633 
634 	UNLOCK(&rwl->lock);
635 	return (result);
636 }
637 
638 void
639 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
640 
641 	REQUIRE(VALID_RWLOCK(rwl));
642 	LOCK(&rwl->lock);
643 	REQUIRE(rwl->type == isc_rwlocktype_write);
644 	REQUIRE(rwl->active == 1);
645 
646 	rwl->type = isc_rwlocktype_read;
647 	rwl->original = (rwl->original == isc_rwlocktype_none) ?
648 			isc_rwlocktype_write : isc_rwlocktype_none;
649 	/*
650 	 * Resume processing any read request that were blocked when
651 	 * we upgraded.
652 	 */
653 	if (rwl->original == isc_rwlocktype_none &&
654 	    (rwl->writers_waiting == 0 || rwl->granted < rwl->read_quota) &&
655 	    rwl->readers_waiting > 0)
656 		BROADCAST(&rwl->readable);
657 
658 	UNLOCK(&rwl->lock);
659 }
660 
661 isc_result_t
662 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
663 
664 	REQUIRE(VALID_RWLOCK(rwl));
665 	LOCK(&rwl->lock);
666 	REQUIRE(rwl->type == type);
667 
668 	UNUSED(type);
669 
670 #ifdef ISC_RWLOCK_TRACE
671 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
672 				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
673 #endif
674 
675 	INSIST(rwl->active > 0);
676 	rwl->active--;
677 	if (rwl->active == 0) {
678 		if (rwl->original != isc_rwlocktype_none) {
679 			rwl->type = rwl->original;
680 			rwl->original = isc_rwlocktype_none;
681 		}
682 		if (rwl->type == isc_rwlocktype_read) {
683 			rwl->granted = 0;
684 			if (rwl->writers_waiting > 0) {
685 				rwl->type = isc_rwlocktype_write;
686 				SIGNAL(&rwl->writeable);
687 			} else if (rwl->readers_waiting > 0) {
688 				/* Does this case ever happen? */
689 				BROADCAST(&rwl->readable);
690 			}
691 		} else {
692 			if (rwl->readers_waiting > 0) {
693 				if (rwl->writers_waiting > 0 &&
694 				    rwl->granted < rwl->write_quota) {
695 					SIGNAL(&rwl->writeable);
696 				} else {
697 					rwl->granted = 0;
698 					rwl->type = isc_rwlocktype_read;
699 					BROADCAST(&rwl->readable);
700 				}
701 			} else if (rwl->writers_waiting > 0) {
702 				rwl->granted = 0;
703 				SIGNAL(&rwl->writeable);
704 			} else {
705 				rwl->granted = 0;
706 			}
707 		}
708 	}
709 	INSIST(rwl->original == isc_rwlocktype_none);
710 
711 #ifdef ISC_RWLOCK_TRACE
712 	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
713 				  ISC_MSG_POSTUNLOCK, "postunlock"),
714 		   rwl, type);
715 #endif
716 
717 	UNLOCK(&rwl->lock);
718 
719 	return (ISC_R_SUCCESS);
720 }
721 
722 #endif /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
723 #else /* ISC_PLATFORM_USETHREADS */
724 
725 isc_result_t
726 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
727 		unsigned int write_quota)
728 {
729 	REQUIRE(rwl != NULL);
730 
731 	UNUSED(read_quota);
732 	UNUSED(write_quota);
733 
734 	rwl->type = isc_rwlocktype_read;
735 	rwl->active = 0;
736 	rwl->magic = RWLOCK_MAGIC;
737 
738 	return (ISC_R_SUCCESS);
739 }
740 
741 isc_result_t
742 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
743 	REQUIRE(VALID_RWLOCK(rwl));
744 
745 	if (type == isc_rwlocktype_read) {
746 		if (rwl->type != isc_rwlocktype_read && rwl->active != 0)
747 			return (ISC_R_LOCKBUSY);
748 		rwl->type = isc_rwlocktype_read;
749 		rwl->active++;
750 	} else {
751 		if (rwl->active != 0)
752 			return (ISC_R_LOCKBUSY);
753 		rwl->type = isc_rwlocktype_write;
754 		rwl->active = 1;
755 	}
756 	return (ISC_R_SUCCESS);
757 }
758 
759 isc_result_t
760 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
761 	return (isc_rwlock_lock(rwl, type));
762 }
763 
764 isc_result_t
765 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
766 	isc_result_t result = ISC_R_SUCCESS;
767 
768 	REQUIRE(VALID_RWLOCK(rwl));
769 	REQUIRE(rwl->type == isc_rwlocktype_read);
770 	REQUIRE(rwl->active != 0);
771 
772 	/* If we are the only reader then succeed. */
773 	if (rwl->active == 1)
774 		rwl->type = isc_rwlocktype_write;
775 	else
776 		result = ISC_R_LOCKBUSY;
777 	return (result);
778 }
779 
/*
 * Convert the caller's write lock into a read lock (non-threaded builds).
 * The lock must be held for writing by exactly one holder; only the recorded
 * type changes, the holder count stays at 1.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {

	REQUIRE(VALID_RWLOCK(rwl));
	REQUIRE(rwl->type == isc_rwlocktype_write);
	REQUIRE(rwl->active == 1);

	rwl->type = isc_rwlocktype_read;
}
789 
/*
 * Release one hold of the given 'type' on 'rwl' (non-threaded builds).
 * 'type' must match the lock's current type and the lock must be held.
 * Always returns ISC_R_SUCCESS.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	REQUIRE(VALID_RWLOCK(rwl));
	REQUIRE(rwl->type == type);

	UNUSED(type);

	INSIST(rwl->active > 0);
	/* The recorded type is left unchanged; only the holder count drops. */
	rwl->active--;

	return (ISC_R_SUCCESS);
}
802 
/*
 * Destroy 'rwl' (non-threaded builds).  The lock must not be held.  Only a
 * non-NULL pointer is required here (the magic number is not checked), and
 * the lock is invalidated by clearing its magic number.
 */
void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	REQUIRE(rwl != NULL);
	REQUIRE(rwl->active == 0);
	rwl->magic = 0;
}
809 
810 #endif /* ISC_PLATFORM_USETHREADS */
811