Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses

From: Boris Ostrovsky
Date: Tue Jan 10 2017 - 17:56:31 EST





> diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
> index ebc768f..ebdfbee 100644
> --- a/drivers/xen/xenbus/xenbus_xs.c
> +++ b/drivers/xen/xenbus/xenbus_xs.c


> -
> -static struct xs_handle xs_state;
> +/*
> + * Framework to protect suspend/resume handling against normal Xenstore
> + * message handling:
> + * During suspend/resume there must be no open transaction and no pending
> + * Xenstore request.
> + * New watch events happening in this time can be ignored by firing all watches
> + * after resume.
> + */
> +/* Lock protecting enter/exit critical region. */
> +static DEFINE_SPINLOCK(xs_state_lock);
> +/* Wait queue for all callers waiting for critical region to become usable. */
> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
> +/* Wait queue for suspend handling waiting for critical region being empty. */
> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
> +/* Number of users in critical region. */
> +static unsigned int xs_state_users;
> +/* Suspend handler waiting or already active? */
> +static int xs_suspend_active;

I think these two should be declared next to xs_state _lock since they
are protected by it. Or maybe even put them into some sort of a state
struct.


> +
> +
> +static bool test_reply(struct xb_req_data *req)
> +{
> + if (req->state == xb_req_state_got_reply || !xenbus_ok())
> + return true;
> +
> + /* Make sure to reread req->state each time. */
> + cpu_relax();

I don't think I understand why this is needed.

> +
> + return false;
> +}
> +


> +static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
> {
> - mutex_lock(&xs_state.transaction_mutex);
> - atomic_inc(&xs_state.transaction_count);
> - mutex_unlock(&xs_state.transaction_mutex);
> -}
> + bool notify;
>
> -static void transaction_end(void)
> -{
> - if (atomic_dec_and_test(&xs_state.transaction_count))
> - wake_up(&xs_state.transaction_wq);
> -}
> + req->msg = *msg;
> + req->err = 0;
> + req->state = xb_req_state_queued;
> + init_waitqueue_head(&req->wq);
>
> -static void transaction_suspend(void)
> -{
> - mutex_lock(&xs_state.transaction_mutex);
> - wait_event(xs_state.transaction_wq,
> - atomic_read(&xs_state.transaction_count) == 0);
> -}
> + xs_request_enter(req);
>
> -static void transaction_resume(void)
> -{
> - mutex_unlock(&xs_state.transaction_mutex);
> + req->msg.req_id = xs_request_id++;

Is it safe to do this without a lock?

> +
> +int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
> +{
> + struct xb_req_data *req;
> + struct kvec *vec;
> +
> + req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);

Is there a reason why you are using different flags here?

> @@ -263,11 +295,20 @@ static void *xs_talkv(struct xenbus_transaction t,
> unsigned int num_vecs,
> unsigned int *len)
> {
> + struct xb_req_data *req;
> struct xsd_sockmsg msg;
> void *ret = NULL;
> unsigned int i;
> int err;
>
> + req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
> + if (!req)
> + return ERR_PTR(-ENOMEM);
> +
> + req->vec = iovec;
> + req->num_vecs = num_vecs;
> + req->cb = xs_wake_up;
> +
> msg.tx_id = t.id;
> msg.req_id = 0;

Is this still needed? You are assigning it in xs_send().

> +static int xs_reboot_notify(struct notifier_block *nb,
> + unsigned long code, void *unused)
> {
> - struct xs_stored_msg *msg;



> + struct xb_req_data *req;
> +
> + mutex_lock(&xb_write_mutex);
> + list_for_each_entry(req, &xs_reply_list, list)
> + wake_up(&req->wq);
> + list_for_each_entry(req, &xb_write_list, list)
> + wake_up(&req->wq);

We are waking up waiters here but there is not guarantee that waiting
threads will have a chance to run, is there?


-boris