]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/SnapClient.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / mds / SnapClient.cc
CommitLineData
11fdf7f2
TL
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14
15#include "MDSMap.h"
16#include "MDSRank.h"
17#include "msg/Messenger.h"
18#include "messages/MMDSTableRequest.h"
19#include "SnapClient.h"
20
21#include "common/config.h"
22#include "include/ceph_assert.h"
23
24#define dout_context g_ceph_context
25#define dout_subsys ceph_subsys_mds
26#undef dout_prefix
27#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient "
28
29void SnapClient::resend_queries()
30{
31 if (!waiting_for_version.empty() || (!synced && sync_reqid > 0)) {
32 version_t want;
33 if (!waiting_for_version.empty())
34 want = std::max<version_t>(cached_version, waiting_for_version.rbegin()->first);
35 else
36 want = std::max<version_t>(cached_version, 1);
37 refresh(want, NULL);
38 if (!synced)
39 sync_reqid = last_reqid;
40 }
41}
42
9f95a23c 43void SnapClient::handle_query_result(const cref_t<MMDSTableRequest> &m)
11fdf7f2
TL
44{
45 dout(10) << __func__ << " " << *m << dendl;
46
47 char type;
48 using ceph::decode;
49 auto p = m->bl.cbegin();
50 decode(type, p);
51
52 switch (type) {
53 case 'U': // uptodate
54 ceph_assert(cached_version == m->get_tid());
55 break;
56 case 'F': // full
57 {
58 decode(cached_snaps, p);
59 decode(cached_pending_update, p);
60 decode(cached_pending_destroy, p);
61
62 snapid_t last_created, last_destroyed;
63 decode(last_created, p);
64 decode(last_destroyed, p);
65
66 if (last_created > cached_last_created)
67 cached_last_created = last_created;
68 if (last_destroyed > cached_last_destroyed)
69 cached_last_destroyed = last_destroyed;
70
71 cached_version = m->get_tid();
72 }
73 break;
74 default:
75 ceph_abort();
76 };
77
78 if (!committing_tids.empty()) {
79 for (auto p = committing_tids.begin();
80 p != committing_tids.end() && *p <= cached_version; ) {
81 if (cached_pending_update.count(*p)) {
82 if (cached_pending_update[*p].snapid > cached_last_created)
83 cached_last_created = cached_pending_update[*p].snapid;
84 ++p;
85 } else if (cached_pending_destroy.count(*p)) {
86 if (cached_pending_destroy[*p].second > cached_last_destroyed)
87 cached_last_destroyed = cached_pending_destroy[*p].second;
88 ++p;
89 } else {
90 // pending update/destroy have been committed.
91 committing_tids.erase(p++);
92 }
93 }
94 }
95
96 if (m->op == TABLESERVER_OP_QUERY_REPLY && m->reqid >= sync_reqid)
97 synced = true;
98
99 if (synced && !waiting_for_version.empty()) {
100 MDSContext::vec finished;
101 while (!waiting_for_version.empty()) {
102 auto it = waiting_for_version.begin();
103 if (it->first > cached_version)
104 break;
105 auto& v = it->second;
106 finished.insert(finished.end(), v.begin(), v.end());
107 waiting_for_version.erase(it);
108 }
109 if (!finished.empty())
110 mds->queue_waiters(finished);
111 }
112}
113
9f95a23c 114void SnapClient::handle_notify_prep(const cref_t<MMDSTableRequest> &m)
11fdf7f2
TL
115{
116 dout(10) << __func__ << " " << *m << dendl;
117 handle_query_result(m);
9f95a23c 118 auto ack = make_message<MMDSTableRequest>(table, TABLESERVER_OP_NOTIFY_ACK, 0, m->get_tid());
11fdf7f2
TL
119 mds->send_message(ack, m->get_connection());
120}
121
122void SnapClient::notify_commit(version_t tid)
123{
124 dout(10) << __func__ << " tid " << tid << dendl;
125
126 ceph_assert(cached_version == 0 || cached_version >= tid);
127 if (cached_version == 0) {
128 committing_tids.insert(tid);
129 } else if (cached_pending_update.count(tid)) {
130 committing_tids.insert(tid);
131 if (cached_pending_update[tid].snapid > cached_last_created)
132 cached_last_created = cached_pending_update[tid].snapid;
133 } else if (cached_pending_destroy.count(tid)) {
134 committing_tids.insert(tid);
135 if (cached_pending_destroy[tid].second > cached_last_destroyed)
136 cached_last_destroyed = cached_pending_destroy[tid].second;
137 } else if (cached_version > tid) {
138 // no need to record the tid if it has already been committed.
139 } else {
140 ceph_abort();
141 }
142}
143
144void SnapClient::refresh(version_t want, MDSContext *onfinish)
145{
146 dout(10) << __func__ << " want " << want << dendl;
147
148 ceph_assert(want >= cached_version);
149 if (onfinish)
150 waiting_for_version[want].push_back(onfinish);
151
152 if (!server_ready)
153 return;
154
155 mds_rank_t ts = mds->mdsmap->get_tableserver();
9f95a23c 156 auto req = make_message<MMDSTableRequest>(table, TABLESERVER_OP_QUERY, ++last_reqid, 0);
11fdf7f2
TL
157 using ceph::encode;
158 char op = 'F';
159 encode(op, req->bl);
160 encode(cached_version, req->bl);
161 mds->send_message_mds(req, ts);
162}
163
164void SnapClient::sync(MDSContext *onfinish)
165{
166 dout(10) << __func__ << dendl;
167
168 refresh(std::max<version_t>(cached_version, 1), onfinish);
169 synced = false;
170 if (server_ready)
171 sync_reqid = last_reqid;
172 else
173 sync_reqid = (last_reqid == ~0ULL) ? 1 : last_reqid + 1;
174}
175
176void SnapClient::get_snaps(set<snapid_t>& result) const
177{
178 ceph_assert(cached_version > 0);
179 for (auto& p : cached_snaps)
180 result.insert(p.first);
181
182 for (auto tid : committing_tids) {
183 auto q = cached_pending_update.find(tid);
184 if (q != cached_pending_update.end())
185 result.insert(q->second.snapid);
186
187 auto r = cached_pending_destroy.find(tid);
188 if (r != cached_pending_destroy.end())
189 result.erase(r->second.first);
190 }
191}
192
193set<snapid_t> SnapClient::filter(const set<snapid_t>& snaps) const
194{
195 ceph_assert(cached_version > 0);
196 if (snaps.empty())
197 return snaps;
198
199 set<snapid_t> result;
200
201 for (auto p : snaps) {
202 if (cached_snaps.count(p))
203 result.insert(p);
204 }
205
206 for (auto tid : committing_tids) {
207 auto q = cached_pending_update.find(tid);
208 if (q != cached_pending_update.end()) {
209 if (snaps.count(q->second.snapid))
210 result.insert(q->second.snapid);
211 }
212
213 auto r = cached_pending_destroy.find(tid);
214 if (r != cached_pending_destroy.end())
215 result.erase(r->second.first);
216 }
217
218 dout(10) << __func__ << " " << snaps << " -> " << result << dendl;
219 return result;
220}
221
222const SnapInfo* SnapClient::get_snap_info(snapid_t snapid) const
223{
224 ceph_assert(cached_version > 0);
225
226 const SnapInfo* result = NULL;
227 auto it = cached_snaps.find(snapid);
228 if (it != cached_snaps.end())
229 result = &it->second;
230
231 for (auto tid : committing_tids) {
232 auto q = cached_pending_update.find(tid);
233 if (q != cached_pending_update.end() && q->second.snapid == snapid) {
234 result = &q->second;
235 break;
236 }
237
238 auto r = cached_pending_destroy.find(tid);
239 if (r != cached_pending_destroy.end() && r->second.first == snapid) {
240 result = NULL;
241 break;
242 }
243 }
244
245 dout(10) << __func__ << " snapid " << snapid << " -> " << result << dendl;
246 return result;
247}
248
249void SnapClient::get_snap_infos(map<snapid_t, const SnapInfo*>& infomap,
250 const set<snapid_t>& snaps) const
251{
252 ceph_assert(cached_version > 0);
253
254 if (snaps.empty())
255 return;
256
257 map<snapid_t, const SnapInfo*> result;
258 for (auto p : snaps) {
259 auto it = cached_snaps.find(p);
260 if (it != cached_snaps.end())
261 result[p] = &it->second;
262 }
263
264 for (auto tid : committing_tids) {
265 auto q = cached_pending_update.find(tid);
266 if (q != cached_pending_update.end()) {
267 if (snaps.count(q->second.snapid))
268 result[q->second.snapid] = &q->second;
269 }
270
271 auto r = cached_pending_destroy.find(tid);
272 if (r != cached_pending_destroy.end())
273 result.erase(r->second.first);
274 }
275
276 infomap.insert(result.begin(), result.end());
277}
278
279int SnapClient::dump_cache(Formatter *f) const
280{
281 if (!is_synced()) {
282 dout(5) << "dump_cache: not synced" << dendl;
283 return -EINVAL;
284 }
285
286 map<snapid_t, const SnapInfo*> snaps;
287 for (auto& p : cached_snaps)
288 snaps[p.first] = &p.second;
289
290 for (auto tid : committing_tids) {
291 auto q = cached_pending_update.find(tid);
292 if (q != cached_pending_update.end())
293 snaps[q->second.snapid] = &q->second;
294
295 auto r = cached_pending_destroy.find(tid);
296 if (r != cached_pending_destroy.end())
297 snaps.erase(r->second.first);
298 }
299
300 f->open_object_section("snapclient");
301
302 f->dump_int("last_created", get_last_created());
303 f->dump_int("last_destroyed", get_last_destroyed());
304
305 f->open_array_section("snaps");
306 for (auto p : snaps) {
307 f->open_object_section("snap");
308 p.second->dump(f);
309 f->close_section();
310 }
311 f->close_section();
312
313 f->close_section();
314
315 return 0;
316}