Re: [PATCH v4 14/16] locking/rwsem: Guard against making count negative

From: Waiman Long
Date: Tue Apr 23 2019 - 10:31:50 EST


On 4/23/19 10:17 AM, Peter Zijlstra wrote:
> On Sun, Apr 21, 2019 at 05:07:56PM -0400, Waiman Long wrote:
>
>> How about the following chunks to disable preemption temporarily for the
>> increment-check-decrement sequence?
>>
>> diff --git a/include/linux/preempt.h b/include/linux/preempt.h
>> index dd92b1a93919..4cc03ac66e13 100644
>> --- a/include/linux/preempt.h
>> +++ b/include/linux/preempt.h
>> @@ -250,6 +250,8 @@ do { \
>>  #define preempt_enable_notrace()		barrier()
>>  #define preemptible()				0
>>
>> +#define __preempt_disable_nop /* preempt_disable() is nop */
>> +
>>  #endif /* CONFIG_PREEMPT_COUNT */
>>
>> Â#ifdef MODULE
>> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
>> index 043fd29b7534..54029e6af17b 100644
>> --- a/kernel/locking/rwsem.c
>> +++ b/kernel/locking/rwsem.c
>> @@ -256,11 +256,64 @@ static inline struct task_struct *rwsem_get_owner(struct r
>>  	return (struct task_struct *) (cowner
>>  		? cowner | (sowner & RWSEM_NONSPINNABLE) : sowner);
>>  }
>> +
>> +/*
>> + * If __preempt_disable_nop is defined, calling preempt_disable() and
>> + * preempt_enable() directly is the most efficient way. Otherwise, it may
>> + * be more efficient to disable and enable interrupts instead for disabling
>> + * preemption temporarily.
>> + */
>> +#ifdef __preempt_disable_nop
>> +#define disable_preemption()	preempt_disable()
>> +#define enable_preemption()	preempt_enable()
>> +#else
>> +#define disable_preemption()	local_irq_disable()
>> +#define enable_preemption()	local_irq_enable()
>> +#endif
> I'm not aware of an architecture where disabling interrupts is faster
> than disabling preemption.

I have actually done some performance tests measuring the effects of
disabling interrupts and disabling preemption on readers (on an x86-64 system).

  Threads    Before patch    Disable irq    Disable preemption
  -------    ------------    -----------    ------------------
     1            9,088           8,766             9,172
     2            9,296           9,169             8,707
     4           11,192          11,205            10,712
     8           11,329          11,332            11,213

For the uncontended case, disabling interrupts is slower. The slowdown is
gone once the rwsem becomes contended. So it may not be a good idea to use
interrupt disabling as a proxy for disabling preemption.

BTW, the preemption count is not enabled in typical distro production
kernels like RHEL, so preempt_disable() there is just a compiler barrier. It
is turned on in the debug kernel, though.
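
For reference, without CONFIG_PREEMPT_COUNT the relevant definitions in
include/linux/preempt.h reduce to roughly just compiler barriers:

#define preempt_disable()		barrier()
#define preempt_enable()		barrier()

so in that configuration the proposed disable_preemption()/enable_preemption()
helpers add essentially no cost on the reader fast path.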


>> +/*
>> + * When the owner task structure pointer is merged into count, fewer bits
>> + * will be available for readers. Therefore, there is a very slight chance
>> + * that the reader count may overflow. We try to prevent that from happening
>> + * by checking for the MS bit of the count and failing the trylock attempt
>> + * if this bit is set.
>> + *
>> + * With preemption enabled, there is a remote possibility that preemption
>> + * can happen in the narrow timing window between incrementing and
>> + * decrementing the reader count and the task is put to sleep for a
>> + * considerable amount of time. If a sufficient number of such unfortunate
>> + * sequences of events happen, we may still overflow the reader count.
>> + * To avoid such a possibility, we have to disable preemption for the
>> + * whole increment-check-decrement sequence.
>> + *
>> + * The function returns true if there are too many readers and the count
>> + * has already been properly decremented so the reader must go directly
>> + * into the wait list.
>> + */
>> +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
>> +{
>> +	bool wait = false;	/* Wait now flag */
>> +
>> +	disable_preemption();
>> +	*cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
>> +	if (unlikely(*cnt < 0)) {
>> +		atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
>> +		wait = true;
>> +	}
>> +	enable_preemption();
>> +	return wait;
>> +}
>>  #else /* !CONFIG_RWSEM_OWNER_COUNT */
> This also means you have to ensure CONFIG_NR_CPUS < 32K for
> RWSEM_OWNER_COUNT.


Yes, that can be done.
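
One way to enforce that, just as a sketch (the final form may well end up as
a Kconfig dependency instead), would be a compile-time check next to the
count layout definitions, e.g.:

/* Sketch only: keep enough reader-count bits once the owner is merged in */
#if defined(CONFIG_RWSEM_OWNER_COUNT) && (CONFIG_NR_CPUS >= 32768)
#error "CONFIG_RWSEM_OWNER_COUNT requires CONFIG_NR_CPUS < 32768"
#endif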


>
>>  static inline struct task_struct *rwsem_get_owner(struct rw_semaphore *sem)
>>  {
>>  	return READ_ONCE(sem->owner);
>>  }
>> +
>> +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
>> +{
>> +	*cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
>> +	return false;
>> +}
>>  #endif /* CONFIG_RWSEM_OWNER_COUNT */
>>
>>  /*
>> @@ -981,32 +1034,18 @@ static inline void clear_wr_nonspinnable(struct rw_semaph
>>   * Wait for the read lock to be granted
>>   */
>>  static struct rw_semaphore __sched *
>> -rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, long count)
>> +rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, const bool wait)
>>  {
>> -	long adjustment = -RWSEM_READER_BIAS;
>> +	long count, adjustment = -RWSEM_READER_BIAS;
>>  	bool wake = false;
>>  	struct rwsem_waiter waiter;
>>  	DEFINE_WAKE_Q(wake_q);
>>
>> -	if (unlikely(count < 0)) {
>> +	if (unlikely(wait)) {
>>  		/*
>> -		 * The sign bit has been set meaning that too many active
>> -		 * readers are present. We need to decrement reader count &
>> -		 * enter wait queue immediately to avoid overflowing the
>> -		 * reader count.
>> -		 *
>> -		 * As preemption is not disabled, there is a remote
>> -		 * possibility that preemption can happen in the narrow
>> -		 * timing window between incrementing and decrementing
>> -		 * the reader count and the task is put to sleep for a
>> -		 * considerable amount of time. If sufficient number
>> -		 * of such unfortunate sequence of events happen, we
>> -		 * may still overflow the reader count. It is extremely
>> -		 * unlikey, though. If this is a concern, we should consider
>> -		 * disable preemption during this timing window to make
>> -		 * sure that such unfortunate event will not happen.
>> +		 * The reader count has already been decremented and the
>> +		 * reader should go directly into the wait list now.
>>  		 */
>> -		atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
>>  		adjustment = 0;
>>  		goto queue;
>>  	}
>> @@ -1358,11 +1397,12 @@ static struct rw_semaphore *rwsem_downgrade_wake(struct
>>   */
>>  inline void __down_read(struct rw_semaphore *sem)
>>  {
>> -	long tmp = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
>> -						 &sem->count);
>> +	long tmp;
>> +	bool wait;
>>
>> +	wait = rwsem_read_trylock(sem, &tmp);
>>  	if (unlikely(tmp & RWSEM_READ_FAILED_MASK)) {
>> -		rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, tmp);
>> +		rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, wait);
>>  		DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
>>  	} else {
>>  		rwsem_set_reader_owned(sem);
> I think I prefer that function returning/taking the bias/adjustment
> value instead of a bool, if it is all the same.

Sure, I can do that.
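
Roughly (untested sketch, reusing RWSEM_READER_BIAS and the
disable_preemption()/enable_preemption() helpers from the chunk above),
rwsem_read_trylock() would then hand back the adjustment for the slowpath
instead of a wait flag:

static inline long rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
{
	long adjustment = -RWSEM_READER_BIAS;	/* default slowpath adjustment */

	disable_preemption();
	*cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
	if (unlikely(*cnt < 0)) {
		/* Too many readers: undo the bias and queue directly */
		atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
		adjustment = 0;
	}
	enable_preemption();
	return adjustment;
}

and __down_read() would pass the returned value straight into
rwsem_down_read_slowpath() in place of the bool.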

Cheers,
Longman