// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
#include <mutex>

#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_trim_bilog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_bucket.h"

#include "services/svc_zone.h"
#include "services/svc_meta.h"

#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"
#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")
using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;
namespace {

// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};
/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}
void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);
void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);
void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  encode(response, output);
}
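
// A minimal sketch (not part of the original logic) of the payload layout this
// handler expects: the notify sender encodes the TrimNotifyType tag first, then
// the request body, e.g.
//
//   bufferlist bl;
//   encode(NotifyTrimCounters, bl);
//   encode(TrimCounters::Request{32}, bl);
//   // send bl as the notify payload on the trim status object
//
// BucketTrimWatcher::handle_notify() below decodes the tag, dispatches to this
// handler, and returns the encoded Response in the notify ack.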
/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);
void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);

  server->reset_bucket_counters();

  Response response;
  encode(response, output);
}

/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  rgw::sal::RGWRadosStore *const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }
  ~BucketTrimWatcher() {
    stop();
  }

  int start() {
    int r = store->getRados()->get_raw_obj_ref(obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.pool.ioctx().create(ref.obj.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
      }
    }
    if (r < 0) {
      lderr(store->ctx()) << "Failed to watch " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
      return r;
    }

    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
    return 0;
  }
  int restart() {
    int r = ref.pool.ioctx().unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
    }
    return r;
  }
  void stop() {
    if (handle) {
      ref.pool.ioctx().unwatch2(handle);
      ref.pool.ioctx().close();
    }
  }
  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.cbegin();
      TrimNotifyType type;
      decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply);
  }
  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
      restart();
    }
  }
};
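
// Dispatch flow, for orientation: a peer gateway sends a notify on the trim
// status object; handle_notify() decodes the leading TrimNotifyType, looks up
// the matching TrimNotifyHandler in 'handlers', and acks with the handler's
// reply. A hedged usage sketch ('trim_server' is a hypothetical
// TrimCounters::Server implementation, not defined here):
//
//   BucketTrimWatcher watcher(store, status_obj, &trim_server);
//   int r = watcher.start(); // registers the watch, creating the object if needed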
/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
};
/// populate the status with the minimum stable marker of each shard
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // if no sync has started, we can safely trim everything
      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}
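
// Worked example (illustrative values only): with two peers reporting
// per-shard markers peer1 = ["00005", "00042"] and peer2 = ["00007", "00013"],
// the result is ["00005", "00013"] -- the minimum stable position of each
// shard. A shard still in StateInit contributes nothing, leaving that slot at
// whatever the caller seeded it with (the maximum marker, so it trims fully).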
/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  rgw::sal::RGWRadosStore *const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(rgw::sal::RGWRadosStore *store,
                           const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};
bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldout(cct, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  rgw_bucket_get_sync_policy_params get_policy_params;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  rgw_bucket bucket;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo _bucket_info;
  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
  int child_ret = 0;

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc()->zone->get_zone().id) {
    rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
  }

  int operate() override;
};
int BucketTrimInstanceCR::operate()
{
  reenter(this) {
    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    get_policy_params.zone = zone_id;
    get_policy_params.bucket = bucket;
    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
                                                   store,
                                                   get_policy_params,
                                                   source_policy));
    if (retcode < 0) {
      if (retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
      }
      return set_cr_error(retcode);
    }

    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        opt_bucket_info) {
      pbucket_info = &(*opt_bucket_info);
    } else {
      /* this shouldn't really happen */
      return set_cr_error(-ENOENT);
    }
    // query peers for sync status
    set_status("fetching sync status from relevant peers");
    yield {
      const auto& all_dests = source_policy->policy_handler->get_all_dests();

      vector<rgw_zone_id> zids;
      rgw_zone_id last_zid;
      for (auto& diter : all_dests) {
        const auto& zid = diter.first;
        if (zid == last_zid) {
          continue;
        }
        last_zid = zid;
        zids.push_back(zid);
      }

      peer_status.resize(zids.size());

      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();

      auto p = peer_status.begin();
      for (auto& zid : zids) {
        // query data sync status from each sync peer
        rgw_http_param_pair params[] = {
          { "type", "bucket-index" },
          { "status", nullptr },
          { "options", "merge" },
          { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
                                                    param is not provided */
          { "source-zone", zone_id.c_str() },
          { nullptr, nullptr }
        };

        auto ziter = zone_conn_map.find(zid);
        if (ziter == zone_conn_map.end()) {
          ldout(cct, 0) << "WARNING: no connection to zone " << zid
              << ", can't trim bucket: " << bucket << dendl;
          return set_cr_error(-ECANCELED);
        }
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }
    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, pbucket_info->num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }
    // trim shards with a ShardCollectCR
    ldout(cct, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(store, *pbucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}
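
// For reference, the peer status fetch above amounts to a REST request like
// (query string assembled from 'params'; bracketed values are placeholders):
//
//   GET /admin/log/?type=bucket-index&status&options=merge
//       &bucket=<bucket_instance>&source-zone=<zone_id>
//
// whose response decodes into StatusShards, i.e. a vector of
// rgw_bucket_shard_sync_info with one entry per bucket index shard.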
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
 public:
  BucketTrimInstanceCollectCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())
  {}
  bool spawn_next() override;
};
bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
  ++bucket;
  return true;
}
/// correlate the replies from each peer gateway into the given counter
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.cbegin();
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    decode(replies, p);
    decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.cbegin();
      TrimCounters::Response response;
      decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}
/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;

 protected:
  int _send_request() override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};
int AsyncMetadataList::_send_request()
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
  if (r == -EINVAL) {
    // restart with empty marker below
  } else if (r < 0) {
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(handle, 1, keys, &truncated);
      if (r < 0) {
        ldout(cct, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1);
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0;
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(section, "", &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}
/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};
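
// Hedged usage sketch for MetadataListCR (the callback signature comes from
// MetadataListCallback above; the enclosing coroutine context is assumed):
//
//   call(new MetadataListCR(cct, async_rados, mgr, "bucket.instance", marker,
//       [] (std::string&& key, std::string&& marker) {
//         // inspect key; return false to stop the listing early
//         return true;
//       }));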
class BucketTrimCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)
  {}

  int operate() override;
};

const std::string BucketTrimCR::section{"bucket.instance"};
int BucketTrimCR::operate()
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldout(cct, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
                          [this] (const std::string& bucket, int count) {
                            buckets.push_back(bucket);
                          });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldout(cct, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldout(cct, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(),
                                store->ctl()->meta.mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldout(cct, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}
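
// In summary, one BucketTrimCR pass: (1) notify peers for their hot-bucket
// counters, (2) top up the selection with cold buckets listed from the
// bucket.instance metadata section, (3) trim the selected bucket instances
// with bounded concurrency, (4) persist the cold-listing marker, and
// (5) notify peers to reset their change counters. BucketTrimPollCR below
// repeats this on an interval, holding a lock so only one gateway trims.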
class BucketTrimPollCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;

 public:
  BucketTrimPollCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};
int BucketTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}
/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // ceph_assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};
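
// Hedged usage sketch for RecentEventList (types come from this file; the
// values are hypothetical):
//
//   RecentEventList<std::string> recent(128, std::chrono::hours(2));
//   recent.insert("bucket:instance-id", ceph::coarse_mono_clock::now());
//   bool seen = recent.lookup("bucket:instance-id"); // linear scan -> true
//   recent.expire_old(ceph::coarse_mono_clock::now()); // drop stale entries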
} // anonymous namespace

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  const auto& conf = cct->_conf;

  config.trim_interval_sec =
      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}
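
// The tunables read above map to ceph.conf options, e.g. (values shown are
// illustrative, not defaults taken from this file):
//
//   rgw_sync_log_trim_interval = 1200
//   rgw_sync_log_trim_max_buckets = 16
//   rgw_sync_log_trim_min_cold_buckets = 4
//   rgw_sync_log_trim_concurrent_buckets = 4
//
// counter_size, notify_timeout_ms, recent_size and recent_duration are fixed
// here rather than exposed as options.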
class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  rgw::sal::RGWRadosStore *const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(rgw::sal::RGWRadosStore *store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->svc()->zone->get_zone_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) override {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const boost::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};
BucketTrimManager::BucketTrimManager(rgw::sal::RGWRadosStore *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}

BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start();
}

void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(bucket.to_string());
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);
}

} // namespace rgw
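
// Hedged wiring sketch (caller-side, outside this translation unit; the
// coroutine runner 'crs' and http manager are assumed to exist): build a
// config with configure_bucket_trim(), construct the manager, start the
// watcher, then run the polling coroutine:
//
//   rgw::BucketTrimConfig config;
//   rgw::configure_bucket_trim(cct, config);
//   rgw::BucketTrimManager trim(store, config);
//   int r = trim.init(); // registers the watch on the trim status object
//   // crs.run(trim.create_bucket_trim_cr(&http_manager));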