]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSTableServer.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mds / MDSTableServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSTableServer.h"
16 #include "MDSRank.h"
17 #include "MDLog.h"
18 #include "msg/Messenger.h"
19
20 #include "events/ETableServer.h"
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_mds
24 #undef dout_prefix
25 #define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
26
27 void MDSTableServer::handle_request(const MMDSTableRequest::const_ref &req)
28 {
29 ceph_assert(req->op >= 0);
30 switch (req->op) {
31 case TABLESERVER_OP_QUERY: return handle_query(req);
32 case TABLESERVER_OP_PREPARE: return handle_prepare(req);
33 case TABLESERVER_OP_COMMIT: return handle_commit(req);
34 case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
35 case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req);
36 default: ceph_abort_msg("unrecognized mds_table_server request op");
37 }
38 }
39
40 class C_Prepare : public MDSLogContextBase {
41 MDSTableServer *server;
42 MMDSTableRequest::const_ref req;
43 version_t tid;
44 MDSRank *get_mds() override { return server->mds; }
45 public:
46
47 C_Prepare(MDSTableServer *s, const MMDSTableRequest::const_ref r, version_t v) : server(s), req(r), tid(v) {}
48 void finish(int r) override {
49 server->_prepare_logged(req, tid);
50 }
51 };
52
53 // prepare
54 void MDSTableServer::handle_prepare(const MMDSTableRequest::const_ref &req)
55 {
56 dout(7) << "handle_prepare " << *req << dendl;
57 mds_rank_t from = mds_rank_t(req->get_source().num());
58
59 ceph_assert(g_conf()->mds_kill_mdstable_at != 1);
60
61 projected_version++;
62
63 ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from,
64 projected_version, projected_version);
65 mds->mdlog->start_entry(le);
66 le->mutation = req->bl;
67 mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version));
68 mds->mdlog->flush();
69 }
70
71 void MDSTableServer::_prepare_logged(const MMDSTableRequest::const_ref &req, version_t tid)
72 {
73 dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
74 mds_rank_t from = mds_rank_t(req->get_source().num());
75
76 ceph_assert(g_conf()->mds_kill_mdstable_at != 2);
77
78 _note_prepare(from, req->reqid);
79 bufferlist out;
80 _prepare(req->bl, req->reqid, from, out);
81 ceph_assert(version == tid);
82
83 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, req->reqid, tid);
84 reply->bl = std::move(out);
85
86 if (_notify_prep(tid)) {
87 auto& p = pending_notifies[tid];
88 p.notify_ack_gather = active_clients;
89 p.mds = from;
90 p.reply = reply;
91 } else {
92 mds->send_message_mds(reply, from);
93 }
94 }
95
96 void MDSTableServer::handle_notify_ack(const MMDSTableRequest::const_ref &m)
97 {
98 dout(7) << __func__ << " " << *m << dendl;
99 mds_rank_t from = mds_rank_t(m->get_source().num());
100 version_t tid = m->get_tid();
101
102 auto p = pending_notifies.find(tid);
103 if (p != pending_notifies.end()) {
104 if (p->second.notify_ack_gather.erase(from)) {
105 if (p->second.notify_ack_gather.empty()) {
106 if (p->second.onfinish)
107 p->second.onfinish->complete(0);
108 else
109 mds->send_message_mds(p->second.reply, p->second.mds);
110 pending_notifies.erase(p);
111 }
112 } else {
113 dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl;
114 }
115 } else {
116 }
117 }
118
119 class C_Commit : public MDSLogContextBase {
120 MDSTableServer *server;
121 MMDSTableRequest::const_ref req;
122 MDSRank *get_mds() override { return server->mds; }
123 public:
124 C_Commit(MDSTableServer *s, const MMDSTableRequest::const_ref &r) : server(s), req(r) {}
125 void finish(int r) override {
126 server->_commit_logged(req);
127 }
128 };
129
130 // commit
131 void MDSTableServer::handle_commit(const MMDSTableRequest::const_ref &req)
132 {
133 dout(7) << "handle_commit " << *req << dendl;
134
135 version_t tid = req->get_tid();
136
137 if (pending_for_mds.count(tid)) {
138
139 if (committing_tids.count(tid)) {
140 dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl;
141 return;
142 }
143
144 ceph_assert(g_conf()->mds_kill_mdstable_at != 5);
145
146 projected_version++;
147 committing_tids.insert(tid);
148
149 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE,
150 tid, projected_version),
151 new C_Commit(this, req));
152 }
153 else if (tid <= version) {
154 dout(0) << "got commit for tid " << tid << " <= " << version
155 << ", already committed, sending ack." << dendl;
156 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_ACK, req->reqid, tid);
157 mds->send_message(reply, req->get_connection());
158 }
159 else {
160 // wtf.
161 dout(0) << "got commit for tid " << tid << " > " << version << dendl;
162 ceph_assert(tid <= version);
163 }
164 }
165
166 void MDSTableServer::_commit_logged(const MMDSTableRequest::const_ref &req)
167 {
168 dout(7) << "_commit_logged, sending ACK" << dendl;
169
170 ceph_assert(g_conf()->mds_kill_mdstable_at != 6);
171 version_t tid = req->get_tid();
172
173 pending_for_mds.erase(tid);
174 committing_tids.erase(tid);
175
176 _commit(tid, req);
177 _note_commit(tid);
178
179 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
180 mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
181 }
182
183 class C_Rollback : public MDSLogContextBase {
184 MDSTableServer *server;
185 MMDSTableRequest::const_ref req;
186 MDSRank *get_mds() override { return server->mds; }
187 public:
188 C_Rollback(MDSTableServer *s, const MMDSTableRequest::const_ref &r) : server(s), req(r) {}
189 void finish(int r) override {
190 server->_rollback_logged(req);
191 }
192 };
193
194 // ROLLBACK
195 void MDSTableServer::handle_rollback(const MMDSTableRequest::const_ref &req)
196 {
197 dout(7) << "handle_rollback " << *req << dendl;
198
199 ceph_assert(g_conf()->mds_kill_mdstable_at != 8);
200 version_t tid = req->get_tid();
201 ceph_assert(pending_for_mds.count(tid));
202 ceph_assert(!committing_tids.count(tid));
203
204 projected_version++;
205 committing_tids.insert(tid);
206
207 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
208 tid, projected_version),
209 new C_Rollback(this, req));
210 }
211
212 void MDSTableServer::_rollback_logged(const MMDSTableRequest::const_ref &req)
213 {
214 dout(7) << "_rollback_logged " << *req << dendl;
215
216 version_t tid = req->get_tid();
217
218 pending_for_mds.erase(tid);
219 committing_tids.erase(tid);
220
221 _rollback(tid);
222 _note_rollback(tid);
223 }
224
225
226
227 // SERVER UPDATE
228 class C_ServerUpdate : public MDSLogContextBase {
229 MDSTableServer *server;
230 bufferlist bl;
231 MDSRank *get_mds() override { return server->mds; }
232 public:
233 C_ServerUpdate(MDSTableServer *s, bufferlist &b) : server(s), bl(b) {}
234 void finish(int r) override {
235 server->_server_update_logged(bl);
236 }
237 };
238
239 void MDSTableServer::do_server_update(bufferlist& bl)
240 {
241 dout(10) << "do_server_update len " << bl.length() << dendl;
242
243 projected_version++;
244
245 ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version);
246 mds->mdlog->start_entry(le);
247 le->mutation = bl;
248 mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl));
249 }
250
251 void MDSTableServer::_server_update_logged(bufferlist& bl)
252 {
253 dout(10) << "_server_update_logged len " << bl.length() << dendl;
254 _server_update(bl);
255 _note_server_update(bl);
256 }
257
258 // recovery
259
260 class C_ServerRecovery : public MDSContext {
261 MDSTableServer *server;
262 MDSRank *get_mds() override { return server->mds; }
263 public:
264 C_ServerRecovery(MDSTableServer *s) : server(s) {}
265 void finish(int r) override {
266 server->_do_server_recovery();
267 }
268 };
269
270 void MDSTableServer::_do_server_recovery()
271 {
272 dout(7) << __func__ << " " << active_clients << dendl;
273 map<mds_rank_t, uint64_t> next_reqids;
274
275 for (auto p : pending_for_mds) {
276 mds_rank_t who = p.second.mds;
277 if (!active_clients.count(who))
278 continue;
279
280 if (p.second.reqid >= next_reqids[who])
281 next_reqids[who] = p.second.reqid + 1;
282
283 version_t tid = p.second.tid;
284 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, p.second.reqid, tid);
285 _get_reply_buffer(tid, &reply->bl);
286 mds->send_message_mds(reply, who);
287 }
288
289 for (auto p : active_clients) {
290 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]);
291 mds->send_message_mds(reply, p);
292 }
293 recovered = true;
294 }
295
296 void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
297 {
298 dout(7) << __func__ << dendl;
299
300 active_clients = active;
301
302 // don't know if survivor mds have received all 'notify prep' messages.
303 // so we need to send 'notify prep' again.
304 if (!pending_for_mds.empty() && _notify_prep(version)) {
305 auto& q = pending_notifies[version];
306 q.notify_ack_gather = active_clients;
307 q.mds = MDS_RANK_NONE;
308 q.onfinish = new C_ServerRecovery(this);
309 } else {
310 _do_server_recovery();
311 }
312 }
313
314 void MDSTableServer::handle_mds_recovery(mds_rank_t who)
315 {
316 dout(7) << "handle_mds_recovery mds." << who << dendl;
317
318 active_clients.insert(who);
319 if (!recovered) {
320 dout(7) << " still not recovered, delaying" << dendl;
321 return;
322 }
323
324 uint64_t next_reqid = 0;
325 // resend agrees for recovered mds
326 for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) {
327 if (p->second.mds != who)
328 continue;
329 ceph_assert(!pending_notifies.count(p->second.tid));
330
331 if (p->second.reqid >= next_reqid)
332 next_reqid = p->second.reqid + 1;
333
334 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
335 _get_reply_buffer(p->second.tid, &reply->bl);
336 mds->send_message_mds(reply, who);
337 }
338
339 auto reply = MMDSTableRequest::create(table, TABLESERVER_OP_SERVER_READY, next_reqid);
340 mds->send_message_mds(reply, who);
341 }
342
343 void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who)
344 {
345 dout(7) << __func__ << " mds." << who << dendl;
346
347 active_clients.erase(who);
348
349 list<MMDSTableRequest::ref> rollback;
350 for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) {
351 auto q = p++;
352 if (q->second.mds == who) {
353 // haven't sent reply yet.
354 rollback.push_back(q->second.reply);
355 pending_notifies.erase(q);
356 } else if (q->second.notify_ack_gather.erase(who)) {
357 // the failed mds will reload snaptable when it recovers.
358 // so we can remove it from the gather set.
359 if (q->second.notify_ack_gather.empty()) {
360 if (q->second.onfinish)
361 q->second.onfinish->complete(0);
362 else
363 mds->send_message_mds(q->second.reply, q->second.mds);
364 pending_notifies.erase(q);
365 }
366 }
367 }
368
369 for (auto &req : rollback) {
370 req->op = TABLESERVER_OP_ROLLBACK;
371 handle_rollback(req);
372 }
373 }