X-Git-Url: https://git.proxmox.com/?p=ceph.git;a=blobdiff_plain;f=ceph%2Fsrc%2Fmds%2FMigrator.cc;h=40a89626bc798710342ca357424d873370d4ecf8;hp=1e2a3a302f6520269e53e09ddc7859e8aee4a87f;hb=28e407b858acd3bddc89f68583571f771bb42e46;hpb=dfcb7b53b2e4fcd2a5af0240d4975adc711ab96e diff --git a/ceph/src/mds/Migrator.cc b/ceph/src/mds/Migrator.cc index 1e2a3a302..40a89626b 100644 --- a/ceph/src/mds/Migrator.cc +++ b/ceph/src/mds/Migrator.cc @@ -27,6 +27,7 @@ #include "Mutation.h" #include "include/filepath.h" +#include "common/likely.h" #include "events/EExport.h" #include "events/EImportStart.h" @@ -118,7 +119,12 @@ void Migrator::dispatch(Message *m) handle_export_prep(static_cast(m)); break; case MSG_MDS_EXPORTDIR: - handle_export_dir(static_cast(m)); + if (unlikely(inject_session_race)) { + dout(0) << "waiting for inject_session_race" << dendl; + mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m)); + } else { + handle_export_dir(static_cast(m)); + } break; case MSG_MDS_EXPORTDIRFINISH: handle_export_finish(static_cast(m)); @@ -1511,7 +1517,8 @@ void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer, map::iterator q = peer_imported.find(it->first); assert(q != peer_imported.end()); - m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0); + m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, + (q->second.cap_id > 0 ? peer : -1), 0); mds->send_message_client_counted(m, it->first); } in->clear_client_caps_after_export(); @@ -2442,14 +2449,13 @@ class C_MDS_ImportDirLoggedStart : public MigratorLogContext { CDir *dir; mds_rank_t from; public: - map imported_client_map; - map sseqmap; + map > imported_session_map; C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) : MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) { } void finish(int r) override { - mig->import_logged_start(df, dir, from, imported_client_map, sseqmap); + mig->import_logged_start(df, dir, from, imported_session_map); } }; @@ -2492,10 +2498,11 @@ void Migrator::handle_export_dir(MExportDir *m) // new client sessions, open these after we journal // include imported sessions in EImportStart bufferlist::iterator cmp = m->client_map.begin(); - ::decode(onlogged->imported_client_map, cmp); + map client_map; + decode(client_map, cmp); assert(cmp.end()); - le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap); - le->client_map.claim(m->client_map); + le->cmapv = mds->server->prepare_force_open_sessions(client_map, onlogged->imported_session_map); + encode(client_map, le->client_map, mds->mdsmap->get_up_features()); bufferlist::iterator blp = m->export_data.begin(); int num_imported_inodes = 0; @@ -2691,24 +2698,24 @@ void Migrator::import_reverse(CDir *dir) if (stat.state == IMPORT_ACKING) { // remove imported caps for (map >::iterator p = stat.peer_exports.begin(); - p != stat.peer_exports.end(); - ++p) { + p != stat.peer_exports.end(); + ++p) { CInode *in = p->first; for (map::iterator q = p->second.begin(); - q != p->second.end(); - ++q) { + q != p->second.end(); + ++q) { Capability *cap = in->get_client_cap(q->first); - assert(cap); + if (!cap) { + assert(!stat.session_map.count(q->first)); + continue; + } if (cap->is_importing()) in->remove_client_cap(q->first); } in->put(CInode::PIN_IMPORTINGCAPS); } - for (map::iterator p = stat.client_map.begin(); - p != stat.client_map.end(); - ++p) { - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v)); - assert(session); + for (auto& p : stat.session_map) { + Session *session = p.second.first; session->dec_importing(); } } @@ -2808,14 +2815,13 @@ void Migrator::import_reverse_final(CDir *dir) void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from, - map& imported_client_map, - map& sseqmap) + map >& imported_session_map) { map::iterator it = import_state.find(dir->dirfrag()); if (it == import_state.end() || it->second.state != IMPORT_LOGGINGSTART) { dout(7) << "import " << df << " must have aborted" << dendl; - mds->server->finish_force_open_sessions(imported_client_map, sseqmap); + mds->server->finish_force_open_sessions(imported_session_map); return; } @@ -2827,16 +2833,18 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from, assert (g_conf->mds_kill_import_at != 7); // force open client sessions and finish cap import - mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false); - it->second.client_map.swap(imported_client_map); + mds->server->finish_force_open_sessions(imported_session_map, false); map > imported_caps; for (map >::iterator p = it->second.peer_exports.begin(); p != it->second.peer_exports.end(); ++p) { // parameter 'peer' is NONE, delay sending cap import messages to client - finish_import_inode_caps(p->first, MDS_RANK_NONE, true, p->second, imported_caps[p->first->ino()]); + finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map, + p->second, imported_caps[p->first->ino()]); } + + it->second.session_map.swap(imported_session_map); // send notify's etc. dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl; @@ -2894,8 +2902,11 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last) for (map::iterator q = p->second.begin(); q != p->second.end(); ++q) { - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v)); - assert(session); + auto r = it->second.session_map.find(q->first); + if (r == it->second.session_map.end()) + continue; + + Session *session = r->second.first; Capability *cap = in->get_client_cap(q->first); assert(cap); cap->merge(q->second, true); @@ -2906,11 +2917,8 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last) p->second.clear(); in->replica_caps_wanted = 0; } - for (map::iterator p = it->second.client_map.begin(); - p != it->second.client_map.end(); - ++p) { - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v)); - assert(session); + for (auto& p : it->second.session_map) { + Session *session = p.second.first; session->dec_importing(); } } @@ -3009,6 +3017,9 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, assert(!dn->get_linkage()->get_inode()); dn->dir->link_primary_inode(dn, in); } + + if (in->is_dir()) + dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru); // add inode? if (added) { @@ -3056,32 +3067,38 @@ void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap, } void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap, - map &export_map, + const map >& session_map, + const map &export_map, map &import_map) { - for (map::iterator it = export_map.begin(); - it != export_map.end(); - ++it) { - dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl; - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v)); - assert(session); + for (auto& it : export_map) { + dout(10) << "finish_import_inode_caps for client." << it.first << " on " << *in << dendl; + + auto p = session_map.find(it.first); + if (p == session_map.end()) { + dout(10) << " no session for client." << it.first << dendl; + (void)import_map[it.first]; + continue; + } - Capability *cap = in->get_client_cap(it->first); + Session *session = p->second.first; + + Capability *cap = in->get_client_cap(it.first); if (!cap) { - cap = in->add_client_cap(it->first, session); + cap = in->add_client_cap(it.first, session); if (peer < 0) cap->mark_importing(); } - Capability::Import& im = import_map[it->first]; + Capability::Import& im = import_map[it.first]; im.cap_id = cap->get_cap_id(); - im.mseq = auth_cap ? it->second.mseq : cap->get_mseq(); + im.mseq = auth_cap ? it.second.mseq : cap->get_mseq(); im.issue_seq = cap->get_last_seq() + 1; if (peer >= 0) { - cap->merge(it->second, auth_cap); - mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id, - it->second.seq, it->second.mseq - 1, peer, + cap->merge(it.second, auth_cap); + mds->mdcache->do_cap_import(session, in, cap, it.second.cap_id, + it.second.seq, it.second.mseq - 1, peer, auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE); } } @@ -3306,13 +3323,12 @@ class C_M_LoggedImportCaps : public MigratorLogContext { CInode *in; mds_rank_t from; public: + map > imported_session_map; map > peer_exports; - map client_map; - map sseqmap; C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {} void finish(int r) override { - mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap); + mig->logged_import_caps(in, from, imported_session_map, peer_exports); } }; @@ -3326,23 +3342,29 @@ void Migrator::handle_export_caps(MExportCaps *ex) assert(in->is_auth()); // FIXME - if (!in->can_auth_pin()) + if (!in->can_auth_pin()) { + ex->put(); return; + } + in->auth_pin(this); + map client_map; + client_map.swap(ex->client_map); + C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps( this, in, mds_rank_t(ex->get_source().num())); - finish->client_map = ex->client_map; + version_t pv = mds->server->prepare_force_open_sessions(client_map, + finish->imported_session_map); // decode new caps bufferlist::iterator blp = ex->cap_bl.begin(); decode_import_inode_caps(in, false, blp, finish->peer_exports); assert(!finish->peer_exports.empty()); // thus, inode is pinned. // journal open client sessions - version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap); - ESessions *le = new ESessions(pv, ex->client_map); + ESessions *le = new ESessions(pv, client_map); mds->mdlog->start_submit_entry(le, finish); mds->mdlog->flush(); @@ -3352,22 +3374,33 @@ void Migrator::handle_export_caps(MExportCaps *ex) void Migrator::logged_import_caps(CInode *in, mds_rank_t from, - map >& peer_exports, - map& client_map, - map& sseqmap) + map >& imported_session_map, + map >& peer_exports) { dout(10) << "logged_import_caps on " << *in << dendl; // see export_go() vs export_go_synced() assert(in->is_auth()); // force open client sessions and finish cap import - mds->server->finish_force_open_sessions(client_map, sseqmap); + mds->server->finish_force_open_sessions(imported_session_map); map imported_caps; - assert(peer_exports.count(in)); + auto it = peer_exports.find(in); + assert(it != peer_exports.end()); + // clients will release caps from the exporter when they receive the cap import message. - finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps); + finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps); mds->locker->eval(in, CEPH_CAP_LOCKS, true); in->auth_unpin(this); } + +void Migrator::handle_conf_change(const struct md_config_t *conf, + const std::set &changed, + const MDSMap &mds_map) +{ + if (changed.count("mds_inject_migrator_session_race")) { + inject_session_race = conf->get_val("mds_inject_migrator_session_race"); + dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl; + } +}