1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include <initializer_list>
13 #include <boost/intrusive/list.hpp>
14 #include <boost/asio/basic_waitable_timer.hpp>
16 #include "include/common_fwd.h"
17 #include "include/rados/librados.hpp"
18 #include "common/ceph_time.h"
19 #include "common/async/yield_context.h"
20 #include "cls/rgw/cls_rgw_types.h"
21 #include "cls/lock/cls_lock_client.h"
23 #include "rgw_common.h"
27 namespace rgw
{ namespace sal
{
// Bundles a rados::cls::lock::Lock (internal_lock) with the timing state
// (start_time, renew_thresh) that should_renew() consults to report when
// a renewal is due.
31 class RGWBucketReshardLock
{
// all time points in this class are measured on ceph::coarse_mono_clock
32 using Clock
= ceph::coarse_mono_clock
;
34 rgw::sal::RGWRadosStore
* store
;
// NOTE(review): presumably the oid of the object the lock is taken on --
// confirm against the implementation file
35 const std::string lock_oid
;
37 rados::cls::lock::Lock internal_lock
;
// renew_thresh is derived from this value (see reset_time)
38 std::chrono::seconds duration
;
40 Clock::time_point start_time
;
// should_renew() reports true once the supplied time reaches this point
41 Clock::time_point renew_thresh
;
// places the renewal threshold half of `duration` after start_time
43 void reset_time(const Clock::time_point
& now
) {
45 renew_thresh
= start_time
+ duration
/ 2;
// constructor taking the lock oid directly
49 RGWBucketReshardLock(rgw::sal::RGWRadosStore
* _store
,
50 const std::string
& reshard_lock_oid
,
// convenience constructor: delegates to the oid-taking constructor, using
// bucket_info.bucket.get_key(':') as the lock oid
52 RGWBucketReshardLock(rgw::sal::RGWRadosStore
* _store
,
53 const RGWBucketInfo
& bucket_info
,
55 RGWBucketReshardLock(_store
, bucket_info
.bucket
.get_key(':'), _ephemeral
)
// declared here, defined elsewhere; takes a Clock time point
// NOTE(review): presumably the current time -- confirm with callers
60 int renew(const Clock::time_point
&);
// true once `now` has reached (or passed) renew_thresh
62 bool should_renew(const Clock::time_point
& now
) const {
63 return now
>= renew_thresh
;
65 }; // class RGWBucketReshardLock
// Per-bucket reshard state and operations: holds the store handle, by-value
// bucket info and attrs, its own RGWBucketReshardLock, and a pointer to an
// outer lock that may be nullptr. Also provides static helpers for picking
// a shard count from the reshard_primes list.
67 class RGWBucketReshard
{
70 friend class RGWReshard
;
72 using Clock
= ceph::coarse_mono_clock
;
76 rgw::sal::RGWRadosStore
*store
;
77 RGWBucketInfo bucket_info
;
78 std::map
<string
, bufferlist
> bucket_attrs
;
80 RGWBucketReshardLock reshard_lock
;
// may be nullptr (see the constructor comment below)
81 RGWBucketReshardLock
* outer_reshard_lock
;
83 // using an initializer_list as an array in contiguous memory
84 // allocated all at once
85 static const std::initializer_list
<uint16_t> reshard_primes
;
87 int create_new_bucket_instance(int new_num_shards
,
88 RGWBucketInfo
& new_bucket_info
);
89 int do_reshard(int num_shards
,
90 RGWBucketInfo
& new_bucket_info
,
94 Formatter
*formatter
);
97 // pass nullptr for the final parameter if there is no outer reshard lock
99 RGWBucketReshard(rgw::sal::RGWRadosStore
*_store
,
100 const RGWBucketInfo
& _bucket_info
,
101 const std::map
<string
, bufferlist
>& _bucket_attrs
,
102 RGWBucketReshardLock
* _outer_reshard_lock
);
// only num_shards and max_op_entries are required; verbose defaults to
// false and the out/formatter/reshard_log pointers default to nullptr
103 int execute(int num_shards
, int max_op_entries
,
104 bool verbose
= false, ostream
*out
= nullptr,
105 Formatter
*formatter
= nullptr,
106 RGWReshard
*reshard_log
= nullptr);
107 int get_status(std::list
<cls_rgw_bucket_instance_entry
> *status
);
// static form: operates on an explicitly supplied store and bucket_info
109 static int clear_resharding(rgw::sal::RGWRadosStore
* store
,
110 const RGWBucketInfo
& bucket_info
);
// instance form: forwards this object's store and bucket_info to the
// static overload
111 int clear_resharding() {
112 return clear_resharding(store
, bucket_info
);
// static form: operates on an explicitly supplied store and bucket_info
114 static int clear_index_shard_reshard_status(rgw::sal::RGWRadosStore
* store
,
115 const RGWBucketInfo
& bucket_info
);
// instance form: forwards this object's store and bucket_info to the
// static overload
116 int clear_index_shard_reshard_status() {
117 return clear_index_shard_reshard_status(store
, bucket_info
);
// static form: operates on an explicitly supplied store and bucket_info
119 static int set_resharding_status(rgw::sal::RGWRadosStore
* store
,
120 const RGWBucketInfo
& bucket_info
,
121 const string
& new_instance_id
,
123 cls_rgw_reshard_status status
);
// instance form: forwards this object's store and bucket_info, plus the
// caller's arguments, to the static overload
124 int set_resharding_status(const string
& new_instance_id
,
126 cls_rgw_reshard_status status
) {
127 return set_resharding_status(store
, bucket_info
,
128 new_instance_id
, num_shards
, status
);
// returns the last element of reshard_primes (via std::crbegin)
// NOTE(review): that is the maximum only if the list is sorted ascending;
// the upper_bound/lower_bound lookups below need the same ordering
131 static uint32_t get_max_prime_shards() {
132 return *std::crbegin(reshard_primes
);
135 // returns the prime in our list less than or equal to the
136 // parameter; the lowest value that can be returned is 1
137 static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards
) {
138 auto it
= std::upper_bound(reshard_primes
.begin(), reshard_primes
.end(),
// upper_bound returned the first element: handled as a special case
140 if (it
== reshard_primes
.begin()) {
147 // returns the prime in our list greater than or equal to the
148 // parameter; if we do not have such a prime, 0 is returned
149 static uint32_t get_prime_shards_greater_or_equal(
150 uint32_t requested_shards
)
152 auto it
= std::lower_bound(reshard_primes
.begin(), reshard_primes
.end(),
// lower_bound ran off the end: no element qualifies
154 if (it
== reshard_primes
.end()) {
161 // returns a preferred number of shards given a calculated number of
162 // shards based on max_dynamic_shards and the list of prime values
163 static uint32_t get_preferred_shards(uint32_t suggested_shards
,
164 uint32_t max_dynamic_shards
) {
166 // use a prime if max is within our prime range, otherwise use
168 const uint32_t absolute_max
=
169 max_dynamic_shards
>= get_max_prime_shards() ?
171 get_prime_shards_less_or_equal(max_dynamic_shards
);
173 // if we can use a prime number, use it, otherwise use suggested;
174 // note get_prime_shards_greater_or_equal returns 0 if it finds no such prime
176 const uint32_t prime_ish_num_shards
=
177 std::max(get_prime_shards_greater_or_equal(suggested_shards
),
180 // dynamic sharding cannot reshard more than defined maximum
181 const uint32_t final_num_shards
=
182 std::min(prime_ish_num_shards
, absolute_max
);
184 return final_num_shards
;
186 }; // RGWBucketReshard
// time points here are measured on ceph::coarse_mono_clock
191 using Clock
= ceph::coarse_mono_clock
;
194 rgw::sal::RGWRadosStore
*store
;
196 rados::cls::lock::Lock instance_lock
;
201 Formatter
*formatter
;
// writes its result through the `shard` out-pointer
// NOTE(review): presumably the oid of log shard `shard_num` -- confirm
203 void get_logshard_oid(int shard_num
, string
*shard
);
// worker thread type: derives from Thread and overrides entry(); carries
// its own mutex and condition variable
205 class ReshardWorker
: public Thread
{
208 ceph::mutex lock
= ceph::make_mutex("ReshardWorker");
209 ceph::condition_variable cond
;
212 ReshardWorker(CephContext
* const _cct
,
213 RGWReshard
* const _reshard
)
218 void *entry() override
;
// starts out null
// NOTE(review): presumably managed by start_processor()/stop_processor()
// -- confirm against the implementation file
222 ReshardWorker
*worker
= nullptr;
// atomic flag, initially false
// NOTE(review): presumably signals shutdown -- confirm
223 std::atomic
<bool> down_flag
= { false };
225 string
get_logshard_key(const string
& tenant
, const string
& bucket_name
);
// writes its result through the `oid` out-pointer
226 void get_bucket_logshard_oid(const string
& tenant
, const string
& bucket_name
, string
*oid
);
// only _store is required; _verbose defaults to false and the _out and
// _formatter pointers default to nullptr
229 RGWReshard(rgw::sal::RGWRadosStore
* _store
, bool _verbose
= false, ostream
*_out
= nullptr, Formatter
*_formatter
= nullptr);
// operations on cls_rgw_reshard_entry records; each returns an int
// NOTE(review): presumably 0 / negative errno as elsewhere -- confirm
230 int add(cls_rgw_reshard_entry
& entry
);
231 int update(const RGWBucketInfo
& bucket_info
, const RGWBucketInfo
& new_bucket_info
);
232 int get(cls_rgw_reshard_entry
& entry
);
233 int remove(cls_rgw_reshard_entry
& entry
);
// `entries` and `is_truncated` are outputs; `marker` is taken by non-const
// reference
// NOTE(review): presumably a paging cursor that may be updated -- confirm
234 int list(int logshard_num
, string
& marker
, uint32_t max
, std::list
<cls_rgw_reshard_entry
>& entries
, bool *is_truncated
);
235 int clear_bucket_resharding(const string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
);
238 int process_single_logshard(int logshard_num
);
239 int process_all_logshards();
241 void start_processor();
242 void stop_processor();
245 class RGWReshardWait
{
247 // the blocking wait uses std::condition_variable::wait_for(), which uses the
248 // std::chrono::steady_clock. use that for the async waits as well
249 using Clock
= std::chrono::steady_clock
;
251 const ceph::timespan duration
;
252 ceph::mutex mutex
= ceph::make_mutex("RGWReshardWait::lock");
253 ceph::condition_variable cond
;
255 struct Waiter
: boost::intrusive::list_base_hook
<> {
256 using Executor
= boost::asio::io_context::executor_type
;
257 using Timer
= boost::asio::basic_waitable_timer
<Clock
,
258 boost::asio::wait_traits
<Clock
>, Executor
>;
260 explicit Waiter(boost::asio::io_context
& ioc
) : timer(ioc
) {}
262 boost::intrusive::list
<Waiter
> waiters
;
264 bool going_down
{false};
267 RGWReshardWait(ceph::timespan duration
= std::chrono::seconds(5))
268 : duration(duration
) {}
270 ceph_assert(going_down
);
272 int wait(optional_yield y
);
273 // unblock any threads waiting on reshard