]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/MDSTableServer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mds / MDSTableServer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSTableServer.h"
16#include "MDSRank.h"
17#include "MDLog.h"
18#include "msg/Messenger.h"
19
7c673cae
FG
20#include "events/ETableServer.h"
21
22#define dout_context g_ceph_context
23#define dout_subsys ceph_subsys_mds
24#undef dout_prefix
25#define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
26
20effc67
TL
27using namespace std;
28
9f95a23c 29void MDSTableServer::handle_request(const cref_t<MMDSTableRequest> &req)
7c673cae 30{
11fdf7f2 31 ceph_assert(req->op >= 0);
7c673cae
FG
32 switch (req->op) {
33 case TABLESERVER_OP_QUERY: return handle_query(req);
34 case TABLESERVER_OP_PREPARE: return handle_prepare(req);
35 case TABLESERVER_OP_COMMIT: return handle_commit(req);
36 case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
11fdf7f2
TL
37 case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req);
38 default: ceph_abort_msg("unrecognized mds_table_server request op");
7c673cae
FG
39 }
40}
41
42class C_Prepare : public MDSLogContextBase {
43 MDSTableServer *server;
9f95a23c 44 cref_t<MMDSTableRequest> req;
7c673cae
FG
45 version_t tid;
46 MDSRank *get_mds() override { return server->mds; }
47public:
48
9f95a23c 49 C_Prepare(MDSTableServer *s, const cref_t<MMDSTableRequest> r, version_t v) : server(s), req(r), tid(v) {}
7c673cae
FG
50 void finish(int r) override {
51 server->_prepare_logged(req, tid);
52 }
53};
54
55// prepare
9f95a23c 56void MDSTableServer::handle_prepare(const cref_t<MMDSTableRequest> &req)
7c673cae
FG
57{
58 dout(7) << "handle_prepare " << *req << dendl;
59 mds_rank_t from = mds_rank_t(req->get_source().num());
7c673cae 60
11fdf7f2 61 ceph_assert(g_conf()->mds_kill_mdstable_at != 1);
7c673cae 62
11fdf7f2 63 projected_version++;
7c673cae 64
11fdf7f2
TL
65 ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from,
66 projected_version, projected_version);
7c673cae 67 mds->mdlog->start_entry(le);
11fdf7f2
TL
68 le->mutation = req->bl;
69 mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version));
7c673cae
FG
70 mds->mdlog->flush();
71}
72
9f95a23c 73void MDSTableServer::_prepare_logged(const cref_t<MMDSTableRequest> &req, version_t tid)
7c673cae
FG
74{
75 dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
11fdf7f2 76 mds_rank_t from = mds_rank_t(req->get_source().num());
7c673cae 77
11fdf7f2 78 ceph_assert(g_conf()->mds_kill_mdstable_at != 2);
7c673cae 79
11fdf7f2
TL
80 _note_prepare(from, req->reqid);
81 bufferlist out;
82 _prepare(req->bl, req->reqid, from, out);
83 ceph_assert(version == tid);
84
9f95a23c 85 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, req->reqid, tid);
11fdf7f2
TL
86 reply->bl = std::move(out);
87
88 if (_notify_prep(tid)) {
89 auto& p = pending_notifies[tid];
90 p.notify_ack_gather = active_clients;
91 p.mds = from;
92 p.reply = reply;
93 } else {
94 mds->send_message_mds(reply, from);
95 }
96}
97
9f95a23c 98void MDSTableServer::handle_notify_ack(const cref_t<MMDSTableRequest> &m)
11fdf7f2
TL
99{
100 dout(7) << __func__ << " " << *m << dendl;
101 mds_rank_t from = mds_rank_t(m->get_source().num());
102 version_t tid = m->get_tid();
103
104 auto p = pending_notifies.find(tid);
105 if (p != pending_notifies.end()) {
106 if (p->second.notify_ack_gather.erase(from)) {
107 if (p->second.notify_ack_gather.empty()) {
108 if (p->second.onfinish)
109 p->second.onfinish->complete(0);
110 else
111 mds->send_message_mds(p->second.reply, p->second.mds);
112 pending_notifies.erase(p);
113 }
114 } else {
115 dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl;
116 }
117 } else {
118 }
7c673cae
FG
119}
120
121class C_Commit : public MDSLogContextBase {
122 MDSTableServer *server;
9f95a23c 123 cref_t<MMDSTableRequest> req;
7c673cae
FG
124 MDSRank *get_mds() override { return server->mds; }
125public:
9f95a23c 126 C_Commit(MDSTableServer *s, const cref_t<MMDSTableRequest> &r) : server(s), req(r) {}
7c673cae
FG
127 void finish(int r) override {
128 server->_commit_logged(req);
129 }
130};
131
132// commit
9f95a23c 133void MDSTableServer::handle_commit(const cref_t<MMDSTableRequest> &req)
7c673cae
FG
134{
135 dout(7) << "handle_commit " << *req << dendl;
136
137 version_t tid = req->get_tid();
138
139 if (pending_for_mds.count(tid)) {
140
11fdf7f2
TL
141 if (committing_tids.count(tid)) {
142 dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl;
7c673cae 143 return;
11fdf7f2
TL
144 }
145
146 ceph_assert(g_conf()->mds_kill_mdstable_at != 5);
147
148 projected_version++;
149 committing_tids.insert(tid);
7c673cae 150
7c673cae 151 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE,
11fdf7f2 152 tid, projected_version),
7c673cae
FG
153 new C_Commit(this, req));
154 }
155 else if (tid <= version) {
11fdf7f2
TL
156 dout(0) << "got commit for tid " << tid << " <= " << version
157 << ", already committed, sending ack." << dendl;
9f95a23c 158 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_ACK, req->reqid, tid);
11fdf7f2 159 mds->send_message(reply, req->get_connection());
7c673cae
FG
160 }
161 else {
162 // wtf.
163 dout(0) << "got commit for tid " << tid << " > " << version << dendl;
11fdf7f2 164 ceph_assert(tid <= version);
7c673cae
FG
165 }
166}
167
9f95a23c 168void MDSTableServer::_commit_logged(const cref_t<MMDSTableRequest> &req)
7c673cae
FG
169{
170 dout(7) << "_commit_logged, sending ACK" << dendl;
171
11fdf7f2
TL
172 ceph_assert(g_conf()->mds_kill_mdstable_at != 6);
173 version_t tid = req->get_tid();
174
175 pending_for_mds.erase(tid);
176 committing_tids.erase(tid);
7c673cae 177
11fdf7f2
TL
178 _commit(tid, req);
179 _note_commit(tid);
180
9f95a23c 181 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
7c673cae 182 mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
7c673cae
FG
183}
184
11fdf7f2
TL
185class C_Rollback : public MDSLogContextBase {
186 MDSTableServer *server;
9f95a23c 187 cref_t<MMDSTableRequest> req;
11fdf7f2
TL
188 MDSRank *get_mds() override { return server->mds; }
189public:
9f95a23c 190 C_Rollback(MDSTableServer *s, const cref_t<MMDSTableRequest> &r) : server(s), req(r) {}
11fdf7f2
TL
191 void finish(int r) override {
192 server->_rollback_logged(req);
193 }
194};
195
7c673cae 196// ROLLBACK
9f95a23c 197void MDSTableServer::handle_rollback(const cref_t<MMDSTableRequest> &req)
7c673cae
FG
198{
199 dout(7) << "handle_rollback " << *req << dendl;
200
11fdf7f2
TL
201 ceph_assert(g_conf()->mds_kill_mdstable_at != 8);
202 version_t tid = req->get_tid();
203 ceph_assert(pending_for_mds.count(tid));
204 ceph_assert(!committing_tids.count(tid));
205
206 projected_version++;
207 committing_tids.insert(tid);
208
209 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
210 tid, projected_version),
211 new C_Rollback(this, req));
212}
213
9f95a23c 214void MDSTableServer::_rollback_logged(const cref_t<MMDSTableRequest> &req)
11fdf7f2
TL
215{
216 dout(7) << "_rollback_logged " << *req << dendl;
217
7c673cae 218 version_t tid = req->get_tid();
11fdf7f2
TL
219
220 pending_for_mds.erase(tid);
221 committing_tids.erase(tid);
222
7c673cae
FG
223 _rollback(tid);
224 _note_rollback(tid);
7c673cae
FG
225}
226
227
228
229// SERVER UPDATE
11fdf7f2
TL
230class C_ServerUpdate : public MDSLogContextBase {
231 MDSTableServer *server;
232 bufferlist bl;
233 MDSRank *get_mds() override { return server->mds; }
234public:
235 C_ServerUpdate(MDSTableServer *s, bufferlist &b) : server(s), bl(b) {}
236 void finish(int r) override {
237 server->_server_update_logged(bl);
238 }
239};
7c673cae
FG
240
241void MDSTableServer::do_server_update(bufferlist& bl)
242{
243 dout(10) << "do_server_update len " << bl.length() << dendl;
11fdf7f2
TL
244
245 projected_version++;
246
247 ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version);
7c673cae
FG
248 mds->mdlog->start_entry(le);
249 le->mutation = bl;
11fdf7f2 250 mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl));
7c673cae
FG
251}
252
11fdf7f2
TL
253void MDSTableServer::_server_update_logged(bufferlist& bl)
254{
255 dout(10) << "_server_update_logged len " << bl.length() << dendl;
256 _server_update(bl);
257 _note_server_update(bl);
258}
7c673cae
FG
259
260// recovery
261
11fdf7f2
TL
262class C_ServerRecovery : public MDSContext {
263 MDSTableServer *server;
264 MDSRank *get_mds() override { return server->mds; }
265public:
266 C_ServerRecovery(MDSTableServer *s) : server(s) {}
267 void finish(int r) override {
268 server->_do_server_recovery();
269 }
270};
271
272void MDSTableServer::_do_server_recovery()
273{
274 dout(7) << __func__ << " " << active_clients << dendl;
275 map<mds_rank_t, uint64_t> next_reqids;
276
277 for (auto p : pending_for_mds) {
278 mds_rank_t who = p.second.mds;
279 if (!active_clients.count(who))
280 continue;
281
282 if (p.second.reqid >= next_reqids[who])
283 next_reqids[who] = p.second.reqid + 1;
284
285 version_t tid = p.second.tid;
9f95a23c 286 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, p.second.reqid, tid);
11fdf7f2
TL
287 _get_reply_buffer(tid, &reply->bl);
288 mds->send_message_mds(reply, who);
289 }
290
291 for (auto p : active_clients) {
9f95a23c 292 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]);
11fdf7f2
TL
293 mds->send_message_mds(reply, p);
294 }
295 recovered = true;
296}
297
7c673cae
FG
298void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
299{
11fdf7f2
TL
300 dout(7) << __func__ << dendl;
301
302 active_clients = active;
303
304 // don't know if survivor mds have received all 'notify prep' messages.
305 // so we need to send 'notify prep' again.
306 if (!pending_for_mds.empty() && _notify_prep(version)) {
307 auto& q = pending_notifies[version];
308 q.notify_ack_gather = active_clients;
309 q.mds = MDS_RANK_NONE;
310 q.onfinish = new C_ServerRecovery(this);
311 } else {
312 _do_server_recovery();
313 }
7c673cae
FG
314}
315
316void MDSTableServer::handle_mds_recovery(mds_rank_t who)
317{
318 dout(7) << "handle_mds_recovery mds." << who << dendl;
319
11fdf7f2
TL
320 active_clients.insert(who);
321 if (!recovered) {
322 dout(7) << " still not recovered, delaying" << dendl;
323 return;
324 }
325
7c673cae
FG
326 uint64_t next_reqid = 0;
327 // resend agrees for recovered mds
11fdf7f2 328 for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) {
7c673cae
FG
329 if (p->second.mds != who)
330 continue;
11fdf7f2
TL
331 ceph_assert(!pending_notifies.count(p->second.tid));
332
7c673cae
FG
333 if (p->second.reqid >= next_reqid)
334 next_reqid = p->second.reqid + 1;
11fdf7f2 335
9f95a23c 336 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
11fdf7f2 337 _get_reply_buffer(p->second.tid, &reply->bl);
7c673cae
FG
338 mds->send_message_mds(reply, who);
339 }
340
9f95a23c 341 auto reply = make_message<MMDSTableRequest>(table, TABLESERVER_OP_SERVER_READY, next_reqid);
7c673cae
FG
342 mds->send_message_mds(reply, who);
343}
11fdf7f2
TL
344
345void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who)
346{
347 dout(7) << __func__ << " mds." << who << dendl;
348
349 active_clients.erase(who);
350
9f95a23c 351 list<ref_t<MMDSTableRequest>> rollback;
11fdf7f2
TL
352 for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) {
353 auto q = p++;
354 if (q->second.mds == who) {
355 // haven't sent reply yet.
356 rollback.push_back(q->second.reply);
357 pending_notifies.erase(q);
358 } else if (q->second.notify_ack_gather.erase(who)) {
359 // the failed mds will reload snaptable when it recovers.
360 // so we can remove it from the gather set.
361 if (q->second.notify_ack_gather.empty()) {
362 if (q->second.onfinish)
363 q->second.onfinish->complete(0);
364 else
365 mds->send_message_mds(q->second.reply, q->second.mds);
366 pending_notifies.erase(q);
367 }
368 }
369 }
370
371 for (auto &req : rollback) {
372 req->op = TABLESERVER_OP_ROLLBACK;
373 handle_rollback(req);
374 }
375}