[PATCH 26/56] ocfs2: Implement quota recovery

From: Mark Fasheh
Date: Mon Dec 22 2008 - 16:58:33 EST


From: Jan Kara <jack@xxxxxxx>

Implement functions for recovery after a crash. Functions just
read local quota file and sync info to global quota file.

Signed-off-by: Jan Kara <jack@xxxxxxx>
Signed-off-by: Mark Fasheh <mfasheh@xxxxxxxx>
---
fs/ocfs2/journal.c | 108 ++++++++++---
fs/ocfs2/journal.h | 1 +
fs/ocfs2/ocfs2.h | 4 +-
fs/ocfs2/quota.h | 21 +++
fs/ocfs2/quota_global.c | 1 -
fs/ocfs2/quota_local.c | 425 ++++++++++++++++++++++++++++++++++++++++++++++-
6 files changed, 528 insertions(+), 32 deletions(-)

diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 11a1178..c602420 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -45,6 +45,7 @@
#include "slot_map.h"
#include "super.h"
#include "sysfile.h"
+#include "quota.h"

#include "buffer_head_io.h"

@@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock);

static int ocfs2_force_read_journal(struct inode *inode);
static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num);
+ int node_num, int slot_num);
static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
@@ -857,6 +858,7 @@ struct ocfs2_la_recovery_item {
int lri_slot;
struct ocfs2_dinode *lri_la_dinode;
struct ocfs2_dinode *lri_tl_dinode;
+ struct ocfs2_quota_recovery *lri_qrec;
};

/* Does the second half of the recovery process. By this point, the
@@ -877,6 +879,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
struct ocfs2_super *osb = journal->j_osb;
struct ocfs2_dinode *la_dinode, *tl_dinode;
struct ocfs2_la_recovery_item *item, *n;
+ struct ocfs2_quota_recovery *qrec;
LIST_HEAD(tmp_la_list);

mlog_entry_void();
@@ -922,6 +925,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
if (ret < 0)
mlog_errno(ret);

+ qrec = item->lri_qrec;
+ if (qrec) {
+ mlog(0, "Recovering quota files");
+ ret = ocfs2_finish_quota_recovery(osb, qrec,
+ item->lri_slot);
+ if (ret < 0)
+ mlog_errno(ret);
+ /* Recovery info is already freed now */
+ }
+
kfree(item);
}

@@ -935,7 +948,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
int slot_num,
struct ocfs2_dinode *la_dinode,
- struct ocfs2_dinode *tl_dinode)
+ struct ocfs2_dinode *tl_dinode,
+ struct ocfs2_quota_recovery *qrec)
{
struct ocfs2_la_recovery_item *item;

@@ -950,6 +964,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
if (tl_dinode)
kfree(tl_dinode);

+ if (qrec)
+ ocfs2_free_quota_recovery(qrec);
+
mlog_errno(-ENOMEM);
return;
}
@@ -958,6 +975,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
item->lri_la_dinode = la_dinode;
item->lri_slot = slot_num;
item->lri_tl_dinode = tl_dinode;
+ item->lri_qrec = qrec;

spin_lock(&journal->j_lock);
list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -977,6 +995,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
ocfs2_queue_recovery_completion(journal,
osb->slot_num,
osb->local_alloc_copy,
+ NULL,
NULL);
ocfs2_schedule_truncate_log_flush(osb, 0);

@@ -985,11 +1004,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
}
}

+void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
+{
+ if (osb->quota_rec) {
+ ocfs2_queue_recovery_completion(osb->journal,
+ osb->slot_num,
+ NULL,
+ NULL,
+ osb->quota_rec);
+ osb->quota_rec = NULL;
+ }
+}
+
static int __ocfs2_recovery_thread(void *arg)
{
- int status, node_num;
+ int status, node_num, slot_num;
struct ocfs2_super *osb = arg;
struct ocfs2_recovery_map *rm = osb->recovery_map;
+ int *rm_quota = NULL;
+ int rm_quota_used = 0, i;
+ struct ocfs2_quota_recovery *qrec;

mlog_entry_void();

@@ -998,6 +1032,11 @@ static int __ocfs2_recovery_thread(void *arg)
goto bail;
}

+ rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
+ if (!rm_quota) {
+ status = -ENOMEM;
+ goto bail;
+ }
restart:
status = ocfs2_super_lock(osb, 1);
if (status < 0) {
@@ -1011,8 +1050,28 @@ restart:
* clear it until ocfs2_recover_node() has succeeded. */
node_num = rm->rm_entries[0];
spin_unlock(&osb->osb_lock);
-
- status = ocfs2_recover_node(osb, node_num);
+ mlog(0, "checking node %d\n", node_num);
+ slot_num = ocfs2_node_num_to_slot(osb, node_num);
+ if (slot_num == -ENOENT) {
+ status = 0;
+ mlog(0, "no slot for this node, so no recovery"
+ "required.\n");
+ goto skip_recovery;
+ }
+ mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+
+ /* It is a bit subtle with quota recovery. We cannot do it
+ * immediately because we have to obtain cluster locks from
+ * quota files and we also don't want to just skip it because
+ * then quota usage would be out of sync until some node takes
+ * the slot. So we remember which nodes need quota recovery
+ * and when everything else is done, we recover quotas. */
+ for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+ if (i == rm_quota_used)
+ rm_quota[rm_quota_used++] = slot_num;
+
+ status = ocfs2_recover_node(osb, node_num, slot_num);
+skip_recovery:
if (!status) {
ocfs2_recovery_map_clear(osb, node_num);
} else {
@@ -1034,13 +1093,27 @@ restart:
if (status < 0)
mlog_errno(status);

+ /* Now it is right time to recover quotas... We have to do this under
+ * superblock lock so that noone can start using the slot (and crash)
+ * before we recover it */
+ for (i = 0; i < rm_quota_used; i++) {
+ qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
+ if (IS_ERR(qrec)) {
+ status = PTR_ERR(qrec);
+ mlog_errno(status);
+ continue;
+ }
+ ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
+ NULL, NULL, qrec);
+ }
+
ocfs2_super_unlock(osb, 1);

/* We always run recovery on our own orphan dir - the dead
* node(s) may have disallowd a previos inode delete. Re-processing
* is therefore required. */
ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
- NULL);
+ NULL, NULL);

bail:
mutex_lock(&osb->recovery_lock);
@@ -1055,6 +1128,9 @@ bail:

mutex_unlock(&osb->recovery_lock);

+ if (rm_quota)
+ kfree(rm_quota);
+
mlog_exit(status);
/* no one is callint kthread_stop() for us so the kthread() api
* requires that we call do_exit(). And it isn't exported, but
@@ -1282,31 +1358,19 @@ done:
* far less concerning.
*/
static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num)
+ int node_num, int slot_num)
{
int status = 0;
- int slot_num;
struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL;

- mlog_entry("(node_num=%d, osb->node_num = %d)\n",
- node_num, osb->node_num);
-
- mlog(0, "checking node %d\n", node_num);
+ mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
+ node_num, slot_num, osb->node_num);

/* Should not ever be called to recover ourselves -- in that
* case we should've called ocfs2_journal_load instead. */
BUG_ON(osb->node_num == node_num);

- slot_num = ocfs2_node_num_to_slot(osb, node_num);
- if (slot_num == -ENOENT) {
- status = 0;
- mlog(0, "no slot for this node, so no recovery required.\n");
- goto done;
- }
-
- mlog(0, "node %d was using slot %d\n", node_num, slot_num);
-
status = ocfs2_replay_journal(osb, node_num, slot_num);
if (status < 0) {
if (status == -EBUSY) {
@@ -1342,7 +1406,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,

/* This will kfree the memory pointed to by la_copy and tl_copy */
ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
- tl_copy);
+ tl_copy, NULL);

status = 0;
done:
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index ee08e9c..37013bf 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -168,6 +168,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb,
int node_num);
int ocfs2_mark_dead_nodes(struct ocfs2_super *osb);
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb);
+void ocfs2_complete_quota_recovery(struct ocfs2_super *osb);

static inline void ocfs2_start_checkpoint(struct ocfs2_super *osb)
{
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index f04b229..6b25b4a 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -206,6 +206,7 @@ enum ocfs2_mount_options
struct ocfs2_journal;
struct ocfs2_slot_info;
struct ocfs2_recovery_map;
+struct ocfs2_quota_recovery;
struct ocfs2_super
{
struct task_struct *commit_task;
@@ -287,10 +288,11 @@ struct ocfs2_super
char *local_alloc_debug_buf;
#endif

- /* Next two fields are for local node slot recovery during
+ /* Next three fields are for local node slot recovery during
* mount. */
int dirty;
struct ocfs2_dinode *local_alloc_copy;
+ struct ocfs2_quota_recovery *quota_rec;

struct ocfs2_alloc_stats alloc_stats;
char dev_str[20]; /* "major,minor" of the device */
diff --git a/fs/ocfs2/quota.h b/fs/ocfs2/quota.h
index 11cdff1..84c50a1 100644
--- a/fs/ocfs2/quota.h
+++ b/fs/ocfs2/quota.h
@@ -34,6 +34,17 @@ struct ocfs2_dquot {
s64 dq_originodes; /* Last globally synced inode usage */
};

+/* Description of one chunk to recover in memory */
+struct ocfs2_recovery_chunk {
+ struct list_head rc_list; /* List of chunks */
+ int rc_chunk; /* Chunk number */
+ unsigned long *rc_bitmap; /* Bitmap of entries to recover */
+};
+
+struct ocfs2_quota_recovery {
+ struct list_head r_list[MAXQUOTAS]; /* List of chunks to recover */
+};
+
/* In-memory structure with quota header information */
struct ocfs2_mem_dqinfo {
unsigned int dqi_type; /* Quota type this structure describes */
@@ -50,6 +61,10 @@ struct ocfs2_mem_dqinfo {
struct buffer_head *dqi_ibh; /* Buffer with information header */
struct qtree_mem_dqinfo dqi_gi; /* Info about global file */
struct timer_list dqi_sync_timer; /* Timer for syncing dquots */
+ struct ocfs2_quota_recovery *dqi_rec; /* Pointer to recovery
+ * information, in case we
+ * enable quotas on file
+ * needing it */
};

static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot)
@@ -68,6 +83,12 @@ extern struct kmem_cache *ocfs2_qf_chunk_cachep;

extern struct qtree_fmt_operations ocfs2_global_ops;

+struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
+ struct ocfs2_super *osb, int slot_num);
+int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
+ struct ocfs2_quota_recovery *rec,
+ int slot_num);
+void ocfs2_free_quota_recovery(struct ocfs2_quota_recovery *rec);
ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
size_t len, loff_t off);
ssize_t ocfs2_quota_write(struct super_block *sb, int type,
diff --git a/fs/ocfs2/quota_global.c b/fs/ocfs2/quota_global.c
index 4a5bc09..d2a5bfa 100644
--- a/fs/ocfs2/quota_global.c
+++ b/fs/ocfs2/quota_global.c
@@ -85,7 +85,6 @@ struct qtree_fmt_operations ocfs2_global_ops = {
.is_id = ocfs2_global_is_id,
};

-
struct buffer_head *ocfs2_read_quota_block(struct inode *inode,
int block, int *err)
{
diff --git a/fs/ocfs2/quota_local.c b/fs/ocfs2/quota_local.c
index 1db7a16..54e8788 100644
--- a/fs/ocfs2/quota_local.c
+++ b/fs/ocfs2/quota_local.c
@@ -49,14 +49,25 @@ static unsigned int ol_quota_chunk_block(struct super_block *sb, int c)
return 1 + (ol_chunk_blocks(sb) + 1) * c;
}

-/* Offset of the dquot structure in the quota file */
-static loff_t ol_dqblk_off(struct super_block *sb, int c, int off)
+static unsigned int ol_dqblk_block(struct super_block *sb, int c, int off)
+{
+ int epb = ol_quota_entries_per_block(sb);
+
+ return ol_quota_chunk_block(sb, c) + 1 + off / epb;
+}
+
+static unsigned int ol_dqblk_block_off(struct super_block *sb, int c, int off)
{
int epb = ol_quota_entries_per_block(sb);

- return ((ol_quota_chunk_block(sb, c) + 1 + off / epb)
- << sb->s_blocksize_bits) +
- (off % epb) * sizeof(struct ocfs2_local_disk_dqblk);
+ return (off % epb) * sizeof(struct ocfs2_local_disk_dqblk);
+}
+
+/* Offset of the dquot structure in the quota file */
+static loff_t ol_dqblk_off(struct super_block *sb, int c, int off)
+{
+ return (ol_dqblk_block(sb, c, off) << sb->s_blocksize_bits) +
+ ol_dqblk_block_off(sb, c, off);
}

/* Compute block number from given offset */
@@ -253,6 +264,379 @@ static void olq_update_info(struct buffer_head *bh, void *private)
spin_unlock(&dq_data_lock);
}

+static int ocfs2_add_recovery_chunk(struct super_block *sb,
+ struct ocfs2_local_disk_chunk *dchunk,
+ int chunk,
+ struct list_head *head)
+{
+ struct ocfs2_recovery_chunk *rc;
+
+ rc = kmalloc(sizeof(struct ocfs2_recovery_chunk), GFP_NOFS);
+ if (!rc)
+ return -ENOMEM;
+ rc->rc_chunk = chunk;
+ rc->rc_bitmap = kmalloc(sb->s_blocksize, GFP_NOFS);
+ if (!rc->rc_bitmap) {
+ kfree(rc);
+ return -ENOMEM;
+ }
+ memcpy(rc->rc_bitmap, dchunk->dqc_bitmap,
+ (ol_chunk_entries(sb) + 7) >> 3);
+ list_add_tail(&rc->rc_list, head);
+ return 0;
+}
+
+static void free_recovery_list(struct list_head *head)
+{
+ struct ocfs2_recovery_chunk *next;
+ struct ocfs2_recovery_chunk *rchunk;
+
+ list_for_each_entry_safe(rchunk, next, head, rc_list) {
+ list_del(&rchunk->rc_list);
+ kfree(rchunk->rc_bitmap);
+ kfree(rchunk);
+ }
+}
+
+void ocfs2_free_quota_recovery(struct ocfs2_quota_recovery *rec)
+{
+ int type;
+
+ for (type = 0; type < MAXQUOTAS; type++)
+ free_recovery_list(&(rec->r_list[type]));
+ kfree(rec);
+}
+
+/* Load entries in our quota file we have to recover*/
+static int ocfs2_recovery_load_quota(struct inode *lqinode,
+ struct ocfs2_local_disk_dqinfo *ldinfo,
+ int type,
+ struct list_head *head)
+{
+ struct super_block *sb = lqinode->i_sb;
+ struct buffer_head *hbh;
+ struct ocfs2_local_disk_chunk *dchunk;
+ int i, chunks = le32_to_cpu(ldinfo->dqi_chunks);
+ int status = 0;
+
+ for (i = 0; i < chunks; i++) {
+ hbh = ocfs2_read_quota_block(lqinode,
+ ol_quota_chunk_block(sb, i),
+ &status);
+ if (!hbh) {
+ mlog_errno(status);
+ break;
+ }
+ dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data;
+ if (le32_to_cpu(dchunk->dqc_free) < ol_chunk_entries(sb))
+ status = ocfs2_add_recovery_chunk(sb, dchunk, i, head);
+ brelse(hbh);
+ if (status < 0)
+ break;
+ }
+ if (status < 0)
+ free_recovery_list(head);
+ return status;
+}
+
+static struct ocfs2_quota_recovery *ocfs2_alloc_quota_recovery(void)
+{
+ int type;
+ struct ocfs2_quota_recovery *rec;
+
+ rec = kmalloc(sizeof(struct ocfs2_quota_recovery), GFP_NOFS);
+ if (!rec)
+ return NULL;
+ for (type = 0; type < MAXQUOTAS; type++)
+ INIT_LIST_HEAD(&(rec->r_list[type]));
+ return rec;
+}
+
+/* Load information we need for quota recovery into memory */
+struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
+ struct ocfs2_super *osb,
+ int slot_num)
+{
+ unsigned int feature[MAXQUOTAS] = { OCFS2_FEATURE_RO_COMPAT_USRQUOTA,
+ OCFS2_FEATURE_RO_COMPAT_GRPQUOTA};
+ unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
+ LOCAL_GROUP_QUOTA_SYSTEM_INODE };
+ struct super_block *sb = osb->sb;
+ struct ocfs2_local_disk_dqinfo *ldinfo;
+ struct inode *lqinode;
+ struct buffer_head *bh;
+ int type;
+ int status = 0;
+ struct ocfs2_quota_recovery *rec;
+
+ mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num);
+ rec = ocfs2_alloc_quota_recovery();
+ if (!rec)
+ return ERR_PTR(-ENOMEM);
+ /* First init... */
+
+ for (type = 0; type < MAXQUOTAS; type++) {
+ if (!OCFS2_HAS_RO_COMPAT_FEATURE(sb, feature[type]))
+ continue;
+ /* At this point, journal of the slot is already replayed so
+ * we can trust metadata and data of the quota file */
+ lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
+ if (!lqinode) {
+ status = -ENOENT;
+ goto out;
+ }
+ status = ocfs2_inode_lock_full(lqinode, NULL, 1,
+ OCFS2_META_LOCK_RECOVERY);
+ if (status < 0) {
+ mlog_errno(status);
+ goto out_put;
+ }
+ /* Now read local header */
+ bh = ocfs2_read_quota_block(lqinode, 0, &status);
+ if (!bh) {
+ mlog_errno(status);
+ mlog(ML_ERROR, "failed to read quota file info header "
+ "(slot=%d type=%d)\n", slot_num, type);
+ goto out_lock;
+ }
+ ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data +
+ OCFS2_LOCAL_INFO_OFF);
+ status = ocfs2_recovery_load_quota(lqinode, ldinfo, type,
+ &rec->r_list[type]);
+ brelse(bh);
+out_lock:
+ ocfs2_inode_unlock(lqinode, 1);
+out_put:
+ iput(lqinode);
+ if (status < 0)
+ break;
+ }
+out:
+ if (status < 0) {
+ ocfs2_free_quota_recovery(rec);
+ rec = ERR_PTR(status);
+ }
+ return rec;
+}
+
+/* Sync changes in local quota file into global quota file and
+ * reinitialize local quota file.
+ * The function expects local quota file to be already locked and
+ * dqonoff_mutex locked. */
+static int ocfs2_recover_local_quota_file(struct inode *lqinode,
+ int type,
+ struct ocfs2_quota_recovery *rec)
+{
+ struct super_block *sb = lqinode->i_sb;
+ struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv;
+ struct ocfs2_local_disk_chunk *dchunk;
+ struct ocfs2_local_disk_dqblk *dqblk;
+ struct dquot *dquot;
+ handle_t *handle;
+ struct buffer_head *hbh = NULL, *qbh = NULL;
+ int status = 0;
+ int bit, chunk;
+ struct ocfs2_recovery_chunk *rchunk, *next;
+ qsize_t spacechange, inodechange;
+
+ mlog_entry("ino=%lu type=%u", (unsigned long)lqinode->i_ino, type);
+
+ status = ocfs2_lock_global_qf(oinfo, 1);
+ if (status < 0)
+ goto out;
+
+ list_for_each_entry_safe(rchunk, next, &(rec->r_list[type]), rc_list) {
+ chunk = rchunk->rc_chunk;
+ hbh = ocfs2_read_quota_block(lqinode,
+ ol_quota_chunk_block(sb, chunk),
+ &status);
+ if (!hbh) {
+ mlog_errno(status);
+ break;
+ }
+ dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data;
+ for_each_bit(bit, rchunk->rc_bitmap, ol_chunk_entries(sb)) {
+ qbh = ocfs2_read_quota_block(lqinode,
+ ol_dqblk_block(sb, chunk, bit),
+ &status);
+ if (!qbh) {
+ mlog_errno(status);
+ break;
+ }
+ dqblk = (struct ocfs2_local_disk_dqblk *)(qbh->b_data +
+ ol_dqblk_block_off(sb, chunk, bit));
+ dquot = dqget(sb, le64_to_cpu(dqblk->dqb_id), type);
+ if (!dquot) {
+ status = -EIO;
+ mlog(ML_ERROR, "Failed to get quota structure "
+ "for id %u, type %d. Cannot finish quota "
+ "file recovery.\n",
+ (unsigned)le64_to_cpu(dqblk->dqb_id),
+ type);
+ goto out_put_bh;
+ }
+ handle = ocfs2_start_trans(OCFS2_SB(sb),
+ OCFS2_QSYNC_CREDITS);
+ if (IS_ERR(handle)) {
+ status = PTR_ERR(handle);
+ mlog_errno(status);
+ goto out_put_dquot;
+ }
+ mutex_lock(&sb_dqopt(sb)->dqio_mutex);
+ spin_lock(&dq_data_lock);
+ /* Add usage from quota entry into quota changes
+ * of our node. Auxiliary variables are important
+ * due to signedness */
+ spacechange = le64_to_cpu(dqblk->dqb_spacemod);
+ inodechange = le64_to_cpu(dqblk->dqb_inodemod);
+ dquot->dq_dqb.dqb_curspace += spacechange;
+ dquot->dq_dqb.dqb_curinodes += inodechange;
+ spin_unlock(&dq_data_lock);
+ /* We want to drop reference held by the crashed
+ * node. Since we have our own reference we know
+ * global structure actually won't be freed. */
+ status = ocfs2_global_release_dquot(dquot);
+ if (status < 0) {
+ mlog_errno(status);
+ goto out_commit;
+ }
+ /* Release local quota file entry */
+ status = ocfs2_journal_access(handle, lqinode,
+ qbh, OCFS2_JOURNAL_ACCESS_WRITE);
+ if (status < 0) {
+ mlog_errno(status);
+ goto out_commit;
+ }
+ lock_buffer(qbh);
+ WARN_ON(!ocfs2_test_bit(bit, dchunk->dqc_bitmap));
+ ocfs2_clear_bit(bit, dchunk->dqc_bitmap);
+ le32_add_cpu(&dchunk->dqc_free, 1);
+ unlock_buffer(qbh);
+ status = ocfs2_journal_dirty(handle, qbh);
+ if (status < 0)
+ mlog_errno(status);
+out_commit:
+ mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
+ ocfs2_commit_trans(OCFS2_SB(sb), handle);
+out_put_dquot:
+ dqput(dquot);
+out_put_bh:
+ brelse(qbh);
+ if (status < 0)
+ break;
+ }
+ brelse(hbh);
+ list_del(&rchunk->rc_list);
+ kfree(rchunk->rc_bitmap);
+ kfree(rchunk);
+ if (status < 0)
+ break;
+ }
+ ocfs2_unlock_global_qf(oinfo, 1);
+out:
+ if (status < 0)
+ free_recovery_list(&(rec->r_list[type]));
+ mlog_exit(status);
+ return status;
+}
+
+/* Recover local quota files for given node different from us */
+int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
+ struct ocfs2_quota_recovery *rec,
+ int slot_num)
+{
+ unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
+ LOCAL_GROUP_QUOTA_SYSTEM_INODE };
+ struct super_block *sb = osb->sb;
+ struct ocfs2_local_disk_dqinfo *ldinfo;
+ struct buffer_head *bh;
+ handle_t *handle;
+ int type;
+ int status = 0;
+ struct inode *lqinode;
+ unsigned int flags;
+
+ mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num);
+ mutex_lock(&sb_dqopt(sb)->dqonoff_mutex);
+ for (type = 0; type < MAXQUOTAS; type++) {
+ if (list_empty(&(rec->r_list[type])))
+ continue;
+ mlog(0, "Recovering quota in slot %d\n", slot_num);
+ lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
+ if (!lqinode) {
+ status = -ENOENT;
+ goto out;
+ }
+ status = ocfs2_inode_lock_full(lqinode, NULL, 1,
+ OCFS2_META_LOCK_NOQUEUE);
+ /* Someone else is holding the lock? Then he must be
+ * doing the recovery. Just skip the file... */
+ if (status == -EAGAIN) {
+ mlog(ML_NOTICE, "skipping quota recovery for slot %d "
+ "because quota file is locked.\n", slot_num);
+ status = 0;
+ goto out_put;
+ } else if (status < 0) {
+ mlog_errno(status);
+ goto out_put;
+ }
+ /* Now read local header */
+ bh = ocfs2_read_quota_block(lqinode, 0, &status);
+ if (!bh) {
+ mlog_errno(status);
+ mlog(ML_ERROR, "failed to read quota file info header "
+ "(slot=%d type=%d)\n", slot_num, type);
+ goto out_lock;
+ }
+ ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data +
+ OCFS2_LOCAL_INFO_OFF);
+ /* Is recovery still needed? */
+ flags = le32_to_cpu(ldinfo->dqi_flags);
+ if (!(flags & OLQF_CLEAN))
+ status = ocfs2_recover_local_quota_file(lqinode,
+ type,
+ rec);
+ /* We don't want to mark file as clean when it is actually
+ * active */
+ if (slot_num == osb->slot_num)
+ goto out_bh;
+ /* Mark quota file as clean if we are recovering quota file of
+ * some other node. */
+ handle = ocfs2_start_trans(osb, 1);
+ if (IS_ERR(handle)) {
+ status = PTR_ERR(handle);
+ mlog_errno(status);
+ goto out_bh;
+ }
+ status = ocfs2_journal_access(handle, lqinode, bh,
+ OCFS2_JOURNAL_ACCESS_WRITE);
+ if (status < 0) {
+ mlog_errno(status);
+ goto out_trans;
+ }
+ lock_buffer(bh);
+ ldinfo->dqi_flags = cpu_to_le32(flags | OLQF_CLEAN);
+ unlock_buffer(bh);
+ status = ocfs2_journal_dirty(handle, bh);
+ if (status < 0)
+ mlog_errno(status);
+out_trans:
+ ocfs2_commit_trans(osb, handle);
+out_bh:
+ brelse(bh);
+out_lock:
+ ocfs2_inode_unlock(lqinode, 1);
+out_put:
+ iput(lqinode);
+ if (status < 0)
+ break;
+ }
+out:
+ mutex_unlock(&sb_dqopt(sb)->dqonoff_mutex);
+ kfree(rec);
+ return status;
+}
+
/* Read information header from quota file */
static int ocfs2_local_read_info(struct super_block *sb, int type)
{
@@ -262,6 +646,7 @@ static int ocfs2_local_read_info(struct super_block *sb, int type)
struct inode *lqinode = sb_dqopt(sb)->files[type];
int status;
struct buffer_head *bh = NULL;
+ struct ocfs2_quota_recovery *rec;
int locked = 0;

info->dqi_maxblimit = 0x7fffffffffffffffLL;
@@ -275,6 +660,7 @@ static int ocfs2_local_read_info(struct super_block *sb, int type)
info->dqi_priv = oinfo;
oinfo->dqi_type = type;
INIT_LIST_HEAD(&oinfo->dqi_chunk);
+ oinfo->dqi_rec = NULL;
oinfo->dqi_lqi_bh = NULL;
oinfo->dqi_ibh = NULL;

@@ -305,10 +691,27 @@ static int ocfs2_local_read_info(struct super_block *sb, int type)
oinfo->dqi_ibh = bh;

/* We crashed when using local quota file? */
- if (!(info->dqi_flags & OLQF_CLEAN))
- goto out_err; /* So far we just bail out. Later we should resync here */
+ if (!(info->dqi_flags & OLQF_CLEAN)) {
+ rec = OCFS2_SB(sb)->quota_rec;
+ if (!rec) {
+ rec = ocfs2_alloc_quota_recovery();
+ if (!rec) {
+ status = -ENOMEM;
+ mlog_errno(status);
+ goto out_err;
+ }
+ OCFS2_SB(sb)->quota_rec = rec;
+ }

- status = ocfs2_load_local_quota_bitmaps(sb_dqopt(sb)->files[type],
+ status = ocfs2_recovery_load_quota(lqinode, ldinfo, type,
+ &rec->r_list[type]);
+ if (status < 0) {
+ mlog_errno(status);
+ goto out_err;
+ }
+ }
+
+ status = ocfs2_load_local_quota_bitmaps(lqinode,
ldinfo,
&oinfo->dqi_chunk);
if (status < 0) {
@@ -394,6 +797,12 @@ static int ocfs2_local_free_info(struct super_block *sb, int type)
}
ocfs2_release_local_quota_bitmaps(&oinfo->dqi_chunk);

+ /* dqonoff_mutex protects us against racing with recovery thread... */
+ if (oinfo->dqi_rec) {
+ ocfs2_free_quota_recovery(oinfo->dqi_rec);
+ mark_clean = 0;
+ }
+
if (!mark_clean)
goto out;

--
1.5.6

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/