Re: [PATCH] locking/percpu_rwsem: Rewrite to not use rwsem

From: Oleg Nesterov
Date: Wed Aug 07 2019 - 05:57:16 EST


On 08/06, Peter Zijlstra wrote:
>
> On Tue, Aug 06, 2019 at 06:17:42PM +0200, Oleg Nesterov wrote:
>
> > but this will also wake all the pending readers up. Every reader will burn
> > CPU for no reason and likely delay the writer.
> >
> > In fact I'm afraid this can lead to live-lock, because every reader in turn
> > will call __percpu_up_read().
>
> I didn't really consider that case important; because of how heavy the
> write side is, it should be relatively rare.

Well yes, but down_read() should not stress the system.

However, I was wrong: it is not as bad as I thought. I forgot that a
pending reader won't return from wait_event(sem->block) if another reader
comes.

Still, I think we should try to avoid the unnecessary wakeups. See below.

> > How about 2 wait queues?
>
> That said, I can certainly try that.

And either way, with or without 2 queues, what do you think about the code
below?

This way the new reader does wake_up() only in the very unlikely case when
it races with the new writer which sets sem->block = 1 right after
this_cpu_inc().

Oleg.
-------------------------------------------------------------------------------

static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
{
	might_sleep();
	rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

	preempt_disable();

	if (likely(rcu_sync_is_idle(&sem->rss)))
		__this_cpu_inc(*sem->read_count);
	else
		__percpu_down_read(sem, false);

	preempt_enable();
}

static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
{
	rwsem_release(&sem->dep_map, 1, _RET_IP_);

	preempt_disable();

	if (likely(rcu_sync_is_idle(&sem->rss)))
		__this_cpu_dec(*sem->read_count);
	else
		__percpu_up_read(sem);

	preempt_enable();
}

/* Both functions below are called, and return, with preemption disabled. */

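bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
{
	if (atomic_read_acquire(&sem->block)) {
again:
		/* sem->block is set; sleep until it is cleared. */
		preempt_enable();
		__wait_event(sem->waiters, !atomic_read_acquire(&sem->block));
		preempt_disable();
	}

	__this_cpu_inc(*sem->read_count);

	/* Order the increment above before the re-check of sem->block. */
	smp_mb();

	if (likely(!atomic_read_acquire(&sem->block)))
		return true;

	/* Raced with a new writer: undo the increment and wake the waiters. */
	__percpu_up_read(sem);

	if (try)
		return false;

	goto again;
}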

void __percpu_up_read(struct percpu_rw_semaphore *sem)
{
	smp_mb();

	__this_cpu_dec(*sem->read_count);

	wake_up(&sem->waiters);
}