Re: [PATCH] kfree_rcu() should use the new kfree_bulk() interface for freeing rcu structures
From: Jesper Dangaard Brouer
Date: Wed Dec 20 2017 - 02:07:10 EST
On Tue, 19 Dec 2017 16:20:51 -0800
"Paul E. McKenney" <paulmck@xxxxxxxxxxxxxxxxxx> wrote:
> On Tue, Dec 19, 2017 at 02:12:06PM -0800, Matthew Wilcox wrote:
> > On Tue, Dec 19, 2017 at 09:41:58PM +0100, Jesper Dangaard Brouer wrote:
> > > If I had to implement this, I would do the optimization in
> > > __rcu_process_callbacks(): create a small on-call-stack ptr-array for
> > > kfree_bulk(). I would only optimize the case that calls kfree()
> > > directly. In the while(list) loop I would defer calling
> > > __rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
> > > them to the ptr-array (flushing inside the loop if the array is full,
> > > and doing a final kfree_bulk flush after the loop).
> > >
> > > The real advantage of kfree_bulk() comes from amortizing the per-kfree
> > > (behind-the-scenes) sync cost. There is an additional benefit, because
> > > the objects come from RCU and will hit a slower path in SLUB. The SLUB
> > > allocator is very fast for objects that get recycled quickly (short
> > > lifetime), using a non-locked (cpu-local) double-cmpxchg. But it is
> > > slower for longer-lived/more-outstanding objects, as these hit a slower
> > > code-path, a fully locked (cross-cpu) double-cmpxchg.
> >
> > Something like this ... (compile tested only)
Yes, exactly.
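
For readers following along, the ptr-array scheme can be modelled in
plain userspace C. This is only a sketch under stated assumptions:
bulk_free_stub() is a hypothetical stand-in for kfree_bulk(), and
struct free_batch, batch_add(), batch_flush() and free_many() are
illustrative names, not kernel code. It shows the two flush points
(array full, and after the loop) and that the index has to be reset
after every flush.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define BATCH_SIZE 16

/* Hypothetical stand-in for kfree_bulk(): frees nr pointers in one call. */
static unsigned int bulk_calls;
static size_t objects_freed;

static void bulk_free_stub(size_t nr, void **p)
{
	bulk_calls++;
	for (size_t i = 0; i < nr; i++)
		free(p[i]);
	objects_freed += nr;
}

/* Small on-stack array of pointers waiting to be freed together. */
struct free_batch {
	void *ptrs[BATCH_SIZE];
	unsigned int idx;
};

/* Hand everything collected so far to the bulk free and reset the index. */
static void batch_flush(struct free_batch *b)
{
	if (b->idx) {
		bulk_free_stub(b->idx, b->ptrs);
		b->idx = 0;
	}
}

/* Defer the free of one object; flush as soon as the array is full. */
static void batch_add(struct free_batch *b, void *obj)
{
	assert(b->idx < BATCH_SIZE);
	b->ptrs[b->idx++] = obj;
	if (b->idx == BATCH_SIZE)
		batch_flush(b);
}

/* Free 'count' objects through the batch; returns the number of bulk calls. */
static unsigned int free_many(unsigned int count)
{
	struct free_batch b = { .idx = 0 };

	bulk_calls = 0;
	objects_freed = 0;
	for (unsigned int i = 0; i < count; i++)
		batch_add(&b, malloc(32));
	batch_flush(&b);	/* final flush after the loop */
	return bulk_calls;
}
```

In this model, freeing 40 objects costs three bulk calls (16 + 16 + 8)
instead of 40 individual frees, which is where the amortization of the
per-free sync cost would come from.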
> > Considerably less code; Rao, what do you think?
>
> I am sorry, but I am not at all a fan of this approach.
>
> If we are going to make this sort of change, we should do so in a way
> that allows the slab code to actually do the optimizations that might
> make this sort of thing worthwhile. After all, if the main goal was small
> code size, the best approach is to drop kfree_bulk() and get on with life
> in the usual fashion.
>
> I would prefer to believe that something like kfree_bulk() can help,
> and if that is the case, we should give it a chance to do things like
> group kfree_rcu() requests by destination slab and so forth, allowing
> batching optimizations that might provide more significant increases
> in performance. Furthermore, having this in slab opens the door to
> slab taking emergency action when memory is low.
I agree with your argument. The SLUB code does handle different
destination slabs, but it only does a limited look-ahead to find objects
from the same destination slab, which is what gives the speedup (see
build_detached_freelist()). There is a larger and more consistent
speedup potential if we add infrastructure that allows us to pre-sort
by destination slab before invoking kfree_bulk(). In that respect,
Rao's patch is a better approach.
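
To illustrate why pre-sorting matters, here is a rough userspace model.
It is a sketch under assumptions, not the SLUB implementation: struct
obj with a slab_id field is hypothetical, and counting runs of adjacent
same-slab objects is a simplification that stands in for "one detached
freelist per group" (it models no look-ahead at all, whereas the real
code does a limited one).

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical model: an object tagged with the slab it belongs to. */
struct obj {
	int slab_id;
};

/* qsort() comparator: order object pointers by destination slab. */
static int cmp_by_slab(const void *a, const void *b)
{
	const struct obj *oa = *(const struct obj * const *)a;
	const struct obj *ob = *(const struct obj * const *)b;

	return (oa->slab_id > ob->slab_id) - (oa->slab_id < ob->slab_id);
}

/*
 * Count runs of adjacent objects that share a slab. In this model each
 * run corresponds to one group that can be freed with a single
 * synchronized operation.
 */
static size_t count_slab_runs(struct obj **objs, size_t n)
{
	size_t runs = 0;

	assert(objs != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		if (i == 0 || objs[i]->slab_id != objs[i - 1]->slab_id)
			runs++;
	return runs;
}

/* Sort by destination slab first, then count the resulting runs. */
static size_t runs_after_presort(struct obj **objs, size_t n)
{
	qsort(objs, n, sizeof(*objs), cmp_by_slab);
	return count_slab_runs(objs, n);
}
```

With objects arriving interleaved from two slabs (1, 2, 1, 2, 1, 2),
the unsorted array gives six runs in this model, while the pre-sorted
array gives two.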
> But for the patch below, NAK.
>
> Thanx, Paul
>
> > diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> > index 59c471de342a..5ac4ed077233 100644
> > --- a/kernel/rcu/rcu.h
> > +++ b/kernel/rcu/rcu.h
> > @@ -174,20 +174,19 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
> > }
> > #endif /* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
> >
> > -void kfree(const void *);
> > -
> > /*
> > * Reclaim the specified callback, either by invoking it (non-lazy case)
> > * or freeing it directly (lazy case). Return true if lazy, false otherwise.
> > */
> > -static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
> > +static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head, void **kfree,
> > + unsigned int *idx)
> > {
> > unsigned long offset = (unsigned long)head->func;
> >
> > rcu_lock_acquire(&rcu_callback_map);
> > if (__is_kfree_rcu_offset(offset)) {
> > RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset);)
> > - kfree((void *)head - offset);
> > + kfree[(*idx)++] = (void *)head - offset;
> > rcu_lock_release(&rcu_callback_map);
> > return true;
> > } else {
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index f9c0ca2ccf0c..7e13979b4697 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -2725,6 +2725,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > struct rcu_head *rhp;
> > struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
> > long bl, count;
> > + void *to_free[16];
> > + unsigned int to_free_idx = 0;
> >
> > /* If no callbacks are ready, just return. */
> > if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
> > @@ -2755,8 +2757,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > rhp = rcu_cblist_dequeue(&rcl);
> > for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
> > debug_rcu_head_unqueue(rhp);
> > - if (__rcu_reclaim(rsp->name, rhp))
> > + if (__rcu_reclaim(rsp->name, rhp, to_free, &to_free_idx))
> > rcu_cblist_dequeued_lazy(&rcl);
> > + if (to_free_idx == 16)
> > + kfree_bulk(16, to_free);
> > /*
> > * Stop only if limit reached and CPU has something to do.
> > * Note: The rcl structure counts down from zero.
> > @@ -2766,6 +2770,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> > (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
> > break;
> > }
> > + if (to_free_idx)
> > + kfree_bulk(to_free_idx, to_free);
> >
> > local_irq_save(flags);
> > count = -rcl.len;
> > diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
> > index db85ca3975f1..4127be06759b 100644
> > --- a/kernel/rcu/tree_plugin.h
> > +++ b/kernel/rcu/tree_plugin.h
> > @@ -2189,6 +2189,8 @@ static int rcu_nocb_kthread(void *arg)
> > struct rcu_head *next;
> > struct rcu_head **tail;
> > struct rcu_data *rdp = arg;
> > + void *to_free[16];
> > + unsigned int to_free_idx = 0;
> >
> > /* Each pass through this loop invokes one batch of callbacks */
> > for (;;) {
> > @@ -2226,13 +2228,18 @@ static int rcu_nocb_kthread(void *arg)
> > }
> > debug_rcu_head_unqueue(list);
> > local_bh_disable();
> > - if (__rcu_reclaim(rdp->rsp->name, list))
> > + if (__rcu_reclaim(rdp->rsp->name, list, to_free,
> > + &to_free_idx))
> > cl++;
> > c++;
> > + if (to_free_idx == 16)
> > + kfree_bulk(16, to_free);
> > local_bh_enable();
> > cond_resched_rcu_qs();
> > list = next;
> > }
> > + if (to_free_idx)
> > + kfree_bulk(to_free_idx, to_free);
> > trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
> > smp_mb__before_atomic(); /* _add after CB invocation. */
> > atomic_long_add(-c, &rdp->nocb_q_count);
> >
>
--
Best regards,
Jesper Dangaard Brouer
MSc.CS, Principal Kernel Engineer at Red Hat
LinkedIn: http://www.linkedin.com/in/brouer