Merge tag 'ceph-for-5.15-rc1' of git://github.com/ceph/ceph-client
author    Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 8 Sep 2021 22:50:32 +0000 (15:50 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 8 Sep 2021 22:50:32 +0000 (15:50 -0700)
Pull ceph updates from Ilya Dryomov:

 - a set of patches to address fsync stalls caused by depending on
   periodic rather than triggered MDS journal flushes in some cases
   (Xiubo Li)

 - a fix for mtime effectively not getting updated in case of competing
   writers (Jeff Layton)

 - a couple of fixes for inode reference leaks and various WARNs after
   "umount -f" (Xiubo Li)

 - a new ceph.auth_mds extended attribute (Jeff Layton)

 - a smattering of fixups and cleanups from Jeff, Xiubo and Colin.
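
The fsync-stall fix in the first item is the user-visible change of this
pull. A rough userspace sketch of the affected path (the mount point and
file name are hypothetical, not part of the series):

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fd = open("/mnt/cephfs/testfile", O_CREAT | O_WRONLY | O_TRUNC, 0644);

	if (fd < 0) {
		perror("open");
		return 1;
	}
	if (write(fd, "data", 4) != 4) {
		perror("write");
		return 1;
	}
	/*
	 * Ends up in unsafe_request_wait(); with this series the client
	 * asks each relevant MDS to flush its journal (mdlog) right away
	 * instead of riding out the periodic flush (up to ~5 seconds).
	 */
	if (fsync(fd) < 0) {
		perror("fsync");
		return 1;
	}
	return close(fd);
}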

* tag 'ceph-for-5.15-rc1' of git://github.com/ceph/ceph-client:
  ceph: fix dereference of null pointer cf
  ceph: drop the mdsc_get_session/put_session dout messages
  ceph: lockdep annotations for try_nonblocking_invalidate
  ceph: don't WARN if we're forcibly removing the session caps
  ceph: don't WARN if we're force umounting
  ceph: remove the capsnaps when removing caps
  ceph: request Fw caps before updating the mtime in ceph_write_iter
  ceph: reconnect to the export targets on new mdsmaps
  ceph: print more information when we can't find snaprealm
  ceph: add ceph_change_snap_realm() helper
  ceph: remove redundant initializations from mdsc and session
  ceph: cancel delayed work instead of flushing on mdsc teardown
  ceph: add a new vxattr to return auth mds for an inode
  ceph: remove some defunct forward declarations
  ceph: flush the mdlog before waiting on unsafe reqs
  ceph: flush mdlog before umounting
  ceph: make iterate_sessions a global symbol
  ceph: make ceph_create_session_msg a global symbol
  ceph: fix comment about short copies in ceph_write_end
  ceph: fix memory leak on decode error in ceph_handle_caps

14 files changed:
fs/ceph/addr.c
fs/ceph/cache.h
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/mdsmap.c
fs/ceph/metric.c
fs/ceph/snap.c
fs/ceph/strings.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/ceph_fs.h

index 7e7a897ae0d3f5ac58fc9042e6b44925b5dbbceb..99b80b5c7a931c1d6418e40c2f5a17713f638e22 100644 (file)
@@ -1281,8 +1281,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
        dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
             inode, page, (int)pos, (int)copied, (int)len);
 
-       /* zero the stale part of the page if we did a short copy */
        if (!PageUptodate(page)) {
+               /* just return that nothing was copied on a short copy */
                if (copied < len) {
                        copied = 0;
                        goto out;
index 1409d6149281723ce12512bcbfdbf234f8fec246..058ea2a043762e5d40444933dde39c12db0010cb 100644 (file)
@@ -26,12 +26,6 @@ void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci);
 void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp);
 void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci);
 
-int ceph_readpage_from_fscache(struct inode *inode, struct page *page);
-int ceph_readpages_from_fscache(struct inode *inode,
-                               struct address_space *mapping,
-                               struct list_head *pages,
-                               unsigned *nr_pages);
-
 static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
 {
        ci->fscache = NULL;
index 39db97f149b9ba7e066074ebe1b593e16e2eec97..6c0e52fd0743efda7852e89c52c35201f38bfed6 100644 (file)
@@ -703,29 +703,12 @@ void ceph_add_cap(struct inode *inode,
                 */
                struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
                                                               realmino);
-               if (realm) {
-                       struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
-                       if (oldrealm) {
-                               spin_lock(&oldrealm->inodes_with_caps_lock);
-                               list_del_init(&ci->i_snap_realm_item);
-                               spin_unlock(&oldrealm->inodes_with_caps_lock);
-                       }
-
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_add(&ci->i_snap_realm_item,
-                                &realm->inodes_with_caps);
-                       ci->i_snap_realm = realm;
-                       if (realm->ino == ci->i_vino.ino)
-                               realm->inode = inode;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-
-                       if (oldrealm)
-                               ceph_put_snap_realm(mdsc, oldrealm);
-               } else {
-                       pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
-                              realmino);
-                       WARN_ON(!realm);
-               }
+               if (realm)
+                       ceph_change_snap_realm(inode, realm);
+               else
+                       WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
+                            __func__, realmino, ci->i_vino.ino,
+                            ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
        }
 
        __check_cap_issue(ci, cap, issued);
@@ -1112,20 +1095,6 @@ int ceph_is_any_caps(struct inode *inode)
        return ret;
 }
 
-static void drop_inode_snap_realm(struct ceph_inode_info *ci)
-{
-       struct ceph_snap_realm *realm = ci->i_snap_realm;
-       spin_lock(&realm->inodes_with_caps_lock);
-       list_del_init(&ci->i_snap_realm_item);
-       ci->i_snap_realm_counter++;
-       ci->i_snap_realm = NULL;
-       if (realm->ino == ci->i_vino.ino)
-               realm->inode = NULL;
-       spin_unlock(&realm->inodes_with_caps_lock);
-       ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
-                           realm);
-}
-
 /*
  * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
  *
@@ -1145,17 +1114,16 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
                return;
        }
 
+       lockdep_assert_held(&ci->i_ceph_lock);
+
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
        mdsc = ceph_inode_to_client(&ci->vfs_inode)->mdsc;
 
        /* remove from inode's cap rbtree, and clear auth cap */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       if (ci->i_auth_cap == cap) {
-               WARN_ON_ONCE(!list_empty(&ci->i_dirty_item) &&
-                            !mdsc->fsc->blocklisted);
+       if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
-       }
 
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
@@ -1201,12 +1169,34 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
                 * keep i_snap_realm.
                 */
                if (ci->i_wr_ref == 0 && ci->i_snap_realm)
-                       drop_inode_snap_realm(ci);
+                       ceph_change_snap_realm(&ci->vfs_inode, NULL);
 
                __cap_delay_cancel(mdsc, ci);
        }
 }
 
+void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
+{
+       struct ceph_inode_info *ci = cap->ci;
+       struct ceph_fs_client *fsc;
+
+       /* 'ci' being NULL means the removal has already occurred */
+       if (!ci) {
+               dout("%s: cap inode is NULL\n", __func__);
+               return;
+       }
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+       WARN_ON_ONCE(ci->i_auth_cap == cap &&
+                    !list_empty(&ci->i_dirty_item) &&
+                    !fsc->blocklisted &&
+                    READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
+
+       __ceph_remove_cap(cap, queue_release);
+}
+
 struct cap_msg_args {
        struct ceph_mds_session *session;
        u64                     ino, cid, follows;
@@ -1335,7 +1325,7 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
                p = rb_next(p);
-               __ceph_remove_cap(cap, true);
+               ceph_remove_cap(cap, true);
        }
        spin_unlock(&ci->i_ceph_lock);
 }
@@ -1746,6 +1736,9 @@ struct ceph_cap_flush *ceph_alloc_cap_flush(void)
        struct ceph_cap_flush *cf;
 
        cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+       if (!cf)
+               return NULL;
+
        cf->is_capsnap = false;
        return cf;
 }
@@ -1856,6 +1849,8 @@ static u64 __mark_caps_flushing(struct inode *inode,
  * try to invalidate mapping pages without blocking.
  */
 static int try_nonblocking_invalidate(struct inode *inode)
+       __releases(ci->i_ceph_lock)
+       __acquires(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u32 invalidating_gen = ci->i_rdcache_gen;
@@ -2219,6 +2214,7 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid)
  */
 static int unsafe_request_wait(struct inode *inode)
 {
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_request *req1 = NULL, *req2 = NULL;
        int ret, err = 0;
@@ -2238,6 +2234,81 @@ static int unsafe_request_wait(struct inode *inode)
        }
        spin_unlock(&ci->i_unsafe_lock);
 
+       /*
+        * Manually trigger a flush of the journal logs in all the
+        * relevant MDSes; otherwise, in the worst case, we must wait
+        * up to 5 seconds for the MDSes to flush their journal logs
+        * periodically.
+        */
+       if (req1 || req2) {
+               struct ceph_mds_session **sessions = NULL;
+               struct ceph_mds_session *s;
+               struct ceph_mds_request *req;
+               unsigned int max;
+               int i;
+
+               /*
+                * mdsc->max_sessions is unlikely to change, but if it
+                * does we retry by reallocating the sessions array
+                * memory, which lets us avoid holding the mdsc->mutex
+                * lock.
+                */
+retry:
+               max = mdsc->max_sessions;
+               sessions = krealloc(sessions, max * sizeof(s), __GFP_ZERO);
+               if (!sessions)
+                       return -ENOMEM;
+
+               spin_lock(&ci->i_unsafe_lock);
+               if (req1) {
+                       list_for_each_entry(req, &ci->i_unsafe_dirops,
+                                           r_unsafe_dir_item) {
+                               s = req->r_session;
+                               if (unlikely(s->s_mds >= max)) {
+                                       spin_unlock(&ci->i_unsafe_lock);
+                                       goto retry;
+                               }
+                               if (!sessions[s->s_mds]) {
+                                       s = ceph_get_mds_session(s);
+                                       sessions[s->s_mds] = s;
+                               }
+                       }
+               }
+               if (req2) {
+                       list_for_each_entry(req, &ci->i_unsafe_iops,
+                                           r_unsafe_target_item) {
+                               s = req->r_session;
+                               if (unlikely(s->s_mds >= max)) {
+                                       spin_unlock(&ci->i_unsafe_lock);
+                                       goto retry;
+                               }
+                               if (!sessions[s->s_mds]) {
+                                       s = ceph_get_mds_session(s);
+                                       sessions[s->s_mds] = s;
+                               }
+                       }
+               }
+               spin_unlock(&ci->i_unsafe_lock);
+
+               /* the auth MDS */
+               spin_lock(&ci->i_ceph_lock);
+               if (ci->i_auth_cap) {
+                       s = ci->i_auth_cap->session;
+                       if (!sessions[s->s_mds])
+                               sessions[s->s_mds] = ceph_get_mds_session(s);
+               }
+               spin_unlock(&ci->i_ceph_lock);
+
+               /* send flush mdlog request to MDSes */
+               for (i = 0; i < max; i++) {
+                       s = sessions[i];
+                       if (s) {
+                               send_flush_mdlog(s);
+                               ceph_put_mds_session(s);
+                       }
+               }
+               kfree(sessions);
+       }
+
        dout("unsafe_request_wait %p wait on tid %llu %llu\n",
             inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
        if (req1) {
@@ -3008,7 +3079,7 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
                        }
                        /* see comment in __ceph_remove_cap() */
                        if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
-                               drop_inode_snap_realm(ci);
+                               ceph_change_snap_realm(inode, NULL);
                }
        }
        if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
@@ -3114,7 +3185,16 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                                break;
                        }
                }
-               BUG_ON(!found);
+
+               if (!found) {
+                       /*
+                        * The capsnap should already be removed when removing
+                        * auth cap in the case of a forced unmount.
+                        */
+                       WARN_ON_ONCE(ci->i_auth_cap);
+                       goto unlock;
+               }
+
                capsnap->dirty_pages -= nr;
                if (capsnap->dirty_pages == 0) {
                        complete_capsnap = true;
@@ -3136,6 +3216,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                     complete_capsnap ? " (complete capsnap)" : "");
        }
 
+unlock:
        spin_unlock(&ci->i_ceph_lock);
 
        if (last) {
@@ -3606,6 +3687,43 @@ out:
                iput(inode);
 }
 
+void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+                          bool *wake_ci, bool *wake_mdsc)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+       bool ret;
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       dout("removing capsnap %p, inode %p ci %p\n", capsnap, inode, ci);
+
+       list_del_init(&capsnap->ci_item);
+       ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
+       if (wake_ci)
+               *wake_ci = ret;
+
+       spin_lock(&mdsc->cap_dirty_lock);
+       if (list_empty(&ci->i_cap_flush_list))
+               list_del_init(&ci->i_flushing_item);
+
+       ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
+       if (wake_mdsc)
+               *wake_mdsc = ret;
+       spin_unlock(&mdsc->cap_dirty_lock);
+}
+
+void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+                        bool *wake_ci, bool *wake_mdsc)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
+       __ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
+}
+
 /*
  * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
  * throw away our cap_snap.
@@ -3643,23 +3761,10 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
                             capsnap, capsnap->follows);
                }
        }
-       if (flushed) {
-               WARN_ON(capsnap->dirty_pages || capsnap->writing);
-               dout(" removing %p cap_snap %p follows %lld\n",
-                    inode, capsnap, follows);
-               list_del(&capsnap->ci_item);
-               wake_ci |= __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
-
-               spin_lock(&mdsc->cap_dirty_lock);
-
-               if (list_empty(&ci->i_cap_flush_list))
-                       list_del_init(&ci->i_flushing_item);
-
-               wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc,
-                                                         &capsnap->cap_flush);
-               spin_unlock(&mdsc->cap_dirty_lock);
-       }
+       if (flushed)
+               ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
        spin_unlock(&ci->i_ceph_lock);
+
        if (flushed) {
                ceph_put_snap_context(capsnap->context);
                ceph_put_cap_snap(capsnap);
@@ -3743,7 +3848,7 @@ retry:
                goto out_unlock;
 
        if (target < 0) {
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        }
 
@@ -3778,7 +3883,7 @@ retry:
                                change_auth_cap_ses(ci, tcap->session);
                        }
                }
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        } else if (tsession) {
                        /* add placeholder for the export target */
@@ -3795,7 +3900,7 @@ retry:
                        spin_unlock(&mdsc->cap_dirty_lock);
                }
 
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        }
 
@@ -3906,7 +4011,7 @@ retry:
                                        ocap->mseq, mds, le32_to_cpu(ph->seq),
                                        le32_to_cpu(ph->mseq));
                }
-               __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+               ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
        *old_issued = issued;
@@ -4134,8 +4239,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 done:
        mutex_unlock(&session->s_mutex);
 done_unlocked:
-       ceph_put_string(extra_info.pool_ns);
        iput(inode);
+out:
+       ceph_put_string(extra_info.pool_ns);
        return;
 
 flush_cap_releases:
@@ -4150,7 +4256,7 @@ flush_cap_releases:
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
        ceph_msg_dump(msg);
-       return;
+       goto out;
 }
 
 /*
@@ -4225,33 +4331,9 @@ static void flush_dirty_session_caps(struct ceph_mds_session *s)
        dout("flush_dirty_caps done\n");
 }
 
-static void iterate_sessions(struct ceph_mds_client *mdsc,
-                            void (*cb)(struct ceph_mds_session *))
-{
-       int mds;
-
-       mutex_lock(&mdsc->mutex);
-       for (mds = 0; mds < mdsc->max_sessions; ++mds) {
-               struct ceph_mds_session *s;
-
-               if (!mdsc->sessions[mds])
-                       continue;
-
-               s = ceph_get_mds_session(mdsc->sessions[mds]);
-               if (!s)
-                       continue;
-
-               mutex_unlock(&mdsc->mutex);
-               cb(s);
-               ceph_put_mds_session(s);
-               mutex_lock(&mdsc->mutex);
-       }
-       mutex_unlock(&mdsc->mutex);
-}
-
 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-       iterate_sessions(mdsc, flush_dirty_session_caps);
+       ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
 }
 
 void __ceph_touch_fmode(struct ceph_inode_info *ci,
index e1d605a02d4a39047836e27b81960cb2592dba90..d16fd2d5fd426205c8d3678430b677781bdea6dd 100644 (file)
@@ -1722,32 +1722,26 @@ retry_snap:
                goto out;
        }
 
-       err = file_remove_privs(file);
-       if (err)
+       down_read(&osdc->lock);
+       map_flags = osdc->osdmap->flags;
+       pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
+       up_read(&osdc->lock);
+       if ((map_flags & CEPH_OSDMAP_FULL) ||
+           (pool_flags & CEPH_POOL_FLAG_FULL)) {
+               err = -ENOSPC;
                goto out;
+       }
 
-       err = file_update_time(file);
+       err = file_remove_privs(file);
        if (err)
                goto out;
 
-       inode_inc_iversion_raw(inode);
-
        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                err = ceph_uninline_data(file, NULL);
                if (err < 0)
                        goto out;
        }
 
-       down_read(&osdc->lock);
-       map_flags = osdc->osdmap->flags;
-       pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
-       up_read(&osdc->lock);
-       if ((map_flags & CEPH_OSDMAP_FULL) ||
-           (pool_flags & CEPH_POOL_FLAG_FULL)) {
-               err = -ENOSPC;
-               goto out;
-       }
-
        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
             inode, ceph_vinop(inode), pos, count, i_size_read(inode));
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
@@ -1759,6 +1753,12 @@ retry_snap:
        if (err < 0)
                goto out;
 
+       err = file_update_time(file);
+       if (err)
+               goto out_caps;
+
+       inode_inc_iversion_raw(inode);
+
        dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
@@ -1842,6 +1842,8 @@ retry_snap:
        }
 
        goto out_unlocked;
+out_caps:
+       ceph_put_cap_refs(ci, got);
 out:
        if (direct_lock)
                ceph_end_io_direct(inode);
index 1bd2cc015913f9e5da20b7b70a2acd0d4f396d8f..2df1e1284451e66999ac34465e1e7617a13d5fd9 100644 (file)
@@ -581,16 +581,9 @@ void ceph_evict_inode(struct inode *inode)
         */
        if (ci->i_snap_realm) {
                if (ceph_snap(inode) == CEPH_NOSNAP) {
-                       struct ceph_snap_realm *realm = ci->i_snap_realm;
                        dout(" dropping residual ref to snap realm %p\n",
-                            realm);
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_del_init(&ci->i_snap_realm_item);
-                       ci->i_snap_realm = NULL;
-                       if (realm->ino == ci->i_vino.ino)
-                               realm->inode = NULL;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-                       ceph_put_snap_realm(mdsc, realm);
+                            ci->i_snap_realm);
+                       ceph_change_snap_realm(inode, NULL);
                } else {
                        ceph_put_snapid_map(mdsc, ci->i_snapid_map);
                        ci->i_snap_realm = NULL;
index 0b69aec23e5c4846ba47cad4faaa70f78973db09..7cad180d6debc5a52be66ba082965201a6490e61 100644 (file)
@@ -11,6 +11,7 @@
 #include <linux/ratelimit.h>
 #include <linux/bits.h>
 #include <linux/ktime.h>
+#include <linux/bitmap.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -652,14 +653,9 @@ const char *ceph_session_state_name(int s)
 
 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
 {
-       if (refcount_inc_not_zero(&s->s_ref)) {
-               dout("mdsc get_session %p %d -> %d\n", s,
-                    refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
+       if (refcount_inc_not_zero(&s->s_ref))
                return s;
-       } else {
-               dout("mdsc get_session %p 0 -- FAIL\n", s);
-               return NULL;
-       }
+       return NULL;
 }
 
 void ceph_put_mds_session(struct ceph_mds_session *s)
@@ -667,8 +663,6 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
        if (IS_ERR_OR_NULL(s))
                return;
 
-       dout("mdsc put_session %p %d -> %d\n", s,
-            refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
@@ -743,8 +737,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
-       s->s_ttl = 0;
-       s->s_seq = 0;
        mutex_init(&s->s_mutex);
 
        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
@@ -753,17 +745,11 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_ttl = jiffies - 1;
 
        spin_lock_init(&s->s_cap_lock);
-       s->s_renew_requested = 0;
-       s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
-       s->s_nr_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
-       s->s_num_cap_releases = 0;
-       s->s_cap_reconnect = 0;
-       s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
 
@@ -811,6 +797,33 @@ static void put_request_session(struct ceph_mds_request *req)
        }
 }
 
+void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
+                               void (*cb)(struct ceph_mds_session *),
+                               bool check_state)
+{
+       int mds;
+
+       mutex_lock(&mdsc->mutex);
+       for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+               struct ceph_mds_session *s;
+
+               s = __ceph_lookup_mds_session(mdsc, mds);
+               if (!s)
+                       continue;
+
+               if (check_state && !check_session_state(s)) {
+                       ceph_put_mds_session(s);
+                       continue;
+               }
+
+               mutex_unlock(&mdsc->mutex);
+               cb(s);
+               ceph_put_mds_session(s);
+               mutex_lock(&mdsc->mutex);
+       }
+       mutex_unlock(&mdsc->mutex);
+}
+
 void ceph_mdsc_release_request(struct kref *kref)
 {
        struct ceph_mds_request *req = container_of(kref,
@@ -1155,7 +1168,7 @@ random:
 /*
  * session messages
  */
-static struct ceph_msg *create_session_msg(u32 op, u64 seq)
+struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
 {
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
@@ -1163,7 +1176,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
-               pr_err("create_session_msg ENOMEM creating msg\n");
+               pr_err("ENOMEM creating session %s msg\n",
+                      ceph_session_op_name(op));
                return NULL;
        }
        h = msg->front.iov_base;
@@ -1294,7 +1308,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
-               pr_err("create_session_msg ENOMEM creating msg\n");
+               pr_err("ENOMEM creating session open msg\n");
                return ERR_PTR(-ENOMEM);
        }
        p = msg->front.iov_base;
@@ -1583,14 +1597,39 @@ out:
        return ret;
 }
 
+static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap_snap *capsnap;
+       int capsnap_release = 0;
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
+
+       while (!list_empty(&ci->i_cap_snaps)) {
+               capsnap = list_first_entry(&ci->i_cap_snaps,
+                                          struct ceph_cap_snap, ci_item);
+               __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
+               ceph_put_snap_context(capsnap->context);
+               ceph_put_cap_snap(capsnap);
+               capsnap_release++;
+       }
+       wake_up_all(&ci->i_cap_wq);
+       wake_up_all(&mdsc->cap_flushing_wq);
+       return capsnap_release;
+}
+
 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
 {
        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        bool dirty_dropped = false;
        bool invalidate = false;
+       int capsnap_release = 0;
 
        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
@@ -1598,7 +1637,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
-               struct ceph_mds_client *mdsc = fsc->mdsc;
 
                if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
                        if (inode->i_data.nrpages > 0)
@@ -1662,6 +1700,9 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
+
+               if (!list_empty(&ci->i_cap_snaps))
+                       capsnap_release = remove_capsnaps(mdsc, inode);
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
@@ -1678,6 +1719,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                ceph_queue_invalidate(inode);
        if (dirty_dropped)
                iput(inode);
+       while (capsnap_release--)
+               iput(inode);
        return 0;
 }
 
@@ -1803,8 +1846,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
 
        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
-       msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
-                                ++session->s_renew_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
+                                     ++session->s_renew_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1818,7 +1861,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
 
        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state), seq);
-       msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1870,7 +1913,8 @@ static int request_close_session(struct ceph_mds_session *session)
        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state),
             session->s_seq);
-       msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+                                     session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1965,7 +2009,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __ceph_remove_cap(cap, true);
+               ceph_remove_cap(cap, true);
                (*remaining)--;
        } else {
                struct dentry *dentry;
@@ -4150,13 +4194,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                          struct ceph_mdsmap *newmap,
                          struct ceph_mdsmap *oldmap)
 {
-       int i;
+       int i, j, err;
        int oldstate, newstate;
        struct ceph_mds_session *s;
+       unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
 
        dout("check_new_map new %u old %u\n",
             newmap->m_epoch, oldmap->m_epoch);
 
+       if (newmap->m_info) {
+               for (i = 0; i < newmap->possible_max_rank; i++) {
+                       for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
+                               set_bit(newmap->m_info[i].export_targets[j], targets);
+               }
+       }
+
        for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
                if (!mdsc->sessions[i])
                        continue;
@@ -4210,6 +4262,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
                    newstate >= CEPH_MDS_STATE_RECONNECT) {
                        mutex_unlock(&mdsc->mutex);
+                       clear_bit(i, targets);
                        send_mds_reconnect(mdsc, s);
                        mutex_lock(&mdsc->mutex);
                }
@@ -4232,6 +4285,51 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                }
        }
 
+       /*
+        * Only open and reconnect sessions that don't exist yet.
+        */
+       for (i = 0; i < newmap->possible_max_rank; i++) {
+               /*
+                * The importing MDS may crash just after it flushes
+                * the EImportStart journal, so when a standby MDS
+                * takes over and replays the EImportStart journal,
+                * the new MDS daemon will wait for the client to
+                * reconnect, but the client may never have
+                * registered/opened the session.
+                *
+                * Try to reconnect to that MDS daemon if its rank
+                * number is in the export targets array and it is in
+                * the up:reconnect state.
+                */
+               newstate = ceph_mdsmap_get_state(newmap, i);
+               if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
+                       continue;
+
+               /*
+                * In rare cases the session may already have been
+                * registered and opened by requests that picked
+                * random MDSes during the mdsc->mutex unlock/lock
+                * gap below. But the related MDS daemon will just
+                * queue those requests and keep waiting for the
+                * client's reconnection request in the up:reconnect
+                * state.
+                */
+               s = __ceph_lookup_mds_session(mdsc, i);
+               if (likely(!s)) {
+                       s = __open_export_target_session(mdsc, i);
+                       if (IS_ERR(s)) {
+                               err = PTR_ERR(s);
+                               pr_err("failed to open export target session, err %d\n",
+                                      err);
+                               continue;
+                       }
+               }
+               dout("send reconnect to export target mds.%d\n", i);
+               mutex_unlock(&mdsc->mutex);
+               send_mds_reconnect(mdsc, s);
+               ceph_put_mds_session(s);
+               mutex_lock(&mdsc->mutex);
+       }
+
        for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
                s = mdsc->sessions[i];
                if (!s)
@@ -4409,24 +4507,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
 }
 
 /*
- * lock unlock sessions, to wait ongoing session activities
+ * lock/unlock the session, to wait for ongoing session activities
  */
-static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
+static void lock_unlock_session(struct ceph_mds_session *s)
 {
-       int i;
-
-       mutex_lock(&mdsc->mutex);
-       for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
-               if (!s)
-                       continue;
-               mutex_unlock(&mdsc->mutex);
-               mutex_lock(&s->s_mutex);
-               mutex_unlock(&s->s_mutex);
-               ceph_put_mds_session(s);
-               mutex_lock(&mdsc->mutex);
-       }
-       mutex_unlock(&mdsc->mutex);
+       mutex_lock(&s->s_mutex);
+       mutex_unlock(&s->s_mutex);
 }
 
 static void maybe_recover_session(struct ceph_mds_client *mdsc)
@@ -4448,6 +4534,8 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
 
 bool check_session_state(struct ceph_mds_session *s)
 {
+       struct ceph_fs_client *fsc = s->s_mdsc->fsc;
+
        switch (s->s_state) {
        case CEPH_MDS_SESSION_OPEN:
                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
@@ -4456,8 +4544,9 @@ bool check_session_state(struct ceph_mds_session *s)
                }
                break;
        case CEPH_MDS_SESSION_CLOSING:
-               /* Should never reach this when we're unmounting */
-               WARN_ON_ONCE(s->s_ttl);
+               /* Should never reach this when not force unmounting */
+               WARN_ON_ONCE(s->s_ttl &&
+                            READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
                fallthrough;
        case CEPH_MDS_SESSION_NEW:
        case CEPH_MDS_SESSION_RESTARTING:
@@ -4584,21 +4673,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        init_completion(&mdsc->safe_umount_waiters);
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
-       mdsc->sessions = NULL;
-       atomic_set(&mdsc->num_sessions, 0);
-       mdsc->max_sessions = 0;
-       mdsc->stopping = 0;
-       atomic64_set(&mdsc->quotarealms_count, 0);
        mdsc->quotarealms_inodes = RB_ROOT;
        mutex_init(&mdsc->quotarealms_inodes_mutex);
-       mdsc->last_snap_seq = 0;
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
-       mdsc->num_snap_realms = 0;
        spin_lock_init(&mdsc->snap_empty_lock);
-       mdsc->last_tid = 0;
-       mdsc->oldest_tid = 0;
        mdsc->request_tree = RB_ROOT;
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
@@ -4610,11 +4690,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        mdsc->last_cap_flush_tid = 1;
        INIT_LIST_HEAD(&mdsc->cap_flush_list);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
-       mdsc->num_cap_flushing = 0;
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
-       atomic_set(&mdsc->cap_reclaim_pending, 0);
        err = ceph_metric_init(&mdsc->metric);
        if (err)
                goto err_mdsmap;
@@ -4676,6 +4754,30 @@ static void wait_requests(struct ceph_mds_client *mdsc)
        dout("wait_requests done\n");
 }
 
+void send_flush_mdlog(struct ceph_mds_session *s)
+{
+       struct ceph_msg *msg;
+
+       /*
+        * Pre-luminous MDS crashes when it sees an unknown session request
+        */
+       if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
+               return;
+
+       mutex_lock(&s->s_mutex);
+       dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
+            ceph_session_state_name(s->s_state), s->s_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
+                                     s->s_seq);
+       if (!msg) {
+               pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
+                      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
+       } else {
+               ceph_con_send(&s->s_con, msg);
+       }
+       mutex_unlock(&s->s_mutex);
+}
+
 /*
  * called before mount is ro, and before dentries are torn down.
  * (hmm, does this still race with new lookups?)
@@ -4685,7 +4787,8 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
        dout("pre_umount\n");
        mdsc->stopping = 1;
 
-       lock_unlock_sessions(mdsc);
+       ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
+       ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);
 
@@ -4912,7 +5015,6 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
 
        ceph_metric_destroy(&mdsc->metric);
 
-       flush_delayed_work(&mdsc->metric.delayed_work);
        fsc->mdsc = NULL;
        kfree(mdsc);
        dout("mdsc_destroy %p done\n", mdsc);
index 20e42d8b66c6cdb96cbe8281cd152ecf50d9228b..97c7f7bfa55f39ba67ae1f733791148142406241 100644 (file)
@@ -522,6 +522,11 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
        kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
+extern void send_flush_mdlog(struct ceph_mds_session *s);
+extern void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
+                                      void (*cb)(struct ceph_mds_session *),
+                                      bool check_state);
+extern struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq);
 extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
                                    struct ceph_cap *cap);
 extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
index 3c444b9cb17b8ce3ce4238cf863cbbb2989d5277..61d67cbcb36711c119c3365805308fc5771baa7c 100644 (file)
@@ -122,6 +122,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end, bool msgr2)
        int err;
        u8 mdsmap_v;
        u16 mdsmap_ev;
+       u32 target;
 
        m = kzalloc(sizeof(*m), GFP_NOFS);
        if (!m)
@@ -260,9 +261,14 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end, bool msgr2)
                                                       sizeof(u32), GFP_NOFS);
                        if (!info->export_targets)
                                goto nomem;
-                       for (j = 0; j < num_export_targets; j++)
-                               info->export_targets[j] =
-                                      ceph_decode_32(&pexport_targets);
+                       for (j = 0; j < num_export_targets; j++) {
+                               target = ceph_decode_32(&pexport_targets);
+                               if (target >= m->possible_max_rank) {
+                                       err = -EIO;
+                                       goto corrupt;
+                               }
+                               info->export_targets[j] = target;
+                       }
                } else {
                        info->export_targets = NULL;
                }
index 5ac151eb0d498d0d612cfb3a860890de4f19f59f..04d5df29bbbfb30bbfe06253e3438df1dd0b10eb 100644 (file)
@@ -302,6 +302,8 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
        if (!m)
                return;
 
+       cancel_delayed_work_sync(&m->delayed_work);
+
        percpu_counter_destroy(&m->total_inodes);
        percpu_counter_destroy(&m->opened_inodes);
        percpu_counter_destroy(&m->i_caps_mis);
@@ -309,8 +311,6 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
        percpu_counter_destroy(&m->d_lease_mis);
        percpu_counter_destroy(&m->d_lease_hit);
 
-       cancel_delayed_work_sync(&m->delayed_work);
-
        ceph_put_mds_session(m->session);
 }
 
index 15105f9da3fd2ad9790fa4e5110c83cd6eb8ab70..b41e6724c5910418a35523eebf1c489288fd35a4 100644 (file)
@@ -849,6 +849,43 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
        dout("flush_snaps done\n");
 }
 
+/**
+ * ceph_change_snap_realm - change the snap_realm for an inode
+ * @inode: inode to move to new snap realm
+ * @realm: new realm to move inode into (may be NULL)
+ *
+ * Detach an inode from its old snaprealm (if any) and attach it to
+ * the new snaprealm (if any). The old snap realm reference held by
+ * the inode is put. If realm is non-NULL, then the caller's reference
+ * to it is taken over by the inode.
+ */
+void ceph_change_snap_realm(struct inode *inode, struct ceph_snap_realm *realm)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       if (oldrealm) {
+               spin_lock(&oldrealm->inodes_with_caps_lock);
+               list_del_init(&ci->i_snap_realm_item);
+               if (oldrealm->ino == ci->i_vino.ino)
+                       oldrealm->inode = NULL;
+               spin_unlock(&oldrealm->inodes_with_caps_lock);
+               ceph_put_snap_realm(mdsc, oldrealm);
+       }
+
+       ci->i_snap_realm = realm;
+
+       if (realm) {
+               spin_lock(&realm->inodes_with_caps_lock);
+               list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
+               if (realm->ino == ci->i_vino.ino)
+                       realm->inode = inode;
+               spin_unlock(&realm->inodes_with_caps_lock);
+       }
+}
 
 /*
  * Handle a snap notification from the MDS.
@@ -935,7 +972,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        };
                        struct inode *inode = ceph_find_inode(sb, vino);
                        struct ceph_inode_info *ci;
-                       struct ceph_snap_realm *oldrealm;
 
                        if (!inode)
                                continue;
@@ -960,27 +996,10 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        }
                        dout(" will move %p to split realm %llx %p\n",
                             inode, realm->ino, realm);
-                       /*
-                        * Move the inode to the new realm
-                        */
-                       oldrealm = ci->i_snap_realm;
-                       spin_lock(&oldrealm->inodes_with_caps_lock);
-                       list_del_init(&ci->i_snap_realm_item);
-                       spin_unlock(&oldrealm->inodes_with_caps_lock);
-
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_add(&ci->i_snap_realm_item,
-                                &realm->inodes_with_caps);
-                       ci->i_snap_realm = realm;
-                       if (realm->ino == ci->i_vino.ino)
-                                realm->inode = inode;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-
-                       spin_unlock(&ci->i_ceph_lock);
 
                        ceph_get_snap_realm(mdsc, realm);
-                       ceph_put_snap_realm(mdsc, oldrealm);
-
+                       ceph_change_snap_realm(inode, realm);
+                       spin_unlock(&ci->i_ceph_lock);
                        iput(inode);
                        continue;
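
The kerneldoc above pins down the reference convention: the caller's
reference to the new realm is handed over to the inode, and the inode's
reference to the old realm is put. A sketch of the caller pattern,
mirroring the ceph_handle_snap() hunk above (the helper name is
hypothetical; i_ceph_lock must be held, as the lockdep annotation
asserts):

static void move_inode_to_realm(struct ceph_mds_client *mdsc,
				struct inode *inode,
				struct ceph_snap_realm *realm)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	spin_lock(&ci->i_ceph_lock);
	ceph_get_snap_realm(mdsc, realm);	/* ref handed over to the inode */
	ceph_change_snap_realm(inode, realm);	/* puts the old realm's ref */
	spin_unlock(&ci->i_ceph_lock);
}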
 
index 4a79f3632260e83a9f3169aea4c11da0119f8117..573bb9556fb56cbf16802d7770fcf22305bc5469 100644 (file)
@@ -46,6 +46,7 @@ const char *ceph_session_op_name(int op)
        case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
        case CEPH_SESSION_FORCE_RO: return "force_ro";
        case CEPH_SESSION_REJECT: return "reject";
+       case CEPH_SESSION_REQUEST_FLUSH_MDLOG: return "flush_mdlog";
        }
        return "???";
 }
index c30258f95e37778f12a55c825e7190a04bab2f06..a40eb14c282af3604223fa47dcf99e0531e52b11 100644 (file)
@@ -418,7 +418,6 @@ struct ceph_inode_info {
                struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
                struct ceph_snapid_map *i_snapid_map; /* snapid -> dev_t */
        };
-       int i_snap_realm_counter; /* snap realm (if caps) */
        struct list_head i_snap_realm_item;
        struct list_head i_snap_flush_item;
        struct timespec64 i_btime;
@@ -929,6 +928,7 @@ extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
 extern int ceph_update_snap_trace(struct ceph_mds_client *m,
                                  void *p, void *e, bool deletion,
                                  struct ceph_snap_realm **realm_ret);
+void ceph_change_snap_realm(struct inode *inode, struct ceph_snap_realm *realm);
 extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
                             struct ceph_mds_session *session,
                             struct ceph_msg *msg);
@@ -1138,6 +1138,7 @@ extern void ceph_add_cap(struct inode *inode,
                         unsigned cap, unsigned seq, u64 realmino, int flags,
                         struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
+extern void ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void __ceph_remove_caps(struct ceph_inode_info *ci);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);
@@ -1163,6 +1164,12 @@ extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
                                            int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                                       struct ceph_snap_context *snapc);
+extern void __ceph_remove_capsnap(struct inode *inode,
+                                 struct ceph_cap_snap *capsnap,
+                                 bool *wake_ci, bool *wake_mdsc);
+extern void ceph_remove_capsnap(struct inode *inode,
+                               struct ceph_cap_snap *capsnap,
+                               bool *wake_ci, bool *wake_mdsc);
 extern void ceph_flush_snaps(struct ceph_inode_info *ci,
                             struct ceph_mds_session **psession);
 extern bool __ceph_should_report_size(struct ceph_inode_info *ci);
index 1242db8d3444acc523daf5cd7011aab22d8f28a7..159a1ffa4f4b887ad421828b6555e6e47f8e4b0c 100644 (file)
@@ -340,6 +340,18 @@ static ssize_t ceph_vxattrcb_caps(struct ceph_inode_info *ci, char *val,
                              ceph_cap_string(issued), issued);
 }
 
+static ssize_t ceph_vxattrcb_auth_mds(struct ceph_inode_info *ci,
+                                      char *val, size_t size)
+{
+       int ret;
+
+       spin_lock(&ci->i_ceph_lock);
+       ret = ceph_fmt_xattr(val, size, "%d",
+                            ci->i_auth_cap ? ci->i_auth_cap->session->s_mds : -1);
+       spin_unlock(&ci->i_ceph_lock);
+       return ret;
+}
+
 #define CEPH_XATTR_NAME(_type, _name)  XATTR_CEPH_PREFIX #_type "." #_name
 #define CEPH_XATTR_NAME2(_type, _name, _name2) \
        XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
@@ -473,6 +485,13 @@ static struct ceph_vxattr ceph_common_vxattrs[] = {
                .exists_cb = NULL,
                .flags = VXATTR_FLAG_READONLY,
        },
+       {
+               .name = "ceph.auth_mds",
+               .name_size = sizeof("ceph.auth_mds"),
+               .getxattr_cb = ceph_vxattrcb_auth_mds,
+               .exists_cb = NULL,
+               .flags = VXATTR_FLAG_READONLY,
+       },
        { .name = NULL, 0 }     /* Required table terminator */
 };
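
The new read-only ceph.auth_mds vxattr can be read through any xattr
interface. A hedged userspace sketch (the CephFS path is hypothetical):

#include <stdio.h>
#include <sys/xattr.h>

int main(void)
{
	char buf[16];
	ssize_t n = getxattr("/mnt/cephfs/file", "ceph.auth_mds",
			     buf, sizeof(buf) - 1);

	if (n < 0) {
		perror("getxattr");
		return 1;
	}
	buf[n] = '\0';
	/* prints the auth MDS rank, or -1 if there is no auth cap */
	printf("auth mds: %s\n", buf);
	return 0;
}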
 
index e41a811026f669bd937abd817891abd29b998898..bc2699feddbeb7d1b29000dd7b51c4888276495d 100644 (file)
@@ -299,6 +299,7 @@ enum {
        CEPH_SESSION_FLUSHMSG_ACK,
        CEPH_SESSION_FORCE_RO,
        CEPH_SESSION_REJECT,
+       CEPH_SESSION_REQUEST_FLUSH_MDLOG,
 };
 
 extern const char *ceph_session_op_name(int op);