]>
Commit | Line | Data |
---|---|---|
31f18b77 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
31f18b77 FG |
3 | |
4 | #ifndef RGW_RESHARD_H | |
5 | #define RGW_RESHARD_H | |
6 | ||
7 | #include <vector> | |
9f95a23c | 8 | #include <initializer_list> |
f64942e4 | 9 | #include <functional> |
9f95a23c TL |
10 | #include <iterator> |
11 | #include <algorithm> | |
f64942e4 | 12 | |
11fdf7f2 | 13 | #include <boost/intrusive/list.hpp> |
9f95a23c | 14 | #include <boost/asio/basic_waitable_timer.hpp> |
11fdf7f2 | 15 | |
9f95a23c | 16 | #include "include/common_fwd.h" |
31f18b77 | 17 | #include "include/rados/librados.hpp" |
f64942e4 | 18 | #include "common/ceph_time.h" |
9f95a23c | 19 | #include "common/async/yield_context.h" |
31f18b77 FG |
20 | #include "cls/rgw/cls_rgw_types.h" |
21 | #include "cls/lock/cls_lock_client.h" | |
31f18b77 | 22 | |
9f95a23c | 23 | #include "rgw_common.h" |
f64942e4 | 24 | |
9f95a23c TL |
25 | |
26 | class RGWReshard; | |
27 | namespace rgw { namespace sal { | |
28 | class RGWRadosStore; | |
29 | } } | |
31f18b77 | 30 | |
f64942e4 AA |
// Holds a cls lock on a per-bucket reshard lock object so that only
// one actor reshards a given bucket at a time.  After lock() the
// holder is expected to call renew() periodically; should_renew()
// tells it when (at the halfway point of the lock duration, see
// reset_time()).
class RGWBucketReshardLock {
  using Clock = ceph::coarse_mono_clock;

  rgw::sal::RGWRadosStore* store;
  const std::string lock_oid;  // rados object the lock is taken on
  // whether the lock is taken in ephemeral mode -- semantics come
  // from rados::cls::lock; NOTE(review): confirm against cls_lock docs
  const bool ephemeral;
  rados::cls::lock::Lock internal_lock;
  std::chrono::seconds duration;  // lifetime of the held lock

  Clock::time_point start_time;    // when the lock was taken or last renewed
  Clock::time_point renew_thresh;  // once past this, a renew is due

  // re-arm the renewal window: aim to renew at half the lock
  // duration, leaving ample slack before the lock would expire
  void reset_time(const Clock::time_point& now) {
    start_time = now;
    renew_thresh = start_time + duration / 2;
  }

public:
  RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
		       const std::string& reshard_lock_oid,
		       bool _ephemeral);
  // convenience overload: derives the lock oid from the bucket's
  // "tenant:name"-style key
  RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
		       const RGWBucketInfo& bucket_info,
		       bool _ephemeral) :
    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
  {}

  // acquire / release / extend the cls lock (defined out of line)
  int lock();
  void unlock();
  int renew(const Clock::time_point&);

  // true once we are at least halfway through the lock duration
  bool should_renew(const Clock::time_point& now) const {
    return now >= renew_thresh;
  }
}; // class RGWBucketReshardLock
31f18b77 FG |
66 | |
67 | class RGWBucketReshard { | |
f64942e4 AA |
68 | public: |
69 | ||
31f18b77 FG |
70 | friend class RGWReshard; |
71 | ||
f64942e4 AA |
72 | using Clock = ceph::coarse_mono_clock; |
73 | ||
74 | private: | |
75 | ||
9f95a23c | 76 | rgw::sal::RGWRadosStore *store; |
31f18b77 FG |
77 | RGWBucketInfo bucket_info; |
78 | std::map<string, bufferlist> bucket_attrs; | |
79 | ||
f64942e4 AA |
80 | RGWBucketReshardLock reshard_lock; |
81 | RGWBucketReshardLock* outer_reshard_lock; | |
31f18b77 | 82 | |
9f95a23c TL |
83 | // using an initializer_list as an array in contiguous memory |
84 | // allocated in at once | |
85 | static const std::initializer_list<uint16_t> reshard_primes; | |
86 | ||
f64942e4 AA |
87 | int create_new_bucket_instance(int new_num_shards, |
88 | RGWBucketInfo& new_bucket_info); | |
31f18b77 | 89 | int do_reshard(int num_shards, |
b32b8144 | 90 | RGWBucketInfo& new_bucket_info, |
31f18b77 FG |
91 | int max_entries, |
92 | bool verbose, | |
93 | ostream *os, | |
94 | Formatter *formatter); | |
95 | public: | |
31f18b77 | 96 | |
f64942e4 AA |
97 | // pass nullptr for the final parameter if no outer reshard lock to |
98 | // manage | |
9f95a23c TL |
99 | RGWBucketReshard(rgw::sal::RGWRadosStore *_store, |
100 | const RGWBucketInfo& _bucket_info, | |
f64942e4 AA |
101 | const std::map<string, bufferlist>& _bucket_attrs, |
102 | RGWBucketReshardLock* _outer_reshard_lock); | |
31f18b77 FG |
103 | int execute(int num_shards, int max_op_entries, |
104 | bool verbose = false, ostream *out = nullptr, | |
105 | Formatter *formatter = nullptr, | |
106 | RGWReshard *reshard_log = nullptr); | |
31f18b77 | 107 | int get_status(std::list<cls_rgw_bucket_instance_entry> *status); |
94b18763 | 108 | int cancel(); |
9f95a23c | 109 | static int clear_resharding(rgw::sal::RGWRadosStore* store, |
f64942e4 AA |
110 | const RGWBucketInfo& bucket_info); |
111 | int clear_resharding() { | |
112 | return clear_resharding(store, bucket_info); | |
113 | } | |
9f95a23c | 114 | static int clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store, |
f64942e4 AA |
115 | const RGWBucketInfo& bucket_info); |
116 | int clear_index_shard_reshard_status() { | |
117 | return clear_index_shard_reshard_status(store, bucket_info); | |
118 | } | |
9f95a23c | 119 | static int set_resharding_status(rgw::sal::RGWRadosStore* store, |
f64942e4 AA |
120 | const RGWBucketInfo& bucket_info, |
121 | const string& new_instance_id, | |
122 | int32_t num_shards, | |
123 | cls_rgw_reshard_status status); | |
124 | int set_resharding_status(const string& new_instance_id, | |
125 | int32_t num_shards, | |
126 | cls_rgw_reshard_status status) { | |
127 | return set_resharding_status(store, bucket_info, | |
128 | new_instance_id, num_shards, status); | |
129 | } | |
9f95a23c TL |
130 | |
131 | static uint32_t get_max_prime_shards() { | |
132 | return *std::crbegin(reshard_primes); | |
133 | } | |
134 | ||
135 | // returns the prime in our list less than or equal to the | |
136 | // parameter; the lowest value that can be returned is 1 | |
137 | static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) { | |
138 | auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(), | |
139 | requested_shards); | |
140 | if (it == reshard_primes.begin()) { | |
141 | return 1; | |
142 | } else { | |
143 | return *(--it); | |
144 | } | |
145 | } | |
146 | ||
147 | // returns the prime in our list greater than or equal to the | |
148 | // parameter; if we do not have such a prime, 0 is returned | |
149 | static uint32_t get_prime_shards_greater_or_equal( | |
150 | uint32_t requested_shards) | |
151 | { | |
152 | auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(), | |
153 | requested_shards); | |
154 | if (it == reshard_primes.end()) { | |
155 | return 0; | |
156 | } else { | |
157 | return *it; | |
158 | } | |
159 | } | |
160 | ||
161 | // returns a preferred number of shards given a calculated number of | |
162 | // shards based on max_dynamic_shards and the list of prime values | |
163 | static uint32_t get_preferred_shards(uint32_t suggested_shards, | |
164 | uint32_t max_dynamic_shards) { | |
165 | ||
166 | // use a prime if max is within our prime range, otherwise use | |
167 | // specified max | |
168 | const uint32_t absolute_max = | |
169 | max_dynamic_shards >= get_max_prime_shards() ? | |
170 | max_dynamic_shards : | |
171 | get_prime_shards_less_or_equal(max_dynamic_shards); | |
172 | ||
173 | // if we can use a prime number, use it, otherwise use suggested; | |
174 | // note get_prime_shards_greater_or_equal will return 0 if no prime in | |
175 | // prime range | |
176 | const uint32_t prime_ish_num_shards = | |
177 | std::max(get_prime_shards_greater_or_equal(suggested_shards), | |
178 | suggested_shards); | |
179 | ||
180 | // dynamic sharding cannot reshard more than defined maximum | |
181 | const uint32_t final_num_shards = | |
182 | std::min(prime_ish_num_shards, absolute_max); | |
183 | ||
184 | return final_num_shards; | |
185 | } | |
f64942e4 | 186 | }; // RGWBucketReshard |
31f18b77 | 187 | |
9f95a23c | 188 | |
// Manages the cluster-wide reshard log: a set of "logshard" rados
// objects holding cls_rgw_reshard_entry records of buckets queued for
// resharding, plus the background worker thread that processes them.
class RGWReshard {
public:
  using Clock = ceph::coarse_mono_clock;

private:
  rgw::sal::RGWRadosStore *store;
  string lock_name;
  rados::cls::lock::Lock instance_lock;
  int num_logshards;  // how many logshard objects the log is spread over

  // output controls used when processing entries
  bool verbose;
  ostream *out;
  Formatter *formatter;

  // oid of the logshard object for a given shard number
  void get_logshard_oid(int shard_num, string *shard);
protected:
  // background thread that periodically processes all logshards;
  // stop() wakes it via the condition variable
  class ReshardWorker : public Thread {
    CephContext *cct;
    RGWReshard *reshard;
    ceph::mutex lock = ceph::make_mutex("ReshardWorker");
    ceph::condition_variable cond;

  public:
    ReshardWorker(CephContext * const _cct,
		  RGWReshard * const _reshard)
      : cct(_cct),
	reshard(_reshard) {
    }

    void *entry() override;
    void stop();
  };

  ReshardWorker *worker = nullptr;
  std::atomic<bool> down_flag = { false };  // set when shutting down

  // map a bucket (tenant + name) to its logshard key / object oid
  string get_logshard_key(const string& tenant, const string& bucket_name);
  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);

public:
  RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
  // CRUD on reshard-log entries (defined out of line)
  int add(cls_rgw_reshard_entry& entry);
  int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
  int get(cls_rgw_reshard_entry& entry);
  int remove(cls_rgw_reshard_entry& entry);
  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);

  /* reshard thread */
  int process_single_logshard(int logshard_num);
  int process_all_logshards();
  bool going_down();
  void start_processor();
  void stop_processor();
};
244 | ||
// Lets request threads/coroutines pause while a bucket is being
// resharded: wait() delays for `duration`, either blocking on the
// condition variable or (with a yield context) arming a per-waiter
// asio timer; stop() wakes everything for shutdown.
class RGWReshardWait {
public:
  // the blocking wait uses std::condition_variable::wait_for(), which uses the
  // std::chrono::steady_clock. use that for the async waits as well
  using Clock = std::chrono::steady_clock;
private:
  const ceph::timespan duration;  // how long each wait() pauses
  ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
  ceph::condition_variable cond;

  // one timer per async waiter, linked into `waiters` so stop() can
  // reach and cancel all of them
  struct Waiter : boost::intrusive::list_base_hook<> {
    using Executor = boost::asio::io_context::executor_type;
    using Timer = boost::asio::basic_waitable_timer<Clock,
	  boost::asio::wait_traits<Clock>, Executor>;
    Timer timer;
    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
  };
  boost::intrusive::list<Waiter> waiters;

  bool going_down{false};  // set by stop(); checked before destruction

  public:
  RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
    : duration(duration) {}
  ~RGWReshardWait() {
    // callers must stop() before destroying, or waiters would dangle
    ceph_assert(going_down);
  }
  int wait(optional_yield y);
  // unblock any threads waiting on reshard
  void stop();
};
276 | ||
277 | #endif |