On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:

+static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
+ struct list_head *head,
+ struct list_head *entry)
+{
+ if (cmd == lb_cmd_add)
+ list_add(entry, head);
+ else if (cmd == lb_cmd_del)
+ list_del(entry);
+ else /* cmd == lb_cmd_del_init */
+ list_del_init(entry);
+}

Maybe use switch(), GCC has fancy warns with enums and switch().
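
Something like the below, say; with no default case, -Wswitch can then
warn when a new lb_cmd_* value has no handler (just a sketch, same
behaviour as the quoted code):

static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
					    struct list_head *head,
					    struct list_head *entry)
{
	switch (cmd) {
	case lb_cmd_add:
		list_add(entry, head);
		break;
	case lb_cmd_del:
		list_del(entry);
		break;
	case lb_cmd_del_init:
		list_del_init(entry);
		break;
	}
}
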
+static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
+ struct list_batch *batch,
+ struct list_head *entry)
+{
+ /*
+ * Fast path
+ */
+ if (spin_trylock(lock)) {
+ _list_batch_cmd(cmd, batch->list, entry);
+ spin_unlock(lock);
+ return;
+ }
+ do_list_batch_slowpath(cmd, lock, batch, entry);
+}

This is still quite a lot of code for an inline function.
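
One way out (a sketch, not something the patch does): keep only a
declaration in the header and move the body out of line next to the
slowpath, trading a function call on the fast path for much smaller
call sites:

/* lib/list_batch.c */
void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
		   struct list_batch *batch, struct list_head *entry)
{
	/* Fast path: uncontended, do the list op directly. */
	if (spin_trylock(lock)) {
		_list_batch_cmd(cmd, batch->list, entry);
		spin_unlock(lock);
		return;
	}
	do_list_batch_slowpath(cmd, lock, batch, entry);
}
EXPORT_SYMBOL_GPL(do_list_batch);
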
+void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
+ struct list_batch *batch, struct list_head *entry)
+{
+ struct list_batch_qnode node, *prev, *next, *nptr;
+ int loop;
+
+ /*
+ * Put itself into the list_batch queue
+ */
+ node.next = NULL;
+ node.entry = entry;
+ node.cmd = cmd;
+ node.state = lb_state_waiting;
+
+ prev = xchg(&batch->tail, &node);

Here we rely on the release barrier implied by xchg() to ensure the node
initialization is complete before the xchg() publishes the thing.

But do we also need the acquire part of this barrier? From what I could
tell, the primitive as a whole does not imply any ordering.
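
If the acquire half turns out to be unneeded, the _release variant of
xchg() would document that precisely; a sketch, assuming xchg_release()
(from the relaxed/acquire/release atomic variants) is usable here:

	/*
	 * RELEASE: orders the node initialization above before the
	 * store publishing the node; no ACQUIRE semantics implied.
	 */
	prev = xchg_release(&batch->tail, &node);
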
+
+ if (prev) {
+ WRITE_ONCE(prev->next, &node);
+ while (READ_ONCE(node.state) == lb_state_waiting)
+ cpu_relax();
+ if (node.state == lb_state_done)
+ return;
+ WARN_ON(node.state != lb_state_batch);
+ }
+
+ /*
+ * We are now the queue head, we should now acquire the lock and
+ * process a batch of qnodes.
+ */
+ loop = LB_BATCH_SIZE;

Have you tried different sizes?

+ next = &node;
+ spin_lock(lock);
+
+do_list_again:
+ do {
+ nptr = next;
+ _list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
+ next = READ_ONCE(nptr->next);
+ /*
+ * As soon as the state is marked lb_state_done, we
+ * can no longer assume the content of *nptr as valid.
+ * So we have to hold off marking it done until we no
+ * longer need its content.
+ *
+ * The release barrier here is to make sure that we
+ * won't access its content after marking it done.
+ */
+ if (next)
+ smp_store_release(&nptr->state, lb_state_done);
+ } while (--loop && next);
+ if (!next) {
+ /*
+ * The queue tail should equal nptr, so clear it to
+ * mark the queue as empty.
+ */
+ if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
+ /*
+ * Queue not empty, wait until the next pointer is
+ * initialized.
+ */
+ while (!(next = READ_ONCE(nptr->next)))
+ cpu_relax();
+ }
+ /* The above cmpxchg acts as a memory barrier */

for what? :-)

Also, if that cmpxchg() fails, it very much does _not_ act as one.

I suspect you want smp_store_release() setting the state_done, just as
above, and then use cmpxchg_relaxed().
+ WRITE_ONCE(nptr->state, lb_state_done);
+ }
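
Something like the below, that is (completely untested sketch):

	if (!next) {
		/*
		 * No ordering required to close the queue; the
		 * store-release below is what publishes our list
		 * updates to the waiter spinning on ->state.
		 */
		if (cmpxchg_relaxed(&batch->tail, nptr, NULL) != nptr) {
			/* Queue not empty; wait for the next pointer. */
			while (!(next = READ_ONCE(nptr->next)))
				cpu_relax();
		}
		smp_store_release(&nptr->state, lb_state_done);
	}
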
+ if (next) {
+ if (loop)
+ goto do_list_again; /* More qnodes to process */
+ /*
+ * Mark the next qnode as head to process the next batch
+ * of qnodes. The new queue head cannot proceed until we
+ * release the lock.
+ */
+ WRITE_ONCE(next->state, lb_state_batch);
+ }
+ spin_unlock(lock);
+}
+EXPORT_SYMBOL_GPL(do_list_batch_slowpath);
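
For reference, a call site would then presumably look like the below;
foo, foo_lock and foo_batch are made-up names, and the batch/list
initialization is elided:

	struct foo {
		struct list_head link;
	};

	static DEFINE_SPINLOCK(foo_lock);
	static struct list_batch foo_batch;	/* initialization elided */

	static void foo_add(struct foo *f)
	{
		/* was: spin_lock(); list_add(); spin_unlock(); */
		do_list_batch(lb_cmd_add, &foo_lock, &foo_batch, &f->link);
	}

	static void foo_del(struct foo *f)
	{
		do_list_batch(lb_cmd_del_init, &foo_lock, &foo_batch, &f->link);
	}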