git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSTableClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mds / MDSTableClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSMap.h"
16
17 #include "MDSContext.h"
18 #include "msg/Messenger.h"
19
20 #include "MDSRank.h"
21 #include "MDLog.h"
22 #include "LogSegment.h"
23
24 #include "MDSTableClient.h"
25 #include "events/ETableClient.h"
26
27 #include "common/config.h"
28
// Logging setup for this file: dout_context and dout_subsys select the global
// Ceph context and the MDS subsystem; dout_prefix prepends
// "mds.<nodeid>.tableclient(<table name>) " to what is streamed through dout
// (presumably consumed by the dout() macro -- confirm in common/dout.h).
29 #define dout_context g_ceph_context
30 #define dout_subsys ceph_subsys_mds
31 #undef dout_prefix
32 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
33
34
35 class C_LoggedAck : public MDSLogContextBase {
36 MDSTableClient *tc;
37 version_t tid;
38 MDSRank *get_mds() override { return tc->mds; }
39 public:
40 C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
41 void finish(int r) override {
42 tc->_logged_ack(tid);
43 }
44 };
45
46
47 void MDSTableClient::handle_request(const cref_t<MMDSTableRequest> &m)
48 {
49 dout(10) << "handle_request " << *m << dendl;
50 ceph_assert(m->table == table);
51
52 if (mds->get_state() < MDSMap::STATE_RESOLVE) {
53 if (mds->get_want_state() == CEPH_MDS_STATE_RESOLVE) {
54 mds->wait_for_resolve(new C_MDS_RetryMessage(mds, m));
55 }
56 return;
57 }
58
59 version_t tid = m->get_tid();
60 uint64_t reqid = m->reqid;
61
62 switch (m->op) {
63 case TABLESERVER_OP_QUERY_REPLY:
64 handle_query_result(m);
65 break;
66
67 case TABLESERVER_OP_NOTIFY_PREP:
68 ceph_assert(g_conf()->mds_kill_mdstable_at != 9);
69 handle_notify_prep(m);
70 break;
71
72 case TABLESERVER_OP_AGREE:
73 if (pending_prepare.count(reqid)) {
74 dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
75
76 ceph_assert(g_conf()->mds_kill_mdstable_at != 3);
77
78 MDSContext *onfinish = pending_prepare[reqid].onfinish;
79 *pending_prepare[reqid].ptid = tid;
80 if (pending_prepare[reqid].pbl)
81 *pending_prepare[reqid].pbl = m->bl;
82 pending_prepare.erase(reqid);
83 prepared_update[tid] = reqid;
84 if (onfinish) {
85 onfinish->complete(0);
86 }
87 }
88 else if (prepared_update.count(tid)) {
89 dout(10) << "got duplicated agree on " << reqid << " atid " << tid << dendl;
90 ceph_assert(prepared_update[tid] == reqid);
91 ceph_assert(!server_ready);
92 }
93 else if (pending_commit.count(tid)) {
94 dout(10) << "stray agree on " << reqid << " tid " << tid
95 << ", already committing, will resend COMMIT" << dendl;
96 ceph_assert(!server_ready);
97 // will re-send commit when receiving the server ready message
98 }
99 else {
100 dout(10) << "stray agree on " << reqid << " tid " << tid
101 << ", sending ROLLBACK" << dendl;
102 ceph_assert(!server_ready);
103 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_ROLLBACK, 0, tid);
104 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
105 }
106 break;
107
108 case TABLESERVER_OP_ACK:
109 if (pending_commit.count(tid) &&
110 pending_commit[tid]->pending_commit_tids[table].count(tid)) {
111 dout(10) << "got ack on tid " << tid << ", logging" << dendl;
112
113 ceph_assert(g_conf()->mds_kill_mdstable_at != 7);
114
115 // remove from committing list
116 pending_commit[tid]->pending_commit_tids[table].erase(tid);
117 pending_commit.erase(tid);
118
119 // log ACK.
120 mds->mdlog->start_submit_entry(new ETableClient(table, TABLESERVER_OP_ACK, tid),
121 new C_LoggedAck(this, tid));
122 } else {
123 dout(10) << "got stray ack on tid " << tid << ", ignoring" << dendl;
124 }
125 break;
126
127 case TABLESERVER_OP_SERVER_READY:
128 ceph_assert(!server_ready);
129 server_ready = true;
130
131 if (last_reqid == ~0ULL)
132 last_reqid = reqid;
133
134 resend_queries();
135 resend_prepares();
136 resend_commits();
137 break;
138
139 default:
140 ceph_abort_msg("unrecognized mds_table_client request op");
141 }
142 }
143
144
145 void MDSTableClient::_logged_ack(version_t tid)
146 {
147 dout(10) << "_logged_ack " << tid << dendl;
148 // kick any waiters (LogSegment trim)
149 if (ack_waiters.count(tid)) {
150 dout(15) << "kicking ack waiters on tid " << tid << dendl;
151 mds->queue_waiters(ack_waiters[tid]);
152 ack_waiters.erase(tid);
153 }
154 }
155
156 void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
157 MDSContext *onfinish)
158 {
159 if (last_reqid == ~0ULL) {
160 dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
161 waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation));
162 return;
163 }
164
165 uint64_t reqid = ++last_reqid;
166 dout(10) << "_prepare " << reqid << dendl;
167
168 pending_prepare[reqid].mutation = mutation;
169 pending_prepare[reqid].ptid = ptid;
170 pending_prepare[reqid].pbl = pbl;
171 pending_prepare[reqid].onfinish = onfinish;
172
173 if (server_ready) {
174 // send message
175 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_PREPARE, reqid);
176 req->bl = mutation;
177 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
178 } else
179 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
180 }
181
182 void MDSTableClient::commit(version_t tid, LogSegment *ls)
183 {
184 dout(10) << "commit " << tid << dendl;
185
186 ceph_assert(prepared_update.count(tid));
187 prepared_update.erase(tid);
188
189 ceph_assert(pending_commit.count(tid) == 0);
190 pending_commit[tid] = ls;
191 ls->pending_commit_tids[table].insert(tid);
192
193 notify_commit(tid);
194
195 ceph_assert(g_conf()->mds_kill_mdstable_at != 4);
196
197 if (server_ready) {
198 // send message
199 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_COMMIT, 0, tid);
200 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
201 } else
202 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
203 }
204
205
206
207 // recovery
208
209 void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
210 {
211 dout(10) << "got_journaled_agree " << tid << dendl;
212 ls->pending_commit_tids[table].insert(tid);
213 pending_commit[tid] = ls;
214
215 notify_commit(tid);
216 }
217
218 void MDSTableClient::got_journaled_ack(version_t tid)
219 {
220 dout(10) << "got_journaled_ack " << tid << dendl;
221 if (pending_commit.count(tid)) {
222 pending_commit[tid]->pending_commit_tids[table].erase(tid);
223 pending_commit.erase(tid);
224 }
225 }
226
227 void MDSTableClient::resend_commits()
228 {
229 for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
230 p != pending_commit.end();
231 ++p) {
232 dout(10) << "resending commit on " << p->first << dendl;
233 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_COMMIT, 0, p->first);
234 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
235 }
236 }
237
238 void MDSTableClient::resend_prepares()
239 {
240 while (!waiting_for_reqid.empty()) {
241 pending_prepare[++last_reqid] = waiting_for_reqid.front();
242 waiting_for_reqid.pop_front();
243 }
244
245 for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin();
246 p != pending_prepare.end();
247 ++p) {
248 dout(10) << "resending prepare on " << p->first << dendl;
249 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_PREPARE, p->first);
250 req->bl = p->second.mutation;
251 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
252 }
253 }
254
255 void MDSTableClient::handle_mds_failure(mds_rank_t who)
256 {
257 if (who != mds->get_mds_map()->get_tableserver())
258 return; // do nothing.
259
260 dout(7) << "tableserver mds." << who << " fails" << dendl;
261 server_ready = false;
262 }