]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSTableClient.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / mds / MDSTableClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <iostream>
16
17 #include "MDSMap.h"
18
19 #include "MDSContext.h"
20 #include "msg/Messenger.h"
21
22 #include "MDSRank.h"
23 #include "MDLog.h"
24 #include "LogSegment.h"
25
26 #include "MDSTableClient.h"
27 #include "events/ETableClient.h"
28
29 #include "messages/MMDSTableRequest.h"
30
31 #include "common/config.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mds
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
37
38
39 class C_LoggedAck : public MDSLogContextBase {
40 MDSTableClient *tc;
41 version_t tid;
42 MDSRank *get_mds() override { return tc->mds; }
43 public:
44 C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
45 void finish(int r) override {
46 tc->_logged_ack(tid);
47 }
48 };
49
50
51 void MDSTableClient::handle_request(class MMDSTableRequest *m)
52 {
53 dout(10) << "handle_request " << *m << dendl;
54 assert(m->table == table);
55
56 version_t tid = m->get_tid();
57 uint64_t reqid = m->reqid;
58
59 switch (m->op) {
60 case TABLESERVER_OP_QUERY_REPLY:
61 handle_query_result(m);
62 break;
63
64 case TABLESERVER_OP_AGREE:
65 if (pending_prepare.count(reqid)) {
66 dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
67
68 assert(g_conf->mds_kill_mdstable_at != 3);
69
70 MDSInternalContextBase *onfinish = pending_prepare[reqid].onfinish;
71 *pending_prepare[reqid].ptid = tid;
72 if (pending_prepare[reqid].pbl)
73 *pending_prepare[reqid].pbl = m->bl;
74 pending_prepare.erase(reqid);
75 prepared_update[tid] = reqid;
76 if (onfinish) {
77 onfinish->complete(0);
78 }
79 }
80 else if (prepared_update.count(tid)) {
81 dout(10) << "got duplicated agree on " << reqid << " atid " << tid << dendl;
82 assert(prepared_update[tid] == reqid);
83 assert(!server_ready);
84 }
85 else if (pending_commit.count(tid)) {
86 dout(10) << "stray agree on " << reqid << " tid " << tid
87 << ", already committing, will resend COMMIT" << dendl;
88 assert(!server_ready);
89 // will re-send commit when receiving the server ready message
90 }
91 else {
92 dout(10) << "stray agree on " << reqid << " tid " << tid
93 << ", sending ROLLBACK" << dendl;
94 assert(!server_ready);
95 MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_ROLLBACK, 0, tid);
96 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
97 }
98 break;
99
100 case TABLESERVER_OP_ACK:
101 if (pending_commit.count(tid) &&
102 pending_commit[tid]->pending_commit_tids[table].count(tid)) {
103 dout(10) << "got ack on tid " << tid << ", logging" << dendl;
104
105 assert(g_conf->mds_kill_mdstable_at != 7);
106
107 // remove from committing list
108 pending_commit[tid]->pending_commit_tids[table].erase(tid);
109 pending_commit.erase(tid);
110
111 // log ACK.
112 mds->mdlog->start_submit_entry(new ETableClient(table, TABLESERVER_OP_ACK, tid),
113 new C_LoggedAck(this, tid));
114 } else {
115 dout(10) << "got stray ack on tid " << tid << ", ignoring" << dendl;
116 }
117 break;
118
119 case TABLESERVER_OP_SERVER_READY:
120 assert(!server_ready);
121 server_ready = true;
122
123 if (last_reqid == ~0ULL)
124 last_reqid = reqid;
125
126 resend_queries();
127 resend_prepares();
128 resend_commits();
129 break;
130
131 default:
132 assert(0 == "unrecognized mds_table_client request op");
133 }
134
135 m->put();
136 }
137
138
139 void MDSTableClient::_logged_ack(version_t tid)
140 {
141 dout(10) << "_logged_ack " << tid << dendl;
142
143 assert(g_conf->mds_kill_mdstable_at != 8);
144
145 // kick any waiters (LogSegment trim)
146 if (ack_waiters.count(tid)) {
147 dout(15) << "kicking ack waiters on tid " << tid << dendl;
148 mds->queue_waiters(ack_waiters[tid]);
149 ack_waiters.erase(tid);
150 }
151 }
152
153 void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
154 MDSInternalContextBase *onfinish)
155 {
156 if (last_reqid == ~0ULL) {
157 dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
158 waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation));
159 return;
160 }
161
162 uint64_t reqid = ++last_reqid;
163 dout(10) << "_prepare " << reqid << dendl;
164
165 pending_prepare[reqid].mutation = mutation;
166 pending_prepare[reqid].ptid = ptid;
167 pending_prepare[reqid].pbl = pbl;
168 pending_prepare[reqid].onfinish = onfinish;
169
170 if (server_ready) {
171 // send message
172 MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, reqid);
173 req->bl = mutation;
174 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
175 } else
176 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
177 }
178
179 void MDSTableClient::commit(version_t tid, LogSegment *ls)
180 {
181 dout(10) << "commit " << tid << dendl;
182
183 assert(prepared_update.count(tid));
184 prepared_update.erase(tid);
185
186 assert(pending_commit.count(tid) == 0);
187 pending_commit[tid] = ls;
188 ls->pending_commit_tids[table].insert(tid);
189
190 assert(g_conf->mds_kill_mdstable_at != 4);
191
192 if (server_ready) {
193 // send message
194 MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid);
195 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
196 } else
197 dout(10) << "tableserver is not ready yet, deferring request" << dendl;
198 }
199
200
201
202 // recovery
203
204 void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
205 {
206 dout(10) << "got_journaled_agree " << tid << dendl;
207 ls->pending_commit_tids[table].insert(tid);
208 pending_commit[tid] = ls;
209 }
210
211 void MDSTableClient::got_journaled_ack(version_t tid)
212 {
213 dout(10) << "got_journaled_ack " << tid << dendl;
214 if (pending_commit.count(tid)) {
215 pending_commit[tid]->pending_commit_tids[table].erase(tid);
216 pending_commit.erase(tid);
217 }
218 }
219
220 void MDSTableClient::resend_commits()
221 {
222 for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
223 p != pending_commit.end();
224 ++p) {
225 dout(10) << "resending commit on " << p->first << dendl;
226 MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, p->first);
227 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
228 }
229 }
230
231 void MDSTableClient::resend_prepares()
232 {
233 while (!waiting_for_reqid.empty()) {
234 pending_prepare[++last_reqid] = waiting_for_reqid.front();
235 waiting_for_reqid.pop_front();
236 }
237
238 for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin();
239 p != pending_prepare.end();
240 ++p) {
241 dout(10) << "resending prepare on " << p->first << dendl;
242 MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, p->first);
243 req->bl = p->second.mutation;
244 mds->send_message_mds(req, mds->get_mds_map()->get_tableserver());
245 }
246 }
247
248 void MDSTableClient::handle_mds_failure(mds_rank_t who)
249 {
250 if (who != mds->get_mds_map()->get_tableserver())
251 return; // do nothing.
252
253 dout(7) << "tableserver mds." << who << " fails" << dendl;
254 server_ready = false;
255 }