*
*/
+#include <string_view>
#include "MDSRank.h"
#include "MDCache.h"
#include "Locker.h"
+#include "MDBalancer.h"
+#include "Migrator.h"
#include "CInode.h"
#include "CDir.h"
#include "CDentry.h"
}
-class LockerContext : public MDSInternalContextBase {
+class LockerContext : public MDSContext {
protected:
Locker *locker;
MDSRank *get_mds() override
public:
explicit LockerContext(Locker *locker_) : locker(locker_) {
- assert(locker != NULL);
+ ceph_assert(locker != NULL);
}
};
public:
explicit LockerLogContext(Locker *locker_) : locker(locker_) {
- assert(locker != NULL);
+ ceph_assert(locker != NULL);
}
};
-/* This function DOES put the passed message before returning */
-void Locker::dispatch(Message *m)
+Locker::Locker(MDSRank *m, MDCache *c) :
+ mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {}
+
+
+void Locker::dispatch(const Message::const_ref &m)
{
switch (m->get_type()) {
-
// inter-mds locking
case MSG_MDS_LOCK:
- handle_lock(static_cast<MLock*>(m));
+ handle_lock(MLock::msgref_cast(m));
break;
// inter-mds caps
case MSG_MDS_INODEFILECAPS:
- handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
+ handle_inode_file_caps(MInodeFileCaps::msgref_cast(m));
break;
-
// client sync
case CEPH_MSG_CLIENT_CAPS:
- handle_client_caps(static_cast<MClientCaps*>(m));
-
+ handle_client_caps(MClientCaps::msgref_cast(m));
break;
case CEPH_MSG_CLIENT_CAPRELEASE:
- handle_client_cap_release(static_cast<MClientCapRelease*>(m));
+ handle_client_cap_release(MClientCapRelease::msgref_cast(m));
break;
case CEPH_MSG_CLIENT_LEASE:
- handle_client_lease(static_cast<MClientLease*>(m));
+ handle_client_lease(MClientLease::msgref_cast(m));
break;
-
default:
derr << "locker unknown message " << m->get_type() << dendl;
- assert(0 == "locker unknown message");
+ ceph_abort_msg("locker unknown message");
}
}
void Locker::send_lock_message(SimpleLock *lock, int msg)
{
- for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
- it != lock->get_parent()->replicas_end();
- ++it) {
+ for (const auto &it : lock->get_parent()->get_replicas()) {
if (mds->is_cluster_degraded() &&
- mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
+ mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
continue;
- MLock *m = new MLock(lock, msg, mds->get_nodeid());
- mds->send_message_mds(m, it->first);
+ auto m = MLock::create(lock, msg, mds->get_nodeid());
+ mds->send_message_mds(m, it.first);
}
}
void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
{
- for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
- it != lock->get_parent()->replicas_end();
- ++it) {
+ for (const auto &it : lock->get_parent()->get_replicas()) {
if (mds->is_cluster_degraded() &&
- mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
+ mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
continue;
- MLock *m = new MLock(lock, msg, mds->get_nodeid());
+ auto m = MLock::create(lock, msg, mds->get_nodeid());
m->set_data(data);
- mds->send_message_mds(m, it->first);
+ mds->send_message_mds(m, it.first);
}
}
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
{
// rdlock ancestor snaps
CInode *t = in;
- rdlocks.insert(&in->snaplock);
while (t->get_projected_parent_dn()) {
t = t->get_projected_parent_dn()->get_dir()->get_inode();
- rdlocks.insert(&t->snaplock);
+ lov.add_rdlock(&t->snaplock);
}
+ lov.add_rdlock(&in->snaplock);
}
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
file_layout_t **layout)
{
//rdlock ancestor snaps
CInode *t = in;
- rdlocks.insert(&in->snaplock);
- rdlocks.insert(&in->policylock);
+ lov.add_rdlock(&in->snaplock);
+ lov.add_rdlock(&in->policylock);
bool found_layout = false;
while (t) {
- rdlocks.insert(&t->snaplock);
+ lov.add_rdlock(&t->snaplock);
if (!found_layout) {
- rdlocks.insert(&t->policylock);
+ lov.add_rdlock(&t->policylock);
if (t->get_projected_inode()->has_layout()) {
*layout = &t->get_projected_inode()->layout;
found_layout = true;
struct MarkEventOnDestruct {
MDRequestRef& mdr;
- const char* message;
+ std::string_view message;
bool mark_event;
- MarkEventOnDestruct(MDRequestRef& _mdr,
- const char *_message) : mdr(_mdr),
- message(_message),
- mark_event(true) {}
+ MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
+ mdr(_mdr),
+ message(_message),
+ mark_event(true) {}
~MarkEventOnDestruct() {
if (mark_event)
mdr->mark_event(message);
/* If this function returns false, the mdr has been placed
* on the appropriate wait list */
bool Locker::acquire_locks(MDRequestRef& mdr,
- set<SimpleLock*> &rdlocks,
- set<SimpleLock*> &wrlocks,
- set<SimpleLock*> &xlocks,
- map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+ MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze,
bool auth_pin_nonblock)
{
client_t client = mdr->get_client();
- set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
- set<MDSCacheObject*> mustpin; // items to authpin
+ set<MDSCacheObject*> mustpin; // items to authpin
// xlocks
- for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
- dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
- sorted.insert(*p);
- mustpin.insert((*p)->get_parent());
-
- // augment xlock with a versionlock?
- if ((*p)->get_type() == CEPH_LOCK_DN) {
- CDentry *dn = (CDentry*)(*p)->get_parent();
- if (!dn->is_auth())
- continue;
-
- if (xlocks.count(&dn->versionlock))
- continue; // we're xlocking the versionlock too; don't wrlock it!
-
- if (mdr->is_master()) {
- // master. wrlock versionlock so we can pipeline dentry updates to journal.
- wrlocks.insert(&dn->versionlock);
- } else {
- // slave. exclusively lock the dentry version (i.e. block other journal updates).
- // this makes rollback safe.
- xlocks.insert(&dn->versionlock);
- sorted.insert(&dn->versionlock);
- }
- }
- if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
- // inode version lock?
- CInode *in = (CInode*)(*p)->get_parent();
- if (!in->is_auth())
- continue;
- if (mdr->is_master()) {
- // master. wrlock versionlock so we can pipeline inode updates to journal.
- wrlocks.insert(&in->versionlock);
- } else {
- // slave. exclusively lock the inode version (i.e. block other journal updates).
- // this makes rollback safe.
- xlocks.insert(&in->versionlock);
- sorted.insert(&in->versionlock);
+ for (int i = 0, size = lov.size(); i < size; ++i) {
+ auto& p = lov[i];
+ SimpleLock *lock = p.lock;
+ MDSCacheObject *object = lock->get_parent();
+
+ if (p.is_xlock()) {
+ if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+ lock->get_type() == CEPH_LOCK_IPOLICY) &&
+ mds->is_cluster_degraded() &&
+ mdr->is_master() &&
+ !mdr->is_queued_for_replay()) {
+ // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+ // get processed in proper order.
+ bool wait = false;
+ if (object->is_auth()) {
+ if (!mdr->locks.count(lock)) {
+ set<mds_rank_t> ls;
+ object->list_replicas(ls);
+ for (auto m : ls) {
+ if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+ wait = true;
+ break;
+ }
+ }
+ }
+ } else {
+ // if the lock is the latest locked one, it's possible that slave mds got the lock
+ // while there are recovering mds.
+ if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+ wait = true;
+ }
+ if (wait) {
+ dout(10) << " must xlock " << *lock << " " << *object
+ << ", waiting for cluster recovered" << dendl;
+ mds->locker->drop_locks(mdr.get(), NULL);
+ mdr->drop_local_auth_pins();
+ mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+ return false;
+ }
}
- }
- }
- // wrlocks
- for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- dout(20) << " must wrlock " << **p << " " << *object << dendl;
- sorted.insert(*p);
- if (object->is_auth())
- mustpin.insert(object);
- else if (!object->is_auth() &&
- !(*p)->can_wrlock(client) && // we might have to request a scatter
- !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
- dout(15) << " will also auth_pin " << *object
- << " in case we need to request a scatter" << dendl;
- mustpin.insert(object);
- }
- }
+ dout(20) << " must xlock " << *lock << " " << *object << dendl;
- // remote_wrlocks
- if (remote_wrlocks) {
- for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
- MDSCacheObject *object = p->first->get_parent();
- dout(20) << " must remote_wrlock on mds." << p->second << " "
- << *p->first << " " << *object << dendl;
- sorted.insert(p->first);
mustpin.insert(object);
- }
- }
- // rdlocks
- for (set<SimpleLock*>::iterator p = rdlocks.begin();
- p != rdlocks.end();
- ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- dout(20) << " must rdlock " << **p << " " << *object << dendl;
- sorted.insert(*p);
- if (object->is_auth())
- mustpin.insert(object);
- else if (!object->is_auth() &&
- !(*p)->can_rdlock(client)) { // we might have to request an rdlock
- dout(15) << " will also auth_pin " << *object
- << " in case we need to request a rdlock" << dendl;
+ // augment xlock with a versionlock?
+ if (lock->get_type() == CEPH_LOCK_DN) {
+ CDentry *dn = static_cast<CDentry*>(object);
+ if (!dn->is_auth())
+ continue;
+ if (mdr->is_master()) {
+ // master. wrlock versionlock so we can pipeline dentry updates to journal.
+ lov.add_wrlock(&dn->versionlock);
+ } else {
+ // slave. exclusively lock the dentry version (i.e. block other journal updates).
+ // this makes rollback safe.
+ lov.add_xlock(&dn->versionlock);
+ }
+ }
+ if (lock->get_type() > CEPH_LOCK_IVERSION) {
+ // inode version lock?
+ CInode *in = static_cast<CInode*>(object);
+ if (!in->is_auth())
+ continue;
+ if (mdr->is_master()) {
+ // master. wrlock versionlock so we can pipeline inode updates to journal.
+ lov.add_wrlock(&in->versionlock);
+ } else {
+ // slave. exclusively lock the inode version (i.e. block other journal updates).
+ // this makes rollback safe.
+ lov.add_xlock(&in->versionlock);
+ }
+ }
+ } else if (p.is_wrlock()) {
+ dout(20) << " must wrlock " << *lock << " " << *object << dendl;
+ if (object->is_auth()) {
+ mustpin.insert(object);
+ } else if (!object->is_auth() &&
+ !lock->can_wrlock(client) && // we might have to request a scatter
+ !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
+ dout(15) << " will also auth_pin " << *object
+ << " in case we need to request a scatter" << dendl;
+ mustpin.insert(object);
+ }
+ } else if (p.is_remote_wrlock()) {
+ dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
+ << *lock << " " << *object << dendl;
mustpin.insert(object);
+ } else if (p.is_rdlock()) {
+
+ dout(20) << " must rdlock " << *lock << " " << *object << dendl;
+ if (object->is_auth()) {
+ mustpin.insert(object);
+ } else if (!object->is_auth() &&
+ !lock->can_rdlock(client)) { // we might have to request an rdlock
+ dout(15) << " will also auth_pin " << *object
+ << " in case we need to request a rdlock" << dendl;
+ mustpin.insert(object);
+ }
+ } else {
+ ceph_assert(0 == "locker unknown lock operation");
}
}
+ lov.sort_and_merge();
// AUTH PINS
map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
// can i auth pin them all now?
marker.message = "failed to authpin local pins";
- for (set<MDSCacheObject*>::iterator p = mustpin.begin();
- p != mustpin.end();
- ++p) {
- MDSCacheObject *object = *p;
+ for (const auto &p : mustpin) {
+ MDSCacheObject *object = p;
dout(10) << " must authpin " << *object << dendl;
drop_locks(mdr.get());
if (object->is_ambiguous_auth()) {
// wait
+ marker.message = "waiting for single auth, object is being migrated";
dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
mdr->drop_local_auth_pins();
mustpin_remote[object->authority().first].insert(object);
continue;
}
- if (!object->can_auth_pin()) {
+ int err = 0;
+ if (!object->can_auth_pin(&err)) {
// wait
drop_locks(mdr.get());
mdr->drop_local_auth_pins();
mdr->aborted = true;
return false;
}
+ if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
+ marker.message = "failed to authpin, subtree is being exported";
+ } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
+ marker.message = "failed to authpin, dir is being fragmented";
+ } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
+ marker.message = "failed to authpin, inode is being exported";
+ }
dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+
+ if (!mdr->remote_auth_pins.empty())
+ notify_freeze_waiter(object);
+
return false;
}
}
// ok, grab local auth pins
- for (set<MDSCacheObject*>::iterator p = mustpin.begin();
- p != mustpin.end();
- ++p) {
- MDSCacheObject *object = *p;
+ for (const auto& p : mustpin) {
+ MDSCacheObject *object = p;
if (mdr->is_auth_pinned(object)) {
dout(10) << " already auth_pinned " << *object << dendl;
} else if (object->is_auth()) {
// request remote auth_pins
if (!mustpin_remote.empty()) {
marker.message = "requesting remote authpins";
- for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
- p != mdr->remote_auth_pins.end();
- ++p) {
- if (mustpin.count(p->first)) {
- assert(p->second == p->first->authority().first);
- map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+ for (const auto& p : mdr->remote_auth_pins) {
+ if (mustpin.count(p.first)) {
+ ceph_assert(p.second == p.first->authority().first);
+ map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
if (q != mustpin_remote.end())
- q->second.insert(p->first);
+ q->second.insert(p.first);
}
}
for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
return false;
}
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
- MMDSSlaveRequest::OP_AUTHPIN);
+ auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN);
for (set<MDSCacheObject*>::iterator q = p->second.begin();
q != p->second.end();
++q) {
mds->send_message_mds(req, p->first);
// put in waiting list
- assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
+ ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
mdr->more()->waiting_on_slave.insert(p->first);
}
return false;
// acquire locks.
// make sure they match currently acquired locks.
- set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
- for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
- p != sorted.end();
- ++p) {
- bool need_wrlock = !!wrlocks.count(*p);
- bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+ auto existing = mdr->locks.begin();
+ for (const auto& p : lov) {
+ bool need_wrlock = p.is_wrlock();
+ bool need_remote_wrlock = p.is_remote_wrlock();
// already locked?
- if (existing != mdr->locks.end() && *existing == *p) {
+ if (existing != mdr->locks.end() && existing->lock == p.lock) {
// right kind?
- SimpleLock *have = *existing;
- ++existing;
- if (xlocks.count(have) && mdr->xlocks.count(have)) {
- dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+ auto it = existing++;
+ auto have = *it; // don't reference
+
+ if (have.is_xlock() && p.is_xlock()) {
+ dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
- if (mdr->remote_wrlocks.count(have)) {
- if (!need_remote_wrlock ||
- mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
- dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
- << " " << *have << " " << *have->get_parent() << dendl;
- remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
- }
+
+ if (have.is_remote_wrlock() &&
+ (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
+ dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
+ << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ remote_wrlock_finish(it, mdr.get());
+ have.clear_remote_wrlock();
}
+
if (need_wrlock || need_remote_wrlock) {
- if (need_wrlock == !!mdr->wrlocks.count(have) &&
- need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+ if (need_wrlock == have.is_wrlock() &&
+ need_remote_wrlock == have.is_remote_wrlock()) {
if (need_wrlock)
- dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+ dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
if (need_remote_wrlock)
- dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+ dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
- }
- if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
- dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+
+ if (have.is_wrlock()) {
+ if (!need_wrlock)
+ dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ else if (need_remote_wrlock) // acquire remote_wrlock first
+ dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ bool need_issue = false;
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (need_issue)
+ issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
+ }
+ } else if (have.is_rdlock() && p.is_rdlock()) {
+ dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
}
// hose any stray locks
- if (existing != mdr->locks.end() && *existing == *p) {
- assert(need_wrlock || need_remote_wrlock);
- SimpleLock *lock = *existing;
- if (mdr->wrlocks.count(lock)) {
- if (!need_wrlock)
- dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
- else if (need_remote_wrlock) // acquire remote_wrlock first
- dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
- bool need_issue = false;
- wrlock_finish(lock, mdr.get(), &need_issue);
- if (need_issue)
- issue_set.insert(static_cast<CInode*>(lock->get_parent()));
- }
- ++existing;
- }
while (existing != mdr->locks.end()) {
- SimpleLock *stray = *existing;
- ++existing;
- dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+ auto it = existing++;
+ auto stray = *it; // don't reference
+ dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
bool need_issue = false;
- if (mdr->xlocks.count(stray)) {
- xlock_finish(stray, mdr.get(), &need_issue);
- } else if (mdr->rdlocks.count(stray)) {
- rdlock_finish(stray, mdr.get(), &need_issue);
+ if (stray.is_xlock()) {
+ xlock_finish(it, mdr.get(), &need_issue);
+ } else if (stray.is_rdlock()) {
+ rdlock_finish(it, mdr.get(), &need_issue);
} else {
// may have acquired both wrlock and remore wrlock
- if (mdr->wrlocks.count(stray))
- wrlock_finish(stray, mdr.get(), &need_issue);
- if (mdr->remote_wrlocks.count(stray))
- remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+ if (stray.is_wrlock())
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (stray.is_remote_wrlock())
+ remote_wrlock_finish(it, mdr.get());
}
if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+ issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
}
// lock
- if (mdr->locking && *p != mdr->locking) {
+ if (mdr->locking && p.lock != mdr->locking) {
cancel_locking(mdr.get(), &issue_set);
}
- if (xlocks.count(*p)) {
+ if (p.is_xlock()) {
marker.message = "failed to xlock, waiting";
- if (!xlock_start(*p, mdr))
+ if (!xlock_start(p.lock, mdr))
goto out;
- dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
} else if (need_wrlock || need_remote_wrlock) {
- if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+ if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
marker.message = "waiting for remote wrlocks";
- remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+ remote_wrlock_start(p, p.wrlock_target, mdr);
goto out;
}
- if (need_wrlock && !mdr->wrlocks.count(*p)) {
+ if (need_wrlock) {
marker.message = "failed to wrlock, waiting";
- if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+ if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
marker.message = "failed to wrlock, dropping remote wrlock and waiting";
// can't take the wrlock because the scatter lock is gathering. need to
// release the remote wrlock, so that the gathering process can finish.
- remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
- remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+ auto it = mdr->locks.end();
+ ++it;
+ remote_wrlock_finish(it, mdr.get());
+ remote_wrlock_start(p, p.wrlock_target, mdr);
goto out;
}
// nowait if we have already gotten remote wrlock
- if (!wrlock_start(*p, mdr, need_remote_wrlock))
+ if (!wrlock_start(p, mdr, need_remote_wrlock))
goto out;
- dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
}
} else {
- assert(mdr->is_master());
- if ((*p)->is_scatterlock()) {
- ScatterLock *slock = static_cast<ScatterLock *>(*p);
- if (slock->is_rejoin_mix()) {
- // If there is a recovering mds who replcated an object when it failed
- // and scatterlock in the object was in MIX state, It's possible that
- // the recovering mds needs to take wrlock on the scatterlock when it
- // replays unsafe requests. So this mds should delay taking rdlock on
- // the scatterlock until the recovering mds finishes replaying unsafe.
- // Otherwise unsafe requests may get replayed after current request.
- //
- // For example:
- // The recovering mds is auth mds of a dirfrag, this mds is auth mds
- // of correspinding inode. when 'rm -rf' the direcotry, this mds should
- // delay the rmdir request until the recovering mds has replayed unlink
- // requests.
- if (mds->is_cluster_degraded()) {
- if (!mdr->is_replay()) {
- drop_locks(mdr.get());
- mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
- dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
- << ", waiting for cluster recovered" << dendl;
- marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
- return false;
- }
- } else {
- slock->clear_rejoin_mix();
+ ceph_assert(mdr->is_master());
+ if (p.lock->needs_recover()) {
+ if (mds->is_cluster_degraded()) {
+ if (!mdr->is_queued_for_replay()) {
+ // see comments in SimpleLock::set_state_rejoin() and
+ // ScatterLock::encode_state_for_rejoin()
+ drop_locks(mdr.get());
+ mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+ dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
+ << ", waiting for cluster recovered" << dendl;
+ marker.message = "rejoin recovering lock, waiting for cluster recovered";
+ return false;
}
+ } else {
+ p.lock->clear_need_recover();
}
}
marker.message = "failed to rdlock, waiting";
- if (!rdlock_start(*p, mdr))
+ if (!rdlock_start(p, mdr))
goto out;
- dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
}
}
// any extra unneeded locks?
while (existing != mdr->locks.end()) {
- SimpleLock *stray = *existing;
- ++existing;
- dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+ auto it = existing++;
+ auto stray = *it;
+ dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
bool need_issue = false;
- if (mdr->xlocks.count(stray)) {
- xlock_finish(stray, mdr.get(), &need_issue);
- } else if (mdr->rdlocks.count(stray)) {
- rdlock_finish(stray, mdr.get(), &need_issue);
+ if (stray.is_xlock()) {
+ xlock_finish(it, mdr.get(), &need_issue);
+ } else if (stray.is_rdlock()) {
+ rdlock_finish(it, mdr.get(), &need_issue);
} else {
// may have acquired both wrlock and remore wrlock
- if (mdr->wrlocks.count(stray))
- wrlock_finish(stray, mdr.get(), &need_issue);
- if (mdr->remote_wrlocks.count(stray))
- remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+ if (stray.is_wrlock())
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (stray.is_remote_wrlock())
+ remote_wrlock_finish(it, mdr.get());
}
if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+ issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
}
mdr->done_locking = true;
return result;
}
-
-void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
+void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
- for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
- p != mut->xlocks.end();
- ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- assert(object->is_auth());
- if (skip_dentry &&
- ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
- continue;
- dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
- (*p)->set_xlock_done();
+ CDir *dir = NULL;
+ if (CInode *in = dynamic_cast<CInode*>(o)) {
+ if (!in->is_root())
+ dir = in->get_parent_dir();
+ } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
+ dir = dn->get_dir();
+ } else {
+ dir = dynamic_cast<CDir*>(o);
+ ceph_assert(dir);
+ }
+ if (dir) {
+ if (dir->is_freezing_dir())
+ mdcache->fragment_freeze_inc_num_waiters(dir);
+ if (dir->is_freezing_tree()) {
+ while (!dir->is_freezing_tree_root())
+ dir = dir->get_parent_dir();
+ mdcache->migrator->export_freeze_inc_num_waiters(dir);
+ }
}
}
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
{
- while (!mut->rdlocks.empty()) {
- bool ni = false;
- MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
- rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
+ for (const auto &p : mut->locks) {
+ if (!p.is_xlock())
+ continue;
+ MDSCacheObject *obj = p.lock->get_parent();
+ ceph_assert(obj->is_auth());
+ if (skip_dentry &&
+ (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
+ continue;
+ dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
+ p.lock->set_xlock_done();
}
}
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
+ bool drop_rdlocks)
{
set<mds_rank_t> slaves;
- while (!mut->xlocks.empty()) {
- SimpleLock *lock = *mut->xlocks.begin();
- MDSCacheObject *p = lock->get_parent();
- if (!p->is_auth()) {
- assert(lock->get_sm()->can_remote_xlock);
- slaves.insert(p->authority().first);
- lock->put_xlock();
- mut->locks.erase(lock);
- mut->xlocks.erase(lock);
- continue;
- }
- bool ni = false;
- xlock_finish(lock, mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
- }
-
- while (!mut->remote_wrlocks.empty()) {
- map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
- slaves.insert(p->second);
- if (mut->wrlocks.count(p->first) == 0)
- mut->locks.erase(p->first);
- mut->remote_wrlocks.erase(p);
- }
+ for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+ SimpleLock *lock = it->lock;
+ MDSCacheObject *obj = lock->get_parent();
- while (!mut->wrlocks.empty()) {
- bool ni = false;
- MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
- wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
+ if (it->is_xlock()) {
+ if (obj->is_auth()) {
+ bool ni = false;
+ xlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ ceph_assert(lock->get_sm()->can_remote_xlock);
+ slaves.insert(obj->authority().first);
+ lock->put_xlock();
+ mut->locks.erase(it++);
+ }
+ } else if (it->is_wrlock() || it->is_remote_wrlock()) {
+ if (it->is_remote_wrlock()) {
+ slaves.insert(it->wrlock_target);
+ it->clear_remote_wrlock();
+ }
+ if (it->is_wrlock()) {
+ bool ni = false;
+ wrlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ mut->locks.erase(it++);
+ }
+ } else if (drop_rdlocks && it->is_rdlock()) {
+ bool ni = false;
+ rdlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ ++it;
+ }
}
for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
if (!mds->is_cluster_degraded() ||
mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
- MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
- MMDSSlaveRequest::OP_DROPLOCKS);
+ auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS);
mds->send_message_mds(slavereq, *p);
}
}
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
SimpleLock *lock = mut->locking;
- assert(lock);
+ ceph_assert(lock);
dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
if (lock->get_parent()->is_auth()) {
bool need_issue = false;
if (lock->get_state() == LOCK_PREXLOCK) {
_finish_xlock(lock, -1, &need_issue);
- } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
- lock->get_num_xlocks() == 0) {
+ } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
lock->set_state(LOCK_XLOCKDONE);
eval_gather(lock, true, &need_issue);
}
if (mut->locking)
cancel_locking(mut, pneed_issue);
- _drop_non_rdlocks(mut, pneed_issue);
- _drop_rdlocks(mut, pneed_issue);
+ _drop_locks(mut, pneed_issue, true);
if (pneed_issue == &my_need_issue)
issue_caps_set(*pneed_issue);
if (!pneed_issue)
pneed_issue = &my_need_issue;
- _drop_non_rdlocks(mut, pneed_issue);
+ _drop_locks(mut, pneed_issue, false);
if (pneed_issue == &my_need_issue)
issue_caps_set(*pneed_issue);
}
-void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
{
- set<CInode*> my_need_issue;
- if (!pneed_issue)
- pneed_issue = &my_need_issue;
+ set<CInode*> need_issue;
- _drop_rdlocks(mut, pneed_issue);
+ for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+ if (!it->is_rdlock()) {
+ ++it;
+ continue;
+ }
+ SimpleLock *lock = it->lock;
+ // make later mksnap/setlayout (at other mds) wait for this unsafe request
+ if (lock->get_type() == CEPH_LOCK_ISNAP ||
+ lock->get_type() == CEPH_LOCK_IPOLICY) {
+ ++it;
+ continue;
+ }
+ bool ni = false;
+ rdlock_finish(it++, mut, &ni);
+ if (ni)
+ need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+ }
- if (pneed_issue == &my_need_issue)
- issue_caps_set(*pneed_issue);
+ issue_caps_set(need_issue);
}
+void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
+{
+ set<CInode*> need_issue;
+
+ for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+ SimpleLock *lock = it->lock;
+ if (lock->get_type() == CEPH_LOCK_IDFT) {
+ ++it;
+ continue;
+ }
+ bool ni = false;
+ wrlock_finish(it++, mut, &ni);
+ if (ni)
+ need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+ }
+ issue_caps_set(need_issue);
+}
// generics
-void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
+void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
{
dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
- assert(!lock->is_stable());
+ ceph_assert(!lock->is_stable());
int next = lock->get_next_state();
bool need_issue = false;
int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
- assert(!caps || in != NULL);
+ ceph_assert(!caps || in != NULL);
if (caps && in->is_head()) {
in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
lock->get_cap_shift(), lock->get_cap_mask());
<< " on " << *lock->get_parent() << dendl;
if (lock->get_sm() == &sm_filelock) {
- assert(in);
+ ceph_assert(in);
if (in->state_test(CInode::STATE_RECOVERING)) {
dout(7) << "eval_gather finished gather, but still recovering" << dendl;
return;
mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
switch (lock->get_state()) {
case LOCK_SYNC_LOCK:
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
- auth);
+ mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
break;
case LOCK_MIX_SYNC:
{
- MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
+ auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
lock->encode_locked_state(reply->get_data());
mds->send_message_mds(reply, auth);
next = LOCK_MIX_SYNC2;
case LOCK_SYNC_MIX:
{
- MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
+ auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid());
mds->send_message_mds(reply, auth);
next = LOCK_SYNC_MIX2;
}
{
bufferlist data;
lock->encode_locked_state(data);
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
+ mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
(static_cast<ScatterLock *>(lock))->start_flush();
// we'll get an AC_LOCKFLUSHED to complete
}
case LOCK_TSYN_MIX:
case LOCK_SYNC_MIX:
case LOCK_EXCL_MIX:
+ case LOCK_XSYN_MIX:
in->start_scatter(static_cast<ScatterLock *>(lock));
if (lock->get_parent()->is_replicated()) {
bufferlist softdata;
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
bool need_issue = caps_imported;
- list<MDSInternalContextBase*> finishers;
+ MDSContext::vec finishers;
dout(10) << "eval " << mask << " " << *in << dendl;
// choose loner?
if (in->is_auth() && in->is_head()) {
- if (in->choose_ideal_loner() >= 0) {
- if (in->try_set_loner()) {
- dout(10) << "eval set loner to client." << in->get_loner() << dendl;
- need_issue = true;
- mask = -1;
- } else
- dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
- } else
- dout(10) << "eval doesn't want loner" << dendl;
+ client_t orig_loner = in->get_loner();
+ if (in->choose_ideal_loner()) {
+ dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
+ need_issue = true;
+ mask = -1;
+ } else if (in->get_wanted_loner() != in->get_loner()) {
+ dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
+ mask = -1;
+ }
}
retry:
// drop loner?
if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
- dout(10) << " trying to drop loner" << dendl;
if (in->try_drop_loner()) {
- dout(10) << " dropped loner" << dendl;
need_issue = true;
-
if (in->get_wanted_loner() >= 0) {
- if (in->try_set_loner()) {
- dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
- mask = -1;
- goto retry;
- } else {
- dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
- }
+ dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
+ bool ok = in->try_set_loner();
+ ceph_assert(ok);
+ mask = -1;
+ goto retry;
}
}
}
C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
// We are used as an MDSCacheObject waiter, so should
// only be invoked by someone already holding the big lock.
- assert(locker->mds->mds_lock.is_locked_by_me());
+ ceph_assert(locker->mds->mds_lock.is_locked_by_me());
p->get(MDSCacheObject::PIN_PTRWAITER);
}
void finish(int r) override {
}
if (mask & CEPH_LOCK_DN) {
- assert(mask == CEPH_LOCK_DN);
+ ceph_assert(mask == CEPH_LOCK_DN);
bool need_issue = false; // ignore this, no caps on dentries
CDentry *dn = static_cast<CDentry *>(p);
eval_any(&dn->lock, &need_issue);
}
}
- if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
+ if (lock->get_type() != CEPH_LOCK_DN &&
+ lock->get_type() != CEPH_LOCK_ISNAP &&
+ p->is_freezing()) {
dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
return;
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
bool need_issue = false;
- list<MDSInternalContextBase*> finishers;
+ MDSContext::vec finishers;
// kick locks now
if (!in->filelock.is_stable())
void Locker::eval_scatter_gathers(CInode *in)
{
bool need_issue = false;
- list<MDSInternalContextBase*> finishers;
+ MDSContext::vec finishers;
dout(10) << "eval_scatter_gathers " << *in << dendl;
mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
dout(10) << "requesting rdlock from auth on "
<< *lock << " on " << *lock->get_parent() << dendl;
- mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
+ mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
}
return false;
}
return false;
}
-bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
+bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSContext *con)
{
dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
// can read? grab ref.
if (lock->can_rdlock(client)) {
lock->get_rdlock();
- mut->rdlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
return true;
}
lock->get_state() == LOCK_SNAP_SYNC) {
// okay, we actually need to kick the head's lock to get ourselves synced up.
CInode *head = mdcache->get_inode(in->ino());
- assert(head);
- SimpleLock *hlock = head->get_lock(lock->get_type());
+ ceph_assert(head);
+ SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+ if (hlock->get_state() == LOCK_SYNC)
+ hlock = head->get_lock(lock->get_type());
+
if (hlock->get_state() != LOCK_SYNC) {
dout(10) << "rdlock_start trying head inode " << *head << dendl;
- if (!rdlock_start(head->get_lock(lock->get_type()), mut, true)) // ** as_anon, no rdlock on EXCL **
+ if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
return false;
// oh, check our lock again then
}
mds->mdlog->flush();
}
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_rdlock());
+ SimpleLock *lock = it->lock;
// drop ref
lock->put_rdlock();
- if (mut) {
- mut->rdlocks.erase(lock);
- mut->locks.erase(lock);
- }
+ if (mut)
+ mut->locks.erase(it);
dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
}
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
{
- dout(10) << "can_rdlock_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
- if (!(*p)->can_rdlock(-1)) {
- dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+ dout(10) << "can_rdlock_set " << dendl;
+ for (const auto& p : lov) {
+ ceph_assert(p.is_rdlock());
+ if (!p.lock->can_rdlock(-1)) {
+ dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
return false;
}
+ }
return true;
}
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
- dout(10) << "rdlock_try_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
- if (!rdlock_try(*p, -1, NULL)) {
- dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
- return false;
- }
- return true;
-}
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
{
- dout(10) << "rdlock_take_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
- (*p)->get_rdlock();
- mut->rdlocks.insert(*p);
- mut->locks.insert(*p);
+ dout(10) << "rdlock_take_set " << dendl;
+ for (const auto& p : lov) {
+ ceph_assert(p.is_rdlock());
+ p.lock->get_rdlock();
+ mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
}
}
dout(7) << "wrlock_force on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->get_wrlock(true);
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
}
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
if (lock->can_wrlock(client) &&
(!want_scatter || lock->get_state() == LOCK_MIX)) {
lock->get_wrlock();
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
return true;
}
mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
dout(10) << "requesting scatter from auth on "
<< *lock << " on " << *lock->get_parent() << dendl;
- mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
+ mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
}
break;
}
return false;
}
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_wrlock());
+ SimpleLock* lock = it->lock;
+
if (lock->get_type() == CEPH_LOCK_IVERSION ||
lock->get_type() == CEPH_LOCK_DVERSION)
- return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+ return local_wrlock_finish(it, mut);
dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
lock->put_wrlock();
- if (mut) {
- mut->wrlocks.erase(lock);
- if (mut->remote_wrlocks.count(lock) == 0)
- mut->locks.erase(lock);
- }
+
+ if (it->is_remote_wrlock())
+ it->clear_wrlock();
+ else
+ mut->locks.erase(it);
if (!lock->is_wrlocked()) {
if (!lock->is_stable())
// send lock request
mut->start_locking(lock, target);
mut->more()->slaves.insert(target);
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
- MMDSSlaveRequest::OP_WRLOCK);
+ auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
mds->send_message_mds(r, target);
- assert(mut->more()->waiting_on_slave.count(target) == 0);
+ ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
mut->more()->waiting_on_slave.insert(target);
}
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
- MutationImpl *mut)
+void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
- // drop ref
- mut->remote_wrlocks.erase(lock);
- if (mut->wrlocks.count(lock) == 0)
- mut->locks.erase(lock);
+ ceph_assert(it->is_remote_wrlock());
+ SimpleLock *lock = it->lock;
+ mds_rank_t target = it->wrlock_target;
+
+ if (it->is_wrlock())
+ it->clear_remote_wrlock();
+ else
+ mut->locks.erase(it);
dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
<< " " << *lock->get_parent() << dendl;
if (!mds->is_cluster_degraded() ||
mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
- MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
- MMDSSlaveRequest::OP_UNWRLOCK);
+ auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
slavereq->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(slavereq->get_object_info());
mds->send_message_mds(slavereq, target);
dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
client_t client = mut->get_client();
+ CInode *in = nullptr;
+ if (lock->get_cap_shift())
+ in = static_cast<CInode *>(lock->get_parent());
+
// auth?
if (lock->get_parent()->is_auth()) {
// auth
while (1) {
- if (lock->can_xlock(client)) {
+ if (lock->can_xlock(client) &&
+ !(lock->get_state() == LOCK_LOCK_XLOCK && // client is not xlocker or
+ in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
lock->set_state(LOCK_XLOCK);
lock->get_xlock(mut, client);
- mut->xlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
mut->finish_locking(lock);
return true;
}
- if (lock->get_type() == CEPH_LOCK_IFILE) {
- CInode *in = static_cast<CInode*>(lock->get_parent());
- if (in->state_test(CInode::STATE_RECOVERING)) {
- mds->mdcache->recovery_queue.prioritize(in);
- }
+ if (lock->get_type() == CEPH_LOCK_IFILE &&
+ in->state_test(CInode::STATE_RECOVERING)) {
+ mds->mdcache->recovery_queue.prioritize(in);
}
if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
return false;
} else {
// replica
- assert(lock->get_sm()->can_remote_xlock);
- assert(!mut->slave_request);
+ ceph_assert(lock->get_sm()->can_remote_xlock);
+ ceph_assert(!mut->slave_request);
// wait for single auth
if (lock->get_parent()->is_ambiguous_auth()) {
// send lock request
mut->more()->slaves.insert(auth);
mut->start_locking(lock, auth);
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
- MMDSSlaveRequest::OP_XLOCK);
+ auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
mds->send_message_mds(r, auth);
- assert(mut->more()->waiting_on_slave.count(auth) == 0);
+ ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
mut->more()->waiting_on_slave.insert(auth);
return false;
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
- assert(!lock->is_stable());
- if (lock->get_num_rdlocks() == 0 &&
+ ceph_assert(!lock->is_stable());
+ if (lock->get_type() != CEPH_LOCK_DN &&
+ lock->get_type() != CEPH_LOCK_ISNAP &&
+ lock->get_num_rdlocks() == 0 &&
lock->get_num_wrlocks() == 0 &&
- lock->get_num_client_lease() == 0 &&
- lock->get_state() != LOCK_XLOCKSNAP &&
- lock->get_type() != CEPH_LOCK_DN) {
+ !lock->is_leased() &&
+ lock->get_state() != LOCK_XLOCKSNAP) {
CInode *in = static_cast<CInode*>(lock->get_parent());
client_t loner = in->get_target_loner();
if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_xlock());
+ SimpleLock *lock = it->lock;
+
if (lock->get_type() == CEPH_LOCK_IVERSION ||
lock->get_type() == CEPH_LOCK_DVERSION)
- return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+ return local_xlock_finish(it, mut);
dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
// drop ref
lock->put_xlock();
- assert(mut);
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ ceph_assert(mut);
+ mut->locks.erase(it);
bool do_issue = false;
// remote xlock?
if (!lock->get_parent()->is_auth()) {
- assert(lock->get_sm()->can_remote_xlock);
+ ceph_assert(lock->get_sm()->can_remote_xlock);
// tell auth
dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
mds_rank_t auth = lock->get_parent()->authority().first;
if (!mds->is_cluster_degraded() ||
mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
- MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
- MMDSSlaveRequest::OP_UNXLOCK);
+ auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
slavereq->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(slavereq->get_object_info());
mds->send_message_mds(slavereq, auth);
SimpleLock::WAIT_WR |
SimpleLock::WAIT_RD, 0);
} else {
- if (lock->get_num_xlocks() == 0) {
- if (lock->get_state() == LOCK_LOCK_XLOCK)
- lock->set_state(LOCK_XLOCKDONE);
+ if (lock->get_num_xlocks() == 0 &&
+ lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
_finish_xlock(lock, xlocker, &do_issue);
}
}
}
}
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_xlock());
+ SimpleLock *lock = it->lock;
dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
lock->put_xlock();
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
MDSCacheObject *p = lock->get_parent();
- assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
+ ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
if (!lock->is_stable())
lock->get_parent()->auth_unpin(lock);
class C_Locker_FileUpdate_finish : public LockerLogContext {
CInode *in;
MutationRef mut;
- bool share_max;
- bool need_issue;
+ unsigned flags;
client_t client;
- MClientCaps *ack;
+ MClientCaps::ref ack;
public:
- C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
- bool sm=false, bool ni=false, client_t c=-1,
- MClientCaps *ac = 0)
- : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
- client(c), ack(ac) {
+ C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
+ const MClientCaps::ref &ack, client_t c=-1)
+ : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
in->get(CInode::PIN_PTRWAITER);
}
void finish(int r) override {
- locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
+ locker->file_update_finish(in, mut, flags, client, ack);
in->put(CInode::PIN_PTRWAITER);
}
};
-void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
- client_t client, MClientCaps *ack)
+enum {
+ UPDATE_SHAREMAX = 1,
+ UPDATE_NEEDSISSUE = 2,
+ UPDATE_SNAPFLUSH = 4,
+};
+
+void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
+ client_t client, const MClientCaps::ref &ack)
{
dout(10) << "file_update_finish on " << *in << dendl;
in->pop_and_dirty_projected_inode(mut->ls);
mut->apply();
-
+
if (ack) {
Session *session = mds->get_session(client);
- if (session) {
+ if (session && !session->is_closed()) {
// "oldest flush tid" > 0 means client uses unique TID for each flush
if (ack->get_oldest_flush_tid() > 0)
- session->add_completed_flush(ack->get_client_tid());
+ session->add_completed_flush(ack->get_client_tid());
mds->send_message_client_counted(ack, session);
} else {
dout(10) << " no session for client." << client << " " << *ack << dendl;
- ack->put();
}
}
set<CInode*> need_issue;
drop_locks(mut.get(), &need_issue);
- if (!in->is_head() && !in->client_snap_caps.empty()) {
- dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
- // check for snap writeback completion
- bool gather = false;
- compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
- while (p != in->client_snap_caps.end()) {
- SimpleLock *lock = in->get_lock(p->first);
- assert(lock);
- dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
- << " lock " << *lock << " on " << *in << dendl;
- lock->put_wrlock();
-
- p->second.erase(client);
- if (p->second.empty()) {
- gather = true;
- in->client_snap_caps.erase(p++);
- } else
- ++p;
- }
- if (gather) {
- if (in->client_snap_caps.empty())
- in->item_open_file.remove_myself();
- eval_cap_gather(in, &need_issue);
- }
- } else {
- if (issue_client_cap && need_issue.count(in) == 0) {
+ if (in->is_head()) {
+ if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
Capability *cap = in->get_client_cap(client);
if (cap && (cap->wanted() & ~cap->pending()))
issue_caps(in, cap);
}
-
- if (share_max && in->is_auth() &&
+
+ if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
share_inode_max_size(in);
+
+ } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
+ dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
+ // check for snap writeback completion
+ in->client_snap_caps.erase(client);
+ if (in->client_snap_caps.empty()) {
+ for (int i = 0; i < num_cinode_locks; i++) {
+ SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
+ ceph_assert(lock);
+ lock->put_wrlock();
+ }
+ in->item_open_file.remove_myself();
+ in->item_caps.remove_myself();
+ eval_cap_gather(in, &need_issue);
+ }
}
issue_caps_set(need_issue);
+ mds->balancer->hit_inode(in, META_POP_IWR);
+
// auth unpin after issuing caps
mut->cleanup();
}
bool is_new;
// if replay, try to reconnect cap, and otherwise do nothing.
- if (is_replay) {
- mds->mdcache->try_reconnect_cap(in, session);
- return 0;
- }
+ if (is_replay)
+ return mds->mdcache->try_reconnect_cap(in, session);
+
// my needs
- assert(session->info.inst.name.is_client());
- client_t my_client = session->info.inst.name.num();
+ ceph_assert(session->info.inst.name.is_client());
+ client_t my_client = session->get_client();
int my_want = ceph_caps_for_mode(mode);
// register a capability
issue_caps(*p);
}
-bool Locker::issue_caps(CInode *in, Capability *only_cap)
+class C_Locker_RevokeStaleCap : public LockerContext {
+ CInode *in;
+ client_t client;
+public:
+ C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
+ LockerContext(l), in(i), client(c) {
+ in->get(CInode::PIN_PTRWAITER);
+ }
+ void finish(int r) override {
+ locker->revoke_stale_cap(in, client);
+ in->put(CInode::PIN_PTRWAITER);
+ }
+};
+
+int Locker::issue_caps(CInode *in, Capability *only_cap)
{
// allowed caps are determined by the lock mode.
int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
<< " on " << *in << dendl;
}
- assert(in->is_head());
+ ceph_assert(in->is_head());
// count conflicts with
int nissued = 0;
// client caps
- map<client_t, Capability*>::iterator it;
+ map<client_t, Capability>::iterator it;
if (only_cap)
it = in->client_caps.find(only_cap->get_client());
else
it = in->client_caps.begin();
for (; it != in->client_caps.end(); ++it) {
- Capability *cap = it->second;
- if (cap->is_stale())
- continue;
+ Capability *cap = &it->second;
// do not issue _new_ bits when size|mtime is projected
int allowed;
// add in any xlocker-only caps (for locks this client is the xlocker for)
allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
- Session *session = mds->get_session(it->first);
- if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
- !(session && session->connection &&
- session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
+ if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
+ cap->is_noinline()) ||
+ (!in->inode.layout.pool_ns.empty() &&
+ cap->is_nopoolns()))
allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
int pending = cap->pending();
if (!(pending & ~allowed)) {
// skip if suppress or new, and not revocation
- if (cap->is_new() || cap->is_suppress()) {
- dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
+ if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
+ dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
+ continue;
+ }
+ } else {
+ ceph_assert(!cap->is_new());
+ if (cap->is_stale()) {
+ dout(20) << " revoke stale cap from client." << it->first << dendl;
+ ceph_assert(!cap->is_valid());
+ cap->issue(allowed & pending, false);
+ mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
continue;
}
+
+ if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
+ // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
+ // mds needs to re-issue caps, then do revocation.
+ long seq = cap->issue(pending, true);
+
+ dout(7) << " sending MClientCaps to client." << it->first
+ << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;
+
+ auto m = MClientCaps::create(CEPH_CAP_OP_GRANT, in->ino(),
+ in->find_snaprealm()->inode->ino(),
+ cap->get_cap_id(), cap->get_last_seq(),
+ pending, wanted, 0, cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
+ in->encode_cap_message(m, cap);
+
+ mds->send_message_client_counted(m, cap->get_session());
+ }
}
// notify clients about deleted inode, to make sure they release caps ASAP.
// are there caps that the client _wants_ and can have, but aren't pending?
// or do we need to revoke?
- if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
- (pending & ~allowed)) { // need to revoke ~allowed caps.
+ if ((pending & ~allowed) || // need to revoke ~allowed caps.
+ ((wanted & allowed) & ~pending) || // missing wanted+allowed caps
+ !cap->is_valid()) { // after stale->resume circle
// issue
nissued++;
int before = pending;
long seq;
if (pending & ~allowed)
- seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
+ seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new.
else
- seq = cap->issue((wanted|likes) & allowed);
+ seq = cap->issue((wanted|likes) & allowed, true);
int after = cap->pending();
- if (cap->is_new()) {
- // haven't send caps to client yet
- if (before & ~after)
- cap->confirm_receipt(seq, after);
- } else {
- dout(7) << " sending MClientCaps to client." << it->first
- << " seq " << cap->get_last_seq()
- << " new pending " << ccap_string(after) << " was " << ccap_string(before)
- << dendl;
-
- int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
- if (op == CEPH_CAP_OP_REVOKE) {
- revoking_caps.push_back(&cap->item_revoking_caps);
- revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
- cap->set_last_revoke_stamp(ceph_clock_now());
- cap->reset_num_revoke_warnings();
- }
-
- MClientCaps *m = new MClientCaps(op, in->ino(),
- in->find_snaprealm()->inode->ino(),
- cap->get_cap_id(), cap->get_last_seq(),
- after, wanted, 0,
- cap->get_mseq(),
- mds->get_osd_epoch_barrier());
- in->encode_cap_message(m, cap);
+ dout(7) << " sending MClientCaps to client." << it->first
+ << " seq " << seq << " new pending " << ccap_string(after)
+ << " was " << ccap_string(before) << dendl;
- mds->send_message_client_counted(m, it->first);
+ int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
+ if (op == CEPH_CAP_OP_REVOKE) {
+ revoking_caps.push_back(&cap->item_revoking_caps);
+ revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
+ cap->set_last_revoke_stamp(ceph_clock_now());
+ cap->reset_num_revoke_warnings();
}
+
+ auto m = MClientCaps::create(op, in->ino(),
+ in->find_snaprealm()->inode->ino(),
+ cap->get_cap_id(), cap->get_last_seq(),
+ after, wanted, 0, cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
+ in->encode_cap_message(m, cap);
+
+ mds->send_message_client_counted(m, cap->get_session());
}
if (only_cap)
break;
}
- return (nissued == 0); // true if no re-issued, no callbacks
+ return nissued;
}
void Locker::issue_truncate(CInode *in)
{
dout(7) << "issue_truncate on " << *in << dendl;
- for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
- it != in->client_caps.end();
- ++it) {
- Capability *cap = it->second;
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
- in->ino(),
- in->find_snaprealm()->inode->ino(),
- cap->get_cap_id(), cap->get_last_seq(),
- cap->pending(), cap->wanted(), 0,
- cap->get_mseq(),
- mds->get_osd_epoch_barrier());
+ for (auto &p : in->client_caps) {
+ Capability *cap = &p.second;
+ auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
+ in->ino(),
+ in->find_snaprealm()->inode->ino(),
+ cap->get_cap_id(), cap->get_last_seq(),
+ cap->pending(), cap->wanted(), 0,
+ cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
in->encode_cap_message(m, cap);
- mds->send_message_client_counted(m, it->first);
+ mds->send_message_client_counted(m, p.first);
}
// should we increase max_size?
}
-void Locker::revoke_stale_caps(Capability *cap)
+void Locker::revoke_stale_cap(CInode *in, client_t client)
{
- CInode *in = cap->get_inode();
- if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
- // if export succeeds, the cap will be removed. if export fails, we need to
- // revoke the cap if it's still stale.
- in->state_set(CInode::STATE_EVALSTALECAPS);
+ dout(7) << __func__ << " client." << client << " on " << *in << dendl;
+ Capability *cap = in->get_client_cap(client);
+ if (!cap)
+ return;
+
+ if (cap->revoking() & CEPH_CAP_ANY_WR) {
+ std::stringstream ss;
+ mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
return;
}
- int issued = cap->issued();
- if (issued & ~CEPH_CAP_PIN) {
+ cap->revoke();
+
+ if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
+ in->state_set(CInode::STATE_NEEDSRECOVER);
+
+ if (in->state_test(CInode::STATE_EXPORTINGCAPS))
+ return;
+
+ if (!in->filelock.is_stable())
+ eval_gather(&in->filelock);
+ if (!in->linklock.is_stable())
+ eval_gather(&in->linklock);
+ if (!in->authlock.is_stable())
+ eval_gather(&in->authlock);
+ if (!in->xattrlock.is_stable())
+ eval_gather(&in->xattrlock);
+
+ if (in->is_auth())
+ try_eval(in, CEPH_CAP_LOCKS);
+ else
+ request_inode_file_caps(in);
+}
+
+bool Locker::revoke_stale_caps(Session *session)
+{
+ dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
+
+ // invalidate all caps
+ session->inc_cap_gen();
+
+ bool ret = true;
+ std::vector<CInode*> to_eval;
+
+ for (auto p = session->caps.begin(); !p.end(); ) {
+ Capability *cap = *p;
+ ++p;
+ if (!cap->is_notable()) {
+ // the rest ones are not being revoked and don't have writeable range
+ // and don't want exclusive caps or want file read/write. They don't
+ // need recover, they don't affect eval_gather()/try_eval()
+ break;
+ }
+
+ int revoking = cap->revoking();
+ if (!revoking)
+ continue;
+
+ if (revoking & CEPH_CAP_ANY_WR) {
+ ret = false;
+ break;
+ }
+
+ int issued = cap->issued();
+ CInode *in = cap->get_inode();
dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
cap->revoke();
in->inode.client_ranges.count(cap->get_client()))
in->state_set(CInode::STATE_NEEDSRECOVER);
- if (!in->filelock.is_stable()) eval_gather(&in->filelock);
- if (!in->linklock.is_stable()) eval_gather(&in->linklock);
- if (!in->authlock.is_stable()) eval_gather(&in->authlock);
- if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
+ // eval lock/inode may finish contexts, which may modify other cap's position
+ // in the session->caps.
+ to_eval.push_back(in);
+ }
+
+ for (auto in : to_eval) {
+ if (in->state_test(CInode::STATE_EXPORTINGCAPS))
+ continue;
- if (in->is_auth()) {
+ if (!in->filelock.is_stable())
+ eval_gather(&in->filelock);
+ if (!in->linklock.is_stable())
+ eval_gather(&in->linklock);
+ if (!in->authlock.is_stable())
+ eval_gather(&in->authlock);
+ if (!in->xattrlock.is_stable())
+ eval_gather(&in->xattrlock);
+
+ if (in->is_auth())
try_eval(in, CEPH_CAP_LOCKS);
- } else {
+ else
request_inode_file_caps(in);
- }
}
-}
-
-void Locker::revoke_stale_caps(Session *session)
-{
- dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
- for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
- Capability *cap = *p;
- cap->mark_stale();
- revoke_stale_caps(cap);
- }
+ return ret;
}
void Locker::resume_stale_caps(Session *session)
{
dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
- for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
+ bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
+ for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
Capability *cap = *p;
+ ++p;
+ if (lazy && !cap->is_notable())
+ break; // see revoke_stale_caps()
+
CInode *in = cap->get_inode();
- assert(in->is_head());
- if (cap->is_stale()) {
- dout(10) << " clearing stale flag on " << *in << dendl;
- cap->clear_stale();
-
- if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
- // if export succeeds, the cap will be removed. if export fails,
- // we need to re-issue the cap if it's not stale.
- in->state_set(CInode::STATE_EVALSTALECAPS);
- continue;
- }
+ ceph_assert(in->is_head());
+ dout(10) << " clearing stale flag on " << *in << dendl;
- if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
- issue_caps(in, cap);
+ if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
+ // if export succeeds, the cap will be removed. if export fails,
+ // we need to re-issue the cap if it's not stale.
+ in->state_set(CInode::STATE_EVALSTALECAPS);
+ continue;
}
+
+ if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
+ issue_caps(in, cap);
}
}
void Locker::request_inode_file_caps(CInode *in)
{
- assert(!in->is_auth());
+ ceph_assert(!in->is_auth());
- int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
+ int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
if (wanted != in->replica_caps_wanted) {
// wait for single auth
if (in->is_ambiguous_auth()) {
if (!mds->is_cluster_degraded() ||
mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
- mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
- auth);
+ mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
}
}
-/* This function DOES put the passed message before returning */
-void Locker::handle_inode_file_caps(MInodeFileCaps *m)
+void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
{
// nobody should be talking to us during recovery.
- assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+ if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
+ if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
+ mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+ ceph_abort_msg("got unexpected message during recovery");
+ }
// ok
CInode *in = mdcache->get_inode(m->get_ino());
mds_rank_t from = mds_rank_t(m->get_source().num());
- assert(in);
- assert(in->is_auth());
+ ceph_assert(in);
+ ceph_assert(in->is_auth());
dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
- if (m->get_caps())
- in->mds_caps_wanted[from] = m->get_caps();
- else
- in->mds_caps_wanted.erase(from);
+ in->set_mds_caps_wanted(from, m->get_caps());
try_eval(in, CEPH_CAP_LOCKS);
- m->put();
}
}
};
-uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
+uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
{
uint64_t new_max = (size + 1) << 1;
- uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
+ uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
if (max_inc > 0) {
- max_inc *= pi->get_layout_size_increment();
- new_max = MIN(new_max, size + max_inc);
+ max_inc *= pi->layout.object_size;
+ new_max = std::min(new_max, size + max_inc);
}
- return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
+ return round_up_to(new_max, pi->get_layout_size_increment());
}
-void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
- map<client_t,client_writeable_range_t> *new_ranges,
+void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
+ CInode::mempool_inode::client_range_map *new_ranges,
bool *max_increased)
{
- inode_t *latest = in->get_projected_inode();
+ auto latest = in->get_projected_inode();
uint64_t ms;
- if(latest->has_layout()) {
+ if (latest->has_layout()) {
ms = calc_new_max_size(latest, size);
} else {
// Layout-less directories like ~mds0/, have zero size
// increase ranges as appropriate.
// shrink to 0 if no WR|BUFFER caps issued.
- for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
- p != in->client_caps.end();
- ++p) {
- if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
- client_writeable_range_t& nr = (*new_ranges)[p->first];
+ for (auto &p : in->client_caps) {
+ if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
+ client_writeable_range_t& nr = (*new_ranges)[p.first];
nr.range.first = 0;
- if (latest->client_ranges.count(p->first)) {
- client_writeable_range_t& oldr = latest->client_ranges[p->first];
+ if (latest->client_ranges.count(p.first)) {
+ client_writeable_range_t& oldr = latest->client_ranges[p.first];
if (ms > oldr.range.last)
*max_increased = true;
- nr.range.last = MAX(ms, oldr.range.last);
+ nr.range.last = std::max(ms, oldr.range.last);
nr.follows = oldr.follows;
} else {
*max_increased = true;
nr.range.last = ms;
nr.follows = in->first - 1;
}
+ if (update)
+ p.second.mark_clientwriteable();
+ } else {
+ if (update)
+ p.second.clear_clientwriteable();
}
}
}
uint64_t new_max_size, uint64_t new_size,
utime_t new_mtime)
{
- assert(in->is_auth());
- assert(in->is_file());
+ ceph_assert(in->is_auth());
+ ceph_assert(in->is_file());
- inode_t *latest = in->get_projected_inode();
- map<client_t, client_writeable_range_t> new_ranges;
+ CInode::mempool_inode *latest = in->get_projected_inode();
+ CInode::mempool_inode::client_range_map new_ranges;
uint64_t size = latest->size;
bool update_size = new_size > 0;
bool update_max = false;
bool max_increased = false;
if (update_size) {
- new_size = size = MAX(size, new_size);
- new_mtime = MAX(new_mtime, latest->mtime);
+ new_size = size = std::max(size, new_size);
+ new_mtime = std::max(new_mtime, latest->mtime);
if (latest->size == new_size && latest->mtime == new_mtime)
update_size = false;
}
- calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
+ int can_update = 1;
+ if (in->is_frozen()) {
+ can_update = -1;
+ } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
+ // lock?
+ if (in->filelock.is_stable()) {
+ if (in->get_target_loner() >= 0)
+ file_excl(&in->filelock);
+ else
+ simple_lock(&in->filelock);
+ }
+ if (!in->filelock.can_wrlock(in->get_loner()))
+ can_update = -2;
+ }
+
+ calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
+ &new_ranges, &max_increased);
if (max_increased || latest->client_ranges != new_ranges)
update_max = true;
<< " update_size " << update_size
<< " on " << *in << dendl;
- if (in->is_frozen()) {
- dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
- C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
- new_max_size,
- new_size,
- new_mtime);
- in->add_waiter(CInode::WAIT_UNFREEZE, cms);
- return false;
- }
- if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
- // lock?
- if (in->filelock.is_stable()) {
- if (in->get_target_loner() >= 0)
- file_excl(&in->filelock);
- else
- simple_lock(&in->filelock);
- }
- if (!in->filelock.can_wrlock(in->get_loner())) {
- // try again later
- C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
- new_max_size,
- new_size,
- new_mtime);
-
+ if (can_update < 0) {
+ auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
+ if (can_update == -1) {
+ dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE, cms);
+ } else {
in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
- return false;
}
+ return false;
}
MutationRef mut(new MutationImpl());
mut->ls = mds->mdlog->get_current_segment();
- inode_t *pi = in->project_inode();
- pi->version = in->pre_dirty();
+ auto &pi = in->project_inode();
+ pi.inode.version = in->pre_dirty();
if (update_max) {
- dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
- pi->client_ranges = new_ranges;
+ dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
+ pi.inode.client_ranges = new_ranges;
}
if (update_size) {
- dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
- pi->size = new_size;
- pi->rstat.rbytes = new_size;
- dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
- pi->mtime = new_mtime;
+ dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
+ pi.inode.size = new_size;
+ pi.inode.rstat.rbytes = new_size;
+ dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
+ pi.inode.mtime = new_mtime;
+ if (new_mtime > pi.inode.ctime) {
+ pi.inode.ctime = new_mtime;
+ if (new_mtime > pi.inode.rstat.rctime)
+ pi.inode.rstat.rctime = new_mtime;
+ }
}
// use EOpen if the file is still open; otherwise, use EUpdate.
eo->add_ino(in->ino());
metablob = &eo->metablob;
le = eo;
- mut->ls->open_files.push_back(&in->item_open_file);
} else {
EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
metablob = &eu->metablob;
metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
mdcache->journal_dirty_inode(mut.get(), metablob, in);
}
- mds->mdlog->submit_entry(le,
- new C_Locker_FileUpdate_finish(this, in, mut, true));
+ mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
+ UPDATE_SHAREMAX, MClientCaps::ref()));
wrlock_force(&in->filelock, mut); // wrlock for duration of journal
mut->auth_pin(in);
* the cap later.
*/
dout(10) << "share_inode_max_size on " << *in << dendl;
- map<client_t, Capability*>::iterator it;
+ map<client_t, Capability>::iterator it;
if (only_cap)
it = in->client_caps.find(only_cap->get_client());
else
it = in->client_caps.begin();
for (; it != in->client_caps.end(); ++it) {
const client_t client = it->first;
- Capability *cap = it->second;
+ Capability *cap = &it->second;
if (cap->is_suppress())
continue;
if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
dout(10) << "share_inode_max_size with client." << client << dendl;
cap->inc_last_seq();
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
- in->ino(),
- in->find_snaprealm()->inode->ino(),
- cap->get_cap_id(), cap->get_last_seq(),
- cap->pending(), cap->wanted(), 0,
- cap->get_mseq(),
- mds->get_osd_epoch_barrier());
+ auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
+ in->ino(),
+ in->find_snaprealm()->inode->ino(),
+ cap->get_cap_id(),
+ cap->get_last_seq(),
+ cap->pending(),
+ cap->wanted(), 0,
+ cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
in->encode_cap_message(m, cap);
mds->send_message_client_counted(m, client);
}
return;
}
- if (cap->wanted() == 0) {
- if (cur->item_open_file.is_on_list() &&
- !cur->is_any_caps_wanted()) {
- dout(10) << " removing unwanted file from open file list " << *cur << dendl;
- cur->item_open_file.remove_myself();
- }
- } else {
+ if (cap->wanted()) {
if (cur->state_test(CInode::STATE_RECOVERING) &&
(cap->wanted() & (CEPH_CAP_FILE_RD |
CEPH_CAP_FILE_WR))) {
mds->mdcache->recovery_queue.prioritize(cur);
}
- if (!cur->item_open_file.is_on_list()) {
- dout(10) << " adding to open file list " << *cur << dendl;
- assert(cur->last == CEPH_NOSNAP);
- LogSegment *ls = mds->mdlog->get_current_segment();
+ if (mdcache->open_file_table.should_log_open(cur)) {
+ ceph_assert(cur->last == CEPH_NOSNAP);
EOpen *le = new EOpen(mds->mdlog);
mds->mdlog->start_entry(le);
le->add_clean_inode(cur);
- ls->open_files.push_back(&cur->item_open_file);
mds->mdlog->submit_entry(le);
}
}
}
+void Locker::snapflush_nudge(CInode *in)
+{
+ ceph_assert(in->last != CEPH_NOSNAP);
+ if (in->client_snap_caps.empty())
+ return;
+
+ CInode *head = mdcache->get_inode(in->ino());
+ // head inode gets unpinned when snapflush starts. It might get trimmed
+ // before snapflush finishes.
+ if (!head)
+ return;
+ ceph_assert(head->is_auth());
+ if (head->client_need_snapflush.empty())
+ return;
+
+ SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
+ if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
+ hlock = NULL;
+ for (int i = 0; i < num_cinode_locks; i++) {
+ SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
+ if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
+ hlock = lock;
+ break;
+ }
+ }
+ }
+ if (hlock) {
+ _rdlock_kick(hlock, true);
+ } else {
+ // also, requeue, in case of unstable lock
+ need_snapflush_inodes.push_back(&in->item_caps);
+ }
+}
+
+void Locker::mark_need_snapflush_inode(CInode *in)
+{
+ ceph_assert(in->last != CEPH_NOSNAP);
+ if (!in->item_caps.is_on_list()) {
+ need_snapflush_inodes.push_back(&in->item_caps);
+ utime_t now = ceph_clock_now();
+ in->last_dirstat_prop = now;
+ dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
+ }
+}
+
+bool Locker::is_revoking_any_caps_from(client_t client)
+{
+ auto it = revoking_caps_by_client.find(client);
+ if (it == revoking_caps_by_client.end())
+ return false;
+ return !it->second.empty();
+}
-void Locker::_do_null_snapflush(CInode *head_in, client_t client)
+void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
- compact_map<snapid_t, set<client_t> >::iterator p = head_in->client_need_snapflush.begin();
- while (p != head_in->client_need_snapflush.end()) {
+ for (auto p = head_in->client_need_snapflush.begin();
+ p != head_in->client_need_snapflush.end() && p->first < last; ) {
snapid_t snapid = p->first;
- set<client_t>& clients = p->second;
+ auto &clients = p->second;
++p; // be careful, q loop below depends on this
if (clients.count(client)) {
dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
- CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
- if (!sin) {
- // hrm, look forward until we find the inode.
- // (we can only look it up by the last snapid it is valid for)
- dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
- for (compact_map<snapid_t, set<client_t> >::iterator q = p; // p is already at next entry
- q != head_in->client_need_snapflush.end();
- ++q) {
- dout(10) << " trying snapid " << q->first << dendl;
- sin = mdcache->get_inode(head_in->ino(), q->first);
- if (sin) {
- assert(sin->first <= snapid);
- break;
- }
- dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
- }
- if (!sin && head_in->is_multiversion())
- sin = head_in;
- assert(sin);
- }
- _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
+ CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
+ ceph_assert(sin);
+ ceph_assert(sin->first <= snapid);
+ _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
head_in->remove_need_snapflush(sin, snapid, client);
}
}
return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
}
-/*
- * This function DOES put the passed message before returning
- */
-void Locker::handle_client_caps(MClientCaps *m)
+void Locker::handle_client_caps(const MClientCaps::const_ref &m)
{
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
client_t client = m->get_source().num();
-
snapid_t follows = m->get_snap_follows();
+ auto op = m->get_op();
+ auto dirty = m->get_dirty();
dout(7) << "handle_client_caps "
- << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
<< " on " << m->get_ino()
<< " tid " << m->get_client_tid() << " follows " << follows
- << " op " << ceph_cap_op_name(m->get_op()) << dendl;
+ << " op " << ceph_cap_op_name(op)
+ << " flags 0x" << std::hex << m->flags << std::dec << dendl;
+ Session *session = mds->get_session(m);
if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
if (!session) {
dout(5) << " no session, dropping " << *m << dendl;
- m->put();
return;
}
if (session->is_closed() ||
session->is_closing() ||
session->is_killing()) {
dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
- m->put();
return;
}
- if (mds->is_reconnect() &&
- m->get_dirty() && m->get_client_tid() > 0 &&
+ if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
+ dirty && m->get_client_tid() > 0 &&
!session->have_completed_flush(m->get_client_tid())) {
- mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
+ mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
+ op == CEPH_CAP_OP_FLUSHSNAP);
}
mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
return;
session->have_completed_flush(m->get_client_tid())) {
dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
<< " for client." << client << dendl;
- MClientCaps *ack;
- if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
- ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
- m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+ MClientCaps::ref ack;
+ if (op == CEPH_CAP_OP_FLUSHSNAP) {
+ ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
} else {
- ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
- m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
- mds->get_osd_epoch_barrier());
+ ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
}
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
mds->send_message_client_counted(ack, m->get_connection());
- if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
- m->put();
+ if (op == CEPH_CAP_OP_FLUSHSNAP) {
return;
} else {
// fall-thru because the message may release some caps
- m->clear_dirty();
- m->set_op(CEPH_CAP_OP_UPDATE);
+ dirty = false;
+ op = CEPH_CAP_OP_UPDATE;
}
}
mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
if (session->get_num_trim_flushes_warnings() > 0 &&
- session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
+ session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
session->reset_num_trim_flushes_warnings();
} else {
if (session->get_num_completed_flushes() >=
- (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
+ (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
session->inc_num_trim_flushes_warnings();
stringstream ss;
ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
return;
}
- dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
- m->put();
+
+ /*
+ * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
+ * Sequence of events that cause this are:
+ * - client sends caps message to mds.a
+ * - mds finishes subtree migration, send cap export to client
+ * - mds trim its cache
+ * - mds receives cap messages from client
+ */
+ dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
return;
}
mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
}
- CInode *in = head_in;
- if (follows > 0) {
- in = mdcache->pick_inode_snap(head_in, follows);
- if (in != head_in)
- dout(10) << " head inode " << *head_in << dendl;
- }
- dout(10) << " cap inode " << *in << dendl;
+ dout(10) << " head inode " << *head_in << dendl;
Capability *cap = 0;
- cap = in->get_client_cap(client);
- if (!cap && in != head_in)
- cap = head_in->get_client_cap(client);
+ cap = head_in->get_client_cap(client);
if (!cap) {
- dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
- m->put();
+ dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
return;
}
- assert(cap);
+ ceph_assert(cap);
// freezing|frozen?
- if (should_defer_client_cap_frozen(in)) {
- dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
- in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
+ if (should_defer_client_cap_frozen(head_in)) {
+ dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
+ head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
return;
}
if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
<< ", dropping" << dendl;
- m->put();
return;
}
- int op = m->get_op();
+ bool need_unpin = false;
// flushsnap?
if (op == CEPH_CAP_OP_FLUSHSNAP) {
- if (!in->is_auth()) {
- dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
+ if (!head_in->is_auth()) {
+ dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
goto out;
}
- SnapRealm *realm = in->find_snaprealm();
+ SnapRealm *realm = head_in->find_snaprealm();
snapid_t snap = realm->get_snap_following(follows);
dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
+ auto p = head_in->client_need_snapflush.begin();
+ if (p != head_in->client_need_snapflush.end() && p->first < snap) {
+ head_in->auth_pin(this); // prevent subtree frozen
+ need_unpin = true;
+ _do_null_snapflush(head_in, client, snap);
+ }
+
+ CInode *in = head_in;
+ if (snap != CEPH_NOSNAP) {
+ in = mdcache->pick_inode_snap(head_in, snap - 1);
+ if (in != head_in)
+ dout(10) << " snapped inode " << *in << dendl;
+ }
+
// we can prepare the ack now, since this FLUSHEDSNAP is independent of any
// other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
// case we get a dup response, so whatever.)
- MClientCaps *ack = 0;
- if (m->get_dirty()) {
- ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+ MClientCaps::ref ack;
+ if (dirty) {
+ ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
if (in == head_in)
cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
- _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
+ _do_snap_update(in, snap, dirty, follows, client, m, ack);
if (in != head_in)
head_in->remove_need_snapflush(in, snap, client);
-
} else {
dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
if (ack)
if (cap->get_cap_id() != m->get_cap_id()) {
dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
} else {
- // intermediate snap inodes
- while (in != head_in) {
- assert(in->last != CEPH_NOSNAP);
- if (in->is_auth() && m->get_dirty()) {
- dout(10) << " updating intermediate snapped inode " << *in << dendl;
- _do_cap_update(in, NULL, m->get_dirty(), follows, m);
+ CInode *in = head_in;
+ if (follows > 0) {
+ in = mdcache->pick_inode_snap(head_in, follows);
+ // intermediate snap inodes
+ while (in != head_in) {
+ ceph_assert(in->last != CEPH_NOSNAP);
+ if (in->is_auth() && dirty) {
+ dout(10) << " updating intermediate snapped inode " << *in << dendl;
+ _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
+ }
+ in = mdcache->pick_inode_snap(head_in, in->last);
}
- in = mdcache->pick_inode_snap(head_in, in->last);
}
// head inode, and cap
- MClientCaps *ack = 0;
+ MClientCaps::ref ack;
int caps = m->get_caps();
if (caps & ~cap->issued()) {
cap->confirm_receipt(m->get_seq(), caps);
dout(10) << " follows " << follows
<< " retains " << ccap_string(m->get_caps())
- << " dirty " << ccap_string(m->get_dirty())
+ << " dirty " << ccap_string(dirty)
<< " on " << *in << dendl;
// released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
// update/release).
if (!head_in->client_need_snapflush.empty()) {
- if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
+ if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
+ !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
+ head_in->auth_pin(this); // prevent subtree frozen
+ need_unpin = true;
_do_null_snapflush(head_in, client);
} else {
dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
}
}
-
- if (m->get_dirty() && in->is_auth()) {
- dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
+ if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
+ cap->clear_needsnapflush();
+
+ if (dirty && in->is_auth()) {
+ dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
<< " seq " << m->get_seq() << " on " << *in << dendl;
- ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
- m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
+ ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+ m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
ack->set_client_tid(m->get_client_tid());
ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
}
// filter wanted based on what we could ever give out (given auth/replica status)
- bool need_flush = m->flags & CLIENT_CAPS_SYNC;
- int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
+ bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
+ int new_wanted = m->get_wanted();
if (new_wanted != cap->wanted()) {
- if (!need_flush && (new_wanted & ~cap->pending())) {
+ if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
// exapnding caps. make sure we aren't waiting for a log flush
need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
}
adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
}
-
+
if (in->is_auth() &&
- _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
+ _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
// updated
eval(in, CEPH_CAP_LOCKS);
if (cap->get_last_seq() == 0 &&
(cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
- cap->issue_norevoke(cap->issued());
share_inode_max_size(in, cap);
}
}
}
out:
- m->put();
+ if (need_unpin)
+ head_in->auth_unpin(this);
}
};
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
- const string &dname)
+ std::string_view dname)
{
inodeno_t ino = (uint64_t)item.ino;
uint64_t cap_id = item.cap_id;
if (dn) {
ClientLease *l = dn->get_client_lease(client);
if (l) {
- dout(10) << "process_cap_release removing lease on " << *dn << dendl;
+ dout(10) << __func__ << " removing lease on " << *dn << dendl;
dn->remove_client_lease(l, this);
} else {
- dout(7) << "process_cap_release client." << client
+ dout(7) << __func__ << " client." << client
<< " doesn't have lease on " << *dn << dendl;
}
} else {
- dout(7) << "process_cap_release client." << client << " released lease on dn "
+ dout(7) << __func__ << " client." << client << " released lease on dn "
<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
}
}
if (!cap)
return;
- dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
+ dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
<< (mdr ? "" : " (DEFERRED, no mdr)")
<< dendl;
void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
{
Capability *cap = in->get_client_cap(client);
- if (!cap || cap->get_last_sent() != seq)
+ if (!cap || cap->get_last_seq() != seq)
return;
if (in->is_frozen()) {
dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
/**
* m and ack might be NULL, so don't dereference them unless dirty != 0
*/
-void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
+void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
{
dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
<< " follows " << follows << " snap " << snap
// normal metadata updates that we can apply to the head as well.
// update xattrs?
- bool xattrs = false;
- map<string,bufferptr> *px = 0;
- if ((dirty & CEPH_CAP_XATTR_EXCL) &&
- m->xattrbl.length() &&
- m->head.xattr_version > in->get_projected_inode()->xattr_version)
- xattrs = true;
-
- old_inode_t *oi = 0;
+ CInode::mempool_xattr_map *px = nullptr;
+ bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
+ m->xattrbl.length() &&
+ m->head.xattr_version > in->get_projected_inode()->xattr_version;
+
+ CInode::mempool_old_inode *oi = 0;
if (in->is_multiversion()) {
oi = in->pick_old_inode(snap);
}
- inode_t *pi;
+ CInode::mempool_inode *i;
if (oi) {
dout(10) << " writing into old inode" << dendl;
- pi = in->project_inode();
- pi->version = in->pre_dirty();
+ auto &pi = in->project_inode();
+ pi.inode.version = in->pre_dirty();
if (snap > oi->first)
in->split_old_inode(snap);
- pi = &oi->inode;
+ i = &oi->inode;
if (xattrs)
px = &oi->xattrs;
} else {
+ auto &pi = in->project_inode(xattrs);
+ pi.inode.version = in->pre_dirty();
+ i = &pi.inode;
if (xattrs)
- px = new map<string,bufferptr>;
- pi = in->project_inode(px);
- pi->version = in->pre_dirty();
+ px = pi.xattrs.get();
}
- _update_cap_fields(in, dirty, m, pi);
+ _update_cap_fields(in, dirty, m, i);
// xattr
- if (px) {
- dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
+ if (xattrs) {
+ dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
<< " len " << m->xattrbl.length() << dendl;
- pi->xattr_version = m->head.xattr_version;
- bufferlist::iterator p = m->xattrbl.begin();
- ::decode(*px, p);
+ i->xattr_version = m->head.xattr_version;
+ auto p = m->xattrbl.cbegin();
+ decode(*px, p);
}
- if (pi->client_ranges.count(client)) {
- if (in->last == snap) {
- dout(10) << " removing client_range entirely" << dendl;
- pi->client_ranges.erase(client);
- } else {
- dout(10) << " client_range now follows " << snap << dendl;
- pi->client_ranges[client].follows = snap;
+ {
+ auto it = i->client_ranges.find(client);
+ if (it != i->client_ranges.end()) {
+ if (in->last == snap) {
+ dout(10) << " removing client_range entirely" << dendl;
+ i->client_ranges.erase(it);
+ } else {
+ dout(10) << " client_range now follows " << snap << dendl;
+ it->second.follows = snap;
+ }
}
}
le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
ack->get_oldest_flush_tid());
- mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
- client, ack));
+ mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
+ ack, client));
}
-void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
+void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
{
if (dirty == 0)
return;
/* m must be valid if there are dirty caps */
- assert(m);
+ ceph_assert(m);
uint64_t features = m->get_connection()->get_features();
if (m->get_ctime() > pi->ctime) {
dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
<< " for " << *in << dendl;
pi->ctime = m->get_ctime();
+ if (m->get_ctime() > pi->rstat.rctime)
+ pi->rstat.rctime = m->get_ctime();
}
if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
dout(7) << " mtime " << pi->mtime << " -> " << mtime
<< " for " << *in << dendl;
pi->mtime = mtime;
+ if (mtime > pi->rstat.rctime)
+ pi->rstat.rctime = mtime;
}
if (in->inode.is_file() && // ONLY if regular file
size > pi->size) {
*/
bool Locker::_do_cap_update(CInode *in, Capability *cap,
int dirty, snapid_t follows,
- MClientCaps *m, MClientCaps *ack,
+ const MClientCaps::const_ref &m, const MClientCaps::ref &ack,
bool *need_flush)
{
dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
<< " issued " << ccap_string(cap ? cap->issued() : 0)
<< " wanted " << ccap_string(cap ? cap->wanted() : 0)
<< " on " << *in << dendl;
- assert(in->is_auth());
+ ceph_assert(in->is_auth());
client_t client = m->get_source().num();
- inode_t *latest = in->get_projected_inode();
+ CInode::mempool_inode *latest = in->get_projected_inode();
// increase or zero max_size?
uint64_t size = m->get_size();
bool need_issue = false;
if (cap)
cap->inc_suppress();
- if (in->mds_caps_wanted.empty() &&
+ if (in->get_mds_caps_wanted().empty() &&
(in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
if (in->filelock.get_state() != LOCK_EXCL)
file_excl(&in->filelock, &need_issue);
if (m->flockbl.length()) {
int32_t num_locks;
- bufferlist::iterator bli = m->flockbl.begin();
- ::decode(num_locks, bli);
+ auto bli = m->flockbl.cbegin();
+ decode(num_locks, bli);
for ( int i=0; i < num_locks; ++i) {
ceph_filelock decoded_lock;
- ::decode(decoded_lock, bli);
+ decode(decoded_lock, bli);
in->get_fcntl_lock_state()->held_locks.
insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
}
- ::decode(num_locks, bli);
+ decode(num_locks, bli);
for ( int i=0; i < num_locks; ++i) {
ceph_filelock decoded_lock;
- ::decode(decoded_lock, bli);
+ decode(decoded_lock, bli);
in->get_flock_lock_state()->held_locks.
insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
if (!dirty && !change_max)
return false;
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ Session *session = mds->get_session(m);
if (session->check_access(in, MAY_WRITE,
m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
- session->put();
dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
return false;
}
- session->put();
// do the update.
EUpdate *le = new EUpdate(mds->mdlog, "cap update");
mds->mdlog->start_entry(le);
- // xattrs update?
- map<string,bufferptr> *px = 0;
- if ((dirty & CEPH_CAP_XATTR_EXCL) &&
- m->xattrbl.length() &&
- m->head.xattr_version > in->get_projected_inode()->xattr_version)
- px = new map<string,bufferptr>;
+ bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
+ m->xattrbl.length() &&
+ m->head.xattr_version > in->get_projected_inode()->xattr_version;
- inode_t *pi = in->project_inode(px);
- pi->version = in->pre_dirty();
+ auto &pi = in->project_inode(xattr);
+ pi.inode.version = in->pre_dirty();
MutationRef mut(new MutationImpl());
mut->ls = mds->mdlog->get_current_segment();
- _update_cap_fields(in, dirty, m, pi);
+ _update_cap_fields(in, dirty, m, &pi.inode);
if (change_max) {
dout(7) << " max_size " << old_max << " -> " << new_max
<< " for " << *in << dendl;
if (new_max) {
- pi->client_ranges[client].range.first = 0;
- pi->client_ranges[client].range.last = new_max;
- pi->client_ranges[client].follows = in->first - 1;
- } else
- pi->client_ranges.erase(client);
+ auto &cr = pi.inode.client_ranges[client];
+ cr.range.first = 0;
+ cr.range.last = new_max;
+ cr.follows = in->first - 1;
+ if (cap)
+ cap->mark_clientwriteable();
+ } else {
+ pi.inode.client_ranges.erase(client);
+ if (cap)
+ cap->clear_clientwriteable();
+ }
}
if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
if (dirty & CEPH_CAP_AUTH_EXCL)
wrlock_force(&in->authlock, mut);
- // xattr
- if (px) {
- dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
- pi->xattr_version = m->head.xattr_version;
- bufferlist::iterator p = m->xattrbl.begin();
- ::decode(*px, p);
-
+ // xattrs update?
+ if (xattr) {
+ dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
+ pi.inode.xattr_version = m->head.xattr_version;
+ auto p = m->xattrbl.cbegin();
+ decode(*pi.xattrs, p);
wrlock_force(&in->xattrlock, mut);
}
le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
ack->get_oldest_flush_tid());
- mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
- change_max, !!cap,
- client, ack));
+ unsigned update_flags = 0;
+ if (change_max)
+ update_flags |= UPDATE_SHAREMAX;
+ if (cap)
+ update_flags |= UPDATE_NEEDSISSUE;
+ mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
+ ack, client));
if (need_flush && !*need_flush &&
((change_max && new_max) || // max INCREASE
_need_flush_mdlog(in, dirty)))
return true;
}
-/* This function DOES put the passed message before returning */
-void Locker::handle_client_cap_release(MClientCapRelease *m)
+void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m)
{
client_t client = m->get_source().num();
dout(10) << "handle_client_cap_release " << *m << dendl;
mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
}
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ Session *session = mds->get_session(m);
- for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
- _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
+ for (const auto &cap : m->caps) {
+ _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
}
if (session) {
session->notify_cap_release(m->caps.size());
}
-
- m->put();
}
class C_Locker_RetryCapRelease : public LockerContext {
eval_cap_gather(in);
return;
}
- remove_client_cap(in, client);
+ remove_client_cap(in, cap);
}
-/* This function DOES put the passed message before returning */
-
-void Locker::remove_client_cap(CInode *in, client_t client)
+void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
+ client_t client = cap->get_client();
// clean out any pending snapflush state
if (!in->client_need_snapflush.empty())
_do_null_snapflush(in, client);
+ bool notable = cap->is_notable();
in->remove_client_cap(client);
+ if (!notable)
+ return;
if (in->is_auth()) {
// make sure we clear out the client byte range
if (in->get_projected_inode()->client_ranges.count(client) &&
- !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
- check_inode_max_size(in);
+ !(in->inode.nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
+ if (kill)
+ in->state_set(CInode::STATE_NEEDSRECOVER);
+ else
+ check_inode_max_size(in);
+ }
} else {
request_inode_file_caps(in);
}
/**
* Return true if any currently revoking caps exceed the
- * mds_revoke_cap_timeout threshold.
+ * session_timeout threshold.
*/
-bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
+bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
+ double timeout) const
{
xlist<Capability*>::const_iterator p = revoking.begin();
if (p.end()) {
} else {
utime_t now = ceph_clock_now();
utime_t age = now - (*p)->get_last_revoke_stamp();
- if (age <= g_conf->mds_revoke_cap_timeout) {
+ if (age <= timeout) {
return false;
} else {
return true;
}
}
-
-void Locker::get_late_revoking_clients(std::list<client_t> *result) const
+void Locker::get_late_revoking_clients(std::list<client_t> *result,
+ double timeout) const
{
- if (!any_late_revoking_caps(revoking_caps)) {
+ if (!any_late_revoking_caps(revoking_caps, timeout)) {
// Fast path: no misbehaving clients, execute in O(1)
return;
}
// Slow path: execute in O(N_clients)
- std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
- for (client_rc_iter = revoking_caps_by_client.begin();
- client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
- xlist<Capability*> const &client_rc = client_rc_iter->second;
- bool any_late = any_late_revoking_caps(client_rc);
- if (any_late) {
- result->push_back(client_rc_iter->first);
+ for (auto &p : revoking_caps_by_client) {
+ if (any_late_revoking_caps(p.second, timeout)) {
+ // Search the list for duplicate and only insert if unique
+ std::list<client_t>::const_iterator it = std::find(result->begin(), result->end(), p.first);
+ if (it == result->end())
+ result->push_back(p.first);
}
}
}
{
utime_t now = ceph_clock_now();
+ if (!need_snapflush_inodes.empty()) {
+ // snap inodes that needs flush are auth pinned, they affect
+ // subtree/difrarg freeze.
+ utime_t cutoff = now;
+ cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
+
+ CInode *last = need_snapflush_inodes.back();
+ while (!need_snapflush_inodes.empty()) {
+ CInode *in = need_snapflush_inodes.front();
+ if (in->last_dirstat_prop >= cutoff)
+ break;
+ in->item_caps.remove_myself();
+ snapflush_nudge(in);
+ if (in == last)
+ break;
+ }
+ }
+
dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
- int i = 0;
+ now = ceph_clock_now();
+ int n = 0;
for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
Capability *cap = *p;
utime_t age = now - cap->get_last_revoke_stamp();
- dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
- if (age <= g_conf->mds_revoke_cap_timeout) {
- dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
+ dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+ if (age <= mds->mdsmap->get_session_timeout()) {
+ dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
break;
} else {
- ++i;
- if (i > MAX_WARN_CAPS) {
+ ++n;
+ if (n > MAX_WARN_CAPS) {
dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
<< "revoking, ignoring subsequent caps" << dendl;
break;
}
}
// exponential backoff of warning intervals
- if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
+ if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
cap->inc_num_revoke_warnings();
stringstream ss;
ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
mds->clog->warn() << ss.str();
dout(20) << __func__ << " " << ss.str() << dendl;
} else {
- dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
+ dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
}
}
}
-void Locker::handle_client_lease(MClientLease *m)
+void Locker::handle_client_lease(const MClientLease::const_ref &m)
{
dout(10) << "handle_client_lease " << *m << dendl;
- assert(m->get_source().is_client());
+ ceph_assert(m->get_source().is_client());
client_t client = m->get_source().num();
CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
if (!in) {
dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
- m->put();
return;
}
CDentry *dn = 0;
dn = dir->lookup(m->dname);
if (!dn) {
dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
- m->put();
return;
}
dout(10) << " on " << *dn << dendl;
ClientLease *l = dn->get_client_lease(client);
if (!l) {
dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
- m->put();
return;
}
<< " on " << *dn << dendl;
dn->remove_client_lease(l, this);
}
- m->put();
break;
case CEPH_MDS_LEASE_RENEW:
dout(7) << "handle_client_lease client." << client << " renew on " << *dn
<< (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
if (dn->lock.can_lease(client)) {
+ auto reply = MClientLease::create(*m);
int pool = 1; // fixme.. do something smart!
- m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
- m->h.seq = ++l->seq;
- m->clear_payload();
+ reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
+ reply->h.seq = ++l->seq;
+ reply->clear_payload();
utime_t now = ceph_clock_now();
now += mdcache->client_lease_durations[pool];
mdcache->touch_client_lease(l, pool, now);
- mds->send_message_client_counted(m, m->get_connection());
+ mds->send_message_client_counted(reply, m->get_connection());
}
}
break;
now += mdcache->client_lease_durations[pool];
mdcache->touch_client_lease(l, pool, now);
- LeaseStat e;
- e.mask = 1 | CEPH_LOCK_DN; // old and new bit values
- e.seq = ++l->seq;
- e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
- ::encode(e, bl);
- dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
+ LeaseStat lstat;
+ lstat.mask = 1 | CEPH_LOCK_DN; // old and new bit values
+ lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
+ lstat.seq = ++l->seq;
+ encode_lease(bl, session->info, lstat);
+ dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
<< " on " << *dn << dendl;
} else {
// null lease
- LeaseStat e;
- e.mask = 0;
- e.seq = 0;
- e.duration_ms = 0;
- ::encode(e, bl);
+ LeaseStat lstat;
+ encode_lease(bl, session->info, lstat);
dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
}
}
ClientLease *l = p->second;
n++;
- assert(lock->get_type() == CEPH_LOCK_DN);
+ ceph_assert(lock->get_type() == CEPH_LOCK_DN);
CDentry *dn = static_cast<CDentry*>(lock->get_parent());
int mask = 1 | CEPH_LOCK_DN; // old and new bits
// i should also revoke the dir ICONTENT lease, if they have it!
CInode *diri = dn->get_dir()->get_inode();
- mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
- mask,
- diri->ino(),
- diri->first, CEPH_NOSNAP,
- dn->get_name()),
- l->client);
- }
- assert(n == lock->get_num_client_lease());
+ auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
+ mds->send_message_client_counted(lease, l->client);
+ }
}
-
+void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
+ const LeaseStat& ls)
+{
+ if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
+ ENCODE_START(1, 1, bl);
+ encode(ls.mask, bl);
+ encode(ls.duration_ms, bl);
+ encode(ls.seq, bl);
+ ENCODE_FINISH(bl);
+ }
+ else {
+ encode(ls.mask, bl);
+ encode(ls.duration_ms, bl);
+ encode(ls.seq, bl);
+ }
+}
// locks ----------------------------------------------------------------
-SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
+SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
switch (lock_type) {
case CEPH_LOCK_DN:
return 0;
}
-/* This function DOES put the passed message before returning */
-void Locker::handle_lock(MLock *m)
+void Locker::handle_lock(const MLock::const_ref &m)
{
// nobody should be talking to us during recovery.
- assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+ ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
if (!lock) {
dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
- m->put();
return;
}
/** This function may take a reference to m if it needs one, but does
* not put references. */
-void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
+void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m)
{
MDSCacheObject *parent = lock->get_parent();
if (parent->is_auth() &&
!parent->is_frozen()) {
dout(7) << "handle_reqrdlock got rdlock request on " << *lock
<< " on " << *parent << dendl;
- assert(parent->is_auth()); // replica auth pinned if they're doing this!
+ ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
if (lock->is_stable()) {
simple_sync(lock);
} else {
dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
- new C_MDS_RetryMessage(mds, m->get()));
+ new C_MDS_RetryMessage(mds, m));
}
} else {
dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
}
}
-/* This function DOES put the passed message before returning */
-void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
+void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m)
{
int from = m->get_asker();
if (lock->get_parent()->is_rejoining()) {
dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
<< ", dropping " << *m << dendl;
- m->put();
return;
}
}
switch (m->get_action()) {
// -- replica --
case LOCK_AC_SYNC:
- assert(lock->get_state() == LOCK_LOCK);
+ ceph_assert(lock->get_state() == LOCK_LOCK);
lock->decode_locked_state(m->get_data());
lock->set_state(LOCK_SYNC);
lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
break;
case LOCK_AC_LOCK:
- assert(lock->get_state() == LOCK_SYNC);
+ ceph_assert(lock->get_state() == LOCK_SYNC);
lock->set_state(LOCK_SYNC_LOCK);
if (lock->is_leased())
revoke_client_leases(lock);
// -- auth --
case LOCK_AC_LOCKACK:
- assert(lock->get_state() == LOCK_SYNC_LOCK ||
+ ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
lock->get_state() == LOCK_SYNC_EXCL);
- assert(lock->is_gathering(from));
+ ceph_assert(lock->is_gathering(from));
lock->remove_gather(from);
if (lock->is_gathering()) {
break;
}
-
- m->put();
}
/* unused, currently.
{
dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
if (lock->get_parent()->is_freezing_or_frozen()) {
- // dentry lock in unreadable state can block path traverse
- if ((lock->get_type() != CEPH_LOCK_DN ||
+ // dentry/snap lock in unreadable state can block path traverse
+ if ((lock->get_type() != CEPH_LOCK_DN &&
+ lock->get_type() != CEPH_LOCK_ISNAP) ||
lock->get_state() == LOCK_SYNC ||
- lock->get_parent()->is_frozen()))
+ lock->get_parent()->is_frozen())
return;
}
CInode *in = 0;
int wanted = 0;
- if (lock->get_type() != CEPH_LOCK_DN) {
+ if (lock->get_cap_shift()) {
in = static_cast<CInode*>(lock->get_parent());
in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
}
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
CInode *in = 0;
if (lock->get_cap_shift())
bool need_recover = false;
if (lock->get_type() == CEPH_LOCK_IFILE) {
- assert(in);
+ ceph_assert(in);
if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
mds->mdcache->queue_file_recover(in);
need_recover = true;
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
CInode *in = 0;
if (lock->get_cap_shift())
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
- assert(lock->get_state() != LOCK_LOCK);
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
+ ceph_assert(lock->get_state() != LOCK_LOCK);
CInode *in = 0;
if (lock->get_cap_shift())
switch (lock->get_state()) {
case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
- case LOCK_XSYN:
- file_excl(static_cast<ScatterLock*>(lock), need_issue);
- if (lock->get_state() != LOCK_EXCL)
- return;
- // fall-thru
+ case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
(static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
bool need_recover = false;
if (lock->get_type() == CEPH_LOCK_IFILE) {
- assert(in);
+ ceph_assert(in);
if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
mds->mdcache->queue_file_recover(in);
need_recover = true;
void Locker::simple_xlock(SimpleLock *lock)
{
dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->get_parent()->is_auth());
//assert(lock->is_stable());
- assert(lock->get_state() != LOCK_XLOCK);
+ ceph_assert(lock->get_state() != LOCK_XLOCK);
CInode *in = 0;
if (lock->get_cap_shift())
// forcefully take a wrlock
lock->get_wrlock(true);
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
in->pre_cow_old_inode(); // avoid cow mayhem
- inode_t *pi = in->project_inode();
- pi->version = in->pre_dirty();
+ auto &pi = in->project_inode();
+ pi.inode.version = in->pre_dirty();
in->finish_scatter_gather_update(lock->get_type());
lock->start_flush();
{
dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
if (lock->get_parent()->is_freezing_or_frozen()) {
dout(20) << " freezing|frozen" << dendl;
* we need to lock|scatter in order to push fnode changes into the
* inode.dirstat.
*/
-void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
+void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
{
CInode *p = static_cast<CInode *>(lock->get_parent());
dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
if (c)
p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
- else
+ else if (lock->is_dirty())
// just requeue. not ideal.. starvation prone..
updated_scatterlocks.push_back(lock->get_updated_item());
return;
dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
if (c)
p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
- else
+ else if (lock->is_dirty())
// just requeue. not ideal.. starvation prone..
updated_scatterlocks.push_back(lock->get_updated_item());
return;
// handle_file_lock due to AC_NUDGE, because the rest of the
// time we are replicated or have dirty data and won't get
// called. bailing here avoids an infinite loop.
- assert(!c);
+ ceph_assert(!c);
break;
}
} else {
<< *lock << " on " << *p << dendl;
// request unscatter?
mds_rank_t auth = lock->get_parent()->authority().first;
- if (!mds->is_cluster_degraded() ||
- mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
- mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+ if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+ mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
+ }
// wait...
if (c)
lock->add_waiter(SimpleLock::WAIT_STABLE, c);
// also, requeue, in case we had wrong auth or something
- updated_scatterlocks.push_back(lock->get_updated_item());
+ if (lock->is_dirty())
+ updated_scatterlocks.push_back(lock->get_updated_item());
}
}
<< *lock << " " << *lock->get_parent() << dendl;
continue;
}
- if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
+ if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
break;
updated_scatterlocks.pop_front();
scatter_nudge(lock, 0);
{
dout(10) << "scatter_tempsync " << *lock
<< " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
- assert(0 == "not fully implemented, at least not for filelock");
+ ceph_abort_msg("not fully implemented, at least not for filelock");
CInode *in = static_cast<CInode *>(lock->get_parent());
dout(7) << "local_wrlock_grab on " << *lock
<< " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->can_wrlock());
- assert(!mut->wrlocks.count(lock));
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->can_wrlock());
lock->get_wrlock(mut->get_client());
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+
+ auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ ceph_assert(ret.second);
}
bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
dout(7) << "local_wrlock_start on " << *lock
<< " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->get_parent()->is_auth());
if (lock->can_wrlock()) {
- assert(!mut->wrlocks.count(lock));
lock->get_wrlock(mut->get_client());
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ ceph_assert(it->is_wrlock());
return true;
} else {
lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
}
}
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_wrlock());
+ LocalLock *lock = static_cast<LocalLock*>(it->lock);
dout(7) << "local_wrlock_finish on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->put_wrlock();
- mut->wrlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
if (lock->get_num_wrlocks() == 0) {
lock->finish_waiters(SimpleLock::WAIT_STABLE |
SimpleLock::WAIT_WR |
dout(7) << "local_xlock_start on " << *lock
<< " on " << *lock->get_parent() << dendl;
- assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->get_parent()->is_auth());
if (!lock->can_xlock_local()) {
lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
return false;
}
lock->get_xlock(mut, mut->get_client());
- mut->xlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
return true;
}
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_xlock());
+ LocalLock *lock = static_cast<LocalLock*>(it->lock);
dout(7) << "local_xlock_finish on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->put_xlock();
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
lock->finish_waiters(SimpleLock::WAIT_STABLE |
SimpleLock::WAIT_WR |
<< " filelock=" << *lock << " on " << *lock->get_parent()
<< dendl;
- assert(lock->get_parent()->is_auth());
- assert(lock->is_stable());
+ ceph_assert(lock->get_parent()->is_auth());
+ ceph_assert(lock->is_stable());
if (lock->get_parent()->is_freezing_or_frozen())
return;
!lock->is_rdlocked() &&
//!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
(lock->get_scatter_wanted() ||
- (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
+ (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
dout(7) << "file_eval stable, bump to mixed " << *lock
<< " on " << *lock->get_parent() << dendl;
scatter_mix(lock, need_issue);
else if (lock->get_state() != LOCK_SYNC &&
!lock->is_wrlocked() && // drain wrlocks first!
!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
- !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
+ !(wanted & CEPH_CAP_GWR) &&
!((lock->get_state() == LOCK_MIX) &&
in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
//((wanted & CEPH_CAP_RD) ||
//in->is_replicated() ||
- //lock->get_num_client_lease() ||
+ //lock->is_leased() ||
//(!loner && lock->get_state() == LOCK_EXCL)) &&
) {
dout(7) << "file_eval stable, bump to sync " << *lock
dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
CInode *in = static_cast<CInode*>(lock->get_parent());
- assert(in->is_auth());
- assert(lock->is_stable());
+ ceph_assert(in->is_auth());
+ ceph_assert(lock->is_stable());
if (lock->get_state() == LOCK_LOCK) {
in->start_scatter(lock);
// gather?
switch (lock->get_state()) {
case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
- case LOCK_XSYN:
- file_excl(lock, need_issue);
- if (lock->get_state() != LOCK_EXCL)
- return;
- // fall-thru
case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
+ case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
default: ceph_abort();
}
if (lock->is_rdlocked())
gather++;
if (in->is_replicated()) {
- if (lock->get_state() != LOCK_EXCL_MIX && // EXCL replica is already LOCK
- lock->get_state() != LOCK_XSYN_EXCL) { // XSYN replica is already LOCK; ** FIXME here too!
+ if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
send_lock_message(lock, LOCK_AC_MIX);
lock->init_gather();
gather++;
CInode *in = static_cast<CInode*>(lock->get_parent());
dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
- assert(in->is_auth());
- assert(lock->is_stable());
+ ceph_assert(in->is_auth());
+ ceph_assert(lock->is_stable());
- assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
+ ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
(lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
switch (lock->get_state()) {
{
dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
CInode *in = static_cast<CInode *>(lock->get_parent());
- assert(in->is_auth());
- assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
+ ceph_assert(in->is_auth());
+ ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());
switch (lock->get_state()) {
case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
CInode *in = static_cast<CInode *>(lock->get_parent());
dout(7) << "file_recover " << *lock << " on " << *in << dendl;
- assert(in->is_auth());
+ ceph_assert(in->is_auth());
//assert(lock->is_stable());
- assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
+ ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
int gather = 0;
// messenger
-/* This function DOES put the passed message before returning */
-void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
+void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m)
{
CInode *in = static_cast<CInode*>(lock->get_parent());
int from = m->get_asker();
if (in->is_rejoining()) {
dout(7) << "handle_file_lock still rejoining " << *in
<< ", dropping " << *m << dendl;
- m->put();
return;
}
}
- dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
+ dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
<< " on " << *lock
<< " from mds." << from << " "
<< *in << dendl;
switch (m->get_action()) {
// -- replica --
case LOCK_AC_SYNC:
- assert(lock->get_state() == LOCK_LOCK ||
+ ceph_assert(lock->get_state() == LOCK_LOCK ||
lock->get_state() == LOCK_MIX ||
lock->get_state() == LOCK_MIX_SYNC2);
break;
case LOCK_AC_MIX:
- assert(lock->get_state() == LOCK_SYNC ||
+ ceph_assert(lock->get_state() == LOCK_SYNC ||
lock->get_state() == LOCK_LOCK ||
lock->get_state() == LOCK_SYNC_MIX2);
mds->mdlog->flush();
break;
}
-
+
// ok
- lock->decode_locked_state(m->get_data());
lock->set_state(LOCK_MIX);
+ lock->decode_locked_state(m->get_data());
if (caps)
issue_caps(in);
// -- auth --
case LOCK_AC_LOCKACK:
- assert(lock->get_state() == LOCK_SYNC_LOCK ||
+ ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
lock->get_state() == LOCK_MIX_LOCK ||
lock->get_state() == LOCK_MIX_LOCK2 ||
lock->get_state() == LOCK_MIX_EXCL ||
lock->get_state() == LOCK_SYNC_EXCL ||
lock->get_state() == LOCK_SYNC_MIX ||
lock->get_state() == LOCK_MIX_TSYN);
- assert(lock->is_gathering(from));
+ ceph_assert(lock->is_gathering(from));
lock->remove_gather(from);
if (lock->get_state() == LOCK_MIX_LOCK ||
break;
case LOCK_AC_SYNCACK:
- assert(lock->get_state() == LOCK_MIX_SYNC);
- assert(lock->is_gathering(from));
+ ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
+ ceph_assert(lock->is_gathering(from));
lock->remove_gather(from);
lock->decode_locked_state(m->get_data());
break;
case LOCK_AC_MIXACK:
- assert(lock->get_state() == LOCK_SYNC_MIX);
- assert(lock->is_gathering(from));
+ ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
+ ceph_assert(lock->is_gathering(from));
lock->remove_gather(from);
if (lock->is_gathering()) {
default:
ceph_abort();
}
-
- m->put();
}
-
-
-
-
-
-