#include "Mutation.h"
#include "include/filepath.h"
+#include "common/likely.h"
#include "events/EExport.h"
#include "events/EImportStart.h"
handle_export_prep(static_cast<MExportDirPrep*>(m));
break;
case MSG_MDS_EXPORTDIR:
- handle_export_dir(static_cast<MExportDir*>(m));
+ if (unlikely(inject_session_race)) {
+ dout(0) << "waiting for inject_session_race" << dendl;
+ mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
+ } else {
+ handle_export_dir(static_cast<MExportDir*>(m));
+ }
break;
case MSG_MDS_EXPORTDIRFINISH:
handle_export_finish(static_cast<MExportDirFinish*>(m));
map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
assert(q != peer_imported.end());
- m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
+ m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
+ (q->second.cap_id > 0 ? peer : -1), 0);
mds->send_message_client_counted(m, it->first);
}
in->clear_client_caps_after_export();
CDir *dir;
mds_rank_t from;
public:
- map<client_t,entity_inst_t> imported_client_map;
- map<client_t,uint64_t> sseqmap;
+ map<client_t,pair<Session*,uint64_t> > imported_session_map;
C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
}
void finish(int r) override {
- mig->import_logged_start(df, dir, from, imported_client_map, sseqmap);
+ mig->import_logged_start(df, dir, from, imported_session_map);
}
};
// new client sessions, open these after we journal
// include imported sessions in EImportStart
bufferlist::iterator cmp = m->client_map.begin();
- ::decode(onlogged->imported_client_map, cmp);
+ map<client_t,entity_inst_t> client_map;
+ decode(client_map, cmp);
assert(cmp.end());
- le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
- le->client_map.claim(m->client_map);
+ le->cmapv = mds->server->prepare_force_open_sessions(client_map, onlogged->imported_session_map);
+ encode(client_map, le->client_map, mds->mdsmap->get_up_features());
bufferlist::iterator blp = m->export_data.begin();
int num_imported_inodes = 0;
if (stat.state == IMPORT_ACKING) {
// remove imported caps
for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
- p != stat.peer_exports.end();
- ++p) {
+ p != stat.peer_exports.end();
+ ++p) {
CInode *in = p->first;
for (map<client_t,Capability::Export>::iterator q = p->second.begin();
- q != p->second.end();
- ++q) {
+ q != p->second.end();
+ ++q) {
Capability *cap = in->get_client_cap(q->first);
- assert(cap);
+ if (!cap) {
+ assert(!stat.session_map.count(q->first));
+ continue;
+ }
if (cap->is_importing())
in->remove_client_cap(q->first);
}
in->put(CInode::PIN_IMPORTINGCAPS);
}
- for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
- p != stat.client_map.end();
- ++p) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
- assert(session);
+ for (auto& p : stat.session_map) {
+ Session *session = p.second.first;
session->dec_importing();
}
}
void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
- map<client_t,entity_inst_t>& imported_client_map,
- map<client_t,uint64_t>& sseqmap)
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map)
{
map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
if (it == import_state.end() ||
it->second.state != IMPORT_LOGGINGSTART) {
dout(7) << "import " << df << " must have aborted" << dendl;
- mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_session_map);
return;
}
assert (g_conf->mds_kill_import_at != 7);
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
- it->second.client_map.swap(imported_client_map);
+ mds->server->finish_force_open_sessions(imported_session_map, false);
map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
p != it->second.peer_exports.end();
++p) {
// parameter 'peer' is NONE, delay sending cap import messages to client
- finish_import_inode_caps(p->first, MDS_RANK_NONE, true, p->second, imported_caps[p->first->ino()]);
+ finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
+ p->second, imported_caps[p->first->ino()]);
}
+
+ it->second.session_map.swap(imported_session_map);
// send notify's etc.
dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
for (map<client_t,Capability::Export>::iterator q = p->second.begin();
q != p->second.end();
++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ auto r = it->second.session_map.find(q->first);
+ if (r == it->second.session_map.end())
+ continue;
+
+ Session *session = r->second.first;
Capability *cap = in->get_client_cap(q->first);
assert(cap);
cap->merge(q->second, true);
p->second.clear();
in->replica_caps_wanted = 0;
}
- for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
- p != it->second.client_map.end();
- ++p) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
- assert(session);
+ for (auto& p : it->second.session_map) {
+ Session *session = p.second.first;
session->dec_importing();
}
}
assert(!dn->get_linkage()->get_inode());
dn->dir->link_primary_inode(dn, in);
}
+
+ if (in->is_dir())
+ dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);
// add inode?
if (added) {
}
void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
- map<client_t,Capability::Export> &export_map,
+ const map<client_t,pair<Session*,uint64_t> >& session_map,
+ const map<client_t,Capability::Export> &export_map,
map<client_t,Capability::Import> &import_map)
{
- for (map<client_t,Capability::Export>::iterator it = export_map.begin();
- it != export_map.end();
- ++it) {
- dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl;
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v));
- assert(session);
+ for (auto& it : export_map) {
+ dout(10) << "finish_import_inode_caps for client." << it.first << " on " << *in << dendl;
+
+ auto p = session_map.find(it.first);
+ if (p == session_map.end()) {
+ dout(10) << " no session for client." << it.first << dendl;
+ (void)import_map[it.first];
+ continue;
+ }
- Capability *cap = in->get_client_cap(it->first);
+ Session *session = p->second.first;
+
+ Capability *cap = in->get_client_cap(it.first);
if (!cap) {
- cap = in->add_client_cap(it->first, session);
+ cap = in->add_client_cap(it.first, session);
if (peer < 0)
cap->mark_importing();
}
- Capability::Import& im = import_map[it->first];
+ Capability::Import& im = import_map[it.first];
im.cap_id = cap->get_cap_id();
- im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
+ im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
im.issue_seq = cap->get_last_seq() + 1;
if (peer >= 0) {
- cap->merge(it->second, auth_cap);
- mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
- it->second.seq, it->second.mseq - 1, peer,
+ cap->merge(it.second, auth_cap);
+ mds->mdcache->do_cap_import(session, in, cap, it.second.cap_id,
+ it.second.seq, it.second.mseq - 1, peer,
auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
}
}
CInode *in;
mds_rank_t from;
public:
+ map<client_t,pair<Session*,uint64_t> > imported_session_map;
map<CInode*, map<client_t,Capability::Export> > peer_exports;
- map<client_t,entity_inst_t> client_map;
- map<client_t,uint64_t> sseqmap;
C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
void finish(int r) override {
- mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
+ mig->logged_import_caps(in, from, imported_session_map, peer_exports);
}
};
assert(in->is_auth());
// FIXME
- if (!in->can_auth_pin())
+ if (!in->can_auth_pin()) {
+ ex->put();
return;
+ }
+
in->auth_pin(this);
+ map<client_t,entity_inst_t> client_map;
+ client_map.swap(ex->client_map);
+
C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
this, in, mds_rank_t(ex->get_source().num()));
- finish->client_map = ex->client_map;
+ version_t pv = mds->server->prepare_force_open_sessions(client_map,
+ finish->imported_session_map);
// decode new caps
bufferlist::iterator blp = ex->cap_bl.begin();
decode_import_inode_caps(in, false, blp, finish->peer_exports);
assert(!finish->peer_exports.empty()); // thus, inode is pinned.
// journal open client sessions
- version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
- ESessions *le = new ESessions(pv, ex->client_map);
+ ESessions *le = new ESessions(pv, client_map);
mds->mdlog->start_submit_entry(le, finish);
mds->mdlog->flush();
void Migrator::logged_import_caps(CInode *in,
mds_rank_t from,
- map<CInode*, map<client_t,Capability::Export> >& peer_exports,
- map<client_t,entity_inst_t>& client_map,
- map<client_t,uint64_t>& sseqmap)
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map,
+ map<CInode*, map<client_t,Capability::Export> >& peer_exports)
{
dout(10) << "logged_import_caps on " << *in << dendl;
// see export_go() vs export_go_synced()
assert(in->is_auth());
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_session_map);
map<client_t,Capability::Import> imported_caps;
- assert(peer_exports.count(in));
+ auto it = peer_exports.find(in);
+ assert(it != peer_exports.end());
+
// clients will release caps from the exporter when they receive the cap import message.
- finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
+ finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
in->auth_unpin(this);
}
+
+void Migrator::handle_conf_change(const struct md_config_t *conf,
+ const std::set <std::string> &changed,
+ const MDSMap &mds_map)
+{
+ if (changed.count("mds_inject_migrator_session_race")) {
+ inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
+ dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
+ }
+}