]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSTableClient.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mds / MDSTableClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <iostream>
16
17 #include "MDSMap.h"
18
19 #include "MDSContext.h"
20 #include "msg/Messenger.h"
21
22 #include "MDSRank.h"
23 #include "MDLog.h"
24 #include "LogSegment.h"
25
26 #include "MDSTableClient.h"
27 #include "events/ETableClient.h"
28
29 #include "common/config.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
33 #undef dout_prefix
34 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
35
36
37 class C_LoggedAck : public MDSLogContextBase {
38 MDSTableClient *tc;
39 version_t tid;
40 MDSRank *get_mds() override { return tc->mds; }
41 public:
42 C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
43 void finish(int r) override {
44 tc->_logged_ack(tid);
45 }
46 };
47
48
49 void MDSTableClient::handle_request(const MMDSTableRequest::const_ref &m)
50 {
51 dout(10) << "handle_request " << *m << dendl;
52 ceph_assert(m->table == table);
53
54 if (mds->get_state() < MDSMap::STATE_RESOLVE) {
55 if (mds->get_want_state() == CEPH_MDS_STATE_RESOLVE) {
56 mds->wait_for_resolve(new C_MDS_RetryMessage(mds, m));
57 }
58 return;
59 }
60
61 version_t tid = m->get_tid();
62 uint64_t reqid = m->reqid;
63
64 switch (m->op) {
65 case TABLESERVER_OP_QUERY_REPLY:
66 handle_query_result(m);
67 break;
68
69 case TABLESERVER_OP_NOTIFY_PREP:
70 ceph_assert(g_conf()->mds_kill_mdstable_at != 9);
71 handle_notify_prep(m);
72 break;
73
74 case TABLESERVER_OP_AGREE:
75 if (pending_prepare.count(reqid)) {
76 dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
77
78 ceph_assert(g_conf()->mds_kill_mdstable_at != 3);
79
80 MDSContext *onfinish = pending_prepare[reqid].onfinish;
81 *pending_prepare[reqid].ptid = tid;
82 if (pending_prepare[reqid].pbl)
83 *pending_prepare[reqid].pbl = m->bl;
84 pending_prepare.erase(reqid);
85 prepared_update[tid] = reqid;
86 if (onfinish) {
87 onfinish->complete(0);
88 }
89 }
90 else if (prepared_update.count(tid)) {
91 dout(10) << "got duplicated agree on " << reqid << " atid " << tid << dendl;
92 ceph_assert(prepared_update[tid] == reqid);
93 ceph_assert(!server_ready);
94 }
95 else if (pending_commit.count(tid)) {
96 dout(10) << "stray agree on " << reqid << " tid " << tid
97 << ", already committing, will resend COMMIT" << dendl;
98 ceph_assert(!server_ready);
99 // will re-send commit when receiving the server ready message
100 }
101 else {
102 dout(10) << "stray agree on " << reqid << " tid " << tid
103 << ", sending ROLLBACK" << dendl;
104 ceph_assert(!server_ready);
105 auto req = MMDSTableRequest::create(table, TABLESERVER_OP_ROLLBACK, 0, tid);
106 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
107 }
108 break;
109
110 case TABLESERVER_OP_ACK:
111 if (pending_commit.count(tid) &&
112 pending_commit[tid]->pending_commit_tids[table].count(tid)) {
113 dout(10) << "got ack on tid " << tid << ", logging" << dendl;
114
115 ceph_assert(g_conf()->mds_kill_mdstable_at != 7);
116
117 // remove from committing list
118 pending_commit[tid]->pending_commit_tids[table].erase(tid);
119 pending_commit.erase(tid);
120
121 // log ACK.
122 mds->mdlog->start_submit_entry(new ETableClient(table, TABLESERVER_OP_ACK, tid),
123 new C_LoggedAck(this, tid));
124 } else {
125 dout(10) << "got stray ack on tid " << tid << ", ignoring" << dendl;
126 }
127 break;
128
129 case TABLESERVER_OP_SERVER_READY:
130 ceph_assert(!server_ready);
131 server_ready = true;
132
133 if (last_reqid == ~0ULL)
134 last_reqid = reqid;
135
136 resend_queries();
137 resend_prepares();
138 resend_commits();
139 break;
140
141 default:
142 ceph_abort_msg("unrecognized mds_table_client request op");
143 }
144 }
145
146
147 void MDSTableClient::_logged_ack(version_t tid)
148 {
149 dout(10) << "_logged_ack " << tid << dendl;
150 // kick any waiters (LogSegment trim)
151 if (ack_waiters.count(tid)) {
152 dout(15) << "kicking ack waiters on tid " << tid << dendl;
153 mds->queue_waiters(ack_waiters[tid]);
154 ack_waiters.erase(tid);
155 }
156 }
157
158 void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
159 MDSContext *onfinish)
160 {
161 if (last_reqid == ~0ULL) {
162 dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
163 waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation));
164 return;
165 }
166
167 uint64_t reqid = ++last_reqid;
168 dout(10) << "_prepare " << reqid << dendl;
169
170 pending_prepare[reqid].mutation = mutation;
171 pending_prepare[reqid].ptid = ptid;
172 pending_prepare[reqid].pbl = pbl;
173 pending_prepare[reqid].onfinish = onfinish;
174
175 if (server_ready) {
176 // send message
177 auto req = MMDSTableRequest::create(table, TABLESERVER_OP_PREPARE, reqid);
178 req->bl = mutation;
179 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
180 } else
181 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
182 }
183
184 void MDSTableClient::commit(version_t tid, LogSegment *ls)
185 {
186 dout(10) << "commit " << tid << dendl;
187
188 ceph_assert(prepared_update.count(tid));
189 prepared_update.erase(tid);
190
191 ceph_assert(pending_commit.count(tid) == 0);
192 pending_commit[tid] = ls;
193 ls->pending_commit_tids[table].insert(tid);
194
195 notify_commit(tid);
196
197 ceph_assert(g_conf()->mds_kill_mdstable_at != 4);
198
199 if (server_ready) {
200 // send message
201 auto req = MMDSTableRequest::create(table, TABLESERVER_OP_COMMIT, 0, tid);
202 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
203 } else
204 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
205 }
206
207
208
209 // recovery
210
211 void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
212 {
213 dout(10) << "got_journaled_agree " << tid << dendl;
214 ls->pending_commit_tids[table].insert(tid);
215 pending_commit[tid] = ls;
216
217 notify_commit(tid);
218 }
219
220 void MDSTableClient::got_journaled_ack(version_t tid)
221 {
222 dout(10) << "got_journaled_ack " << tid << dendl;
223 if (pending_commit.count(tid)) {
224 pending_commit[tid]->pending_commit_tids[table].erase(tid);
225 pending_commit.erase(tid);
226 }
227 }
228
229 void MDSTableClient::resend_commits()
230 {
231 for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
232 p != pending_commit.end();
233 ++p) {
234 dout(10) << "resending commit on " << p->first << dendl;
235 auto req = MMDSTableRequest::create(table, TABLESERVER_OP_COMMIT, 0, p->first);
236 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
237 }
238 }
239
240 void MDSTableClient::resend_prepares()
241 {
242 while (!waiting_for_reqid.empty()) {
243 pending_prepare[++last_reqid] = waiting_for_reqid.front();
244 waiting_for_reqid.pop_front();
245 }
246
247 for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin();
248 p != pending_prepare.end();
249 ++p) {
250 dout(10) << "resending prepare on " << p->first << dendl;
251 auto req = MMDSTableRequest::create(table, TABLESERVER_OP_PREPARE, p->first);
252 req->bl = p->second.mutation;
253 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
254 }
255 }
256
257 void MDSTableClient::handle_mds_failure(mds_rank_t who)
258 {
259 if (who != mds->get_mds_map()->get_tableserver())
260 return; // do nothing.
261
262 dout(7) << "tableserver mds." << who << " fails" << dendl;
263 server_ready = false;
264 }