1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * Author: Casey Bodley <cbodley@redhat.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include <boost/circular_buffer.hpp>
18 #include <boost/container/flat_map.hpp>
20 #include "common/bounded_key_counter.h"
21 #include "common/errno.h"
22 #include "rgw_sync_log_trim.h"
23 #include "rgw_cr_rados.h"
24 #include "rgw_cr_rest.h"
25 #include "rgw_data_sync.h"
26 #include "rgw_metadata.h"
27 #include "rgw_rados.h"
30 #include <boost/asio/yield.hpp>
31 #include "include/assert.h"
33 #define dout_subsys ceph_subsys_rgw
36 #define dout_prefix (*_dout << "trim: ")
38 using rgw::BucketTrimConfig
;
39 using BucketChangeCounter
= BoundedKeyCounter
<std::string
, int>;
41 const std::string
rgw::BucketTrimStatus::oid
= "bilog.trim";
42 using rgw::BucketTrimStatus
;
45 // watch/notify api for gateways to coordinate about which buckets to trim
47 NotifyTrimCounters
= 0,
50 WRITE_RAW_ENCODER(TrimNotifyType
);
52 struct TrimNotifyHandler
{
53 virtual ~TrimNotifyHandler() = default;
55 virtual void handle(bufferlist::iterator
& input
, bufferlist
& output
) = 0;
58 /// api to share the bucket trim counters between gateways in the same zone.
59 /// each gateway will process different datalog shards, so the gateway that runs
60 /// the trim process needs to accumulate their counters
62 /// counter for a single bucket
63 struct BucketCounter
{
64 std::string bucket
; //< bucket instance metadata key
67 BucketCounter() = default;
68 BucketCounter(const std::string
& bucket
, int count
)
69 : bucket(bucket
), count(count
) {}
71 void encode(bufferlist
& bl
) const;
72 void decode(bufferlist::iterator
& p
);
74 using Vector
= std::vector
<BucketCounter
>;
76 /// request bucket trim counters from peer gateways
78 uint16_t max_buckets
; //< maximum number of bucket counters to return
80 void encode(bufferlist
& bl
) const;
81 void decode(bufferlist::iterator
& p
);
84 /// return the current bucket trim counters
86 Vector bucket_counters
;
88 void encode(bufferlist
& bl
) const;
89 void decode(bufferlist::iterator
& p
);
92 /// server interface to query the hottest buckets
94 virtual ~Server() = default;
96 virtual void get_bucket_counters(int count
, Vector
& counters
) = 0;
97 virtual void reset_bucket_counters() = 0;
101 class Handler
: public TrimNotifyHandler
{
102 Server
*const server
;
104 Handler(Server
*server
) : server(server
) {}
106 void handle(bufferlist::iterator
& input
, bufferlist
& output
) override
;
109 std::ostream
& operator<<(std::ostream
& out
, const TrimCounters::BucketCounter
& rhs
)
111 return out
<< rhs
.bucket
<< ":" << rhs
.count
;
114 void TrimCounters::BucketCounter::encode(bufferlist
& bl
) const
116 // no versioning to save space
117 ::encode(bucket
, bl
);
120 void TrimCounters::BucketCounter::decode(bufferlist::iterator
& p
)
125 WRITE_CLASS_ENCODER(TrimCounters::BucketCounter
);
127 void TrimCounters::Request::encode(bufferlist
& bl
) const
129 ENCODE_START(1, 1, bl
);
130 ::encode(max_buckets
, bl
);
133 void TrimCounters::Request::decode(bufferlist::iterator
& p
)
136 ::decode(max_buckets
, p
);
139 WRITE_CLASS_ENCODER(TrimCounters::Request
);
141 void TrimCounters::Response::encode(bufferlist
& bl
) const
143 ENCODE_START(1, 1, bl
);
144 ::encode(bucket_counters
, bl
);
147 void TrimCounters::Response::decode(bufferlist::iterator
& p
)
150 ::decode(bucket_counters
, p
);
153 WRITE_CLASS_ENCODER(TrimCounters::Response
);
155 void TrimCounters::Handler::handle(bufferlist::iterator
& input
,
159 ::decode(request
, input
);
160 auto count
= std::min
<uint16_t>(request
.max_buckets
, 128);
163 server
->get_bucket_counters(count
, response
.bucket_counters
);
164 ::encode(response
, output
);
167 /// api to notify peer gateways that trim has completed and their bucket change
168 /// counters can be reset
169 struct TrimComplete
{
171 void encode(bufferlist
& bl
) const;
172 void decode(bufferlist::iterator
& p
);
175 void encode(bufferlist
& bl
) const;
176 void decode(bufferlist::iterator
& p
);
179 /// server interface to reset bucket counters
180 using Server
= TrimCounters::Server
;
183 class Handler
: public TrimNotifyHandler
{
184 Server
*const server
;
186 Handler(Server
*server
) : server(server
) {}
188 void handle(bufferlist::iterator
& input
, bufferlist
& output
) override
;
192 void TrimComplete::Request::encode(bufferlist
& bl
) const
194 ENCODE_START(1, 1, bl
);
197 void TrimComplete::Request::decode(bufferlist::iterator
& p
)
202 WRITE_CLASS_ENCODER(TrimComplete::Request
);
204 void TrimComplete::Response::encode(bufferlist
& bl
) const
206 ENCODE_START(1, 1, bl
);
209 void TrimComplete::Response::decode(bufferlist::iterator
& p
)
214 WRITE_CLASS_ENCODER(TrimComplete::Response
);
216 void TrimComplete::Handler::handle(bufferlist::iterator
& input
,
220 ::decode(request
, input
);
222 server
->reset_bucket_counters();
225 ::encode(response
, output
);
229 /// rados watcher for bucket trim notifications
230 class BucketTrimWatcher
: public librados::WatchCtx2
{
231 RGWRados
*const store
;
232 const rgw_raw_obj
& obj
;
236 using HandlerPtr
= std::unique_ptr
<TrimNotifyHandler
>;
237 boost::container::flat_map
<TrimNotifyType
, HandlerPtr
> handlers
;
240 BucketTrimWatcher(RGWRados
*store
, const rgw_raw_obj
& obj
,
241 TrimCounters::Server
*counters
)
242 : store(store
), obj(obj
) {
243 handlers
.emplace(NotifyTrimCounters
, new TrimCounters::Handler(counters
));
244 handlers
.emplace(NotifyTrimComplete
, new TrimComplete::Handler(counters
));
247 ~BucketTrimWatcher() {
252 int r
= store
->get_raw_obj_ref(obj
, &ref
);
257 // register a watch on the realm's control object
258 r
= ref
.ioctx
.watch2(ref
.oid
, &handle
, this);
260 constexpr bool exclusive
= true;
261 r
= ref
.ioctx
.create(ref
.oid
, exclusive
);
262 if (r
== -EEXIST
|| r
== 0) {
263 r
= ref
.ioctx
.watch2(ref
.oid
, &handle
, this);
267 lderr(store
->ctx()) << "Failed to watch " << ref
.oid
268 << " with " << cpp_strerror(-r
) << dendl
;
273 ldout(store
->ctx(), 10) << "Watching " << ref
.oid
<< dendl
;
278 int r
= ref
.ioctx
.unwatch2(handle
);
280 lderr(store
->ctx()) << "Failed to unwatch on " << ref
.oid
281 << " with " << cpp_strerror(-r
) << dendl
;
283 r
= ref
.ioctx
.watch2(ref
.oid
, &handle
, this);
285 lderr(store
->ctx()) << "Failed to restart watch on " << ref
.oid
286 << " with " << cpp_strerror(-r
) << dendl
;
294 ref
.ioctx
.unwatch2(handle
);
299 /// respond to bucket trim notifications
300 void handle_notify(uint64_t notify_id
, uint64_t cookie
,
301 uint64_t notifier_id
, bufferlist
& bl
) override
{
302 if (cookie
!= handle
) {
311 auto handler
= handlers
.find(type
);
312 if (handler
!= handlers
.end()) {
313 handler
->second
->handle(p
, reply
);
315 lderr(store
->ctx()) << "no handler for notify type " << type
<< dendl
;
317 } catch (const buffer::error
& e
) {
318 lderr(store
->ctx()) << "Failed to decode notification: " << e
.what() << dendl
;
320 ref
.ioctx
.notify_ack(ref
.oid
, notify_id
, cookie
, reply
);
323 /// reestablish the watch if it gets disconnected
324 void handle_error(uint64_t cookie
, int err
) override
{
325 if (cookie
!= handle
) {
328 if (err
== -ENOTCONN
) {
329 ldout(store
->ctx(), 4) << "Disconnected watch on " << ref
.oid
<< dendl
;
336 /// Interface to communicate with the trim manager about completed operations
337 struct BucketTrimObserver
{
338 virtual ~BucketTrimObserver() = default;
340 virtual void on_bucket_trimmed(std::string
&& bucket_instance
) = 0;
341 virtual bool trimmed_recently(const boost::string_view
& bucket_instance
) = 0;
344 /// populate the status with the minimum stable marker of each shard
345 template <typename Iter
>
346 int take_min_status(CephContext
*cct
, Iter first
, Iter last
,
347 std::vector
<std::string
> *status
)
350 boost::optional
<size_t> num_shards
;
351 for (auto peer
= first
; peer
!= last
; ++peer
) {
352 const size_t peer_shards
= peer
->size();
354 num_shards
= peer_shards
;
355 status
->resize(*num_shards
);
356 } else if (*num_shards
!= peer_shards
) {
357 // all peers must agree on the number of shards
360 auto m
= status
->begin();
361 for (auto& shard
: *peer
) {
363 // only consider incremental sync markers
364 if (shard
.state
!= rgw_bucket_shard_sync_info::StateIncrementalSync
) {
367 // always take the first marker, or any later marker that's smaller
368 if (peer
== first
|| marker
> shard
.inc_marker
.position
) {
369 marker
= std::move(shard
.inc_marker
.position
);
376 /// trim each bilog shard to the given marker, while limiting the number of
377 /// concurrent requests
378 class BucketTrimShardCollectCR
: public RGWShardCollectCR
{
379 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
380 RGWRados
*const store
;
381 const RGWBucketInfo
& bucket_info
;
382 const std::vector
<std::string
>& markers
; //< shard markers to trim
383 size_t i
{0}; //< index of current shard marker
385 BucketTrimShardCollectCR(RGWRados
*store
, const RGWBucketInfo
& bucket_info
,
386 const std::vector
<std::string
>& markers
)
387 : RGWShardCollectCR(store
->ctx(), MAX_CONCURRENT_SHARDS
),
388 store(store
), bucket_info(bucket_info
), markers(markers
)
390 bool spawn_next() override
;
393 bool BucketTrimShardCollectCR::spawn_next()
395 while (i
< markers
.size()) {
396 const auto& marker
= markers
[i
];
397 const auto shard_id
= i
++;
399 // skip empty markers
400 if (!marker
.empty()) {
401 ldout(cct
, 10) << "trimming bilog shard " << shard_id
402 << " of " << bucket_info
.bucket
<< " at marker " << marker
<< dendl
;
403 spawn(new RGWRadosBILogTrimCR(store
, bucket_info
, shard_id
,
404 std::string
{}, marker
),
412 /// trim the bilog of all of the given bucket instance's shards
413 class BucketTrimInstanceCR
: public RGWCoroutine
{
414 RGWRados
*const store
;
415 RGWHTTPManager
*const http
;
416 BucketTrimObserver
*const observer
;
417 std::string bucket_instance
;
418 const std::string
& zone_id
; //< my zone id
419 RGWBucketInfo bucket_info
; //< bucket instance info to locate bucket indices
421 using StatusShards
= std::vector
<rgw_bucket_shard_sync_info
>;
422 std::vector
<StatusShards
> peer_status
; //< sync status for each peer
423 std::vector
<std::string
> min_markers
; //< min marker per shard
426 BucketTrimInstanceCR(RGWRados
*store
, RGWHTTPManager
*http
,
427 BucketTrimObserver
*observer
,
428 const std::string
& bucket_instance
)
429 : RGWCoroutine(store
->ctx()), store(store
),
430 http(http
), observer(observer
),
431 bucket_instance(bucket_instance
),
432 zone_id(store
->get_zone().id
),
433 peer_status(store
->zone_conn_map
.size())
436 int operate() override
;
439 int BucketTrimInstanceCR::operate()
442 ldout(cct
, 4) << "starting trim on bucket=" << bucket_instance
<< dendl
;
444 // query peers for sync status
445 set_status("fetching sync status from peers");
447 // query data sync status from each sync peer
448 rgw_http_param_pair params
[] = {
449 { "type", "bucket-index" },
450 { "status", nullptr },
451 { "bucket", bucket_instance
.c_str() },
452 { "source-zone", zone_id
.c_str() },
456 auto p
= peer_status
.begin();
457 for (auto& c
: store
->zone_conn_map
) {
458 using StatusCR
= RGWReadRESTResourceCR
<StatusShards
>;
459 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
463 // in parallel, read the local bucket instance info
464 spawn(new RGWGetBucketInstanceInfoCR(store
->get_async_rados(), store
,
465 bucket_instance
, &bucket_info
),
468 // wait for a response from each peer. all must respond to attempt trim
469 while (num_spawned()) {
471 yield
wait_for_child();
472 collect(&child_ret
, nullptr);
475 return set_cr_error(child_ret
);
479 // determine the minimum marker for each shard
480 retcode
= take_min_status(cct
, peer_status
.begin(), peer_status
.end(),
483 ldout(cct
, 4) << "failed to correlate bucket sync status from peers" << dendl
;
484 return set_cr_error(retcode
);
487 // trim shards with a ShardCollectCR
488 ldout(cct
, 10) << "trimming bilogs for bucket=" << bucket_info
.bucket
489 << " markers=" << min_markers
<< ", shards=" << min_markers
.size() << dendl
;
490 set_status("trimming bilog shards");
491 yield
call(new BucketTrimShardCollectCR(store
, bucket_info
, min_markers
));
492 // ENODATA just means there were no keys to trim
493 if (retcode
== -ENODATA
) {
497 ldout(cct
, 4) << "failed to trim bilog shards: "
498 << cpp_strerror(retcode
) << dendl
;
499 return set_cr_error(retcode
);
502 observer
->on_bucket_trimmed(std::move(bucket_instance
));
503 return set_cr_done();
508 /// trim each bucket instance while limiting the number of concurrent operations
509 class BucketTrimInstanceCollectCR
: public RGWShardCollectCR
{
510 RGWRados
*const store
;
511 RGWHTTPManager
*const http
;
512 BucketTrimObserver
*const observer
;
513 std::vector
<std::string
>::const_iterator bucket
;
514 std::vector
<std::string
>::const_iterator end
;
516 BucketTrimInstanceCollectCR(RGWRados
*store
, RGWHTTPManager
*http
,
517 BucketTrimObserver
*observer
,
518 const std::vector
<std::string
>& buckets
,
520 : RGWShardCollectCR(store
->ctx(), max_concurrent
),
521 store(store
), http(http
), observer(observer
),
522 bucket(buckets
.begin()), end(buckets
.end())
524 bool spawn_next() override
;
527 bool BucketTrimInstanceCollectCR::spawn_next()
532 spawn(new BucketTrimInstanceCR(store
, http
, observer
, *bucket
), false);
537 /// correlate the replies from each peer gateway into the given counter
538 int accumulate_peer_counters(bufferlist
& bl
, BucketChangeCounter
& counter
)
543 // decode notify responses
545 std::map
<std::pair
<uint64_t, uint64_t>, bufferlist
> replies
;
546 std::set
<std::pair
<uint64_t, uint64_t>> timeouts
;
547 ::decode(replies
, p
);
548 ::decode(timeouts
, p
);
550 for (auto& peer
: replies
) {
551 auto q
= peer
.second
.begin();
552 TrimCounters::Response response
;
553 ::decode(response
, q
);
554 for (const auto& b
: response
.bucket_counters
) {
555 counter
.insert(b
.bucket
, b
.count
);
558 } catch (const buffer::error
& e
) {
/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
567 /// lists metadata keys, passing each to a callback until it returns false.
568 /// on reaching the end, it will restart at the beginning and list up to the
570 class AsyncMetadataList
: public RGWAsyncRadosRequest
{
571 CephContext
*const cct
;
572 RGWMetadataManager
*const mgr
;
573 const std::string section
;
574 const std::string start_marker
;
575 MetadataListCallback callback
;
576 void *handle
{nullptr};
578 int _send_request() override
;
580 AsyncMetadataList(CephContext
*cct
, RGWCoroutine
*caller
,
581 RGWAioCompletionNotifier
*cn
, RGWMetadataManager
*mgr
,
582 const std::string
& section
, const std::string
& start_marker
,
583 const MetadataListCallback
& callback
)
584 : RGWAsyncRadosRequest(caller
, cn
), cct(cct
), mgr(mgr
),
585 section(section
), start_marker(start_marker
), callback(callback
)
587 ~AsyncMetadataList() override
{
589 mgr
->list_keys_complete(handle
);
594 int AsyncMetadataList::_send_request()
596 // start a listing at the given marker
597 int r
= mgr
->list_keys_init(section
, start_marker
, &handle
);
599 ldout(cct
, 10) << "failed to init metadata listing: "
600 << cpp_strerror(r
) << dendl
;
603 ldout(cct
, 20) << "starting metadata listing at " << start_marker
<< dendl
;
605 std::list
<std::string
> keys
;
606 bool truncated
{false};
610 // get the next key and marker
611 r
= mgr
->list_keys_next(handle
, 1, keys
, &truncated
);
613 ldout(cct
, 10) << "failed to list metadata: "
614 << cpp_strerror(r
) << dendl
;
617 marker
= mgr
->get_marker(handle
);
620 assert(keys
.size() == 1);
621 auto& key
= keys
.front();
622 if (!callback(std::move(key
), std::move(marker
))) {
628 if (start_marker
.empty()) {
629 // already listed all keys
633 // restart the listing from the beginning (empty marker)
634 mgr
->list_keys_complete(handle
);
637 r
= mgr
->list_keys_init(section
, "", &handle
);
639 ldout(cct
, 10) << "failed to restart metadata listing: "
640 << cpp_strerror(r
) << dendl
;
643 ldout(cct
, 20) << "restarting metadata listing" << dendl
;
646 // get the next key and marker
647 r
= mgr
->list_keys_next(handle
, 1, keys
, &truncated
);
649 ldout(cct
, 10) << "failed to list metadata: "
650 << cpp_strerror(r
) << dendl
;
653 marker
= mgr
->get_marker(handle
);
656 assert(keys
.size() == 1);
657 auto& key
= keys
.front();
658 // stop at original marker
659 if (marker
>= start_marker
) {
662 if (!callback(std::move(key
), std::move(marker
))) {
671 /// coroutine wrapper for AsyncMetadataList
672 class MetadataListCR
: public RGWSimpleCoroutine
{
673 RGWAsyncRadosProcessor
*const async_rados
;
674 RGWMetadataManager
*const mgr
;
675 const std::string
& section
;
676 const std::string
& start_marker
;
677 MetadataListCallback callback
;
678 RGWAsyncRadosRequest
*req
{nullptr};
680 MetadataListCR(CephContext
*cct
, RGWAsyncRadosProcessor
*async_rados
,
681 RGWMetadataManager
*mgr
, const std::string
& section
,
682 const std::string
& start_marker
,
683 const MetadataListCallback
& callback
)
684 : RGWSimpleCoroutine(cct
), async_rados(async_rados
), mgr(mgr
),
685 section(section
), start_marker(start_marker
), callback(callback
)
687 ~MetadataListCR() override
{
691 int send_request() override
{
692 req
= new AsyncMetadataList(cct
, this, stack
->create_completion_notifier(),
693 mgr
, section
, start_marker
, callback
);
694 async_rados
->queue(req
);
697 int request_complete() override
{
698 return req
->get_ret_status();
700 void request_cleanup() override
{
708 class BucketTrimCR
: public RGWCoroutine
{
709 RGWRados
*const store
;
710 RGWHTTPManager
*const http
;
711 const BucketTrimConfig
& config
;
712 BucketTrimObserver
*const observer
;
713 const rgw_raw_obj
& obj
;
714 ceph::mono_time start_time
;
715 bufferlist notify_replies
;
716 BucketChangeCounter counter
;
717 std::vector
<std::string
> buckets
; //< buckets selected for trim
718 BucketTrimStatus status
;
719 RGWObjVersionTracker objv
; //< version tracker for trim status object
720 std::string last_cold_marker
; //< position for next trim marker
722 static const std::string section
; //< metadata section for bucket instances
724 BucketTrimCR(RGWRados
*store
, RGWHTTPManager
*http
,
725 const BucketTrimConfig
& config
, BucketTrimObserver
*observer
,
726 const rgw_raw_obj
& obj
)
727 : RGWCoroutine(store
->ctx()), store(store
), http(http
), config(config
),
728 observer(observer
), obj(obj
), counter(config
.counter_size
)
734 const std::string
BucketTrimCR::section
{"bucket.instance"};
736 int BucketTrimCR::operate()
739 start_time
= ceph::mono_clock::now();
741 if (config
.buckets_per_interval
) {
742 // query watch/notify for hot buckets
743 ldout(cct
, 10) << "fetching active bucket counters" << dendl
;
744 set_status("fetching active bucket counters");
746 // request the top bucket counters from each peer gateway
747 const TrimNotifyType type
= NotifyTrimCounters
;
748 TrimCounters::Request request
{32};
751 ::encode(request
, bl
);
752 call(new RGWRadosNotifyCR(store
, obj
, bl
, config
.notify_timeout_ms
,
756 ldout(cct
, 10) << "failed to fetch peer bucket counters" << dendl
;
757 return set_cr_error(retcode
);
760 // select the hottest buckets for trim
761 retcode
= accumulate_peer_counters(notify_replies
, counter
);
763 ldout(cct
, 4) << "failed to correlate peer bucket counters" << dendl
;
764 return set_cr_error(retcode
);
766 buckets
.reserve(config
.buckets_per_interval
);
768 const int max_count
= config
.buckets_per_interval
-
769 config
.min_cold_buckets_per_interval
;
770 counter
.get_highest(max_count
,
771 [this] (const std::string
& bucket
, int count
) {
772 buckets
.push_back(bucket
);
776 if (buckets
.size() < config
.buckets_per_interval
) {
777 // read BucketTrimStatus for marker position
778 set_status("reading trim status");
779 using ReadStatus
= RGWSimpleRadosReadCR
<BucketTrimStatus
>;
780 yield
call(new ReadStatus(store
->get_async_rados(), store
, obj
,
781 &status
, true, &objv
));
783 ldout(cct
, 10) << "failed to read bilog trim status: "
784 << cpp_strerror(retcode
) << dendl
;
785 return set_cr_error(retcode
);
787 if (status
.marker
== "MAX") {
788 status
.marker
.clear(); // restart at the beginning
790 ldout(cct
, 10) << "listing cold buckets from marker="
791 << status
.marker
<< dendl
;
793 set_status("listing cold buckets for trim");
795 // capture a reference so 'this' remains valid in the callback
796 auto ref
= boost::intrusive_ptr
<RGWCoroutine
>{this};
797 // list cold buckets to consider for trim
798 auto cb
= [this, ref
] (std::string
&& bucket
, std::string
&& marker
) {
799 // filter out keys that we trimmed recently
800 if (observer
->trimmed_recently(bucket
)) {
803 // filter out active buckets that we've already selected
804 auto i
= std::find(buckets
.begin(), buckets
.end(), bucket
);
805 if (i
!= buckets
.end()) {
808 buckets
.emplace_back(std::move(bucket
));
809 // remember the last cold bucket spawned to update the status marker
810 last_cold_marker
= std::move(marker
);
811 // return true if there's room for more
812 return buckets
.size() < config
.buckets_per_interval
;
815 call(new MetadataListCR(cct
, store
->get_async_rados(), store
->meta_mgr
,
816 section
, status
.marker
, cb
));
819 ldout(cct
, 4) << "failed to list bucket instance metadata: "
820 << cpp_strerror(retcode
) << dendl
;
821 return set_cr_error(retcode
);
825 // trim bucket instances with limited concurrency
826 set_status("trimming buckets");
827 ldout(cct
, 4) << "collected " << buckets
.size() << " buckets for trim" << dendl
;
828 yield
call(new BucketTrimInstanceCollectCR(store
, http
, observer
, buckets
,
829 config
.concurrent_buckets
));
830 // ignore errors from individual buckets
832 // write updated trim status
833 if (!last_cold_marker
.empty() && status
.marker
!= last_cold_marker
) {
834 set_status("writing updated trim status");
835 status
.marker
= std::move(last_cold_marker
);
836 ldout(cct
, 20) << "writing bucket trim marker=" << status
.marker
<< dendl
;
837 using WriteStatus
= RGWSimpleRadosWriteCR
<BucketTrimStatus
>;
838 yield
call(new WriteStatus(store
->get_async_rados(), store
, obj
,
841 ldout(cct
, 4) << "failed to write updated trim status: "
842 << cpp_strerror(retcode
) << dendl
;
843 return set_cr_error(retcode
);
847 // notify peers that trim completed
848 set_status("trim completed");
850 const TrimNotifyType type
= NotifyTrimComplete
;
851 TrimComplete::Request request
;
854 ::encode(request
, bl
);
855 call(new RGWRadosNotifyCR(store
, obj
, bl
, config
.notify_timeout_ms
,
859 ldout(cct
, 10) << "failed to notify peers of trim completion" << dendl
;
860 return set_cr_error(retcode
);
863 ldout(cct
, 4) << "bucket index log processing completed in "
864 << ceph::mono_clock::now() - start_time
<< dendl
;
865 return set_cr_done();
870 class BucketTrimPollCR
: public RGWCoroutine
{
871 RGWRados
*const store
;
872 RGWHTTPManager
*const http
;
873 const BucketTrimConfig
& config
;
874 BucketTrimObserver
*const observer
;
875 const rgw_raw_obj
& obj
;
876 const std::string name
{"trim"}; //< lock name
877 const std::string cookie
;
880 BucketTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
881 const BucketTrimConfig
& config
,
882 BucketTrimObserver
*observer
, const rgw_raw_obj
& obj
)
883 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
884 config(config
), observer(observer
), obj(obj
),
885 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
))
891 int BucketTrimPollCR::operate()
895 set_status("sleeping");
896 wait(utime_t
{config
.trim_interval_sec
, 0});
898 // prevent others from trimming for our entire wait interval
899 set_status("acquiring trim lock");
900 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
902 config
.trim_interval_sec
));
904 ldout(cct
, 4) << "failed to lock: " << cpp_strerror(retcode
) << dendl
;
908 set_status("trimming");
909 yield
call(new BucketTrimCR(store
, http
, config
, observer
, obj
));
911 // on errors, unlock so other gateways can try
912 set_status("unlocking");
913 yield
call(new RGWSimpleRadosUnlockCR(store
->get_async_rados(), store
,
921 /// tracks a bounded list of events with timestamps. old events can be expired,
922 /// and recent events can be searched by key. expiration depends on events being
923 /// inserted in temporal order
924 template <typename T
, typename Clock
= ceph::coarse_mono_clock
>
925 class RecentEventList
{
927 using clock_type
= Clock
;
928 using time_point
= typename
clock_type::time_point
;
930 RecentEventList(size_t max_size
, const ceph::timespan
& max_duration
)
931 : events(max_size
), max_duration(max_duration
)
934 /// insert an event at the given point in time. this time must be at least as
935 /// recent as the last inserted event
936 void insert(T
&& value
, const time_point
& now
) {
937 // assert(events.empty() || now >= events.back().time)
938 events
.push_back(Event
{std::move(value
), now
});
941 /// performs a linear search for an event matching the given key, whose type
942 /// U can be any that provides operator==(U, T)
943 template <typename U
>
944 bool lookup(const U
& key
) const {
945 for (const auto& event
: events
) {
946 if (key
== event
.value
) {
953 /// remove events that are no longer recent compared to the given point in time
954 void expire_old(const time_point
& now
) {
955 const auto expired_before
= now
- max_duration
;
956 while (!events
.empty() && events
.front().time
< expired_before
) {
966 boost::circular_buffer
<Event
> events
;
967 const ceph::timespan max_duration
;
972 // read bucket trim configuration from ceph context
973 void configure_bucket_trim(CephContext
*cct
, BucketTrimConfig
& config
)
975 auto conf
= cct
->_conf
;
977 config
.trim_interval_sec
=
978 conf
->get_val
<int64_t>("rgw_sync_log_trim_interval");
979 config
.counter_size
= 512;
980 config
.buckets_per_interval
=
981 conf
->get_val
<int64_t>("rgw_sync_log_trim_max_buckets");
982 config
.min_cold_buckets_per_interval
=
983 conf
->get_val
<int64_t>("rgw_sync_log_trim_min_cold_buckets");
984 config
.concurrent_buckets
=
985 conf
->get_val
<int64_t>("rgw_sync_log_trim_concurrent_buckets");
986 config
.notify_timeout_ms
= 10000;
987 config
.recent_size
= 128;
988 config
.recent_duration
= std::chrono::hours(2);
991 class BucketTrimManager::Impl
: public TrimCounters::Server
,
992 public BucketTrimObserver
{
994 RGWRados
*const store
;
995 const BucketTrimConfig config
;
997 const rgw_raw_obj status_obj
;
999 /// count frequency of bucket instance entries in the data changes log
1000 BucketChangeCounter counter
;
1002 using RecentlyTrimmedBucketList
= RecentEventList
<std::string
>;
1003 using clock_type
= RecentlyTrimmedBucketList::clock_type
;
1004 /// track recently trimmed buckets to focus trim activity elsewhere
1005 RecentlyTrimmedBucketList trimmed
;
1007 /// serve the bucket trim watch/notify api
1008 BucketTrimWatcher watcher
;
1010 /// protect data shared between data sync, trim, and watch/notify threads
1013 Impl(RGWRados
*store
, const BucketTrimConfig
& config
)
1014 : store(store
), config(config
),
1015 status_obj(store
->get_zone_params().log_pool
, BucketTrimStatus::oid
),
1016 counter(config
.counter_size
),
1017 trimmed(config
.recent_size
, config
.recent_duration
),
1018 watcher(store
, status_obj
, this)
1021 /// TrimCounters::Server interface for watch/notify api
1022 void get_bucket_counters(int count
, TrimCounters::Vector
& buckets
) {
1023 buckets
.reserve(count
);
1024 std::lock_guard
<std::mutex
> lock(mutex
);
1025 counter
.get_highest(count
, [&buckets
] (const std::string
& key
, int count
) {
1026 buckets
.emplace_back(key
, count
);
1028 ldout(store
->ctx(), 20) << "get_bucket_counters: " << buckets
<< dendl
;
1031 void reset_bucket_counters() override
{
1032 ldout(store
->ctx(), 20) << "bucket trim completed" << dendl
;
1033 std::lock_guard
<std::mutex
> lock(mutex
);
1035 trimmed
.expire_old(clock_type::now());
1038 /// BucketTrimObserver interface to remember successfully-trimmed buckets
1039 void on_bucket_trimmed(std::string
&& bucket_instance
) override
{
1040 ldout(store
->ctx(), 20) << "trimmed bucket instance " << bucket_instance
<< dendl
;
1041 std::lock_guard
<std::mutex
> lock(mutex
);
1042 trimmed
.insert(std::move(bucket_instance
), clock_type::now());
1045 bool trimmed_recently(const boost::string_view
& bucket_instance
) override
{
1046 std::lock_guard
<std::mutex
> lock(mutex
);
1047 return trimmed
.lookup(bucket_instance
);
1051 BucketTrimManager::BucketTrimManager(RGWRados
*store
,
1052 const BucketTrimConfig
& config
)
1053 : impl(new Impl(store
, config
))
1056 BucketTrimManager::~BucketTrimManager() = default;
1058 int BucketTrimManager::init()
1060 return impl
->watcher
.start();
1063 void BucketTrimManager::on_bucket_changed(const boost::string_view
& bucket
)
1065 std::lock_guard
<std::mutex
> lock(impl
->mutex
);
1066 // filter recently trimmed bucket instances out of bucket change counter
1067 if (impl
->trimmed
.lookup(bucket
)) {
1070 impl
->counter
.insert(bucket
.to_string());
1073 RGWCoroutine
* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager
*http
)
1075 return new BucketTrimPollCR(impl
->store
, http
, impl
->config
,
1076 impl
.get(), impl
->status_obj
);
1079 RGWCoroutine
* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager
*http
)
1081 // return the trim coroutine without any polling
1082 return new BucketTrimCR(impl
->store
, http
, impl
->config
,
1083 impl
.get(), impl
->status_obj
);