ceph: fix iterate_caps removal race

We need to be able to iterate over all caps on a session with a
possibly slow callback on each cap.  To allow this, we used to
prevent cap reordering while we were iterating.  However, we were
not safe from races with removal: removing the 'next' cap would
make the next pointer from list_for_each_entry_safe be invalid,
and cause a lock up or similar badness.

Instead, we keep an iterator pointer in the session pointing to
the current cap.  As before, we avoid reordering.  For removal,
if the cap isn't the current cap we are iterating over, we are
fine.  If it is, we clear cap->ci (to mark the cap as pending
removal) but leave it in the session list.  In iterate_caps, we
can safely finish removal and get the next cap pointer.

While we're at it, clean up put_cap to not take a cap reservation
context, as it was never used.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-02-16 11:39:45 -08:00
parent 85ccce43a3
commit 7c1332b8cb
4 changed files with 70 additions and 36 deletions

View File

@ -266,12 +266,11 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
return cap;
}
static void put_cap(struct ceph_cap *cap,
struct ceph_cap_reservation *ctx)
void ceph_put_cap(struct ceph_cap *cap)
{
spin_lock(&caps_list_lock);
dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
cap, caps_total_count, caps_use_count,
caps_reserve_count, caps_avail_count);
caps_use_count--;
/*
@ -282,12 +281,7 @@ static void put_cap(struct ceph_cap *cap,
caps_total_count--;
kmem_cache_free(ceph_cap_cachep, cap);
} else {
if (ctx) {
ctx->count++;
caps_reserve_count++;
} else {
caps_avail_count++;
}
caps_avail_count++;
list_add(&cap->caps_item, &caps_list);
}
@ -709,7 +703,7 @@ static void __touch_cap(struct ceph_cap *cap)
struct ceph_mds_session *s = cap->session;
spin_lock(&s->s_cap_lock);
if (!s->s_iterating_caps) {
if (s->s_cap_iterator == NULL) {
dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
s->s_mds);
list_move_tail(&cap->session_caps, &s->s_caps);
@ -865,8 +859,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
* caller should hold i_lock, and session s_mutex.
* returns true if this is the last cap. if so, caller should iput.
*/
void __ceph_remove_cap(struct ceph_cap *cap,
struct ceph_cap_reservation *ctx)
void __ceph_remove_cap(struct ceph_cap *cap)
{
struct ceph_mds_session *session = cap->session;
struct ceph_inode_info *ci = cap->ci;
@ -874,19 +867,27 @@ void __ceph_remove_cap(struct ceph_cap *cap,
dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
/* remove from session list */
spin_lock(&session->s_cap_lock);
list_del_init(&cap->session_caps);
session->s_nr_caps--;
spin_unlock(&session->s_cap_lock);
/* remove from inode list */
rb_erase(&cap->ci_node, &ci->i_caps);
cap->session = NULL;
cap->ci = NULL;
if (ci->i_auth_cap == cap)
ci->i_auth_cap = NULL;
put_cap(cap, ctx);
/* remove from session list */
spin_lock(&session->s_cap_lock);
if (session->s_cap_iterator == cap) {
/* not yet, we are iterating over this very cap */
dout("__ceph_remove_cap delaying %p removal from session %p\n",
cap, cap->session);
} else {
list_del_init(&cap->session_caps);
session->s_nr_caps--;
cap->session = NULL;
}
spin_unlock(&session->s_cap_lock);
if (cap->session == NULL)
ceph_put_cap(cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
@ -1022,7 +1023,7 @@ void ceph_queue_caps_release(struct inode *inode)
}
spin_unlock(&session->s_cap_lock);
p = rb_next(p);
__ceph_remove_cap(cap, NULL);
__ceph_remove_cap(cap);
}
spin_unlock(&inode->i_lock);
@ -2521,7 +2522,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
ci->i_cap_exporting_mseq = mseq;
ci->i_cap_exporting_issued = cap->issued;
}
__ceph_remove_cap(cap, NULL);
__ceph_remove_cap(cap);
} else {
WARN_ON(!cap);
}

View File

@ -344,7 +344,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
s->s_iterating_caps = false;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
INIT_LIST_HEAD(&s->s_cap_flushing);
@ -729,28 +729,61 @@ static int iterate_session_caps(struct ceph_mds_session *session,
int (*cb)(struct inode *, struct ceph_cap *,
void *), void *arg)
{
struct ceph_cap *cap, *ncap;
struct inode *inode;
struct list_head *p;
struct ceph_cap *cap;
struct inode *inode, *last_inode = NULL;
struct ceph_cap *old_cap = NULL;
int ret;
dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
spin_lock(&session->s_cap_lock);
session->s_iterating_caps = true;
list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
p = session->s_caps.next;
while (p != &session->s_caps) {
cap = list_entry(p, struct ceph_cap, session_caps);
inode = igrab(&cap->ci->vfs_inode);
if (!inode)
if (!inode) {
p = p->next;
continue;
}
session->s_cap_iterator = cap;
spin_unlock(&session->s_cap_lock);
if (last_inode) {
iput(last_inode);
last_inode = NULL;
}
if (old_cap) {
ceph_put_cap(old_cap);
old_cap = NULL;
}
ret = cb(inode, cap, arg);
iput(inode);
last_inode = inode;
spin_lock(&session->s_cap_lock);
p = p->next;
if (cap->ci == NULL) {
dout("iterate_session_caps finishing cap %p removal\n",
cap);
BUG_ON(cap->session != session);
list_del_init(&cap->session_caps);
session->s_nr_caps--;
cap->session = NULL;
old_cap = cap; /* put_cap it w/o locks held */
}
if (ret < 0)
goto out;
}
ret = 0;
out:
session->s_iterating_caps = false;
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
if (last_inode)
iput(last_inode);
if (old_cap)
ceph_put_cap(old_cap);
return ret;
}
@ -942,7 +975,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
session->s_trim_caps--;
if (oissued) {
/* we aren't the only cap.. just remove us */
__ceph_remove_cap(cap, NULL);
__ceph_remove_cap(cap);
} else {
/* try to drop referring dentries */
spin_unlock(&inode->i_lock);

View File

@ -114,7 +114,7 @@ struct ceph_mds_session {
int s_num_cap_releases;
struct list_head s_cap_releases; /* waiting cap_release messages */
struct list_head s_cap_releases_done; /* ready to send */
bool s_iterating_caps;
struct ceph_cap *s_cap_iterator;
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */

View File

@ -795,15 +795,15 @@ extern int ceph_add_cap(struct inode *inode,
int fmode, unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation);
extern void __ceph_remove_cap(struct ceph_cap *cap,
struct ceph_cap_reservation *ctx);
extern void __ceph_remove_cap(struct ceph_cap *cap);
static inline void ceph_remove_cap(struct ceph_cap *cap)
{
struct inode *inode = &cap->ci->vfs_inode;
spin_lock(&inode->i_lock);
__ceph_remove_cap(cap, NULL);
__ceph_remove_cap(cap);
spin_unlock(&inode->i_lock);
}
extern void ceph_put_cap(struct ceph_cap *cap);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, int unused);