]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <iostream> | |
16 | ||
17 | #include "MDSMap.h" | |
18 | ||
19 | #include "MDSContext.h" | |
20 | #include "msg/Messenger.h" | |
21 | ||
22 | #include "MDSRank.h" | |
23 | #include "MDLog.h" | |
24 | #include "LogSegment.h" | |
25 | ||
26 | #include "MDSTableClient.h" | |
27 | #include "events/ETableClient.h" | |
28 | ||
29 | #include "messages/MMDSTableRequest.h" | |
30 | ||
31 | #include "common/config.h" | |
32 | ||
// Logging setup for this file: dout_subsys / dout_context select the
// subsystem (ceph_subsys_mds) and context (g_ceph_context), presumably
// consumed by the dout() macros. Every line is prefixed with the local rank
// and the name of the table this client serves.
#define dout_subsys ceph_subsys_mds
#define dout_context g_ceph_context
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() \
                           << ".tableclient(" << get_mdstable_name(table) << ") "
37 | ||
38 | ||
39 | class C_LoggedAck : public MDSLogContextBase { | |
40 | MDSTableClient *tc; | |
41 | version_t tid; | |
42 | MDSRank *get_mds() override { return tc->mds; } | |
43 | public: | |
44 | C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {} | |
45 | void finish(int r) override { | |
46 | tc->_logged_ack(tid); | |
47 | } | |
48 | }; | |
49 | ||
50 | ||
51 | void MDSTableClient::handle_request(class MMDSTableRequest *m) | |
52 | { | |
53 | dout(10) << "handle_request " << *m << dendl; | |
54 | assert(m->table == table); | |
55 | ||
56 | version_t tid = m->get_tid(); | |
57 | uint64_t reqid = m->reqid; | |
58 | ||
59 | switch (m->op) { | |
60 | case TABLESERVER_OP_QUERY_REPLY: | |
61 | handle_query_result(m); | |
62 | break; | |
63 | ||
64 | case TABLESERVER_OP_AGREE: | |
65 | if (pending_prepare.count(reqid)) { | |
66 | dout(10) << "got agree on " << reqid << " atid " << tid << dendl; | |
67 | ||
68 | assert(g_conf->mds_kill_mdstable_at != 3); | |
69 | ||
70 | MDSInternalContextBase *onfinish = pending_prepare[reqid].onfinish; | |
71 | *pending_prepare[reqid].ptid = tid; | |
72 | if (pending_prepare[reqid].pbl) | |
73 | *pending_prepare[reqid].pbl = m->bl; | |
74 | pending_prepare.erase(reqid); | |
75 | prepared_update[tid] = reqid; | |
76 | if (onfinish) { | |
77 | onfinish->complete(0); | |
78 | } | |
79 | } | |
80 | else if (prepared_update.count(tid)) { | |
81 | dout(10) << "got duplicated agree on " << reqid << " atid " << tid << dendl; | |
82 | assert(prepared_update[tid] == reqid); | |
83 | assert(!server_ready); | |
84 | } | |
85 | else if (pending_commit.count(tid)) { | |
86 | dout(10) << "stray agree on " << reqid << " tid " << tid | |
87 | << ", already committing, will resend COMMIT" << dendl; | |
88 | assert(!server_ready); | |
89 | // will re-send commit when receiving the server ready message | |
90 | } | |
91 | else { | |
92 | dout(10) << "stray agree on " << reqid << " tid " << tid | |
93 | << ", sending ROLLBACK" << dendl; | |
94 | assert(!server_ready); | |
95 | MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_ROLLBACK, 0, tid); | |
96 | mds->send_message_mds(req, mds->get_mds_map()->get_tableserver()); | |
97 | } | |
98 | break; | |
99 | ||
100 | case TABLESERVER_OP_ACK: | |
101 | if (pending_commit.count(tid) && | |
102 | pending_commit[tid]->pending_commit_tids[table].count(tid)) { | |
103 | dout(10) << "got ack on tid " << tid << ", logging" << dendl; | |
104 | ||
105 | assert(g_conf->mds_kill_mdstable_at != 7); | |
106 | ||
107 | // remove from committing list | |
108 | pending_commit[tid]->pending_commit_tids[table].erase(tid); | |
109 | pending_commit.erase(tid); | |
110 | ||
111 | // log ACK. | |
112 | mds->mdlog->start_submit_entry(new ETableClient(table, TABLESERVER_OP_ACK, tid), | |
113 | new C_LoggedAck(this, tid)); | |
114 | } else { | |
115 | dout(10) << "got stray ack on tid " << tid << ", ignoring" << dendl; | |
116 | } | |
117 | break; | |
118 | ||
119 | case TABLESERVER_OP_SERVER_READY: | |
120 | assert(!server_ready); | |
121 | server_ready = true; | |
122 | ||
123 | if (last_reqid == ~0ULL) | |
124 | last_reqid = reqid; | |
125 | ||
126 | resend_queries(); | |
127 | resend_prepares(); | |
128 | resend_commits(); | |
129 | break; | |
130 | ||
131 | default: | |
132 | assert(0 == "unrecognized mds_table_client request op"); | |
133 | } | |
134 | ||
135 | m->put(); | |
136 | } | |
137 | ||
138 | ||
139 | void MDSTableClient::_logged_ack(version_t tid) | |
140 | { | |
141 | dout(10) << "_logged_ack " << tid << dendl; | |
142 | ||
143 | assert(g_conf->mds_kill_mdstable_at != 8); | |
144 | ||
145 | // kick any waiters (LogSegment trim) | |
146 | if (ack_waiters.count(tid)) { | |
147 | dout(15) << "kicking ack waiters on tid " << tid << dendl; | |
148 | mds->queue_waiters(ack_waiters[tid]); | |
149 | ack_waiters.erase(tid); | |
150 | } | |
151 | } | |
152 | ||
153 | void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl, | |
154 | MDSInternalContextBase *onfinish) | |
155 | { | |
156 | if (last_reqid == ~0ULL) { | |
157 | dout(10) << "tableserver is not ready yet, waiting for request id" << dendl; | |
158 | waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation)); | |
159 | return; | |
160 | } | |
161 | ||
162 | uint64_t reqid = ++last_reqid; | |
163 | dout(10) << "_prepare " << reqid << dendl; | |
164 | ||
165 | pending_prepare[reqid].mutation = mutation; | |
166 | pending_prepare[reqid].ptid = ptid; | |
167 | pending_prepare[reqid].pbl = pbl; | |
168 | pending_prepare[reqid].onfinish = onfinish; | |
169 | ||
170 | if (server_ready) { | |
171 | // send message | |
172 | MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, reqid); | |
173 | req->bl = mutation; | |
174 | mds->send_message_mds(req, mds->get_mds_map()->get_tableserver()); | |
175 | } else | |
176 | dout(10) << "tableserver is not ready yet, deferring request" << dendl; | |
177 | } | |
178 | ||
179 | void MDSTableClient::commit(version_t tid, LogSegment *ls) | |
180 | { | |
181 | dout(10) << "commit " << tid << dendl; | |
182 | ||
183 | assert(prepared_update.count(tid)); | |
184 | prepared_update.erase(tid); | |
185 | ||
186 | assert(pending_commit.count(tid) == 0); | |
187 | pending_commit[tid] = ls; | |
188 | ls->pending_commit_tids[table].insert(tid); | |
189 | ||
190 | assert(g_conf->mds_kill_mdstable_at != 4); | |
191 | ||
192 | if (server_ready) { | |
193 | // send message | |
194 | MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid); | |
195 | mds->send_message_mds(req, mds->get_mds_map()->get_tableserver()); | |
196 | } else | |
197 | dout(10) << "tableserver is not ready yet, deferring request" << dendl; | |
198 | } | |
199 | ||
200 | ||
201 | ||
202 | // recovery | |
203 | ||
204 | void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls) | |
205 | { | |
206 | dout(10) << "got_journaled_agree " << tid << dendl; | |
207 | ls->pending_commit_tids[table].insert(tid); | |
208 | pending_commit[tid] = ls; | |
209 | } | |
210 | ||
211 | void MDSTableClient::got_journaled_ack(version_t tid) | |
212 | { | |
213 | dout(10) << "got_journaled_ack " << tid << dendl; | |
214 | if (pending_commit.count(tid)) { | |
215 | pending_commit[tid]->pending_commit_tids[table].erase(tid); | |
216 | pending_commit.erase(tid); | |
217 | } | |
218 | } | |
219 | ||
220 | void MDSTableClient::resend_commits() | |
221 | { | |
222 | for (map<version_t,LogSegment*>::iterator p = pending_commit.begin(); | |
223 | p != pending_commit.end(); | |
224 | ++p) { | |
225 | dout(10) << "resending commit on " << p->first << dendl; | |
226 | MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, p->first); | |
227 | mds->send_message_mds(req, mds->get_mds_map()->get_tableserver()); | |
228 | } | |
229 | } | |
230 | ||
231 | void MDSTableClient::resend_prepares() | |
232 | { | |
233 | while (!waiting_for_reqid.empty()) { | |
234 | pending_prepare[++last_reqid] = waiting_for_reqid.front(); | |
235 | waiting_for_reqid.pop_front(); | |
236 | } | |
237 | ||
238 | for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin(); | |
239 | p != pending_prepare.end(); | |
240 | ++p) { | |
241 | dout(10) << "resending prepare on " << p->first << dendl; | |
242 | MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, p->first); | |
243 | req->bl = p->second.mutation; | |
244 | mds->send_message_mds(req, mds->get_mds_map()->get_tableserver()); | |
245 | } | |
246 | } | |
247 | ||
248 | void MDSTableClient::handle_mds_failure(mds_rank_t who) | |
249 | { | |
250 | if (who != mds->get_mds_map()->get_tableserver()) | |
251 | return; // do nothing. | |
252 | ||
253 | dout(7) << "tableserver mds." << who << " fails" << dendl; | |
254 | server_ready = false; | |
255 | } |