Re: [PATCH POC] printk_ringbuffer: Alternative implementation of lockless printk ringbuffer

From: Petr Mladek
Date: Tue Jul 09 2019 - 07:58:22 EST


On Tue 2019-07-09 12:21:01, John Ogness wrote:
> On 2019-07-09, Petr Mladek <pmladek@xxxxxxxx> wrote:
> >>>> 1. The code claims that the cmpxchg(seq_newest) in
> >>>> prb_reserve_desc() guarantees that "The descriptor is ours until
> >>>> the COMMITTED bit is set." This is not true if in that wind
> >>>> seq_newest wraps, allowing another writer to gain ownership of the
> >>>> same descriptor. For small descriptor arrays (such as in my test
> >>>> module), this situation is quite easy to reproduce.
> >>>
> >> Let me inline the function are talking about and add commentary to
> >> illustrate what I am saying:
> >>
> >> static int prb_reserve_desc(struct prb_reserved_entry *entry)
> >> {
> >> unsigned long seq, seq_newest, seq_prev_wrap;
> >> struct printk_ringbuffer *rb = entry->rb;
> >> struct prb_desc *desc;
> >> int err;
> >>
> >> /* Get descriptor for the next sequence number. */
> >> do {
> >> seq_newest = READ_ONCE(rb->seq_newest);
> >> seq = (seq_newest + 1) & PRB_SEQ_MASK;
> >> seq_prev_wrap = (seq - PRB_DESC_SIZE(rb)) & PRB_SEQ_MASK;
> >>
> >> /*
> >> * Remove conflicting descriptor from the previous wrap
> >> * if ever used. It might fail when the related data
> >> * have not been committed yet.
> >> */
> >> if (seq_prev_wrap == READ_ONCE(rb->seq_oldest)) {
> >> err = prb_remove_desc_oldest(rb, seq_prev_wrap);
> >> if (err)
> >> return err;
> >> }
> >> } while (cmpxchg(&rb->seq_newest, seq_newest, seq) != seq_newest);
> >>
> >> I am referring to this point in the code, after the
> >> cmpxchg(). seq_newest has been incremented but the descriptor is
> >> still in the unused state and seq is still 1 wrap behind. If an NMI
> >> occurs here and the NMI (or some other CPU) inserts enough entries to
> >> wrap the descriptor array, this descriptor will be reserved again,
> >> even though it has already been reserved.
> >
> > Not really, the NMI will not reach the cmpxchg() in this case.
> > prb_remove_desc_oldest() will return error.
>
> Why will prb_remove_desc_oldest() fail? IIUC, it will return success
> because the descriptor is in the desc_miss state.
>
> > It will not be able to remove the conflicting descriptor because
> > it will still be occupied by a two-wraps-old descriptor.

Ah, I see that this situation was not handled correctly.
But it can get fixed pretty easily, see an updated
prb_remove_desc_oldest() at the end of the mail.


> Please explain why with more details. Perhaps providing a function call
> chain? Sorry if I'm missing the obvious here.

To be on the safe side, let's try it with real numbers.

Let's have array with 8 descriptors filled with the following
sequence numbers pointing to commited messages:

desc[10]: 16 17 18 19 20 21 22 23
rb->seq_oldest = 16;
rb->seq_newest = 23;

then prb_reserve_desc() would do:

seq_newest = 23;
seq = 24;
seq_prew_wrap = 16;

prb_remove_desc_oldest(rb, 16);

// let's say that it succeeds and
// rb->seq_oldest == 17;

cmpxchg(&rb->seq_newest, 23, 24) == 23)

// let's say that it succeds and it is immediately
// interrupted by NMI before desc[0]->dst is set to 24.
// So, we still have:

desc[10]: 16 17 18 19 20 21 22 23
rb->seq_oldest = 17;
rb->seq_newest = 24;

Let's say that NMI tries to print 8 messages.
After 7 successfully reserved and commited messages
we could have:

desc[10]: 16 25 26 27 28 29 30 31
rb->seq_oldest = 24;
rb->seq_newest = 31;

desc[0] still has the outdated information. Now, we try to reserve
the 8th message, then prb_reserve_desc() would do:

seq_newest = 31;
seq = 32;
seq_prew_wrap = 24;

prb_remove_desc_oldest(rb, 24);

desc_state = prb_read_desc(rb, 24, &desc);

// desc_state == desc_miss because the
// descriptor still points to the outdated
// seq = 16.

// prb_remove_desc_oldest(rb, 24) would continue with:
switch (desc_state) {
/*
* Another seq means that the oldest desciptor has already been
* removed and reused. Return success in this case.
*/
case desc_miss:
return 0;

BUG: This is obviously wrong!

But this is a special case that can get detected. desc->seq is exactly
1 wrap back than requested. The proper code would be:

static int prb_remove_desc_oldest(struct printk_ringbuffer *rb,
unsigned long seq_oldest)
{
struct prb_desc desc;
enum prb_desc_state desc_state;
int err;

desc_state = prb_read_desc(rb, seq_oldest, &desc);
switch (desc_state) {
case desc_miss:
unsigned long seq_prev_wrap =
(seq_oldest - PRB_DESC_SIZE(rb)) &
PRB_SEQ_MASK;

if (desc->dst ==
(seq_prev_wrap |
PRB_COMMITED_MASK |
PRB_REUSABLE_MASK)) {
/*
* Special case: Reusable descriptor from the
* previous wrap means that the current
* oldest descriptor is reserved but the dst
* has not been updated yet.
*/
return -ENOMEM;

/*
* Any other desc_misc means that the oldest
* has been already removed and reused by a newer
* sequence number. Return success in this case.
* The attempt to update rb->seq_oldest will fail.
*/
return 0;

At this point, any prb_reserve() would fail exactly this way
until NMI returns and the message with seq == 24 gets commited.

Best Regards,
Petr