]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Migrator.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / Migrator.cc
index 1e2a3a302f6520269e53e09ddc7859e8aee4a87f..40a89626bc798710342ca357424d873370d4ecf8 100644 (file)
@@ -27,6 +27,7 @@
 #include "Mutation.h"
 
 #include "include/filepath.h"
+#include "common/likely.h"
 
 #include "events/EExport.h"
 #include "events/EImportStart.h"
@@ -118,7 +119,12 @@ void Migrator::dispatch(Message *m)
     handle_export_prep(static_cast<MExportDirPrep*>(m));
     break;
   case MSG_MDS_EXPORTDIR:
-    handle_export_dir(static_cast<MExportDir*>(m));
+    if (unlikely(inject_session_race)) {
+      dout(0) << "waiting for inject_session_race" << dendl;
+      mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
+    } else {
+      handle_export_dir(static_cast<MExportDir*>(m));
+    }
     break;
   case MSG_MDS_EXPORTDIRFINISH:
     handle_export_finish(static_cast<MExportDirFinish*>(m));
@@ -1511,7 +1517,8 @@ void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
 
     map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
     assert(q != peer_imported.end());
-    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
+    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
+                   (q->second.cap_id > 0 ? peer : -1), 0);
     mds->send_message_client_counted(m, it->first);
   }
   in->clear_client_caps_after_export();
@@ -2442,14 +2449,13 @@ class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
   CDir *dir;
   mds_rank_t from;
 public:
-  map<client_t,entity_inst_t> imported_client_map;
-  map<client_t,uint64_t> sseqmap;
+  map<client_t,pair<Session*,uint64_t> > imported_session_map;
 
   C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
     MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
   }
   void finish(int r) override {
-    mig->import_logged_start(df, dir, from, imported_client_map, sseqmap);
+    mig->import_logged_start(df, dir, from, imported_session_map);
   }
 };
 
@@ -2492,10 +2498,11 @@ void Migrator::handle_export_dir(MExportDir *m)
   // new client sessions, open these after we journal
   // include imported sessions in EImportStart
   bufferlist::iterator cmp = m->client_map.begin();
-  ::decode(onlogged->imported_client_map, cmp);
+  map<client_t,entity_inst_t> client_map;
+  decode(client_map, cmp);
   assert(cmp.end());
-  le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
-  le->client_map.claim(m->client_map);
+  le->cmapv = mds->server->prepare_force_open_sessions(client_map, onlogged->imported_session_map);
+  encode(client_map, le->client_map, mds->mdsmap->get_up_features());
 
   bufferlist::iterator blp = m->export_data.begin();
   int num_imported_inodes = 0;
@@ -2691,24 +2698,24 @@ void Migrator::import_reverse(CDir *dir)
   if (stat.state == IMPORT_ACKING) {
     // remove imported caps
     for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
-       p != stat.peer_exports.end();
-       ++p) {
+        p != stat.peer_exports.end();
+        ++p) {
       CInode *in = p->first;
       for (map<client_t,Capability::Export>::iterator q = p->second.begin();
-         q != p->second.end();
-         ++q) {
+          q != p->second.end();
+          ++q) {
        Capability *cap = in->get_client_cap(q->first);
-       assert(cap);
+       if (!cap) {
+         assert(!stat.session_map.count(q->first));
+         continue;
+       }
        if (cap->is_importing())
          in->remove_client_cap(q->first);
       }
       in->put(CInode::PIN_IMPORTINGCAPS);
     }
-    for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
-        p != stat.client_map.end();
-        ++p) {
-      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
-      assert(session);
+    for (auto& p : stat.session_map) {
+      Session *session = p.second.first;
       session->dec_importing();
     }
   }
@@ -2808,14 +2815,13 @@ void Migrator::import_reverse_final(CDir *dir)
 
 
 void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
-                                  map<client_t,entity_inst_t>& imported_client_map,
-                                  map<client_t,uint64_t>& sseqmap)
+                                  map<client_t,pair<Session*,uint64_t> >& imported_session_map)
 {
   map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
   if (it == import_state.end() ||
       it->second.state != IMPORT_LOGGINGSTART) {
     dout(7) << "import " << df << " must have aborted" << dendl;
-    mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+    mds->server->finish_force_open_sessions(imported_session_map);
     return;
   }
 
@@ -2827,16 +2833,18 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
   assert (g_conf->mds_kill_import_at != 7);
 
   // force open client sessions and finish cap import
-  mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
-  it->second.client_map.swap(imported_client_map);
+  mds->server->finish_force_open_sessions(imported_session_map, false);
   
   map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
   for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
        p != it->second.peer_exports.end();
        ++p) {
     // parameter 'peer' is NONE, delay sending cap import messages to client
-    finish_import_inode_caps(p->first, MDS_RANK_NONE, true, p->second, imported_caps[p->first->ino()]);
+    finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
+                            p->second, imported_caps[p->first->ino()]);
   }
+
+  it->second.session_map.swap(imported_session_map);
   
   // send notify's etc.
   dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
@@ -2894,8 +2902,11 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
       for (map<client_t,Capability::Export>::iterator q = p->second.begin();
          q != p->second.end();
          ++q) {
-       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
-       assert(session);
+       auto r = it->second.session_map.find(q->first);
+       if (r == it->second.session_map.end())
+         continue;
+
+       Session *session = r->second.first;
        Capability *cap = in->get_client_cap(q->first);
        assert(cap);
        cap->merge(q->second, true);
@@ -2906,11 +2917,8 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
       p->second.clear();
       in->replica_caps_wanted = 0;
     }
-    for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
-        p != it->second.client_map.end();
-        ++p) {
-      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
-      assert(session);
+    for (auto& p : it->second.session_map) {
+      Session *session = p.second.first;
       session->dec_importing();
     }
   }
@@ -3009,6 +3017,9 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp,
     assert(!dn->get_linkage()->get_inode());
     dn->dir->link_primary_inode(dn, in);
   }
+
+  if (in->is_dir())
+    dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);
  
   // add inode?
   if (added) {
@@ -3056,32 +3067,38 @@ void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
 }
 
 void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
-                                       map<client_t,Capability::Export> &export_map,
+                                       const map<client_t,pair<Session*,uint64_t> >& session_map,
+                                       const map<client_t,Capability::Export> &export_map,
                                        map<client_t,Capability::Import> &import_map)
 {
-  for (map<client_t,Capability::Export>::iterator it = export_map.begin();
-       it != export_map.end();
-       ++it) {
-    dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl;
-    Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v));
-    assert(session);
+  for (auto& it : export_map) {
+    dout(10) << "finish_import_inode_caps for client." << it.first << " on " << *in << dendl;
+
+    auto p = session_map.find(it.first);
+    if (p == session_map.end()) {
+      dout(10) << " no session for client." << it.first << dendl;
+      (void)import_map[it.first];
+      continue;
+    }
 
-    Capability *cap = in->get_client_cap(it->first);
+    Session *session = p->second.first;
+
+    Capability *cap = in->get_client_cap(it.first);
     if (!cap) {
-      cap = in->add_client_cap(it->first, session);
+      cap = in->add_client_cap(it.first, session);
       if (peer < 0)
        cap->mark_importing();
     }
 
-    Capability::Import& im = import_map[it->first];
+    Capability::Import& im = import_map[it.first];
     im.cap_id = cap->get_cap_id();
-    im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
+    im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
     im.issue_seq = cap->get_last_seq() + 1;
 
     if (peer >= 0) {
-      cap->merge(it->second, auth_cap);
-      mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
-                                 it->second.seq, it->second.mseq - 1, peer,
+      cap->merge(it.second, auth_cap);
+      mds->mdcache->do_cap_import(session, in, cap, it.second.cap_id,
+                                 it.second.seq, it.second.mseq - 1, peer,
                                  auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
     }
   }
@@ -3306,13 +3323,12 @@ class C_M_LoggedImportCaps : public MigratorLogContext {
   CInode *in;
   mds_rank_t from;
 public:
+  map<client_t,pair<Session*,uint64_t> > imported_session_map;
   map<CInode*, map<client_t,Capability::Export> > peer_exports;
-  map<client_t,entity_inst_t> client_map;
-  map<client_t,uint64_t> sseqmap;
 
   C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
   void finish(int r) override {
-    mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
+    mig->logged_import_caps(in, from, imported_session_map, peer_exports);
   }  
 };
 
@@ -3326,23 +3342,29 @@ void Migrator::handle_export_caps(MExportCaps *ex)
   assert(in->is_auth());
 
   // FIXME
-  if (!in->can_auth_pin())
+  if (!in->can_auth_pin()) {
+    ex->put();
     return;
+  }
+
   in->auth_pin(this);
 
+  map<client_t,entity_inst_t> client_map;
+  client_map.swap(ex->client_map);
+
   C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
       this, in, mds_rank_t(ex->get_source().num()));
-  finish->client_map = ex->client_map;
 
+  version_t pv = mds->server->prepare_force_open_sessions(client_map,
+                                                         finish->imported_session_map);
   // decode new caps
   bufferlist::iterator blp = ex->cap_bl.begin();
   decode_import_inode_caps(in, false, blp, finish->peer_exports);
   assert(!finish->peer_exports.empty());   // thus, inode is pinned.
 
   // journal open client sessions
-  version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
   
-  ESessions *le = new ESessions(pv, ex->client_map);
+  ESessions *le = new ESessions(pv, client_map);
   mds->mdlog->start_submit_entry(le, finish);
   mds->mdlog->flush();
 
@@ -3352,22 +3374,33 @@ void Migrator::handle_export_caps(MExportCaps *ex)
 
 void Migrator::logged_import_caps(CInode *in, 
                                  mds_rank_t from,
-                                 map<CInode*, map<client_t,Capability::Export> >& peer_exports,
-                                 map<client_t,entity_inst_t>& client_map,
-                                 map<client_t,uint64_t>& sseqmap) 
+                                 map<client_t,pair<Session*,uint64_t> >& imported_session_map,
+                                 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
 {
   dout(10) << "logged_import_caps on " << *in << dendl;
   // see export_go() vs export_go_synced()
   assert(in->is_auth());
 
   // force open client sessions and finish cap import
-  mds->server->finish_force_open_sessions(client_map, sseqmap);
+  mds->server->finish_force_open_sessions(imported_session_map);
 
   map<client_t,Capability::Import> imported_caps;
 
-  assert(peer_exports.count(in));
+  auto it = peer_exports.find(in);
+  assert(it != peer_exports.end());
+
   // clients will release caps from the exporter when they receive the cap import message.
-  finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
+  finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
   mds->locker->eval(in, CEPH_CAP_LOCKS, true);
   in->auth_unpin(this);
 }
+
+void Migrator::handle_conf_change(const struct md_config_t *conf,
+                                  const std::set <std::string> &changed,
+                                  const MDSMap &mds_map)
+{
+  if (changed.count("mds_inject_migrator_session_race")) {
+    inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
+    dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
+  }
+}