if (lock->is_dirty() && !lock->is_flushed()) {
scatter_writebehind(static_cast<ScatterLock *>(lock));
- mds->mdlog->flush();
return;
}
lock->clear_flushed();
// [auth] twiddle mode?
eval(in, CEPH_CAP_LOCKS);
- if (_need_flush_mdlog(in, my_want))
+ int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
+ int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
+ xlocker_allowed);
+
+ if (_need_flush_mdlog(in, my_want & ~allowed, true))
mds->mdlog->flush();
} else {
}
};
-int Locker::issue_caps(CInode *in, Capability *only_cap)
+int Locker::get_allowed_caps(CInode *in, Capability *cap,
+ int &all_allowed, int &loner_allowed,
+ int &xlocker_allowed)
{
+ client_t client = cap->get_client();
+
// allowed caps are determined by the lock mode.
- int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
- int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
- int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
+ if (all_allowed == -1)
+ all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
+ if (loner_allowed == -1)
+ loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
+ if (xlocker_allowed == -1)
+ xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
client_t loner = in->get_loner();
if (loner >= 0) {
- dout(7) << "issue_caps loner client." << loner
+ dout(7) << "get_allowed_caps loner client." << loner
<< " allowed=" << ccap_string(loner_allowed)
<< ", xlocker allowed=" << ccap_string(xlocker_allowed)
<< ", others allowed=" << ccap_string(all_allowed)
<< " on " << *in << dendl;
} else {
- dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
+ dout(7) << "get_allowed_caps allowed=" << ccap_string(all_allowed)
<< ", xlocker allowed=" << ccap_string(xlocker_allowed)
<< " on " << *in << dendl;
}
- ceph_assert(in->is_head());
+ // do not issue _new_ bits when size|mtime is projected
+ int allowed;
+ if (loner == client)
+ allowed = loner_allowed;
+ else
+ allowed = all_allowed;
+ // add in any xlocker-only caps (for locks this client is the xlocker for)
+ allowed |= xlocker_allowed & in->get_xlocker_mask(client);
+ if (in->is_dir()) {
+ allowed &= ~CEPH_CAP_ANY_DIR_OPS;
+ if (allowed & CEPH_CAP_FILE_EXCL)
+ allowed |= cap->get_lock_cache_allowed();
+ }
+
+ if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
+ cap->is_noinline()) ||
+ (!in->get_inode()->layout.pool_ns.empty() &&
+ cap->is_nopoolns()))
+ allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
+
+ return allowed;
+}
+
+int Locker::issue_caps(CInode *in, Capability *only_cap)
+{
// count conflicts with
- int nissued = 0;
+ int nissued = 0;
+ int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
+
+ ceph_assert(in->is_head());
// client caps
map<client_t, Capability>::iterator it;
it = in->client_caps.begin();
for (; it != in->client_caps.end(); ++it) {
Capability *cap = &it->second;
-
- // do not issue _new_ bits when size|mtime is projected
- int allowed;
- if (loner == it->first)
- allowed = loner_allowed;
- else
- allowed = all_allowed;
-
- // add in any xlocker-only caps (for locks this client is the xlocker for)
- allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
- if (in->is_dir()) {
- allowed &= ~CEPH_CAP_ANY_DIR_OPS;
- if (allowed & CEPH_CAP_FILE_EXCL)
- allowed |= cap->get_lock_cache_allowed();
- }
-
- if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
- cap->is_noinline()) ||
- (!in->get_inode()->layout.pool_ns.empty() &&
- cap->is_nopoolns()))
- allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
-
+ int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
+ xlocker_allowed);
int pending = cap->pending();
int wanted = cap->wanted();
}
}
-bool Locker::_need_flush_mdlog(CInode *in, int wanted)
+bool Locker::_need_flush_mdlog(CInode *in, int wanted, bool lock_state_any)
{
/* flush log if caps are wanted by client but corresponding lock is unstable and locked by
* pending mutations. */
if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
- in->filelock.is_unstable_and_locked()) ||
+ (lock_state_any ? in->filelock.is_locked() : in->filelock.is_unstable_and_locked())) ||
((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
- in->authlock.is_unstable_and_locked()) ||
+ (lock_state_any ? in->authlock.is_locked() : in->authlock.is_unstable_and_locked())) ||
((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
- in->linklock.is_unstable_and_locked()) ||
+ (lock_state_any ? in->linklock.is_locked() : in->linklock.is_unstable_and_locked())) ||
((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
- in->xattrlock.is_unstable_and_locked()))
+ (lock_state_any ? in->xattrlock.is_locked() : in->xattrlock.is_unstable_and_locked())))
return true;
return false;
}
}
-void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
- utime_t now, bufferlist &bl)
+void Locker::issue_client_lease(CDentry *dn, CInode *in, MDRequestRef &mdr, utime_t now,
+ bufferlist &bl)
{
client_t client = mdr->get_client();
Session *session = mdr->session;
!diri->is_stray() && // do not issue dn leases in stray dir!
!diri->filelock.can_lease(client) &&
!(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
+ int mask = 0;
+ CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
+ if (dnl->is_primary()) {
+ ceph_assert(dnl->get_inode() == in);
+ mask = CEPH_LEASE_PRIMARY_LINK;
+ } else {
+ if (dnl->is_remote())
+ ceph_assert(dnl->get_remote_ino() == in->ino());
+ else
+ ceph_assert(!in);
+ }
// issue a dentry lease
ClientLease *l = dn->add_client_lease(client, session);
session->touch_lease(l);
} else {
// null lease
LeaseStat lstat;
- lstat.mask = mask;
+ lstat.mask = 0;
lstat.alternate_name = std::string(dn->alternate_name);
encode_lease(bl, session->info, lstat);
dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
if (!gather && lock->is_dirty()) {
lock->get_parent()->auth_pin(lock);
scatter_writebehind(static_cast<ScatterLock*>(lock));
- mds->mdlog->flush();
return false;
}
if (!gather && lock->is_dirty()) {
lock->get_parent()->auth_pin(lock);
scatter_writebehind(static_cast<ScatterLock*>(lock));
- mds->mdlog->flush();
return;
}
in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob);
mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
+ mds->mdlog->flush();
}
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)