#include "common/config.h"
+#include "msg/Message.h"
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
handle_peer_request(ref_cast<MMDSPeerRequest>(m));
return;
default:
- derr << "server unknown message " << m->get_type() << dendl;
- ceph_abort_msg("server unknown message");
+ derr << "Server unknown message " << m->get_type() << " from peer type " << m->get_connection()->get_peer_type() << dendl;
+ ceph_abort_msg("server unknown message " + to_string(m->get_type()) + " from peer type " + to_string(m->get_connection()->get_peer_type()));
}
}
unsigned flags = m->get_flags();
if (flags != CEPH_RECLAIM_RESET) { // currently only support reset
dout(10) << __func__ << " unsupported flags" << dendl;
- reply->set_result(-CEPHFS_EOPNOTSUPP);
+ reply->set_result(-CEPHFS_EINVAL);
mds->send_message_client(reply, session);
return;
}
if (flags & CEPH_RECLAIM_RESET) {
finish_reclaim_session(session, reply);
- return;
- }
-
- ceph_abort();
+ } else ceph_assert(0); /* no other flags are handled at this time */
}
void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
{
Session *session = mds->get_session(m);
+ uint32_t flags = m->get_flags();
dout(3) << __func__ << " " << *m << " from " << m->get_source() << dendl;
- ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+ ceph_assert(m->is_a_client()); // should _not_ come from an mds!
if (!session) {
dout(0) << " ignoring sessionless msg " << *m << dendl;
return;
}
- if (m->get_flags() & MClientReclaim::FLAG_FINISH) {
+ if (flags & MClientReclaim::FLAG_FINISH) {
+ if (flags ^ MClientReclaim::FLAG_FINISH) {
+ dout(0) << __func__ << " client specified FLAG_FINISH with other flags."
+ " Other flags:" << flags << dendl;
+ auto reply = make_message<MClientReclaimReply>(0);
+ reply->set_result(-CEPHFS_EINVAL);
+ mds->send_message_client(reply, session);
+ return;
+ }
finish_reclaim_session(session);
} else {
reclaim_session(session, m);
Session *session = mds->get_session(m);
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
- ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+ ceph_assert(m->is_a_client()); // should _not_ come from an mds!
if (!session) {
dout(0) << " ignoring sessionless msg " << *m << dendl;
session->is_stale() ||
session->is_killing() ||
terminating_sessions) {
- dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+ if (m->supported_features.test(CEPHFS_FEATURE_NOTIFY_SESSION_STATE)) {
+ if (session->is_open() && !mds->is_stopping()) {
+ dout(10) << "currently already opened" << dendl;
+
+ auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN,
+ session->get_push_seq());
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ reply->supported_features = supported_features;
+ mds->send_message_client(reply, session);
+ if (mdcache->is_readonly()) {
+ auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
+ mds->send_message_client(m, session);
+ }
+ }
+ }
+ dout(10) << "currently " << session->get_state_name()
+ << ", dropping this req" << dendl;
return;
}
ceph_assert(session->is_closed() || session->is_closing());
break;
default:
- ceph_abort();
+ auto m = make_message<MClientSession>(CEPH_SESSION_REJECT);
+ mds->send_message_client(m, session);
+ derr << "Server received unknown message " << m->get_type() << ", closing session and blocklisting the client " << session->get_client() << dendl;
+ CachedStackStringStream css;
+ mds->evict_client(session->get_client().v, false, true, *css, nullptr);
}
}
mdr->pin(dn);
early_reply(mdr, in, dn);
-
+
mdr->committing = true;
submit_mdlog_entry(le, fin, mdr, __func__);
-
+
if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
if (mds->queue_one_replay()) {
dout(10) << " queued next replay op" << dendl;
} else {
dout(10) << " journaled last replay op" << dendl;
}
- } else if (mdr->did_early_reply)
+ } else if (mdr->did_early_reply) {
mds->locker->drop_rdlocks_for_early_reply(mdr.get());
- else
+ if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
+ mdlog->flush();
+ } else {
mdlog->flush();
+ }
}
void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
bool sessionclosed_isok = replay_unsafe_with_closed_session;
// active session?
Session *session = 0;
- if (req->get_source().is_client()) {
+ if (req->is_a_client()) {
session = mds->get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
// process embedded cap releases?
// (only if NOT replay!)
- if (!req->releases.empty() && req->get_source().is_client() && !req->is_replay()) {
+ if (!req->releases.empty() && req->is_a_client() && !req->is_replay()) {
client_t client = req->get_source().num();
for (const auto &r : req->releases) {
mds->locker->process_request_cap_release(mdr, client, r.item, r.dname);
break;
default:
- ceph_abort();
+ ceph_abort_msg("unknown op " + to_string(m->get_op()) + " requested");
}
}
if (!lock) {
dout(10) << "don't have object, dropping" << dendl;
- ceph_abort(); // can this happen, if we auth pinned properly.
+ ceph_abort_msg("don't have object"); // can this happen, if we auth pinned properly.
}
if (op == MMDSPeerRequest::OP_XLOCK && !lock->get_parent()->is_auth()) {
dout(10) << "not auth for remote xlock attempt, dropping on "
break;
default:
- ceph_abort();
+ ceph_abort_msg("unknown op "+ to_string(op)+ " received");
}
}
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (!excl && !dnl->is_null()) {
// it existed.
// ------------------------------------------------
+struct C_WaitUnlinkToFinish : public MDSContext {
+protected:
+ MDCache *mdcache;
+ CDentry *dn;
+ MDSContext *fin;
+
+ MDSRank *get_mds() override
+ {
+ ceph_assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+
+public:
+ C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+ mdcache(m), dn(d), fin(f) {}
+ void finish(int r) override {
+ fin->complete(r);
+ dn->put(CDentry::PIN_PURGING);
+ }
+};
+
+bool Server::is_unlink_pending(CDentry *dn)
+{
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
+ if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
+ return true;
+ }
+ return false;
+}
+
+void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
+{
+ dout(20) << __func__ << " dn " << *dn << dendl;
+ mds->locker->drop_locks(mdr.get());
+ auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+ dn->get(CDentry::PIN_PURGING);
+ dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
+}
+
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!check_access(mdr, diri, MAY_WRITE))
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
targeti = ret.second->get_projected_linkage()->get_inode();
}
+ if (is_unlink_pending(destdn)) {
+ wait_for_pending_unlink(destdn, mdr);
+ return;
+ }
+
ceph_assert(destdn->get_projected_linkage()->is_null());
if (req->get_alternate_name().size() > alternate_name_max) {
dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
mdr->apply();
MDRequestRef null_ref;
- if (inc)
+ if (inc) {
mdcache->send_dentry_link(dn, null_ref);
- else
+ } else {
+ dn->state_clear(CDentry::STATE_UNLINKING);
mdcache->send_dentry_unlink(dn, NULL, null_ref);
-
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+ mdcache->mds->queue_waiters(finished);
+ }
+
// bump target popularity
mds->balancer->hit_inode(targeti, META_POP_IWR);
mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
if (rmdir)
mdr->disable_lock_cache();
+
CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
if (!dn)
return;
+ // notify replica MDSes the dentry is under unlink
+ if (!dn->state_test(CDentry::STATE_UNLINKING)) {
+ dn->state_set(CDentry::STATE_UNLINKING);
+ mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
+ if (dn->replica_unlinking_ref) {
+ return;
+ }
+ }
+
CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
ceph_assert(!dnl->is_null());
CInode *in = dnl->get_inode();
if (rmdir) {
// do empty directory checks
if (_dir_is_nonempty_unlocked(mdr, in)) {
- respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+ dn->state_clear(CDentry::STATE_UNLINKING);
+ respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
return;
}
} else {
dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
+ dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_EISDIR);
return;
}
if (rmdir) {
// unlink
dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
+ dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_ENOTDIR);
return;
}
CInode *diri = dn->get_dir()->get_inode();
if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
- if (!check_access(mdr, diri, MAY_WRITE))
+ if (!check_access(mdr, diri, MAY_WRITE)) {
+ dn->state_clear(CDentry::STATE_UNLINKING);
return;
+ }
}
// -- create stray dentry? --
if (in->is_dir() &&
_dir_is_nonempty(mdr, in)) {
respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+ dn->state_clear(CDentry::STATE_UNLINKING);
return;
}
}
mdr->apply();
-
+
+ dn->state_clear(CDentry::STATE_UNLINKING);
mdcache->send_dentry_unlink(dn, straydn, mdr);
-
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+ mdcache->mds->queue_waiters(finished);
+
if (straydn) {
// update subtree map?
if (strayin->is_dir())
// reply
respond_to_request(mdr, 0);
-
+
// removing a new dn?
dn->get_dir()->try_remove_unlinked_dn(dn);
if (!destdn)
return;
+ if (is_unlink_pending(destdn)) {
+ wait_for_pending_unlink(destdn, mdr);
+ return;
+ }
+
+ if (is_unlink_pending(srcdn)) {
+ wait_for_pending_unlink(srcdn, mdr);
+ return;
+ }
+
dout(10) << " destdn " << *destdn << dendl;
CDir *destdir = destdn->get_dir();
ceph_assert(destdir->is_auth());