]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_reshard.h
a5c190f59143fd110be5f89395b25ad52f84ff8a
[ceph.git] / ceph / src / rgw / rgw_reshard.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef RGW_RESHARD_H
5 #define RGW_RESHARD_H
6
7 #include <vector>
8 #include <initializer_list>
9 #include <functional>
10 #include <iterator>
11 #include <algorithm>
12
13 #include <boost/intrusive/list.hpp>
14 #include <boost/asio/basic_waitable_timer.hpp>
15
16 #include "include/common_fwd.h"
17 #include "include/rados/librados.hpp"
18 #include "common/ceph_time.h"
19 #include "common/async/yield_context.h"
20 #include "cls/rgw/cls_rgw_types.h"
21 #include "cls/lock/cls_lock_client.h"
22
23 #include "rgw_common.h"
24
25
// Forward declarations for types this header only refers to by pointer
// (or befriends before defining), so their full definitions are not
// required here.
namespace rgw {
namespace sal {
class RGWRadosStore;
} // namespace sal
} // namespace rgw

class RGWReshard;
30
31 class RGWBucketReshardLock {
32 using Clock = ceph::coarse_mono_clock;
33
34 rgw::sal::RGWRadosStore* store;
35 const std::string lock_oid;
36 const bool ephemeral;
37 rados::cls::lock::Lock internal_lock;
38 std::chrono::seconds duration;
39
40 Clock::time_point start_time;
41 Clock::time_point renew_thresh;
42
43 void reset_time(const Clock::time_point& now) {
44 start_time = now;
45 renew_thresh = start_time + duration / 2;
46 }
47
48 public:
49 RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
50 const std::string& reshard_lock_oid,
51 bool _ephemeral);
52 RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
53 const RGWBucketInfo& bucket_info,
54 bool _ephemeral) :
55 RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
56 {}
57
58 int lock();
59 void unlock();
60 int renew(const Clock::time_point&);
61
62 bool should_renew(const Clock::time_point& now) const {
63 return now >= renew_thresh;
64 }
65 }; // class RGWBucketReshardLock
66
67 class RGWBucketReshard {
68 public:
69
70 friend class RGWReshard;
71
72 using Clock = ceph::coarse_mono_clock;
73
74 private:
75
76 rgw::sal::RGWRadosStore *store;
77 RGWBucketInfo bucket_info;
78 std::map<string, bufferlist> bucket_attrs;
79
80 RGWBucketReshardLock reshard_lock;
81 RGWBucketReshardLock* outer_reshard_lock;
82
83 // using an initializer_list as an array in contiguous memory
84 // allocated in at once
85 static const std::initializer_list<uint16_t> reshard_primes;
86
87 int create_new_bucket_instance(int new_num_shards,
88 RGWBucketInfo& new_bucket_info);
89 int do_reshard(int num_shards,
90 RGWBucketInfo& new_bucket_info,
91 int max_entries,
92 bool verbose,
93 ostream *os,
94 Formatter *formatter);
95 public:
96
97 // pass nullptr for the final parameter if no outer reshard lock to
98 // manage
99 RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
100 const RGWBucketInfo& _bucket_info,
101 const std::map<string, bufferlist>& _bucket_attrs,
102 RGWBucketReshardLock* _outer_reshard_lock);
103 int execute(int num_shards, int max_op_entries,
104 bool verbose = false, ostream *out = nullptr,
105 Formatter *formatter = nullptr,
106 RGWReshard *reshard_log = nullptr);
107 int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
108 int cancel();
109 static int clear_resharding(rgw::sal::RGWRadosStore* store,
110 const RGWBucketInfo& bucket_info);
111 int clear_resharding() {
112 return clear_resharding(store, bucket_info);
113 }
114 static int clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store,
115 const RGWBucketInfo& bucket_info);
116 int clear_index_shard_reshard_status() {
117 return clear_index_shard_reshard_status(store, bucket_info);
118 }
119 static int set_resharding_status(rgw::sal::RGWRadosStore* store,
120 const RGWBucketInfo& bucket_info,
121 const string& new_instance_id,
122 int32_t num_shards,
123 cls_rgw_reshard_status status);
124 int set_resharding_status(const string& new_instance_id,
125 int32_t num_shards,
126 cls_rgw_reshard_status status) {
127 return set_resharding_status(store, bucket_info,
128 new_instance_id, num_shards, status);
129 }
130
131 static uint32_t get_max_prime_shards() {
132 return *std::crbegin(reshard_primes);
133 }
134
135 // returns the prime in our list less than or equal to the
136 // parameter; the lowest value that can be returned is 1
137 static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
138 auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
139 requested_shards);
140 if (it == reshard_primes.begin()) {
141 return 1;
142 } else {
143 return *(--it);
144 }
145 }
146
147 // returns the prime in our list greater than or equal to the
148 // parameter; if we do not have such a prime, 0 is returned
149 static uint32_t get_prime_shards_greater_or_equal(
150 uint32_t requested_shards)
151 {
152 auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
153 requested_shards);
154 if (it == reshard_primes.end()) {
155 return 0;
156 } else {
157 return *it;
158 }
159 }
160
161 // returns a preferred number of shards given a calculated number of
162 // shards based on max_dynamic_shards and the list of prime values
163 static uint32_t get_preferred_shards(uint32_t suggested_shards,
164 uint32_t max_dynamic_shards) {
165
166 // use a prime if max is within our prime range, otherwise use
167 // specified max
168 const uint32_t absolute_max =
169 max_dynamic_shards >= get_max_prime_shards() ?
170 max_dynamic_shards :
171 get_prime_shards_less_or_equal(max_dynamic_shards);
172
173 // if we can use a prime number, use it, otherwise use suggested;
174 // note get_prime_shards_greater_or_equal will return 0 if no prime in
175 // prime range
176 const uint32_t prime_ish_num_shards =
177 std::max(get_prime_shards_greater_or_equal(suggested_shards),
178 suggested_shards);
179
180 // dynamic sharding cannot reshard more than defined maximum
181 const uint32_t final_num_shards =
182 std::min(prime_ish_num_shards, absolute_max);
183
184 return final_num_shards;
185 }
186 }; // RGWBucketReshard
187
188
189 class RGWReshard {
190 public:
191 using Clock = ceph::coarse_mono_clock;
192
193 private:
194 rgw::sal::RGWRadosStore *store;
195 string lock_name;
196 rados::cls::lock::Lock instance_lock;
197 int num_logshards;
198
199 bool verbose;
200 ostream *out;
201 Formatter *formatter;
202
203 void get_logshard_oid(int shard_num, string *shard);
204 protected:
205 class ReshardWorker : public Thread {
206 CephContext *cct;
207 RGWReshard *reshard;
208 ceph::mutex lock = ceph::make_mutex("ReshardWorker");
209 ceph::condition_variable cond;
210
211 public:
212 ReshardWorker(CephContext * const _cct,
213 RGWReshard * const _reshard)
214 : cct(_cct),
215 reshard(_reshard) {
216 }
217
218 void *entry() override;
219 void stop();
220 };
221
222 ReshardWorker *worker = nullptr;
223 std::atomic<bool> down_flag = { false };
224
225 string get_logshard_key(const string& tenant, const string& bucket_name);
226 void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
227
228 public:
229 RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
230 int add(cls_rgw_reshard_entry& entry);
231 int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
232 int get(cls_rgw_reshard_entry& entry);
233 int remove(cls_rgw_reshard_entry& entry);
234 int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
235 int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
236
237 /* reshard thread */
238 int process_single_logshard(int logshard_num);
239 int process_all_logshards();
240 bool going_down();
241 void start_processor();
242 void stop_processor();
243 };
244
245 class RGWReshardWait {
246 public:
247 // the blocking wait uses std::condition_variable::wait_for(), which uses the
248 // std::chrono::steady_clock. use that for the async waits as well
249 using Clock = std::chrono::steady_clock;
250 private:
251 const ceph::timespan duration;
252 ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
253 ceph::condition_variable cond;
254
255 struct Waiter : boost::intrusive::list_base_hook<> {
256 using Executor = boost::asio::io_context::executor_type;
257 using Timer = boost::asio::basic_waitable_timer<Clock,
258 boost::asio::wait_traits<Clock>, Executor>;
259 Timer timer;
260 explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
261 };
262 boost::intrusive::list<Waiter> waiters;
263
264 bool going_down{false};
265
266 public:
267 RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
268 : duration(duration) {}
269 ~RGWReshardWait() {
270 ceph_assert(going_down);
271 }
272 int wait(optional_yield y);
273 // unblock any threads waiting on reshard
274 void stop();
275 };
276
277 #endif