]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/commitdiff
ceph: send cap releases more aggressively
authorYan, Zheng <zyan@redhat.com>
Mon, 14 Jan 2019 09:21:19 +0000 (17:21 +0800)
committerKleber Sacilotto de Souza <kleber.souza@canonical.com>
Tue, 2 Jul 2019 16:30:46 +0000 (18:30 +0200)
BugLink: https://bugs.launchpad.net/bugs/1834235
When pending cap releases fill up one message, start a work to send
cap release message. (old way is sending cap releases every 5 seconds)

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
(cherry picked from commit e3ec8d6898f71636a067dae683174ef9bf81bc96)
Signed-off-by: Connor Kuehl <connor.kuehl@canonical.com>
Acked-by: Kleber Sacilotto de Souza <kleber.souza@canonical.com>
Acked-by: Stefan Bader <stefan.bader@canonical.com>
Signed-off-by: Kleber Sacilotto de Souza <kleber.souza@canonical.com>
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h

index 84d6ac8b373124c0c5480ae992bac9c0ef6816b9..7ac63ed48943d1626f232ffbf5737f9b827c8fa1 100644 (file)
@@ -958,9 +958,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
            (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
                cap->queue_release = 1;
                if (removed) {
-                       list_add_tail(&cap->session_caps,
-                                     &session->s_cap_releases);
-                       session->s_num_cap_releases++;
+                       __ceph_queue_cap_release(session, cap);
                        removed = 0;
                }
        } else {
@@ -1122,7 +1120,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct rb_node *p;
@@ -3722,12 +3720,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                        cap->seq = seq;
                        cap->issue_seq = seq;
                        spin_lock(&session->s_cap_lock);
-                       list_add_tail(&cap->session_caps,
-                                       &session->s_cap_releases);
-                       session->s_num_cap_releases++;
+                       __ceph_queue_cap_release(session, cap);
                        spin_unlock(&session->s_cap_lock);
                }
-               goto flush_cap_releases;
+               goto done;
        }
 
        /* these will work even if we don't have a cap yet */
@@ -3797,7 +3793,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                       ceph_cap_op_name(op));
        }
 
-       goto done;
+done:
+       mutex_unlock(&session->s_mutex);
+done_unlocked:
+       iput(inode);
+       ceph_put_string(extra_info.pool_ns);
+       return;
 
 flush_cap_releases:
        /*
@@ -3805,14 +3806,8 @@ flush_cap_releases:
         * along for the mds (who clearly thinks we still have this
         * cap).
         */
-       ceph_send_cap_releases(mdsc, session);
-
-done:
-       mutex_unlock(&session->s_mutex);
-done_unlocked:
-       iput(inode);
-       ceph_put_string(extra_info.pool_ns);
-       return;
+       ceph_flush_cap_releases(mdsc, session);
+       goto done;
 
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
index 705e316200bebd42ba137bc046c58ba15bb675c1..9e1bb79cc5c052b65280af65056a4ab201cf8457 100644 (file)
@@ -534,7 +534,7 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_fscache_unregister_inode_cookie(ci);
 
-       ceph_queue_caps_release(inode);
+       __ceph_remove_caps(inode);
 
        /*
         * we may still have a snap_realm reference if there are stray
index 1b468250e94752e6eedf63283cba95a236fb0380..6deaf018d16829c5561c04c04bd48c24698fe0b4 100644 (file)
@@ -53,6 +53,7 @@ struct ceph_reconnect_state {
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -474,6 +475,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
+       INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
        INIT_LIST_HEAD(&s->s_cap_flushing);
 
        dout("register_session mds%d\n", mds);
@@ -516,6 +519,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
+       s->s_state = 0;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
@@ -1138,13 +1142,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
-                       if (cap->queue_release) {
-                               list_add_tail(&cap->session_caps,
-                                             &session->s_cap_releases);
-                               session->s_num_cap_releases++;
-                       } else {
+                       if (cap->queue_release)
+                               __ceph_queue_cap_release(session, cap);
+                       else
                                old_cap = cap;  /* put_cap it w/o locks held */
-                       }
                }
                if (ret < 0)
                        goto out;
@@ -1562,7 +1563,7 @@ static int trim_caps(struct ceph_mds_client *mdsc,
                session->s_trim_caps = 0;
        }
 
-       ceph_send_cap_releases(mdsc, session);
+       ceph_flush_cap_releases(mdsc, session);
        return 0;
 }
 
@@ -1605,8 +1606,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-                           struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+                                  struct ceph_mds_session *session)
 {
        struct ceph_msg *msg = NULL;
        struct ceph_mds_cap_release *head;
@@ -1698,6 +1699,48 @@ out_err:
        spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+       struct ceph_mds_session *session =
+               container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+       mutex_lock(&session->s_mutex);
+       if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+           session->s_state == CEPH_MDS_SESSION_HUNG)
+               ceph_send_cap_releases(session->s_mdsc, session);
+       mutex_unlock(&session->s_mutex);
+       ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+                            struct ceph_mds_session *session)
+{
+       if (mdsc->stopping)
+               return;
+
+       get_session(session);
+       if (queue_work(mdsc->fsc->cap_wq,
+                      &session->s_cap_release_work)) {
+               dout("cap release work queued\n");
+       } else {
+               ceph_put_mds_session(session);
+               dout("failed to queue cap release work\n");
+       }
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+                             struct ceph_cap *cap)
+{
+       list_add_tail(&cap->session_caps, &session->s_cap_releases);
+       session->s_num_cap_releases++;
+
+       if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+               ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
 /*
  * requests
  */
index 837ac4b087a0babb0f202ae66e79688d8f57822c..e741a1ddd59cf5a28688fb07ae940340f715a3cb 100644 (file)
@@ -151,12 +151,13 @@ struct ceph_mds_session {
        /* protected by s_cap_lock */
        spinlock_t        s_cap_lock;
        struct list_head  s_caps;     /* all caps issued by this session */
+       struct ceph_cap  *s_cap_iterator;
        int               s_nr_caps, s_trim_caps;
        int               s_num_cap_releases;
        int               s_cap_reconnect;
        int               s_readonly;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
-       struct ceph_cap  *s_cap_iterator;
+       struct work_struct s_cap_release_work;
 
        /* protected by mutex */
        struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
@@ -420,9 +421,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
        kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-                                  struct ceph_mds_session *session);
-
+extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
+                                   struct ceph_cap *cap);
+extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+                                   struct ceph_mds_session *session);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
index d250b54c30f718ed0689cbf69043e85f91f960fd..4badd729e5b96d5b3c4c93c575bca3e869c575c2 100644 (file)
@@ -629,6 +629,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
        if (!fsc->trunc_wq)
                goto fail_pg_inv_wq;
+       fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
+       if (!fsc->cap_wq)
+               goto fail_trunc_wq;
 
        /* set up mempools */
        err = -ENOMEM;
@@ -636,13 +639,15 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        size = sizeof (struct page *) * (page_count ? page_count : 1);
        fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
        if (!fsc->wb_pagevec_pool)
-               goto fail_trunc_wq;
+               goto fail_cap_wq;
 
        /* caps */
        fsc->min_caps = fsopt->max_readdir;
 
        return fsc;
 
+fail_cap_wq:
+       destroy_workqueue(fsc->cap_wq);
 fail_trunc_wq:
        destroy_workqueue(fsc->trunc_wq);
 fail_pg_inv_wq:
@@ -661,6 +666,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
        flush_workqueue(fsc->wb_wq);
        flush_workqueue(fsc->pg_inv_wq);
        flush_workqueue(fsc->trunc_wq);
+       flush_workqueue(fsc->cap_wq);
 }
 
 static void destroy_fs_client(struct ceph_fs_client *fsc)
@@ -670,6 +676,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
        destroy_workqueue(fsc->wb_wq);
        destroy_workqueue(fsc->pg_inv_wq);
        destroy_workqueue(fsc->trunc_wq);
+       destroy_workqueue(fsc->cap_wq);
 
        mempool_destroy(fsc->wb_pagevec_pool);
 
index 601100da738f865c90f37dda9f628c01c2e106a7..7ee986782f0f2d85f6a88095603b001c2b035abc 100644 (file)
@@ -102,10 +102,12 @@ struct ceph_fs_client {
 
        /* writeback */
        mempool_t *wb_pagevec_pool;
+       atomic_long_t writeback_count;
+
        struct workqueue_struct *wb_wq;
        struct workqueue_struct *pg_inv_wq;
        struct workqueue_struct *trunc_wq;
-       atomic_long_t writeback_count;
+       struct workqueue_struct *cap_wq;
 
 #ifdef CONFIG_DEBUG_FS
        struct dentry *debugfs_dentry_lru, *debugfs_caps;
@@ -961,11 +963,11 @@ extern void ceph_add_cap(struct inode *inode,
                         unsigned cap, unsigned seq, u64 realmino, int flags,
                         struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
+extern void __ceph_remove_caps(struct inode* inode);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
                      int datasync);