]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "MDSTableServer.h" | |
16 | #include "MDSRank.h" | |
17 | #include "MDLog.h" | |
18 | #include "msg/Messenger.h" | |
19 | ||
7c673cae FG |
20 | #include "events/ETableServer.h" |
21 | ||
22 | #define dout_context g_ceph_context | |
23 | #define dout_subsys ceph_subsys_mds | |
24 | #undef dout_prefix | |
25 | #define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") " | |
26 | ||
20effc67 TL |
27 | using namespace std; |
28 | ||
9f95a23c | 29 | void MDSTableServer::handle_request(const cref_t<MMDSTableRequest> &req) |
7c673cae | 30 | { |
11fdf7f2 | 31 | ceph_assert(req->op >= 0); |
7c673cae FG |
32 | switch (req->op) { |
33 | case TABLESERVER_OP_QUERY: return handle_query(req); | |
34 | case TABLESERVER_OP_PREPARE: return handle_prepare(req); | |
35 | case TABLESERVER_OP_COMMIT: return handle_commit(req); | |
36 | case TABLESERVER_OP_ROLLBACK: return handle_rollback(req); | |
11fdf7f2 TL |
37 | case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req); |
38 | default: ceph_abort_msg("unrecognized mds_table_server request op"); | |
7c673cae FG |
39 | } |
40 | } | |
41 | ||
42 | class C_Prepare : public MDSLogContextBase { | |
43 | MDSTableServer *server; | |
9f95a23c | 44 | cref_t<MMDSTableRequest> req; |
7c673cae FG |
45 | version_t tid; |
46 | MDSRank *get_mds() override { return server->mds; } | |
47 | public: | |
48 | ||
9f95a23c | 49 | C_Prepare(MDSTableServer *s, const cref_t<MMDSTableRequest> r, version_t v) : server(s), req(r), tid(v) {} |
7c673cae FG |
50 | void finish(int r) override { |
51 | server->_prepare_logged(req, tid); | |
52 | } | |
53 | }; | |
54 | ||
55 | // prepare | |
9f95a23c | 56 | void MDSTableServer::handle_prepare(const cref_t<MMDSTableRequest> &req) |
7c673cae FG |
57 | { |
58 | dout(7) << "handle_prepare " << *req << dendl; | |
59 | mds_rank_t from = mds_rank_t(req->get_source().num()); | |
7c673cae | 60 | |
11fdf7f2 | 61 | ceph_assert(g_conf()->mds_kill_mdstable_at != 1); |
7c673cae | 62 | |
11fdf7f2 | 63 | projected_version++; |
7c673cae | 64 | |
11fdf7f2 TL |
65 | ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, |
66 | projected_version, projected_version); | |
7c673cae | 67 | mds->mdlog->start_entry(le); |
11fdf7f2 TL |
68 | le->mutation = req->bl; |
69 | mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version)); | |
7c673cae FG |
70 | mds->mdlog->flush(); |
71 | } | |
72 | ||
9f95a23c | 73 | void MDSTableServer::_prepare_logged(const cref_t<MMDSTableRequest> &req, version_t tid) |
7c673cae FG |
74 | { |
75 | dout(7) << "_create_logged " << *req << " tid " << tid << dendl; | |
11fdf7f2 | 76 | mds_rank_t from = mds_rank_t(req->get_source().num()); |
7c673cae | 77 | |
11fdf7f2 | 78 | ceph_assert(g_conf()->mds_kill_mdstable_at != 2); |
7c673cae | 79 | |
11fdf7f2 TL |
80 | _note_prepare(from, req->reqid); |
81 | bufferlist out; | |
82 | _prepare(req->bl, req->reqid, from, out); | |
83 | ceph_assert(version == tid); | |
84 | ||
9f95a23c | 85 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, req->reqid, tid); |
11fdf7f2 TL |
86 | reply->bl = std::move(out); |
87 | ||
88 | if (_notify_prep(tid)) { | |
89 | auto& p = pending_notifies[tid]; | |
90 | p.notify_ack_gather = active_clients; | |
91 | p.mds = from; | |
92 | p.reply = reply; | |
93 | } else { | |
94 | mds->send_message_mds(reply, from); | |
95 | } | |
96 | } | |
97 | ||
9f95a23c | 98 | void MDSTableServer::handle_notify_ack(const cref_t<MMDSTableRequest> &m) |
11fdf7f2 TL |
99 | { |
100 | dout(7) << __func__ << " " << *m << dendl; | |
101 | mds_rank_t from = mds_rank_t(m->get_source().num()); | |
102 | version_t tid = m->get_tid(); | |
103 | ||
104 | auto p = pending_notifies.find(tid); | |
105 | if (p != pending_notifies.end()) { | |
106 | if (p->second.notify_ack_gather.erase(from)) { | |
107 | if (p->second.notify_ack_gather.empty()) { | |
108 | if (p->second.onfinish) | |
109 | p->second.onfinish->complete(0); | |
110 | else | |
111 | mds->send_message_mds(p->second.reply, p->second.mds); | |
112 | pending_notifies.erase(p); | |
113 | } | |
114 | } else { | |
115 | dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl; | |
116 | } | |
117 | } else { | |
118 | } | |
7c673cae FG |
119 | } |
120 | ||
121 | class C_Commit : public MDSLogContextBase { | |
122 | MDSTableServer *server; | |
9f95a23c | 123 | cref_t<MMDSTableRequest> req; |
7c673cae FG |
124 | MDSRank *get_mds() override { return server->mds; } |
125 | public: | |
9f95a23c | 126 | C_Commit(MDSTableServer *s, const cref_t<MMDSTableRequest> &r) : server(s), req(r) {} |
7c673cae FG |
127 | void finish(int r) override { |
128 | server->_commit_logged(req); | |
129 | } | |
130 | }; | |
131 | ||
132 | // commit | |
9f95a23c | 133 | void MDSTableServer::handle_commit(const cref_t<MMDSTableRequest> &req) |
7c673cae FG |
134 | { |
135 | dout(7) << "handle_commit " << *req << dendl; | |
136 | ||
137 | version_t tid = req->get_tid(); | |
138 | ||
139 | if (pending_for_mds.count(tid)) { | |
140 | ||
11fdf7f2 TL |
141 | if (committing_tids.count(tid)) { |
142 | dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl; | |
7c673cae | 143 | return; |
11fdf7f2 TL |
144 | } |
145 | ||
146 | ceph_assert(g_conf()->mds_kill_mdstable_at != 5); | |
147 | ||
148 | projected_version++; | |
149 | committing_tids.insert(tid); | |
7c673cae | 150 | |
7c673cae | 151 | mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE, |
11fdf7f2 | 152 | tid, projected_version), |
7c673cae FG |
153 | new C_Commit(this, req)); |
154 | } | |
155 | else if (tid <= version) { | |
11fdf7f2 TL |
156 | dout(0) << "got commit for tid " << tid << " <= " << version |
157 | << ", already committed, sending ack." << dendl; | |
9f95a23c | 158 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_ACK, req->reqid, tid); |
11fdf7f2 | 159 | mds->send_message(reply, req->get_connection()); |
7c673cae FG |
160 | } |
161 | else { | |
162 | // wtf. | |
163 | dout(0) << "got commit for tid " << tid << " > " << version << dendl; | |
11fdf7f2 | 164 | ceph_assert(tid <= version); |
7c673cae FG |
165 | } |
166 | } | |
167 | ||
9f95a23c | 168 | void MDSTableServer::_commit_logged(const cref_t<MMDSTableRequest> &req) |
7c673cae FG |
169 | { |
170 | dout(7) << "_commit_logged, sending ACK" << dendl; | |
171 | ||
11fdf7f2 TL |
172 | ceph_assert(g_conf()->mds_kill_mdstable_at != 6); |
173 | version_t tid = req->get_tid(); | |
174 | ||
175 | pending_for_mds.erase(tid); | |
176 | committing_tids.erase(tid); | |
7c673cae | 177 | |
11fdf7f2 TL |
178 | _commit(tid, req); |
179 | _note_commit(tid); | |
180 | ||
9f95a23c | 181 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid()); |
7c673cae | 182 | mds->send_message_mds(reply, mds_rank_t(req->get_source().num())); |
7c673cae FG |
183 | } |
184 | ||
11fdf7f2 TL |
185 | class C_Rollback : public MDSLogContextBase { |
186 | MDSTableServer *server; | |
9f95a23c | 187 | cref_t<MMDSTableRequest> req; |
11fdf7f2 TL |
188 | MDSRank *get_mds() override { return server->mds; } |
189 | public: | |
9f95a23c | 190 | C_Rollback(MDSTableServer *s, const cref_t<MMDSTableRequest> &r) : server(s), req(r) {} |
11fdf7f2 TL |
191 | void finish(int r) override { |
192 | server->_rollback_logged(req); | |
193 | } | |
194 | }; | |
195 | ||
7c673cae | 196 | // ROLLBACK |
9f95a23c | 197 | void MDSTableServer::handle_rollback(const cref_t<MMDSTableRequest> &req) |
7c673cae FG |
198 | { |
199 | dout(7) << "handle_rollback " << *req << dendl; | |
200 | ||
11fdf7f2 TL |
201 | ceph_assert(g_conf()->mds_kill_mdstable_at != 8); |
202 | version_t tid = req->get_tid(); | |
203 | ceph_assert(pending_for_mds.count(tid)); | |
204 | ceph_assert(!committing_tids.count(tid)); | |
205 | ||
206 | projected_version++; | |
207 | committing_tids.insert(tid); | |
208 | ||
209 | mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE, | |
210 | tid, projected_version), | |
211 | new C_Rollback(this, req)); | |
212 | } | |
213 | ||
9f95a23c | 214 | void MDSTableServer::_rollback_logged(const cref_t<MMDSTableRequest> &req) |
11fdf7f2 TL |
215 | { |
216 | dout(7) << "_rollback_logged " << *req << dendl; | |
217 | ||
7c673cae | 218 | version_t tid = req->get_tid(); |
11fdf7f2 TL |
219 | |
220 | pending_for_mds.erase(tid); | |
221 | committing_tids.erase(tid); | |
222 | ||
7c673cae FG |
223 | _rollback(tid); |
224 | _note_rollback(tid); | |
7c673cae FG |
225 | } |
226 | ||
227 | ||
228 | ||
229 | // SERVER UPDATE | |
11fdf7f2 TL |
230 | class C_ServerUpdate : public MDSLogContextBase { |
231 | MDSTableServer *server; | |
232 | bufferlist bl; | |
233 | MDSRank *get_mds() override { return server->mds; } | |
234 | public: | |
235 | C_ServerUpdate(MDSTableServer *s, bufferlist &b) : server(s), bl(b) {} | |
236 | void finish(int r) override { | |
237 | server->_server_update_logged(bl); | |
238 | } | |
239 | }; | |
7c673cae FG |
240 | |
241 | void MDSTableServer::do_server_update(bufferlist& bl) | |
242 | { | |
243 | dout(10) << "do_server_update len " << bl.length() << dendl; | |
11fdf7f2 TL |
244 | |
245 | projected_version++; | |
246 | ||
247 | ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version); | |
7c673cae FG |
248 | mds->mdlog->start_entry(le); |
249 | le->mutation = bl; | |
11fdf7f2 | 250 | mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl)); |
7c673cae FG |
251 | } |
252 | ||
11fdf7f2 TL |
253 | void MDSTableServer::_server_update_logged(bufferlist& bl) |
254 | { | |
255 | dout(10) << "_server_update_logged len " << bl.length() << dendl; | |
256 | _server_update(bl); | |
257 | _note_server_update(bl); | |
258 | } | |
7c673cae FG |
259 | |
260 | // recovery | |
261 | ||
11fdf7f2 TL |
262 | class C_ServerRecovery : public MDSContext { |
263 | MDSTableServer *server; | |
264 | MDSRank *get_mds() override { return server->mds; } | |
265 | public: | |
266 | C_ServerRecovery(MDSTableServer *s) : server(s) {} | |
267 | void finish(int r) override { | |
268 | server->_do_server_recovery(); | |
269 | } | |
270 | }; | |
271 | ||
272 | void MDSTableServer::_do_server_recovery() | |
273 | { | |
274 | dout(7) << __func__ << " " << active_clients << dendl; | |
275 | map<mds_rank_t, uint64_t> next_reqids; | |
276 | ||
277 | for (auto p : pending_for_mds) { | |
278 | mds_rank_t who = p.second.mds; | |
279 | if (!active_clients.count(who)) | |
280 | continue; | |
281 | ||
282 | if (p.second.reqid >= next_reqids[who]) | |
283 | next_reqids[who] = p.second.reqid + 1; | |
284 | ||
285 | version_t tid = p.second.tid; | |
9f95a23c | 286 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, p.second.reqid, tid); |
11fdf7f2 TL |
287 | _get_reply_buffer(tid, &reply->bl); |
288 | mds->send_message_mds(reply, who); | |
289 | } | |
290 | ||
291 | for (auto p : active_clients) { | |
9f95a23c | 292 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]); |
11fdf7f2 TL |
293 | mds->send_message_mds(reply, p); |
294 | } | |
295 | recovered = true; | |
296 | } | |
297 | ||
7c673cae FG |
298 | void MDSTableServer::finish_recovery(set<mds_rank_t>& active) |
299 | { | |
11fdf7f2 TL |
300 | dout(7) << __func__ << dendl; |
301 | ||
302 | active_clients = active; | |
303 | ||
304 | // don't know if survivor mds have received all 'notify prep' messages. | |
305 | // so we need to send 'notify prep' again. | |
306 | if (!pending_for_mds.empty() && _notify_prep(version)) { | |
307 | auto& q = pending_notifies[version]; | |
308 | q.notify_ack_gather = active_clients; | |
309 | q.mds = MDS_RANK_NONE; | |
310 | q.onfinish = new C_ServerRecovery(this); | |
311 | } else { | |
312 | _do_server_recovery(); | |
313 | } | |
7c673cae FG |
314 | } |
315 | ||
316 | void MDSTableServer::handle_mds_recovery(mds_rank_t who) | |
317 | { | |
318 | dout(7) << "handle_mds_recovery mds." << who << dendl; | |
319 | ||
11fdf7f2 TL |
320 | active_clients.insert(who); |
321 | if (!recovered) { | |
322 | dout(7) << " still not recovered, delaying" << dendl; | |
323 | return; | |
324 | } | |
325 | ||
7c673cae FG |
326 | uint64_t next_reqid = 0; |
327 | // resend agrees for recovered mds | |
11fdf7f2 | 328 | for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) { |
7c673cae FG |
329 | if (p->second.mds != who) |
330 | continue; | |
11fdf7f2 TL |
331 | ceph_assert(!pending_notifies.count(p->second.tid)); |
332 | ||
7c673cae FG |
333 | if (p->second.reqid >= next_reqid) |
334 | next_reqid = p->second.reqid + 1; | |
11fdf7f2 | 335 | |
9f95a23c | 336 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid); |
11fdf7f2 | 337 | _get_reply_buffer(p->second.tid, &reply->bl); |
7c673cae FG |
338 | mds->send_message_mds(reply, who); |
339 | } | |
340 | ||
9f95a23c | 341 | auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_SERVER_READY, next_reqid); |
7c673cae FG |
342 | mds->send_message_mds(reply, who); |
343 | } | |
11fdf7f2 TL |
344 | |
345 | void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who) | |
346 | { | |
347 | dout(7) << __func__ << " mds." << who << dendl; | |
348 | ||
349 | active_clients.erase(who); | |
350 | ||
9f95a23c | 351 | list<ref_t<MMDSTableRequest>> rollback; |
11fdf7f2 TL |
352 | for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) { |
353 | auto q = p++; | |
354 | if (q->second.mds == who) { | |
355 | // haven't sent reply yet. | |
356 | rollback.push_back(q->second.reply); | |
357 | pending_notifies.erase(q); | |
358 | } else if (q->second.notify_ack_gather.erase(who)) { | |
359 | // the failed mds will reload snaptable when it recovers. | |
360 | // so we can remove it from the gather set. | |
361 | if (q->second.notify_ack_gather.empty()) { | |
362 | if (q->second.onfinish) | |
363 | q->second.onfinish->complete(0); | |
364 | else | |
365 | mds->send_message_mds(q->second.reply, q->second.mds); | |
366 | pending_notifies.erase(q); | |
367 | } | |
368 | } | |
369 | } | |
370 | ||
371 | for (auto &req : rollback) { | |
372 | req->op = TABLESERVER_OP_ROLLBACK; | |
373 | handle_rollback(req); | |
374 | } | |
375 | } |