/* This function DOES put the passed message before returning*/
void Migrator::dispatch(Message *m)
{
+ if (unlikely(inject_message_loss)) {
+ if (inject_message_loss == m->get_type() - MDS_PORT_MIGRATOR) {
+ dout(0) << "inject message loss " << *m << dendl;
+ m->put();
+ return;
+ }
+ }
+
switch (m->get_type()) {
// import
case MSG_MDS_EXPORTDIRDISCOVER:
case MSG_MDS_EXPORTCAPS:
handle_export_caps(static_cast<MExportCaps*>(m));
break;
+ case MSG_MDS_EXPORTCAPSACK:
+ handle_export_caps_ack(static_cast<MExportCapsAck*>(m));
+ break;
case MSG_MDS_GATHERCAPS:
handle_gather_caps(static_cast<MGatherCaps*>(m));
break;
if (it->second.state == EXPORT_CANCELLED) {
export_state.erase(it);
- dir->state_clear(CDir::STATE_EXPORTING);
+ dir->clear_exporting();
// send pending import_maps?
cache->maybe_send_pending_resolves();
}
void Migrator::export_cancel_finish(CDir *dir)
{
assert(dir->state_test(CDir::STATE_EXPORTING));
- dir->state_clear(CDir::STATE_EXPORTING);
+ dir->clear_exporting();
// pinned by Migrator::export_notify_abort()
dir->auth_unpin(this);
mds->hit_export_target(ceph_clock_now(), dest, -1);
dir->auth_pin(this);
- dir->state_set(CDir::STATE_EXPORTING);
+ dir->mark_exporting();
MDRequestRef mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
mdr->more()->export_dir = dir;
mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
export_state.erase(it);
- dir->state_clear(CDir::STATE_EXPORTING);
+ dir->clear_exporting();
cache->maybe_send_pending_resolves();
return;
}
MutationRef mut = it->second.mut;
// remove from exporting list, clean up state
export_state.erase(it);
- dir->state_clear(CDir::STATE_EXPORTING);
+ dir->clear_exporting();
cache->show_subtrees();
audit();
cap->mark_importing();
}
- Capability::Import& im = import_map[it.first];
- im.cap_id = cap->get_cap_id();
- im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
- im.issue_seq = cap->get_last_seq() + 1;
+ // Always ask exporter mds to send cap export messages for auth caps.
+ // For non-auth caps, ask exporter mds to send cap export messages to
+ // clients who haven't opened sessions. The cap export messages will
+ // make clients open sessions.
+ if (auth_cap || session->connection == nullptr) {
+ Capability::Import& im = import_map[it.first];
+ im.cap_id = cap->get_cap_id();
+ im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
+ im.issue_seq = cap->get_last_seq() + 1;
+ }
if (peer >= 0) {
cap->merge(it.second, auth_cap);
mds->send_message_mds(ex, dest);
}
+/* This function DOES put the passed message before returning*/
+void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+{
+ mds_rank_t from = ack->get_source().num();
+ CInode *in = cache->get_inode(ack->ino);
+ if (in) {
+ assert(!in->is_auth());
+
+ dout(10) << "handle_export_caps_ack " << *ack << " from "
+ << ack->get_source() << " on " << *in << dendl;
+
+ map<client_t,Capability::Import> imported_caps;
+ map<client_t,uint64_t> caps_ids;
+ auto blp = ack->cap_bl.begin();
+ ::decode(imported_caps, blp);
+ ::decode(caps_ids, blp);
+
+ for (auto& it : imported_caps) {
+ Capability *cap = in->get_client_cap(it.first);
+ if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
+ continue;
+
+ dout(7) << __func__ << " telling client." << it.first
+ << " exported caps on " << *in << dendl;
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
+ cap->get_cap_id(), cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
+ m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
+ mds->send_message_client_counted(m, it.first);
+
+ in->remove_client_cap(it.first);
+ }
+
+ mds->locker->request_inode_file_caps(in);
+ mds->locker->try_eval(in, CEPH_CAP_LOCKS);
+ }
+
+ ack->put();
+}
+
void Migrator::handle_gather_caps(MGatherCaps *m)
{
CInode *in = cache->get_inode(m->ino);
-
if (!in)
goto out;
dout(10) << "handle_gather_caps " << *m << " from " << m->get_source()
- << " on " << *in
- << dendl;
+ << " on " << *in << dendl;
+
if (in->is_any_caps() &&
!in->is_auth() &&
!in->is_ambiguous_auth() &&
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(imported_session_map);
- map<client_t,Capability::Import> imported_caps;
-
auto it = peer_exports.find(in);
assert(it != peer_exports.end());
// clients will release caps from the exporter when they receive the cap import message.
+ map<client_t,Capability::Import> imported_caps;
finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
+
+ if (!imported_caps.empty()) {
+ MExportCapsAck *ack = new MExportCapsAck(in->ino());
+ map<client_t,uint64_t> peer_caps_ids;
+ for (auto &p : imported_caps )
+ peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
+
+ ::encode(imported_caps, ack->cap_bl);
+ ::encode(peer_caps_ids, ack->cap_bl);
+ mds->send_message_mds(ack, from);
+ }
+
in->auth_unpin(this);
}
inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
}
+
+ if (changed.count("mds_inject_migrator_message_loss")) {
+ inject_message_loss = g_conf->get_val<int64_t>("mds_inject_migrator_message_loss");
+ dout(0) << "mds_inject_migrator_message_loss is " << inject_message_loss << dendl;
+ }
}