]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Migrator.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mds / Migrator.cc
index 40a89626bc798710342ca357424d873370d4ecf8..f58ec7fd7ef52ec8ebe4a1ac8df2c09db39d3b2e 100644 (file)
@@ -110,6 +110,14 @@ public:
 /* This function DOES put the passed message before returning*/
 void Migrator::dispatch(Message *m)
 {
+  if (unlikely(inject_message_loss)) {
+    if (inject_message_loss == m->get_type() - MDS_PORT_MIGRATOR) {
+      dout(0) << "inject message loss " << *m << dendl;
+      m->put();
+      return;
+    }
+  }
+
   switch (m->get_type()) {
     // import
   case MSG_MDS_EXPORTDIRDISCOVER:
@@ -156,6 +164,9 @@ void Migrator::dispatch(Message *m)
   case MSG_MDS_EXPORTCAPS:
     handle_export_caps(static_cast<MExportCaps*>(m));
     break;
+  case MSG_MDS_EXPORTCAPSACK:
+    handle_export_caps_ack(static_cast<MExportCapsAck*>(m));
+    break;
   case MSG_MDS_GATHERCAPS:
     handle_gather_caps(static_cast<MGatherCaps*>(m));
     break;
@@ -370,7 +381,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
 
     if (it->second.state == EXPORT_CANCELLED) {
       export_state.erase(it);
-      dir->state_clear(CDir::STATE_EXPORTING);
+      dir->clear_exporting();
       // send pending import_maps?
       cache->maybe_send_pending_resolves();
     }
@@ -395,7 +406,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
 void Migrator::export_cancel_finish(CDir *dir)
 {
   assert(dir->state_test(CDir::STATE_EXPORTING));
-  dir->state_clear(CDir::STATE_EXPORTING);
+  dir->clear_exporting();
 
   // pinned by Migrator::export_notify_abort()
   dir->auth_unpin(this);
@@ -860,7 +871,7 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
   mds->hit_export_target(ceph_clock_now(), dest, -1);
 
   dir->auth_pin(this);
-  dir->state_set(CDir::STATE_EXPORTING);
+  dir->mark_exporting();
 
   MDRequestRef mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
   mdr->more()->export_dir = dir;
@@ -1074,7 +1085,7 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
     mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
     export_state.erase(it);
 
-    dir->state_clear(CDir::STATE_EXPORTING);
+    dir->clear_exporting();
     cache->maybe_send_pending_resolves();
     return;
   }
@@ -2073,7 +2084,7 @@ void Migrator::export_finish(CDir *dir)
   MutationRef mut = it->second.mut;
   // remove from exporting list, clean up state
   export_state.erase(it);
-  dir->state_clear(CDir::STATE_EXPORTING);
+  dir->clear_exporting();
 
   cache->show_subtrees();
   audit();
@@ -3090,10 +3101,16 @@ void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_c
        cap->mark_importing();
     }
 
-    Capability::Import& im = import_map[it.first];
-    im.cap_id = cap->get_cap_id();
-    im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
-    im.issue_seq = cap->get_last_seq() + 1;
+    // Always ask exporter mds to send cap export messages for auth caps.
+    // For non-auth caps, ask exporter mds to send cap export messages to
+    // clients who haven't opened sessions. The cap export messages will
+    // make clients open sessions.
+    if (auth_cap || session->connection == nullptr) {
+      Capability::Import& im = import_map[it.first];
+      im.cap_id = cap->get_cap_id();
+      im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
+      im.issue_seq = cap->get_last_seq() + 1;
+    }
 
     if (peer >= 0) {
       cap->merge(it.second, auth_cap);
@@ -3299,16 +3316,55 @@ void Migrator::export_caps(CInode *in)
   mds->send_message_mds(ex, dest);
 }
 
+/* This function DOES put the passed message before returning*/
+void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+{
+  mds_rank_t from = ack->get_source().num();
+  CInode *in = cache->get_inode(ack->ino);
+  if (in) {
+    assert(!in->is_auth());
+
+    dout(10) << "handle_export_caps_ack " << *ack << " from "
+            << ack->get_source() << " on " << *in << dendl;
+
+    map<client_t,Capability::Import> imported_caps;
+    map<client_t,uint64_t> caps_ids;
+    auto blp = ack->cap_bl.begin();
+    ::decode(imported_caps, blp);
+    ::decode(caps_ids, blp);
+
+    for (auto& it : imported_caps) {
+      Capability *cap = in->get_client_cap(it.first);
+      if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
+       continue;
+
+      dout(7) << __func__ << " telling client." << it.first
+             << " exported caps on " << *in << dendl;
+      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
+                                      cap->get_cap_id(), cap->get_mseq(),
+                                      mds->get_osd_epoch_barrier());
+      m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
+      mds->send_message_client_counted(m, it.first);
+
+      in->remove_client_cap(it.first);
+    }
+
+    mds->locker->request_inode_file_caps(in);
+    mds->locker->try_eval(in, CEPH_CAP_LOCKS);
+  }
+
+  ack->put();
+}
+
 void Migrator::handle_gather_caps(MGatherCaps *m)
 {
   CInode *in = cache->get_inode(m->ino);
-
   if (!in)
     goto out;
 
   dout(10) << "handle_gather_caps " << *m << " from " << m->get_source()
-           << " on " << *in
-          << dendl;
+           << " on " << *in << dendl;
+
   if (in->is_any_caps() &&
       !in->is_auth() &&
       !in->is_ambiguous_auth() &&
@@ -3384,14 +3440,25 @@ void Migrator::logged_import_caps(CInode *in,
   // force open client sessions and finish cap import
   mds->server->finish_force_open_sessions(imported_session_map);
 
-  map<client_t,Capability::Import> imported_caps;
-
   auto it = peer_exports.find(in);
   assert(it != peer_exports.end());
 
   // clients will release caps from the exporter when they receive the cap import message.
+  map<client_t,Capability::Import> imported_caps;
   finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
   mds->locker->eval(in, CEPH_CAP_LOCKS, true);
+
+  if (!imported_caps.empty()) {
+    MExportCapsAck *ack = new MExportCapsAck(in->ino());
+    map<client_t,uint64_t> peer_caps_ids;
+    for (auto &p : imported_caps )
+      peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
+
+    ::encode(imported_caps, ack->cap_bl);
+    ::encode(peer_caps_ids, ack->cap_bl);
+    mds->send_message_mds(ack, from);
+  }
+
   in->auth_unpin(this);
 }
 
@@ -3403,4 +3470,9 @@ void Migrator::handle_conf_change(const struct md_config_t *conf,
     inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
     dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
   }
+
+  if (changed.count("mds_inject_migrator_message_loss")) {
+    inject_message_loss = g_conf->get_val<int64_t>("mds_inject_migrator_message_loss");
+    dout(0) << "mds_inject_migrator_message_loss is " << inject_message_loss << dendl;
+  }
 }