]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Locker.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / mds / Locker.cc
index bacc79a10381d63bdfc29cb1c0a357a18e4b05a1..a73c4225cbf27d58986db974401249b03058faee 100644 (file)
@@ -1201,7 +1201,6 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSCon
 
       if (lock->is_dirty() && !lock->is_flushed()) {
        scatter_writebehind(static_cast<ScatterLock *>(lock));
-       mds->mdlog->flush();
        return;
       }
       lock->clear_flushed();
@@ -2230,7 +2229,11 @@ Capability* Locker::issue_new_caps(CInode *in,
     // [auth] twiddle mode?
     eval(in, CEPH_CAP_LOCKS);
 
-    if (_need_flush_mdlog(in, my_want))
+    int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
+    int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
+                                   xlocker_allowed);
+
+    if (_need_flush_mdlog(in, my_want & ~allowed, true))
       mds->mdlog->flush();
 
   } else {
@@ -2269,30 +2272,64 @@ public:
   }
 };
 
-int Locker::issue_caps(CInode *in, Capability *only_cap)
+int Locker::get_allowed_caps(CInode *in, Capability *cap,
+                             int &all_allowed, int &loner_allowed,
+                             int &xlocker_allowed)
 {
+  client_t client = cap->get_client();
+
   // allowed caps are determined by the lock mode.
-  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
-  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
-  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
+  if (all_allowed == -1)
+    all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
+  if (loner_allowed == -1)
+    loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
+  if (xlocker_allowed == -1)
+    xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
 
   client_t loner = in->get_loner();
   if (loner >= 0) {
-    dout(7) << "issue_caps loner client." << loner
+    dout(7) << "get_allowed_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed) 
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
   } else {
-    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
+    dout(7) << "get_allowed_caps allowed=" << ccap_string(all_allowed) 
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
   }
 
-  ceph_assert(in->is_head());
+  // do not issue _new_ bits when size|mtime is projected
+  int allowed;
+  if (loner == client)
+    allowed = loner_allowed;
+  else
+    allowed = all_allowed;
 
+  // add in any xlocker-only caps (for locks this client is the xlocker for)
+  allowed |= xlocker_allowed & in->get_xlocker_mask(client);
+  if (in->is_dir()) {
+    allowed &= ~CEPH_CAP_ANY_DIR_OPS;
+    if (allowed & CEPH_CAP_FILE_EXCL)
+      allowed |= cap->get_lock_cache_allowed();
+  }
+
+  if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
+       cap->is_noinline()) ||
+      (!in->get_inode()->layout.pool_ns.empty() &&
+       cap->is_nopoolns()))
+    allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+
+  return allowed;
+}
+
+int Locker::issue_caps(CInode *in, Capability *only_cap)
+{
   // count conflicts with
-  int nissued = 0;        
+  int nissued = 0;
+  int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
+
+  ceph_assert(in->is_head());
 
   // client caps
   map<client_t, Capability>::iterator it;
@@ -2302,28 +2339,8 @@ int Locker::issue_caps(CInode *in, Capability *only_cap)
     it = in->client_caps.begin();
   for (; it != in->client_caps.end(); ++it) {
     Capability *cap = &it->second;
-
-    // do not issue _new_ bits when size|mtime is projected
-    int allowed;
-    if (loner == it->first)
-      allowed = loner_allowed;
-    else
-      allowed = all_allowed;
-
-    // add in any xlocker-only caps (for locks this client is the xlocker for)
-    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
-    if (in->is_dir()) {
-      allowed &= ~CEPH_CAP_ANY_DIR_OPS;
-      if (allowed & CEPH_CAP_FILE_EXCL)
-       allowed |= cap->get_lock_cache_allowed();
-    }
-
-    if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
-        cap->is_noinline()) ||
-       (!in->get_inode()->layout.pool_ns.empty() &&
-        cap->is_nopoolns()))
-      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
-
+    int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
+                                   xlocker_allowed);
     int pending = cap->pending();
     int wanted = cap->wanted();
 
@@ -2934,18 +2951,18 @@ void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
   }
 }
 
-bool Locker::_need_flush_mdlog(CInode *in, int wanted)
+bool Locker::_need_flush_mdlog(CInode *in, int wanted, bool lock_state_any)
 {
   /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
    * pending mutations. */
   if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
-       in->filelock.is_unstable_and_locked()) ||
+       (lock_state_any ? in->filelock.is_locked() : in->filelock.is_unstable_and_locked())) ||
       ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
-       in->authlock.is_unstable_and_locked()) ||
+       (lock_state_any ? in->authlock.is_locked() : in->authlock.is_unstable_and_locked())) ||
       ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
-       in->linklock.is_unstable_and_locked()) ||
+       (lock_state_any ? in->linklock.is_locked() : in->linklock.is_unstable_and_locked())) ||
       ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
-       in->xattrlock.is_unstable_and_locked()))
+       (lock_state_any ? in->xattrlock.is_locked() : in->xattrlock.is_unstable_and_locked())))
     return true;
   return false;
 }
@@ -4231,8 +4248,8 @@ void Locker::handle_client_lease(const cref_t<MClientLease> &m)
 }
 
 
-void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
-                               utime_t now, bufferlist &bl)
+void Locker::issue_client_lease(CDentry *dn, CInode *in, MDRequestRef &mdr, utime_t now,
+                                bufferlist &bl)
 {
   client_t client = mdr->get_client();
   Session *session = mdr->session;
@@ -4243,6 +4260,17 @@ void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
       !diri->is_stray() &&  // do not issue dn leases in stray dir!
       !diri->filelock.can_lease(client) &&
       !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
+    int mask = 0;
+    CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
+    if (dnl->is_primary()) {
+      ceph_assert(dnl->get_inode() == in);
+      mask = CEPH_LEASE_PRIMARY_LINK;
+    } else {
+      if (dnl->is_remote())
+        ceph_assert(dnl->get_remote_ino() == in->ino());
+      else
+        ceph_assert(!in);
+    }
     // issue a dentry lease
     ClientLease *l = dn->add_client_lease(client, session);
     session->touch_lease(l);
@@ -4262,7 +4290,7 @@ void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
   } else {
     // null lease
     LeaseStat lstat;
-    lstat.mask = mask;
+    lstat.mask = 0;
     lstat.alternate_name = std::string(dn->alternate_name);
     encode_lease(bl, session->info, lstat);
     dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
@@ -4669,7 +4697,6 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
     if (!gather && lock->is_dirty()) {
       lock->get_parent()->auth_pin(lock);
       scatter_writebehind(static_cast<ScatterLock*>(lock));
-      mds->mdlog->flush();
       return false;
     }
 
@@ -4828,7 +4855,6 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
   if (!gather && lock->is_dirty()) {
     lock->get_parent()->auth_pin(lock);
     scatter_writebehind(static_cast<ScatterLock*>(lock));
-    mds->mdlog->flush();
     return;
   }
 
@@ -4970,6 +4996,7 @@ void Locker::scatter_writebehind(ScatterLock *lock)
   in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob);
 
   mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
+  mds->mdlog->flush();
 }
 
 void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)