[PATCH 23/23] locking/rwsem: Make MSbit of count as guard bit to fail readlock

From: Waiman Long
Date: Fri Feb 08 2019 - 08:58:10 EST


With the merging of owner into count for x86-64, there is only 16 bits
left for reader count. It is theoretically possible for an application to
cause more than 64k readers to acquire a rwsem leading to count overflow.

To prevent this dire situation, the most significant bit of the count
is now treated as a guard bit (RWSEM_FLAG_READFAIL). Read-lock will now
fail for both the fast and optimistic spinning paths whenever this bit
is set. So all those extra readers will be put to sleep in the wait
queue. Wakeup will not happen until the reader count reaches 0.

A limit of 256 is also imposed on the number of readers that can be woken
up in one wakeup function call. This will eliminate the possibility of
waking up more than 64k readers and overflowing the count.

Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/lock_events_list.h | 1 +
kernel/locking/rwsem-xadd.c | 40 ++++++++++++++++++++++++++++++++------
kernel/locking/rwsem-xadd.h | 41 ++++++++++++++++++++++++++-------------
3 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/kernel/locking/lock_events_list.h b/kernel/locking/lock_events_list.h
index 0052534..9ecdeac 100644
--- a/kernel/locking/lock_events_list.h
+++ b/kernel/locking/lock_events_list.h
@@ -60,6 +60,7 @@
LOCK_EVENT(rwsem_opt_rlock) /* # of read locks opt-spin acquired */
LOCK_EVENT(rwsem_opt_wlock) /* # of write locks opt-spin acquired */
LOCK_EVENT(rwsem_opt_fail) /* # of failed opt-spinnings */
+LOCK_EVENT(rwsem_opt_rfail) /* # of failed reader-owned readlocks */
LOCK_EVENT(rwsem_opt_nospin) /* # of disabled reader opt-spinnings */
LOCK_EVENT(rwsem_rlock) /* # of read locks acquired */
LOCK_EVENT(rwsem_rlock_fast) /* # of fast read locks acquired */
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 213c2aa..a993055 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -110,6 +110,8 @@ enum rwsem_wake_type {
# define RWSEM_RSPIN_MAX (1 << 12)
#endif

+#define MAX_READERS_WAKEUP 0x100
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -208,6 +210,12 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
* after setting the reader waiter to nil.
*/
wake_q_add_safe(wake_q, tsk);
+
+ /*
+ * Limit # of readers that can be woken up per wakeup call.
+ */
+ if (woken >= MAX_READERS_WAKEUP)
+ break;
}

adjustment = woken * RWSEM_READER_BIAS - adjustment;
@@ -445,6 +453,16 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
break;

/*
+ * If a reader cannot acquire a reader-owned lock, we
+ * have to quit. It is either the handoff bit just got
+ * set or (unlikely) readfail bit is somehow set.
+ */
+ if (unlikely(!wlock && (owner_state == OWNER_READER))) {
+ lockevent_inc(rwsem_opt_rfail);
+ break;
+ }
+
+ /*
* An RT task cannot do optimistic spinning if it cannot
* be sure the lock holder is running. When there's no owner
* or is reader-owned, an RT task has to stop spinning or
@@ -526,12 +544,22 @@ static inline bool rwsem_optimistic_spin(struct rw_semaphore *sem,
* Wait for the read lock to be granted
*/
static inline struct rw_semaphore __sched *
-__rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)
+__rwsem_down_read_failed_common(struct rw_semaphore *sem, int state, long count)
{
- long count, adjustment = -RWSEM_READER_BIAS;
+ long adjustment = -RWSEM_READER_BIAS;
struct rwsem_waiter waiter;
DEFINE_WAKE_Q(wake_q);

+ if (unlikely(count < 0)) {
+ /*
+ * Too many active readers, decrement count &
+ * enter the wait queue.
+ */
+ atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
+ adjustment = 0;
+ goto queue;
+ }
+
if (!rwsem_can_spin_on_owner(sem))
goto queue;

@@ -635,16 +663,16 @@ static inline bool rwsem_optimistic_spin(struct rw_semaphore *sem,
}

__visible struct rw_semaphore * __sched
-rwsem_down_read_failed(struct rw_semaphore *sem)
+rwsem_down_read_failed(struct rw_semaphore *sem, long cnt)
{
- return __rwsem_down_read_failed_common(sem, TASK_UNINTERRUPTIBLE);
+ return __rwsem_down_read_failed_common(sem, TASK_UNINTERRUPTIBLE, cnt);
}
EXPORT_SYMBOL(rwsem_down_read_failed);

__visible struct rw_semaphore * __sched
-rwsem_down_read_failed_killable(struct rw_semaphore *sem)
+rwsem_down_read_failed_killable(struct rw_semaphore *sem, long cnt)
{
- return __rwsem_down_read_failed_common(sem, TASK_KILLABLE);
+ return __rwsem_down_read_failed_common(sem, TASK_KILLABLE, cnt);
}
EXPORT_SYMBOL(rwsem_down_read_failed_killable);

diff --git a/kernel/locking/rwsem-xadd.h b/kernel/locking/rwsem-xadd.h
index be67dbd..72308b7 100644
--- a/kernel/locking/rwsem-xadd.h
+++ b/kernel/locking/rwsem-xadd.h
@@ -63,7 +63,8 @@
* Bit 0 - waiters present bit
* Bit 1 - lock handoff bit
* Bits 2-47 - compressed task structure pointer
- * Bits 48-63 - 16-bit reader counts
+ * Bits 48-62 - 15-bit reader counts
+ * Bit 63 - read fail bit
*
* On other 64-bit architectures, the bit definitions are:
*
@@ -71,7 +72,8 @@
* Bit 1 - lock handoff bit
* Bits 2-6 - reserved
* Bit 7 - writer lock bit
- * Bits 8-63 - 56-bit reader counts
+ * Bits 8-62 - 55-bit reader counts
+ * Bit 63 - read fail bit
*
* On 32-bit architectures, the bit definitions of the count are:
*
@@ -79,13 +81,15 @@
* Bit 1 - lock handoff bit
* Bits 2-6 - reserved
* Bit 7 - writer lock bit
- * Bits 8-31 - 24-bit reader counts
+ * Bits 8-30 - 23-bit reader counts
* Bit 31 - read fail bit
*
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*/
#define RWSEM_FLAG_WAITERS (1UL << 0)
#define RWSEM_FLAG_HANDOFF (1UL << 1)
+#define RWSEM_FLAG_READFAIL (1UL << (BITS_PER_LONG - 1))

#ifdef CONFIG_X86_64

@@ -108,7 +112,7 @@
#define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1))
#define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
- RWSEM_FLAG_HANDOFF)
+ RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL)

#define RWSEM_COUNT_LOCKED(c) ((c) & RWSEM_LOCK_MASK)
#define RWSEM_COUNT_WLOCKED(c) ((c) & RWSEM_WRITER_MASK)
@@ -302,10 +306,15 @@ static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)
}
#endif

-extern struct rw_semaphore *rwsem_down_read_failed(struct rw_semaphore *sem);
-extern struct rw_semaphore *rwsem_down_read_failed_killable(struct rw_semaphore *sem);
-extern struct rw_semaphore *rwsem_down_write_failed(struct rw_semaphore *sem);
-extern struct rw_semaphore *rwsem_down_write_failed_killable(struct rw_semaphore *sem);
+extern struct rw_semaphore *
+rwsem_down_read_failed(struct rw_semaphore *sem, long count);
+extern struct rw_semaphore *
+rwsem_down_read_failed_killable(struct rw_semaphore *sem, long count);
+extern struct rw_semaphore *
+rwsem_down_write_failed(struct rw_semaphore *sem);
+extern struct rw_semaphore *
+rwsem_down_write_failed_killable(struct rw_semaphore *sem);
+
extern struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count);
extern struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem);

@@ -314,9 +323,11 @@ static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)
*/
static inline void __down_read(struct rw_semaphore *sem)
{
- if (unlikely(atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
- &sem->count) & RWSEM_READ_FAILED_MASK)) {
- rwsem_down_read_failed(sem);
+ long count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
+ &sem->count);
+
+ if (unlikely(count & RWSEM_READ_FAILED_MASK)) {
+ rwsem_down_read_failed(sem, count);
DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
} else {
rwsem_set_reader_owned(sem);
@@ -325,9 +336,11 @@ static inline void __down_read(struct rw_semaphore *sem)

static inline int __down_read_killable(struct rw_semaphore *sem)
{
- if (unlikely(atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
- &sem->count) & RWSEM_READ_FAILED_MASK)) {
- if (IS_ERR(rwsem_down_read_failed_killable(sem)))
+ long count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
+ &sem->count);
+
+ if (unlikely(count & RWSEM_READ_FAILED_MASK)) {
+ if (IS_ERR(rwsem_down_read_failed_killable(sem, count)))
return -EINTR;
DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
} else {
--
1.8.3.1


--------------044906BC29EDC2231BFC9A7F--