1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "msg/Messenger.h"
18 #include "messages/MMDSTableRequest.h"
19 #include "SnapClient.h"
21 #include "common/config.h"
22 #include "include/ceph_assert.h"
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_mds
27 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient "
31 void SnapClient::resend_queries()
33 if (!waiting_for_version
.empty() || (!synced
&& sync_reqid
> 0)) {
35 if (!waiting_for_version
.empty())
36 want
= std::max
<version_t
>(cached_version
, waiting_for_version
.rbegin()->first
);
38 want
= std::max
<version_t
>(cached_version
, 1);
41 sync_reqid
= last_reqid
;
45 void SnapClient::handle_query_result(const cref_t
<MMDSTableRequest
> &m
)
47 dout(10) << __func__
<< " " << *m
<< dendl
;
51 auto p
= m
->bl
.cbegin();
56 ceph_assert(cached_version
== m
->get_tid());
60 decode(cached_snaps
, p
);
61 decode(cached_pending_update
, p
);
62 decode(cached_pending_destroy
, p
);
64 snapid_t last_created
, last_destroyed
;
65 decode(last_created
, p
);
66 decode(last_destroyed
, p
);
68 if (last_created
> cached_last_created
)
69 cached_last_created
= last_created
;
70 if (last_destroyed
> cached_last_destroyed
)
71 cached_last_destroyed
= last_destroyed
;
73 cached_version
= m
->get_tid();
80 if (!committing_tids
.empty()) {
81 for (auto p
= committing_tids
.begin();
82 p
!= committing_tids
.end() && *p
<= cached_version
; ) {
83 if (cached_pending_update
.count(*p
)) {
84 if (cached_pending_update
[*p
].snapid
> cached_last_created
)
85 cached_last_created
= cached_pending_update
[*p
].snapid
;
87 } else if (cached_pending_destroy
.count(*p
)) {
88 if (cached_pending_destroy
[*p
].second
> cached_last_destroyed
)
89 cached_last_destroyed
= cached_pending_destroy
[*p
].second
;
92 // pending update/destroy have been committed.
93 committing_tids
.erase(p
++);
98 if (m
->op
== TABLESERVER_OP_QUERY_REPLY
&& m
->reqid
>= sync_reqid
)
101 if (synced
&& !waiting_for_version
.empty()) {
102 MDSContext::vec finished
;
103 while (!waiting_for_version
.empty()) {
104 auto it
= waiting_for_version
.begin();
105 if (it
->first
> cached_version
)
107 auto& v
= it
->second
;
108 finished
.insert(finished
.end(), v
.begin(), v
.end());
109 waiting_for_version
.erase(it
);
111 if (!finished
.empty())
112 mds
->queue_waiters(finished
);
116 void SnapClient::handle_notify_prep(const cref_t
<MMDSTableRequest
> &m
)
118 dout(10) << __func__
<< " " << *m
<< dendl
;
119 handle_query_result(m
);
120 auto ack
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_NOTIFY_ACK
, 0, m
->get_tid());
121 mds
->send_message(ack
, m
->get_connection());
124 void SnapClient::notify_commit(version_t tid
)
126 dout(10) << __func__
<< " tid " << tid
<< dendl
;
128 ceph_assert(cached_version
== 0 || cached_version
>= tid
);
129 if (cached_version
== 0) {
130 committing_tids
.insert(tid
);
131 } else if (cached_pending_update
.count(tid
)) {
132 committing_tids
.insert(tid
);
133 if (cached_pending_update
[tid
].snapid
> cached_last_created
)
134 cached_last_created
= cached_pending_update
[tid
].snapid
;
135 } else if (cached_pending_destroy
.count(tid
)) {
136 committing_tids
.insert(tid
);
137 if (cached_pending_destroy
[tid
].second
> cached_last_destroyed
)
138 cached_last_destroyed
= cached_pending_destroy
[tid
].second
;
139 } else if (cached_version
> tid
) {
140 // no need to record the tid if it has already been committed.
146 void SnapClient::refresh(version_t want
, MDSContext
*onfinish
)
148 dout(10) << __func__
<< " want " << want
<< dendl
;
150 ceph_assert(want
>= cached_version
);
152 waiting_for_version
[want
].push_back(onfinish
);
157 mds_rank_t ts
= mds
->mdsmap
->get_tableserver();
158 auto req
= make_message
<MMDSTableRequest
>(table
, TABLESERVER_OP_QUERY
, ++last_reqid
, 0);
162 encode(cached_version
, req
->bl
);
163 mds
->send_message_mds(req
, ts
);
166 void SnapClient::sync(MDSContext
*onfinish
)
168 dout(10) << __func__
<< dendl
;
170 refresh(std::max
<version_t
>(cached_version
, 1), onfinish
);
173 sync_reqid
= last_reqid
;
175 sync_reqid
= (last_reqid
== ~0ULL) ? 1 : last_reqid
+ 1;
178 void SnapClient::get_snaps(set
<snapid_t
>& result
) const
180 ceph_assert(cached_version
> 0);
181 for (auto& p
: cached_snaps
)
182 result
.insert(p
.first
);
184 for (auto tid
: committing_tids
) {
185 auto q
= cached_pending_update
.find(tid
);
186 if (q
!= cached_pending_update
.end())
187 result
.insert(q
->second
.snapid
);
189 auto r
= cached_pending_destroy
.find(tid
);
190 if (r
!= cached_pending_destroy
.end())
191 result
.erase(r
->second
.first
);
195 set
<snapid_t
> SnapClient::filter(const set
<snapid_t
>& snaps
) const
197 ceph_assert(cached_version
> 0);
201 set
<snapid_t
> result
;
203 for (auto p
: snaps
) {
204 if (cached_snaps
.count(p
))
208 for (auto tid
: committing_tids
) {
209 auto q
= cached_pending_update
.find(tid
);
210 if (q
!= cached_pending_update
.end()) {
211 if (snaps
.count(q
->second
.snapid
))
212 result
.insert(q
->second
.snapid
);
215 auto r
= cached_pending_destroy
.find(tid
);
216 if (r
!= cached_pending_destroy
.end())
217 result
.erase(r
->second
.first
);
220 dout(10) << __func__
<< " " << snaps
<< " -> " << result
<< dendl
;
224 const SnapInfo
* SnapClient::get_snap_info(snapid_t snapid
) const
226 ceph_assert(cached_version
> 0);
228 const SnapInfo
* result
= NULL
;
229 auto it
= cached_snaps
.find(snapid
);
230 if (it
!= cached_snaps
.end())
231 result
= &it
->second
;
233 for (auto tid
: committing_tids
) {
234 auto q
= cached_pending_update
.find(tid
);
235 if (q
!= cached_pending_update
.end() && q
->second
.snapid
== snapid
) {
240 auto r
= cached_pending_destroy
.find(tid
);
241 if (r
!= cached_pending_destroy
.end() && r
->second
.first
== snapid
) {
247 dout(10) << __func__
<< " snapid " << snapid
<< " -> " << result
<< dendl
;
251 void SnapClient::get_snap_infos(map
<snapid_t
, const SnapInfo
*>& infomap
,
252 const set
<snapid_t
>& snaps
) const
254 ceph_assert(cached_version
> 0);
259 map
<snapid_t
, const SnapInfo
*> result
;
260 for (auto p
: snaps
) {
261 auto it
= cached_snaps
.find(p
);
262 if (it
!= cached_snaps
.end())
263 result
[p
] = &it
->second
;
266 for (auto tid
: committing_tids
) {
267 auto q
= cached_pending_update
.find(tid
);
268 if (q
!= cached_pending_update
.end()) {
269 if (snaps
.count(q
->second
.snapid
))
270 result
[q
->second
.snapid
] = &q
->second
;
273 auto r
= cached_pending_destroy
.find(tid
);
274 if (r
!= cached_pending_destroy
.end())
275 result
.erase(r
->second
.first
);
278 infomap
.insert(result
.begin(), result
.end());
281 int SnapClient::dump_cache(Formatter
*f
) const
284 dout(5) << "dump_cache: not synced" << dendl
;
285 return -CEPHFS_EINVAL
;
288 map
<snapid_t
, const SnapInfo
*> snaps
;
289 for (auto& p
: cached_snaps
)
290 snaps
[p
.first
] = &p
.second
;
292 for (auto tid
: committing_tids
) {
293 auto q
= cached_pending_update
.find(tid
);
294 if (q
!= cached_pending_update
.end())
295 snaps
[q
->second
.snapid
] = &q
->second
;
297 auto r
= cached_pending_destroy
.find(tid
);
298 if (r
!= cached_pending_destroy
.end())
299 snaps
.erase(r
->second
.first
);
302 f
->open_object_section("snapclient");
304 f
->dump_int("last_created", get_last_created());
305 f
->dump_int("last_destroyed", get_last_destroyed());
307 f
->open_array_section("snaps");
308 for (auto p
: snaps
) {
309 f
->open_object_section("snap");