// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <mutex>
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_trim_bilog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_sal.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_bucket.h"

#include "services/svc_zone.h"
#include "services/svc_meta.h"

#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")

using namespace std;

using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;


// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
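
// Every notify payload begins with a raw-encoded TrimNotifyType tag followed
// by the request body for that type; receivers decode the tag first and then
// dispatch to the matching TrimNotifyHandler. Sender side, as done in
// BucketTrimCR::operate() below:
//
//   bufferlist bl;
//   encode(NotifyTrimCounters, bl);         // fixed-width type tag
//   encode(TrimCounters::Request{32}, bl);  // versioned request body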

struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};

/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}

void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);

void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);

void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  encode(response, output);
}

/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};

void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);

void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);

  server->reset_bucket_counters();

  Response response;
  encode(response, output);
}


/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  rgw::sal::RadosStore* const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }

  ~BucketTrimWatcher() {
    stop();
  }

  int start(const DoutPrefixProvider *dpp) {
    int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.pool.ioctx().create(ref.obj.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
      }
    }
    if (r < 0) {
      ldpp_dout(dpp, -1) << "Failed to watch " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
      return r;
    }

    ldpp_dout(dpp, 10) << "Watching " << ref.obj.oid << dendl;
    return 0;
  }
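
  // Note on the -ENOENT path in start(): the control object may not exist yet
  // on a fresh deployment, so we create it and retry the watch. Tolerating
  // -EEXIST on the exclusive create covers the race where another gateway
  // creates the same object between our watch2() and create() calls.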

  int restart() {
    int r = ref.pool.ioctx().unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
    }
    return r;
  }

  void stop() {
    if (handle) {
      ref.pool.ioctx().unwatch2(handle);
      ref.pool.ioctx().close();
    }
  }

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.cbegin();
      TrimNotifyType type;
      decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply);
  }

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
      restart();
    }
  }
};


/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
};

/// populate the status with the minimum stable marker of each shard
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // if no sync has started, we can safely trim everything
      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}
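
// For example, given two peers with per-shard markers ["00005", "00009"] and
// ["00003", "00012"], the result is ["00003", "00009"]: each shard may only be
// trimmed up to the position of its slowest peer. A shard still in StateInit
// imposes no constraint, so the caller's initial marker is left in place.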

/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
                           rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      dpp(dpp), store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};

bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}

/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  rgw_bucket_get_sync_policy_params get_policy_params;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  rgw_bucket bucket;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo _bucket_info;
  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
  int child_ret = 0;
  const DoutPrefixProvider *dpp;

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance,
                       const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc()->zone->get_zone().id),
      dpp(dpp) {
    rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    ldpp_dout(dpp, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    get_policy_params.zone = zone_id;
    get_policy_params.bucket = bucket;
    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
                                                   store,
                                                   get_policy_params,
                                                   source_policy,
                                                   dpp));
    if (retcode < 0) {
      if (retcode != -ENOENT) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
      }

      return set_cr_error(retcode);
    }

    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        opt_bucket_info) {
      pbucket_info = &(*opt_bucket_info);
    } else {
      /* this shouldn't really happen */
      return set_cr_error(-ENOENT);
    }

    // query peers for sync status
    set_status("fetching sync status from relevant peers");
    yield {
      const auto& all_dests = source_policy->policy_handler->get_all_dests();

      vector<rgw_zone_id> zids;
      rgw_zone_id last_zid;
      for (auto& diter : all_dests) {
        const auto& zid = diter.first;
        if (zid == last_zid) {
          continue;
        }
        last_zid = zid;
        zids.push_back(zid);
      }

      peer_status.resize(zids.size());

      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();

      auto p = peer_status.begin();
      for (auto& zid : zids) {
        // query data sync status from each sync peer
        rgw_http_param_pair params[] = {
          { "type", "bucket-index" },
          { "status", nullptr },
          { "options", "merge" },
          { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
                                                    param is not provided */
          { "source-zone", zone_id.c_str() },
          { nullptr, nullptr }
        };

        auto ziter = zone_conn_map.find(zid);
        if (ziter == zone_conn_map.end()) {
          ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
          return set_cr_error(-ECANCELED);
        }
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}
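
// To summarize, trimming one bucket instance proceeds in four steps: fetch the
// bucket's sync policy to find its index layout and sync destinations, query
// each destination zone's bucket-index sync status over REST, reduce those
// statuses to a per-shard minimum marker with take_min_status(), and finally
// trim every shard up to its minimum marker via BucketTrimShardCollectCR.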

/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
  const DoutPrefixProvider *dpp;
 public:
  BucketTrimInstanceCollectCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent,
                              const DoutPrefixProvider *dpp)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end()),
      dpp(dpp)
  {}
  bool spawn_next() override;
};

bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket, dpp), false);
  ++bucket;
  return true;
}

/// correlate the replies from each peer gateway into the given counter
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.cbegin();
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    decode(replies, p);
    decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.cbegin();
      TrimCounters::Response response;
      decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}
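
// The reply bufferlist from a rados notify aggregates all watcher responses: a
// map keyed by (notifier id, watch cookie) pairs holding each peer's reply
// payload, followed by a set of the same pairs for watchers that timed out.
// Each payload decodes as a TrimCounters::Response, and inserting every peer's
// per-bucket counts into the BucketChangeCounter sums them per bucket key.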

/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;

/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;

  int _send_request(const DoutPrefixProvider *dpp) override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};

int AsyncMetadataList::_send_request(const DoutPrefixProvider *dpp)
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(dpp, section, start_marker, &handle);
  if (r == -EINVAL) {
    // restart with empty marker below
  } else if (r < 0) {
    ldpp_dout(dpp, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldpp_dout(dpp, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(dpp, handle, 1, keys, &truncated);
      if (r < 0) {
        ldpp_dout(dpp, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1);
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0;
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(dpp, section, "", &handle);
  if (r < 0) {
    ldpp_dout(dpp, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldpp_dout(dpp, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
  do {
    // get the next key and marker
    r = mgr->list_keys_next(dpp, handle, 1, keys, &truncated);
    if (r < 0) {
      ldpp_dout(dpp, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}
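
// The two passes give the listing wrap-around semantics: starting from marker
// "m", the first pass visits the keys after "m" through the end of the
// section, and the second pass restarts at the beginning and stops once its
// marker passes "m". Successive trim intervals can therefore resume where the
// previous one stopped while still eventually visiting every bucket instance.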

/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};

class BucketTrimCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker
  const DoutPrefixProvider *dpp;

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj, const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size), dpp(dpp)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

const std::string BucketTrimCR::section{"bucket.instance"};

int BucketTrimCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldpp_dout(dpp, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldpp_dout(dpp, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldpp_dout(dpp, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldpp_dout(dpp, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(),
                                store->ctl()->meta.mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldpp_dout(dpp, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets, dpp));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldpp_dout(dpp, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldpp_dout(dpp, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}
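
// Each interval selects up to buckets_per_interval instances: hot buckets
// first, as voted by the peer gateways' change counters (reserving at least
// min_cold_buckets_per_interval slots), then cold buckets in round-robin order
// from the bucket.instance metadata section, resuming from the marker
// persisted in BucketTrimStatus so rarely-modified buckets still get trimmed.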

class BucketTrimPollCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;
  const DoutPrefixProvider *dpp;

 public:
  BucketTrimPollCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj,
                   const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      dpp(dpp) {}

  int operate(const DoutPrefixProvider *dpp) override;
};

int BucketTrimPollCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj, dpp));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}
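
// On a successful trim the lock is deliberately left held: it was taken with a
// duration of trim_interval_sec, so it expires just as the next interval
// begins. This keeps trim to one gateway per interval; only a failed attempt
// releases the lock early so that another gateway can retry sooner.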

/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // ceph_assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};
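
// A minimal usage sketch (the values here are illustrative only):
//
//   RecentEventList<std::string> trimmed(128, std::chrono::hours(2));
//   trimmed.insert("bucket:abc", ceph::coarse_mono_clock::now());
//   bool recent = trimmed.lookup("bucket:abc");            // true
//   trimmed.expire_old(ceph::coarse_mono_clock::now());    // drop stale entries
//
// Because the backing store is a boost::circular_buffer, the list also bounds
// memory: once max_size events are held, a new insert overwrites the oldest
// entry even if it has not yet expired by time.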

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  const auto& conf = cct->_conf;

  config.trim_interval_sec =
    conf.get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
    conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
    conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
    conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}

class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  rgw::sal::RadosStore* const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(rgw::sal::RadosStore* store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->get_zone()->get_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) override {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const std::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};

BucketTrimManager::BucketTrimManager(rgw::sal::RadosStore* store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start(this);
}

void BucketTrimManager::on_bucket_changed(const std::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(std::string(bucket));
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj, this);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj, this);
}

CephContext* BucketTrimManager::get_cct() const
{
  return impl->store->ctx();
}

unsigned BucketTrimManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& BucketTrimManager::gen_prefix(std::ostream& out) const
{
  return out << "rgw bucket trim manager: ";
}

} // namespace rgw