#include "MDSTableClient.h"
#include "events/ETableClient.h"
-#include "messages/MMDSTableRequest.h"
-
#include "common/config.h"
#define dout_context g_ceph_context
};
-void MDSTableClient::handle_request(class MMDSTableRequest *m)
+void MDSTableClient::handle_request(const MMDSTableRequest::const_ref &m)
{
dout(10) << "handle_request " << *m << dendl;
- assert(m->table == table);
+ ceph_assert(m->table == table);
+
+ if (mds->get_state() < MDSMap::STATE_RESOLVE) {
+ if (mds->get_want_state() == CEPH_MDS_STATE_RESOLVE) {
+ mds->wait_for_resolve(new C_MDS_RetryMessage(mds, m));
+ }
+ return;
+ }
version_t tid = m->get_tid();
uint64_t reqid = m->reqid;
case TABLESERVER_OP_QUERY_REPLY:
handle_query_result(m);
break;
+
+ case TABLESERVER_OP_NOTIFY_PREP:
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 9);
+ handle_notify_prep(m);
+ break;
case TABLESERVER_OP_AGREE:
if (pending_prepare.count(reqid)) {
dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
- assert(g_conf->mds_kill_mdstable_at != 3);
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 3);
- MDSInternalContextBase *onfinish = pending_prepare[reqid].onfinish;
+ MDSContext *onfinish = pending_prepare[reqid].onfinish;
*pending_prepare[reqid].ptid = tid;
if (pending_prepare[reqid].pbl)
*pending_prepare[reqid].pbl = m->bl;
}
else if (prepared_update.count(tid)) {
dout(10) << "got duplicated agree on " << reqid << " atid " << tid << dendl;
- assert(prepared_update[tid] == reqid);
- assert(!server_ready);
+ ceph_assert(prepared_update[tid] == reqid);
+ ceph_assert(!server_ready);
}
else if (pending_commit.count(tid)) {
dout(10) << "stray agree on " << reqid << " tid " << tid
<< ", already committing, will resend COMMIT" << dendl;
- assert(!server_ready);
+ ceph_assert(!server_ready);
// will re-send commit when receiving the server ready message
}
else {
dout(10) << "stray agree on " << reqid << " tid " << tid
<< ", sending ROLLBACK" << dendl;
- assert(!server_ready);
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_ROLLBACK, 0, tid);
+ ceph_assert(!server_ready);
+ auto req = MMDSTableRequest::create(table, TABLESERVER_OP_ROLLBACK, 0, tid);
mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
}
break;
pending_commit[tid]->pending_commit_tids[table].count(tid)) {
dout(10) << "got ack on tid " << tid << ", logging" << dendl;
- assert(g_conf->mds_kill_mdstable_at != 7);
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 7);
// remove from committing list
pending_commit[tid]->pending_commit_tids[table].erase(tid);
break;
case TABLESERVER_OP_SERVER_READY:
- assert(!server_ready);
+ ceph_assert(!server_ready);
server_ready = true;
if (last_reqid == ~0ULL)
break;
default:
- assert(0 == "unrecognized mds_table_client request op");
+ ceph_abort_msg("unrecognized mds_table_client request op");
}
-
- m->put();
}
void MDSTableClient::_logged_ack(version_t tid)
{
dout(10) << "_logged_ack " << tid << dendl;
-
- assert(g_conf->mds_kill_mdstable_at != 8);
-
// kick any waiters (LogSegment trim)
if (ack_waiters.count(tid)) {
dout(15) << "kicking ack waiters on tid " << tid << dendl;
}
void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
- MDSInternalContextBase *onfinish)
+ MDSContext *onfinish)
{
if (last_reqid == ~0ULL) {
dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
if (server_ready) {
// send message
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, reqid);
+ auto req = MMDSTableRequest::create(table, TABLESERVER_OP_PREPARE, reqid);
req->bl = mutation;
mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
} else
{
dout(10) << "commit " << tid << dendl;
- assert(prepared_update.count(tid));
+ ceph_assert(prepared_update.count(tid));
prepared_update.erase(tid);
- assert(pending_commit.count(tid) == 0);
+ ceph_assert(pending_commit.count(tid) == 0);
pending_commit[tid] = ls;
ls->pending_commit_tids[table].insert(tid);
- assert(g_conf->mds_kill_mdstable_at != 4);
+ notify_commit(tid);
+
+ ceph_assert(g_conf()->mds_kill_mdstable_at != 4);
if (server_ready) {
// send message
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid);
+ auto req = MMDSTableRequest::create(table, TABLESERVER_OP_COMMIT, 0, tid);
mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
} else
dout(10) << "tableserver is not ready yet, deferring request" << dendl;
dout(10) << "got_journaled_agree " << tid << dendl;
ls->pending_commit_tids[table].insert(tid);
pending_commit[tid] = ls;
+
+ notify_commit(tid);
}
void MDSTableClient::got_journaled_ack(version_t tid)
p != pending_commit.end();
++p) {
dout(10) << "resending commit on " << p->first << dendl;
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, p->first);
+ auto req = MMDSTableRequest::create(table, TABLESERVER_OP_COMMIT, 0, p->first);
mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
}
}
p != pending_prepare.end();
++p) {
dout(10) << "resending prepare on " << p->first << dendl;
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, p->first);
+ auto req = MMDSTableRequest::create(table, TABLESERVER_OP_PREPARE, p->first);
req->bl = p->second.mutation;
mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
}