Re: [PATCH v4 14/16] locking/rwsem: Guard against making count negative

From: Peter Zijlstra
Date: Tue Apr 23 2019 - 10:17:27 EST


On Sun, Apr 21, 2019 at 05:07:56PM -0400, Waiman Long wrote:

> How about the following chunks to disable preemption temporarily for the
> increment-check-decrement sequence?
>
> diff --git a/include/linux/preempt.h b/include/linux/preempt.h
> index dd92b1a93919..4cc03ac66e13 100644
> --- a/include/linux/preempt.h
> +++ b/include/linux/preempt.h
> @@ -250,6 +250,8 @@ do { \
>  #define preempt_enable_notrace()               barrier()
>  #define preemptible()                          0
>  
> +#define __preempt_disable_nop  /* preempt_disable() is nop */
> +
>  #endif /* CONFIG_PREEMPT_COUNT */
>  
>  #ifdef MODULE
> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
> index 043fd29b7534..54029e6af17b 100644
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -256,11 +256,64 @@ static inline struct task_struct *rwsem_get_owner(struct r
>         return (struct task_struct *) (cowner
>                 ? cowner | (sowner & RWSEM_NONSPINNABLE) : sowner);
>  }
> +
> +/*
> + * If __preempt_disable_nop is defined, calling preempt_disable() and
> + * preempt_enable() directly is the most efficient way. Otherwise, it may
> + * be more efficient to disable and enable interrupt instead for disabling
> + * preemption temporarily.
> + */
> +#ifdef __preempt_disable_nop
> +#define disable_preemption()   preempt_disable()
> +#define enable_preemption()    preempt_enable()
> +#else
> +#define disable_preemption()   local_irq_disable()
> +#define enable_preemption()    local_irq_enable()
> +#endif

I'm not aware of an architecture where disabling interrupts is faster
than disabling preemption.

> +/*
> + * When the owner task structure pointer is merged into count, less bits
> + * will be available for readers. Therefore, there is a very slight chance
> + * that the reader count may overflow. We try to prevent that from happening
> + * by checking for the MS bit of the count and failing the trylock attempt
> + * if this bit is set.
> + *
> + * With preemption enabled, there is a remote possibility that preemption
> + * can happen in the narrow timing window between incrementing and
> + * decrementing the reader count and the task is put to sleep for a
> + * considerable amount of time. If sufficient number of such unfortunate
> + * sequence of events happen, we may still overflow the reader count.
> + * To avoid such possibility, we have to disable preemption for the
> + * whole increment-check-decrement sequence.
> + *
> + * The function returns true if there are too many readers and the count
> + * has already been properly decremented so the reader must go directly
> + * into the wait list.
> + */
> +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
> +{
> +       bool wait = false;      /* Wait now flag */
> +
> +       disable_preemption();
> +       *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
> +       if (unlikely(*cnt < 0)) {
> +               atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
> +               wait = true;
> +       }
> +       enable_preemption();
> +       return wait;
> +}
>  #else /* !CONFIG_RWSEM_OWNER_COUNT */

This also means you have to ensure CONFIG_NR_CPUS < 32K for
RWSEM_OWNER_COUNT.

>  static inline struct task_struct *rwsem_get_owner(struct rw_semaphore *sem)
>  {
>         return READ_ONCE(sem->owner);
>  }
> +
> +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
> +{
> +       *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
> +       return false;
> +}
>  #endif /* CONFIG_RWSEM_OWNER_COUNT */
>  
>  /*
> @@ -981,32 +1034,18 @@ static inline void clear_wr_nonspinnable(struct rw_semaph
>   * Wait for the read lock to be granted
>   */
>  static struct rw_semaphore __sched *
> -rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, long count)
> +rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, const bool wait)
>  {
> -       long adjustment = -RWSEM_READER_BIAS;
> +       long count, adjustment = -RWSEM_READER_BIAS;
>         bool wake = false;
>         struct rwsem_waiter waiter;
>         DEFINE_WAKE_Q(wake_q);
>  
> -       if (unlikely(count < 0)) {
> +       if (unlikely(wait)) {
>                 /*
> -                * The sign bit has been set meaning that too many active
> -                * readers are present. We need to decrement reader count &
> -                * enter wait queue immediately to avoid overflowing the
> -                * reader count.
> -                *
> -                * As preemption is not disabled, there is a remote
> -                * possibility that preemption can happen in the narrow
> -                * timing window between incrementing and decrementing
> -                * the reader count and the task is put to sleep for a
> -                * considerable amount of time. If sufficient number
> -                * of such unfortunate sequence of events happen, we
> -                * may still overflow the reader count. It is extremely
> -                * unlikey, though. If this is a concern, we should consider
> -                * disable preemption during this timing window to make
> -                * sure that such unfortunate event will not happen.
> +                * The reader count has already been decremented and the
> +                * reader should go directly into the wait list now.
>                  */
> -               atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
>                 adjustment = 0;
>                 goto queue;
>         }
> @@ -1358,11 +1397,12 @@ static struct rw_semaphore *rwsem_downgrade_wake(struct
>   */
>  inline void __down_read(struct rw_semaphore *sem)
>  {
> -       long tmp = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
> -                                                &sem->count);
> +       long tmp;
> +       bool wait;
>  
> +       wait = rwsem_read_trylock(sem, &tmp);
>         if (unlikely(tmp & RWSEM_READ_FAILED_MASK)) {
> -               rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, tmp);
> +               rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, wait);
>                 DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
>         } else {
>                 rwsem_set_reader_owned(sem);

I think I prefer that function returning/taking the bias/adjustment
value instead of a bool, if it is all the same.