]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Server.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mds / Server.cc
index b01eb92b368495267b73ed4717244e92858205a3..7097eb675ead272096b2690f21fde571e6945027 100644 (file)
@@ -60,6 +60,8 @@
 
 #include "common/config.h"
 
+#include "msg/Message.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
@@ -361,8 +363,8 @@ void Server::dispatch(const cref_t<Message> &m)
     handle_peer_request(ref_cast<MMDSPeerRequest>(m));
     return;
   default:
-    derr << "server unknown message " << m->get_type() << dendl;
-    ceph_abort_msg("server unknown message");  
+    derr << "Server unknown message " << m->get_type() << " from peer type " << m->get_connection()->get_peer_type() << dendl;
+    ceph_abort_msg("server unknown message  " + to_string(m->get_type()) + " from peer type " + to_string(m->get_connection()->get_peer_type()));  
   }
 }
 
@@ -438,7 +440,7 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
   unsigned flags = m->get_flags();
   if (flags != CEPH_RECLAIM_RESET) { // currently only support reset
     dout(10) << __func__ << " unsupported flags" << dendl;
-    reply->set_result(-CEPHFS_EOPNOTSUPP);
+    reply->set_result(-CEPHFS_EINVAL);
     mds->send_message_client(reply, session);
     return;
   }
@@ -460,10 +462,7 @@ void Server::reclaim_session(Session *session, const cref_t<MClientReclaim> &m)
 
   if (flags & CEPH_RECLAIM_RESET) {
     finish_reclaim_session(session, reply);
-    return;
-  }
-
-  ceph_abort();
+  } else ceph_assert(0); /* no other flags are handled at this time */
 }
 
 void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
@@ -507,8 +506,9 @@ void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaim
 void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
 {
   Session *session = mds->get_session(m);
+  uint32_t flags = m->get_flags();
   dout(3) << __func__ <<  " " << *m << " from " << m->get_source() << dendl;
-  ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
@@ -526,7 +526,15 @@ void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
     return;
   }
 
-  if (m->get_flags() & MClientReclaim::FLAG_FINISH) {
+  if (flags & MClientReclaim::FLAG_FINISH) {
+    if (flags ^ MClientReclaim::FLAG_FINISH) {
+      dout(0) << __func__ << " client specified FLAG_FINISH with other flags."
+                             " Other flags:" << flags << dendl;
+      auto reply = make_message<MClientReclaimReply>(0);
+      reply->set_result(-CEPHFS_EINVAL);
+      mds->send_message_client(reply, session);
+      return;
+    }
     finish_reclaim_session(session);
   } else {
     reclaim_session(session, m);
@@ -539,7 +547,7 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
   Session *session = mds->get_session(m);
 
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
-  ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+  ceph_assert(m->is_a_client()); // should _not_ come from an mds!
 
   if (!session) {
     dout(0) << " ignoring sessionless msg " << *m << dendl;
@@ -585,7 +593,23 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
        session->is_stale() ||
        session->is_killing() ||
        terminating_sessions) {
-      dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+      if (m->supported_features.test(CEPHFS_FEATURE_NOTIFY_SESSION_STATE)) {
+       if (session->is_open() && !mds->is_stopping()) {
+          dout(10) << "currently already opened" << dendl;
+
+          auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN,
+                                                    session->get_push_seq());
+          if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+            reply->supported_features = supported_features;
+          mds->send_message_client(reply, session);
+          if (mdcache->is_readonly()) {
+            auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
+            mds->send_message_client(m, session);
+          }
+       }
+      }
+      dout(10) << "currently " << session->get_state_name()
+               << ", dropping this req" << dendl;
       return;
     }
     ceph_assert(session->is_closed() || session->is_closing());
@@ -782,7 +806,11 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
     break;
 
   default:
-    ceph_abort();
+    auto m = make_message<MClientSession>(CEPH_SESSION_REJECT);
+    mds->send_message_client(m, session);
+    derr << "Server received unknown message " << m->get_type() << ", closing session and blocklisting the client " << session->get_client() << dendl;
+    CachedStackStringStream css;
+    mds->evict_client(session->get_client().v, false, true, *css, nullptr);
   }
 }
 
@@ -1920,20 +1948,23 @@ void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEv
     mdr->pin(dn);
 
   early_reply(mdr, in, dn);
-  
+
   mdr->committing = true;
   submit_mdlog_entry(le, fin, mdr, __func__);
-  
+
   if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
     if (mds->queue_one_replay()) {
       dout(10) << " queued next replay op" << dendl;
     } else {
       dout(10) << " journaled last replay op" << dendl;
     }
-  } else if (mdr->did_early_reply)
+  } else if (mdr->did_early_reply) {
     mds->locker->drop_rdlocks_for_early_reply(mdr.get());
-  else
+    if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
+      mdlog->flush();
+  } else {
     mdlog->flush();
+  }
 }
 
 void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
@@ -2345,7 +2376,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
   bool sessionclosed_isok = replay_unsafe_with_closed_session;
   // active session?
   Session *session = 0;
-  if (req->get_source().is_client()) {
+  if (req->is_a_client()) {
     session = mds->get_session(req);
     if (!session) {
       dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
@@ -2449,7 +2480,7 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
 
   // process embedded cap releases?
   //  (only if NOT replay!)
-  if (!req->releases.empty() && req->get_source().is_client() && !req->is_replay()) {
+  if (!req->releases.empty() && req->is_a_client() && !req->is_replay()) {
     client_t client = req->get_source().num();
     for (const auto &r : req->releases) {
       mds->locker->process_request_cap_release(mdr, client, r.item, r.dname);
@@ -2860,7 +2891,7 @@ void Server::handle_peer_request_reply(const cref_t<MMDSPeerRequest> &m)
     break;
 
   default:
-    ceph_abort();
+    ceph_abort_msg("unknown op " + to_string(m->get_op()) + " requested");
   }
 }
 
@@ -2887,7 +2918,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
 
       if (!lock) {
        dout(10) << "don't have object, dropping" << dendl;
-       ceph_abort(); // can this happen, if we auth pinned properly.
+       ceph_abort_msg("don't have object"); // can this happen, if we auth pinned properly.
       }
       if (op == MMDSPeerRequest::OP_XLOCK && !lock->get_parent()->is_auth()) {
        dout(10) << "not auth for remote xlock attempt, dropping on " 
@@ -2974,7 +3005,7 @@ void Server::dispatch_peer_request(MDRequestRef& mdr)
     break;
 
   default: 
-    ceph_abort();
+    ceph_abort_msg("unknown op "+ to_string(op)+ " received");
   }
 }
 
@@ -4432,6 +4463,11 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
@@ -6602,6 +6638,45 @@ void Server::handle_client_getvxattr(MDRequestRef& mdr)
 
 // ------------------------------------------------
 
+struct C_WaitUnlinkToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != NULL);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+    mdcache(m), dn(d), fin(f) {}
+  void finish(int r) override {
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+bool Server::is_unlink_pending(CDentry *dn)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
+      return true;
+  }
+  return false;
+}
+
+void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+  mds->locker->drop_locks(mdr.get());
+  auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+  dn->get(CDentry::PIN_PURGING);
+  dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
+}
+
 // MKNOD
 
 class C_MDS_mknod_finish : public ServerLogContext {
@@ -6665,6 +6740,11 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
   if (!check_access(mdr, diri, MAY_WRITE))
@@ -6763,6 +6843,11 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6858,6 +6943,11 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   if (!dn)
     return;
 
+  if (is_unlink_pending(dn)) {
+    wait_for_pending_unlink(dn, mdr);
+    return;
+  }
+
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
 
@@ -6964,6 +7054,11 @@ void Server::handle_client_link(MDRequestRef& mdr)
     targeti = ret.second->get_projected_linkage()->get_inode();
   }
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
   ceph_assert(destdn->get_projected_linkage()->is_null());
   if (req->get_alternate_name().size() > alternate_name_max) {
     dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
@@ -7248,11 +7343,17 @@ void Server::_link_remote_finish(MDRequestRef& mdr, bool inc,
   mdr->apply();
 
   MDRequestRef null_ref;
-  if (inc)
+  if (inc) {
     mdcache->send_dentry_link(dn, null_ref);
-  else
+  } else {
+    dn->state_clear(CDentry::STATE_UNLINKING);
     mdcache->send_dentry_unlink(dn, NULL, null_ref);
-  
+
+    MDSContext::vec finished;
+    dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+    mdcache->mds->queue_waiters(finished);
+  }
+
   // bump target popularity
   mds->balancer->hit_inode(targeti, META_POP_IWR);
   mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
@@ -7626,10 +7727,20 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   if (rmdir)
     mdr->disable_lock_cache();
+
   CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
   if (!dn)
     return;
 
+  // notify replica MDSes the dentry is under unlink
+  if (!dn->state_test(CDentry::STATE_UNLINKING)) {
+    dn->state_set(CDentry::STATE_UNLINKING);
+    mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
+    if (dn->replica_unlinking_ref) {
+      return;
+    }
+  }
+
   CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
   ceph_assert(!dnl->is_null());
   CInode *in = dnl->get_inode();
@@ -7646,11 +7757,13 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // do empty directory checks
       if (_dir_is_nonempty_unlocked(mdr, in)) {
-       respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+        dn->state_clear(CDentry::STATE_UNLINKING);
+        respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
        return;
       }
     } else {
       dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_EISDIR);
       return;
     }
@@ -7658,6 +7771,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
     if (rmdir) {
       // unlink
       dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
+      dn->state_clear(CDentry::STATE_UNLINKING);
       respond_to_request(mdr, -CEPHFS_ENOTDIR);
       return;
     }
@@ -7665,8 +7779,10 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
 
   CInode *diri = dn->get_dir()->get_inode();
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
-    if (!check_access(mdr, diri, MAY_WRITE))
+    if (!check_access(mdr, diri, MAY_WRITE)) {
+      dn->state_clear(CDentry::STATE_UNLINKING);
       return;
+    }
   }
 
   // -- create stray dentry? --
@@ -7705,6 +7821,7 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   if (in->is_dir() &&
       _dir_is_nonempty(mdr, in)) {
     respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+    dn->state_clear(CDentry::STATE_UNLINKING);
     return;
   }
 
@@ -7904,9 +8021,14 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
   }
 
   mdr->apply();
-  
+
+  dn->state_clear(CDentry::STATE_UNLINKING);
   mdcache->send_dentry_unlink(dn, straydn, mdr);
-  
+
+  MDSContext::vec finished;
+  dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+  mdcache->mds->queue_waiters(finished);
+
   if (straydn) {
     // update subtree map?
     if (strayin->is_dir())
@@ -7921,7 +8043,7 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
 
   // reply
   respond_to_request(mdr, 0);
-  
+
   // removing a new dn?
   dn->get_dir()->try_remove_unlinked_dn(dn);
 
@@ -8380,6 +8502,16 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   if (!destdn)
     return;
 
+  if (is_unlink_pending(destdn)) {
+    wait_for_pending_unlink(destdn, mdr);
+    return;
+  }
+
+  if (is_unlink_pending(srcdn)) {
+    wait_for_pending_unlink(srcdn, mdr);
+    return;
+  }
+
   dout(10) << " destdn " << *destdn << dendl;
   CDir *destdir = destdn->get_dir();
   ceph_assert(destdir->is_auth());