path: root/usr/src/uts/common/os/rwlock.c
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright 2019 Joyent, Inc.
 */

#include <sys/param.h>
#include <sys/thread.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/cpuvar.h>
#include <sys/sobject.h>
#include <sys/turnstile.h>
#include <sys/rwlock.h>
#include <sys/rwlock_impl.h>
#include <sys/atomic.h>
#include <sys/lockstat.h>

/*
 * Big Theory Statement for readers/writer locking primitives.
 *
 * An rwlock provides exclusive access to a single thread ("writer") or
 * concurrent access to multiple threads ("readers").  See rwlock(9F)
 * for a full description of the interfaces and programming model.
 * The rest of this comment describes the implementation.
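 *
 * As a purely illustrative sketch of the consumer-visible model (rwlock(9F)
 * is authoritative; 'foo_lock' and 'foo_count' are hypothetical):
 *
 *	krwlock_t foo_lock;
 *	int foo_count;
 *
 *	rw_init(&foo_lock, NULL, RW_DEFAULT, NULL);
 *
 *	rw_enter(&foo_lock, RW_READER);		... read foo_count ...
 *	rw_exit(&foo_lock);
 *
 *	rw_enter(&foo_lock, RW_WRITER);		foo_count++;
 *	rw_exit(&foo_lock);
 *
 *	rw_destroy(&foo_lock);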
 *
 * An rwlock is a single word with the following structure:
 *
 *	---------------------------------------------------------------------
 *	| OWNER (writer) or HOLD COUNT (readers)   | WRLOCK | WRWANT | WAIT |
 *	---------------------------------------------------------------------
 *			63 / 31 .. 3			2	1	0
 *
 * The waiters bit (0) indicates whether any threads are blocked waiting
 * for the lock.  The write-wanted bit (1) indicates whether any threads
 * are blocked waiting for write access.  The write-locked bit (2) indicates
 * whether the lock is held by a writer, which determines whether the upper
 * bits (3..31 in ILP32, 3..63 in LP64) should be interpreted as the owner
 * (thread pointer) or the hold count (number of readers).
 *
 * In the absence of any contention, a writer gets the lock by setting
 * this word to (curthread | RW_WRITE_LOCKED); a reader gets the lock
 * by incrementing the hold count (i.e. adding 8, aka RW_READ_LOCK).
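 *
 * As a purely illustrative example (LP64, using the layout above and a
 * hypothetical owner thread at address 0xffffff0123456780):
 *
 *	unheld				0x0000000000000000
 *	one reader			0x0000000000000008	(count 1)
 *	three readers			0x0000000000000018	(count 3)
 *	write-locked, uncontended	0xffffff0123456784
 *					(owner | RW_WRITE_LOCKED)
 *	write-locked, writer waiting	0xffffff0123456787
 *					(owner | RW_WRITE_LOCKED |
 *					RW_WRITE_WANTED | RW_HAS_WAITERS)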
 *
 * A writer will fail to acquire the lock if any other thread owns it.
 * A reader will fail if the lock is either owned (in the RW_READER and
 * RW_READER_STARVEWRITER cases) or wanted by a writer (in the RW_READER
 * case). rw_tryenter() returns 0 in these cases; rw_enter() blocks until
 * the lock becomes available.
 *
 * When a thread blocks it acquires the rwlock's hashed turnstile lock and
 * attempts to set RW_HAS_WAITERS (and RW_WRITE_WANTED in the writer case)
 * atomically *only if the lock still appears busy*.  A thread must never
 * accidentally block for an available lock since there would be no owner
 * to awaken it.  casip() provides the required atomicity.  Once casip()
 * succeeds, the decision to block becomes final and irreversible.  The
 * thread will not become runnable again until it has been granted ownership
 * of the lock via direct handoff from a former owner as described below.
 *
 * In the absence of any waiters, rw_exit() just clears the lock (if it
 * is write-locked) or decrements the hold count (if it is read-locked).
 * Note that even if waiters are present, decrementing the hold count
 * to a non-zero value requires no special action since the lock is still
 * held by at least one other thread.
 *
 * On the "final exit" (transition to unheld state) of a lock with waiters,
 * rw_exit_wakeup() grabs the turnstile lock and transfers ownership directly
 * to the next writer or set of readers.  There are several advantages to this
 * approach: (1) it closes all windows for priority inversion (when a new
 * writer has grabbed the lock but has not yet inherited from blocked readers);
 * (2) it prevents starvation of equal-priority threads by granting the lock
 * in FIFO order; (3) it eliminates the need for a write-wanted count -- a
 * single bit suffices because the lock remains held until all waiting
 * writers are gone; (4) when we awaken N readers we can perform a single
 * "atomic_add(&x, N)" to set the total hold count rather than having all N
 * threads fight for the cache to perform an "atomic_add(&x, 1)" upon wakeup.
 *
 * The most interesting policy decision in rw_exit_wakeup() is which thread
 * to wake.  Starvation is always possible with priority-based scheduling,
 * but any sane wakeup policy should at least satisfy these requirements:
 *
 * (1) The highest-priority thread in the system should not starve.
 * (2) The highest-priority writer should not starve.
 * (3) No writer should starve due to lower-priority threads.
 * (4) No reader should starve due to lower-priority writers.
 * (5) If all threads have equal priority, none of them should starve.
 *
 * We used to employ a writers-always-win policy, which doesn't even
 * satisfy (1): a steady stream of low-priority writers can starve out
 * a real-time reader!  This is clearly a broken policy -- it violates
 * (1), (4), and (5) -- but it's how rwlocks always used to behave.
 *
 * A round-robin policy (exiting readers grant the lock to blocked writers
 * and vice versa) satisfies all but (3): a single high-priority writer
 * and many low-priority readers can starve out medium-priority writers.
 *
 * A strict priority policy (grant the lock to the highest priority blocked
 * thread) satisfies everything but (2): a steady stream of high-priority
 * readers can permanently starve the highest-priority writer.
 *
 * The reason we care about (2) is that it's important to process writers
 * reasonably quickly -- even if they're low priority -- because their very
 * presence causes all readers to take the slow (blocking) path through this
 * code.  There is also a general sense that writers deserve some degree of
 * deference because they're updating the data upon which all readers act.
 * Presumably this data should not be allowed to become arbitrarily stale
 * due to writer starvation.  Finally, it seems reasonable to level the
 * playing field a bit to compensate for the fact that it's so much harder
 * for a writer to get in when there are already many readers present.
 *
 * A hybrid of round-robin and strict priority can be made to satisfy
 * all five criteria.  In this "writer priority policy" exiting readers
 * always grant the lock to waiting writers, but exiting writers only
 * grant the lock to readers of the same or higher priority than the
 * highest-priority blocked writer.  Thus requirement (2) is satisfied,
 * necessarily, by a willful act of priority inversion: an exiting reader
 * will grant the lock to a blocked writer even if there are blocked
 * readers of higher priority.  The situation is mitigated by the fact
 * that writers always inherit priority from blocked readers, and the
 * writer will awaken those readers as soon as it exits the lock.
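 *
 * As a concrete (hypothetical) illustration of the writer priority policy:
 * suppose a write-held lock has blocked readers at priorities 60, 60 and 20
 * and a blocked writer at priority 50.  The exiting writer wakes only the
 * two priority-60 readers, since they are at or above the priority of the
 * highest-priority blocked writer; the priority-20 reader stays blocked
 * behind the priority-50 writer.  When the last of those readers exits, it
 * hands the lock to the blocked writer, and when that writer exits it
 * wakes the priority-20 reader.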
 *
 * Finally, note that this hybrid scheme -- and indeed, any scheme that
 * satisfies requirement (2) -- has an important consequence:  if a lock is
 * held as reader and a writer subsequently becomes blocked, any further
 * readers must be blocked to avoid writer starvation.  This implementation
 * detail has ramifications for the semantics of rwlocks, as it prohibits
 * recursively acquiring an rwlock as reader: any writer that wishes to
 * acquire the lock after the first but before the second acquisition as
 * reader will block the second acquisition -- resulting in deadlock.  This
 * itself is not necessarily prohibitive, as it is often straightforward to
 * prevent a single thread from recursively acquiring an rwlock as reader.
 * However, a more subtle situation arises when both a traditional mutex and
 * a reader lock are acquired by two different threads in opposite order.
 * (That is, one thread first acquires the mutex and then the rwlock as
 * reader; the other acquires the rwlock as reader and then the mutex.) As
 * with the single threaded case, this is fine absent a blocked writer: the
 * thread that acquires the mutex before acquiring the rwlock as reader will
 * be able to successfully acquire the rwlock -- even as/if the other thread
 * has the rwlock as reader and is blocked on the held mutex.  However, if
 * an unrelated writer (that is, a third thread) becomes blocked on the
 * rwlock after the first thread acquires the rwlock as reader but before
 * it's able to acquire the mutex, the second thread -- with the mutex held
 * -- will not be able to acquire the rwlock as reader due to the waiting
 * writer, deadlocking the three threads.  Unlike the single-threaded
 * (recursive) rwlock acquisition case, this case can be quite a bit
 * thornier to fix, especially as there is nothing inherently wrong in the
 * locking strategy: the deadlock is really induced by requirement (2), not
 * the consumers of the rwlock.  To permit such consumers, we allow rwlock
 * acquirers to explicitly opt out of requirement (2) by specifying
 * RW_READER_STARVEWRITER when acquiring the rwlock.  This (obviously) means
 * that infinite readers can starve writers, but it also allows for
 * multiple readers in the presence of other synchronization primitives
 * without regard for lock-ordering.  And while certainly odd (and perhaps
 * unwise), RW_READER_STARVEWRITER can be safely used alongside RW_READER on
 * the same lock -- RW_READER_STARVEWRITER describes only the act of lock
 * acquisition with respect to waiting writers, not the lock itself.
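 *
 * A sketch of the scenario above, with hypothetical locks 'm' and 'rw'
 * (this illustrates the pattern RW_READER_STARVEWRITER exists to permit,
 * not a recommended lock ordering):
 *
 *	Thread 1:  rw_enter(&rw, RW_READER);  ...  mutex_enter(&m);
 *	Thread 2:  mutex_enter(&m);  ...  rw_enter(&rw, RW_READER);
 *
 * If a third thread blocks on rw as writer after thread 1's rw_enter()
 * but before thread 2's, thread 2's RW_READER acquisition blocks on the
 * waiting writer and all three threads deadlock.  If thread 2 instead
 * specifies RW_READER_STARVEWRITER, its acquisition ignores the waiting
 * writer and all three threads eventually proceed.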
 *
 * rw_downgrade() follows the same wakeup policy as an exiting writer.
 *
 * rw_tryupgrade() has the same failure mode as rw_tryenter() for a
 * write lock.  Both honor the WRITE_WANTED bit by specification.
 *
 * The following rules apply to manipulation of rwlock internal state:
 *
 * (1) The rwlock is only modified via the atomic primitives casip()
 *     and atomic_add_ip().
 *
 * (2) The waiters bit and write-wanted bit are only modified under
 *     turnstile_lookup().  This ensures that the turnstile is consistent
 *     with the rwlock.
 *
 * (3) Waiters receive the lock by direct handoff from the previous
 *     owner.  Therefore, waiters *always* wake up holding the lock.
 */

/*
 * The sobj_ops vector exports a set of functions needed when a thread
 * is asleep on a synchronization object of a given type.
 */
static sobj_ops_t rw_sobj_ops = {
	SOBJ_RWLOCK, rw_owner, turnstile_stay_asleep, turnstile_change_pri
};

/*
 * If the system panics on an rwlock, save the address of the offending
 * rwlock in panic_rwlock_addr, and save the contents in panic_rwlock.
 */
static rwlock_impl_t panic_rwlock;
static rwlock_impl_t *panic_rwlock_addr;

static void
rw_panic(char *msg, rwlock_impl_t *lp)
{
	if (panicstr)
		return;

	if (atomic_cas_ptr(&panic_rwlock_addr, NULL, lp) == NULL)
		panic_rwlock = *lp;

	panic("%s, lp=%p wwwh=%lx thread=%p",
	    msg, (void *)lp, panic_rwlock.rw_wwwh, (void *)curthread);
}

/* ARGSUSED */
void
rw_init(krwlock_t *rwlp, char *name, krw_type_t type, void *arg)
{
	((rwlock_impl_t *)rwlp)->rw_wwwh = 0;
}

void
rw_destroy(krwlock_t *rwlp)
{
	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;

	if (lp->rw_wwwh != 0) {
		if ((lp->rw_wwwh & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK)
			rw_panic("rw_destroy: lock already destroyed", lp);
		else
			rw_panic("rw_destroy: lock still active", lp);
	}

	lp->rw_wwwh = RW_DOUBLE_LOCK;
}

/*
 * Verify that an rwlock is held correctly.
 */
static int
rw_locked(rwlock_impl_t *lp, krw_t rw)
{
	uintptr_t old = lp->rw_wwwh;

	if (rw == RW_READER || rw == RW_READER_STARVEWRITER)
		return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));

	if (rw == RW_WRITER)
		return ((old & RW_OWNER) == (uintptr_t)curthread);

	return (0);
}

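/*
 * Optional backoff hooks.  These are NULL by default; if set (they must be
 * set together, since callers test only rw_lock_delay before calling
 * rw_lock_backoff()), rw_enter_sleep(), rw_exit_wakeup() and rw_tryenter()
 * will compute a backoff value and delay between failed casip() attempts
 * rather than retrying immediately.
 */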
uint_t (*rw_lock_backoff)(uint_t) = NULL;
void (*rw_lock_delay)(uint_t) = NULL;

/*
 * Full-service implementation of rw_enter() to handle all the hard cases.
 * Called from the assembly version if anything complicated is going on.
 */
void
rw_enter_sleep(rwlock_impl_t *lp, krw_t rw)
{
	uintptr_t old, new, lock_value, lock_busy, lock_wait;
	hrtime_t sleep_time;
	turnstile_t *ts;
	uint_t  backoff = 0;
	int loop_count = 0;

	if (rw == RW_READER) {
		lock_value = RW_READ_LOCK;
		lock_busy = RW_WRITE_CLAIMED;
		lock_wait = RW_HAS_WAITERS;
	} else if (rw == RW_READER_STARVEWRITER) {
		lock_value = RW_READ_LOCK;
		lock_busy = RW_WRITE_LOCKED;
		lock_wait = RW_HAS_WAITERS;
	} else {
		lock_value = RW_WRITE_LOCK(curthread);
		lock_busy = (uintptr_t)RW_LOCKED;
		lock_wait = RW_HAS_WAITERS | RW_WRITE_WANTED;
	}

	for (;;) {
		if (((old = lp->rw_wwwh) & lock_busy) == 0) {
			if (casip(&lp->rw_wwwh, old, old + lock_value) != old) {
				if (rw_lock_delay != NULL) {
					backoff = rw_lock_backoff(backoff);
					rw_lock_delay(backoff);
					if (++loop_count == ncpus_online) {
						backoff = 0;
						loop_count = 0;
					}
				}
				continue;
			}
			break;
		}

		if (panicstr)
			return;

		if ((old & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK) {
			rw_panic("rw_enter: bad rwlock", lp);
			return;
		}

		if ((old & RW_OWNER) == (uintptr_t)curthread) {
			rw_panic("recursive rw_enter", lp);
			return;
		}

		ts = turnstile_lookup(lp);

		do {
			if (((old = lp->rw_wwwh) & lock_busy) == 0)
				break;
			new = old | lock_wait;
		} while (old != new && casip(&lp->rw_wwwh, old, new) != old);

		if ((old & lock_busy) == 0) {
			/*
			 * The lock appears free now; try the dance again
			 */
			turnstile_exit(lp);
			continue;
		}

		/*
		 * We really are going to block, so bump the stats.
		 */
		ASSERT(lp->rw_wwwh & lock_wait);
		ASSERT(lp->rw_wwwh & RW_LOCKED);

		sleep_time = -gethrtime();
		if (rw != RW_WRITER) {
			CPU_STATS_ADDQ(CPU, sys, rw_rdfails, 1);
			(void) turnstile_block(ts, TS_READER_Q, lp,
			    &rw_sobj_ops, NULL, NULL);
		} else {
			CPU_STATS_ADDQ(CPU, sys, rw_wrfails, 1);
			(void) turnstile_block(ts, TS_WRITER_Q, lp,
			    &rw_sobj_ops, NULL, NULL);
		}
		sleep_time += gethrtime();

		LOCKSTAT_RECORD4(LS_RW_ENTER_BLOCK, lp, sleep_time, rw,
		    (old & RW_WRITE_LOCKED) ? 1 : 0,
		    old >> RW_HOLD_COUNT_SHIFT);

		/*
		 * We wake up holding the lock via direct handoff from the
		 * previous owner.
		 */
		break;
	}

	ASSERT(rw_locked(lp, rw));

	membar_enter();

	LOCKSTAT_RECORD(LS_RW_ENTER_ACQUIRE, lp, rw);
}

/*
 * Return the number of readers to wake, or zero if we should wake a writer.
 * Called only by exiting/downgrading writers (readers don't wake readers).
 */
static int
rw_readers_to_wake(turnstile_t *ts)
{
	kthread_t *next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
	kthread_t *next_reader = ts->ts_sleepq[TS_READER_Q].sq_first;
	pri_t wpri = (next_writer != NULL) ? DISP_PRIO(next_writer) : -1;
	int count = 0;

	while (next_reader != NULL) {
		if (DISP_PRIO(next_reader) < wpri)
			break;
		next_reader = next_reader->t_link;
		count++;
	}
	return (count);
}

/*
 * Full-service implementation of rw_exit() to handle all the hard cases.
 * Called from the assembly version if anything complicated is going on.
 * There is no semantic difference between calling rw_exit() and calling
 * rw_exit_wakeup() directly.
 */
void
rw_exit_wakeup(rwlock_impl_t *lp)
{
	turnstile_t *ts;
	uintptr_t old, new, lock_value;
	kthread_t *next_writer;
	int nreaders;
	uint_t  backoff = 0;
	int loop_count = 0;

	membar_exit();

	old = lp->rw_wwwh;
	if (old & RW_WRITE_LOCKED) {
		if ((old & RW_OWNER) != (uintptr_t)curthread) {
			rw_panic("rw_exit: not owner", lp);
			lp->rw_wwwh = 0;
			return;
		}
		lock_value = RW_WRITE_LOCK(curthread);
	} else {
		if ((old & RW_LOCKED) == 0) {
			rw_panic("rw_exit: lock not held", lp);
			return;
		}
		lock_value = RW_READ_LOCK;
	}

	for (;;) {
		/*
		 * If this is *not* the final exit of a lock with waiters,
		 * just drop the lock -- there's nothing tricky going on.
		 */
		old = lp->rw_wwwh;
		new = old - lock_value;
		if ((new & (RW_LOCKED | RW_HAS_WAITERS)) != RW_HAS_WAITERS) {
			if (casip(&lp->rw_wwwh, old, new) != old) {
				if (rw_lock_delay != NULL) {
					backoff = rw_lock_backoff(backoff);
					rw_lock_delay(backoff);
					if (++loop_count == ncpus_online) {
						backoff = 0;
						loop_count = 0;
					}
				}
				continue;
			}
			break;
		}

		/*
		 * This appears to be the final exit of a lock with waiters.
		 * If we do not have the lock as writer (that is, if this is
		 * the last exit of a reader with waiting writers), we will
		 * grab the lock as writer to prevent additional readers.
		 * (This is required because a reader that is acquiring the
		 * lock via RW_READER_STARVEWRITER will not observe the
		 * RW_WRITE_WANTED bit -- and we could therefore be racing
		 * with such readers here.)
		 */
		if (!(old & RW_WRITE_LOCKED)) {
			new = RW_WRITE_LOCK(curthread) |
			    RW_HAS_WAITERS | RW_WRITE_WANTED;

			if (casip(&lp->rw_wwwh, old, new) != old)
				continue;
		}

		/*
		 * Perform the final exit of a lock that has waiters.
		 */
		ts = turnstile_lookup(lp);

		next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;

		if ((old & RW_WRITE_LOCKED) &&
		    (nreaders = rw_readers_to_wake(ts)) > 0) {
			/*
			 * Don't drop the lock -- just set the hold count
			 * such that we grant the lock to all readers at once.
			 */
			new = nreaders * RW_READ_LOCK;
			if (ts->ts_waiters > nreaders)
				new |= RW_HAS_WAITERS;
			if (next_writer)
				new |= RW_WRITE_WANTED;
			lp->rw_wwwh = new;
			membar_enter();
			turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
		} else {
			/*
			 * Don't drop the lock -- just transfer ownership
			 * directly to next_writer.  Note that there must
			 * be at least one waiting writer, because we get
			 * here only if (A) the lock is read-locked or
			 * (B) there are no waiting readers.  In case (A),
			 * since the lock is read-locked there would be no
			 * reason for other readers to have blocked unless
			 * the RW_WRITE_WANTED bit was set.  In case (B),
			 * since there are waiters but no waiting readers,
			 * they must all be waiting writers.
			 */
			ASSERT(lp->rw_wwwh & RW_WRITE_WANTED);
			new = RW_WRITE_LOCK(next_writer);
			if (ts->ts_waiters > 1)
				new |= RW_HAS_WAITERS;
			if (next_writer->t_link)
				new |= RW_WRITE_WANTED;
			lp->rw_wwwh = new;
			membar_enter();
			turnstile_wakeup(ts, TS_WRITER_Q, 1, next_writer);
		}
		break;
	}

	if (lock_value == RW_READ_LOCK) {
		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_READER);
	} else {
		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_WRITER);
	}
}

int
rw_tryenter(krwlock_t *rwlp, krw_t rw)
{
	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
	uintptr_t old;

	if (rw != RW_WRITER) {
		uint_t backoff = 0;
		int loop_count = 0;
		for (;;) {
			if ((old = lp->rw_wwwh) & (rw == RW_READER ?
			    RW_WRITE_CLAIMED : RW_WRITE_LOCKED)) {
				return (0);
			}
			if (casip(&lp->rw_wwwh, old, old + RW_READ_LOCK) == old)
				break;
			if (rw_lock_delay != NULL) {
				backoff = rw_lock_backoff(backoff);
				rw_lock_delay(backoff);
				if (++loop_count == ncpus_online) {
					backoff = 0;
					loop_count = 0;
				}
			}
		}
		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
	} else {
		if (casip(&lp->rw_wwwh, 0, RW_WRITE_LOCK(curthread)) != 0)
			return (0);
		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
	}
	ASSERT(rw_locked(lp, rw));
	membar_enter();
	return (1);
}

void
rw_downgrade(krwlock_t *rwlp)
{
	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;

	membar_exit();

	if ((lp->rw_wwwh & RW_OWNER) != (uintptr_t)curthread) {
		rw_panic("rw_downgrade: not owner", lp);
		return;
	}

	if (atomic_add_ip_nv(&lp->rw_wwwh,
	    RW_READ_LOCK - RW_WRITE_LOCK(curthread)) & RW_HAS_WAITERS) {
		turnstile_t *ts = turnstile_lookup(lp);
		int nreaders = rw_readers_to_wake(ts);
		if (nreaders > 0) {
			uintptr_t delta = nreaders * RW_READ_LOCK;
			if (ts->ts_waiters == nreaders)
				delta -= RW_HAS_WAITERS;
			atomic_add_ip(&lp->rw_wwwh, delta);
		}
		turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
	}
	ASSERT(rw_locked(lp, RW_READER));
	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, lp);
}

int
rw_tryupgrade(krwlock_t *rwlp)
{
	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
	uintptr_t old, new;

	ASSERT(rw_locked(lp, RW_READER));

	do {
		if (((old = lp->rw_wwwh) & ~RW_HAS_WAITERS) != RW_READ_LOCK)
			return (0);
		new = old + RW_WRITE_LOCK(curthread) - RW_READ_LOCK;
	} while (casip(&lp->rw_wwwh, old, new) != old);

	membar_enter();
	LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, lp);
	ASSERT(rw_locked(lp, RW_WRITER));
	return (1);
}

int
rw_read_held(krwlock_t *rwlp)
{
	uintptr_t tmp;

	return (_RW_READ_HELD(rwlp, tmp));
}

int
rw_write_held(krwlock_t *rwlp)
{
	return (_RW_WRITE_HELD(rwlp));
}

int
rw_lock_held(krwlock_t *rwlp)
{
	return (_RW_LOCK_HELD(rwlp));
}

/*
 * Like rw_read_held(), but ASSERTs that the lock is currently held
 */
int
rw_read_locked(krwlock_t *rwlp)
{
	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;

	ASSERT(old & RW_LOCKED);
	return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
}

/*
 * Returns non-zero if the lock is either held or desired by a writer
 */
int
rw_iswriter(krwlock_t *rwlp)
{
	return (_RW_ISWRITER(rwlp));
}

kthread_t *
rw_owner(krwlock_t *rwlp)
{
	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;

	return ((old & RW_WRITE_LOCKED) ? (kthread_t *)(old & RW_OWNER) : NULL);
}