1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include "MDSTableServer.h"
18 #include "msg/Messenger.h"
20 #include "events/ETableServer.h"
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_mds
25 #define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
27 void MDSTableServer::handle_request(const MMDSTableRequest::const_ref
&req
)
29 ceph_assert(req
->op
>= 0);
31 case TABLESERVER_OP_QUERY
: return handle_query(req
);
32 case TABLESERVER_OP_PREPARE
: return handle_prepare(req
);
33 case TABLESERVER_OP_COMMIT
: return handle_commit(req
);
34 case TABLESERVER_OP_ROLLBACK
: return handle_rollback(req
);
35 case TABLESERVER_OP_NOTIFY_ACK
: return handle_notify_ack(req
);
36 default: ceph_abort_msg("unrecognized mds_table_server request op");
40 class C_Prepare
: public MDSLogContextBase
{
41 MDSTableServer
*server
;
42 MMDSTableRequest::const_ref req
;
44 MDSRank
*get_mds() override
{ return server
->mds
; }
47 C_Prepare(MDSTableServer
*s
, const MMDSTableRequest::const_ref r
, version_t v
) : server(s
), req(r
), tid(v
) {}
48 void finish(int r
) override
{
49 server
->_prepare_logged(req
, tid
);
54 void MDSTableServer::handle_prepare(const MMDSTableRequest::const_ref
&req
)
56 dout(7) << "handle_prepare " << *req
<< dendl
;
57 mds_rank_t from
= mds_rank_t(req
->get_source().num());
59 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 1);
63 ETableServer
*le
= new ETableServer(table
, TABLESERVER_OP_PREPARE
, req
->reqid
, from
,
64 projected_version
, projected_version
);
65 mds
->mdlog
->start_entry(le
);
66 le
->mutation
= req
->bl
;
67 mds
->mdlog
->submit_entry(le
, new C_Prepare(this, req
, projected_version
));
71 void MDSTableServer::_prepare_logged(const MMDSTableRequest::const_ref
&req
, version_t tid
)
73 dout(7) << "_create_logged " << *req
<< " tid " << tid
<< dendl
;
74 mds_rank_t from
= mds_rank_t(req
->get_source().num());
76 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 2);
78 _note_prepare(from
, req
->reqid
);
80 _prepare(req
->bl
, req
->reqid
, from
, out
);
81 ceph_assert(version
== tid
);
83 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_AGREE
, req
->reqid
, tid
);
84 reply
->bl
= std::move(out
);
86 if (_notify_prep(tid
)) {
87 auto& p
= pending_notifies
[tid
];
88 p
.notify_ack_gather
= active_clients
;
92 mds
->send_message_mds(reply
, from
);
96 void MDSTableServer::handle_notify_ack(const MMDSTableRequest::const_ref
&m
)
98 dout(7) << __func__
<< " " << *m
<< dendl
;
99 mds_rank_t from
= mds_rank_t(m
->get_source().num());
100 version_t tid
= m
->get_tid();
102 auto p
= pending_notifies
.find(tid
);
103 if (p
!= pending_notifies
.end()) {
104 if (p
->second
.notify_ack_gather
.erase(from
)) {
105 if (p
->second
.notify_ack_gather
.empty()) {
106 if (p
->second
.onfinish
)
107 p
->second
.onfinish
->complete(0);
109 mds
->send_message_mds(p
->second
.reply
, p
->second
.mds
);
110 pending_notifies
.erase(p
);
113 dout(0) << "got unexpected notify ack for tid " << tid
<< " from mds." << from
<< dendl
;
119 class C_Commit
: public MDSLogContextBase
{
120 MDSTableServer
*server
;
121 MMDSTableRequest::const_ref req
;
122 MDSRank
*get_mds() override
{ return server
->mds
; }
124 C_Commit(MDSTableServer
*s
, const MMDSTableRequest::const_ref
&r
) : server(s
), req(r
) {}
125 void finish(int r
) override
{
126 server
->_commit_logged(req
);
131 void MDSTableServer::handle_commit(const MMDSTableRequest::const_ref
&req
)
133 dout(7) << "handle_commit " << *req
<< dendl
;
135 version_t tid
= req
->get_tid();
137 if (pending_for_mds
.count(tid
)) {
139 if (committing_tids
.count(tid
)) {
140 dout(0) << "got commit for tid " << tid
<< ", already committing, waiting." << dendl
;
144 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 5);
147 committing_tids
.insert(tid
);
149 mds
->mdlog
->start_submit_entry(new ETableServer(table
, TABLESERVER_OP_COMMIT
, 0, MDS_RANK_NONE
,
150 tid
, projected_version
),
151 new C_Commit(this, req
));
153 else if (tid
<= version
) {
154 dout(0) << "got commit for tid " << tid
<< " <= " << version
155 << ", already committed, sending ack." << dendl
;
156 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_ACK
, req
->reqid
, tid
);
157 mds
->send_message(reply
, req
->get_connection());
161 dout(0) << "got commit for tid " << tid
<< " > " << version
<< dendl
;
162 ceph_assert(tid
<= version
);
166 void MDSTableServer::_commit_logged(const MMDSTableRequest::const_ref
&req
)
168 dout(7) << "_commit_logged, sending ACK" << dendl
;
170 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 6);
171 version_t tid
= req
->get_tid();
173 pending_for_mds
.erase(tid
);
174 committing_tids
.erase(tid
);
179 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_ACK
, req
->reqid
, req
->get_tid());
180 mds
->send_message_mds(reply
, mds_rank_t(req
->get_source().num()));
183 class C_Rollback
: public MDSLogContextBase
{
184 MDSTableServer
*server
;
185 MMDSTableRequest::const_ref req
;
186 MDSRank
*get_mds() override
{ return server
->mds
; }
188 C_Rollback(MDSTableServer
*s
, const MMDSTableRequest::const_ref
&r
) : server(s
), req(r
) {}
189 void finish(int r
) override
{
190 server
->_rollback_logged(req
);
195 void MDSTableServer::handle_rollback(const MMDSTableRequest::const_ref
&req
)
197 dout(7) << "handle_rollback " << *req
<< dendl
;
199 ceph_assert(g_conf()->mds_kill_mdstable_at
!= 8);
200 version_t tid
= req
->get_tid();
201 ceph_assert(pending_for_mds
.count(tid
));
202 ceph_assert(!committing_tids
.count(tid
));
205 committing_tids
.insert(tid
);
207 mds
->mdlog
->start_submit_entry(new ETableServer(table
, TABLESERVER_OP_ROLLBACK
, 0, MDS_RANK_NONE
,
208 tid
, projected_version
),
209 new C_Rollback(this, req
));
212 void MDSTableServer::_rollback_logged(const MMDSTableRequest::const_ref
&req
)
214 dout(7) << "_rollback_logged " << *req
<< dendl
;
216 version_t tid
= req
->get_tid();
218 pending_for_mds
.erase(tid
);
219 committing_tids
.erase(tid
);
228 class C_ServerUpdate
: public MDSLogContextBase
{
229 MDSTableServer
*server
;
231 MDSRank
*get_mds() override
{ return server
->mds
; }
233 C_ServerUpdate(MDSTableServer
*s
, bufferlist
&b
) : server(s
), bl(b
) {}
234 void finish(int r
) override
{
235 server
->_server_update_logged(bl
);
239 void MDSTableServer::do_server_update(bufferlist
& bl
)
241 dout(10) << "do_server_update len " << bl
.length() << dendl
;
245 ETableServer
*le
= new ETableServer(table
, TABLESERVER_OP_SERVER_UPDATE
, 0, MDS_RANK_NONE
, 0, projected_version
);
246 mds
->mdlog
->start_entry(le
);
248 mds
->mdlog
->submit_entry(le
, new C_ServerUpdate(this, bl
));
251 void MDSTableServer::_server_update_logged(bufferlist
& bl
)
253 dout(10) << "_server_update_logged len " << bl
.length() << dendl
;
255 _note_server_update(bl
);
260 class C_ServerRecovery
: public MDSContext
{
261 MDSTableServer
*server
;
262 MDSRank
*get_mds() override
{ return server
->mds
; }
264 C_ServerRecovery(MDSTableServer
*s
) : server(s
) {}
265 void finish(int r
) override
{
266 server
->_do_server_recovery();
270 void MDSTableServer::_do_server_recovery()
272 dout(7) << __func__
<< " " << active_clients
<< dendl
;
273 map
<mds_rank_t
, uint64_t> next_reqids
;
275 for (auto p
: pending_for_mds
) {
276 mds_rank_t who
= p
.second
.mds
;
277 if (!active_clients
.count(who
))
280 if (p
.second
.reqid
>= next_reqids
[who
])
281 next_reqids
[who
] = p
.second
.reqid
+ 1;
283 version_t tid
= p
.second
.tid
;
284 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_AGREE
, p
.second
.reqid
, tid
);
285 _get_reply_buffer(tid
, &reply
->bl
);
286 mds
->send_message_mds(reply
, who
);
289 for (auto p
: active_clients
) {
290 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_SERVER_READY
, next_reqids
[p
]);
291 mds
->send_message_mds(reply
, p
);
296 void MDSTableServer::finish_recovery(set
<mds_rank_t
>& active
)
298 dout(7) << __func__
<< dendl
;
300 active_clients
= active
;
302 // don't know if survivor mds have received all 'notify prep' messages.
303 // so we need to send 'notify prep' again.
304 if (!pending_for_mds
.empty() && _notify_prep(version
)) {
305 auto& q
= pending_notifies
[version
];
306 q
.notify_ack_gather
= active_clients
;
307 q
.mds
= MDS_RANK_NONE
;
308 q
.onfinish
= new C_ServerRecovery(this);
310 _do_server_recovery();
314 void MDSTableServer::handle_mds_recovery(mds_rank_t who
)
316 dout(7) << "handle_mds_recovery mds." << who
<< dendl
;
318 active_clients
.insert(who
);
320 dout(7) << " still not recovered, delaying" << dendl
;
324 uint64_t next_reqid
= 0;
325 // resend agrees for recovered mds
326 for (auto p
= pending_for_mds
.begin(); p
!= pending_for_mds
.end(); ++p
) {
327 if (p
->second
.mds
!= who
)
329 ceph_assert(!pending_notifies
.count(p
->second
.tid
));
331 if (p
->second
.reqid
>= next_reqid
)
332 next_reqid
= p
->second
.reqid
+ 1;
334 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_AGREE
, p
->second
.reqid
, p
->second
.tid
);
335 _get_reply_buffer(p
->second
.tid
, &reply
->bl
);
336 mds
->send_message_mds(reply
, who
);
339 auto reply
= MMDSTableRequest::create(table
, TABLESERVER_OP_SERVER_READY
, next_reqid
);
340 mds
->send_message_mds(reply
, who
);
343 void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who
)
345 dout(7) << __func__
<< " mds." << who
<< dendl
;
347 active_clients
.erase(who
);
349 list
<MMDSTableRequest::ref
> rollback
;
350 for (auto p
= pending_notifies
.begin(); p
!= pending_notifies
.end(); ) {
352 if (q
->second
.mds
== who
) {
353 // haven't sent reply yet.
354 rollback
.push_back(q
->second
.reply
);
355 pending_notifies
.erase(q
);
356 } else if (q
->second
.notify_ack_gather
.erase(who
)) {
357 // the failed mds will reload snaptable when it recovers.
358 // so we can remove it from the gather set.
359 if (q
->second
.notify_ack_gather
.empty()) {
360 if (q
->second
.onfinish
)
361 q
->second
.onfinish
->complete(0);
363 mds
->send_message_mds(q
->second
.reply
, q
->second
.mds
);
364 pending_notifies
.erase(q
);
369 for (auto &req
: rollback
) {
370 req
->op
= TABLESERVER_OP_ROLLBACK
;
371 handle_rollback(req
);