Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation

From: Lai Jiangshan
Date: Mon Aug 30 2010 - 23:02:26 EST


On 08/31/2010 07:57 AM, Paul E. McKenney wrote:
> On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote:
>>
>> 0) contents
>> 1) overview
>> 2) scalable
>> 3) multi-GP
>> 4) integrated expedited synchronize_rcu
>> 5) preemptible
>> 6) top-down implies
>> 7) speech
>> 8) core rcuring
>
> Interesting! I can't claim to have dug through every detail, but figured
> I should ask a few questions first.
>
> Thanx, Paul
>
> Of course, the first question is whether it passes rcutorture, and if
> so on what hardware configuration.

Only on x86 (32-bit and 64-bit) systems.
Could you apply it on POWER and test it there? It needs more testing on
different architectures, but I have no other hardware.

What do you mean by "hardware configuration"?

>
> At first glance, it looks to live somewhere between Manfred Spraul's
> counter-based hierarchical RCU and classic RCU. There are also some
> resemblances to K42's "generations" RCU-like mechanism.
>
>> 1) overview
>> This is a new RCU implementation: RCURING. It uses a ring atomic counters
>> to control the completion of GPs and a ring callbacks batches to process
>> the callbacks.
>
> The ring is a set of grace periods, correct?

Correct. A GP is controlled by the set of ring counters in
(domain's done_complete, this GP's complete]; the GP is completed only
when all of these counters have become zero.

Let me pull one of your questions up here:
> For a given grace period, all quiescent states hit the same counter.
> Or am I missing something?

No.
Every RCU read site has a locked_complete; while a CPU holds a locked_complete,
the counter for that locked_complete cannot drop to zero.

A quiescent state releases the locked_complete of the previous RCU read site,
so different quiescent states may hit different counters.
A QS always locks the newest GP (the one not yet being waited for, the
domain's curr_complete) for the next RCU read site and takes that as the new
locked_complete.

What I said above may still be unclear; more questions are welcome, and they
also help me write the documentation. Thank you very much.
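
To make the picture above more concrete, here is a minimal userspace model
of the ring-counter idea, written with C11 atomics. It is only an
illustration, not the kernel code: the real rcuring_lock() also handles the
counter hitting zero (atomic_inc_not_zero() in a loop, re-reading
curr_complete), and all the memory barriers are left out here.

#include <stdatomic.h>

#define RING_SHIFT	4
#define RING_MAX	(1L << RING_SHIFT)
#define RING_IDX(c)	((c) & (RING_MAX - 1))

/* Userspace model of the ring of counters, not the kernel structure. */
struct ring {
	atomic_int ctr[RING_MAX];
	long curr_complete;	/* newest GP, the one new read sites lock */
	long done_complete;	/* every GP up to here has completed */
};

/* A CPU (or a preempted reader) pins the newest GP's counter. */
static long ring_lock(struct ring *r)
{
	long c = r->curr_complete;

	atomic_fetch_add(&r->ctr[RING_IDX(c)], 1);
	return c;		/* the caller's locked_complete */
}

/* Passing a QS: lock the newest GP first, then release the old one. */
static long ring_note_qs(struct ring *r, long locked_complete)
{
	long newest = ring_lock(r);

	atomic_fetch_sub(&r->ctr[RING_IDX(locked_complete)], 1);
	return newest;
}

/* A GP "complete" is done once no counter in (done, complete] is nonzero. */
static int gp_completed(struct ring *r, long complete)
{
	long c;

	for (c = r->done_complete + 1; c - complete <= 0; c++)
		if (atomic_load(&r->ctr[RING_IDX(c)]))
			return 0;
	return 1;
}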

>
>> In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c,
>> but I'm too lazy to finish it
>>
>> 2) scalable
>> There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt),
>> one of them is for misc things, it is a infrequency lock, the other is
>> for advancing complete, it is frequency lock. But somebody else would
>> very probably do the same work for me if somebody else is holding this lock,
>> so we alway use spin_trylock() for this lock, so it is scalable.
>>
>> We don't need any lock for reporting quiescent state, we just need 2
>> atomic operations for every RCU domain.
>
> If you have a single GP in flight, all CPUs need to do an atomic op on the
> same element of the RCU ring, correct?

Yes, but there is seldom only a single GP in flight. We start GPs as needed.

> My first thought was that each CPU
> was writing the current grace-period number to its own variable, but that
> would not require an atomic operation. Of course, the downside of this
> latter approach is that some CPU would need to scan all CPUs' counters.
>
>> 3) multi-GP
>> All old rcu implementations have only one waiting GP.
>>
>> time -----ta-tb--------------------tc----------------td------------->
>> GP -----|------GP1---------------|--------GP2------|------------->
>> cpu#x ........|---------------------------------------|...............>
>> |->insert a callback |->handle callback
>>
>> the callback have to wait a partial GP1, and a full GP2, it may wait
>> near 2 GPs in worse case because it have to wait the rcu read sites witch
>> start at (tb, tc] which are needless to wait. In average, a callback
>> have to wait (1 + 1/(1+1))= 1.5GPs
>>
>> if we can have three simultaneous waiting GPs:
>>
>> GPs .....|------GP1---------------|..........................
>> ...........|--------GP2-----------|....................
>> ....................|---------GP3---------------|........
>> cpu#x ........|-------------------------|..........................>
>> |->insert a callback |->handle callback
>> The callback is handled more quickly, In average, a callback have to wait
>> (1 + 1/(3+1))=1.25GPs
>>
>> if we can have X simultaneous waiting GPs, a callback have to wait
>> (1 + 1/(X+1)) GPs in average. The default X in this patch is 15
>> (including the reserved GPs).
>
> Assuming that the start times of the GPs are evenly spaced, correct?

No, we start GPs as needed.
If a CPU has queued some callbacks and the callbacks' complete equals the
RCU domain's curr_complete, this CPU will start a new GP for those callbacks,
as the sketch below illustrates.
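
The condition is the one in __rcu_check_callbacks() in the patch below;
pulled out into a single helper it reads roughly like this (same types and
helpers as in the patch, just a condensed restatement for readability):

/* condensed from __rcu_check_callbacks(), not a new function in the patch */
static void maybe_start_new_gp(struct rcu_domain *domain, struct rcu_data *rdp)
{
	struct rcuring *rr = &domain->rcuring;
	long curr = rcuring_get_curr_complete(rr);
	long done = rcuring_get_done_complete(rr);
	long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);

	/*
	 * This CPU has callbacks queued against the domain's newest
	 * complete, and enough ring counters are still free (some are
	 * reserved for expedited GPs): ask for one more GP.
	 */
	if (rdp->curr_complete == curr &&
	    rdp->batch[RCURING_IDX(curr)].list &&
	    curr - done <= max_gp_allow)
		rcuring_advance_complete(rr, curr + 1);
}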

>
>> 4) integrated expedited synchronize_rcu
>> expedited synchronize_rcu is implemented in rcuring.c, not in other files.
>> The whole of it is less than 50 lines of code. we just reserve several
>> GPs for expedited synchronize_rcu, when a expedited callback is inserted,
>> we start a new GP immediately(we have reserved GPs, this step will
>> very probably success), and then force this new GP to be completed quickly.
>> (thanks to multi-GP)
>
> I would be more convinced had you implemented force_quiescent_state()
> for preemptible RCU. ;-)
>
>> 5) preemptible
>> new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function.
>> add new kernel-offset.c to avoid recursively header files inclusion.
>>
>> 6) top-down implies
>> irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(),
>> irq and local_irq_disable() implies rcu_read_lock(). (*)
>> preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*)
>> *: In preemptible rcutree/rcutiny, it is false.
>>
>> 7) speech
>> I will give a speech about RCU and this RCU implementation
>> in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010.
>
> Congratulations!
>
>> 8) core rcuring
>> (Comments and document is still been writing)
>>
>> all read_read_lock/unlock_domain() is nested in a coresponding
>> rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock()
>> ciritcal region as a full rcu read site ciritcal region.
>>
>> read site:
>> rcuring_lock()
>> lock a complete, and ensure this locked_complete - done_complete(1) > 0
>> curr_complete(1) - locked_complete >= 0
>> mb(1)
>
> This is really a compiler barrier?

A real memory barrier.
These memory barriers are not in rcu_read_lock(); they are in the code that
reports the quiescent state: as I read the patch, the
smp_mb__after_atomic_inc() in rcuring_lock() and the
smp_mb__before_atomic_dec() in rcuring_unlock().

>
>>
>> rcu_derefrence(ptr)
>>
>> rcuring_unlock()
>> mb(2)
>
> Ditto?
>
>> release its locked_complete.
>>
>> update site:
>> rcu_assign_pointer(ptr, new)
>>
>> (handle callback:call_rcu()+rcu_do_batch())
>> (call_rcu():)
>> mb(3)
>> get the curr_complete(2)
>> set callback's complete as this curr_complete(2).
>> (we don't have any field to record the callback's complete, we just
>> emulate it by rdp->done_complete and the position of this callback
>> in rdp->batch.)
>>
>> time pass... until done_complete(2) - callback's complete > 0
>
> I have to ask about counter overflow on 32-bit systems.

The "complete" is "long" not "unsigned long"

if callback's complete - rdp->curr_complete > 0, we also queue
this callback to done_batch.
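
A standalone illustration of why the signed difference is wrap-safe (not
patch code; the subtraction is done through unsigned here to stay within
standard C, and a two's-complement long is assumed):

#include <assert.h>
#include <limits.h>

/*
 * "a is a later complete than b": valid as long as a and b are within
 * LONG_MAX of each other, even across the wrap point.
 */
static int complete_after(long a, long b)
{
	return (long)((unsigned long)a - (unsigned long)b) > 0;
}

int main(void)
{
	long older = LONG_MAX - 1;			/* just before the wrap */
	long newer = (long)((unsigned long)older + 3);	/* wrapped, negative */

	assert(newer < older);			/* a raw comparison gives the wrong order */
	assert(complete_after(newer, older));	/* the difference does not */
	return 0;
}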

>
>> (rcu_do_batch():)
>> mb(4)
>> call the callback: (free old ptr, etc.)
>>
>> Signed-off-by: Lai Jiangshan <laijs@xxxxxxxxxxxxxx>
>> ---
>> Kbuild | 75 ++-
>> include/linux/rcupdate.h | 6
>> include/linux/rcuring.h | 195 +++++++++
>> include/linux/sched.h | 6
>> init/Kconfig | 32 +
>> kernel/Makefile | 1
>> kernel/kernel-offsets.c | 20
>> kernel/rcuring.c | 1002 +++++++++++++++++++++++++++++++++++++++++++++++
>> kernel/sched.c | 2
>> kernel/time/tick-sched.c | 4
>> 10 files changed, 1324 insertions(+), 19 deletions(-)
>> diff --git a/Kbuild b/Kbuild
>> index e3737ad..bce37cb 100644
>> --- a/Kbuild
>> +++ b/Kbuild
>> @@ -3,7 +3,19 @@
>> # This file takes care of the following:
>> # 1) Generate bounds.h
>> # 2) Generate asm-offsets.h (may need bounds.h)
>> -# 3) Check for missing system calls
>> +# 3) Generate kernel-offsets.h
>> +# 4) Check for missing system calls
>> +
>> +# Default sed regexp - multiline due to syntax constraints
>> +define sed-y
>> + "/^->/{s:->#\(.*\):/* \1 */:; \
>> + s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
>> + s:->::; p;}"
>> +endef
>> +
>> +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag) $@
>> +cmd_offsets_cc_s_c = $(CC) -D__GENARATING_OFFSETS__ \
>
> s/GENARATING/GENERATING/ just to be pedantic. ;-)
>
>> + $(c_flags) -fverbose-asm -S -o $@ $<
>
> Pulling out the offsets might have the advantage of allowing both
> rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling
> in include/linux/sched.h everywhere. I had been thinking in terms of
> moving the state from the task struct to the taskinfo struct, but
> this approach might be worth considering.
>
>> #####
>> # 1) Generate bounds.h
>> @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild
>> # 2) Generate asm-offsets.h
>> #
>>
>> -offsets-file := include/generated/asm-offsets.h
>> +asm-offsets-file := include/generated/asm-offsets.h
>>
>> -always += $(offsets-file)
>> -targets += $(offsets-file)
>> +always += $(asm-offsets-file)
>> +targets += $(asm-offsets-file)
>> targets += arch/$(SRCARCH)/kernel/asm-offsets.s
>>
>> -
>> -# Default sed regexp - multiline due to syntax constraints
>> -define sed-y
>> - "/^->/{s:->#\(.*\):/* \1 */:; \
>> - s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
>> - s:->::; p;}"
>> -endef
>> -
>> -quiet_cmd_offsets = GEN $@
>> -define cmd_offsets
>> +quiet_cmd_asm_offsets = GEN $@
>> +define cmd_asm_offsets
>> (set -e; \
>> echo "#ifndef __ASM_OFFSETS_H__"; \
>> echo "#define __ASM_OFFSETS_H__"; \
>> @@ -78,13 +82,48 @@ endef
>> arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \
>> $(obj)/$(bounds-file) FORCE
>> $(Q)mkdir -p $(dir $@)
>> - $(call if_changed_dep,cc_s_c)
>> + $(call if_changed_dep,offsets_cc_s_c)
>> +
>> +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
>> + $(call cmd,asm_offsets)
>> +
>> +#####
>> +# 3) Generate kernel-offsets.h
>> +#
>> +
>> +kernel-offsets-file := include/generated/kernel-offsets.h
>> +
>> +always += $(kernel-offsets-file)
>> +targets += $(kernel-offsets-file)
>> +targets += kernel/kernel-offsets.s
>> +
>> +quiet_cmd_kernel_offsets = GEN $@
>> +define cmd_kernel_offsets
>> + (set -e; \
>> + echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \
>> + echo "#define __LINUX_KERNEL_OFFSETS_H__"; \
>> + echo "/*"; \
>> + echo " * DO NOT MODIFY."; \
>> + echo " *"; \
>> + echo " * This file was generated by Kbuild"; \
>> + echo " *"; \
>> + echo " */"; \
>> + echo ""; \
>> + sed -ne $(sed-y) $<; \
>> + echo ""; \
>> + echo "#endif" ) > $@
>> +endef
>> +
>> +# We use internal kbuild rules to avoid the "is up to date" message from make
>> +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \
>> + $(obj)/$(asm-offsets-file) FORCE
>> + $(call if_changed_dep,offsets_cc_s_c)
>>
>> -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
>> - $(call cmd,offsets)
>> +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild
>> + $(call cmd,kernel_offsets)
>>
>> #####
>> -# 3) Check for missing system calls
>> +# 4) Check for missing system calls
>> #
>>
>> quiet_cmd_syscalls = CALL $<
>> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>> index 89414d6..42cd6bc 100644
>> --- a/include/linux/rcupdate.h
>> +++ b/include/linux/rcupdate.h
>> @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu);
>> extern void rcu_check_callbacks(int cpu, int user);
>> struct notifier_block;
>>
>> +#ifndef CONFIG_RCURING
>> #ifdef CONFIG_NO_HZ
>>
>> extern void rcu_enter_nohz(void);
>> @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void)
>> }
>>
>> #endif /* #else #ifdef CONFIG_NO_HZ */
>> +#endif
>>
>> #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU)
>> #include <linux/rcutree.h>
>> #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU)
>> #include <linux/rcutiny.h>
>> +#elif defined(CONFIG_RCURING)
>> +#include <linux/rcuring.h>
>> #else
>> #error "Unknown RCU implementation specified to kernel configuration"
>> #endif
>> @@ -686,6 +690,7 @@ struct rcu_synchronize {
>>
>> extern void wakeme_after_rcu(struct rcu_head *head);
>>
>> +#ifndef CONFIG_RCURING
>> #ifdef CONFIG_PREEMPT_RCU
>>
>> /**
>> @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head,
>> #define call_rcu call_rcu_sched
>>
>> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>> +#endif
>>
>> /**
>> * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period.
>> diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h
>> new file mode 100644
>> index 0000000..343b932
>> --- /dev/null
>> +++ b/include/linux/rcuring.h
>> @@ -0,0 +1,195 @@
>> +/*
>> + * Read-Copy Update mechanism for mutual exclusion
>> + *
>> + * This program is free software; you can redistribute it and/or modify
>> + * it under the terms of the GNU General Public License as published by
>> + * the Free Software Foundation; either version 2 of the License, or
>> + * (at your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
>> + * GNU General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program; if not, write to the Free Software
>> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>> + *
>> + * Copyright IBM Corporation, 2008
>> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
>> + *
>> + * Authors: Dipankar Sarma <dipankar@xxxxxxxxxx>
>> + * Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
>> + * Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx> Hierarchical version
>> + * Lai Jiangshan <laijs@xxxxxxxxxxxxxx> RCURING version
>> + */
>> +
>> +#ifndef __LINUX_RCURING_H
>> +#define __LINUX_RCURING_H
>> +
>> +#define __rcu_read_lock_bh() local_bh_disable()
>> +#define __rcu_read_unlock_bh() local_bh_enable()
>> +#define __rcu_read_lock_sched() preempt_disable()
>> +#define __rcu_read_unlock_sched() preempt_enable()
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +extern void synchronize_rcu_bh(void);
>> +extern void synchronize_rcu_bh_expedited(void);
>> +extern void rcu_bh_qs(int cpu);
>> +extern long rcu_batches_completed_bh(void);
>> +extern void rcu_barrier_bh(void);
>> +extern void rcu_bh_force_quiescent_state(void);
>> +#else /* CONFIG_RCURING_BH */
>> +#define call_rcu_bh call_rcu_sched
>> +#define synchronize_rcu_bh synchronize_rcu_sched
>> +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited
>> +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0)
>> +#define rcu_batches_completed_bh rcu_batches_completed_sched
>> +#define rcu_barrier_bh rcu_barrier_sched
>> +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state
>> +#endif /* CONFIG_RCURING_BH */
>> +
>> +extern
>> +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +extern void synchronize_rcu_sched(void);
>> +#define synchronize_sched synchronize_rcu_sched
>> +extern void synchronize_rcu_sched_expedited(void);
>> +#define synchronize_sched_expedited synchronize_rcu_sched_expedited
>> +extern void rcu_note_context_switch(int cpu);
>> +extern long rcu_batches_completed_sched(void);
>> +extern void rcu_barrier_sched(void);
>> +extern void rcu_sched_force_quiescent_state(void);
>> +
>> +/*
>> + * The @flags is valid only when RCURING_PREEMPT_SAVED is set.
>> + *
>> + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED,
>> + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last
>> + * RCURING_COUNTERS_SHIFT bits) in the @flags.
>> + *
>> + * When the outmost rcu read site is closed, we will clear
>> + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows
>> + * this task has passed a quiescent state and release the old read site's
>> + * locked_complete.
>> + */
>> +#define RCURING_PREEMPT_SAVED (1 << 31)
>> +#define RCURING_PREEMPT_QS (1 << 30)
>> +#define RCURING_PREEMPT_FLAGS (RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS)
>> +
>> +struct rcu_task_preempt {
>> + unsigned int nesting;
>> + unsigned int flags;
>> +};
>> +
>> +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task)
>> +{
>> + rcu_task->nesting++;
>> + barrier();
>> +}
>> +
>> +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task)
>> +{
>> + int nesting = rcu_task->nesting;
>> +
>> + if (nesting == 1) {
>> + unsigned int flags;
>> +
>> + barrier();
>> + flags = ACCESS_ONCE(rcu_task->flags);
>> +
>> + if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) {
>> + flags &= ~RCURING_PREEMPT_QS;
>> + ACCESS_ONCE(rcu_task->flags) = flags;
>> + }
>> + }
>> +
>> + barrier();
>> + rcu_task->nesting = nesting - 1;
>> +}
>> +
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +
>> +#ifdef __GENARATING_OFFSETS__
>> +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL)
>
> The above looks a bit strange. The idea is that if you were generating
> offsets for the fields within rcu_task_preempt, you would select the
> fields? Something like the following?

When __GENARATING_OFFSETS__ is defined, nothing really uses task_rcu_preempt();
the definition is only there to avoid compile warnings.
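
For reference, the generated header ends up holding a single constant; the
numeric value below is made up, only the shape of the line is real:

/* include/generated/kernel-offsets.h, generated by Kbuild (example value) */
#define TASK_RCU_PREEMPT 1096 /* offsetof(struct task_struct, rcu_task_preempt) */

With that constant, task_rcu_preempt(p) can compute the field address without
pulling linux/sched.h into rcuring.h, which is what lets rcu_read_lock() and
rcu_read_unlock() stay inline.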

>
> #define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a)
>
>> +#else
>> +#include <generated/kernel-offsets.h>
>> +#define task_rcu_preempt(p) \
>> + ((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT))
>> +#endif
>> +
>> +#define current_rcu_preempt() task_rcu_preempt(current)
>> +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt())
>> +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt())
>> +extern
>> +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +#define call_rcu call_rcu_preempt
>> +extern void synchronize_rcu_preempt(void);
>> +#define synchronize_rcu synchronize_rcu_preempt
>> +extern void synchronize_rcu_preempt_expedited(void);
>> +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited
>> +extern long rcu_batches_completed_preempt(void);
>> +#define rcu_batches_completed rcu_batches_completed_preempt
>> +extern void rcu_barrier_preempt(void);
>> +#define rcu_barrier rcu_barrier_preempt
>> +extern void rcu_preempt_force_quiescent_state(void);
>> +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state
>> +
>> +struct task_struct;
>> +static inline void rcu_copy_process(struct task_struct *p)
>> +{
>> + struct rcu_task_preempt *rcu_task = task_rcu_preempt(p);
>> +
>> + rcu_task->nesting = 0;
>> + rcu_task->flags = 0;
>> +}
>> +
>> +static inline void exit_rcu(void)
>> +{
>> + if (current_rcu_preempt()->nesting) {
>> + WARN_ON(1);
>> + current_rcu_preempt()->nesting = 0;
>
> If the RCU core was waiting on this task, how does it learn that this
> task is no longer blocking the current grace period(s)?

A schedule() is called after exit_rcu(), and the RCU core collects this
information when the task goes through schedule().

>
>> + }
>> +}
>> +
>> +#else /*CONFIG_RCURING_PREEMPT */
>> +
>> +#define __rcu_read_lock() __rcu_read_lock_sched();
>> +#define __rcu_read_unlock() __rcu_read_unlock_sched();
>> +#define call_rcu call_rcu_sched
>> +#define synchronize_rcu synchronize_rcu_sched
>> +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited
>> +#define rcu_batches_completed rcu_batches_completed_sched
>> +#define rcu_barrier rcu_barrier_sched
>> +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state
>> +
>> +static inline void rcu_copy_process(struct task_struct *p) {}
>> +static inline void exit_rcu(void) {}
>> +#endif /* CONFIG_RCURING_PREEMPT */
>> +
>> +#ifdef CONFIG_NO_HZ
>> +extern void rcu_kernel_enter(void);
>> +extern void rcu_kernel_exit(void);
>> +
>> +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); }
>> +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); }
>> +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); }
>> +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); }
>> +extern void rcu_enter_nohz(void);
>> +extern void rcu_exit_nohz(void);
>> +#else
>> +static inline void rcu_nmi_enter(void) {}
>> +static inline void rcu_nmi_exit(void) {}
>> +static inline void rcu_irq_enter(void) {}
>> +static inline void rcu_irq_exit(void) {}
>> +static inline void rcu_enter_nohz(void) {}
>> +static inline void rcu_exit_nohz(void) {}
>> +#endif
>> +
>> +extern int rcu_needs_cpu(int cpu);
>> +extern void rcu_check_callbacks(int cpu, int user);
>> +
>> +extern void rcu_scheduler_starting(void);
>> +extern int rcu_scheduler_active __read_mostly;
>> +
>> +#endif /* __LINUX_RCURING_H */
>> diff --git a/include/linux/sched.h b/include/linux/sched.h
>> index e18473f..15b332d 100644
>> --- a/include/linux/sched.h
>> +++ b/include/linux/sched.h
>> @@ -1211,6 +1211,10 @@ struct task_struct {
>> struct rcu_node *rcu_blocked_node;
>> #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */
>>
>> +#ifdef CONFIG_RCURING_PREEMPT
>> + struct rcu_task_preempt rcu_task_preempt;
>> +#endif
>> +
>> #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
>> struct sched_info sched_info;
>> #endif
>> @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p)
>> INIT_LIST_HEAD(&p->rcu_node_entry);
>> }
>>
>> -#else
>> +#elif !defined(CONFIG_RCURING)
>>
>> static inline void rcu_copy_process(struct task_struct *p)
>> {
>> diff --git a/init/Kconfig b/init/Kconfig
>> index a619a1a..48e58d9 100644
>> --- a/init/Kconfig
>> +++ b/init/Kconfig
>> @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU
>> for real-time UP systems. This option greatly reduces the
>> memory footprint of RCU.
>>
>> +config RCURING
>> + bool "Multi-GP ring based RCU"
>> +
>> endchoice
>>
>> config PREEMPT_RCU
>> @@ -450,6 +453,35 @@ config TREE_RCU_TRACE
>> TREE_PREEMPT_RCU implementations, permitting Makefile to
>> trivially select kernel/rcutree_trace.c.
>>
>> +config RCURING_BH
>> + bool "RCU for bh"
>> + default y
>> + depends on RCURING && !PREEMPT_RT
>> + help
>> + If Y, use a independent rcu domain for rcu_bh.
>> + If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh,
>> + rcu_read_unlock_bh).
>
> If I understand correctly, "N" defeats the purpose of bh, which is to
> have faster grace periods -- not having to wait for a context switch.

As I recall, rcu_bh exists to defeat DoS attacks, so someone who does not
care about that kind of DoS does not need a separate RCU domain for BH.

>
>> +
>> +config RCURING_BH_PREEMPT
>> + bool
>> + depends on PREEMPT_RT
>> + help
>> + rcu_bh will map to rcu_preempt.
>
> If I understand what you are doing, this config option will break
> networking.

In PREEMPT_RT, BH will be preemptible, but RCURING_BH_PREEMPT is not used
here; it will be removed.

>
>> +
>> +config RCURING_PREEMPT
>> + bool "RCURING based reemptible RCU"
>> + default n
>> + depends on RCURING && PREEMPT
>> + help
>> + If Y, normal rcu functionality will map to rcu_preempt.
>> + If N, normal rcu functionality will map to rcu_sched.
>> +
>> +config RCURING_COUNTERS_SHIFT
>> + int "RCURING's counter shift"
>> + range 1 10
>> + depends on RCURING
>> + default 4
>> +
>> endmenu # "RCU Subsystem"
>>
>> config IKCONFIG
>> diff --git a/kernel/Makefile b/kernel/Makefile
>> index 92b7420..66eab8c 100644
>> --- a/kernel/Makefile
>> +++ b/kernel/Makefile
>> @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o
>> obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o
>> obj-$(CONFIG_TINY_RCU) += rcutiny.o
>> obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o
>> +obj-$(CONFIG_RCURING) += rcuring.o
>> obj-$(CONFIG_RELAY) += relay.o
>> obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
>> obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
>> diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c
>> new file mode 100644
>> index 0000000..0d30817
>> --- /dev/null
>> +++ b/kernel/kernel-offsets.c
>> @@ -0,0 +1,20 @@
>> +/*
>> + * Generate definitions needed by assembly language modules.
>> + *
>> + * Copyright (C) 2008-2010 Lai Jiangshan
>> + */
>> +
>> +#define __GENERATING_KERNEL_OFFSETS__
>> +#include <linux/rcupdate.h>
>> +#include <linux/sched.h>
>> +#include <linux/kbuild.h>
>> +
>> +void foo(void);
>> +
>> +void foo(void)
>> +{
>> +#ifdef CONFIG_RCURING_PREEMPT
>> + OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt);
>> +#endif
>> +}
>> +
>> diff --git a/kernel/rcuring.c b/kernel/rcuring.c
>> new file mode 100644
>> index 0000000..e7cedb5
>> --- /dev/null
>> +++ b/kernel/rcuring.c
>> @@ -0,0 +1,1002 @@
>> +/*
>> + * Read-Copy Update mechanism for mutual exclusion
>> + *
>> + * This program is free software; you can redistribute it and/or modify
>> + * it under the terms of the GNU General Public License as published by
>> + * the Free Software Foundation; either version 2 of the License, or
>> + * (at your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
>> + * GNU General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program; if not, write to the Free Software
>> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>> + *
>> + * Copyright IBM Corporation, 2008
>> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
>> + *
>> + * Authors: Dipankar Sarma <dipankar@xxxxxxxxxx>
>> + * Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
>> + * Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx> Hierarchical version
>> + * Lai Jiangshan <laijs@xxxxxxxxxxxxxx> RCURING version
>> + */
>> +
>> +#include <linux/rcupdate.h>
>> +#include <linux/percpu.h>
>> +#include <asm/atomic.h>
>> +#include <linux/spinlock.h>
>> +#include <linux/module.h>
>> +#include <linux/interrupt.h>
>> +#include <linux/cpu.h>
>> +
>> +#define RCURING_IDX_SHIFT CONFIG_RCURING_COUNTERS_SHIFT
>> +#define RCURING_IDX_MAX (1L << RCURING_IDX_SHIFT)
>> +#define RCURING_IDX_MASK (RCURING_IDX_MAX - 1)
>> +#define RCURING_IDX(complete) ((complete) & RCURING_IDX_MASK)
>> +
>> +struct rcuring {
>> + atomic_t ctr[RCURING_IDX_MAX];
>> + long curr_complete;
>> + long done_complete;
>> +
>> + raw_spinlock_t lock;
>> +};
>> +
>> +/* It is long history that we use -300 as an initial complete */
>> +#define RCURING_INIT_COMPLETE -300L
>> +#define RCURING_INIT(name) \
>> +{ \
>> + {[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),}, \
>> + RCURING_INIT_COMPLETE, \
>> + RCURING_INIT_COMPLETE - 1, \
>> + __RAW_SPIN_LOCK_UNLOCKED(name.lock), \
>
> Please use the gcc-style initializers here.

I will use gcc-style initializers in the next version.

>
>> +}
>> +
>> +static inline
>> +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete)
>> +{
>> + int res;
>> +
>> + /* atomic_read() does not ensure to imply barrier(). */
>> + barrier();
>> + res = atomic_read(rr->ctr + RCURING_IDX(complete));
>> + barrier();
>> +
>> + return res;
>> +}
>> +
>> +void __rcuring_advance_done_complete(struct rcuring *rr)
>> +{
>> + long wait_complete = rr->done_complete + 1;
>> +
>> + if (!rcuring_ctr_read(rr, wait_complete)) {
>> + do {
>> + wait_complete++;
>> + } while (!rcuring_ctr_read(rr, wait_complete));
>> +
>> + ACCESS_ONCE(rr->done_complete) = wait_complete - 1;
>> + }
>> +}
>
> For a given grace period, all quiescent states hit the same counter.
> Or am I missing something?

No; see the answer at the top of this email.

>
>> +
>> +static inline
>> +int rcuring_could_advance_done_complete(struct rcuring *rr)
>> +{
>> + return !rcuring_ctr_read(rr, rr->done_complete + 1);
>> +}
>> +
>> +static inline
>> +void rcuring_advance_done_complete(struct rcuring *rr)
>> +{
>> + if (rcuring_could_advance_done_complete(rr)) {
>> + if (__raw_spin_trylock(&rr->lock)) {
>> + __rcuring_advance_done_complete(rr);
>> + __raw_spin_unlock(&rr->lock);
>> + }
>> + }
>> +}
>> +
>> +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete)
>> +{
>> + long curr_complete = rr->curr_complete;
>> +
>> + if (curr_complete + 1 == next_complete) {
>> + if (!rcuring_ctr_read(rr, next_complete)) {
>> + ACCESS_ONCE(rr->curr_complete) = next_complete;
>> + smp_mb__before_atomic_inc();
>> + atomic_inc(rr->ctr + RCURING_IDX(next_complete));
>> + smp_mb__after_atomic_inc();
>> + atomic_dec(rr->ctr + RCURING_IDX(curr_complete));
>> + }
>> + }
>> +}
>> +
>> +static inline
>> +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete)
>> +{
>> + if (rcuring_could_advance_done_complete(rr))
>> + return 1;
>> +
>> + if (rr->curr_complete + 1 == next_complete) {
>> + if (!rcuring_ctr_read(rr, next_complete))
>> + return 1;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static inline
>> +void rcuring_advance_complete(struct rcuring *rr, long next_complete)
>> +{
>> + if (rcuring_could_advance_complete(rr, next_complete)) {
>> + if (__raw_spin_trylock(&rr->lock)) {
>> + __rcuring_advance_done_complete(rr);
>> + __rcuring_advance_curr_complete(rr, next_complete);
>> + __raw_spin_unlock(&rr->lock);
>> + }
>> + }
>> +}
>> +
>> +static inline
>> +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete)
>> +{
>> + if (rcuring_could_advance_complete(rr, next_complete)) {
>> + __raw_spin_lock(&rr->lock);
>> + __rcuring_advance_done_complete(rr);
>> + __rcuring_advance_curr_complete(rr, next_complete);
>> + __raw_spin_unlock(&rr->lock);
>> + }
>> +}
>> +
>> +static inline long __locked_complete_adjust(int locked_idx, long complete)
>> +{
>> + long locked_complete;
>> +
>> + locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx;
>> + if (locked_complete - complete <= 0)
>> + return locked_complete;
>> + else
>> + return locked_complete - RCURING_IDX_MAX;
>> +}
>> +
>> +static long rcuring_lock(struct rcuring *rr)
>> +{
>> + long locked_complete;
>> + int idx;
>> +
>> + for (;;) {
>> + idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete));
>> + if (likely(atomic_inc_not_zero(rr->ctr + idx)))
>> + break;
>> + }
>> + smp_mb__after_atomic_inc();
>> +
>> + locked_complete = ACCESS_ONCE(rr->curr_complete);
>> + if (likely(RCURING_IDX(locked_complete) == idx))
>> + return locked_complete;
>> + else
>> + return __locked_complete_adjust(idx, locked_complete);
>> +}
>> +
>> +static void rcuring_unlock(struct rcuring *rr, long locked_complete)
>> +{
>> + smp_mb__before_atomic_dec();
>> +
>> + atomic_dec(rr->ctr + RCURING_IDX(locked_complete));
>> +}
>> +
>> +static inline
>> +void rcuring_dup_lock(struct rcuring *rr, long locked_complete)
>> +{
>> + atomic_inc(rr->ctr + RCURING_IDX(locked_complete));
>> +}
>> +
>> +static inline
>> +long rcuring_advance_lock(struct rcuring *rr, long locked_complete)
>> +{
>> + long new_locked_complete = locked_complete;
>> +
>> + if (locked_complete != rr->curr_complete) {
>> + /*
>> + * Lock the new complete at first, and then release
>> + * the old one. It prevents errors when NMI/SMI occurs
>> + * after rcuring_unlock() - we still lock it.
>> + */
>> + new_locked_complete = rcuring_lock(rr);
>> + rcuring_unlock(rr, locked_complete);
>> + }
>> +
>> + return new_locked_complete;
>> +}
>> +
>> +static inline long rcuring_get_done_complete(struct rcuring *rr)
>> +{
>> + return ACCESS_ONCE(rr->done_complete);
>> +}
>> +
>> +static inline long rcuring_get_curr_complete(struct rcuring *rr)
>> +{
>> + return ACCESS_ONCE(rr->curr_complete);
>> +}
>> +
>> +struct rcu_batch {
>> + struct rcu_head *list, **tail;
>> +};
>> +
>> +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from)
>> +{
>> + if (from->list != NULL) {
>> + *to->tail = from->list;
>> + to->tail = from->tail;
>> +
>> + from->list = NULL;
>> + from->tail = &from->list;
>> + }
>> +}
>> +
>> +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new)
>> +{
>> + *batch->tail = new;
>> + batch->tail = &new->next;
>> +}
>> +
>> +struct rcu_data {
>> + long locked_complete;
>> +
>> + /*
>> + * callbacks are in batches (done_complete, curr_complete]
>> + * curr_complete - done_complete >= 0
>> + * curr_complete - done_complete <= RCURING_IDX_MAX
>> + * domain's curr_complete - curr_complete >= 0
>> + * domain's done_complete - done_complete >= 0
>> + *
>> + * curr_complete == done_complete just means @batch is empty.
>> + * we have at least one callback in batch[RCURING_IDX(done_complete)]
>> + * if curr_complete != done_complete
>> + */
>> + long curr_complete;
>> + long done_complete;
>> +
>> + struct rcu_batch batch[RCURING_IDX_MAX];
>> + struct rcu_batch done_batch;
>> +
>> + unsigned int qlen;
>> + unsigned long jiffies;
>> + unsigned long stall_jiffies;
>> +};
>> +
>> +struct rcu_domain {
>> + struct rcuring rcuring;
>> + const char *domain_name;
>> + struct rcu_data __percpu *rcu_data;
>> +
>> + spinlock_t lock; /* this lock is for fqs or other misc things */
>> + long fqs_complete;
>> + void (*force_quiescent_state)(struct rcu_domain *domain,
>> + long fqs_complete);
>> +};
>> +
>> +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1)
>> +
>> +static inline long gp_reserved(struct rcu_domain *domain)
>> +{
>> + return EXPEDITED_GP_RESERVED_RECOMMEND;
>> +}
>> +
>> +static inline void __init rcu_data_init(struct rcu_data *rdp)
>> +{
>> + int idx;
>> +
>> + for (idx = 0; idx < RCURING_IDX_MAX; idx++)
>> + rdp->batch[idx].tail = &rdp->batch[idx].list;
>> +
>> + rdp->done_batch.tail = &rdp->done_batch.list;
>> +}
>> +
>> +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete)
>> +{
>> + int idx;
>> +
>> + while (rdp->done_complete != done_complete) {
>> + rdp->done_complete++;
>> + idx = RCURING_IDX(rdp->done_complete);
>> + rcu_batch_merge(&rdp->done_batch, rdp->batch + idx);
>> + }
>> +}
>> +
>> +/* Is value in the range (@left_open, @right_close] */
>> +static inline bool in_range(long left_open, long right_close, long value)
>> +{
>> + return left_open - value < 0 && value - right_close <= 0;
>> +}
>> +
>> +static void advance_callbacks(struct rcu_data *rdp, long done_complete,
>> + long curr_complete)
>> +{
>> + if (in_range(done_complete, curr_complete, rdp->curr_complete)) {
>> + do_advance_callbacks(rdp, done_complete);
>> + } else {
>> + do_advance_callbacks(rdp, rdp->curr_complete);
>> + rdp->curr_complete = done_complete;
>> + rdp->done_complete = done_complete;
>> + }
>> +}
>> +
>> +static void __force_quiescent_state(struct rcu_domain *domain,
>> + long fqs_complete)
>> +{
>> + int cpu;
>> + long fqs_complete_old;
>> + long curr_complete = rcuring_get_curr_complete(&domain->rcuring);
>> + long done_complete = rcuring_get_done_complete(&domain->rcuring);
>> +
>> + spin_lock(&domain->lock);
>> +
>> + fqs_complete_old = domain->fqs_complete;
>> + if (!in_range(done_complete, curr_complete, fqs_complete_old))
>> + fqs_complete_old = done_complete;
>> +
>> + if (fqs_complete_old - fqs_complete >= 0) {
>> + spin_unlock(&domain->lock);
>> + return;
>> + }
>> +
>> + domain->fqs_complete = fqs_complete;
>> + spin_unlock(&domain->lock);
>> +
>> + for_each_online_cpu(cpu) {
>> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
>> + long locked_complete = ACCESS_ONCE(rdp->locked_complete);
>> +
>> + if (!in_range(fqs_complete_old, fqs_complete, locked_complete))
>> + continue;
>
> Is preemption disabled here?

Yes, it is called with local irqs disabled.

>
>> +
>> + if (cpu == smp_processor_id())
>> + set_tsk_need_resched(current);
>> + else
>> + smp_send_reschedule(cpu);
>> + }
>> +}
>> +
>> +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete)
>> +{
>> + domain->force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static
>> +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> + long curr_complete, done_complete;
>> + struct rcuring *rr = &domain->rcuring;
>> +
>> + smp_mb();
>> + curr_complete = rcuring_get_curr_complete(rr);
>> + done_complete = rcuring_get_done_complete(rr);
>> +
>> + advance_callbacks(rdp, done_complete, curr_complete);
>> + rdp->curr_complete = curr_complete;
>> +
>> + if (!rdp->qlen) {
>> + rdp->jiffies = jiffies;
>> + rdp->stall_jiffies = jiffies;
>> + }
>> +
>> + return curr_complete;
>> +}
>> +
>> +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head,
>> + void (*func)(struct rcu_head *))
>> +{
>> + struct rcu_data *rdp;
>> + long curr_complete;
>> +
>> + head->next = NULL;
>> + head->func = func;
>> +
>> + rdp = this_cpu_ptr(domain->rcu_data);
>> + curr_complete = prepare_for_new_callback(domain, rdp);
>> + rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head);
>> + rdp->qlen++;
>> +
>> + return curr_complete;
>> +}
>> +
>> +static void print_rcuring_stall(struct rcu_domain *domain)
>> +{
>> + struct rcuring *rr = &domain->rcuring;
>> + unsigned long curr_complete = rcuring_get_curr_complete(rr);
>> + unsigned long done_complete = rcuring_get_done_complete(rr);
>> + int cpu;
>> +
>> + printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(),
>> + domain->domain_name);
>> +
>> + printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n",
>> + curr_complete, done_complete);
>> + printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n",
>> + rcuring_ctr_read(rr, curr_complete),
>> + rcuring_ctr_read(rr, done_complete + 1));
>> +
>> + for_each_online_cpu(cpu) {
>> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
>> + printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)="
>> + "%ld/%ld/%ld\n", cpu, rdp->qlen,
>> + rdp->locked_complete, rdp->done_complete,
>> + rdp->curr_complete);
>> + }
>> +}
>> +
>> +static void __rcu_check_callbacks(struct rcu_domain *domain,
>> + struct rcu_data *rdp, int in_rcu_softirq)
>> +{
>> + long curr_complete, done_complete;
>> + struct rcuring *rr = &domain->rcuring;
>> +
>> + if (!rdp->qlen)
>> + return;
>> +
>> + rcuring_advance_done_complete(rr);
>> +
>> + curr_complete = rcuring_get_curr_complete(rr);
>> + done_complete = ACCESS_ONCE(rr->done_complete);
>> + advance_callbacks(rdp, done_complete, curr_complete);
>> +
>> + if (rdp->curr_complete == curr_complete
>> + && rdp->batch[RCURING_IDX(rdp->curr_complete)].list) {
>> + long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);
>> +
>> + if (curr_complete - done_complete <= max_gp_allow)
>> + rcuring_advance_complete(rr, curr_complete + 1);
>> + }
>> +
>> + if (rdp->done_batch.list) {
>> + if (!in_rcu_softirq)
>> + raise_softirq(RCU_SOFTIRQ);
>> + } else {
>> + if (jiffies - rdp->jiffies > 10) {
>> + force_quiescent_state(domain, rdp->curr_complete);
>> + rdp->jiffies = jiffies;
>> + }
>> + if (jiffies - rdp->stall_jiffies > 2 * HZ) {
>> + print_rcuring_stall(domain);
>> + rdp->stall_jiffies = jiffies;
>> + }
>> + }
>> +}
>> +
>> +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> + int count = 0;
>> + struct rcu_head *list, *next;
>> +
>> + list = rdp->done_batch.list;
>> +
>> + if (list) {
>> + rdp->done_batch.list = NULL;
>> + rdp->done_batch.tail = &rdp->done_batch.list;
>> +
>> + local_irq_enable();
>> +
>> + smp_mb();
>> +
>> + while (list) {
>> + next = list->next;
>> + prefetch(next);
>> + list->func(list);
>> + count++;
>> + list = next;
>> + }
>> + local_irq_disable();
>> +
>> + rdp->qlen -= count;
>> + rdp->jiffies = jiffies;
>> + }
>> +}
>> +
>> +static int __rcu_needs_cpu(struct rcu_data *rdp)
>> +{
>> + return !!rdp->qlen;
>> +}
>> +
>> +static inline
>> +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
>> + rdp->locked_complete);
>> +}
>> +
>> +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task,
>> + long locked_complete)
>> +{
>> + unsigned int new_flags;
>> +
>> + new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete);
>> + ACCESS_ONCE(rcu_task->flags) = new_flags;
>> +}
>> +
>> +/*
>> + * Every cpu hold a lock_complete. But for preempt rcu, any task may also
>> + * hold a lock_complete. This function increases lock_complete for the current
>> + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of
>> + * the current task.
>> + */
>> +static inline
>> +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp,
>> + struct rcu_task_preempt *rcu_task)
>> +{
>> + long locked_complete = rdp->locked_complete;
>> + unsigned int flags = ACCESS_ONCE(rcu_task->flags);
>> +
>> + if (!(flags & RCURING_PREEMPT_SAVED)) {
>> + BUG_ON(flags & RCURING_PREEMPT_QS);
>> + if (rcu_task->nesting) {
>> + /* dup_lock_and_save current task's lock_complete */
>> + rcuring_dup_lock(&domain->rcuring, locked_complete);
>> + rcu_preempt_save_complete(rcu_task, locked_complete);
>> + }
>> + } else if (!(rcu_task->nesting)) {
>> + /* release current task's lock_complete */
>> + rcuring_unlock(&domain->rcuring, (long)flags);
>> + ACCESS_ONCE(rcu_task->flags) = 0;
>> + } else if (!(flags & RCURING_PREEMPT_QS)) {
>> + /* advance_and_save current task's lock_complete */
>> + if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) {
>> + rcuring_unlock(&domain->rcuring, (long)flags);
>> + rcuring_dup_lock(&domain->rcuring, locked_complete);
>> + }
>> + rcu_preempt_save_complete(rcu_task, locked_complete);
>> + }
>> +
>> + /* increases lock_complete for the current cpu */
>> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
>> + locked_complete);
>> +}
>
> When we need to boost the priority of preempted readers, how do we tell
> who they are?

Add a struct list_head to struct rcu_task_preempt and a ring of struct
list_head to the rdp, and use per-cpu locks or the scheduler's runqueue locks
to protect all of these list heads. A rough sketch follows.
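
Roughly like the following; none of these fields exist in the posted patch,
this is just the shape of the idea:

struct rcu_task_preempt {
	unsigned int nesting;
	unsigned int flags;
	struct list_head blocked_entry;	/* proposed: queued while preempted */
};

/* proposed per-cpu addition alongside struct rcu_data */
struct rcu_preempt_blocked {
	/* one list per ring slot: who is blocking that locked_complete */
	struct list_head blocked[RCURING_IDX_MAX];
	raw_spinlock_t lock;		/* or reuse the runqueue lock instead */
};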

>
>> +
>> +static void __synchronize_rcu(struct rcu_domain *domain, int expedited)
>> +{
>> + struct rcu_synchronize rcu;
>> + long complete;
>> +
>> + init_completion(&rcu.completion);
>> +
>> + local_irq_disable();
>> + complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu);
>> +
>> + if (expedited) {
>> + /*
>> + * Fore a new gp to be started immediately(can use reserved gp).
>> + * But if all gp are used, it will fail to start new gp,
>> + * and we have to wait until some new gps available.
>> + */
>> + rcuring_advance_complete_force(&domain->rcuring,
>> + complete + 1);
>
> It looks to me like rcuring_advance_complete_force() can just fail
> silently. How does this work?

If it fails, we have no GPs left and synchronize_rcu_expedited() degrades into
synchronize_rcu(); the following force_quiescent_state() will expedite it only
a little.

That is acceptable: we have 3 reserved GPs by default, which means at least 3
concurrent synchronize_rcu_expedited() calls are allowed, and that is enough.
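
For the record, with the default CONFIG_RCURING_COUNTERS_SHIFT=4 the
constants in the patch below work out to:

	RCURING_IDX_MAX = 1 << 4 = 16 ring counters
	gp_reserved()   = 4 - 1  =  3 GPs held back for expedited use
	max_gp_allow    = 16 - 3 = 13 completes that normal GP-starting may
	                             keep in flight before it has to wait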

>
>> +
>> + /* Fore qs and expedite the new gp */
>> + force_quiescent_state(domain, complete);
>> + }
>> + local_irq_enable();
>> +
>> + wait_for_completion(&rcu.completion);
>> +}
>> +
>> +static inline long __rcu_batches_completed(struct rcu_domain *domain)
>> +{
>> + return rcuring_get_done_complete(&domain->rcuring);
>> +}
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args)
>
> Where is gen_code() defined?
>
>> +#else
>> +#define gen_code_for_bh(gen_code, args...)
>> +#endif
>> +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args)
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args)
>> +#else
>> +#define gen_code_for_preempt(gen_code, args...)
>> +#endif
>> +
>> +#define GEN_CODES(gen_code, args...) \
>> + gen_code_for_bh(gen_code, ##args) \
>> + gen_code_for_sched(gen_code, ##args) \
>> + gen_code_for_preempt(gen_code, ##args) \
>> +
>> +#define gen_basic_code(domain) \
>> + \
>> +static void force_quiescent_state_##domain(struct rcu_domain *domain, \
>> + long fqs_complete); \
>> + \
>> +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain); \
>> + \
>> +struct rcu_domain rcu_domain_##domain = \
>> +{ \
>> + .rcuring = RCURING_INIT(rcu_domain_##domain.rcuring), \
>> + .domain_name = #domain, \
>> + .rcu_data = &rcu_data_##domain, \
>> + .lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock), \
>> + .force_quiescent_state = force_quiescent_state_##domain, \
>> +}; \
>> + \
>> +void call_rcu_##domain(struct rcu_head *head, \
>> + void (*func)(struct rcu_head *)) \
>> +{ \
>> + unsigned long flags; \
>> + \
>> + local_irq_save(flags); \
>> + __call_rcu(&rcu_domain_##domain, head, func); \
>> + local_irq_restore(flags); \
>> +} \
>> +EXPORT_SYMBOL_GPL(call_rcu_##domain); \
>> + \
>> +void synchronize_rcu_##domain(void) \
>> +{ \
>> + __synchronize_rcu(&rcu_domain_##domain, 0); \
>> +} \
>> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain); \
>> + \
>> +void synchronize_rcu_##domain##_expedited(void) \
>> +{ \
>> + __synchronize_rcu(&rcu_domain_##domain, 1); \
>> +} \
>> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited); \
>> + \
>> +long rcu_batches_completed_##domain(void) \
>> +{ \
>> + return __rcu_batches_completed(&rcu_domain_##domain); \
>> +} \
>> +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain); \
>> + \
>> +void rcu_##domain##_force_quiescent_state(void) \
>> +{ \
>> + force_quiescent_state(&rcu_domain_##domain, 0); \
>> +} \
>> +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state);
>
> I used to use a single macro for the synchronize_rcu*() APIs, but was
> talked into separate functions. Maybe some of the more ornate recent
> macros have shifted people away from duplicate code in functions, so
> might be time to try again. ;-)

There is too much duplicated code otherwise, so I had to do this :-)

>
>> +GEN_CODES(gen_basic_code)
>> +
>> +static DEFINE_PER_CPU(int, kernel_count);
>> +
>> +#define LOCK_COMPLETE(domain) \
>> + long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring);
>> +#define SAVE_COMPLETE(domain) \
>> + per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain;
>> +#define LOAD_COMPLETE(domain) \
>> + int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete;
>> +#define RELEASE_COMPLETE(domain) \
>> + rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain);
>> +
>> +static void __rcu_kernel_enter_outmost(int cpu)
>> +{
>> + GEN_CODES(LOCK_COMPLETE)
>> +
>> + barrier();
>> + per_cpu(kernel_count, cpu) = 1;
>> + barrier();
>> +
>> + GEN_CODES(SAVE_COMPLETE)
>> +}
>> +
>> +static void __rcu_kernel_exit_outmost(int cpu)
>> +{
>> + GEN_CODES(LOAD_COMPLETE)
>> +
>> + barrier();
>> + per_cpu(kernel_count, cpu) = 0;
>> + barrier();
>> +
>> + GEN_CODES(RELEASE_COMPLETE)
>> +}
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +static void force_quiescent_state_bh(struct rcu_domain *domain,
>> + long fqs_complete)
>> +{
>> + __force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu)
>> +{
>> + __rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu));
>> +}
>> +
>> +void rcu_bh_qs(int cpu)
>> +{
>> + unsigned long flags;
>> +
>> + local_irq_save(flags);
>> + rcu_bh_qsctr_inc_irqoff(cpu);
>> + local_irq_restore(flags);
>> +}
>> +
>> +#else /* CONFIG_RCURING_BH */
>> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); }
>> +#endif /* CONFIG_RCURING_BH */
>> +
>> +static void force_quiescent_state_sched(struct rcu_domain *domain,
>> + long fqs_complete)
>> +{
>> + __force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static inline void rcu_sched_qsctr_inc_irqoff(int cpu)
>> +{
>> + __rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu));
>> +}
>> +
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +static void force_quiescent_state_preempt(struct rcu_domain *domain,
>> + long fqs_complete)
>> +{
>> + /* To Be Implemented */
>> +}
>> +
>> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
>> +{
>> + __rcu_preempt_qsctr_inc(&rcu_domain_preempt,
>> + &per_cpu(rcu_data_preempt, cpu),
>> + &current->rcu_task_preempt);
>> +}
>> +
>> +#else /* CONFIG_RCURING_PREEMPT */
>> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
>> +{
>> + (void)(cpu);
>> + (void)(__rcu_preempt_qsctr_inc);
>> +}
>> +#endif /* CONFIG_RCURING_PREEMPT */
>> +
>> +#define gen_needs_cpu(domain, cpu) \
>> + if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu))) \
>> + return 1;
>> +int rcu_needs_cpu(int cpu)
>> +{
>> + GEN_CODES(gen_needs_cpu, cpu)
>> + return 0;
>> +}
>> +
>> +#define call_func(domain, func, args...) \
>> + func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args);
>> +
>> +/*
>> + * Note a context switch. This is a quiescent state for RCU-sched,
>> + * and requires special handling for preemptible RCU.
>> + */
>> +void rcu_note_context_switch(int cpu)
>> +{
>> + unsigned long flags;
>> +
>> +#ifdef CONFIG_HOTPLUG_CPU
>> + /*
>> + * The stoper thread need to schedule() to the idle thread
>> + * after CPU_DYING.
>> + */
>> + if (unlikely(!per_cpu(kernel_count, cpu))) {
>> + BUG_ON(cpu_online(cpu));
>> + return;
>> + }
>> +#endif
>> +
>> + local_irq_save(flags);
>> + rcu_bh_qsctr_inc_irqoff(cpu);
>> + rcu_sched_qsctr_inc_irqoff(cpu);
>> + rcu_preempt_qsctr_inc_irqoff(cpu);
>> + local_irq_restore(flags);
>> +}
>> +
>> +static int rcu_cpu_idle(int cpu)
>> +{
>> + return idle_cpu(cpu) && rcu_scheduler_active &&
>> + !in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT);
>> +}
>> +
>> +void rcu_check_callbacks(int cpu, int user)
>> +{
>> + unsigned long flags;
>> +
>> + local_irq_save(flags);
>> +
>> + if (user || rcu_cpu_idle(cpu)) {
>> + rcu_bh_qsctr_inc_irqoff(cpu);
>> + rcu_sched_qsctr_inc_irqoff(cpu);
>> + rcu_preempt_qsctr_inc_irqoff(cpu);
>> + } else if (!in_softirq())
>> + rcu_bh_qsctr_inc_irqoff(cpu);
>> +
>> + GEN_CODES(call_func, __rcu_check_callbacks, 0)
>> + local_irq_restore(flags);
>> +}
>> +
>> +static void rcu_process_callbacks(struct softirq_action *unused)
>> +{
>> + local_irq_disable();
>> + GEN_CODES(call_func, __rcu_check_callbacks, 1)
>> + GEN_CODES(call_func, rcu_do_batch)
>> + local_irq_enable();
>> +}
>> +
>> +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain,
>> + struct rcu_data *off_rdp, struct rcu_data *rdp)
>> +{
>> + if (off_rdp->qlen > 0) {
>> + unsigned long flags;
>> + long curr_complete;
>> +
>> + /* move all callbacks in @off_rdp to @off_rdp->done_batch */
>> + do_advance_callbacks(off_rdp, off_rdp->curr_complete);
>> +
>> + /* move all callbacks in @off_rdp to this cpu(@rdp) */
>> + local_irq_save(flags);
>> + curr_complete = prepare_for_new_callback(domain, rdp);
>> + rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete),
>> + &off_rdp->done_batch);
>> + rdp->qlen += off_rdp->qlen;
>> + local_irq_restore(flags);
>> +
>> + off_rdp->qlen = 0;
>> + }
>> +}
>> +
>> +#define gen_offline(domain, cpu, recieve_cpu) \
>> + __rcu_offline_cpu(&rcu_domain_##domain, \
>> + &per_cpu(rcu_data_##domain, cpu), \
>> + &per_cpu(rcu_data_##domain, recieve_cpu));
>> +
>> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
>> + unsigned long action, void *hcpu)
>> +{
>> + int cpu = (long)hcpu;
>> + int recieve_cpu;
>> +
>> + (void)(recieve_cpu);
>> + (void)(__rcu_offline_cpu);
>> + (void)(__rcu_kernel_exit_outmost);
>> +
>> + switch (action) {
>> +#ifdef CONFIG_HOTPLUG_CPU
>> + case CPU_DYING:
>> + case CPU_DYING_FROZEN:
>> + /*
>> + * the machine is stopped now, we can access to
>> + * any other cpu data.
>> + * */
>> + recieve_cpu = cpumask_any_but(cpu_online_mask, cpu);
>> + GEN_CODES(gen_offline, cpu, recieve_cpu)
>> + __rcu_kernel_exit_outmost(cpu);
>> + break;
>> +#endif
>> + case CPU_STARTING:
>> + case CPU_STARTING_FROZEN:
>> + __rcu_kernel_enter_outmost(cpu);
>> + default:
>> + break;
>> + }
>> + return NOTIFY_OK;
>> +}
>> +
>> +#define rcu_init_domain_data(domain, cpu) \
>> + for_each_possible_cpu(cpu) \
>> + rcu_data_init(&per_cpu(rcu_data_##domain, cpu));
>> +
>> +void __init rcu_init(void)
>> +{
>> + int cpu;
>> +
>> + GEN_CODES(rcu_init_domain_data, cpu)
>
> This one actually consumes more lines than would writing out the
> calls explicitly. ;-)

Like the following? That is more lines and more "#ifdef"s;
GEN_CODES reduces both the duplicated code and the "#ifdef"s.

#ifdef CONFIG_RCURING_BH
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_bh, cpu));
#endif
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_sched, cpu));
#ifdef CONFIG_RCURING_PREEMPT
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_preempt, cpu));
#endif

>
>> +
>> + __rcu_kernel_enter_outmost(raw_smp_processor_id());
>> + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
>> + cpu_notifier(rcu_cpu_notify, 0);
>> +}
>> +
>> +int rcu_scheduler_active __read_mostly;
>> +EXPORT_SYMBOL_GPL(rcu_scheduler_active);
>> +
>> +/*
>> + * This function is invoked towards the end of the scheduler's initialization
>> + * process. Before this is called, the idle task might contain
>> + * RCU read-side critical sections (during which time, this idle
>> + * task is booting the system). After this function is called, the
>> + * idle tasks are prohibited from containing RCU read-side critical
>> + * sections. This function also enables RCU lockdep checking.
>> + */
>> +void rcu_scheduler_starting(void)
>> +{
>> + rcu_scheduler_active = 1;
>> +}
>> +
>> +#ifdef CONFIG_NO_HZ
>> +
>> +void rcu_kernel_enter(void)
>> +{
>> + unsigned long flags;
>> +
>> + raw_local_irq_save(flags);
>> + if (__get_cpu_var(kernel_count) == 0)
>> + __rcu_kernel_enter_outmost(raw_smp_processor_id());
>> + else
>> + __get_cpu_var(kernel_count)++;
>> + raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_kernel_exit(void)
>> +{
>> + unsigned long flags;
>> +
>> + raw_local_irq_save(flags);
>> + if (__get_cpu_var(kernel_count) == 1)
>> + __rcu_kernel_exit_outmost(raw_smp_processor_id());
>> + else
>> + __get_cpu_var(kernel_count)--;
>> + raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_enter_nohz(void)
>> +{
>> + unsigned long flags;
>> +
>> + raw_local_irq_save(flags);
>> + __rcu_kernel_exit_outmost(raw_smp_processor_id());
>> + raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_exit_nohz(void)
>> +{
>> + unsigned long flags;
>> +
>> + raw_local_irq_save(flags);
>> + __rcu_kernel_enter_outmost(raw_smp_processor_id());
>> + raw_local_irq_restore(flags);
>> +}
>> +#endif /* CONFIG_NO_HZ */
>> +
>> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
>> +static struct completion rcu_barrier_completion;
>> +static struct kref rcu_barrier_wait_ref;
>> +static DEFINE_MUTEX(rcu_barrier_mutex);
>> +
>> +static void rcu_barrier_release_ref(struct kref *notused)
>> +{
>> + complete(&rcu_barrier_completion);
>> +}
>> +
>> +static void rcu_barrier_callback(struct rcu_head *notused)
>> +{
>> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
>> +}
>> +
>> +static void rcu_barrier_func(void *data)
>> +{
>> + struct rcu_domain *rcu_domain = data;
>> +
>> + kref_get(&rcu_barrier_wait_ref);
>> + __call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head),
>> + rcu_barrier_callback);
>> +}
>> +
>> +static void __rcu_barrier(struct rcu_domain *rcu_domain)
>> +{
>> + mutex_lock(&rcu_barrier_mutex);
>> +
>> + init_completion(&rcu_barrier_completion);
>> + kref_init(&rcu_barrier_wait_ref);
>> +
>> + /* queue barrier rcu_heads for every cpu */
>> + on_each_cpu(rcu_barrier_func, rcu_domain, 1);
>> +
>> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
>> + wait_for_completion(&rcu_barrier_completion);
>> +
>> + mutex_unlock(&rcu_barrier_mutex);
>> +}
>> +
>> +#define gen_rcu_barrier(domain) \
>> +void rcu_barrier_##domain(void) \
>> +{ \
>> + __rcu_barrier(&rcu_domain_##domain); \
>> +} \
>> +EXPORT_SYMBOL_GPL(rcu_barrier_##domain);
>> +
>> +GEN_CODES(gen_rcu_barrier)
>> +
>> diff --git a/kernel/sched.c b/kernel/sched.c
>> index 09b574e..967f25a 100644
>> --- a/kernel/sched.c
>> +++ b/kernel/sched.c
>> @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = {
>> };
>> #endif /* CONFIG_CGROUP_CPUACCT */
>>
>> +#ifndef CONFIG_RCURING
>> #ifndef CONFIG_SMP
>>
>> void synchronize_sched_expedited(void)
>> @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void)
>> EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
>>
>> #endif /* #else #ifndef CONFIG_SMP */
>> +#endif /* CONFIG_RCURING */
>> diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c
>> index 3e216e0..0eba561 100644
>> --- a/kernel/time/tick-sched.c
>> +++ b/kernel/time/tick-sched.c
>> @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle)
>> cpu = smp_processor_id();
>> ts = &per_cpu(tick_cpu_sched, cpu);
>>
>> + /* Don't enter nohz when cpu is offlining, it is going to die */
>> + if (cpu_is_offline(cpu))
>> + goto end;
>> +
>> /*
>> * Call to tick_nohz_start_idle stops the last_update_time from being
>> * updated. Thus, it must not be called in the event we are called from
>>
>
>
