Re: [PATCH RFC] ext4: fix potential race between online resizing and write operations

From: Joel Fernandes
Date: Fri Feb 21 2020 - 08:28:22 EST


On Fri, Feb 21, 2020 at 07:06:18AM -0500, Joel Fernandes wrote:
> On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> > > On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > > > Hello, Joel, Paul, Ted.
> > > >
> > > > >
> > > > > Good point!
> > > > >
> > > > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > > > API. Adding Uladzislau Rezki and Joel Fernandes on CC for their thoughts.
> > > > >
> > > > I think it makes sense. For example, I see a similar demand in
> > > > mm/list_lru.c too. As for the implementation, it will not be hard, I think.
> > > >
> > > > The easiest way is to inject kvfree() support directly into the existing
> > > > kfree_call_rcu() logic (which will probably need to be renamed), i.e. to
> > > > free vmalloc() allocations only in the "emergency path" by just calling
> > > > kvfree(). That function in turn will figure out whether it is a _vmalloc_
> > > > address or not and trigger the corresponding "free" path.
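
Side note: that last part -- figuring out whether the address came from
vmalloc() -- is essentially what kvfree() already does today; roughly:

        if (is_vmalloc_addr(addr))
                vfree(addr);
        else
                kfree(addr);

so the RCU side only has to defer the kvfree() call until after a grace period.
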
> > >
> > > The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> > > is that kfree_rcu() is a magic macro which frees a structure, which
> > > has to contain a struct rcu_head. In this case, I'm freeing a pointer
> > > to a set of structures, or in another case, a set of buffer_heads, which
> > > do *not* have an rcu_head structure.
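
Right -- for reference, the existing API wants the rcu_head embedded in the
object being freed, something like:

        struct foo {
                int data;
                struct rcu_head rcu;
        };

        kfree_rcu(fp, rcu);     /* fp is a struct foo *, just for illustration */

whereas here the object is a kvmalloc()'d array with no spare rcu_head to hang
the callback on.
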
> > >
> > We can implement a kvfree_rcu() that deals with the pointer only; that is
> > not an issue. I mean without embedding an rcu_head into the structure or
> > anything else.
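
(With such a pointer-only variant -- see the kvfree_rcu() macro in the patch
below -- the caller side would then be just something like

        kvfree_rcu(old_array);

with no rcu_head anywhere; 'old_array' is only a placeholder name for whatever
kvmalloc()'d array ext4 wants to release.)
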
> >
> > I tried to implement it with as few changes as possible, to keep it clear
> > and readable. Please have a look:
> >
> > <snip>
> > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>
> Overall this implementation is nice. You are basically avoiding the rcu_head
> allocation that Ted did, by using the array-of-pointers technique we used for
> the previous kfree_rcu() work.
>
> One thing stands out: the path where we could not allocate a page for the new
> block node:
>
> > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > if (krcp->initialized)
> > spin_unlock(&krcp->lock);
> > local_irq_restore(flags);
> > +
> > + if (!skip_call_rcu) {
> > + synchronize_rcu();
> > + kvfree(ptr_to_free);
>
> We can't block here; it has to be async, otherwise everything else blocks. And
> I think this can also be used from interrupt handlers, where it would at least
> be an SWA (sleeping-while-atomic) violation. So perhaps it needs to allocate
> an rcu_head wrapper object itself for the 'emergency case' and use the regular
> techniques.
>
> Another thing that stands out is the code duplication; if we can make this
> reuse as much of the previous code as possible, that'd be great. I'd like
> to avoid bvcached and bvhead if possible. Maybe we can store the fact that
> this is a 'special object' in some of the lower-order bits of the pointer.
> Then we can detect that it is 'special' and free it using kvfree() during
> the reclaim.

I was thinking of something like the patch below, only build-tested -- just to
show the idea. Perhaps the synchronize_rcu() should be done from a workqueue
handler so things don't crap out in IRQ context?
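
Something along these lines is roughly what I mean (kvfree_deferred* are
made-up names and this is completely untested; and if the kzalloc() of the
wrapper in the patch below has already failed under memory pressure, this
allocation can fail too, so in practice it would probably want a preallocated
per-CPU work item):

        struct kvfree_deferred {
                struct work_struct work;
                void *ptr;
        };

        static void kvfree_deferred_fn(struct work_struct *work)
        {
                struct kvfree_deferred *w =
                        container_of(work, struct kvfree_deferred, work);

                synchronize_rcu();      /* blocking is fine in WQ context */
                kvfree(w->ptr);
                kfree(w);
        }

        /* ...and in the emergency path, instead of the inline synchronize_rcu(): */
        struct kvfree_deferred *w = kzalloc(sizeof(*w), GFP_NOWAIT);

        if (w) {
                w->ptr = ptr;
                INIT_WORK(&w->work, kvfree_deferred_fn);
                queue_work(system_wq, &w->work);
        }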

Basically, what I did differently is:
1. Use the existing kfree_rcu_bulk_data::records array to store the
to-be-freed array.
2. In the emergency case, allocate a new wrapper and tag the pointer. Read the
tag later to figure out it is an array wrapper and do the additional kvfree.

The debug_objects bits obviously wouldn't work for the !emergency kvfree case;
not sure what we can do there.
---8<-----------------------

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c31696..19fd7c74ad532 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
#define __kfree_rcu(head, offset) \
do { \
BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
- kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+ kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
} while (0)

/**
@@ -842,6 +842,14 @@ do { \
__kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
} while (0)

+#define kvfree_rcu(ptr) \
+do { \
+ typeof(ptr) ___p = (ptr); \
+ \
+ if (___p) \
+ kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
/*
* Place this after a lock-acquisition primitive to guarantee that
* an UNLOCK+LOCK pair acts as a full barrier. This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f3..a12ecc1645fa9 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
synchronize_rcu();
}

-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
{
call_rcu(head, func);
}
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04df..1e445b566c019 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
}

void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);

void rcu_barrier(void);
bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ec81139cc4c6a..7b6ab4160f080 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2814,6 +2814,15 @@ struct kfree_rcu_cpu {
bool initialized;
};

+/*
+ * Used when an array of pointers could not be put onto
+ * the kfree_rcu_bulk_data::records array.
+ */
+struct kfree_rcu_wrap_kvarray {
+ struct rcu_head head;
+ void *ptr;
+};
+
static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);

static __always_inline void
@@ -2873,12 +2882,25 @@ static void kfree_rcu_work(struct work_struct *work)
*/
for (; head; head = next) {
unsigned long offset = (unsigned long)head->func;
+ bool is_array_ptr = false;
+
+ if (((unsigned long)head - offset) & BIT(0)) {
+ is_array_ptr = true;
+ offset = offset - 1;
+ }

next = head->next;
- debug_rcu_head_unqueue(head);
+ if (!is_array_ptr)
+ debug_rcu_head_unqueue(head);
+
rcu_lock_acquire(&rcu_callback_map);
trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);

+ if (is_array_ptr) {
+ struct kfree_rcu_wrap_kvarray *kv = (void *)head - offset;
+ kvfree((void *)kv->ptr);
+ }
+
if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
kfree((void *)head - offset);

@@ -2975,7 +2997,7 @@ static void kfree_rcu_monitor(struct work_struct *work)

static inline bool
kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
- struct rcu_head *head, rcu_callback_t func)
+ struct rcu_head *head, rcu_callback_t func, void *ptr)
{
struct kfree_rcu_bulk_data *bnode;

@@ -3009,14 +3031,20 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
}

#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
- head->func = func;
- head->next = krcp->bhead->head_free_debug;
- krcp->bhead->head_free_debug = head;
+ /* debug_objects doesn't work for kvfree */
+ if (!ptr) {
+ head->func = func;
+ head->next = krcp->bhead->head_free_debug;
+ krcp->bhead->head_free_debug = head;
+ }
#endif

/* Finally insert. */
- krcp->bhead->records[krcp->bhead->nr_records++] =
- (void *) head - (unsigned long) func;
+ if (ptr)
+ krcp->bhead->records[krcp->bhead->nr_records++] = ptr;
+ else
+ krcp->bhead->records[krcp->bhead->nr_records++] =
+ (void *) head - (unsigned long) func;

return true;
}
@@ -3033,10 +3061,11 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
* be free'd in workqueue context. This allows us to: batch requests together to
* reduce the number of grace periods during heavy kfree_rcu() load.
*/
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
{
unsigned long flags;
struct kfree_rcu_cpu *krcp;
+ bool ret;

local_irq_save(flags); // For safely calling this_cpu_ptr().
krcp = this_cpu_ptr(&krc);
@@ -3044,7 +3073,8 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
spin_lock(&krcp->lock);

// Queue the object but don't yet schedule the batch.
- if (debug_rcu_head_queue(head)) {
+ // NOTE: debug_objects doesn't work for the kvfree case.
+ if (!ptr && debug_rcu_head_queue(head)) {
// Probable double kfree_rcu(), just leak.
WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
__func__, head);
@@ -3055,7 +3085,29 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
* Under high memory pressure GFP_NOWAIT can fail,
* in that case the emergency path is maintained.
*/
- if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+ ret = kfree_call_rcu_add_ptr_to_bulk(krcp, head, func, ptr);
+ if (unlikely(!ret)) {
+ if (ptr) {
+ struct kfree_rcu_wrap_kvarray *kvwrap;
+
+ // krcp->lock is held with IRQs off here, so no GFP_KERNEL.
+ kvwrap = kzalloc(sizeof(*kvwrap), GFP_NOWAIT);
+
+ // If memory is really low, just try inline-freeing.
+ if (!kvwrap) {
+ // NOT SURE if permitted due to IRQ. Maybe we
+ // should try doing this from WQ?
+ synchronize_rcu();
+ kvfree(ptr);
+ goto unlock_return;
+ }
+
+ kvwrap->ptr = ptr;
+ ptr = NULL;
+ head = &(kvwrap->head);
+ func = offsetof(typeof(*kvwrap), head);
+ // Tag the array as wrapper
+ func = (rcu_callback_t)((unsigned long)func + 1);
+ }
+
head->func = func;
head->next = krcp->head;
krcp->head = head;