Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
From: Boris Ostrovsky
Date: Mon Jan 09 2017 - 16:17:18 EST
On 01/06/2017 10:05 AM, Juergen Gross wrote:
> Handling of multiple concurrent Xenstore accesses through xenbus driver
> either from the kernel or user land is rather lame today: xenbus is
> capable of having only one access active at any point in time.
>
> Rewrite xenbus to handle multiple requests concurrently by making use
> of the request id of the Xenstore protocol. This requires the following:
>
> - Instead of blocking inside xb_read() when trying to read data from
> the xenstore ring buffer do so only in the main loop of
> xenbus_thread().
>
> - Instead of doing writes to the xenstore ring buffer in the context of
> the caller just queue the request and do the write in the dedicated
> xenbus thread.
>
> - Instead of just forwarding the request id specified by the caller of
> xenbus to xenstore use a xenbus internal unique request id. This will
> allow multiple outstanding requests.
>
> - Modify the locking scheme in order to allow multiple requests being
> active in parallel.
>
> - Instead of waiting for the reply of a user's xenstore request after
> writing the request to the xenstore ring buffer return directly to
> the caller and do the waiting in the read path.
>
> Additionally signal handling was optimized by avoiding waking up the
> xenbus thread or sending an event to Xenstore in case the addressed
> entity is known to be running already.
>
> As a result communication with Xenstore is sped up by a factor of up
> to 5: depending on the request type (read or write) and the amount of
> data transferred the gain was at least 20% (small reads) and went up to
> a factor of 5 for large writes.
>
> In the end some more rough edges of xenbus have been smoothed:
>
> - Handling of memory shortage when reading from xenstore ring buffer in
> the xenbus driver was not optimal: it was busy looping and issuing a
> warning in each loop.
>
> - In case of xenstore not running in dom0 but in a stubdom we end up
> with two xenbus threads running as the initialization of xenbus in
> dom0 expecting a local xenstored will be redone later when connecting
> to the xenstore domain. Up to now this was no problem as locking
> would prevent the two xenbus threads interfering with each other, but
> this was just a waste of kernel resources.
>
> - An out of memory situation while writing to or reading from the
> xenstore ring buffer no longer will lead to a possible loss of
> synchronization with xenstore.
>
> - The user read and write part are now interruptible by signals.
>
> Signed-off-by: Juergen Gross <jgross@xxxxxxxx>
> ---
> I'm aware that the changes are quite large. I thought about sending a
> version split into multiple patches, but a lot of lines would have been
> touched by more than one patch. I still have the multiple patch variant
> lying around - this patch is split into 11 smaller ones. While all
> steps of this larger series are operational, some steps are not optimal
> as they are even slower than the original version of xenbus.
>
> Nevertheless I can send the large series if there are requests for it.
I will comment only on the xenbus_comms.c changes for now since otherwise I am
afraid it may be difficult to keep track of the conversation.
> diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
> index c21ec02..fa054ca 100644
> --- a/drivers/xen/xenbus/xenbus_comms.c
> +++ b/drivers/xen/xenbus/xenbus_comms.c
> @@ -34,6 +34,7 @@
>
> #include <linux/wait.h>
> #include <linux/interrupt.h>
> +#include <linux/kthread.h>
> #include <linux/sched.h>
> #include <linux/err.h>
> #include <xen/xenbus.h>
> @@ -42,11 +43,40 @@
> #include <xen/page.h>
> #include "xenbus.h"
>
> +struct xs_thread_state_write {
> + struct xb_req_data *req;
> + int idx;
> + unsigned int used;
"written" or "sent"?
> +};
> +
> +struct xs_thread_state_read {
> + struct xsd_sockmsg msg;
> + char *body;
> + union {
> + void *alloc;
> + struct xs_watch_event *watch;
> + };
> + bool in_msg;
> + bool in_hdr;
It may be better to keep track of which state we are in using a bitmap.
Otherwise it is easy to lose track of one or the other.
> + unsigned int used;
"read" or "received"?
> +};
Both of these are private to process_msg()/process_writes(), so perhaps they
can be declared in those routines' scopes.
> +
> +/* A list of replies. Currently only one will ever be outstanding. */
> +LIST_HEAD(xs_reply_list);
> +
> +/* A list of write requests. */
> +LIST_HEAD(xb_write_list);
> +DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> +DEFINE_MUTEX(xb_write_mutex);
> +
> +/* Protect xenbus reader thread against save/restore. */
> +DEFINE_MUTEX(xs_response_mutex);
> +
> static int xenbus_irq;
> +static struct task_struct *xenbus_task;
>
> static DECLARE_WORK(probe_work, xenbus_probe);
>
> -static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>
> static irqreturn_t wake_waiting(int irq, void *unused)
> {
> @@ -84,30 +114,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
> return buf + MASK_XENSTORE_IDX(cons);
> }
>
> +static int xb_data_to_write(void)
> +{
> + struct xenstore_domain_interface *intf = xen_store_interface;
> +
> + return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
> + !list_empty(&xb_write_list);
> +}
> +
> /**
> * xb_write - low level write
> * @data: buffer to send
> * @len: length of buffer
> *
> - * Returns 0 on success, error otherwise.
> + * Returns number of bytes written or -err.
> */
> -int xb_write(const void *data, unsigned len)
> +static int xb_write(const void *data, unsigned int len)
> {
> struct xenstore_domain_interface *intf = xen_store_interface;
> XENSTORE_RING_IDX cons, prod;
> - int rc;
> + unsigned int bytes = 0;
>
> while (len != 0) {
> void *dst;
> unsigned int avail;
>
> - rc = wait_event_interruptible(
> - xb_waitq,
> - (intf->req_prod - intf->req_cons) !=
> - XENSTORE_RING_SIZE);
> - if (rc < 0)
> - return rc;
> -
> /* Read indexes, then verify. */
> cons = intf->req_cons;
> prod = intf->req_prod;
> @@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len)
> intf->req_cons = intf->req_prod = 0;
> return -EIO;
> }
> -
> - dst = get_output_chunk(cons, prod, intf->req, &avail);
> - if (avail == 0)
> - continue;
> - if (avail > len)
> - avail = len;
> + if (!xb_data_to_write())
> + return bytes;
>
> /* Must write data /after/ reading the consumer index. */
> virt_mb();
>
> + dst = get_output_chunk(cons, prod, intf->req, &avail);
> + if (avail == 0)
> + continue;
Should we continue the loop here or return? We are waiting for the
reader to get stuff off the ring.
> + if (avail > len)
> + avail = len;
> +
> memcpy(dst, data, avail);
> data += avail;
> len -= avail;
> + bytes += avail;
>
> /* Other side must not see new producer until data is there. */
> virt_wmb();
> intf->req_prod += avail;
>
> /* Implies mb(): other side will see the updated producer. */
> - notify_remote_via_evtchn(xen_store_evtchn);
> + if (prod <= intf->req_cons)
> + notify_remote_via_evtchn(xen_store_evtchn);
> }
>
> - return 0;
> + return bytes;
> }
>
> -int xb_data_to_read(void)
> +static int xb_data_to_read(void)
> {
> struct xenstore_domain_interface *intf = xen_store_interface;
> return (intf->rsp_cons != intf->rsp_prod);
> }
>
> -int xb_wait_for_data_to_read(void)
> -{
> - return wait_event_interruptible(xb_waitq, xb_data_to_read());
> -}
> -
> -int xb_read(void *data, unsigned len)
> +static int xb_read(void *data, unsigned int len)
> {
> struct xenstore_domain_interface *intf = xen_store_interface;
> XENSTORE_RING_IDX cons, prod;
> - int rc;
> + unsigned int bytes = 0;
>
> while (len != 0) {
> unsigned int avail;
> const char *src;
>
> - rc = xb_wait_for_data_to_read();
> - if (rc < 0)
> - return rc;
> -
> /* Read indexes, then verify. */
> cons = intf->rsp_cons;
> prod = intf->rsp_prod;
> + if (cons == prod)
> + return bytes;
> +
> if (!check_indexes(cons, prod)) {
> intf->rsp_cons = intf->rsp_prod = 0;
> return -EIO;
> @@ -185,17 +214,229 @@ int xb_read(void *data, unsigned len)
> memcpy(data, src, avail);
> data += avail;
> len -= avail;
> + bytes += avail;
>
> /* Other side must not see free space until we've copied out */
> virt_mb();
> intf->rsp_cons += avail;
>
> - pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
> -
> /* Implies mb(): other side will see the updated consumer. */
> - notify_remote_via_evtchn(xen_store_evtchn);
> + if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> + notify_remote_via_evtchn(xen_store_evtchn);
> }
>
> + return bytes;
> +}
> +
> +static int process_msg(void)
> +{
> + static struct xs_thread_state_read state;
> + struct xb_req_data *req;
> + int err;
> + unsigned int len;
> +
> + if (!state.in_msg) {
> + state.in_msg = true;
> + state.in_hdr = true;
> + state.used = 0;
> +
> + /*
> + * We must disallow save/restore while reading a message.
> + * A partial read across s/r leaves us out of sync with
> + * xenstored.
> + */
> + mutex_lock(&xs_response_mutex);
> +
> + if (!xb_data_to_read()) {
> + /* We raced with save/restore: pending data 'gone'. */
> + mutex_unlock(&xs_response_mutex);
> + state.in_msg = false;
> + return 0;
> + }
> + }
> +
> + if (state.in_hdr) {
> + if (state.used != sizeof(state.msg)) {
> + err = xb_read((void *)&state.msg + state.used,
> + sizeof(state.msg) - state.used);
> + if (err < 0)
> + goto out;
> + state.used += err;
> + if (state.used != sizeof(state.msg))
> + return 0;
Would it be possible to do locking at the caller? I understand that you
are trying to hold the lock across multiple invocations of this function
but it feels somewhat counter-intuitive and bug-prone.
If it's not possible then at least please add a comment explaining the
locking algorithm.
> + if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
> + err = -EINVAL;
> + goto out;
> + }
> + }
> +
> + len = state.msg.len + 1;
> + if (state.msg.type == XS_WATCH_EVENT)
> + len += sizeof(*state.watch);
> +
> + state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
Why can't you kmalloc to state.body only when type != XS_WATCH_EVENT?
> + if (!state.alloc)
> + return -ENOMEM;
> +
> + if (state.msg.type == XS_WATCH_EVENT)
> + state.body = state.watch->body;
> + else
> + state.body = state.alloc;
> + state.in_hdr = false;
> + state.used = 0;
> + }
> +
> + err = xb_read(state.body + state.used, state.msg.len - state.used);
> + if (err < 0)
> + goto out;
> +
> + state.used += err;
> + if (state.used != state.msg.len)
> + return 0;
> +
> + state.body[state.msg.len] = '\0';
> +
> + if (state.msg.type == XS_WATCH_EVENT) {
> + state.watch->len = state.msg.len;
> + err = xs_watch_msg(state.watch);
> + } else {
> + err = -ENOENT;
> + mutex_lock(&xb_write_mutex);
> + list_for_each_entry(req, &xs_reply_list, list) {
> + if (req->msg.req_id == state.msg.req_id) {
> + if (req->state == xb_req_state_wait_reply) {
> + req->msg.type = state.msg.type;
> + req->msg.len = state.msg.len;
> + req->body = state.body;
> + req->state = xb_req_state_got_reply;
> + list_del(&req->list);
> + req->cb(req);
> + } else {
> + list_del(&req->list);
> + kfree(req);
> + }
> + err = 0;
> + break;
> + }
> + }
> + mutex_unlock(&xb_write_mutex);
> + if (err)
> + goto out;
> + }
> +
> + mutex_unlock(&xs_response_mutex);
> +
> + state.in_msg = false;
> + state.alloc = NULL;
> + return err;
> +
> + out:
> + mutex_unlock(&xs_response_mutex);
> + state.in_msg = false;
> + kfree(state.alloc);
> + state.alloc = NULL;
> + return err;
> +}
> +
> +static int process_writes(void)
> +{
> + static struct xs_thread_state_write state;
> + void *base;
> + unsigned int len;
> + int err = 0;
> +
> + if (!xb_data_to_write())
> + return 0;
> +
> + mutex_lock(&xb_write_mutex);
> +
> + if (!state.req) {
> + state.req = list_first_entry(&xb_write_list,
> + struct xb_req_data, list);
> + state.idx = -1;
> + state.used = 0;
> + }
> +
> + if (state.req->state == xb_req_state_aborted)
> + goto out_err;
> +
> + while (state.idx < state.req->num_vecs) {
> + if (state.idx < 0) {
> + base = &state.req->msg;
> + len = sizeof(state.req->msg);
> + } else {
> + base = state.req->vec[state.idx].iov_base;
> + len = state.req->vec[state.idx].iov_len;
> + }
> + err = xb_write(base + state.used, len - state.used);
> + if (err < 0)
> + goto out_err;
> + state.used += err;
> + if (state.used != len)
> + goto out;
> +
> + state.idx++;
> + state.used = 0;
> + }
> +
> + /*
> + * You would expect the following to be racy, but as the response is
> + * being read by our thread there is no risk of req being freed
> + * under our feet.
> + */
I don't think I understand this (and it's missing a "so" or something
like that between "thread" and "there"). If this is not racy, why are we
doing this under xb_write_mutex?
> + list_del(&state.req->list);
> + state.req->state = xb_req_state_wait_reply;
> + list_add_tail(&state.req->list, &xs_reply_list);
> + state.req = NULL;
> +
> + out:
> + mutex_unlock(&xb_write_mutex);
> +
> + return 0;
> +
> + out_err:
> + state.req->msg.type = XS_ERROR;
> + state.req->err = err;
You don't seem to need this for xb_req_state_aborted since you are
freeing state.req. OTOH, why shouldn't aborted requests generate an
error reply as well?
> + list_del(&state.req->list);
> + if (state.req->state == xb_req_state_aborted)
> + kfree(state.req);
> + else {
> + state.req->state = xb_req_state_got_reply;
> + wake_up(&state.req->wq);
> + }
> +
> + mutex_unlock(&xb_write_mutex);
> +
> + state.req = NULL;
> +
> + return err;
> +}
> +
> +static int xb_thread_work(void)
> +{
> + return xb_data_to_read() || xb_data_to_write();
> +}
> +
> +static int xenbus_thread(void *unused)
> +{
> + int err;
> +
> + while (!kthread_should_stop()) {
> + if (wait_event_interruptible(xb_waitq, xb_thread_work()))
> + continue;
> +
> + err = process_msg();
> + if (err == -ENOMEM)
> + schedule();
> + else if (err)
> + pr_warn("error %d while reading message\n", err);
> +
> + err = process_writes();
> + if (err)
> + pr_warn("error %d while writing message\n", err);
Is there a chance that errors are persistent and you then spam the log?
-boris
> + }
> +
> + xenbus_task = NULL;
> return 0;
> }
>
> @@ -223,6 +464,7 @@ int xb_init_comms(void)
> rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
> } else {
> int err;
> +
> err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
> 0, "xenbus", &xb_waitq);
> if (err < 0) {
> @@ -231,6 +473,13 @@ int xb_init_comms(void)
> }
>
> xenbus_irq = err;
> +
> + if (!xenbus_task) {
> + xenbus_task = kthread_run(xenbus_thread, NULL,
> + "xenbus");
> + if (IS_ERR(xenbus_task))
> + return PTR_ERR(xenbus_task);
> + }
> }
>
> return 0;
>
>
>
>
>