Re: [PATCH] locking/percpu_rwsem: Rewrite to not use rwsem
From: Oleg Nesterov
Date: Wed Aug 07 2019 - 05:57:16 EST
On 08/06, Peter Zijlstra wrote:
>
> On Tue, Aug 06, 2019 at 06:17:42PM +0200, Oleg Nesterov wrote:
>
> > but this will also wake all the pending readers up. Every reader will burn
> > CPU for no reason and likely delay the writer.
> >
> > In fact I'm afraid this can lead to live-lock, because every reader in turn
> > will call __percpu_up_read().
>
> I didn't really consider that case important; because of how heavy the
> write side is, it should be relatively rare.
Well yes, but down_read() should not stress the system.
However I was wrong, it is not that bad as I thought, I forgot that the
pending reader won't return from wait_event(sem->block) if another reader
comes.
Still I think we should try to avoid the unnecessary wakeups. See below.
> > How about 2 wait queues?
>
> That said, I can certainly try that.
and either way, with or without 2 queues, what do you think about the code
below?
This way the new reader does wake_up() only in the very unlikely case when
it races with the new writer which sets sem->block = 1 right after
this_cpu_inc().
Oleg.
-------------------------------------------------------------------------------
static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
{
might_sleep();
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
preempt_disable();
if (likely(rcu_sync_is_idle(&sem->rss)))
__this_cpu_inc(*sem->read_count);
else
__percpu_down_read(sem, false);
preempt_enable();
}
static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
{
rwsem_release(&sem->dep_map, 1, _RET_IP_);
preempt_disable();
if (likely(rcu_sync_is_idle(&sem->rss)))
__this_cpu_dec(*sem->read_count);
else
__percpu_up_read(sem);
preempt_enable();
}
// both called and return with preemption disabled
bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
{
if (atomic_read_acquire(&sem->block)) {
again:
preempt_enable();
__wait_event(sem->waiters, !atomic_read_acquire(&sem->block));
preempt_disable();
}
__this_cpu_inc(*sem->read_count);
smp_mb();
if (likely(!atomic_read_acquire(&sem->block)))
return true;
__percpu_up_read(sem);
if (try)
return false;
goto again;
}
void __percpu_up_read(struct percpu_rw_semaphore *sem)
{
smp_mb();
__this_cpu_dec(*sem->read_count);
wake_up(&sem->waiters);
}