Re: [PATCH v2 2/2] block: virtio-blk: support multi virt queues per virtio-blk device

From: Ming Lei
Date: Thu Jun 26 2014 - 04:23:37 EST


On Thu, Jun 26, 2014 at 3:45 PM, Michael S. Tsirkin <mst@xxxxxxxxxx> wrote:
> On Thu, Jun 26, 2014 at 10:08:46AM +0800, Ming Lei wrote:
>> Firstly this patch supports more than one virtual queues for virtio-blk
>> device.
>>
>> Secondly this patch maps the virtual queue to blk-mq's hardware queue.
>>
>> With this approach, both scalability and performance can be improved.
>>
>> Signed-off-by: Ming Lei <ming.lei@xxxxxxxxxxxxx>
>> ---
>> drivers/block/virtio_blk.c | 109 ++++++++++++++++++++++++++++++++++++--------
>> 1 file changed, 89 insertions(+), 20 deletions(-)
>>
>> diff --git a/drivers/block/virtio_blk.c b/drivers/block/virtio_blk.c
>> index f63d358..b0a49a0 100644
>> --- a/drivers/block/virtio_blk.c
>> +++ b/drivers/block/virtio_blk.c
>> @@ -21,11 +21,14 @@ static DEFINE_IDA(vd_index_ida);
>>
>> static struct workqueue_struct *virtblk_wq;
>>
>> +struct virtio_blk_vq {
>> + struct virtqueue *vq;
>> + spinlock_t lock;
>> +} ____cacheline_aligned_in_smp;
>> +
>
> Padding wastes a hot cacheline here.
> What about this patch I sent:
>
> virtio-blk: move spinlock to vq itself
>
> Signed-off-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
>
> Rusty didn't respond, try including it as 1/3 in your patchset
> and we'll see if anyone objects?

I think your patch is fine, but I'd like to follow the current locking
rule for virtio vqs.

Your patch should not be part of this patchset, because
it introduces a spinlock inside the vq itself, so your patchset
would also need to replace the other virtio devices' per-vq
locks with the built-in lock.

>
>
>> struct virtio_blk
>> {
>> struct virtio_device *vdev;
>> - struct virtqueue *vq;
>> - spinlock_t vq_lock;
>>
>> /* The disk structure for the kernel. */
>> struct gendisk *disk;
>> @@ -47,6 +50,10 @@ struct virtio_blk
>>
>> /* Ida index - used to track minor number allocations. */
>> int index;
>> +
>> + /* num of vqs */
>> + int num_vqs;
>> + struct virtio_blk_vq *vqs;
>> };
>>
>> struct virtblk_req
>> @@ -133,14 +140,15 @@ static void virtblk_done(struct virtqueue *vq)
>> {
>> struct virtio_blk *vblk = vq->vdev->priv;
>> bool req_done = false;
>> + int qid = vq->index;
>> struct virtblk_req *vbr;
>> unsigned long flags;
>> unsigned int len;
>>
>> - spin_lock_irqsave(&vblk->vq_lock, flags);
>> + spin_lock_irqsave(&vblk->vqs[qid].lock, flags);
>> do {
>> virtqueue_disable_cb(vq);
>> - while ((vbr = virtqueue_get_buf(vblk->vq, &len)) != NULL) {
>> + while ((vbr = virtqueue_get_buf(vblk->vqs[qid].vq, &len)) != NULL) {
>> blk_mq_complete_request(vbr->req);
>> req_done = true;
>> }
>> @@ -151,7 +159,7 @@ static void virtblk_done(struct virtqueue *vq)
>> /* In case queue is stopped waiting for more buffers. */
>> if (req_done)
>> blk_mq_start_stopped_hw_queues(vblk->disk->queue, true);
>> - spin_unlock_irqrestore(&vblk->vq_lock, flags);
>> + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags);
>> }
>>
>> static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req)
>> @@ -160,6 +168,7 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req)
>> struct virtblk_req *vbr = blk_mq_rq_to_pdu(req);
>> unsigned long flags;
>> unsigned int num;
>> + int qid = hctx->queue_num;
>> const bool last = (req->cmd_flags & REQ_END) != 0;
>> int err;
>> bool notify = false;
>> @@ -202,12 +211,12 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req)
>> vbr->out_hdr.type |= VIRTIO_BLK_T_IN;
>> }
>>
>> - spin_lock_irqsave(&vblk->vq_lock, flags);
>> - err = __virtblk_add_req(vblk->vq, vbr, vbr->sg, num);
>> + spin_lock_irqsave(&vblk->vqs[qid].lock, flags);
>> + err = __virtblk_add_req(vblk->vqs[qid].vq, vbr, vbr->sg, num);
>> if (err) {
>> - virtqueue_kick(vblk->vq);
>> + virtqueue_kick(vblk->vqs[qid].vq);
>> blk_mq_stop_hw_queue(hctx);
>> - spin_unlock_irqrestore(&vblk->vq_lock, flags);
>> + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags);
>> /* Out of mem doesn't actually happen, since we fall back
>> * to direct descriptors */
>> if (err == -ENOMEM || err == -ENOSPC)
>> @@ -215,12 +224,12 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req)
>> return BLK_MQ_RQ_QUEUE_ERROR;
>> }
>>
>> - if (last && virtqueue_kick_prepare(vblk->vq))
>> + if (last && virtqueue_kick_prepare(vblk->vqs[qid].vq))
>> notify = true;
>> - spin_unlock_irqrestore(&vblk->vq_lock, flags);
>> + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags);
>>
>> if (notify)
>> - virtqueue_notify(vblk->vq);
>> + virtqueue_notify(vblk->vqs[qid].vq);
>> return BLK_MQ_RQ_QUEUE_OK;
>> }
>>
>> @@ -377,12 +386,71 @@ static void virtblk_config_changed(struct virtio_device *vdev)
>> static int init_vq(struct virtio_blk *vblk)
>> {
>> int err = 0;
>> + int i;
>> + vq_callback_t **callbacks;
>> + const char **names;
>> + char *name_array;
>> + struct virtqueue **vqs;
>> + unsigned short num_vqs;
>> + struct virtio_device *vdev = vblk->vdev;
>>
>> - /* We expect one virtqueue, for output. */
>> - vblk->vq = virtio_find_single_vq(vblk->vdev, virtblk_done, "requests");
>> - if (IS_ERR(vblk->vq))
>> - err = PTR_ERR(vblk->vq);
>> + err = virtio_cread_feature(vdev, VIRTIO_BLK_F_MQ,
>> + struct virtio_blk_config, num_queues,
>> + &num_vqs);
>> + if (err)
>> + num_vqs = 1;
>> +
>> + vblk->vqs = kmalloc(sizeof(*vblk->vqs) * num_vqs, GFP_KERNEL);
>> + if (!vblk->vqs) {
>> + err = -ENOMEM;
>> + goto out;
>> + }
>>
>> + name_array = kmalloc(sizeof(char) * 32 * num_vqs, GFP_KERNEL);
>
> sizeof(char) is 1, just drop it.
> We don't do if (!NULL) just in case someone redefined it, either.
>
>> + if (!name_array)
>> + goto err_name_array;
>
> You want vmalloc here, it will fail on high # of vqs, and speed
> doesn't matter for names.

I don't think there should be lots of vqs:

- each virtio-blk has only one disk, not like virtio-scsi

- with aio, the block io can easily reach its top throughput
with very few IO threads

- for each IO thread, just a few vqs are enough to fully load it
(in my test, 2 vqs can keep one iothread fully loaded)

- more vqs, more notifications and irqs, which hurt performance too

So I think we needn't consider the case of a huge number of vqs
until it is proved to be necessary; after all, one vq per
virtio-blk has been working for a long time.

>
>> +
>> + names = kmalloc(sizeof(*names) * num_vqs, GFP_KERNEL);
>> + if (!names)
>> + goto err_names;
>> +
>> + callbacks = kmalloc(sizeof(*callbacks) * num_vqs, GFP_KERNEL);
>> + if (!callbacks)
>> + goto err_callbacks;
>> +
>> + vqs = kmalloc(sizeof(*vqs) * num_vqs, GFP_KERNEL);
>> + if (!vqs)
>> + goto err_vqs;
>> +
>> + for (i = 0; i < num_vqs; i++) {
>> + callbacks[i] = virtblk_done;
>> + snprintf(&name_array[i * 32], 32, "req.%d", i);
>
> Eschew abbreviation. Call it requests.%d.

I like the short name because it fits within 80 columns
when reading /proc/interrupts.

>
>> + names[i] = &name_array[i * 32];
>> + }
>
> That 32 and pointer math hurts.
> Please create a structure and use an array everywhere.

OK.

>
>
>> +
>> + /* Discover virtqueues and write information to configuration. */
>> + err = vdev->config->find_vqs(vdev, num_vqs, vqs, callbacks, names);
>> + if (err)
>> + goto err_find_vqs;
>> +
>> + for (i = 0; i < num_vqs; i++) {
>> + spin_lock_init(&vblk->vqs[i].lock);
>> + vblk->vqs[i].vq = vqs[i];
>> + }
>> + vblk->num_vqs = num_vqs;
>> +
>> + err_find_vqs:
>> + kfree(vqs);
>> + err_vqs:
>> + kfree(callbacks);
>> + err_callbacks:
>> + kfree(names);
>> + err_names:
>> + kfree(name_array);
>
> This one will cause use after free if vq names are later used, since
> vring_new_virtqueue simply does
> vq->vq.name = name;
>
> You need to keep the memory around until unplug.

That is a bug, will fix that.

>
>> + err_name_array:
>> + if (err)
>> + kfree(vblk->vqs);
>> + out:
>> return err;
>> }
>>
>> @@ -551,7 +619,6 @@ static int virtblk_probe(struct virtio_device *vdev)
>> err = init_vq(vblk);
>> if (err)
>> goto out_free_vblk;
>> - spin_lock_init(&vblk->vq_lock);
>>
>> /* FIXME: How many partitions? How long is a piece of string? */
>> vblk->disk = alloc_disk(1 << PART_BITS);
>> @@ -562,7 +629,7 @@ static int virtblk_probe(struct virtio_device *vdev)
>>
>> /* Default queue sizing is to fill the ring. */
>> if (!virtblk_queue_depth) {
>> - virtblk_queue_depth = vblk->vq->num_free;
>> + virtblk_queue_depth = vblk->vqs[0].vq->num_free;
>> /* ... but without indirect descs, we use 2 descs per req */
>> if (!virtio_has_feature(vdev, VIRTIO_RING_F_INDIRECT_DESC))
>> virtblk_queue_depth /= 2;
>> @@ -570,7 +637,6 @@ static int virtblk_probe(struct virtio_device *vdev)
>>
>> memset(&vblk->tag_set, 0, sizeof(vblk->tag_set));
>> vblk->tag_set.ops = &virtio_mq_ops;
>> - vblk->tag_set.nr_hw_queues = 1;
>> vblk->tag_set.queue_depth = virtblk_queue_depth;
>> vblk->tag_set.numa_node = NUMA_NO_NODE;
>> vblk->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
>> @@ -578,6 +644,7 @@ static int virtblk_probe(struct virtio_device *vdev)
>> sizeof(struct virtblk_req) +
>> sizeof(struct scatterlist) * sg_elems;
>> vblk->tag_set.driver_data = vblk;
>> + vblk->tag_set.nr_hw_queues = vblk->num_vqs;
>>
>> err = blk_mq_alloc_tag_set(&vblk->tag_set);
>> if (err)
>> @@ -727,6 +794,7 @@ static void virtblk_remove(struct virtio_device *vdev)
>> refc = atomic_read(&disk_to_dev(vblk->disk)->kobj.kref.refcount);
>> put_disk(vblk->disk);
>> vdev->config->del_vqs(vdev);
>> + kfree(vblk->vqs);
>> kfree(vblk);
>>
>> /* Only free device id if we don't have any users */
>> @@ -777,7 +845,8 @@ static const struct virtio_device_id id_table[] = {
>> static unsigned int features[] = {
>> VIRTIO_BLK_F_SEG_MAX, VIRTIO_BLK_F_SIZE_MAX, VIRTIO_BLK_F_GEOMETRY,
>> VIRTIO_BLK_F_RO, VIRTIO_BLK_F_BLK_SIZE, VIRTIO_BLK_F_SCSI,
>> - VIRTIO_BLK_F_WCE, VIRTIO_BLK_F_TOPOLOGY, VIRTIO_BLK_F_CONFIG_WCE
>> + VIRTIO_BLK_F_WCE, VIRTIO_BLK_F_TOPOLOGY, VIRTIO_BLK_F_CONFIG_WCE,
>> + VIRTIO_BLK_F_MQ,
>> };
>>
>> static struct virtio_driver virtio_blk = {
>> --
>> 1.7.9.5
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/