[PATCH 026/124] staging: lustre: statahead: statahead thread wait for RPCs to finish

From: James Simmons
Date: Sun Sep 18 2016 - 16:41:10 EST


From: Lai Siyao <lai.siyao@xxxxxxxxx>

The statahead thread should wait for in-flight stat RPCs to finish,
because the statahead RPC callback may access data allocated in the
statahead thread context.

ll_sa_entry_fini() should keep an old entry if its stat RPC has not
finished yet.

Simplify sai refcounting:
* a newly allocated sai holds one refcount, which is put after the
statahead thread has been started.
* the statahead thread holds one refcount.
* the agl thread holds one refcount.
* a stat process calls do_statahead_enter(), which tries to get the
sai; if the sai is valid, the process revalidates from the statahead
cache and puts the refcount after use.

Signed-off-by: Lai Siyao <lai.siyao@xxxxxxxxx>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3270
Reviewed-on: http://review.whamcloud.com/9663
Reviewed-by: Fan Yong <fan.yong@xxxxxxxxx>
Reviewed-by: James Simmons <uja.ornl@xxxxxxxxx>
Reviewed-by: Oleg Drokin <oleg.drokin@xxxxxxxxx>
Signed-off-by: James Simmons <jsimmons@xxxxxxxxxxxxx>
---
drivers/staging/lustre/lustre/include/obd.h | 1 -
drivers/staging/lustre/lustre/llite/dcache.c | 2 +-
drivers/staging/lustre/lustre/llite/file.c | 31 +-
.../staging/lustre/lustre/llite/llite_internal.h | 49 +-
drivers/staging/lustre/lustre/llite/llite_lib.c | 8 +
drivers/staging/lustre/lustre/llite/statahead.c | 849 +++++++++-----------
6 files changed, 434 insertions(+), 506 deletions(-)

diff --git a/drivers/staging/lustre/lustre/include/obd.h b/drivers/staging/lustre/lustre/include/obd.h
index 838a428..89633f7 100644
--- a/drivers/staging/lustre/lustre/include/obd.h
+++ b/drivers/staging/lustre/lustre/include/obd.h
@@ -806,7 +806,6 @@ struct md_enqueue_info {
int (*mi_cb)(struct ptlrpc_request *req,
struct md_enqueue_info *minfo, int rc);
__u64 mi_cbdata;
- unsigned int mi_generation;
};

struct obd_ops {
diff --git a/drivers/staging/lustre/lustre/llite/dcache.c b/drivers/staging/lustre/lustre/llite/dcache.c
index f4b6f38..8c00cc6 100644
--- a/drivers/staging/lustre/lustre/llite/dcache.c
+++ b/drivers/staging/lustre/lustre/llite/dcache.c
@@ -279,7 +279,7 @@ static int ll_revalidate_dentry(struct dentry *dentry,
if (lookup_flags & (LOOKUP_PARENT | LOOKUP_OPEN | LOOKUP_CREATE))
return 1;

- if (d_need_statahead(dir, dentry) <= 0)
+ if (!dentry_need_statahead(dir, dentry))
return 1;

if (lookup_flags & LOOKUP_RCU)
diff --git a/drivers/staging/lustre/lustre/llite/file.c b/drivers/staging/lustre/lustre/llite/file.c
index e9791e3..273b563 100644
--- a/drivers/staging/lustre/lustre/llite/file.c
+++ b/drivers/staging/lustre/lustre/llite/file.c
@@ -351,13 +351,11 @@ int ll_file_release(struct inode *inode, struct file *file)
fd = LUSTRE_FPRIVATE(file);
LASSERT(fd);

- /* The last ref on @file, maybe not be the owner pid of statahead.
- * Different processes can open the same dir, "ll_opendir_key" means:
- * it is me that should stop the statahead thread.
+ /* The last ref on @file, maybe not be the owner pid of statahead,
+ * because parent and child process can share the same file handle.
*/
- if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
- lli->lli_opendir_pid != 0)
- ll_stop_statahead(inode, lli->lli_opendir_key);
+ if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd)
+ ll_deauthorize_statahead(inode, fd);

if (is_root_inode(inode)) {
LUSTRE_FPRIVATE(file) = NULL;
@@ -530,7 +528,7 @@ int ll_file_open(struct inode *inode, struct file *file)
struct obd_client_handle **och_p = NULL;
__u64 *och_usecount = NULL;
struct ll_file_data *fd;
- int rc = 0, opendir_set = 0;
+ int rc = 0;

CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), flags %o\n",
PFID(ll_inode2fid(inode)), inode, file->f_flags);
@@ -545,16 +543,8 @@ int ll_file_open(struct inode *inode, struct file *file)
}

fd->fd_file = file;
- if (S_ISDIR(inode->i_mode)) {
- spin_lock(&lli->lli_sa_lock);
- if (!lli->lli_opendir_key && !lli->lli_sai &&
- lli->lli_opendir_pid == 0) {
- lli->lli_opendir_key = fd;
- lli->lli_opendir_pid = current_pid();
- opendir_set = 1;
- }
- spin_unlock(&lli->lli_sa_lock);
- }
+ if (S_ISDIR(inode->i_mode))
+ ll_authorize_statahead(inode, fd);

if (is_root_inode(inode)) {
LUSTRE_FPRIVATE(file) = fd;
@@ -713,9 +703,10 @@ out_och_free:
mutex_unlock(&lli->lli_och_mutex);

out_openerr:
- if (opendir_set != 0)
- ll_stop_statahead(inode, lli->lli_opendir_key);
- ll_file_data_put(fd);
+ if (lli->lli_opendir_key == fd)
+ ll_deauthorize_statahead(inode, fd);
+ if (fd)
+ ll_file_data_put(fd);
} else {
ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
}
diff --git a/drivers/staging/lustre/lustre/llite/llite_internal.h b/drivers/staging/lustre/lustre/llite/llite_internal.h
index cbd5bc5..f903f2a 100644
--- a/drivers/staging/lustre/lustre/llite/llite_internal.h
+++ b/drivers/staging/lustre/lustre/llite/llite_internal.h
@@ -172,6 +172,13 @@ struct ll_inode_info {
* -- I am the owner of dir statahead.
*/
pid_t d_opendir_pid;
+ /* stat will try to access statahead entries or start
+ * statahead if this flag is set, and this flag will be
+ * set upon dir open, and cleared when dir is closed,
+ * statahead hit ratio is too low, or start statahead
+ * thread failed.
+ */
+ unsigned int d_sa_enabled:1;
/* directory stripe information */
struct lmv_stripe_md *d_lsm_md;
/* striped directory size */
@@ -184,6 +191,7 @@ struct ll_inode_info {
#define lli_opendir_key u.d.d_opendir_key
#define lli_sai u.d.d_sai
#define lli_sa_lock u.d.d_sa_lock
+#define lli_sa_enabled u.d.d_sa_enabled
#define lli_opendir_pid u.d.d_opendir_pid
#define lli_lsm_md u.d.d_lsm_md
#define lli_stripe_dir_size u.d.d_stripe_size
@@ -495,6 +503,9 @@ struct ll_sb_info {
atomic_t ll_sa_wrong; /* statahead thread stopped for
* low hit ratio
*/
+ atomic_t ll_sa_running; /* running statahead thread
+ * count
+ */
atomic_t ll_agl_total; /* AGL thread started count */

dev_t ll_sdev_orig; /* save s_dev before assign for
@@ -1040,7 +1051,8 @@ struct ll_statahead_info {

int do_statahead_enter(struct inode *dir, struct dentry **dentry,
int only_unplug);
-void ll_stop_statahead(struct inode *dir, void *key);
+void ll_authorize_statahead(struct inode *dir, void *key);
+void ll_deauthorize_statahead(struct inode *dir, void *key);

blkcnt_t dirty_cnt(struct inode *inode);

@@ -1086,25 +1098,31 @@ ll_statahead_mark(struct inode *dir, struct dentry *dentry)
ldd->lld_sa_generation = sai->sai_generation;
}

-static inline int
-d_need_statahead(struct inode *dir, struct dentry *dentryp)
+static inline bool
+dentry_need_statahead(struct inode *dir, struct dentry *dentry)
{
struct ll_inode_info *lli;
struct ll_dentry_data *ldd;

if (ll_i2sbi(dir)->ll_sa_max == 0)
- return -EAGAIN;
+ return false;

lli = ll_i2info(dir);
+
+ /*
+ * statahead is not allowed for this dir, there may be three causes:
+ * 1. dir is not opened.
+ * 2. statahead hit ratio is too low.
+ * 3. previous stat started statahead thread failed.
+ */
+ if (!lli->lli_sa_enabled)
+ return false;
+
/* not the same process, don't statahead */
if (lli->lli_opendir_pid != current_pid())
- return -EAGAIN;
-
- /* statahead has been stopped */
- if (!lli->lli_opendir_key)
- return -EAGAIN;
+ return false;

- ldd = ll_d2d(dentryp);
+ ldd = ll_d2d(dentry);
/*
* When stats a dentry, the system trigger more than once "revalidate"
* or "lookup", for "getattr", for "getxattr", and maybe for others.
@@ -1122,19 +1140,16 @@ d_need_statahead(struct inode *dir, struct dentry *dentryp)
*/
if (ldd && lli->lli_sai &&
ldd->lld_sa_generation == lli->lli_sai->sai_generation)
- return -EAGAIN;
+ return false;

- return 1;
+ return true;
}

static inline int
ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug)
{
- int ret;
-
- ret = d_need_statahead(dir, *dentryp);
- if (ret <= 0)
- return ret;
+ if (!dentry_need_statahead(dir, *dentryp))
+ return -EAGAIN;

return do_statahead_enter(dir, dentryp, only_unplug);
}
diff --git a/drivers/staging/lustre/lustre/llite/llite_lib.c b/drivers/staging/lustre/lustre/llite/llite_lib.c
index 99aba6b..93fd69b 100644
--- a/drivers/staging/lustre/lustre/llite/llite_lib.c
+++ b/drivers/staging/lustre/lustre/llite/llite_lib.c
@@ -116,6 +116,7 @@ static struct ll_sb_info *ll_init_sbi(struct super_block *sb)
sbi->ll_sa_max = LL_SA_RPC_DEF;
atomic_set(&sbi->ll_sa_total, 0);
atomic_set(&sbi->ll_sa_wrong, 0);
+ atomic_set(&sbi->ll_sa_running, 0);
atomic_set(&sbi->ll_agl_total, 0);
sbi->ll_flags |= LL_SBI_AGL_ENABLED;

@@ -630,6 +631,12 @@ void ll_kill_super(struct super_block *sb)
if (sbi) {
sb->s_dev = sbi->ll_sdev_orig;
sbi->ll_umounting = 1;
+
+ /* wait running statahead threads to quit */
+ while (atomic_read(&sbi->ll_sa_running) > 0) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC >> 3));
+ }
}
}

@@ -795,6 +802,7 @@ void ll_lli_init(struct ll_inode_info *lli)
lli->lli_sai = NULL;
spin_lock_init(&lli->lli_sa_lock);
lli->lli_opendir_pid = 0;
+ lli->lli_sa_enabled = 0;
} else {
mutex_init(&lli->lli_size_mutex);
lli->lli_symlink_name = NULL;
diff --git a/drivers/staging/lustre/lustre/llite/statahead.c b/drivers/staging/lustre/lustre/llite/statahead.c
index 016463b..6577a66 100644
--- a/drivers/staging/lustre/lustre/llite/statahead.c
+++ b/drivers/staging/lustre/lustre/llite/statahead.c
@@ -281,25 +281,6 @@ ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index)
return NULL;
}

-static void ll_sa_entry_cleanup(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry)
-{
- struct md_enqueue_info *minfo = entry->se_minfo;
- struct ptlrpc_request *req = entry->se_req;
-
- if (minfo) {
- entry->se_minfo = NULL;
- ll_intent_release(&minfo->mi_it);
- iput(minfo->mi_dir);
- kfree(minfo);
- }
-
- if (req) {
- entry->se_req = NULL;
- ptlrpc_req_finished(req);
- }
-}
-
static void ll_sa_entry_put(struct ll_statahead_info *sai,
struct ll_sa_entry *entry)
{
@@ -312,7 +293,6 @@ static void ll_sa_entry_put(struct ll_statahead_info *sai,
LASSERT(list_empty(&entry->se_list));
LASSERT(list_empty(&entry->se_hash));

- ll_sa_entry_cleanup(sai, entry);
iput(entry->se_inode);

kfree(entry);
@@ -355,7 +335,10 @@ ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) {
if (!is_omitted_entry(sai, pos->se_index))
break;
- do_sa_entry_fini(sai, pos);
+ /* keep those whose statahead RPC not finished */
+ if (pos->se_stat == SA_ENTRY_SUCC ||
+ pos->se_stat == SA_ENTRY_INVA)
+ do_sa_entry_fini(sai, pos);
}
}

@@ -363,12 +346,14 @@ ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
* Inside lli_sa_lock.
*/
static void
-do_sa_entry_to_stated(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry, enum se_stat stat)
+__sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry,
+ enum se_stat stat)
{
struct ll_sa_entry *se;
struct list_head *pos = &sai->sai_entries_stated;

+ LASSERT(entry->se_stat == SA_ENTRY_INIT);
+
if (!list_empty(&entry->se_list))
list_del_init(&entry->se_list);

@@ -388,23 +373,30 @@ do_sa_entry_to_stated(struct ll_statahead_info *sai,
* \retval 1 -- entry to be destroyed.
* \retval 0 -- entry is inserted into stated list.
*/
-static int
-ll_sa_entry_to_stated(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry, enum se_stat stat)
+static void
+sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry,
+ enum se_stat stat)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- int ret = 1;
+ struct md_enqueue_info *minfo = entry->se_minfo;
+ struct ptlrpc_request *req = entry->se_req;
+
+ /* release resources used in RPC */
+ if (minfo) {
+ entry->se_minfo = NULL;
+ ll_intent_release(&minfo->mi_it);
+ iput(minfo->mi_dir);
+ kfree(minfo);
+ }

- ll_sa_entry_cleanup(sai, entry);
+ if (req) {
+ entry->se_req = NULL;
+ ptlrpc_req_finished(req);
+ }

spin_lock(&lli->lli_sa_lock);
- if (likely(entry->se_stat != SA_ENTRY_DEST)) {
- do_sa_entry_to_stated(sai, entry, stat);
- ret = 0;
- }
+ __sa_entry_post_stat(sai, entry, stat);
spin_unlock(&lli->lli_sa_lock);
-
- return ret;
}

/*
@@ -475,56 +467,46 @@ static struct ll_statahead_info *ll_sai_alloc(void)
return sai;
}

-static inline struct ll_statahead_info *
-ll_sai_get(struct ll_statahead_info *sai)
+static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
{
- atomic_inc(&sai->sai_refcount);
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = NULL;
+
+ spin_lock(&lli->lli_sa_lock);
+ sai = lli->lli_sai;
+ if (sai)
+ atomic_inc(&sai->sai_refcount);
+ spin_unlock(&lli->lli_sa_lock);
+
return sai;
}

static void ll_sai_put(struct ll_statahead_info *sai)
{
- struct inode *inode = sai->sai_inode;
- struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_inode_info *lli = ll_i2info(sai->sai_inode);

if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
+ struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
struct ll_sa_entry *entry, *next;

- if (unlikely(atomic_read(&sai->sai_refcount) > 0)) {
- /* It is race case, the interpret callback just hold
- * a reference count
- */
- spin_unlock(&lli->lli_sa_lock);
- return;
- }
-
- LASSERT(!lli->lli_opendir_key);
- LASSERT(thread_is_stopped(&sai->sai_thread));
- LASSERT(thread_is_stopped(&sai->sai_agl_thread));
-
lli->lli_sai = NULL;
- lli->lli_opendir_pid = 0;
spin_unlock(&lli->lli_sa_lock);

- if (sai->sai_sent > sai->sai_replied)
- CDEBUG(D_READA, "statahead for dir "DFID
- " does not finish: [sent:%llu] [replied:%llu]\n",
- PFID(&lli->lli_fid),
- sai->sai_sent, sai->sai_replied);
+ LASSERT(thread_is_stopped(&sai->sai_thread));
+ LASSERT(thread_is_stopped(&sai->sai_agl_thread));
+ LASSERT(sai->sai_sent == sai->sai_replied);

list_for_each_entry_safe(entry, next, &sai->sai_entries,
se_link)
do_sa_entry_fini(sai, entry);

- LASSERT(list_empty(&sai->sai_entries));
- LASSERT(list_empty(&sai->sai_entries_received));
- LASSERT(list_empty(&sai->sai_entries_stated));
-
LASSERT(atomic_read(&sai->sai_cache_count) == 0);
LASSERT(list_empty(&sai->sai_entries_agl));
+ LASSERT(atomic_read(&sai->sai_refcount) == 0);

- iput(inode);
+ iput(sai->sai_inode);
kfree(sai);
+ atomic_dec(&sbi->ll_sa_running);
}
}

@@ -588,29 +570,18 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
iput(inode);
}

-static void ll_post_statahead(struct ll_statahead_info *sai)
+/* prepare inode for received statahead entry, and add it into agl list */
+static void sa_post_one(struct ll_statahead_info *sai,
+ struct ll_sa_entry *entry)
{
struct inode *dir = sai->sai_inode;
struct inode *child;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_sa_entry *entry;
struct md_enqueue_info *minfo;
struct lookup_intent *it;
struct ptlrpc_request *req;
struct mdt_body *body;
int rc = 0;

- spin_lock(&lli->lli_sa_lock);
- if (unlikely(list_empty(&sai->sai_entries_received))) {
- spin_unlock(&lli->lli_sa_lock);
- return;
- }
- entry = list_entry(sai->sai_entries_received.next,
- struct ll_sa_entry, se_list);
- atomic_inc(&entry->se_refcount);
- list_del_init(&entry->se_list);
- spin_unlock(&lli->lli_sa_lock);
-
LASSERT(entry->se_handle != 0);

minfo = entry->se_minfo;
@@ -670,18 +641,56 @@ static void ll_post_statahead(struct ll_statahead_info *sai)
ll_agl_add(sai, child, entry->se_index);

out:
- /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock
+ /* The "sa_entry_post_stat()" will drop related ldlm ibits lock
* reference count by calling "ll_intent_drop_lock()" in spite of the
* above operations failed or not. Do not worry about calling
* "ll_intent_drop_lock()" more than once.
*/
- rc = ll_sa_entry_to_stated(sai, entry,
- rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
- if (rc == 0 && entry->se_index == sai->sai_index_wait)
+ sa_entry_post_stat(sai, entry, rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
+ if (entry->se_index == sai->sai_index_wait)
wake_up(&sai->sai_waitq);
ll_sa_entry_put(sai, entry);
}

+static void ll_post_statahead(struct ll_statahead_info *sai)
+{
+ struct ll_inode_info *lli;
+
+ lli = ll_i2info(sai->sai_inode);
+
+ while (!sa_received_empty(sai)) {
+ struct ll_sa_entry *entry;
+
+ spin_lock(&lli->lli_sa_lock);
+ if (unlikely(sa_received_empty(sai))) {
+ spin_unlock(&lli->lli_sa_lock);
+ break;
+ }
+ entry = list_entry(sai->sai_entries_received.next,
+ struct ll_sa_entry, se_list);
+ atomic_inc(&entry->se_refcount);
+ list_del_init(&entry->se_list);
+ spin_unlock(&lli->lli_sa_lock);
+
+ sa_post_one(sai, entry);
+ }
+
+ spin_lock(&lli->lli_agl_lock);
+ while (!agl_list_empty(sai)) {
+ struct ll_inode_info *clli;
+
+ clli = list_entry(sai->sai_entries_agl.next,
+ struct ll_inode_info, lli_agl_list);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&lli->lli_agl_lock);
+
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+
+ spin_lock(&lli->lli_agl_lock);
+ }
+ spin_unlock(&lli->lli_agl_lock);
+}
+
static int ll_statahead_interpret(struct ptlrpc_request *req,
struct md_enqueue_info *minfo, int rc)
{
@@ -690,72 +699,43 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = NULL;
struct ll_sa_entry *entry;
- __u64 handle = 0;
int wakeup;

if (it_disposition(it, DISP_LOOKUP_NEG))
rc = -ENOENT;

- if (rc == 0) {
- /* release ibits lock ASAP to avoid deadlock when statahead
- * thread enqueues lock on parent in readdir and another
- * process enqueues lock on child with parent lock held, eg.
- * unlink.
- */
- handle = it->it_lock_handle;
- ll_intent_drop_lock(it);
- }
+ sai = ll_sai_get(dir);
+ LASSERT(sai);
+ LASSERT(!thread_is_stopped(&sai->sai_thread));

spin_lock(&lli->lli_sa_lock);
- /* stale entry */
- if (unlikely(!lli->lli_sai ||
- lli->lli_sai->sai_generation != minfo->mi_generation)) {
- spin_unlock(&lli->lli_sa_lock);
- rc = -ESTALE;
- goto out;
+ entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata);
+ LASSERT(entry);
+ if (rc) {
+ __sa_entry_post_stat(sai, entry, SA_ENTRY_INVA);
+ wakeup = (entry->se_index == sai->sai_index_wait);
} else {
- sai = ll_sai_get(lli->lli_sai);
- if (unlikely(!thread_is_running(&sai->sai_thread))) {
- sai->sai_replied++;
- spin_unlock(&lli->lli_sa_lock);
- rc = -EBADFD;
- goto out;
- }
-
- entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata);
- if (!entry) {
- sai->sai_replied++;
- spin_unlock(&lli->lli_sa_lock);
- rc = -EIDRM;
- goto out;
- }
-
- if (rc != 0) {
- do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA);
- wakeup = (entry->se_index == sai->sai_index_wait);
- } else {
- entry->se_minfo = minfo;
- entry->se_req = ptlrpc_request_addref(req);
- /* Release the async ibits lock ASAP to avoid deadlock
- * when statahead thread tries to enqueue lock on parent
- * for readpage and other tries to enqueue lock on child
- * with parent's lock held, for example: unlink.
- */
- entry->se_handle = handle;
- wakeup = list_empty(&sai->sai_entries_received);
- list_add_tail(&entry->se_list,
- &sai->sai_entries_received);
- }
- sai->sai_replied++;
- spin_unlock(&lli->lli_sa_lock);
-
- ll_sa_entry_put(sai, entry);
- if (wakeup)
- wake_up(&sai->sai_thread.t_ctl_waitq);
+ entry->se_minfo = minfo;
+ entry->se_req = ptlrpc_request_addref(req);
+ /*
+ * Release the async ibits lock ASAP to avoid deadlock
+ * when statahead thread tries to enqueue lock on parent
+ * for readpage and other tries to enqueue lock on child
+ * with parent's lock held, for example: unlink.
+ */
+ entry->se_handle = it->it_lock_handle;
+ ll_intent_drop_lock(it);
+ wakeup = sa_received_empty(sai);
+ list_add_tail(&entry->se_list, &sai->sai_entries_received);
}
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);

-out:
- if (rc != 0) {
+ ll_sa_entry_put(sai, entry);
+ if (wakeup)
+ wake_up(&sai->sai_thread.t_ctl_waitq);
+
+ if (rc) {
ll_intent_release(it);
iput(dir);
kfree(minfo);
@@ -782,7 +762,6 @@ static int sa_args_init(struct inode *dir, struct inode *child,
struct ldlm_enqueue_info **pei)
{
const struct qstr *qstr = &entry->se_qstr;
- struct ll_inode_info *lli = ll_i2info(dir);
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
struct md_op_data *op_data;
@@ -808,7 +787,6 @@ static int sa_args_init(struct inode *dir, struct inode *child,
minfo->mi_it.it_op = IT_GETATTR;
minfo->mi_dir = igrab(dir);
minfo->mi_cb = ll_statahead_interpret;
- minfo->mi_generation = lli->lli_sai->sai_generation;
minfo->mi_cbdata = entry->se_index;

einfo->ei_type = LDLM_IBITS;
@@ -889,8 +867,8 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry,
return rc;
}

-static void ll_statahead_one(struct dentry *parent, const char *entry_name,
- int entry_name_len)
+static void ll_statahead_one(struct dentry *parent, const char *name,
+ const int name_len)
{
struct inode *dir = d_inode(parent);
struct ll_inode_info *lli = ll_i2info(dir);
@@ -898,10 +876,9 @@ static void ll_statahead_one(struct dentry *parent, const char *entry_name,
struct dentry *dentry = NULL;
struct ll_sa_entry *entry;
int rc;
- int rc1;

- entry = ll_sa_entry_alloc(parent, sai, sai->sai_index, entry_name,
- entry_name_len);
+ entry = ll_sa_entry_alloc(parent, sai, sai->sai_index, name,
+ name_len);
if (IS_ERR(entry))
return;

@@ -912,15 +889,15 @@ static void ll_statahead_one(struct dentry *parent, const char *entry_name,
rc = do_sa_revalidate(dir, entry, dentry);
if (rc == 1 && agl_should_run(sai, d_inode(dentry)))
ll_agl_add(sai, d_inode(dentry), entry->se_index);
+ }

+ if (dentry)
dput(dentry);
- }

if (rc) {
- rc1 = ll_sa_entry_to_stated(sai, entry,
- rc < 0 ? SA_ENTRY_INVA :
- SA_ENTRY_SUCC);
- if (rc1 == 0 && entry->se_index == sai->sai_index_wait)
+ sa_entry_post_stat(sai, entry,
+ rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
+ if (entry->se_index == sai->sai_index_wait)
wake_up(&sai->sai_waitq);
} else {
sai->sai_sent++;
@@ -938,10 +915,12 @@ static int ll_agl_thread(void *arg)
struct ll_inode_info *plli = ll_i2info(dir);
struct ll_inode_info *clli;
struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
- struct ptlrpc_thread *thread = &sai->sai_agl_thread;
+ struct ll_statahead_info *sai;
+ struct ptlrpc_thread *thread;
struct l_wait_info lwi = { 0 };

+ sai = ll_sai_get(dir);
+ thread = &sai->sai_agl_thread;
thread->t_pid = current_pid();
CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
sai, parent);
@@ -1030,12 +1009,11 @@ static int ll_statahead_thread(void *arg)
{
struct dentry *parent = arg;
struct inode *dir = d_inode(parent);
- struct ll_inode_info *plli = ll_i2info(dir);
- struct ll_inode_info *clli;
+ struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
- struct ptlrpc_thread *thread = &sai->sai_thread;
- struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread;
+ struct ll_statahead_info *sai;
+ struct ptlrpc_thread *thread;
+ struct ptlrpc_thread *agl_thread;
struct page *page = NULL;
__u64 pos = 0;
int first = 0;
@@ -1044,6 +1022,9 @@ static int ll_statahead_thread(void *arg)
struct ll_dir_chain chain;
struct l_wait_info lwi = { 0 };

+ sai = ll_sai_get(dir);
+ thread = &sai->sai_thread;
+ agl_thread = &sai->sai_agl_thread;
thread->t_pid = current_pid();
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
sai, parent);
@@ -1052,7 +1033,7 @@ static int ll_statahead_thread(void *arg)
LUSTRE_OPC_ANY, dir);
if (IS_ERR(op_data)) {
rc = PTR_ERR(op_data);
- goto out_put;
+ goto out;
}

op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
@@ -1061,33 +1042,35 @@ static int ll_statahead_thread(void *arg)
ll_start_agl(parent, sai);

atomic_inc(&sbi->ll_sa_total);
- spin_lock(&plli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
if (thread_is_init(thread))
/* If someone else has changed the thread state
* (e.g. already changed to SVC_STOPPING), we can't just
* blindly overwrite that setting.
*/
thread_set_flags(thread, SVC_RUNNING);
- spin_unlock(&plli->lli_sa_lock);
+ spin_unlock(&lli->lli_sa_lock);
wake_up(&thread->t_ctl_waitq);

ll_dir_chain_init(&chain);
- page = ll_get_dir_page(dir, op_data, pos, &chain);
-
- while (1) {
+ while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) {
struct lu_dirpage *dp;
struct lu_dirent *ent;

+ sai->sai_in_readpage = 1;
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ sai->sai_in_readpage = 0;
if (IS_ERR(page)) {
rc = PTR_ERR(page);
CDEBUG(D_READA, "error reading dir "DFID" at %llu/%llu: opendir_pid = %u: rc = %d\n",
PFID(ll_inode2fid(dir)), pos, sai->sai_index,
- plli->lli_opendir_pid, rc);
- goto out;
+ lli->lli_opendir_pid, rc);
+ break;
}

dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent;
+ for (ent = lu_dirent_start(dp);
+ ent && thread_is_running(thread) && !sa_low_hit(sai);
ent = lu_dirent_next(ent)) {
__u64 hash;
int namelen;
@@ -1134,120 +1117,63 @@ static int ll_statahead_thread(void *arg)
if (unlikely(++first == 1))
continue;

-keep_it:
- l_wait_event(thread->t_ctl_waitq,
- !sa_sent_full(sai) ||
- !list_empty(&sai->sai_entries_received) ||
- !list_empty(&sai->sai_entries_agl) ||
- !thread_is_running(thread),
- &lwi);
-
-interpret_it:
- while (!list_empty(&sai->sai_entries_received))
+ /* wait for spare statahead window */
+ do {
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_sent_full(sai) ||
+ !list_empty(&sai->sai_entries_received) ||
+ !list_empty(&sai->sai_entries_agl) ||
+ !thread_is_running(thread),
+ &lwi);
ll_post_statahead(sai);
+ } while (sa_sent_full(sai) &&
+ thread_is_running(thread));

- if (unlikely(!thread_is_running(thread))) {
- ll_release_page(dir, page, false);
- rc = 0;
- goto out;
- }
-
- /* If no window for metadata statahead, but there are
- * some AGL entries to be triggered, then try to help
- * to process the AGL entries.
- */
- if (sa_sent_full(sai)) {
- spin_lock(&plli->lli_agl_lock);
- while (!list_empty(&sai->sai_entries_agl)) {
- clli = list_entry(sai->sai_entries_agl.next,
- struct ll_inode_info, lli_agl_list);
- list_del_init(&clli->lli_agl_list);
- spin_unlock(&plli->lli_agl_lock);
- ll_agl_trigger(&clli->lli_vfs_inode,
- sai);
-
- if (!list_empty(&sai->sai_entries_received))
- goto interpret_it;
-
- if (unlikely(!thread_is_running(thread))) {
- ll_release_page(dir, page, false);
- rc = 0;
- goto out;
- }
-
- if (!sa_sent_full(sai))
- goto do_it;
-
- spin_lock(&plli->lli_agl_lock);
- }
- spin_unlock(&plli->lli_agl_lock);
-
- goto keep_it;
- }
-do_it:
ll_statahead_one(parent, name, namelen);
}

pos = le64_to_cpu(dp->ldp_hash_end);
- if (pos == MDS_DIR_END_OFF) {
- /*
- * End of directory reached.
- */
- ll_release_page(dir, page, false);
- while (1) {
- l_wait_event(thread->t_ctl_waitq,
- !list_empty(&sai->sai_entries_received) ||
- sai->sai_sent == sai->sai_replied ||
- !thread_is_running(thread),
- &lwi);
+ ll_release_page(dir, page,
+ le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);

- while (!list_empty(&sai->sai_entries_received))
- ll_post_statahead(sai);
-
- if (unlikely(!thread_is_running(thread))) {
- rc = 0;
- goto out;
- }
+ if (sa_low_hit(sai)) {
+ rc = -EFAULT;
+ atomic_inc(&sbi->ll_sa_wrong);
+ CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
+ PFID(&lli->lli_fid), sai->sai_hit,
+ sai->sai_miss, sai->sai_sent,
+ sai->sai_replied, current_pid());
+ break;
+ }
+ }
+ ll_dir_chain_fini(&chain);
+ ll_finish_md_op_data(op_data);

- if (sai->sai_sent == sai->sai_replied &&
- list_empty(&sai->sai_entries_received))
- break;
- }
+ if (rc < 0) {
+ spin_lock(&lli->lli_sa_lock);
+ thread_set_flags(thread, SVC_STOPPING);
+ lli->lli_sa_enabled = 0;
+ spin_unlock(&lli->lli_sa_lock);
+ }

- spin_lock(&plli->lli_agl_lock);
- while (!list_empty(&sai->sai_entries_agl) &&
- thread_is_running(thread)) {
- clli = list_entry(sai->sai_entries_agl.next,
- struct ll_inode_info, lli_agl_list);
- list_del_init(&clli->lli_agl_list);
- spin_unlock(&plli->lli_agl_lock);
- ll_agl_trigger(&clli->lli_vfs_inode, sai);
- spin_lock(&plli->lli_agl_lock);
- }
- spin_unlock(&plli->lli_agl_lock);
+ /*
+ * statahead is finished, but statahead entries need to be cached, wait
+ * for file release to stop me.
+ */
+ while (thread_is_running(thread)) {
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_received_empty(sai) ||
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread),
+ &lwi);

- rc = 0;
- goto out;
- } else {
- /*
- * chain is exhausted.
- * Normal case: continue to the next page.
- */
- ll_release_page(dir, page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- sai->sai_in_readpage = 1;
- page = ll_get_dir_page(dir, op_data, pos, &chain);
- sai->sai_in_readpage = 0;
- }
+ ll_post_statahead(sai);
}
out:
- ll_dir_chain_fini(&chain);
- ll_finish_md_op_data(op_data);
-out_put:
if (sai->sai_agl_valid) {
- spin_lock(&plli->lli_agl_lock);
+ spin_lock(&lli->lli_agl_lock);
thread_set_flags(agl_thread, SVC_STOPPING);
- spin_unlock(&plli->lli_agl_lock);
+ spin_unlock(&lli->lli_agl_lock);
wake_up(&agl_thread->t_ctl_waitq);

CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
@@ -1257,21 +1183,27 @@ out_put:
&lwi);
} else {
/* Set agl_thread flags anyway. */
- thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
+ thread_set_flags(agl_thread, SVC_STOPPED);
}
- spin_lock(&plli->lli_sa_lock);
- if (!list_empty(&sai->sai_entries_received)) {
- thread_set_flags(thread, SVC_STOPPING);
- spin_unlock(&plli->lli_sa_lock);
-
- /* To release the resources held by received entries. */
- while (!list_empty(&sai->sai_entries_received))
- ll_post_statahead(sai);

- spin_lock(&plli->lli_sa_lock);
+ /*
+ * wait for inflight statahead RPCs to finish, and then we can free sai
+ * safely because statahead RPC will access sai data
+ */
+ while (sai->sai_sent != sai->sai_replied) {
+ /* in case we're not woken up, timeout wait */
+ lwi = LWI_TIMEOUT(HZ >> 3, NULL, NULL);
+ l_wait_event(thread->t_ctl_waitq,
+ sai->sai_sent == sai->sai_replied, &lwi);
}
+
+ /* release resources held by received entries. */
+ ll_post_statahead(sai);
+
+ spin_lock(&lli->lli_sa_lock);
thread_set_flags(thread, SVC_STOPPED);
- spin_unlock(&plli->lli_sa_lock);
+ spin_unlock(&lli->lli_sa_lock);
+
wake_up(&sai->sai_waitq);
wake_up(&thread->t_ctl_waitq);
ll_sai_put(sai);
@@ -1281,52 +1213,54 @@ out_put:
return rc;
}

-/**
- * called in ll_file_release().
- */
-void ll_stop_statahead(struct inode *dir, void *key)
+/* authorize opened dir handle @key to statahead later */
+void ll_authorize_statahead(struct inode *dir, void *key)
{
struct ll_inode_info *lli = ll_i2info(dir);

- if (unlikely(!key))
- return;
-
spin_lock(&lli->lli_sa_lock);
- if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
- spin_unlock(&lli->lli_sa_lock);
- return;
+ if (!lli->lli_opendir_key && !lli->lli_sai) {
+ /*
+ * if lli_sai is not NULL, it means previous statahead is not
+ * finished yet, we'd better not start a new statahead for now.
+ */
+ LASSERT(!lli->lli_opendir_pid);
+ lli->lli_opendir_key = key;
+ lli->lli_opendir_pid = current_pid();
+ lli->lli_sa_enabled = 1;
}
+ spin_unlock(&lli->lli_sa_lock);
+}

- lli->lli_opendir_key = NULL;
+/*
+ * deauthorize opened dir handle @key to statahead, but statahead thread may
+ * still be running, notify it to quit.
+ */
+void ll_deauthorize_statahead(struct inode *dir, void *key)
+{
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai;

- if (lli->lli_sai) {
- struct l_wait_info lwi = { 0 };
- struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
+ LASSERT(lli->lli_opendir_key == key);
+ LASSERT(lli->lli_opendir_pid);

- if (!thread_is_stopped(thread)) {
- thread_set_flags(thread, SVC_STOPPING);
- spin_unlock(&lli->lli_sa_lock);
- wake_up(&thread->t_ctl_waitq);
-
- CDEBUG(D_READA, "stop statahead thread: sai %p pid %u\n",
- lli->lli_sai, (unsigned int)thread->t_pid);
- l_wait_event(thread->t_ctl_waitq,
- thread_is_stopped(thread),
- &lwi);
- } else {
- spin_unlock(&lli->lli_sa_lock);
- }
+ CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
+ PFID(&lli->lli_fid));

+ spin_lock(&lli->lli_sa_lock);
+ lli->lli_opendir_key = NULL;
+ lli->lli_opendir_pid = 0;
+ lli->lli_sa_enabled = 0;
+ sai = lli->lli_sai;
+ if (sai && thread_is_running(&sai->sai_thread)) {
/*
- * Put the ref which was held when first statahead_enter.
- * It maybe not the last ref for some statahead requests
- * maybe inflight.
+ * statahead thread may not quit yet because it needs to cache
+ * stated entries, now it's time to tell it to quit.
*/
- ll_sai_put(lli->lli_sai);
- } else {
- lli->lli_opendir_pid = 0;
- spin_unlock(&lli->lli_sa_lock);
+ thread_set_flags(&sai->sai_thread, SVC_STOPPING);
+ wake_up(&sai->sai_thread.t_ctl_waitq);
}
+ spin_unlock(&lli->lli_sa_lock);
}

enum {
@@ -1465,175 +1399,137 @@ out:
static void
ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- struct ptlrpc_thread *thread = &sai->sai_thread;
- struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
- int hit;
+ if (entry && entry->se_stat == SA_ENTRY_SUCC) { /* counted as a hit */
+ struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);

- if (entry && entry->se_stat == SA_ENTRY_SUCC)
- hit = 1;
- else
- hit = 0;
-
- ll_sa_entry_fini(sai, entry);
- if (hit) {
sai->sai_hit++;
sai->sai_consecutive_miss = 0;
sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
} else {
- struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
-
sai->sai_miss++;
sai->sai_consecutive_miss++;
- if (sa_low_hit(sai) && thread_is_running(thread)) {
- atomic_inc(&sbi->ll_sa_wrong);
- CDEBUG(D_READA, "Statahead for dir " DFID " hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread\n",
- PFID(&lli->lli_fid), sai->sai_hit,
- sai->sai_miss, sai->sai_sent,
- sai->sai_replied);
- spin_lock(&lli->lli_sa_lock);
- if (!thread_is_stopped(thread))
- thread_set_flags(thread, SVC_STOPPING);
- spin_unlock(&lli->lli_sa_lock);
- }
}
-
- if (!thread_is_stopped(thread))
- wake_up(&thread->t_ctl_waitq);
+ ll_sa_entry_fini(sai, entry); /* entry may be NULL on a miss */
+ wake_up(&sai->sai_thread.t_ctl_waitq); /* now done unconditionally */
}

-/**
- * Start statahead thread if this is the first dir entry.
- * Otherwise if a thread is started already, wait it until it is ahead of me.
- * \retval 1 -- find entry with lock in cache, the caller needs to do
- * nothing.
- * \retval 0 -- find entry in cache, but without lock, the caller needs
- * refresh from MDS.
- * \retval others -- the caller need to process as non-statahead.
- */
-int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
- int only_unplug)
+static int revalidate_statahead_dentry(struct inode *dir,
+ struct ll_statahead_info *sai,
+ struct dentry **dentryp,
+ int only_unplug)
{
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
- struct dentry *parent;
- struct ll_sa_entry *entry;
- struct ptlrpc_thread *thread;
- struct l_wait_info lwi = { 0 };
- struct task_struct *task;
- int rc = 0;
- struct ll_inode_info *plli;
-
- LASSERT(lli->lli_opendir_pid == current_pid());
-
- if (sai) {
- thread = &sai->sai_thread;
- if (unlikely(thread_is_stopped(thread) &&
- list_empty(&sai->sai_entries_stated))) {
- /* to release resource */
- ll_stop_statahead(dir, lli->lli_opendir_key);
- return -EAGAIN;
- }
+ struct ll_sa_entry *entry = NULL;
+ struct l_wait_info lwi = { 0 };
+ int rc = 0;

- if ((*dentryp)->d_name.name[0] == '.') {
- if (sai->sai_ls_all ||
- sai->sai_miss_hidden >= sai->sai_skip_hidden) {
+ if ((*dentryp)->d_name.name[0] == '.') {
+ if (sai->sai_ls_all ||
+ sai->sai_miss_hidden >= sai->sai_skip_hidden) {
+ /*
+ * Hidden dentry is the first one, or statahead
+ * thread does not skip so many hidden dentries
+ * before "sai_ls_all" enabled as below.
+ */
+ } else {
+ if (!sai->sai_ls_all)
/*
- * Hidden dentry is the first one, or statahead
- * thread does not skip so many hidden dentries
- * before "sai_ls_all" enabled as below.
+ * It may be because the hidden dentry is
+ * not the first one, so "sai_ls_all" was
+ * not set and "ls -al" missed. Enable
+ * "sai_ls_all" for such a case.
*/
- } else {
- if (!sai->sai_ls_all)
- /*
- * It maybe because hidden dentry is not
- * the first one, "sai_ls_all" was not
- * set, then "ls -al" missed. Enable
- * "sai_ls_all" for such case.
- */
- sai->sai_ls_all = 1;
+ sai->sai_ls_all = 1;

- /*
- * Such "getattr" has been skipped before
- * "sai_ls_all" enabled as above.
- */
- sai->sai_miss_hidden++;
- return -EAGAIN;
- }
+ /*
+ * Such "getattr" has been skipped before
+ * "sai_ls_all" enabled as above.
+ */
+ sai->sai_miss_hidden++;
+ return -EAGAIN;
}
+ }

- entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name);
- if (!entry || only_unplug) {
+ entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name);
+ if (!entry || only_unplug) {
+ ll_sai_unplug(sai, entry);
+ return entry ? 1 : -EAGAIN;
+ }
+
+ /* if statahead is busy in readdir, help it do post-work */
+ if (!ll_sa_entry_stated(entry) && sai->sai_in_readpage)
+ ll_post_statahead(sai);
+
+ if (!ll_sa_entry_stated(entry)) {
+ sai->sai_index_wait = entry->se_index;
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
+ LWI_ON_SIGNAL_NOOP, NULL);
+ rc = l_wait_event(sai->sai_waitq,
+ ll_sa_entry_stated(entry) ||
+ thread_is_stopped(&sai->sai_thread),
+ &lwi);
+ if (rc < 0) {
ll_sai_unplug(sai, entry);
- return entry ? 1 : -EAGAIN;
+ return -EAGAIN;
}
+ }

- /* if statahead is busy in readdir, help it do post-work */
- while (!ll_sa_entry_stated(entry) && sai->sai_in_readpage &&
- !sa_received_empty(sai))
- ll_post_statahead(sai);
-
- if (!ll_sa_entry_stated(entry)) {
- sai->sai_index_wait = entry->se_index;
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
- LWI_ON_SIGNAL_NOOP, NULL);
- rc = l_wait_event(sai->sai_waitq,
- ll_sa_entry_stated(entry) ||
- thread_is_stopped(thread),
- &lwi);
- if (rc < 0) {
- ll_sai_unplug(sai, entry);
- return -EAGAIN;
- }
- }
+ if (entry->se_stat == SA_ENTRY_SUCC && entry->se_inode) {
+ struct inode *inode = entry->se_inode;
+ struct lookup_intent it = { .it_op = IT_GETATTR,
+ .it_lock_handle = entry->se_handle };
+ __u64 bits;
+
+ rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
+ ll_inode2fid(inode), &bits);
+ if (rc == 1) {
+ if (!(*dentryp)->d_inode) {
+ struct dentry *alias;

- if (entry->se_stat == SA_ENTRY_SUCC && entry->se_inode) {
- struct inode *inode = entry->se_inode;
- struct lookup_intent it = { .it_op = IT_GETATTR,
- .it_lock_handle =
- entry->se_handle };
- __u64 bits;
-
- rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
- ll_inode2fid(inode), &bits);
- if (rc == 1) {
- if (!d_inode(*dentryp)) {
- struct dentry *alias;
-
- alias = ll_splice_alias(inode,
- *dentryp);
- if (IS_ERR(alias)) {
- ll_sai_unplug(sai, entry);
- return PTR_ERR(alias);
- }
- *dentryp = alias;
- } else if (d_inode(*dentryp) != inode) {
- /* revalidate, but inode is recreated */
- CDEBUG(D_READA, "%s: stale dentry %pd inode "DFID", statahead inode "DFID"\n",
- ll_get_fsname(d_inode(*dentryp)->i_sb, NULL, 0),
- *dentryp,
- PFID(ll_inode2fid(d_inode(*dentryp))),
- PFID(ll_inode2fid(inode)));
- ll_intent_release(&it);
+ alias = ll_splice_alias(inode, *dentryp);
+ if (IS_ERR(alias)) {
ll_sai_unplug(sai, entry);
- return -ESTALE;
- } else {
- iput(inode);
+ return PTR_ERR(alias);
}
- entry->se_inode = NULL;
-
- if ((bits & MDS_INODELOCK_LOOKUP) &&
- d_lustre_invalid(*dentryp))
- d_lustre_revalidate(*dentryp);
- ll_intent_release(&it);
+ *dentryp = alias;
+ } else if ((*dentryp)->d_inode != inode) {
+ /* revalidate, but inode is recreated */
+ CDEBUG(D_READA, "%s: stale dentry %pd inode "DFID", statahead inode "DFID"\n",
+ ll_get_fsname((*dentryp)->d_inode->i_sb, NULL, 0),
+ *dentryp,
+ PFID(ll_inode2fid((*dentryp)->d_inode)),
+ PFID(ll_inode2fid(inode)));
+ /* md_revalidate_lock() returned 1: release the intent */
+ ll_intent_release(&it);
+ rc = -ESTALE;
+ goto out_unplug;
+ } else {
+ iput(inode);
}
- }
+ entry->se_inode = NULL;

- ll_sai_unplug(sai, entry);
- return rc;
+ if ((bits & MDS_INODELOCK_LOOKUP) &&
+ d_lustre_invalid(*dentryp))
+ d_lustre_revalidate(*dentryp);
+ ll_intent_release(&it);
+ }
}
+out_unplug:
+ ll_sai_unplug(sai, entry);
+ return rc;
+}
+
+static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
+{
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = NULL;
+ struct l_wait_info lwi = { 0 };
+ struct ptlrpc_thread *thread;
+ struct task_struct *task;
+ struct dentry *parent;
+ int rc;

/* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
- rc = is_first_dirent(dir, *dentryp);
+ rc = is_first_dirent(dir, dentry);
if (rc == LS_NONE_FIRST_DE) {
/* It is not "ls -{a}l" operation, no need statahead for it. */
rc = -EAGAIN;
@@ -1656,13 +1552,12 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
}

/* get parent reference count here, and put it in ll_statahead_thread */
- parent = dget((*dentryp)->d_parent);
+ parent = dget(dentry->d_parent);
if (unlikely(sai->sai_inode != d_inode(parent))) {
struct ll_inode_info *nlli = ll_i2info(d_inode(parent));

CWARN("Race condition, someone changed %pd just now: old parent "DFID", new parent "DFID"\n",
- *dentryp,
- PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
+ dentry, PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
dput(parent);
iput(sai->sai_inode);
rc = -EAGAIN;
@@ -1672,30 +1567,18 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
CDEBUG(D_READA, "start statahead thread: sai %p, parent %pd\n",
sai, parent);

- /* The sai buffer already has one reference taken at allocation time,
- * but as soon as we expose the sai by attaching it to the lli that
- * default reference can be dropped by another thread calling
- * ll_stop_statahead. We need to take a local reference to protect
- * the sai buffer while we intend to access it.
- */
- ll_sai_get(sai);
lli->lli_sai = sai;

- plli = ll_i2info(d_inode(parent));
task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
- plli->lli_opendir_pid);
+ lli->lli_opendir_pid);
thread = &sai->sai_thread;
if (IS_ERR(task)) {
rc = PTR_ERR(task);
- CERROR("can't start ll_sa thread, rc: %d\n", rc);
+ CERROR("cannot start ll_sa thread: rc = %d\n", rc);
dput(parent);
lli->lli_opendir_key = NULL;
thread_set_flags(thread, SVC_STOPPED);
thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
- /* Drop both our own local reference and the default
- * reference from allocation time.
- */
- ll_sai_put(sai);
ll_sai_put(sai);
LASSERT(!lli->lli_sai);
return -EAGAIN;
@@ -1704,6 +1587,7 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
l_wait_event(thread->t_ctl_waitq,
thread_is_running(thread) || thread_is_stopped(thread),
&lwi);
+ atomic_inc(&ll_i2sbi(d_inode(parent))->ll_sa_running);
ll_sai_put(sai);

/*
@@ -1717,6 +1601,37 @@ out:
spin_lock(&lli->lli_sa_lock);
lli->lli_opendir_key = NULL;
lli->lli_opendir_pid = 0;
+ lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
+
return rc;
}
+
+/**
+ * Stat-side entry point of statahead. If a statahead instance can be obtained
+ * for @dir, revalidate @dentryp from its cache; otherwise try to start one.
+ * \retval 1 -- entry found in cache with a lock, the caller has nothing
+ * more to do.
+ * \retval 0 -- entry found in cache but without a lock, the caller must
+ * refresh it from the MDS.
+ * \retval others -- the caller must process the stat as non-statahead.
+ */
+int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
+ int only_unplug)
+{
+ struct ll_statahead_info *sai = ll_sai_get(dir);
+ int rc;
+
+ /* no sai could be obtained for this dir: try to start statahead */
+ if (!sai)
+ return start_statahead_thread(dir, *dentryp);
+
+ /* a sai exists: answer this stat from the statahead cache */
+ rc = revalidate_statahead_dentry(dir, sai, dentryp, only_unplug);
+ CDEBUG(D_READA, "revalidate statahead %pd: %d.\n", *dentryp, rc);
+
+ /* done with the sai obtained from ll_sai_get() above */
+ ll_sai_put(sai);
+
+ return rc;
+}
--
1.7.1