1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include <initializer_list>
13 #include <boost/intrusive/list.hpp>
14 #include <boost/asio/basic_waitable_timer.hpp>
16 #include "include/common_fwd.h"
17 #include "include/rados/librados.hpp"
18 #include "common/ceph_time.h"
19 #include "common/async/yield_context.h"
20 #include "cls/rgw/cls_rgw_types.h"
21 #include "cls/lock/cls_lock_client.h"
23 #include "rgw_common.h"
27 namespace rgw
{ namespace sal
{
31 class RGWBucketReshardLock
{
// NOTE(review): this chunk is a mangled extract of a numbered listing —
// the leading digits on lines are original line numbers, and several
// original source lines are missing (the start_time assignment inside
// reset_time(), the 'public:' section, and the constructor parameter
// tails). Code is left byte-identical; restore from a clean copy of
// rgw_reshard.h before compiling.
// Clock used for lock timing; coarse_mono_clock is cheap to read.
32 using Clock
= ceph::coarse_mono_clock
;
// Non-owning pointer to the store the lock operations go through.
34 rgw::sal::RadosStore
* store
;
// oid of the RADOS object the reshard lock is taken on.
35 const std::string lock_oid
;
// cls lock client wrapper that performs the actual lock/renew RADOS ops.
37 rados::cls::lock::Lock internal_lock
;
// How long a taken lock remains valid before it must be renewed.
38 std::chrono::seconds duration
;
// Renewal bookkeeping: when the lock was (re)acquired, and the point at
// which a renew should be attempted.
40 Clock::time_point start_time
;
41 Clock::time_point renew_thresh
;
// Reset renewal bookkeeping from 'now': the renew threshold is half the
// lock duration past start_time, so renewal happens well before expiry.
// (The start_time = now assignment appears to be on a missing line.)
43 void reset_time(const Clock::time_point
& now
) {
45 renew_thresh
= start_time
+ duration
/ 2;
// Construct from an explicit reshard-lock oid.
49 RGWBucketReshardLock(rgw::sal::RadosStore
* _store
,
50 const std::string
& reshard_lock_oid
,
// Convenience overload: derive the lock oid from the bucket's key
// (tenant:name form via get_key(':')) and delegate to the ctor above.
52 RGWBucketReshardLock(rgw::sal::RadosStore
* _store
,
53 const RGWBucketInfo
& bucket_info
,
55 RGWBucketReshardLock(_store
, bucket_info
.bucket
.get_key(':'), _ephemeral
)
// Take the lock. Returns 0 on success, negative error code otherwise
// (error convention presumed from the surrounding API — confirm).
58 int lock(const DoutPrefixProvider
*dpp
);
// Renew a held lock as of the given time point.
60 int renew(const Clock::time_point
&);
// True once 'now' has reached the renewal threshold set by reset_time().
62 bool should_renew(const Clock::time_point
& now
) const {
63 return now
>= renew_thresh
;
65 }; // class RGWBucketReshardLock
67 class RGWBucketReshard
{
// NOTE(review): mangled numbered-listing extract — leading digits are
// original line numbers and a number of source lines (access specifiers,
// parameter tails, several statement lines inside the prime-lookup
// helpers) are missing. Code left byte-identical; restore from a clean
// copy before use.
// RGWReshard drives the reshard queue and needs access to internals here.
70 friend class RGWReshard
;
72 using Clock
= ceph::coarse_mono_clock
;
// Non-owning pointer to the backing store.
76 rgw::sal::RadosStore
* store
;
// The bucket being resharded, plus its xattrs.
77 RGWBucketInfo bucket_info
;
78 std::map
<std::string
, bufferlist
> bucket_attrs
;
// Lock owned by this resharder, and an optional outer lock held by the
// caller (may be nullptr — see the constructor comment below).
80 RGWBucketReshardLock reshard_lock
;
81 RGWBucketReshardLock
* outer_reshard_lock
;
83 // using an initializer_list as an array in contiguous memory
84 // allocated all at once
85 static const std::initializer_list
<uint16_t> reshard_primes
;
// Create the new bucket instance that will receive the resharded index.
87 int create_new_bucket_instance(int new_num_shards
,
88 RGWBucketInfo
& new_bucket_info
,
89 const DoutPrefixProvider
*dpp
);
// Perform the actual reshard of the index into new_bucket_info.
90 int do_reshard(int num_shards
,
91 RGWBucketInfo
& new_bucket_info
,
96 const DoutPrefixProvider
*dpp
);
99 // pass nullptr for the final parameter if no outer reshard lock to worry about
101 RGWBucketReshard(rgw::sal::RadosStore
* _store
,
102 const RGWBucketInfo
& _bucket_info
,
103 const std::map
<std::string
, bufferlist
>& _bucket_attrs
,
104 RGWBucketReshardLock
* _outer_reshard_lock
);
// Run a reshard to num_shards shards; optional verbose/formatted progress
// output, and an optional reshard log to record against.
105 int execute(int num_shards
, int max_op_entries
,
106 const DoutPrefixProvider
*dpp
,
107 bool verbose
= false, std::ostream
*out
= nullptr,
108 Formatter
*formatter
= nullptr,
109 RGWReshard
*reshard_log
= nullptr);
// Report per-shard reshard status entries for this bucket.
110 int get_status(const DoutPrefixProvider
*dpp
, std::list
<cls_rgw_bucket_instance_entry
> *status
);
111 int cancel(const DoutPrefixProvider
*dpp
);
// Static worker plus an instance wrapper that supplies this object's
// store and bucket_info.
112 static int clear_resharding(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
,
113 const RGWBucketInfo
& bucket_info
);
114 int clear_resharding(const DoutPrefixProvider
*dpp
) {
115 return clear_resharding(dpp
, store
, bucket_info
);
// Same static/instance pairing for clearing per-index-shard status.
117 static int clear_index_shard_reshard_status(const DoutPrefixProvider
*dpp
,
118 rgw::sal::RadosStore
* store
,
119 const RGWBucketInfo
& bucket_info
);
120 int clear_index_shard_reshard_status(const DoutPrefixProvider
*dpp
) {
121 return clear_index_shard_reshard_status(dpp
, store
, bucket_info
);
// Same pairing for setting reshard status on the index shards.
123 static int set_resharding_status(const DoutPrefixProvider
*dpp
,
124 rgw::sal::RadosStore
* store
,
125 const RGWBucketInfo
& bucket_info
,
126 const std::string
& new_instance_id
,
128 cls_rgw_reshard_status status
);
// NOTE(review): the instance wrapper below forwards num_shards, which is
// declared on a parameter line missing from this extract — confirm
// against the original header.
129 int set_resharding_status(const DoutPrefixProvider
*dpp
, const std::string
& new_instance_id
,
131 cls_rgw_reshard_status status
) {
132 return set_resharding_status(dpp
, store
, bucket_info
,
133 new_instance_id
, num_shards
, status
);
// Largest prime in reshard_primes (list is sorted ascending, so the
// reverse-begin element is the maximum).
136 static uint32_t get_max_prime_shards() {
137 return *std::crbegin(reshard_primes
);
140 // returns the prime in our list less than or equal to the
141 // parameter; the lowest value that can be returned is 1
// NOTE(review): body lines are missing from this extract (the
// upper_bound search value and the return statements).
142 static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards
) {
143 auto it
= std::upper_bound(reshard_primes
.begin(), reshard_primes
.end(),
145 if (it
== reshard_primes
.begin()) {
152 // returns the prime in our list greater than or equal to the
153 // parameter; if we do not have such a prime, 0 is returned
// NOTE(review): body lines are also missing here.
154 static uint32_t get_prime_shards_greater_or_equal(
155 uint32_t requested_shards
)
157 auto it
= std::lower_bound(reshard_primes
.begin(), reshard_primes
.end(),
159 if (it
== reshard_primes
.end()) {
166 // returns a preferred number of shards given a calculated number of
167 // shards based on max_dynamic_shards and the list of prime values
168 static uint32_t get_preferred_shards(uint32_t suggested_shards
,
169 uint32_t max_dynamic_shards
) {
171 // use a prime if max is within our prime range, otherwise use the max
173 const uint32_t absolute_max
=
174 max_dynamic_shards
>= get_max_prime_shards() ?
176 get_prime_shards_less_or_equal(max_dynamic_shards
);
178 // if we can use a prime number, use it, otherwise use suggested;
179 // note get_prime_shards_greater_or_equal will return 0 if no prime in
// the list is large enough, in which case std::max picks the other arm
181 const uint32_t prime_ish_num_shards
=
182 std::max(get_prime_shards_greater_or_equal(suggested_shards
),
185 // dynamic sharding cannot reshard more than defined maximum
186 const uint32_t final_num_shards
=
187 std::min(prime_ish_num_shards
, absolute_max
);
189 return final_num_shards
;
191 }; // RGWBucketReshard
// NOTE(review): the enclosing class declaration (presumably
// `class RGWReshard`, given the friend declaration above and the
// constructor name below) falls on lines missing from this mangled
// extract, as do the access specifiers. Code left byte-identical.
196 using Clock
= ceph::coarse_mono_clock
;
// Non-owning pointer to the backing store.
199 rgw::sal::RadosStore
* store
;
// Name and cls lock used to serialize work on a reshard log shard.
200 std::string lock_name
;
201 rados::cls::lock::Lock instance_lock
;
// Output formatter used when reporting (set via the constructor below).
206 Formatter
*formatter
;
// Compute the oid of a numbered reshard log shard.
208 void get_logshard_oid(int shard_num
, std::string
*shard
);
// Background worker thread that processes the reshard queue; also a
// DoutPrefixProvider so its log lines carry a prefix.
210 class ReshardWorker
: public Thread
, public DoutPrefixProvider
{
// Protects cond below; used to wake/stop the worker loop.
213 ceph::mutex lock
= ceph::make_mutex("ReshardWorker");
214 ceph::condition_variable cond
;
217 ReshardWorker(CephContext
* const _cct
,
218 RGWReshard
* const _reshard
)
// Thread entry point (Thread interface).
222 void *entry() override
;
// DoutPrefixProvider interface.
225 CephContext
*get_cct() const override
;
226 unsigned get_subsys() const;
227 std::ostream
& gen_prefix(std::ostream
& out
) const;
// Worker instance (owned; created by start_processor, presumably —
// confirm) and shutdown flag polled by the worker loop.
230 ReshardWorker
*worker
= nullptr;
231 std::atomic
<bool> down_flag
= { false };
// Map a tenant/bucket pair to its reshard log shard key / oid.
233 std::string
get_logshard_key(const std::string
& tenant
, const std::string
& bucket_name
);
234 void get_bucket_logshard_oid(const std::string
& tenant
, const std::string
& bucket_name
, std::string
*oid
);
237 RGWReshard(rgw::sal::RadosStore
* _store
, bool _verbose
= false, std::ostream
*_out
= nullptr, Formatter
*_formatter
= nullptr);
// Queue operations on reshard log entries (add/update/get/remove/list).
238 int add(const DoutPrefixProvider
*dpp
, cls_rgw_reshard_entry
& entry
);
239 int update(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, const RGWBucketInfo
& new_bucket_info
);
240 int get(const DoutPrefixProvider
*dpp
, cls_rgw_reshard_entry
& entry
);
241 int remove(const DoutPrefixProvider
*dpp
, cls_rgw_reshard_entry
& entry
);
242 int list(const DoutPrefixProvider
*dpp
, int logshard_num
, std::string
& marker
, uint32_t max
, std::list
<cls_rgw_reshard_entry
>& entries
, bool *is_truncated
);
243 int clear_bucket_resharding(const DoutPrefixProvider
*dpp
, const std::string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
);
// Process queued reshard entries: one log shard, or all of them.
246 int process_single_logshard(int logshard_num
, const DoutPrefixProvider
*dpp
);
247 int process_all_logshards(const DoutPrefixProvider
*dpp
);
// Start/stop the background ReshardWorker.
249 void start_processor();
250 void stop_processor();
// NOTE(review): mangled numbered-listing extract; this class also runs
// past the end of the visible chunk (the waiter-management declarations
// after wait() are cut off) and several interior lines are missing
// (e.g. the Waiter's Timer member and the destructor signature around
// the ceph_assert). Code left byte-identical.
253 class RGWReshardWait
{
255 // the blocking wait uses std::condition_variable::wait_for(), which uses the
256 // std::chrono::steady_clock. use that for the async waits as well
257 using Clock
= std::chrono::steady_clock
;
// How long a single wait lasts before re-checking (defaulted below).
259 const ceph::timespan duration
;
// Protects cond/waiters; cond wakes blocking waiters on shutdown/retry.
260 ceph::mutex mutex
= ceph::make_mutex("RGWReshardWait::lock");
261 ceph::condition_variable cond
;
// One async waiter: lives on the intrusive 'waiters' list below and
// owns an asio waitable timer bound to its io_context.
263 struct Waiter
: boost::intrusive::list_base_hook
<> {
264 using Executor
= boost::asio::io_context::executor_type
;
265 using Timer
= boost::asio::basic_waitable_timer
<Clock
,
266 boost::asio::wait_traits
<Clock
>, Executor
>;
268 explicit Waiter(boost::asio::io_context
& ioc
) : timer(ioc
) {}
// Intrusive list of in-flight async waiters (no per-node allocation).
270 boost::intrusive::list
<Waiter
> waiters
;
// Set on shutdown so pending waits abort instead of re-arming.
272 bool going_down
{false};
// Default wait granularity of 5 seconds.
275 RGWReshardWait(ceph::timespan duration
= std::chrono::seconds(5))
276 : duration(duration
) {}
// Presumably the destructor body (its signature is on a missing line):
// asserts stop() ran before destruction — confirm against the original.
278 ceph_assert(going_down
);
// Block (or yield, if y is set) until resharding completes or shutdown.
280 int wait(optional_yield y
);
281 // unblock any threads waiting on reshard