1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "MDSContext.h"
18 #include "msg/Messenger.h"
22 #include "LogSegment.h"
24 #include "MDSTableClient.h"
25 #include "events/ETableClient.h"
27 #include "common/config.h"
29 #define dout_context g_ceph_context
30 #define dout_subsys ceph_subsys_mds
32 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
// MDS log completion used by MDSTableClient::handle_request(): when an ACK
// for a committing tid arrives, an ETableClient ACK event is submitted with
// `new C_LoggedAck(this, tid)` as its completion.
// get_mds() returns the owning table client's `mds`.
// NOTE(review): this excerpt omits the member declarations (tc, tid) and the
// body of finish(); presumably finish() forwards tid to
// MDSTableClient::_logged_ack() -- confirm against the full file.
35 class C_LoggedAck
: public MDSLogContextBase
{
38 MDSRank
*get_mds() override
{ return tc
->mds
; }
40 C_LoggedAck(MDSTableClient
*a
, version_t t
) : tc(a
), tid(t
) {}
41 void finish(int r
) override
{
// Handle an MMDSTableRequest addressed to this client's `table`.
//
// Visible behaviour in this excerpt:
//  - asserts the message is for this client's table;
//  - when this rank's state is below STATE_RESOLVE and its wanted state is
//    RESOLVE, the message is handed to wait_for_resolve() wrapped in a
//    C_MDS_RetryMessage (presumably re-dispatched once resolve is reached);
//  - otherwise it dispatches on the op: QUERY_REPLY, NOTIFY_PREP, AGREE,
//    ACK and SERVER_READY; an unrecognized op reaches ceph_abort_msg().
// NOTE(review): several source lines (the switch header, break statements,
// closing braces, parts of some branches) are missing from this excerpt;
// the comments below describe only the statements that are visible.
47 void MDSTableClient::handle_request(const cref_t
<MMDSTableRequest
> &m
)
49 dout(10) << "handle_request " << *m
<< dendl
;
50 ceph_assert(m
->table
== table
);
52 if (mds
->get_state() < MDSMap::STATE_RESOLVE
) {
53 if (mds
->get_want_state() == CEPH_MDS_STATE_RESOLVE
) {
54 mds
->wait_for_resolve(new C_MDS_RetryMessage(mds
, m
));
59 version_t tid
= m
->get_tid();
60 uint64_t reqid
= m
->reqid
;
// QUERY_REPLY: delegated to handle_query_result().
63 case TABLESERVER_OP_QUERY_REPLY
:
64 handle_query_result(m
);
// NOTIFY_PREP: asserts mds_kill_mdstable_at != 9 (a configurable crash
// point), then delegates to handle_notify_prep().
67 case TABLESERVER_OP_NOTIFY_PREP
:
68 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 9);
69 handle_notify_prep(m
);
// AGREE for request reqid, carrying transaction id tid. Outcomes:
//  - reqid still in pending_prepare: write tid to the caller's *ptid (and
//    the reply payload to *pbl when pbl is non-null), move the entry to
//    prepared_update[tid], then complete the caller's onfinish with 0;
//  - tid already in prepared_update: duplicate agree, logged and asserted;
//  - tid already in pending_commit: logged; COMMIT is re-sent later;
//  - otherwise (branch header not visible here): stray agree, answered with
//    a ROLLBACK for tid.
// The last three branches assert !server_ready.
72 case TABLESERVER_OP_AGREE
:
73 if (pending_prepare
.count(reqid
)) {
74 dout(10) << "got agree on " << reqid
<< " atid " << tid
<< dendl
;
76 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 3);
// Save onfinish before erase(reqid) below destroys the map entry; it is
// completed only after all bookkeeping has been updated.
78 MDSContext
*onfinish
= pending_prepare
[reqid
].onfinish
;
79 *pending_prepare
[reqid
].ptid
= tid
;
80 if (pending_prepare
[reqid
].pbl
)
81 *pending_prepare
[reqid
].pbl
= m
->bl
;
82 pending_prepare
.erase(reqid
);
83 prepared_update
[tid
] = reqid
;
85 onfinish
->complete(0);
88 else if (prepared_update
.count(tid
)) {
89 dout(10) << "got duplicated agree on " << reqid
<< " atid " << tid
<< dendl
;
90 ceph_assert(prepared_update
[tid
] == reqid
);
91 ceph_assert(!server_ready
);
93 else if (pending_commit
.count(tid
)) {
94 dout(10) << "stray agree on " << reqid
<< " tid " << tid
95 << ", already committing, will resend COMMIT" << dendl
;
96 ceph_assert(!server_ready
);
97 // will re-send commit when receiving the server ready message
100 dout(10) << "stray agree on " << reqid
<< " tid " << tid
101 << ", sending ROLLBACK" << dendl
;
102 ceph_assert(!server_ready
);
103 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_ROLLBACK
, 0, tid
);
104 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// ACK: if tid is committing and still listed in its LogSegment's
// pending_commit_tids[table], drop it from both places, then journal an
// ETableClient ACK event whose completion is C_LoggedAck(this, tid).
// Otherwise the ack is logged as stray and ignored.
108 case TABLESERVER_OP_ACK
:
109 if (pending_commit
.count(tid
) &&
110 pending_commit
[tid
]->pending_commit_tids
[table
].count(tid
)) {
111 dout(10) << "got ack on tid " << tid
<< ", logging" << dendl
;
113 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 7);
115 // remove from committing list
116 pending_commit
[tid
]->pending_commit_tids
[table
].erase(tid
);
117 pending_commit
.erase(tid
);
120 mds
->mdlog
->start_submit_entry(new ETableClient(table
, TABLESERVER_OP_ACK
, tid
),
121 new C_LoggedAck(this, tid
));
123 dout(10) << "got stray ack on tid " << tid
<< ", ignoring" << dendl
;
// SERVER_READY: asserts server_ready was false, then checks whether
// last_reqid is still the ~0ULL "no request id yet" sentinel that
// _prepare() tests. The rest of this branch is not visible in this excerpt.
127 case TABLESERVER_OP_SERVER_READY
:
128 ceph_assert(!server_ready
);
131 if (last_reqid
== ~0ULL)
// Unrecognized op (the label for this branch is not visible here).
140 ceph_abort_msg("unrecognized mds_table_client request op");
// Completion step after the ETableClient ACK for tid has been journaled.
// handle_request() submits that event with a C_LoggedAck completion;
// presumably C_LoggedAck::finish() calls this (its body is not visible here).
// Queues any contexts parked in ack_waiters[tid] on the MDS, then forgets them.
145 void MDSTableClient::_logged_ack(version_t tid
)
147 dout(10) << "_logged_ack " << tid
<< dendl
;
148 // kick any waiters (LogSegment trim)
149 if (ack_waiters
.count(tid
)) {
150 dout(15) << "kicking ack waiters on tid " << tid
<< dendl
;
151 mds
->queue_waiters(ack_waiters
[tid
]);
152 ack_waiters
.erase(tid
);
// Begin a two-phase update of `table`.
//   mutation - encoded update; stored in pending_prepare and used as the
//              PREPARE payload (resend_prepares() sends it as req->bl)
//   ptid     - receives the tid from the server's AGREE
//   pbl      - if non-null, receives the AGREE reply payload
//   onfinish - completed with 0 once the AGREE has been processed
// If no request id is known yet (last_reqid == ~0ULL), the request is queued
// on waiting_for_reqid; resend_prepares() later gives it an id and sends it.
// Otherwise a new reqid is allocated, the request is recorded in
// pending_prepare and a PREPARE is sent to the tableserver. On the branch
// whose condition is not visible here, the send is deferred instead. The
// entry stays in pending_prepare, which resend_prepares() re-sends.
156 void MDSTableClient::_prepare(bufferlist
& mutation
, version_t
*ptid
, bufferlist
*pbl
,
157 MDSContext
*onfinish
)
159 if (last_reqid
== ~0ULL) {
160 dout(10) << "tableserver is not ready yet, waiting for request id" << dendl
;
161 waiting_for_reqid
.push_back(_pending_prepare(onfinish
, ptid
, pbl
, mutation
));
165 uint64_t reqid
= ++last_reqid
;
166 dout(10) << "_prepare " << reqid
<< dendl
;
// Record everything the AGREE handler in handle_request() needs before any
// message goes out.
168 pending_prepare
[reqid
].mutation
= mutation
;
169 pending_prepare
[reqid
].ptid
= ptid
;
170 pending_prepare
[reqid
].pbl
= pbl
;
171 pending_prepare
[reqid
].onfinish
= onfinish
;
175 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_PREPARE
, reqid
);
177 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
179 dout(10) << "tableserver is not ready yet, deferring request" << dendl
;
// Commit the update previously agreed as tid, tracking it in LogSegment ls.
// tid must be in prepared_update and not yet in pending_commit (both asserted).
// Moves tid from prepared_update into pending_commit[tid] = ls and records it
// in ls->pending_commit_tids[table]. Then it sends COMMIT to the tableserver,
// or defers the send on the branch whose condition is not visible here.
// Entries left in pending_commit are re-sent by resend_commits().
182 void MDSTableClient::commit(version_t tid
, LogSegment
*ls
)
184 dout(10) << "commit " << tid
<< dendl
;
186 ceph_assert(prepared_update
.count(tid
));
187 prepared_update
.erase(tid
);
189 ceph_assert(pending_commit
.count(tid
) == 0);
190 pending_commit
[tid
] = ls
;
191 ls
->pending_commit_tids
[table
].insert(tid
);
195 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 4);
199 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_COMMIT
, 0, tid
);
200 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
202 dout(10) << "tableserver is not ready yet, deferring request" << dendl
;
// Mark tid as committing in LogSegment ls. This is the same bookkeeping
// commit() does (ls->pending_commit_tids[table] and pending_commit[tid]),
// but without the prepared_update/pending_commit assertions and without
// sending COMMIT.
// NOTE(review): presumably invoked while replaying a journaled agree -- confirm.
209 void MDSTableClient::got_journaled_agree(version_t tid
, LogSegment
*ls
)
211 dout(10) << "got_journaled_agree " << tid
<< dendl
;
212 ls
->pending_commit_tids
[table
].insert(tid
);
213 pending_commit
[tid
] = ls
;
// Forget a committing tid if present: erase it from its LogSegment's
// pending_commit_tids[table] and from pending_commit. An unknown tid is a
// no-op. Unlike the ACK path in handle_request(), nothing is journaled.
// NOTE(review): presumably used during journal replay -- confirm.
218 void MDSTableClient::got_journaled_ack(version_t tid
)
220 dout(10) << "got_journaled_ack " << tid
<< dendl
;
221 if (pending_commit
.count(tid
)) {
222 pending_commit
[tid
]->pending_commit_tids
[table
].erase(tid
);
223 pending_commit
.erase(tid
);
// Re-send a COMMIT to the current tableserver for every tid still in
// pending_commit, i.e. every tid committed locally whose ACK has not yet
// removed it from pending_commit.
227 void MDSTableClient::resend_commits()
229 for (auto p
= pending_commit
.begin(); p
!= pending_commit
.end(); ++p
) {
230 dout(10) << "resending commit on " << p
->first
<< dendl
;
231 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_COMMIT
, 0, p
->first
);
232 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// Re-issue outstanding prepares to the current tableserver.
// First, requests queued in waiting_for_reqid are given fresh ids via
// ++last_reqid, in FIFO order, and moved into pending_prepare. Then a PREPARE
// carrying the stored mutation is sent for every pending_prepare entry.
// NOTE(review): this assumes last_reqid is no longer the ~0ULL sentinel when
// it runs, since incrementing the sentinel would wrap to 0. Confirm that the
// caller sets last_reqid first.
236 void MDSTableClient::resend_prepares()
238 while (!waiting_for_reqid
.empty()) {
239 pending_prepare
[++last_reqid
] = waiting_for_reqid
.front();
240 waiting_for_reqid
.pop_front();
// Send (or re-send) every outstanding prepare with its original payload.
243 for (auto p
= pending_prepare
.begin(); p
!= pending_prepare
.end(); ++p
) {
244 dout(10) << "resending prepare on " << p
->first
<< dendl
;
245 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_PREPARE
, p
->first
);
246 req
->bl
= p
->second
.mutation
;
247 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
251 void MDSTableClient::handle_mds_failure(mds_rank_t who
)
253 if (who
!= mds
->get_mds_map()->get_tableserver())
254 return; // do nothing.
256 dout(7) << "tableserver mds." << who
<< " fails" << dendl
;
257 server_ready
= false;