#include "MDLog.h"
#include "msg/Messenger.h"
-#include "messages/MMDSTableRequest.h"
#include "events/ETableServer.h"
#define dout_context g_ceph_context
#undef dout_prefix
#define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
-/* This function DOES put the passed message before returning */
-void MDSTableServer::handle_request(MMDSTableRequest *req)
+void MDSTableServer::handle_request(const MMDSTableRequest::const_ref &req)
{
- assert(req->op >= 0);
+ ceph_assert(req->op >= 0);
switch (req->op) {
case TABLESERVER_OP_QUERY: return handle_query(req);
case TABLESERVER_OP_PREPARE: return handle_prepare(req);
case TABLESERVER_OP_COMMIT: return handle_commit(req);
case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
- default: assert(0 == "unrecognized mds_table_server request op");
+ case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req);
+ default: ceph_abort_msg("unrecognized mds_table_server request op");
}
}
class C_Prepare : public MDSLogContextBase {
MDSTableServer *server;
- MMDSTableRequest *req;
+ MMDSTableRequest::const_ref req;
version_t tid;
MDSRank *get_mds() override { return server->mds; }
public:
- C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
+ C_Prepare(MDSTableServer *s, const MMDSTableRequest::const_ref r, version_t v) : server(s), req(r), tid(v) {}
void finish(int r) override {
server->_prepare_logged(req, tid);
}
};
// prepare
-/* This function DOES put the passed message before returning */
-void MDSTableServer::handle_prepare(MMDSTableRequest *req)
+void MDSTableServer::handle_prepare(const MMDSTableRequest::const_ref &req)
{
dout(7) << "handle_prepare " << *req << dendl;
mds_rank_t from = mds_rank_t(req->get_source().num());
- bufferlist bl = req->bl;
- _prepare(req->bl, req->reqid, from);
- _note_prepare(from, req->reqid);
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 1);
- assert(g_conf->mds_kill_mdstable_at != 1);
+ projected_version++;
- ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, version, version);
+ ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from,
+ projected_version, projected_version);
mds->mdlog->start_entry(le);
- le->mutation = bl; // original request, NOT modified return value coming out of _prepare!
- mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
+ le->mutation = req->bl;
+ mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version));
mds->mdlog->flush();
}
-void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
+void MDSTableServer::_prepare_logged(const MMDSTableRequest::const_ref &req, version_t tid)
{
dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
+ mds_rank_t from = mds_rank_t(req->get_source().num());
- assert(g_conf->mds_kill_mdstable_at != 2);
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 2);
- MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
- reply->bl = req->bl;
- mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
- req->put();
+ _note_prepare(from, req->reqid);
+ bufferlist out;
+ _prepare(req->bl, req->reqid, from, out);
+ ceph_assert(version == tid);
+
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, req->reqid, tid);
+ reply->bl = std::move(out);
+
+ if (_notify_prep(tid)) {
+ auto& p = pending_notifies[tid];
+ p.notify_ack_gather = active_clients;
+ p.mds = from;
+ p.reply = reply;
+ } else {
+ mds->send_message_mds(reply, from);
+ }
+}
+
+void MDSTableServer::handle_notify_ack(const MMDSTableRequest::const_ref &m)
+{
+ dout(7) << __func__ << " " << *m << dendl;
+ mds_rank_t from = mds_rank_t(m->get_source().num());
+ version_t tid = m->get_tid();
+
+ auto p = pending_notifies.find(tid);
+ if (p != pending_notifies.end()) {
+ if (p->second.notify_ack_gather.erase(from)) {
+ if (p->second.notify_ack_gather.empty()) {
+ if (p->second.onfinish)
+ p->second.onfinish->complete(0);
+ else
+ mds->send_message_mds(p->second.reply, p->second.mds);
+ pending_notifies.erase(p);
+ }
+ } else {
+ dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl;
+ }
+ } else {
+ }
}
class C_Commit : public MDSLogContextBase {
MDSTableServer *server;
- MMDSTableRequest *req;
+ MMDSTableRequest::const_ref req;
MDSRank *get_mds() override { return server->mds; }
public:
- C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
+ C_Commit(MDSTableServer *s, const MMDSTableRequest::const_ref &r) : server(s), req(r) {}
void finish(int r) override {
server->_commit_logged(req);
}
};
// commit
-/* This function DOES put the passed message before returning */
-void MDSTableServer::handle_commit(MMDSTableRequest *req)
+void MDSTableServer::handle_commit(const MMDSTableRequest::const_ref &req)
{
dout(7) << "handle_commit " << *req << dendl;
if (pending_for_mds.count(tid)) {
- assert(g_conf->mds_kill_mdstable_at != 5);
-
- if (!_commit(tid, req))
+ if (committing_tids.count(tid)) {
+ dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl;
return;
+ }
+
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 5);
+
+ projected_version++;
+ committing_tids.insert(tid);
- _note_commit(tid);
mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE,
- tid, version),
+ tid, projected_version),
new C_Commit(this, req));
}
else if (tid <= version) {
- dout(0) << "got commit for tid " << tid << " <= " << version
- << ", already committed, sending ack."
- << dendl;
- _commit_logged(req);
+ dout(0) << "got commit for tid " << tid << " <= " << version
+ << ", already committed, sending ack." << dendl;
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_ACK, req->reqid, tid);
+ mds->send_message(reply, req->get_connection());
}
else {
// wtf.
dout(0) << "got commit for tid " << tid << " > " << version << dendl;
- assert(tid <= version);
+ ceph_assert(tid <= version);
}
}
-/* This function DOES put the passed message before returning */
-void MDSTableServer::_commit_logged(MMDSTableRequest *req)
+void MDSTableServer::_commit_logged(const MMDSTableRequest::const_ref &req)
{
dout(7) << "_commit_logged, sending ACK" << dendl;
- assert(g_conf->mds_kill_mdstable_at != 6);
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 6);
+ version_t tid = req->get_tid();
+
+ pending_for_mds.erase(tid);
+ committing_tids.erase(tid);
- MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
+ _commit(tid, req);
+ _note_commit(tid);
+
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
- req->put();
}
+class C_Rollback : public MDSLogContextBase {
+ MDSTableServer *server;
+ MMDSTableRequest::const_ref req;
+ MDSRank *get_mds() override { return server->mds; }
+public:
+ C_Rollback(MDSTableServer *s, const MMDSTableRequest::const_ref &r) : server(s), req(r) {}
+ void finish(int r) override {
+ server->_rollback_logged(req);
+ }
+};
+
// ROLLBACK
-/* This function DOES put the passed message before returning */
-void MDSTableServer::handle_rollback(MMDSTableRequest *req)
+void MDSTableServer::handle_rollback(const MMDSTableRequest::const_ref &req)
{
dout(7) << "handle_rollback " << *req << dendl;
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 8);
+ version_t tid = req->get_tid();
+ ceph_assert(pending_for_mds.count(tid));
+ ceph_assert(!committing_tids.count(tid));
+
+ projected_version++;
+ committing_tids.insert(tid);
+
+ mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
+ tid, projected_version),
+ new C_Rollback(this, req));
+}
+
+void MDSTableServer::_rollback_logged(const MMDSTableRequest::const_ref &req)
+{
+ dout(7) << "_rollback_logged " << *req << dendl;
+
version_t tid = req->get_tid();
- assert(pending_for_mds.count(tid));
+
+ pending_for_mds.erase(tid);
+ committing_tids.erase(tid);
+
_rollback(tid);
_note_rollback(tid);
- mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
- tid, version));
- req->put();
}
// SERVER UPDATE
+class C_ServerUpdate : public MDSLogContextBase {
+ MDSTableServer *server;
+ bufferlist bl;
+ MDSRank *get_mds() override { return server->mds; }
+public:
+ C_ServerUpdate(MDSTableServer *s, bufferlist &b) : server(s), bl(b) {}
+ void finish(int r) override {
+ server->_server_update_logged(bl);
+ }
+};
void MDSTableServer::do_server_update(bufferlist& bl)
{
dout(10) << "do_server_update len " << bl.length() << dendl;
- _server_update(bl);
- ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, version);
+
+ projected_version++;
+
+ ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version);
mds->mdlog->start_entry(le);
le->mutation = bl;
- mds->mdlog->submit_entry(le);
+ mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl));
}
+void MDSTableServer::_server_update_logged(bufferlist& bl)
+{
+ dout(10) << "_server_update_logged len " << bl.length() << dendl;
+ _server_update(bl);
+ _note_server_update(bl);
+}
// recovery
+class C_ServerRecovery : public MDSContext {
+ MDSTableServer *server;
+ MDSRank *get_mds() override { return server->mds; }
+public:
+ C_ServerRecovery(MDSTableServer *s) : server(s) {}
+ void finish(int r) override {
+ server->_do_server_recovery();
+ }
+};
+
+void MDSTableServer::_do_server_recovery()
+{
+ dout(7) << __func__ << " " << active_clients << dendl;
+ map<mds_rank_t, uint64_t> next_reqids;
+
+ for (auto p : pending_for_mds) {
+ mds_rank_t who = p.second.mds;
+ if (!active_clients.count(who))
+ continue;
+
+ if (p.second.reqid >= next_reqids[who])
+ next_reqids[who] = p.second.reqid + 1;
+
+ version_t tid = p.second.tid;
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, p.second.reqid, tid);
+ _get_reply_buffer(tid, &reply->bl);
+ mds->send_message_mds(reply, who);
+ }
+
+ for (auto p : active_clients) {
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]);
+ mds->send_message_mds(reply, p);
+ }
+ recovered = true;
+}
+
void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
{
- dout(7) << "finish_recovery" << dendl;
- for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
- handle_mds_recovery(*p); // resend agrees for everyone.
+ dout(7) << __func__ << dendl;
+
+ active_clients = active;
+
+ // don't know if survivor mds have received all 'notify prep' messages.
+ // so we need to send 'notify prep' again.
+ if (!pending_for_mds.empty() && _notify_prep(version)) {
+ auto& q = pending_notifies[version];
+ q.notify_ack_gather = active_clients;
+ q.mds = MDS_RANK_NONE;
+ q.onfinish = new C_ServerRecovery(this);
+ } else {
+ _do_server_recovery();
+ }
}
void MDSTableServer::handle_mds_recovery(mds_rank_t who)
{
dout(7) << "handle_mds_recovery mds." << who << dendl;
+ active_clients.insert(who);
+ if (!recovered) {
+ dout(7) << " still not recovered, delaying" << dendl;
+ return;
+ }
+
uint64_t next_reqid = 0;
// resend agrees for recovered mds
- for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
- p != pending_for_mds.end();
- ++p) {
+ for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) {
if (p->second.mds != who)
continue;
+ ceph_assert(!pending_notifies.count(p->second.tid));
+
if (p->second.reqid >= next_reqid)
next_reqid = p->second.reqid + 1;
- MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
+
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
+ _get_reply_buffer(p->second.tid, &reply->bl);
mds->send_message_mds(reply, who);
}
- MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
+ auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_SERVER_READY, next_reqid);
mds->send_message_mds(reply, who);
}
+
+void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who)
+{
+ dout(7) << __func__ << " mds." << who << dendl;
+
+ active_clients.erase(who);
+
+ list<MMDSTableRequest::ref> rollback;
+ for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) {
+ auto q = p++;
+ if (q->second.mds == who) {
+ // haven't sent reply yet.
+ rollback.push_back(q->second.reply);
+ pending_notifies.erase(q);
+ } else if (q->second.notify_ack_gather.erase(who)) {
+ // the failed mds will reload snaptable when it recovers.
+ // so we can remove it from the gather set.
+ if (q->second.notify_ack_gather.empty()) {
+ if (q->second.onfinish)
+ q->second.onfinish->complete(0);
+ else
+ mds->send_message_mds(q->second.reply, q->second.mds);
+ pending_notifies.erase(q);
+ }
+ }
+ }
+
+ for (auto &req : rollback) {
+ req->op = TABLESERVER_OP_ROLLBACK;
+ handle_rollback(req);
+ }
+}