Re: [PATCH v4 -rcu 3/4] rcu/segcblist: Add counters to segcblist datastructure

From: Paul E. McKenney
Date: Tue Aug 25 2020 - 17:53:42 EST


On Mon, Aug 24, 2020 at 10:48:41PM -0400, Joel Fernandes (Google) wrote:
> Add counting of segment lengths of segmented callback list.
>
> This will be useful for a number of things, such as knowing how big the
> ready-to-execute segment has gotten. The immediate benefit is the
> ability to trace how the callbacks in the segmented callback list change.
>
> Tested by profusely reading traces as the segcblist counts were updated.

Might I recommend a more repeatable and sustainable test methodology?
(Sorry, couldn't resist, given that you left yourself wide open for
that one...)

> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> ---
>  include/linux/rcu_segcblist.h |  2 +
>  kernel/rcu/rcu_segcblist.c    | 82 +++++++++++++++++++++++++++++++++--
>  2 files changed, 81 insertions(+), 3 deletions(-)
>
> diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
> index b36afe7b22c9..d462ae5e340a 100644
> --- a/include/linux/rcu_segcblist.h
> +++ b/include/linux/rcu_segcblist.h
> @@ -69,8 +69,10 @@ struct rcu_segcblist {
>  	unsigned long gp_seq[RCU_CBLIST_NSEGS];
>  #ifdef CONFIG_RCU_NOCB_CPU
>  	atomic_long_t len;
> +	atomic_long_t seglen[RCU_CBLIST_NSEGS];

What are the guarantees for access to seglen?

I get that the possibility of RCU callback offloading means that you
need atomic_long_t, but presumably you are only guaranteeing coherent
lockless access to the individual elements of the array, rather than
any coherent relationship among them or with ->len. Anything surprising
should go in the header comment blocks of the access functions.
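
For example, something like this for rcu_segcblist_get_seglen()'s
header comment (exact wording up to you, of course):

	/*
	 * Return the number of callbacks in the specified segment.
	 * Note that this is a lockless snapshot of that one segment
	 * only, with no guarantee of consistency with the other
	 * segments or with ->len, either of which might change at
	 * any time.
	 */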

And in case someone thinks that atomic_t would suffice, I have in the
past seen lists with in excess of 30M callbacks without RCU CPU stall
warnings, so 2B is not as large a number as you might think.

>  #else
>  	long len;
> +	long seglen[RCU_CBLIST_NSEGS];
>  #endif
>  	u8 enabled;
>  	u8 offloaded;
> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> index 076337ae2e50..73a103464ea4 100644
> --- a/kernel/rcu/rcu_segcblist.c
> +++ b/kernel/rcu/rcu_segcblist.c
> @@ -88,6 +88,62 @@ static void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
>  #endif
>  }
>
> +/* Get the length of a segment of the rcu_segcblist structure. */
> +static long rcu_segcblist_get_seglen(struct rcu_segcblist *rsclp, int seg)
> +{
> +#ifdef CONFIG_RCU_NOCB_CPU
> +	return atomic_long_read(&rsclp->seglen[seg]);
> +#else
> +	return READ_ONCE(rsclp->seglen[seg]);
> +#endif
> +}
> +
> +/* Set the length of a segment of the rcu_segcblist structure. */
> +static void rcu_segcblist_set_seglen(struct rcu_segcblist *rsclp, int seg, long v)
> +{
> +#ifdef CONFIG_RCU_NOCB_CPU
> +	atomic_long_set(&rsclp->seglen[seg], v);
> +#else
> +	WRITE_ONCE(rsclp->seglen[seg], v);
> +#endif
> +}
> +
> +/* Increase the length of a segment of the rcu_segcblist structure. */
> +static void rcu_segcblist_add_seglen(struct rcu_segcblist *rsclp, int seg, long v)
> +{
> +#ifdef CONFIG_RCU_NOCB_CPU
> +	smp_mb__before_atomic(); /* Up to the caller! */
> +	atomic_long_add(v, &rsclp->seglen[seg]);
> +	smp_mb__after_atomic(); /* Up to the caller! */
> +#else
> +	smp_mb(); /* Up to the caller! */
> +	WRITE_ONCE(rsclp->seglen[seg], rsclp->seglen[seg] + v);
> +	smp_mb(); /* Up to the caller! */
> +#endif
> +}
> +
> +/* Move the length count of segment "from" over to segment "to". */
> +static void rcu_segcblist_move_seglen(struct rcu_segcblist *rsclp, int from, int to)
> +{
> +	long len;
> +
> +	if (from == to)
> +		return;
> +
> +	len = rcu_segcblist_get_seglen(rsclp, from);
> +	if (!len)
> +		return;

You don't need the blank lines on this small a function.

> +	rcu_segcblist_add_seglen(rsclp, to, len);
> +	rcu_segcblist_set_seglen(rsclp, from, 0);
> +}
> +
> +/* Increment segment's length. */
> +static void rcu_segcblist_inc_seglen(struct rcu_segcblist *rsclp, int seg)
> +{
> +	rcu_segcblist_add_seglen(rsclp, seg, 1);
> +}
> +
>  /*
>   * Increase the numeric length of an rcu_segcblist structure by the
>   * specified amount, which can be negative. This can cause the ->len
> @@ -149,8 +205,10 @@ void rcu_segcblist_init(struct rcu_segcblist *rsclp)
>  	BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq));
>  	BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq));
>  	rsclp->head = NULL;
> -	for (i = 0; i < RCU_CBLIST_NSEGS; i++)
> +	for (i = 0; i < RCU_CBLIST_NSEGS; i++) {
>  		rsclp->tails[i] = &rsclp->head;
> +		rcu_segcblist_set_seglen(rsclp, i, 0);
> +	}
>  	rcu_segcblist_set_len(rsclp, 0);
>  	rsclp->enabled = 1;
>  }
> @@ -245,6 +303,7 @@ void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
>  			   struct rcu_head *rhp)
>  {
>  	rcu_segcblist_inc_len(rsclp);
> +	rcu_segcblist_inc_seglen(rsclp, RCU_NEXT_TAIL);
>  	smp_mb(); /* Ensure counts are updated before callback is enqueued. */
>  	rhp->next = NULL;
>  	WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
> @@ -274,6 +333,7 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
>  	for (i = RCU_NEXT_TAIL; i > RCU_DONE_TAIL; i--)
>  		if (rsclp->tails[i] != rsclp->tails[i - 1])
>  			break;
> +	rcu_segcblist_inc_seglen(rsclp, i);
>  	WRITE_ONCE(*rsclp->tails[i], rhp);
>  	for (; i <= RCU_NEXT_TAIL; i++)
>  		WRITE_ONCE(rsclp->tails[i], &rhp->next);
> @@ -307,6 +367,7 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
>
>  	if (!rcu_segcblist_ready_cbs(rsclp))
>  		return; /* Nothing to do. */
> +	rclp->len = rcu_segcblist_get_seglen(rsclp, RCU_DONE_TAIL);
>  	*rclp->tail = rsclp->head;
>  	WRITE_ONCE(rsclp->head, *rsclp->tails[RCU_DONE_TAIL]);
>  	WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
> @@ -314,6 +375,7 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
>  	for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--)
>  		if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL])
>  			WRITE_ONCE(rsclp->tails[i], &rsclp->head);
> +	rcu_segcblist_set_seglen(rsclp, RCU_DONE_TAIL, 0);
>  }
>
>  /*
> @@ -330,11 +392,16 @@ void rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
>
>  	if (!rcu_segcblist_pend_cbs(rsclp))
>  		return; /* Nothing to do. */
> +	rclp->len = rcu_segcblist_get_seglen(rsclp, RCU_WAIT_TAIL) +
> +		    rcu_segcblist_get_seglen(rsclp, RCU_NEXT_READY_TAIL) +
> +		    rcu_segcblist_get_seglen(rsclp, RCU_NEXT_TAIL);

A loop for future-proofing, please. The compiler will unroll it anyway,
and this is nowhere near a fastpath.
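
Perhaps something like this (untested):

	rclp->len = 0;
	for (i = RCU_WAIT_TAIL; i < RCU_CBLIST_NSEGS; i++)
		rclp->len += rcu_segcblist_get_seglen(rsclp, i);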

>  	*rclp->tail = *rsclp->tails[RCU_DONE_TAIL];
>  	rclp->tail = rsclp->tails[RCU_NEXT_TAIL];
>  	WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
> -	for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++)
> +	for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++) {
>  		WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_DONE_TAIL]);
> +		rcu_segcblist_set_seglen(rsclp, i, 0);
> +	}
>  }
>
>  /*
> @@ -359,6 +426,7 @@ void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp,
>
>  	if (!rclp->head)
>  		return; /* No callbacks to move. */
> +	rcu_segcblist_add_seglen(rsclp, RCU_DONE_TAIL, rclp->len);
>  	*rclp->tail = rsclp->head;
>  	WRITE_ONCE(rsclp->head, rclp->head);
>  	for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
> @@ -379,6 +447,8 @@ void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp,
>  {
>  	if (!rclp->head)
>  		return; /* Nothing to do. */
> +

This only has five lines of code, so the blank line is superfluous.

> +	rcu_segcblist_add_seglen(rsclp, RCU_NEXT_TAIL, rclp->len);
>  	WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rclp->head);
>  	WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], rclp->tail);
>  }
> @@ -403,6 +473,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
>  		if (ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
>  			break;
>  		WRITE_ONCE(rsclp->tails[RCU_DONE_TAIL], rsclp->tails[i]);
> +		rcu_segcblist_move_seglen(rsclp, i, RCU_DONE_TAIL);
>  	}
>
>  	/* If no callbacks moved, nothing more need be done. */
> @@ -423,6 +494,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
>  		if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL])
>  			break; /* No more callbacks. */
>  		WRITE_ONCE(rsclp->tails[j], rsclp->tails[i]);
> +		rcu_segcblist_move_seglen(rsclp, i, j);
>  		rsclp->gp_seq[j] = rsclp->gp_seq[i];
>  	}
>  }
> @@ -444,7 +516,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
>   */
>  bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq)
>  {
> -	int i;
> +	int i, j;
>
>  	WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
>  	if (rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL))
> @@ -487,6 +559,10 @@ bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq)
>  	if (rcu_segcblist_restempty(rsclp, i) || ++i >= RCU_NEXT_TAIL)
>  		return false;
>
> +	/* Accounting: everything below i is about to get merged into i. */

How about "newer than i"? Many people would think of "below i" as being
numerically less than i, which is not what we need them to think.
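
I.e., something like:

	/* Accounting: everything in segments newer than i is about to get merged into segment i. */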

> +	for (j = i + 1; j <= RCU_NEXT_TAIL; j++)
> +		rcu_segcblist_move_seglen(rsclp, j, i);

I thought about merging the above loop with the next one, but the added
check and the snapshotting of "i" make it not worthwhile, so please
leave this loop separate, just as you now have it.
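
For the record, the merged version would need to look something like
the following (untested, and assuming the usual form of the second
loop), which I trust makes the point:

	for (j = i; i <= RCU_NEXT_TAIL; i++) {
		if (i != j)	/* Skip the destination segment itself. */
			rcu_segcblist_move_seglen(rsclp, i, j);
		if (i == RCU_NEXT_TAIL)
			break;	/* Do not overwrite RCU_NEXT_TAIL itself. */
		WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_NEXT_TAIL]);
		rsclp->gp_seq[i] = seq;
	}

Not an improvement.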

Thanx, Paul

>  	/*
>  	 * Merge all later callbacks, including newly arrived callbacks,
>  	 * into the segment located by the for-loop above. Assign "seq"
> --
> 2.28.0.297.g1956fa8f8d-goog
>