Re: [PATCH v3 3/5] locking/qspinlock: Introduce CNA into the slow path of qspinlock

From: Peter Zijlstra
Date: Wed Jul 17 2019 - 04:40:03 EST


On Tue, Jul 16, 2019 at 08:47:24PM +0200, Peter Zijlstra wrote:
> On Tue, Jul 16, 2019 at 01:19:16PM -0400, Alex Kogan wrote:
> > > On Jul 16, 2019, at 11:50 AM, Peter Zijlstra <peterz@xxxxxxxxxxxxx> wrote:
>
> > > static void cna_move(struct cna_node *cn, struct cna_node *cni)
> > > {
> > > struct cna_node *head, *tail;
> > >
> > > /* remove @cni */
> > > WRITE_ONCE(cn->mcs.next, cni->mcs.next);
> > >
> > > /* stick @cni on the 'other' list tail */
> > > cni->mcs.next = NULL;
> > >
> > > if (cn->mcs.locked <= 1) {
> > > /* head = tail = cni */
> > > head = cni;
> > > head->tail = cni;
> > > cn->mcs.locked = head->encoded_tail;
> > > } else {
> > > /* add to tail */
> > > head = (struct cna_node *)decode_tail(cn->mcs.locked);
> > > tail = tail->tail;
> > > tail->next = cni;
> > > }
> > > }
> > >
> > > static struct cna_node *cna_find_next(struct mcs_spinlock *node)
> > > {
> > > struct cna_node *cni, *cn = (struct cna_node *)node;
> > >
> > > while ((cni = (struct cna_node *)READ_ONCE(cn->mcs.next))) {
> > > if (likely(cni->node == cn->node))
> > > break;
> > >
> > > cna_move(cn, cni);
> > > }
> > >
> > > return cni;
> > > }
> > But then you move nodes from the main list to the 'other' list one-by-one.
> > I'm afraid this would be unnecessarily expensive.
> > Plus, all this extra work is wasted if you do not find a thread on the same
> > NUMA node (you move everyone to the 'other' list only to move them back in
> > cna_mcs_pass_lock()).
>
> My primary concern was readability; I find the above suggestion much
> more readable. Maybe it can be written differently; you'll have to play
> around a bit.

/*
 * Unlink the run of queue nodes [@head, @tail] from the main queue behind @cn
 * and append it to the secondary list.
 *
 * cn->mcs.locked doubles as the secondary-list handle: a value <= 1 is
 * treated as "no secondary list yet"; anything larger is passed through
 * decode_tail() to obtain the secondary list's first node, whose ->tail field
 * tracks the last node of that list.
 *
 * The unlink only rewrites cn->mcs.next, so @head has to be @cn's immediate
 * successor (which is how cna_find_next() calls this).
 */
static void cna_splice_tail(struct cna_node *cn, struct cna_node *head,
			    struct cna_node *tail)
{
	struct cna_node *list;

	/* remove [head,tail]: @cn now points past the run, the run is terminated */
	WRITE_ONCE(cn->mcs.next, tail->mcs.next);
	tail->mcs.next = NULL;

	/* stick [head,tail] on the secondary list tail */
	if (cn->mcs.locked <= 1) {
		/* create secondary list; its first node remembers the tail */
		head->tail = tail;
		cn->mcs.locked = head->encoded_tail;
	} else {
		/* add to tail */
		list = (struct cna_node *)decode_tail(cn->mcs.locked);
		/*
		 * Queue links live in the embedded mcs node (cf. cn->mcs.next
		 * and tail->mcs.next above), so chain through mcs.next rather
		 * than a ->next field directly on the cna_node.
		 * NOTE(review): assumes mcs.next has the type of a pointer to
		 * the embedded mcs member -- confirm against struct cna_node.
		 */
		list->tail->mcs.next = &head->mcs;
		list->tail = tail;
	}
}

/*
 * Scan the queue behind @node for the first waiter whose ->node matches
 * @node's own.
 *
 * Returns that waiter, or NULL when the scan reaches the end of the queue
 * without a match. When a match is found and at least one waiter was stepped
 * over on the way, the skipped run is handed to cna_splice_tail(); if no
 * match is found the queue is left as it was.
 */
static struct cna_node *cna_find_next(struct mcs_spinlock *node)
{
	struct cna_node *self = (struct cna_node *)node;
	struct cna_node *first, *cur;
	struct cna_node *last_skipped = NULL;

	/* start at our immediate successor */
	first = (struct cna_node *)READ_ONCE(self->mcs.next);
	cur = first;

	/* step over every waiter whose ->node differs from ours */
	while (cur && cur->node != self->node) {
		last_skipped = cur;
		cur = (struct cna_node *)READ_ONCE(cur->mcs.next);
	}

	/* only splice when there is a match AND something was skipped */
	if (cur && last_skipped)
		cna_splice_tail(self, first, last_skipped);

	return cur;
}

How's that?