]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/SnapClient.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mds / SnapClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSMap.h"
16 #include "MDSRank.h"
17 #include "msg/Messenger.h"
18 #include "messages/MMDSTableRequest.h"
19 #include "SnapClient.h"
20
21 #include "common/config.h"
22 #include "include/ceph_assert.h"
23
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_mds
26 #undef dout_prefix
27 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient "
28
29 using namespace std;
30
31 void SnapClient::resend_queries()
32 {
33 if (!waiting_for_version.empty() || (!synced && sync_reqid > 0)) {
34 version_t want;
35 if (!waiting_for_version.empty())
36 want = std::max<version_t>(cached_version, waiting_for_version.rbegin()->first);
37 else
38 want = std::max<version_t>(cached_version, 1);
39 refresh(want, NULL);
40 if (!synced)
41 sync_reqid = last_reqid;
42 }
43 }
44
45 void SnapClient::handle_query_result(const cref_t<MMDSTableRequest> &m)
46 {
47 dout(10) << __func__ << " " << *m << dendl;
48
49 char type;
50 using ceph::decode;
51 auto p = m->bl.cbegin();
52 decode(type, p);
53
54 switch (type) {
55 case 'U': // uptodate
56 ceph_assert(cached_version == m->get_tid());
57 break;
58 case 'F': // full
59 {
60 decode(cached_snaps, p);
61 decode(cached_pending_update, p);
62 decode(cached_pending_destroy, p);
63
64 snapid_t last_created, last_destroyed;
65 decode(last_created, p);
66 decode(last_destroyed, p);
67
68 if (last_created > cached_last_created)
69 cached_last_created = last_created;
70 if (last_destroyed > cached_last_destroyed)
71 cached_last_destroyed = last_destroyed;
72
73 cached_version = m->get_tid();
74 }
75 break;
76 default:
77 ceph_abort();
78 };
79
80 if (!committing_tids.empty()) {
81 for (auto p = committing_tids.begin();
82 p != committing_tids.end() && *p <= cached_version; ) {
83 if (cached_pending_update.count(*p)) {
84 if (cached_pending_update[*p].snapid > cached_last_created)
85 cached_last_created = cached_pending_update[*p].snapid;
86 ++p;
87 } else if (cached_pending_destroy.count(*p)) {
88 if (cached_pending_destroy[*p].second > cached_last_destroyed)
89 cached_last_destroyed = cached_pending_destroy[*p].second;
90 ++p;
91 } else {
92 // pending update/destroy have been committed.
93 committing_tids.erase(p++);
94 }
95 }
96 }
97
98 if (m->op == TABLESERVER_OP_QUERY_REPLY && m->reqid >= sync_reqid)
99 synced = true;
100
101 if (synced && !waiting_for_version.empty()) {
102 MDSContext::vec finished;
103 while (!waiting_for_version.empty()) {
104 auto it = waiting_for_version.begin();
105 if (it->first > cached_version)
106 break;
107 auto& v = it->second;
108 finished.insert(finished.end(), v.begin(), v.end());
109 waiting_for_version.erase(it);
110 }
111 if (!finished.empty())
112 mds->queue_waiters(finished);
113 }
114 }
115
116 void SnapClient::handle_notify_prep(const cref_t<MMDSTableRequest> &m)
117 {
118 dout(10) << __func__ << " " << *m << dendl;
119 handle_query_result(m);
120 auto ack = make_message<MMDSTableRequest>(table, TABLESERVER_OP_NOTIFY_ACK, 0, m->get_tid());
121 mds->send_message(ack, m->get_connection());
122 }
123
124 void SnapClient::notify_commit(version_t tid)
125 {
126 dout(10) << __func__ << " tid " << tid << dendl;
127
128 ceph_assert(cached_version == 0 || cached_version >= tid);
129 if (cached_version == 0) {
130 committing_tids.insert(tid);
131 } else if (cached_pending_update.count(tid)) {
132 committing_tids.insert(tid);
133 if (cached_pending_update[tid].snapid > cached_last_created)
134 cached_last_created = cached_pending_update[tid].snapid;
135 } else if (cached_pending_destroy.count(tid)) {
136 committing_tids.insert(tid);
137 if (cached_pending_destroy[tid].second > cached_last_destroyed)
138 cached_last_destroyed = cached_pending_destroy[tid].second;
139 } else if (cached_version > tid) {
140 // no need to record the tid if it has already been committed.
141 } else {
142 ceph_abort();
143 }
144 }
145
146 void SnapClient::refresh(version_t want, MDSContext *onfinish)
147 {
148 dout(10) << __func__ << " want " << want << dendl;
149
150 ceph_assert(want >= cached_version);
151 if (onfinish)
152 waiting_for_version[want].push_back(onfinish);
153
154 if (!server_ready)
155 return;
156
157 mds_rank_t ts = mds->mdsmap->get_tableserver();
158 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_QUERY, ++last_reqid, 0);
159 using ceph::encode;
160 char op = 'F';
161 encode(op, req->bl);
162 encode(cached_version, req->bl);
163 mds->send_message_mds(req, ts);
164 }
165
166 void SnapClient::sync(MDSContext *onfinish)
167 {
168 dout(10) << __func__ << dendl;
169
170 refresh(std::max<version_t>(cached_version, 1), onfinish);
171 synced = false;
172 if (server_ready)
173 sync_reqid = last_reqid;
174 else
175 sync_reqid = (last_reqid == ~0ULL) ? 1 : last_reqid + 1;
176 }
177
178 void SnapClient::get_snaps(set<snapid_t>& result) const
179 {
180 ceph_assert(cached_version > 0);
181 for (auto& p : cached_snaps)
182 result.insert(p.first);
183
184 for (auto tid : committing_tids) {
185 auto q = cached_pending_update.find(tid);
186 if (q != cached_pending_update.end())
187 result.insert(q->second.snapid);
188
189 auto r = cached_pending_destroy.find(tid);
190 if (r != cached_pending_destroy.end())
191 result.erase(r->second.first);
192 }
193 }
194
195 set<snapid_t> SnapClient::filter(const set<snapid_t>& snaps) const
196 {
197 ceph_assert(cached_version > 0);
198 if (snaps.empty())
199 return snaps;
200
201 set<snapid_t> result;
202
203 for (auto p : snaps) {
204 if (cached_snaps.count(p))
205 result.insert(p);
206 }
207
208 for (auto tid : committing_tids) {
209 auto q = cached_pending_update.find(tid);
210 if (q != cached_pending_update.end()) {
211 if (snaps.count(q->second.snapid))
212 result.insert(q->second.snapid);
213 }
214
215 auto r = cached_pending_destroy.find(tid);
216 if (r != cached_pending_destroy.end())
217 result.erase(r->second.first);
218 }
219
220 dout(10) << __func__ << " " << snaps << " -> " << result << dendl;
221 return result;
222 }
223
224 const SnapInfo* SnapClient::get_snap_info(snapid_t snapid) const
225 {
226 ceph_assert(cached_version > 0);
227
228 const SnapInfo* result = NULL;
229 auto it = cached_snaps.find(snapid);
230 if (it != cached_snaps.end())
231 result = &it->second;
232
233 for (auto tid : committing_tids) {
234 auto q = cached_pending_update.find(tid);
235 if (q != cached_pending_update.end() && q->second.snapid == snapid) {
236 result = &q->second;
237 break;
238 }
239
240 auto r = cached_pending_destroy.find(tid);
241 if (r != cached_pending_destroy.end() && r->second.first == snapid) {
242 result = NULL;
243 break;
244 }
245 }
246
247 dout(10) << __func__ << " snapid " << snapid << " -> " << result << dendl;
248 return result;
249 }
250
251 void SnapClient::get_snap_infos(map<snapid_t, const SnapInfo*>& infomap,
252 const set<snapid_t>& snaps) const
253 {
254 ceph_assert(cached_version > 0);
255
256 if (snaps.empty())
257 return;
258
259 map<snapid_t, const SnapInfo*> result;
260 for (auto p : snaps) {
261 auto it = cached_snaps.find(p);
262 if (it != cached_snaps.end())
263 result[p] = &it->second;
264 }
265
266 for (auto tid : committing_tids) {
267 auto q = cached_pending_update.find(tid);
268 if (q != cached_pending_update.end()) {
269 if (snaps.count(q->second.snapid))
270 result[q->second.snapid] = &q->second;
271 }
272
273 auto r = cached_pending_destroy.find(tid);
274 if (r != cached_pending_destroy.end())
275 result.erase(r->second.first);
276 }
277
278 infomap.insert(result.begin(), result.end());
279 }
280
281 int SnapClient::dump_cache(Formatter *f) const
282 {
283 if (!is_synced()) {
284 dout(5) << "dump_cache: not synced" << dendl;
285 return -CEPHFS_EINVAL;
286 }
287
288 map<snapid_t, const SnapInfo*> snaps;
289 for (auto& p : cached_snaps)
290 snaps[p.first] = &p.second;
291
292 for (auto tid : committing_tids) {
293 auto q = cached_pending_update.find(tid);
294 if (q != cached_pending_update.end())
295 snaps[q->second.snapid] = &q->second;
296
297 auto r = cached_pending_destroy.find(tid);
298 if (r != cached_pending_destroy.end())
299 snaps.erase(r->second.first);
300 }
301
302 f->open_object_section("snapclient");
303
304 f->dump_int("last_created", get_last_created());
305 f->dump_int("last_destroyed", get_last_destroyed());
306
307 f->open_array_section("snaps");
308 for (auto p : snaps) {
309 f->open_object_section("snap");
310 p.second->dump(f);
311 f->close_section();
312 }
313 f->close_section();
314
315 f->close_section();
316
317 return 0;
318 }