// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef RGW_RESHARD_H
#define RGW_RESHARD_H

#include <vector>
#include <initializer_list>
#include <functional>
#include <iterator>
#include <algorithm>

#include <boost/intrusive/list.hpp>
#include <boost/asio/basic_waitable_timer.hpp>

#include "include/common_fwd.h"
#include "include/rados/librados.hpp"
#include "common/ceph_time.h"
#include "common/async/yield_context.h"

#include "cls/rgw/cls_rgw_types.h"
#include "cls/lock/cls_lock_client.h"

#include "rgw_common.h"


class RGWReshard;
namespace rgw { namespace sal {
  class RadosStore;
} }
f64942e4 AA |
// Advisory cls_lock-based lock used to serialize reshard activity on a
// single bucket.  The lock has a finite duration and is expected to be
// renewed once more than half of that duration has elapsed (see
// reset_time()/should_renew()).
class RGWBucketReshardLock {
  using Clock = ceph::coarse_mono_clock;

  rgw::sal::RadosStore* store;           // not owned
  const std::string lock_oid;            // rados object the lock is taken on
  const bool ephemeral;                  // NOTE(review): presumably the lock object is
                                         // removed on unlock when true — confirm in the .cc
  rados::cls::lock::Lock internal_lock;
  std::chrono::seconds duration;         // total lock lifetime

  Clock::time_point start_time;          // when the lock was (re)acquired
  Clock::time_point renew_thresh;        // renew once `now` reaches this point

  // Restart the renewal window from `now`; we aim to renew at the
  // halfway point of the lock's duration so there is slack before expiry.
  void reset_time(const Clock::time_point& now) {
    start_time = now;
    renew_thresh = start_time + duration / 2;
  }

public:
  RGWBucketReshardLock(rgw::sal::RadosStore* _store,
		       const std::string& reshard_lock_oid,
		       bool _ephemeral);
  // Convenience overload: derives the lock oid from the bucket's key
  // ("tenant:bucket" form via get_key(':')).
  RGWBucketReshardLock(rgw::sal::RadosStore* _store,
		       const RGWBucketInfo& bucket_info,
		       bool _ephemeral) :
    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
  {}

  int lock(const DoutPrefixProvider *dpp);  // defined in the .cc; int error-code style
  void unlock();
  int renew(const Clock::time_point&);

  // True once we are past the halfway point of the lock duration and
  // the caller should renew() before continuing long-running work.
  bool should_renew(const Clock::time_point& now) const {
    return now >= renew_thresh;
  }
}; // class RGWBucketReshardLock
31f18b77 FG |
66 | |
// Drives resharding of a single bucket: creating the new bucket
// instance, copying index entries, and maintaining reshard status.
// Also provides static helpers for choosing prime shard counts.
class RGWBucketReshard {
 public:

  friend class RGWReshard;

  using Clock = ceph::coarse_mono_clock;

 private:

  rgw::sal::RadosStore* store;                  // not owned
  RGWBucketInfo bucket_info;                    // bucket being resharded (copy)
  std::map<std::string, bufferlist> bucket_attrs;

  RGWBucketReshardLock reshard_lock;            // per-bucket lock owned by this object
  RGWBucketReshardLock* outer_reshard_lock;     // optional caller-managed lock; may be nullptr

  // using an initializer_list as an array in contiguous memory
  // allocated in at once
  // NOTE: the helpers below (crbegin for the max, upper/lower_bound)
  // require this list to be sorted in ascending order.
  static const std::initializer_list<uint16_t> reshard_primes;

  int create_new_bucket_instance(int new_num_shards,
				 RGWBucketInfo& new_bucket_info,
				 const DoutPrefixProvider *dpp);
  int do_reshard(int num_shards,
		 RGWBucketInfo& new_bucket_info,
		 int max_entries,
		 bool verbose,
		 std::ostream *os,
		 Formatter *formatter,
		 const DoutPrefixProvider *dpp);
public:

  // pass nullptr for the final parameter if no outer reshard lock to
  // manage
  RGWBucketReshard(rgw::sal::RadosStore* _store,
		   const RGWBucketInfo& _bucket_info,
		   const std::map<std::string, bufferlist>& _bucket_attrs,
		   RGWBucketReshardLock* _outer_reshard_lock);
  // Run the full reshard to `num_shards`; optional verbose/formatter
  // output and an optional reshard log to record progress in.
  int execute(int num_shards, int max_op_entries,
	      const DoutPrefixProvider *dpp,
	      bool verbose = false, std::ostream *out = nullptr,
	      Formatter *formatter = nullptr,
	      RGWReshard *reshard_log = nullptr);
  int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
  int cancel(const DoutPrefixProvider *dpp);
  // Static form for callers without an RGWBucketReshard instance; the
  // member form below forwards this object's store/bucket_info.
  static int clear_resharding(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store,
			      const RGWBucketInfo& bucket_info);
  int clear_resharding(const DoutPrefixProvider *dpp) {
    return clear_resharding(dpp, store, bucket_info);
  }
  static int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
					      rgw::sal::RadosStore* store,
					      const RGWBucketInfo& bucket_info);
  int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp) {
    return clear_index_shard_reshard_status(dpp, store, bucket_info);
  }
  static int set_resharding_status(const DoutPrefixProvider *dpp,
				   rgw::sal::RadosStore* store,
				   const RGWBucketInfo& bucket_info,
				   const std::string& new_instance_id,
				   int32_t num_shards,
				   cls_rgw_reshard_status status);
  int set_resharding_status(const DoutPrefixProvider *dpp, const std::string& new_instance_id,
			    int32_t num_shards,
			    cls_rgw_reshard_status status) {
    return set_resharding_status(dpp, store, bucket_info,
				 new_instance_id, num_shards, status);
  }

  // Largest prime we track; relies on reshard_primes being ascending,
  // so the last element (via crbegin) is the maximum.
  static uint32_t get_max_prime_shards() {
    return *std::crbegin(reshard_primes);
  }

  // returns the prime in our list less than or equal to the
  // parameter; the lowest value that can be returned is 1
  static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
    auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
			       requested_shards);
    if (it == reshard_primes.begin()) {
      // every tracked prime exceeds the request
      return 1;
    } else {
      return *(--it);
    }
  }

  // returns the prime in our list greater than or equal to the
  // parameter; if we do not have such a prime, 0 is returned
  static uint32_t get_prime_shards_greater_or_equal(
    uint32_t requested_shards)
  {
    auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
			       requested_shards);
    if (it == reshard_primes.end()) {
      // request is above our largest tracked prime
      return 0;
    } else {
      return *it;
    }
  }

  // returns a preferred number of shards given a calculated number of
  // shards based on max_dynamic_shards and the list of prime values
  static uint32_t get_preferred_shards(uint32_t suggested_shards,
				       uint32_t max_dynamic_shards) {

    // use a prime if max is within our prime range, otherwise use
    // specified max
    const uint32_t absolute_max =
      max_dynamic_shards >= get_max_prime_shards() ?
      max_dynamic_shards :
      get_prime_shards_less_or_equal(max_dynamic_shards);

    // if we can use a prime number, use it, otherwise use suggested;
    // note get_prime_shards_greater_or_equal will return 0 if no prime in
    // prime range
    const uint32_t prime_ish_num_shards =
      std::max(get_prime_shards_greater_or_equal(suggested_shards),
	       suggested_shards);

    // dynamic sharding cannot reshard more than defined maximum
    const uint32_t final_num_shards =
      std::min(prime_ish_num_shards, absolute_max);

    return final_num_shards;
  }
}; // RGWBucketReshard
31f18b77 | 192 | |
9f95a23c | 193 | |
31f18b77 | 194 | class RGWReshard { |
f64942e4 AA |
195 | public: |
196 | using Clock = ceph::coarse_mono_clock; | |
197 | ||
198 | private: | |
20effc67 TL |
199 | rgw::sal::RadosStore* store; |
200 | std::string lock_name; | |
31f18b77 FG |
201 | rados::cls::lock::Lock instance_lock; |
202 | int num_logshards; | |
203 | ||
204 | bool verbose; | |
20effc67 | 205 | std::ostream *out; |
31f18b77 FG |
206 | Formatter *formatter; |
207 | ||
20effc67 | 208 | void get_logshard_oid(int shard_num, std::string *shard); |
31f18b77 | 209 | protected: |
b3b6e05e | 210 | class ReshardWorker : public Thread, public DoutPrefixProvider { |
31f18b77 FG |
211 | CephContext *cct; |
212 | RGWReshard *reshard; | |
9f95a23c TL |
213 | ceph::mutex lock = ceph::make_mutex("ReshardWorker"); |
214 | ceph::condition_variable cond; | |
31f18b77 FG |
215 | |
216 | public: | |
217 | ReshardWorker(CephContext * const _cct, | |
9f95a23c | 218 | RGWReshard * const _reshard) |
31f18b77 | 219 | : cct(_cct), |
b3b6e05e | 220 | reshard(_reshard) {} |
31f18b77 FG |
221 | |
222 | void *entry() override; | |
223 | void stop(); | |
b3b6e05e TL |
224 | |
225 | CephContext *get_cct() const override; | |
226 | unsigned get_subsys() const; | |
227 | std::ostream& gen_prefix(std::ostream& out) const; | |
31f18b77 FG |
228 | }; |
229 | ||
224ce89b | 230 | ReshardWorker *worker = nullptr; |
31f18b77 FG |
231 | std::atomic<bool> down_flag = { false }; |
232 | ||
20effc67 TL |
233 | std::string get_logshard_key(const std::string& tenant, const std::string& bucket_name); |
234 | void get_bucket_logshard_oid(const std::string& tenant, const std::string& bucket_name, std::string *oid); | |
31f18b77 FG |
235 | |
236 | public: | |
20effc67 | 237 | RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr); |
b3b6e05e TL |
238 | int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry); |
239 | int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info); | |
20effc67 | 240 | int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry); |
b3b6e05e | 241 | int remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry); |
20effc67 TL |
242 | int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated); |
243 | int clear_bucket_resharding(const DoutPrefixProvider *dpp, const std::string& bucket_instance_oid, cls_rgw_reshard_entry& entry); | |
31f18b77 FG |
244 | |
245 | /* reshard thread */ | |
b3b6e05e TL |
246 | int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp); |
247 | int process_all_logshards(const DoutPrefixProvider *dpp); | |
31f18b77 FG |
248 | bool going_down(); |
249 | void start_processor(); | |
250 | void stop_processor(); | |
251 | }; | |
252 | ||
// Lets request threads (or coroutines) pause and retry while a bucket
// is being resharded: wait() blocks for `duration` (or registers an
// async timer for yielding callers), and stop() wakes everyone for
// shutdown.
class RGWReshardWait {
 public:
  // the blocking wait uses std::condition_variable::wait_for(), which uses the
  // std::chrono::steady_clock. use that for the async waits as well
  using Clock = std::chrono::steady_clock;
 private:
  const ceph::timespan duration;          // how long each wait lasts
  ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
  ceph::condition_variable cond;

  // One pending async wait; linked into `waiters` via the intrusive
  // hook so stop() can cancel all outstanding timers.
  struct Waiter : boost::intrusive::list_base_hook<> {
    using Executor = boost::asio::io_context::executor_type;
    using Timer = boost::asio::basic_waitable_timer<Clock,
          boost::asio::wait_traits<Clock>, Executor>;
    Timer timer;
    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
  };
  boost::intrusive::list<Waiter> waiters; // guarded by `mutex` — TODO confirm in .cc

  bool going_down{false};                 // set by stop(); checked by waiters

 public:
  RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
    : duration(duration) {}
  ~RGWReshardWait() {
    // stop() must have been called before destruction so no waiter is
    // left referencing this object
    ceph_assert(going_down);
  }
  int wait(optional_yield y);
  // unblock any threads waiting on reshard
  void stop();
};

#endif