1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * Author: Casey Bodley <cbodley@redhat.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include <boost/circular_buffer.hpp>
18 #include <boost/container/flat_map.hpp>
20 #include "include/scope_guard.h"
21 #include "common/bounded_key_counter.h"
22 #include "common/errno.h"
23 #include "rgw_sync_log_trim.h"
24 #include "rgw_cr_rados.h"
25 #include "rgw_cr_rest.h"
26 #include "rgw_data_sync.h"
27 #include "rgw_metadata.h"
28 #include "rgw_rados.h"
31 #include <boost/asio/yield.hpp>
32 #include "include/assert.h"
34 #define dout_subsys ceph_subsys_rgw
37 #define dout_prefix (*_dout << "trim: ")
// bring config/status types from the rgw namespace into file scope
using rgw::BucketTrimConfig;
// counts bucket-change events per bucket-instance key, bounded in size
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

// rados object name used to persist the trim status/marker
const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;
// watch/notify api for gateways to coordinate about which buckets to trim

// NOTE(review): the enclosing 'enum TrimNotifyType' declaration is not visible
// in this extract; NotifyTrimCounters is one of its enumerators (a
// NotifyTrimComplete value is also referenced further below) — confirm against
// the full file.
  NotifyTrimCounters = 0,
// raw encoder so the notify type can travel in a bufferlist payload
WRITE_RAW_ENCODER(TrimNotifyType);

/// dispatch interface for one notify type: decode the request from 'input'
/// and encode the reply into 'output'
struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters

// NOTE(review): the surrounding struct declarations (TrimCounters and its
// nested Request/Response/Server, plus the 'int count' member of
// BucketCounter) are not visible in this extract — the members below belong
// to them; confirm against the full file.

/// counter for a single bucket
struct BucketCounter {
  std::string bucket; //< bucket instance metadata key

  BucketCounter() = default;
  BucketCounter(const std::string& bucket, int count)
    : bucket(bucket), count(count) {}

  void encode(bufferlist& bl) const;
  void decode(bufferlist::iterator& p);

using Vector = std::vector<BucketCounter>;

/// request bucket trim counters from peer gateways
  uint16_t max_buckets; //< maximum number of bucket counters to return

  void encode(bufferlist& bl) const;
  void decode(bufferlist::iterator& p);

/// return the current bucket trim counters
  Vector bucket_counters;

  void encode(bufferlist& bl) const;
  void decode(bufferlist::iterator& p);

/// server interface to query the hottest buckets
  virtual ~Server() = default;

  // fill 'counters' with up to 'count' of the highest bucket counters
  virtual void get_bucket_counters(int count, Vector& counters) = 0;
  // clear all counters after a successful trim pass
  virtual void reset_bucket_counters() = 0;
/// notify handler that answers NotifyTrimCounters requests by querying the
/// Server for its hottest bucket counters
class Handler : public TrimNotifyHandler {
  Server *const server;  // not owned; must outlive the handler

  Handler(Server *server) : server(server) {}

  // decode a TrimCounters::Request from 'input', encode a Response to 'output'
  void handle(bufferlist::iterator& input, bufferlist& output) override;
110 std::ostream
& operator<<(std::ostream
& out
, const TrimCounters::BucketCounter
& rhs
)
112 return out
<< rhs
.bucket
<< ":" << rhs
.count
;
// NOTE(review): several statements and the function braces were dropped from
// this extract (e.g. BucketCounter::decode's body, the ::encode(count, bl)
// line, ENCODE_FINISH/DECODE_START pairs) — confirm against the full file.

// wire format for a single bucket counter
void TrimCounters::BucketCounter::encode(bufferlist& bl) const
  // no versioning to save space
  ::encode(bucket, bl);

void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)

WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

// versioned wire format for the counters request (v1)
void TrimCounters::Request::encode(bufferlist& bl) const
  ENCODE_START(1, 1, bl);
  ::encode(max_buckets, bl);

void TrimCounters::Request::decode(bufferlist::iterator& p)
  ::decode(max_buckets, p);

WRITE_CLASS_ENCODER(TrimCounters::Request);

// versioned wire format for the counters response (v1)
void TrimCounters::Response::encode(bufferlist& bl) const
  ENCODE_START(1, 1, bl);
  ::encode(bucket_counters, bl);

void TrimCounters::Response::decode(bufferlist::iterator& p)
  ::decode(bucket_counters, p);

WRITE_CLASS_ENCODER(TrimCounters::Response);
// answer a NotifyTrimCounters request: decode it, clamp the requested count
// to 128, and reply with this gateway's hottest bucket counters
void TrimCounters::Handler::handle(bufferlist::iterator& input,
  ::decode(request, input);
  // cap the reply size regardless of what the peer asked for
  auto count = std::min<uint16_t>(request.max_buckets, 128);
  server->get_bucket_counters(count, response.bucket_counters);
  ::encode(response, output);
/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  // NOTE(review): the nested 'struct Request'/'struct Response' headers are
  // not visible in this extract — both messages appear to carry no payload
  // beyond versioning.
  void encode(bufferlist& bl) const;
  void decode(bufferlist::iterator& p);

  void encode(bufferlist& bl) const;
  void decode(bufferlist::iterator& p);

/// server interface to reset bucket counters
using Server = TrimCounters::Server;

/// notify handler that resets the server's bucket counters on completion
class Handler : public TrimNotifyHandler {
  Server *const server;  // not owned; must outlive the handler

  Handler(Server *server) : server(server) {}

  void handle(bufferlist::iterator& input, bufferlist& output) override;
// wire format for TrimComplete messages: versioned (v1) but payload-free
// NOTE(review): ENCODE_FINISH/DECODE_START lines and braces were dropped from
// this extract — confirm against the full file.
void TrimComplete::Request::encode(bufferlist& bl) const
  ENCODE_START(1, 1, bl);

void TrimComplete::Request::decode(bufferlist::iterator& p)

WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
  ENCODE_START(1, 1, bl);

void TrimComplete::Response::decode(bufferlist::iterator& p)

WRITE_CLASS_ENCODER(TrimComplete::Response);
// handle a NotifyTrimComplete request: reset local bucket counters and ack
// with an (empty) response
void TrimComplete::Handler::handle(bufferlist::iterator& input,
  ::decode(request, input);

  server->reset_bucket_counters();

  ::encode(response, output);
/// rados watcher for bucket trim notifications
// NOTE(review): several member declarations and method headers (e.g. the
// rados ref/handle members and the start()/stop()/restart() signatures) were
// dropped from this extract — the fragments below belong to them; confirm
// against the full file.
class BucketTrimWatcher : public librados::WatchCtx2 {
  RGWRados *const store;      // not owned
  const rgw_raw_obj& obj;     // control object to watch; caller keeps it alive
  // one handler per notify type
  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    // the same server object backs both the counters query and the reset
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));

  ~BucketTrimWatcher() {

    // resolve the control object into an ioctx/oid reference
    int r = store->get_raw_obj_ref(obj, &ref);

    // register a watch on the realm's control object
    r = ref.ioctx.watch2(ref.oid, &handle, this);
    // if the object doesn't exist yet, create it and retry the watch
    constexpr bool exclusive = true;
    r = ref.ioctx.create(ref.oid, exclusive);
    if (r == -EEXIST || r == 0) {
      r = ref.ioctx.watch2(ref.oid, &handle, this);
      lderr(store->ctx()) << "Failed to watch " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;

    ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;

    // tear down the current watch before re-establishing it
    int r = ref.ioctx.unwatch2(handle);
      lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;
    r = ref.ioctx.watch2(ref.oid, &handle, this);
      lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;

    ref.ioctx.unwatch2(handle);

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    // ignore notifications for a stale watch handle
    if (cookie != handle) {

    // dispatch on the decoded notify type
    auto handler = handlers.find(type);
    if (handler != handlers.end()) {
      handler->second->handle(p, reply);
      lderr(store->ctx()) << "no handler for notify type " << type << dendl;
    } catch (const buffer::error& e) {
      // malformed payload: log and still ack below so the notifier isn't stuck
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
337 /// Interface to communicate with the trim manager about completed operations
338 struct BucketTrimObserver
{
339 virtual ~BucketTrimObserver() = default;
341 virtual void on_bucket_trimmed(std::string
&& bucket_instance
) = 0;
342 virtual bool trimmed_recently(const boost::string_view
& bucket_instance
) = 0;
/// populate the status with the minimum stable marker of each shard
// Iter dereferences to a per-peer container of shard sync-status entries.
// NOTE(review): several lines (braces, the error return for shard-count
// mismatch, the loop that stores markers into *status) were dropped from this
// extract — confirm against the full file.
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
  boost::optional<size_t> num_shards;
  for (auto peer = first; peer != last; ++peer) {
    const size_t peer_shards = peer->size();
      // first peer establishes the expected shard count
      num_shards = peer_shards;
      status->resize(*num_shards);
    } else if (*num_shards != peer_shards) {
      // all peers must agree on the number of shards
    auto m = status->begin();
    for (auto& shard : *peer) {
      // only consider incremental sync markers
      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  RGWRados *const store;              // not owned
  const RGWBucketInfo& bucket_info;   // caller keeps alive for CR lifetime
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker

  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)

  // spawn one bilog-trim CR per non-empty marker; see definition below
  bool spawn_next() override;
// advance to the next shard with a non-empty marker and spawn its trim CR;
// returns via the (not visible here) true/false to indicate more work
bool BucketTrimShardCollectCR::spawn_next()
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldout(cct, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      // trim from the beginning (empty start marker) up to 'marker'
      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
                                    std::string{}, marker),
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  RGWRados *const store;            // not owned
  RGWHTTPManager *const http;       // not owned
  BucketTrimObserver *const observer; // notified on successful trim
  std::string bucket_instance;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices

  // per-peer sync status: one StatusShards (vector of shard infos) per peer
  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->get_zone().id),
      // pre-size so each peer's REST reply has a stable destination slot
      peer_status(store->zone_conn_map.size())

  int operate() override;
// coroutine body: fetch every peer's bucket sync status, compute the minimum
// stable marker per shard, then trim each shard up to that marker.
// NOTE(review): the reenter() wrapper, braces, and several error-path lines
// were dropped from this extract — confirm against the full file.
int BucketTrimInstanceCR::operate()
  ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

  // query peers for sync status
  set_status("fetching sync status from peers");
    // query data sync status from each sync peer
    rgw_http_param_pair params[] = {
      { "type", "bucket-index" },
      { "status", nullptr },
      { "bucket", bucket_instance.c_str() },
      { "source-zone", zone_id.c_str() },

    auto p = peer_status.begin();
    for (auto& c : store->zone_conn_map) {
      using StatusCR = RGWReadRESTResourceCR<StatusShards>;
      spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),

    // in parallel, read the local bucket instance info
    spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
                                         bucket_instance, &bucket_info),

  // wait for a response from each peer. all must respond to attempt trim
  while (num_spawned()) {
    yield wait_for_child();
    collect(&child_ret, nullptr);
      return set_cr_error(child_ret);

  // determine the minimum marker for each shard
  retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
    ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
    return set_cr_error(retcode);

  // trim shards with a ShardCollectCR
  ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
      << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
  set_status("trimming bilog shards");
  yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
  // ENODATA just means there were no keys to trim
  if (retcode == -ENODATA) {
    ldout(cct, 4) << "failed to trim bilog shards: "
        << cpp_strerror(retcode) << dendl;
    return set_cr_error(retcode);

  // let the manager remember this instance so it's skipped for a while
  observer->on_bucket_trimmed(std::move(bucket_instance));
  return set_cr_done();
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  RGWRados *const store;              // not owned
  RGWHTTPManager *const http;         // not owned
  BucketTrimObserver *const observer;
  // iterate over the caller's bucket list; caller keeps it alive
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;

  // NOTE(review): the 'int max_concurrent' parameter line is missing from
  // this extract but is referenced in the initializer list below.
  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())

  bool spawn_next() override;
// spawn a trim CR for the next bucket instance in the list
// NOTE(review): the end-of-list check and iterator advance were dropped from
// this extract — confirm against the full file.
bool BucketTrimInstanceCollectCR::spawn_next()
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
/// correlate the replies from each peer gateway into the given counter
// 'bl' is the raw notify reply payload: a map of (notifier,cookie)->reply
// plus a set of peers that timed out
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
  // decode notify responses
  std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
  std::set<std::pair<uint64_t, uint64_t>> timeouts;
  ::decode(replies, p);
  ::decode(timeouts, p);

  for (auto& peer : replies) {
    auto q = peer.second.begin();
    TrimCounters::Response response;
    ::decode(response, q);
    // merge this peer's per-bucket counts into the shared counter
    for (const auto& b : response.bucket_counters) {
      counter.insert(b.bucket, b.count);
  } catch (const buffer::error& e) {
/// metadata callback has the signature bool(string&& key, string&& marker);
/// returning false stops the listing
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// original start marker (wrap-around listing)
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;          // not owned
  RGWMetadataManager *const mgr;   // not owned
  const std::string section;       // metadata section to list (copied)
  const std::string start_marker;  // where to start/stop the wrap-around
  MetadataListCallback callback;

  // runs on the async-rados thread; see definition below
  int _send_request() override;

  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
// list metadata keys one at a time, feeding each (key, marker) pair to the
// callback; on reaching the end, restart from an empty marker and continue
// until reaching the original start_marker again.
// NOTE(review): loop headers, braces, and return statements were dropped from
// this extract — confirm against the full file. Also note the inconsistent
// ceph_assert (first pass) vs assert (second pass) on the same invariant.
int AsyncMetadataList::_send_request()
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};

  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
    // restart with empty marker below
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;

  ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
    marker = mgr->get_marker(handle);

      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // callback returning false terminates the listing early
      if (!callback(std::move(key), std::move(marker))) {

  if (start_marker.empty()) {
    // already listed all keys

  // restart the listing from the beginning (empty marker)

  r = mgr->list_keys_init(section, "", &handle);
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
    marker = mgr->get_marker(handle);

      assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker >= start_marker) {
      if (!callback(std::move(key), std::move(marker))) {
/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados; // not owned
  RGWMetadataManager *const mgr;             // not owned
  // NOTE(review): reference members — the caller must keep 'section' and
  // 'start_marker' alive for the lifetime of this coroutine.
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr}; // owned; released in request_cleanup

  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)

  ~MetadataListCR() override {

  // queue the async listing on the rados processor
  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);

  int request_complete() override {
    return req->get_ret_status();

  void request_cleanup() override {
/// one full trim pass: gathers hot buckets from peers, adds cold buckets from
/// a rotating metadata listing, trims them, then notifies peers to reset
class BucketTrimCR : public RGWCoroutine {
  RGWRados *const store;             // not owned
  RGWHTTPManager *const http;        // not owned
  const BucketTrimConfig& config;    // caller keeps alive
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;            // trim status/notify object
  ceph::mono_time start_time;
  bufferlist notify_replies;         // raw watch/notify reply payload
  BucketChangeCounter counter;       // accumulated per-bucket change counts
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances

  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)

// metadata section name for listing bucket instances
const std::string BucketTrimCR::section{"bucket.instance"};
// coroutine body for one trim pass. NOTE(review): the reenter() wrapper,
// braces, retcode checks, and some argument lines were dropped from this
// extract — confirm against the full file.
int BucketTrimCR::operate()
  start_time = ceph::mono_clock::now();

  if (config.buckets_per_interval) {
    // query watch/notify for hot buckets
    ldout(cct, 10) << "fetching active bucket counters" << dendl;
    set_status("fetching active bucket counters");
      // request the top bucket counters from each peer gateway
      const TrimNotifyType type = NotifyTrimCounters;
      TrimCounters::Request request{32}; // ask each peer for up to 32 buckets
      ::encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
      ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
      return set_cr_error(retcode);

    // select the hottest buckets for trim
    retcode = accumulate_peer_counters(notify_replies, counter);
      ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
      return set_cr_error(retcode);
    buckets.reserve(config.buckets_per_interval);

    // leave room for at least min_cold_buckets_per_interval cold buckets
    const int max_count = config.buckets_per_interval -
        config.min_cold_buckets_per_interval;
    counter.get_highest(max_count,
      [this] (const std::string& bucket, int count) {
        buckets.push_back(bucket);

  if (buckets.size() < config.buckets_per_interval) {
    // read BucketTrimStatus for marker position
    set_status("reading trim status");
    using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
    yield call(new ReadStatus(store->get_async_rados(), store, obj,
                              &status, true, &objv));
      ldout(cct, 10) << "failed to read bilog trim status: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    if (status.marker == "MAX") {
      status.marker.clear(); // restart at the beginning
    ldout(cct, 10) << "listing cold buckets from marker="
        << status.marker << dendl;

    set_status("listing cold buckets for trim");
      // capture a reference so 'this' remains valid in the callback
      auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
      // list cold buckets to consider for trim
      auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
        // filter out keys that we trimmed recently
        if (observer->trimmed_recently(bucket)) {
        // filter out active buckets that we've already selected
        auto i = std::find(buckets.begin(), buckets.end(), bucket);
        if (i != buckets.end()) {
        buckets.emplace_back(std::move(bucket));
        // remember the last cold bucket spawned to update the status marker
        last_cold_marker = std::move(marker);
        // return true if there's room for more
        return buckets.size() < config.buckets_per_interval;
      call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
                              section, status.marker, cb));
      ldout(cct, 4) << "failed to list bucket instance metadata: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);

  // trim bucket instances with limited concurrency
  set_status("trimming buckets");
  ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
  yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                             config.concurrent_buckets));
  // ignore errors from individual buckets

  // write updated trim status
  if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
    set_status("writing updated trim status");
    status.marker = std::move(last_cold_marker);
    ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
    using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
    yield call(new WriteStatus(store->get_async_rados(), store, obj,
      ldout(cct, 4) << "failed to write updated trim status: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);

  // notify peers that trim completed
  set_status("trim completed");
    const TrimNotifyType type = NotifyTrimComplete;
    TrimComplete::Request request;
    ::encode(request, bl);
    call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
    ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
    return set_cr_error(retcode);

  ldout(cct, 4) << "bucket index log processing completed in "
      << ceph::mono_clock::now() - start_time << dendl;
  return set_cr_done();
/// periodically runs BucketTrimCR under an exclusive rados lock so only one
/// gateway in the zone trims at a time
class BucketTrimPollCR : public RGWCoroutine {
  RGWRados *const store;             // not owned
  RGWHTTPManager *const http;        // not owned
  const BucketTrimConfig& config;    // caller keeps alive
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;            // object used for both lock and status
  const std::string name{"trim"}; //< lock name
  const std::string cookie;       // unique per-instance lock cookie

  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
// poll loop: sleep for the configured interval, take the trim lock for that
// same interval, run one trim pass, then release the lock.
// NOTE(review): the reenter()/loop wrapper, braces, and lock-CR argument
// lines were dropped from this extract — confirm against the full file.
int BucketTrimPollCR::operate()
    set_status("sleeping");
    wait(utime_t{config.trim_interval_sec, 0});

    // prevent others from trimming for our entire wait interval
    set_status("acquiring trim lock");
    yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                        config.trim_interval_sec));
      ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;

    set_status("trimming");
    yield call(new BucketTrimCR(store, http, config, observer, obj));
      // on errors, unlock so other gateways can try
      set_status("unlocking");
      yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // assert(events.empty() || now >= events.back().time)
    // circular buffer: when full, push_back overwrites the oldest entry
    events.push_back(Event{std::move(value), now});

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    // relies on temporal insertion order: oldest events are at the front
    while (!events.empty() && events.front().time < expired_before) {

  // NOTE(review): the 'struct Event { T value; time_point time; }' definition
  // is not visible in this extract — confirm against the full file.
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
975 // read bucket trim configuration from ceph context
976 void configure_bucket_trim(CephContext
*cct
, BucketTrimConfig
& config
)
978 auto conf
= cct
->_conf
;
980 config
.trim_interval_sec
=
981 conf
->get_val
<int64_t>("rgw_sync_log_trim_interval");
982 config
.counter_size
= 512;
983 config
.buckets_per_interval
=
984 conf
->get_val
<int64_t>("rgw_sync_log_trim_max_buckets");
985 config
.min_cold_buckets_per_interval
=
986 conf
->get_val
<int64_t>("rgw_sync_log_trim_min_cold_buckets");
987 config
.concurrent_buckets
=
988 conf
->get_val
<int64_t>("rgw_sync_log_trim_concurrent_buckets");
989 config
.notify_timeout_ms
= 10000;
990 config
.recent_size
= 128;
991 config
.recent_duration
= std::chrono::hours(2);
/// pimpl for BucketTrimManager: serves peer counter queries and records
/// completed trims, sharing state between data sync and the watch/notify api
class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
  RGWRados *const store; // not owned
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj; // trim status + watch/notify object

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads

  Impl(RGWRados *store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)

  /// TrimCounters::Server interface for watch/notify api
  // NOTE(review): declared virtual in TrimCounters::Server but not marked
  // 'override' here, unlike the other interface methods — confirm.
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
      buckets.emplace_back(key, count);
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    // drop expired entries from the recently-trimmed cache as well
    trimmed.expire_old(clock_type::now());

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());

  bool trimmed_recently(const boost::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
// construct the pimpl with the given store and configuration
BucketTrimManager::BucketTrimManager(RGWRados *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))

BucketTrimManager::~BucketTrimManager() = default;

// start the watch on the trim status object; returns its error code
int BucketTrimManager::init()
  return impl->watcher.start();
// called by data sync when a bucket instance sees a change; bumps its
// hotness counter unless it was trimmed recently
void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {

  impl->counter.insert(bucket.to_string());
// factory for the periodic (locked, polling) trim coroutine
RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);

// factory for a one-shot trim pass, e.g. for radosgw-admin
RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);