Re: [RFC 2/4] lightnvm: add write buffering for rrpc

From: Matias BjÃrling
Date: Fri Feb 05 2016 - 09:52:49 EST


On 02/04/2016 02:08 PM, Javier GonzÃlez wrote:
> Flash controllers typically define flash pages as a collection of flash
> sectors of typically 4K. Moreover, flash controllers might program flash
> pages across several planes. This defines the write granurality at which
> flash can be programmed. This is different for each flash controller. In
> rrpc, the assumption has been that flash pages are 4K, thus writes could
> be sent out to the media as they were received.
>
> This patch adds a per-block write buffer to rrpc. Writes are flushed to
> the media in chuncks of the minimum granurality allowed by the
> controller, though the strategy can be changed to, for example, only
> flush at the maximum supported (64 pages in nvme). Apart from enabling
> the use of rrpc in real hardware, this buffering strategy will be used
> for recovery; if a block becomes bad, a new block can be allocated to
> persist valid data.

Great work. The code is nearly 2x'ed in size. Can you add a detailed
explanation on the inner workings of the write buffering to help new
users to understand its logic?

>
> Signed-off-by: Javier GonzÃlez <javier@xxxxxxxxxxxx>
> ---
> drivers/lightnvm/rrpc.c | 851 ++++++++++++++++++++++++++++++++++++++---------
> drivers/lightnvm/rrpc.h | 82 ++++-
> include/linux/lightnvm.h | 6 +-
> 3 files changed, 771 insertions(+), 168 deletions(-)
>
> diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
> index 8187bf3..e9fb19d 100644
> --- a/drivers/lightnvm/rrpc.c
> +++ b/drivers/lightnvm/rrpc.c
> @@ -16,11 +16,12 @@
>
> #include "rrpc.h"
>
> -static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
> +static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
> + *rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
> static DECLARE_RWSEM(rrpc_lock);
>
> static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags);
> + struct rrpc_rq *rqd, unsigned long flags);
>
> #define rrpc_for_each_lun(rrpc, rlun, i) \
> for ((i) = 0, rlun = &(rrpc)->luns[0]; \
> @@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
> spin_unlock(&rrpc->rev_lock);
> }
>
> -static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
> +static void rrpc_release_and_free_rrqd(struct kref *ref)
> +{
> + struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
> + struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;

Wow.. Happy that we got to rrpc :)

> +
> + rrpc_unlock_rq(rrpc, rrqd);
> + mempool_free(rrqd, rrpc->rrq_pool);
> +}
> +
> +static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
> sector_t laddr, unsigned int pages)
> {
> - struct nvm_rq *rqd;
> + struct rrpc_rq *rrqd;
> struct rrpc_inflight_rq *inf;
>
> - rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> - if (!rqd)
> + rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
> + if (!rrqd)
> return ERR_PTR(-ENOMEM);
> + kref_init(&rrqd->refs);
>
> - inf = rrpc_get_inflight_rq(rqd);
> + inf = rrpc_get_inflight_rq(rrqd);
> if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
> - mempool_free(rqd, rrpc->rq_pool);
> + mempool_free(rrqd, rrpc->rrq_pool);
> return NULL;
> }
>
> - return rqd;
> + return rrqd;
> }
>
> -static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
> {
> - struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
> -
> - rrpc_unlock_laddr(rrpc, inf);
> -
> - mempool_free(rqd, rrpc->rq_pool);
> + kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
> }
>
> static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
> {
> sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
> sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
> - struct nvm_rq *rqd;
> + struct rrpc_rq *rrqd;
>
> do {
> - rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
> + rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
> schedule();
> - } while (!rqd);
> + } while (!rrqd);
>
> - if (IS_ERR(rqd)) {
> + if (IS_ERR(rrqd)) {
> pr_err("rrpc: unable to acquire inflight IO\n");
> bio_io_error(bio);
> return;
> }
>
> rrpc_invalidate_range(rrpc, slba, len);
> - rrpc_inflight_laddr_release(rrpc, rqd);
> + rrpc_inflight_laddr_release(rrpc, rrqd);
> }
>
> static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
> @@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
> {
> struct rrpc *rrpc = rlun->rrpc;
>
> - BUG_ON(!rblk);
> -
> if (rlun->cur) {
> spin_lock(&rlun->cur->lock);
> WARN_ON(!block_is_full(rrpc, rlun->cur));
> @@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
> rlun->cur = rblk;
> }
>
> +static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +try:
> + spin_lock(&rblk->w_buf.s_lock);
> + if (atomic_read(&rblk->w_buf.refs) > 0) {
> + spin_unlock(&rblk->w_buf.s_lock);
> + schedule();
> + goto try;
> + }
> +
> + mempool_free(rblk->w_buf.entries, rrpc->block_pool);
> + rblk->w_buf.entries = NULL;
> + spin_unlock(&rblk->w_buf.s_lock);
> +
> + /* TODO: Reuse the same buffers if the block size is the same */
> + mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
> + kfree(rblk->w_buf.sync_bitmap);
> +
> + rblk->w_buf.mem = NULL;
> + rblk->w_buf.subm = NULL;
> + rblk->w_buf.sync_bitmap = NULL;
> + rblk->w_buf.data = NULL;
> + rblk->w_buf.nentries = 0;
> + rblk->w_buf.cur_mem = 0;
> + rblk->w_buf.cur_subm = 0;
> +}
> +
> +static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> + struct rrpc_lun *rlun = rblk->rlun;
> + struct nvm_lun *lun = rlun->parent;
> + struct rrpc_w_buf *buf = &rblk->w_buf;
> +
> +try:
> + spin_lock(&buf->w_lock);
> + /* Flush inflight I/Os */
> + if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
> + spin_unlock(&buf->w_lock);
> + schedule();
> + goto try;
> + }
> + spin_unlock(&buf->w_lock);
> +
> + if (rblk->w_buf.entries)
> + rrpc_free_w_buffer(rrpc, rblk);
> +
> + spin_lock(&lun->lock);
> + nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> + list_del(&rblk->list);
> + spin_unlock(&lun->lock);
> +}
> +
> +static void rrpc_put_blks(struct rrpc *rrpc)
> +{
> + struct rrpc_lun *rlun;
> + int i;
> +
> + for (i = 0; i < rrpc->nr_luns; i++) {
> + rlun = &rrpc->luns[i];
> + if (rlun->cur)
> + rrpc_put_blk(rrpc, rlun->cur);
> + if (rlun->gc_cur)
> + rrpc_put_blk(rrpc, rlun->gc_cur);
> + }
> +}
> +
> static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
> unsigned long flags)
> {
> + struct nvm_dev *dev = rrpc->dev;
> struct nvm_lun *lun = rlun->parent;
> struct nvm_block *blk;
> struct rrpc_block *rblk;
> + struct buf_entry *entries;
> + unsigned long *sync_bitmap;
> + void *data;
> + int nentries = dev->pgs_per_blk * dev->sec_per_pg;
> +
> + data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
> + if (!data) {
> + pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> + return NULL;
> + }
> +
> + entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
> + if (!entries) {
> + pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> + mempool_free(data, rrpc->write_buf_pool);
> + return NULL;
> + }
> +
> + /* TODO: Mempool? */

Not on fast path. I don't think we need mempools at all to take some
unnecessary memory.

> + sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
> + sizeof(unsigned long), GFP_ATOMIC);
> + if (!sync_bitmap) {
> + mempool_free(data, rrpc->write_buf_pool);
> + mempool_free(entries, rrpc->block_pool);
> + return NULL;
> + }
> +
> + bitmap_zero(sync_bitmap, nentries);
>
> spin_lock(&lun->lock);
> blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
> @@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
> }
>
> rblk = &rlun->blocks[blk->id];
> - list_add_tail(&rblk->list, &rlun->open_list);
> - spin_unlock(&lun->lock);
>
> blk->priv = rblk;
> - bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
> + bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
> rblk->next_page = 0;
> rblk->nr_invalid_pages = 0;
> - atomic_set(&rblk->data_cmnt_size, 0);
> +
> + rblk->w_buf.data = data;
> + rblk->w_buf.entries = entries;
> + rblk->w_buf.sync_bitmap = sync_bitmap;
> +
> + rblk->w_buf.entries->data = rblk->w_buf.data;
> + rblk->w_buf.mem = rblk->w_buf.entries;
> + rblk->w_buf.subm = rblk->w_buf.entries;
> + rblk->w_buf.nentries = nentries;
> + rblk->w_buf.cur_mem = 0;
> + rblk->w_buf.cur_subm = 0;
> +
> + atomic_set(&rblk->w_buf.refs, 0);
> +
> + spin_lock_init(&rblk->w_buf.w_lock);
> + spin_lock_init(&rblk->w_buf.s_lock);
> +
> + list_add_tail(&rblk->list, &rlun->open_list);
> + spin_unlock(&lun->lock);
>
> return rblk;
> }
>
> -static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> -{
> - struct rrpc_lun *rlun = rblk->rlun;
> - struct nvm_lun *lun = rlun->parent;
> -
> - spin_lock(&lun->lock);
> - nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> - list_del(&rblk->list);
> - spin_unlock(&lun->lock);
> -}
> -
> -static void rrpc_put_blks(struct rrpc *rrpc)
> -{
> - struct rrpc_lun *rlun;
> - int i;
> -
> - for (i = 0; i < rrpc->nr_luns; i++) {
> - rlun = &rrpc->luns[i];
> - if (rlun->cur)
> - rrpc_put_blk(rrpc, rlun->cur);
> - if (rlun->gc_cur)
> - rrpc_put_blk(rrpc, rlun->gc_cur);
> - }
> -}
> -
> static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
> {
> int next = atomic_inc_return(&rrpc->next_lun);
> @@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
> }
> }
>
> +static void rrpc_writer_kick(struct rrpc *rrpc)
> +{
> + struct rrpc_lun *rlun;
> + unsigned int i;
> +
> + for (i = 0; i < rrpc->nr_luns; i++) {
> + rlun = &rrpc->luns[i];
> + queue_work(rrpc->kw_wq, &rlun->ws_writer);
> + }
> +}
> +
> /*
> * timed GC every interval.
> */
> @@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
> {
> struct request_queue *q = rrpc->dev->q;
> struct rrpc_rev_addr *rev;
> - struct nvm_rq *rqd;
> + struct rrpc_rq *rrqd;
> struct bio *bio;
> struct page *page;
> int slot;
> @@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
> }
>
> while ((slot = find_first_zero_bit(rblk->invalid_pages,
> - nr_pgs_per_blk)) < nr_pgs_per_blk) {
> + nr_pgs_per_blk)) < nr_pgs_per_blk) {

No need to fix the indentation here.

>
> /* Lock laddr */
> phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
> @@ -321,8 +423,8 @@ try:
> continue;
> }
>
> - rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> - if (IS_ERR_OR_NULL(rqd)) {
> + rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> + if (IS_ERR_OR_NULL(rrqd)) {
> spin_unlock(&rrpc->rev_lock);
> schedule();
> goto try;
> @@ -339,14 +441,14 @@ try:
> /* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
> bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>
> - if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> + if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
> pr_err("rrpc: gc read failed.\n");
> - rrpc_inflight_laddr_release(rrpc, rqd);
> + rrpc_inflight_laddr_release(rrpc, rrqd);
> goto finished;
> }
> wait_for_completion_io(&wait);
> if (bio->bi_error) {
> - rrpc_inflight_laddr_release(rrpc, rqd);
> + rrpc_inflight_laddr_release(rrpc, rrqd);
> goto finished;
> }
>
> @@ -363,14 +465,16 @@ try:
> /* turn the command around and write the data back to a new
> * address
> */
> - if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> + if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
> + != NVM_IO_DONE) {
> pr_err("rrpc: gc write failed.\n");
> - rrpc_inflight_laddr_release(rrpc, rqd);
> + rrpc_inflight_laddr_release(rrpc, rrqd);
> goto finished;
> }
> + bio_endio(bio);
> wait_for_completion_io(&wait);
>
> - rrpc_inflight_laddr_release(rrpc, rqd);
> + /* rrpc_inflight_laddr_release(rrpc, rrqd); */

This is commented out.

> if (bio->bi_error)
> goto finished;
>
> @@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
> list_move_tail(&rblk->list, &rlun->closed_list);
> spin_unlock(&lun->lock);
>
> + rrpc_free_w_buffer(rrpc, rblk);
> +
> mempool_free(gcb, rrpc->gcb_pool);
> pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
> rblk->parent->id);
> @@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
> queue_work(rrpc->kgc_wq, &gcb->ws_gc);
> }
>
> -static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
> - sector_t laddr, uint8_t npages)
> +static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
> {
> - struct rrpc_addr *p;
> struct rrpc_block *rblk;
> + struct rrpc_w_buf *buf;
> struct nvm_lun *lun;
> - int cmnt_size, i;
> + unsigned long bppa;
>
> - for (i = 0; i < npages; i++) {
> - p = &rrpc->trans_map[laddr + i];
> - rblk = p->rblk;
> - lun = rblk->parent->lun;
> + rblk = p->rblk;
> + buf = &rblk->w_buf;
> + lun = rblk->parent->lun;
>
> - cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
> - if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
> - rrpc_run_gc(rrpc, rblk);
> + bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
> +
> + WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
> +
> + if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
> + /* Write buffer out-of-bounds */
> + WARN_ON((buf->cur_mem != buf->nentries) &&
> + (buf->cur_mem != buf->cur_subm));
> +
> + rrpc_run_gc(rrpc, rblk);
> + }
> +}
> +
> +static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
> + uint8_t nr_pages)
> +{
> + struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
> + struct rrpc_rq *rrqd;
> + int i;
> +
> + for (i = 0; i < nr_pages; i++) {
> + rrqd = brrqd[i].rrqd;
> + rrpc_sync_buffer(rrpc, brrqd[i].addr);
> + kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
> }
> +
> + mempool_free(brrqd, rrpc->m_rrq_pool);
> + rrpc_writer_kick(rrpc);
> +}
> +
> +static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
> + uint8_t nr_pages)
> +{
> + struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +
> + if (rqd->flags & NVM_IOTYPE_GC)
> + return;
> +
> + rrpc_unlock_rq(rrpc, rrqd);
> + mempool_free(rrqd, rrpc->rrq_pool);
> }
>
> static void rrpc_end_io(struct nvm_rq *rqd)
> {
> struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
> - struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> - uint8_t npages = rqd->nr_pages;
> - sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
> + uint8_t nr_pages = rqd->nr_pages;
>
> if (bio_data_dir(rqd->bio) == WRITE)
> - rrpc_end_io_write(rrpc, rrqd, laddr, npages);
> + rrpc_end_io_write(rrpc, rqd, nr_pages);
> + else
> + rrpc_end_io_read(rrpc, rqd, nr_pages);
>
> bio_put(rqd->bio);
>
> - if (rrqd->flags & NVM_IOTYPE_GC)
> - return;
> -
> - rrpc_unlock_rq(rrpc, rqd);
> -
> - if (npages > 1)
> + if (nr_pages > 1)

Cleanup patches can go in afterwards or before.

> nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> if (rqd->metadata)
> nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
> @@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
> }
>
> static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags, int npages)
> + struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
> + unsigned long flags, int nr_pages)
> {
> - struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> + struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> + struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> struct rrpc_addr *gp;
> sector_t laddr = rrpc_get_laddr(bio);
> int is_gc = flags & NVM_IOTYPE_GC;
> int i;
>
> - if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> + if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);

Seems like there is a relationship here between rrq_pool (rrpc_rq) and
rq_pool (nvm_rq)

To get to a single mempool alloc ( and just keep the nvm_rq mempool )



> return NVM_IO_REQUEUE;
> }
>
> - for (i = 0; i < npages; i++) {
> + for (i = 0; i < nr_pages; i++) {
> /* We assume that mapping occurs at 4KB granularity */
> BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
> gp = &rrpc->trans_map[laddr + i];
> @@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> rrpc_unlock_laddr(rrpc, r);
> nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
> rqd->dma_ppa_list);
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);
> return NVM_IO_DONE;
> }
> +
> + brrqd[i].addr = gp;
> }
>
> rqd->opcode = NVM_OP_HBREAD;
> @@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
> sector_t laddr = rrpc_get_laddr(bio);
> struct rrpc_addr *gp;
>
> - if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> + if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);
> return NVM_IO_REQUEUE;
> + }
>
> BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
> gp = &rrpc->trans_map[laddr];
> @@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
> rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
> } else {
> BUG_ON(is_gc);
> - rrpc_unlock_rq(rrpc, rqd);
> + rrpc_unlock_rq(rrpc, rrqd);
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);
> return NVM_IO_DONE;
> }
>
> @@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
> return NVM_IO_OK;
> }
>
> +/*
> + * Copy data from current bio to block write buffer. This if necessary
> + * to guarantee durability if a flash block becomes bad before all pages
> + * are written. This buffer is also used to write at the right page
> + * granurality
> + */
> +static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
> + struct rrpc_rq *rrqd, struct rrpc_addr *addr,
> + struct rrpc_w_buf *w_buf,
> + unsigned long flags)
> +{
> + struct nvm_dev *dev = rrpc->dev;
> + unsigned int bio_len = bio_cur_bytes(bio);
> +
> + if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
> + return NVM_IO_ERR;
> +
> + spin_lock(&w_buf->w_lock);
> +
> + WARN_ON(w_buf->cur_mem == w_buf->nentries);
> +
> + w_buf->mem->rrqd = rrqd;
> + w_buf->mem->addr = addr;
> + w_buf->mem->flags = flags;
> +
> + memcpy(w_buf->mem->data, bio_data(bio), bio_len);
> +
> + w_buf->cur_mem++;
> + if (likely(w_buf->cur_mem < w_buf->nentries)) {
> + w_buf->mem++;
> + w_buf->mem->data =
> + w_buf->data + (w_buf->cur_mem * dev->sec_size);
> + }
> +
> + spin_unlock(&w_buf->w_lock);
> +
> + return 0;
> +}
> +
> static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags, int npages)
> + struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
> {
> - struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> + struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> + struct rrpc_w_buf *w_buf;
> struct rrpc_addr *p;
> + struct rrpc_lun *rlun;
> sector_t laddr = rrpc_get_laddr(bio);
> int is_gc = flags & NVM_IOTYPE_GC;
> + int err;
> int i;
>
> - if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> - nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> + if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> + mempool_free(rrqd, rrpc->rrq_pool);
> return NVM_IO_REQUEUE;
> }
>
> - for (i = 0; i < npages; i++) {
> + for (i = 0; i < nr_pages; i++) {
> + kref_get(&rrqd->refs);
> +
> /* We assume that mapping occurs at 4KB granularity */
> p = rrpc_map_page(rrpc, laddr + i, is_gc);
> if (!p) {
> BUG_ON(is_gc);
> rrpc_unlock_laddr(rrpc, r);
> - nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
> - rqd->dma_ppa_list);
> + mempool_free(rrqd, rrpc->rrq_pool);
> rrpc_gc_kick(rrpc);
> return NVM_IO_REQUEUE;
> }
>
> - rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
> - p->addr);
> + w_buf = &p->rblk->w_buf;
> + rlun = p->rblk->rlun;
> +
> + rrqd->addr = p;
> +
> + err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> + if (err) {
> + pr_err("rrpc: could not write to write buffer\n");
> + return err;
> + }
> +
> + bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
> +
> + queue_work(rrpc->kw_wq, &rlun->ws_writer);

How about a timer and only the kick when a flush command is sent?

> }
>
> - rqd->opcode = NVM_OP_HBWRITE;
> + if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
> + pr_err("rrpc: request reference counter dailed\n");
> + return NVM_IO_ERR;
> + }
>
> - return NVM_IO_OK;
> + return NVM_IO_DONE;
> }
>
> static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags)
> + struct rrpc_rq *rrqd, unsigned long flags)
> {
> - struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> + struct rrpc_w_buf *w_buf;
> struct rrpc_addr *p;
> + struct rrpc_lun *rlun;
> int is_gc = flags & NVM_IOTYPE_GC;
> + int err;
> sector_t laddr = rrpc_get_laddr(bio);
>
> - if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> + if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> + mempool_free(rrqd, rrpc->rrq_pool);
> return NVM_IO_REQUEUE;
> + }
>
> p = rrpc_map_page(rrpc, laddr, is_gc);
> if (!p) {
> BUG_ON(is_gc);
> - rrpc_unlock_rq(rrpc, rqd);
> + rrpc_unlock_rq(rrpc, rrqd);
> + mempool_free(rrqd, rrpc->rrq_pool);
> rrpc_gc_kick(rrpc);
> return NVM_IO_REQUEUE;
> }
>
> - rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
> - rqd->opcode = NVM_OP_HBWRITE;
> + w_buf = &p->rblk->w_buf;
> + rlun = p->rblk->rlun;
> +
> rrqd->addr = p;
>
> - return NVM_IO_OK;
> + err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> + if (err) {
> + pr_err("rrpc: could not write to write buffer\n");
> + return err;
> + }
> +
> + queue_work(rrpc->kw_wq, &rlun->ws_writer);
> + return NVM_IO_DONE;
> }
>
> -static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
> +static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
> + struct rrpc_rq *rrqd, unsigned long flags)
> {
> - if (npages > 1) {
> + struct nvm_rq *rqd;
> + struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
> + uint8_t nr_pages = rrpc_get_pages(bio);
> + int err;
> +
> + rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> + if (!rqd) {
> + pr_err_ratelimited("rrpc: not able to queue bio.");
> + bio_io_error(bio);
> + return NVM_IO_ERR;
> + }
> + rqd->metadata = NULL;
> + rqd->priv = rrqd;
> +
> + if (nr_pages > 1) {
> rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
> - &rqd->dma_ppa_list);
> + &rqd->dma_ppa_list);
> if (!rqd->ppa_list) {
> pr_err("rrpc: not able to allocate ppa list\n");
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);
> return NVM_IO_ERR;
> }
>
> - if (bio_rw(bio) == WRITE)
> - return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
> - npages);
> -
> - return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
> + err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
> + nr_pages);
> + if (err) {
> + mempool_free(rrqd, rrpc->rrq_pool);
> + mempool_free(rqd, rrpc->rq_pool);
> + return err;
> + }
> + } else {
> + err = rrpc_read_rq(rrpc, bio, rqd, flags);
> + if (err)
> + return err;
> }
>
> - if (bio_rw(bio) == WRITE)
> - return rrpc_write_rq(rrpc, bio, rqd, flags);
> -
> - return rrpc_read_rq(rrpc, bio, rqd, flags);
> -}
> -
> -static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd, unsigned long flags)
> -{
> - int err;
> - struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
> - uint8_t nr_pages = rrpc_get_pages(bio);
> - int bio_size = bio_sectors(bio) << 9;
> -
> - if (bio_size < rrpc->dev->sec_size)
> - return NVM_IO_ERR;
> - else if (bio_size > rrpc->dev->max_rq_size)
> - return NVM_IO_ERR;
> -
> - err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
> - if (err)
> - return err;
> -
> bio_get(bio);
> rqd->bio = bio;
> rqd->ins = &rrpc->instance;
> - rqd->nr_pages = nr_pages;
> - rrq->flags = flags;
> + rqd->nr_pages = rrqd->nr_pages = nr_pages;
> + rqd->flags = flags;
>
> err = nvm_submit_io(rrpc->dev, rqd);
> if (err) {
> pr_err("rrpc: I/O submission failed: %d\n", err);
> bio_put(bio);
> if (!(flags & NVM_IOTYPE_GC)) {
> - rrpc_unlock_rq(rrpc, rqd);
> + rrpc_unlock_rq(rrpc, rrqd);
> if (rqd->nr_pages > 1)
> nvm_dev_dma_free(rrpc->dev,
> rqd->ppa_list, rqd->dma_ppa_list);
> @@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> return NVM_IO_OK;
> }
>
> +static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
> + struct rrpc_rq *rrqd, unsigned long flags)
> +{
> + uint8_t nr_pages = rrpc_get_pages(bio);
> +
> + rrqd->nr_pages = nr_pages;
> +
> + if (nr_pages > 1)
> + return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
> + else

You can skip the else here.

> + return rrpc_write_rq(rrpc, bio, rrqd, flags);
> +}
> +
> +static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> + struct rrpc_rq *rrqd, unsigned long flags)
> +{
> + int bio_size = bio_sectors(bio) << 9;
> +
> + if (bio_size < rrpc->dev->sec_size)
> + return NVM_IO_ERR;
> + else if (bio_size > rrpc->dev->max_rq_size)
> + return NVM_IO_ERR;

I have a patch incoming that removes this. Properly making
rrpc_submit_io obsolete.

> +
> + if (bio_rw(bio) == READ)
> + return rrpc_submit_read(rrpc, bio, rrqd, flags);
> +
> + return rrpc_buffer_write(rrpc, bio, rrqd, flags);
> +}
> +
> static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
> {
> struct rrpc *rrpc = q->queuedata;
> - struct nvm_rq *rqd;
> + struct rrpc_rq *rrqd;
> int err;
>
> if (bio->bi_rw & REQ_DISCARD) {
> @@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
> return BLK_QC_T_NONE;
> }
>
> - rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> - if (!rqd) {
> - pr_err_ratelimited("rrpc: not able to queue bio.");
> + rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
> + if (!rrqd) {
> + pr_err_ratelimited("rrpc: not able to allocate rrqd.");
> bio_io_error(bio);
> return BLK_QC_T_NONE;
> }
> - memset(rqd, 0, sizeof(struct nvm_rq));

It seems like we don't use nvm_rq in the write patch? (the writer thread
will do the write itself). Move this inside read? and use the original
nvm_rq for submission?

> + kref_init(&rrqd->refs);
>
> - err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
> + err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
> switch (err) {
> case NVM_IO_OK:
> return BLK_QC_T_NONE;
> @@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
> break;
> }
>
> - mempool_free(rqd, rrpc->rq_pool);
> return BLK_QC_T_NONE;
> }
>
> +static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
> + void *data)
> +{
> + struct page *page;
> + int err;
> +
> + if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
> + return -1;

return -EINVAL?

This doesn't work on power and sparch architectures (with larger
PAGE_SIZEs). Would be great to handle that as well.

> +
> + page = virt_to_page(data);
> + if (!page) {
> + pr_err("nvm: rrpc: could not alloc page\n");
> + return -1;
> + }
> +
> + err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
> + if (err != RRPC_EXPOSED_PAGE_SIZE) {
> + pr_err("nvm: rrpc: could not add page to bio\n");
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static void rrpc_submit_write(struct work_struct *work)
> +{
> + struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
> + struct rrpc *rrpc = rlun->rrpc;
> + struct nvm_dev *dev = rrpc->dev;
> + struct rrpc_addr *addr;
> + struct rrpc_rq *rrqd;
> + struct rrpc_buf_rq *brrqd;
> + void *data;
> + struct nvm_rq *rqd;
> + struct rrpc_block *rblk;
> + struct bio *bio;
> + int pgs_to_sync, pgs_avail;
> + int sync = NVM_SYNC_HARD;
> + int err;
> + int i;
> +
> + /* Note that OS pages are typically mapped to flash page sectors, which
> + * are 4K; a flash page might be formed of several sectors. Also,
> + * controllers typically program flash pages across multiple planes.
> + * This is the flash programing granurality, and the reason behind the
> + * sync strategy performed in this write thread.
> + */
> +try:
> + spin_lock(&rlun->parent->lock);
> + list_for_each_entry(rblk, &rlun->open_list, list) {
> + if (!spin_trylock(&rblk->w_buf.w_lock))
> + continue;
> +
> + /* If the write thread has already submitted all I/Os in the
> + * write buffer for this block ignore that the block is in the
> + * open list; it is on its way to the closed list. This enables
> + * us to avoid taking a lock on the list.
> + */
> + if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
> + spin_unlock(&rblk->w_buf.w_lock);
> + spin_unlock(&rlun->parent->lock);
> + schedule();
> + goto try;
> + }
> + pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
> +
> + switch (sync) {
> + case NVM_SYNC_SOFT:
> + pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
> + rrpc->max_write_pgs : 0;
> + break;
> + case NVM_SYNC_HARD:
> + if (pgs_avail >= rrpc->max_write_pgs)
> + pgs_to_sync = rrpc->max_write_pgs;
> + else if (pgs_avail >= rrpc->min_write_pgs)
> + pgs_to_sync = rrpc->min_write_pgs *
> + (pgs_avail / rrpc->min_write_pgs);
> + else
> + pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
> + break;
> + case NVM_SYNC_OPORT:
> + if (pgs_avail >= rrpc->max_write_pgs)
> + pgs_to_sync = rrpc->max_write_pgs;
> + else if (pgs_avail >= rrpc->min_write_pgs)
> + pgs_to_sync = rrpc->min_write_pgs *
> + (pgs_avail / rrpc->min_write_pgs);
> + else
> + pgs_to_sync = 0;
> + }
> +
> + if (pgs_to_sync == 0) {
> + spin_unlock(&rblk->w_buf.w_lock);
> + continue;
> + }
> +
> + bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
> + if (!bio) {
> + pr_err("nvm: rrpc: could not alloc write bio\n");
> + goto out1;
> + }
> +
> + rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> + if (!rqd) {
> + pr_err_ratelimited("rrpc: not able to create w req.");
> + goto out2;
> + }
> + rqd->metadata = NULL;
> +
> + brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
> + if (!brrqd) {
> + pr_err_ratelimited("rrpc: not able to create w rea.");
> + goto out3;
> + }
> +
> + bio->bi_iter.bi_sector = 0; /* artificial bio */
> + bio->bi_rw = WRITE;
> +
> + rqd->opcode = NVM_OP_HBWRITE;
> + rqd->bio = bio;
> + rqd->ins = &rrpc->instance;
> + rqd->nr_pages = pgs_to_sync;
> + rqd->priv = brrqd;
> +
> + if (pgs_to_sync == 1) {
> + rrqd = rblk->w_buf.subm->rrqd;
> + addr = rblk->w_buf.subm->addr;
> + rqd->flags = rblk->w_buf.subm->flags;
> + data = rblk->w_buf.subm->data;
> +
> + err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> + if (err) {
> + pr_err("rrpc: cannot allocate page in bio\n");
> + goto out4;
> + }
> +
> + /* TODO: This address can be skipped */
> + if (addr->addr == ADDR_EMPTY)
> + pr_err_ratelimited("rrpc: submitting empty rq");
> +
> + rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> + brrqd[0].rrqd = rrqd;
> + brrqd[0].addr = addr;
> +
> + rblk->w_buf.subm++;
> + rblk->w_buf.cur_subm++;
> +
> + goto submit_io;
> + }
> +
> + /* This bio will contain several pppas */
> + rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
> + &rqd->dma_ppa_list);
> + if (!rqd->ppa_list) {
> + pr_err("rrpc: not able to allocate ppa list\n");
> + goto out4;
> + }
> +
> + for (i = 0; i < pgs_to_sync; i++) {
> + rrqd = rblk->w_buf.subm->rrqd;
> + addr = rblk->w_buf.subm->addr;
> + rqd->flags = rblk->w_buf.subm->flags;
> + data = rblk->w_buf.subm->data;
> +
> + err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> + if (err) {
> + pr_err("rrpc: cannot allocate page in bio\n");
> + goto out5;
> + }
> +
> + /* TODO: This address can be skipped */
> + if (addr->addr == ADDR_EMPTY)
> + pr_err_ratelimited("rrpc: submitting empty rq");
> +
> + rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> + brrqd[i].rrqd = rrqd;
> + brrqd[i].addr = addr;
> +
> + rblk->w_buf.subm++;
> + rblk->w_buf.cur_subm++;
> + }
> +
> +submit_io:
> + WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
> +
> + spin_unlock(&rblk->w_buf.w_lock);
> +
> + err = nvm_submit_io(dev, rqd);
> + if (err) {
> + pr_err("rrpc: I/O submission failed: %d\n", err);
> + mempool_free(rqd, rrpc->rq_pool);
> + bio_put(bio);
> + }
> + }
> +
> + spin_unlock(&rlun->parent->lock);
> + return;
> +
> +out5:
> + nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +out4:
> + mempool_free(brrqd, rrpc->m_rrq_pool);
> +out3:
> + mempool_free(rqd, rrpc->rq_pool);
> +out2:
> + bio_put(bio);
> +out1:
> + spin_unlock(&rblk->w_buf.w_lock);
> + spin_unlock(&rlun->parent->lock);
> +}
> +

The function can at least be split up in two parts. The what to sync and
the actual write. The ret* labels should be replaced with more sane names.

> static void rrpc_requeue(struct work_struct *work)
> {
> struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
> @@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
>
> static int rrpc_gc_init(struct rrpc *rrpc)
> {
> - rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
> + rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,

No need for reformatting

> rrpc->nr_luns);
> if (!rrpc->krqd_wq)
> return -ENOMEM;
> @@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
>
> static int rrpc_core_init(struct rrpc *rrpc)
> {
> + struct nvm_dev *dev = rrpc->dev;
> +
> down_write(&rrpc_lock);
> if (!rrpc_gcb_cache) {
> rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
> @@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
> return -ENOMEM;
> }
>
> - rrpc_rq_cache = kmem_cache_create("rrpc_rq",
> - sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
> - 0, 0, NULL);
> + rrpc_rq_cache = kmem_cache_create("nvm_rq",
> + sizeof(struct nvm_rq), 0, 0, NULL);
> if (!rrpc_rq_cache) {
> kmem_cache_destroy(rrpc_gcb_cache);
> up_write(&rrpc_lock);
> return -ENOMEM;
> }
> +
> + rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
> + sizeof(struct rrpc_rq), 0, 0, NULL);
> + if (!rrpc_rrq_cache) {
> + kmem_cache_destroy(rrpc_gcb_cache);
> + kmem_cache_destroy(rrpc_rq_cache);
> + up_write(&rrpc_lock);
> + return -ENOMEM;
> + }
> +
> + rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
> + rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
> + 0, 0, NULL);
> + if (!rrpc_buf_rrq_cache) {
> + kmem_cache_destroy(rrpc_gcb_cache);
> + kmem_cache_destroy(rrpc_rq_cache);
> + kmem_cache_destroy(rrpc_rrq_cache);
> + up_write(&rrpc_lock);
> + return -ENOMEM;
> + }
> + }
> +
> + /* we assume that sec->sec_size is the same as the page size exposed by
> + * rrpc (4KB). We need extra logic otherwise
> + */
> + if (!rrpc_block_cache) {
> + /* Write buffer: Allocate all buffer (for all block) at once. We
> + * avoid having to allocate a memory from the pool for each IO
> + * at the cost pre-allocating memory for the whole block when a
> + * new block is allocated from the media manager.
> + */
> + rrpc_wb_cache = kmem_cache_create("nvm_wb",
> + dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
> + 0, 0, NULL);

Seems a bit excessive to allocate a mempool for this. Why not do a
vmalloc for when the memory is needed? The normal case is 4MB? per
entry, and the cache creates 8 of them in the pool, using 64MB out of
the gate. We are not in a hot path at this point in time, so lets not
waste the resource when we might not use them.

> + if (!rrpc_wb_cache) {
> + kmem_cache_destroy(rrpc_gcb_cache);
> + kmem_cache_destroy(rrpc_rq_cache);
> + kmem_cache_destroy(rrpc_rrq_cache);
> + kmem_cache_destroy(rrpc_buf_rrq_cache);
> + up_write(&rrpc_lock);
> + return -ENOMEM;
> + }
> +
> + /* Write buffer entries */
> + rrpc_block_cache = kmem_cache_create("nvm_entry",
> + dev->pgs_per_blk * dev->sec_per_pg *
> + sizeof(struct buf_entry),
> + 0, 0, NULL);
> + if (!rrpc_block_cache) {
> + kmem_cache_destroy(rrpc_gcb_cache);
> + kmem_cache_destroy(rrpc_rq_cache);
> + kmem_cache_destroy(rrpc_rrq_cache);
> + kmem_cache_destroy(rrpc_buf_rrq_cache);
> + kmem_cache_destroy(rrpc_wb_cache);
> + up_write(&rrpc_lock);
> + return -ENOMEM;
> + }
> }
> up_write(&rrpc_lock);
>
> @@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
> if (!rrpc->rq_pool)
> return -ENOMEM;
>
> + rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
> + if (!rrpc->rrq_pool)
> + return -ENOMEM;
> +
> + rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
> + if (!rrpc->m_rrq_pool)
> + return -ENOMEM;
> +
> + rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
> + if (!rrpc->block_pool)
> + return -ENOMEM;
> +
> + rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
> + if (!rrpc->write_buf_pool)
> + return -ENOMEM;

Alot of pools. I hope the nvm_rq, rrpc_rq, and rrpc_buf_rq consolidation
can move it back to just a nvm_rq entry)

> +
> spin_lock_init(&rrpc->inflights.lock);
> INIT_LIST_HEAD(&rrpc->inflights.reqs);
>
> + rrpc->kw_wq = alloc_workqueue("rrpc-writer",
> + WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
> + if (!rrpc->kw_wq)
> + return -ENOMEM;
> +
> return 0;
> }
>
> static void rrpc_core_free(struct rrpc *rrpc)
> {
> + if (rrpc->kw_wq)
> + destroy_workqueue(rrpc->kw_wq);
> +
> mempool_destroy(rrpc->page_pool);
> mempool_destroy(rrpc->gcb_pool);
> + mempool_destroy(rrpc->m_rrq_pool);
> + mempool_destroy(rrpc->rrq_pool);
> mempool_destroy(rrpc->rq_pool);
> + mempool_destroy(rrpc->block_pool);
> + mempool_destroy(rrpc->write_buf_pool);
> }
>
> static void rrpc_luns_free(struct rrpc *rrpc)
> @@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
> INIT_LIST_HEAD(&rlun->open_list);
> INIT_LIST_HEAD(&rlun->closed_list);
>
> + INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
> +
> INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
> spin_lock_init(&rlun->lock);
>
> @@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
>
> flush_workqueue(rrpc->krqd_wq);
> flush_workqueue(rrpc->kgc_wq);
> + /* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/

This can come in an additional patch.

>
> rrpc_free(rrpc);
> }
> diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
> index 868e91a..6e188b4 100644
> --- a/drivers/lightnvm/rrpc.h
> +++ b/drivers/lightnvm/rrpc.h
> @@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
> struct rrpc_rq {
> struct rrpc_inflight_rq inflight_rq;
> struct rrpc_addr *addr;
> + int nr_pages;
> +
> + struct kref refs;

I think what you really want to do here is to add the ref counter to
rrpc_inflight_addr and then have that maintain it to when to free.
(also, name it kref)

> +};
> +
> +struct rrpc_buf_rq {
> + struct rrpc_addr *addr;
> + struct rrpc_rq *rrqd;

I'm not sure why we keep a rrpc_buf_rq->addr and rrpc_rq->addr?

> +};
> +
> +/* Sync strategies from write buffer to media */
> +enum {
> + NVM_SYNC_SOFT = 0x0, /* Only submit at max_write_pgs
> + * supported by the device. Typically 64
> + * pages (256k).
> + */
> + NVM_SYNC_HARD = 0x1, /* Submit the whole buffer. Add padding
> + * if necessary to respect the device's
> + * min_write_pgs.
> + */
> + NVM_SYNC_OPORT = 0x2, /* Submit what we can, always respecting
> + * the device's min_write_pgs.
> + */

That is great. This should properly be exposed as a sysfs option later.
> +};
> +
> +struct buf_entry {
> + struct rrpc_rq *rrqd;
> + void *data;
> + struct rrpc_addr *addr;
> unsigned long flags;
> };
>
> +struct rrpc_w_buf {
> + struct buf_entry *entries; /* Entries */
> + struct buf_entry *mem; /* Points to the next writable entry */
> + struct buf_entry *subm; /* Points to the last submitted entry */
> + int cur_mem; /* Current memory entry. Follows mem */
> + int cur_subm; /* Entries have been submitted to dev */
> + int nentries; /* Number of entries in write buffer */

nr_entries?

> +
> + void *data; /* Actual data */
> + unsigned long *sync_bitmap; /* Bitmap representing physical
> + * addresses that have been synced to
> + * the media
> + */
> +
> + atomic_t refs;

kref? semaphore?

> +
> + spinlock_t w_lock;
> + spinlock_t s_lock;

is it short for submission lock?

> +};
> +
> struct rrpc_block {
> struct nvm_block *parent;
> struct rrpc_lun *rlun;
> struct list_head prio;
> struct list_head list;
> + struct rrpc_w_buf w_buf;
>
> #define MAX_INVALID_PAGES_STORAGE 8
> - /* Bitmap for invalid page intries */
> + /* Bitmap for invalid page entries */
> unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
> /* points to the next writable page within a block */
> unsigned int next_page;
> @@ -67,7 +117,6 @@ struct rrpc_block {
> unsigned int nr_invalid_pages;
>
> spinlock_t lock;
> - atomic_t data_cmnt_size; /* data pages committed to stable storage */
> };
>
> struct rrpc_lun {
> @@ -86,6 +135,7 @@ struct rrpc_lun {
> */
>
> struct work_struct ws_gc;
> + struct work_struct ws_writer;
>
> spinlock_t lock;
> };
> @@ -136,10 +186,15 @@ struct rrpc {
> mempool_t *page_pool;
> mempool_t *gcb_pool;
> mempool_t *rq_pool;
> + mempool_t *rrq_pool;
> + mempool_t *m_rrq_pool;
> + mempool_t *block_pool;
> + mempool_t *write_buf_pool;
>
> struct timer_list gc_timer;
> struct workqueue_struct *krqd_wq;
> struct workqueue_struct *kgc_wq;
> + struct workqueue_struct *kw_wq;
> };
>
> struct rrpc_block_gc {
> @@ -181,7 +236,7 @@ static inline int request_intersects(struct rrpc_inflight_rq *r,
> }
>
> static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> - unsigned pages, struct rrpc_inflight_rq *r)
> + unsigned pages, struct rrpc_inflight_rq *r)
> {
> sector_t laddr_end = laddr + pages - 1;
> struct rrpc_inflight_rq *rtmp;
> @@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> }
>
> static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> - unsigned pages,
> - struct rrpc_inflight_rq *r)
> + unsigned pages,
> + struct rrpc_inflight_rq *r)
> {
> BUG_ON((laddr + pages) > rrpc->nr_sects);
>
> return __rrpc_lock_laddr(rrpc, laddr, pages, r);
> }
>
> -static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
> +static inline struct rrpc_inflight_rq
> + *rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
> {
> - struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -
> return &rrqd->inflight_rq;
> }
>
> static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
> - struct nvm_rq *rqd)
> + struct rrpc_rq *rrqd)
> {
> sector_t laddr = rrpc_get_laddr(bio);
> unsigned int pages = rrpc_get_pages(bio);
> - struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> + struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>
> return rrpc_lock_laddr(rrpc, laddr, pages, r);
> }
> @@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
> spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
> }
>
> -static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
> {
> - struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> - uint8_t pages = rqd->nr_pages;
> + struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> + uint8_t nr_pages = rrqd->nr_pages;

The cleanups can be moved to another patch.

>
> - BUG_ON((r->l_start + pages) > rrpc->nr_sects);
> + BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
>
> rrpc_unlock_laddr(rrpc, r);
> }
> diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
> index b94f2d5..eda9743 100644
> --- a/include/linux/lightnvm.h
> +++ b/include/linux/lightnvm.h
> @@ -231,6 +231,8 @@ struct nvm_rq {
> void *metadata;
> dma_addr_t dma_metadata;
>
> + void *priv;
> +
> struct completion *wait;
> nvm_end_io_fn *end_io;
>
> @@ -243,12 +245,12 @@ struct nvm_rq {
>
> static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
> {
> - return pdu - sizeof(struct nvm_rq);
> + return container_of(pdu, struct nvm_rq, priv);
> }
>
> static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
> {
> - return rqdata + 1;
> + return rqdata->priv;
> }

I think the priv is unnecessary. This should be able to be expressed in
a single structure. E.g. have the rrpc_rq be a union of the data types
you either need when processing a read or a write. It seems that they no
longer have much in common.