// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <mutex>
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_trim_bilog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_sal.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_bucket.h"

#include "services/svc_zone.h"
#include "services/svc_meta.h"

#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")

using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;


// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);

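// a notify payload starts with a raw-encoded TrimNotifyType, followed by the
// Request struct for that type; each watcher replies with the matching
// Response in its notify_ack payload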
struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};

/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}

void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);

void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);

void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  encode(response, output);
}

/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};

void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);

void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);

  server->reset_bucket_counters();

  Response response;
  encode(response, output);
}


/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  rgw::sal::RGWRadosStore *const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }

  ~BucketTrimWatcher() {
    stop();
  }

  int start() {
    int r = store->getRados()->get_raw_obj_ref(obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
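    // the control object may not exist yet; create it and retry the watch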
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.pool.ioctx().create(ref.obj.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
      }
    }
    if (r < 0) {
      lderr(store->ctx()) << "Failed to watch " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
      return r;
    }

    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
    return 0;
  }

  int restart() {
    int r = ref.pool.ioctx().unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
    }
    return r;
  }

  void stop() {
    if (handle) {
      ref.pool.ioctx().unwatch2(handle);
      ref.pool.ioctx().close();
    }
  }

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.cbegin();
      TrimNotifyType type;
      decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply);
  }

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
      restart();
    }
  }
};


/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
};

/// populate the status with the minimum stable marker of each shard
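/// (e.g. if one peer reports shard 0 at marker "00003" and another at "00005",
/// only entries up to "00003" are safe to trim for that shard)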
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // if no sync has started, we can safely trim everything
      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}

/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  rgw::sal::RGWRadosStore *const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};

bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldout(cct, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}

/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  rgw_bucket_get_sync_policy_params get_policy_params;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  rgw_bucket bucket;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo _bucket_info;
  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
  int child_ret = 0;

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc()->zone->get_zone().id) {
    rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
  }

  int operate() override;
};

int BucketTrimInstanceCR::operate()
{
  reenter(this) {
    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    get_policy_params.zone = zone_id;
    get_policy_params.bucket = bucket;
    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
                                                   store,
                                                   get_policy_params,
                                                   source_policy));
    if (retcode < 0) {
      if (retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
      }

      return set_cr_error(retcode);
    }

    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        opt_bucket_info) {
      pbucket_info = &(*opt_bucket_info);
    } else {
      /* this shouldn't really happen */
      return set_cr_error(-ENOENT);
    }

    // query peers for sync status
    set_status("fetching sync status from relevant peers");
    yield {
      const auto& all_dests = source_policy->policy_handler->get_all_dests();

      vector<rgw_zone_id> zids;
      rgw_zone_id last_zid;
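      // build the list of destination zone ids, skipping consecutive duplicates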
      for (auto& diter : all_dests) {
        const auto& zid = diter.first;
        if (zid == last_zid) {
          continue;
        }
        last_zid = zid;
        zids.push_back(zid);
      }

      peer_status.resize(zids.size());

      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();

      auto p = peer_status.begin();
      for (auto& zid : zids) {
        // query data sync status from each sync peer
        rgw_http_param_pair params[] = {
          { "type", "bucket-index" },
          { "status", nullptr },
          { "options", "merge" },
          { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
                                                    param is not provided */
          { "source-zone", zone_id.c_str() },
          { nullptr, nullptr }
        };

        auto ziter = zone_conn_map.find(zid);
        if (ziter == zone_conn_map.end()) {
          ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
          return set_cr_error(-ECANCELED);
        }
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldout(cct, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(store, *pbucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}

/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
 public:
  BucketTrimInstanceCollectCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())
  {}
  bool spawn_next() override;
};

bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
  ++bucket;
  return true;
}

/// correlate the replies from each peer gateway into the given counter
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.cbegin();
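    // the notify reply encodes a map of ack payloads from responding watchers,
    // plus the set of watchers that timed out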
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    decode(replies, p);
    decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.cbegin();
      TrimCounters::Response response;
      decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}

/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;

/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;

  int _send_request() override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};

int AsyncMetadataList::_send_request()
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
  if (r == -EINVAL) {
    // restart with empty marker below
  } else if (r < 0) {
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(handle, 1, keys, &truncated);
      if (r < 0) {
        ldout(cct, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1);
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0;
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(section, "", &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}

/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};

class BucketTrimCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)
  {}

  int operate() override;
};

const std::string BucketTrimCR::section{"bucket.instance"};

int BucketTrimCR::operate()
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldout(cct, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

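      // leave room for at least min_cold_buckets_per_interval cold buckets,
      // so buckets that change rarely still get trimmed eventually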
      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldout(cct, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldout(cct, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(),
                                store->ctl()->meta.mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldout(cct, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}

class BucketTrimPollCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;

 public:
  BucketTrimPollCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};

int BucketTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}

/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // ceph_assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  const auto& conf = cct->_conf;

  config.trim_interval_sec =
      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}

class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  rgw::sal::RGWRadosStore *const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(rgw::sal::RGWRadosStore *store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->svc()->zone->get_zone_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const std::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};

BucketTrimManager::BucketTrimManager(rgw::sal::RGWRadosStore *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start();
}

void BucketTrimManager::on_bucket_changed(const std::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(std::string(bucket));
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);
}

} // namespace rgw