// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef RGW_RESHARD_H
#define RGW_RESHARD_H

#include <vector>
#include <initializer_list>
#include <functional>
#include <iterator>
#include <algorithm>

#include <boost/intrusive/list.hpp>
#include <boost/asio/basic_waitable_timer.hpp>

#include "include/common_fwd.h"
#include "include/rados/librados.hpp"
#include "common/ceph_time.h"
#include "common/async/yield_context.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/lock/cls_lock_client.h"

#include "rgw_common.h"


class RGWReshard;
namespace rgw { namespace sal {
  class RadosStore;
} }
class RGWBucketReshardLock {
  using Clock = ceph::coarse_mono_clock;

  rgw::sal::RadosStore* store;
  const std::string lock_oid;
  const bool ephemeral;
  rados::cls::lock::Lock internal_lock;
  std::chrono::seconds duration;

  Clock::time_point start_time;
  Clock::time_point renew_thresh;

  void reset_time(const Clock::time_point& now) {
    start_time = now;
    renew_thresh = start_time + duration / 2;
  }

public:
  RGWBucketReshardLock(rgw::sal::RadosStore* _store,
                       const std::string& reshard_lock_oid,
                       bool _ephemeral);
  RGWBucketReshardLock(rgw::sal::RadosStore* _store,
                       const RGWBucketInfo& bucket_info,
                       bool _ephemeral) :
    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
  {}

  int lock(const DoutPrefixProvider *dpp);
  void unlock();
  int renew(const Clock::time_point&);

  bool should_renew(const Clock::time_point& now) const {
    return now >= renew_thresh;
  }
}; // class RGWBucketReshardLock
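
// Illustrative sketch (hypothetical work loop, not part of this
// header): reset_time() arms renew_thresh at the halfway point of the
// lock duration, so a long-running holder polls should_renew() and
// calls renew() before the lock can expire. more_work and
// process_one_entry are hypothetical placeholders.
//
//   RGWBucketReshardLock reshard_lock(store, bucket_info, true /* ephemeral */);
//   int ret = reshard_lock.lock(dpp);
//   if (ret < 0) {
//     return ret; // lock is already held elsewhere, or an error occurred
//   }
//   while (more_work) {
//     process_one_entry();
//     auto now = ceph::coarse_mono_clock::now();
//     if (reshard_lock.should_renew(now)) {
//       if ((ret = reshard_lock.renew(now)) < 0) {
//         break; // lost the lock; let another instance take over
//       }
//     }
//   }
//   reshard_lock.unlock();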

class RGWBucketReshard {
public:

  friend class RGWReshard;

  using Clock = ceph::coarse_mono_clock;

private:

  rgw::sal::RadosStore* store;
  RGWBucketInfo bucket_info;
  std::map<std::string, bufferlist> bucket_attrs;

  RGWBucketReshardLock reshard_lock;
  RGWBucketReshardLock* outer_reshard_lock;
  // using an initializer_list as an array in contiguous memory,
  // allocated all at once
  static const std::initializer_list<uint16_t> reshard_primes;

  int create_new_bucket_instance(int new_num_shards,
                                 RGWBucketInfo& new_bucket_info,
                                 const DoutPrefixProvider *dpp);
  int do_reshard(int num_shards,
                 RGWBucketInfo& new_bucket_info,
                 int max_entries,
                 bool verbose,
                 std::ostream *os,
                 Formatter *formatter,
                 const DoutPrefixProvider *dpp);
public:

  // pass nullptr as the final parameter if there is no outer reshard
  // lock to manage
  RGWBucketReshard(rgw::sal::RadosStore* _store,
                   const RGWBucketInfo& _bucket_info,
                   const std::map<std::string, bufferlist>& _bucket_attrs,
                   RGWBucketReshardLock* _outer_reshard_lock);
  int execute(int num_shards, int max_op_entries,
              const DoutPrefixProvider *dpp,
              bool verbose = false, std::ostream *out = nullptr,
              Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);
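
  // Illustrative sketch of a call site (hypothetical values, not part
  // of this header): resharding to 17 shards with no outer reshard
  // lock, hence the nullptr final constructor argument noted above.
  //
  //   RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
  //   int ret = br.execute(17 /* num_shards */, 1000 /* max_op_entries */, dpp);
  //   if (ret < 0) {
  //     // resharding failed or was interrupted; see cancel() below
  //   }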
  int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
  int cancel(const DoutPrefixProvider *dpp);
  static int clear_resharding(const DoutPrefixProvider *dpp,
                              rgw::sal::RadosStore* store,
                              const RGWBucketInfo& bucket_info);
  int clear_resharding(const DoutPrefixProvider *dpp) {
    return clear_resharding(dpp, store, bucket_info);
  }
  static int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
                                              rgw::sal::RadosStore* store,
                                              const RGWBucketInfo& bucket_info);
  int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp) {
    return clear_index_shard_reshard_status(dpp, store, bucket_info);
  }
  static int set_resharding_status(const DoutPrefixProvider *dpp,
                                   rgw::sal::RadosStore* store,
                                   const RGWBucketInfo& bucket_info,
                                   const std::string& new_instance_id,
                                   int32_t num_shards,
                                   cls_rgw_reshard_status status);
  int set_resharding_status(const DoutPrefixProvider *dpp,
                            const std::string& new_instance_id,
                            int32_t num_shards,
                            cls_rgw_reshard_status status) {
    return set_resharding_status(dpp, store, bucket_info,
                                 new_instance_id, num_shards, status);
  }

  // returns the largest prime in our list of reshard primes
  static uint32_t get_max_prime_shards() {
    return *std::crbegin(reshard_primes);
  }

  // returns the largest prime in our list that is less than or equal
  // to the parameter; the lowest value that can be returned is 1
  static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
    auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
                               requested_shards);
    if (it == reshard_primes.begin()) {
      return 1;
    } else {
      return *(--it);
    }
  }

  // returns the smallest prime in our list that is greater than or
  // equal to the parameter; if there is no such prime, 0 is returned
  static uint32_t get_prime_shards_greater_or_equal(
    uint32_t requested_shards)
  {
    auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
                               requested_shards);
    if (it == reshard_primes.end()) {
      return 0;
    } else {
      return *it;
    }
  }
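
  // Worked example (values hypothetical; the actual contents of
  // reshard_primes are defined in the .cc file): if the list were
  // { 7, 11, 13 }, then
  //   get_prime_shards_less_or_equal(12)    == 11  (upper_bound, step back)
  //   get_prime_shards_less_or_equal(5)     == 1   (every prime is larger)
  //   get_prime_shards_greater_or_equal(12) == 13  (lower_bound)
  //   get_prime_shards_greater_or_equal(20) == 0   (no prime in range)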

  // returns a preferred number of shards, given a suggested number of
  // shards, based on max_dynamic_shards and the list of prime values
  static uint32_t get_preferred_shards(uint32_t suggested_shards,
                                       uint32_t max_dynamic_shards) {

    // use a prime (rounding down) if the max is within our prime
    // range, otherwise use the specified max directly
    const uint32_t absolute_max =
      max_dynamic_shards >= get_max_prime_shards() ?
      max_dynamic_shards :
      get_prime_shards_less_or_equal(max_dynamic_shards);

    // if we can use a prime number, use it, otherwise use suggested;
    // get_prime_shards_greater_or_equal returns 0 when there is no
    // prime in range, in which case std::max falls back to
    // suggested_shards
    const uint32_t prime_ish_num_shards =
      std::max(get_prime_shards_greater_or_equal(suggested_shards),
               suggested_shards);

    // dynamic resharding cannot exceed the defined maximum
    const uint32_t final_num_shards =
      std::min(prime_ish_num_shards, absolute_max);

    return final_num_shards;
  }
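
  // Worked example (same hypothetical { 7, 11, 13 } list as above):
  // get_preferred_shards(9, 12) computes absolute_max =
  // get_prime_shards_less_or_equal(12) == 11 (12 is below the largest
  // prime, 13), prime_ish_num_shards = max(11, 9) == 11, and returns
  // min(11, 11) == 11. With max_dynamic_shards = 100, which is beyond
  // the prime range, absolute_max would simply be 100.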
}; // RGWBucketReshard


class RGWReshard {
public:
  using Clock = ceph::coarse_mono_clock;

private:
  rgw::sal::RadosStore* store;
  std::string lock_name;
  rados::cls::lock::Lock instance_lock;
  int num_logshards;

  bool verbose;
  std::ostream *out;
  Formatter *formatter;

  void get_logshard_oid(int shard_num, std::string *shard);
protected:
  class ReshardWorker : public Thread, public DoutPrefixProvider {
    CephContext *cct;
    RGWReshard *reshard;
    ceph::mutex lock = ceph::make_mutex("ReshardWorker");
    ceph::condition_variable cond;

  public:
    ReshardWorker(CephContext * const _cct,
                  RGWReshard * const _reshard)
      : cct(_cct),
        reshard(_reshard) {}

    void *entry() override;
    void stop();

    CephContext *get_cct() const override;
    unsigned get_subsys() const;
    std::ostream& gen_prefix(std::ostream& out) const;
  };

  ReshardWorker *worker = nullptr;
  std::atomic<bool> down_flag = { false };

  std::string get_logshard_key(const std::string& tenant, const std::string& bucket_name);
  void get_bucket_logshard_oid(const std::string& tenant, const std::string& bucket_name, std::string *oid);

public:
  RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
  int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
  int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
  int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
  int remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
  int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
  int clear_bucket_resharding(const DoutPrefixProvider *dpp, const std::string& bucket_instance_oid, cls_rgw_reshard_entry& entry);

  /* reshard thread */
  int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp);
  int process_all_logshards(const DoutPrefixProvider *dpp);
  bool going_down();
  void start_processor();
  void stop_processor();
};
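
// Illustrative sketch (hypothetical lifecycle, not part of this
// header): the processor runs in a background ReshardWorker thread
// that repeatedly calls process_all_logshards() until going_down()
// reports true.
//
//   RGWReshard reshard(store);
//   reshard.start_processor();  // spawns the ReshardWorker thread
//   // ... serve requests; dynamic resharding proceeds in the background
//   reshard.stop_processor();   // raises down_flag and joins the thread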

class RGWReshardWait {
public:
  // the blocking wait uses std::condition_variable::wait_for(), which
  // measures time against std::chrono::steady_clock; use that same
  // clock for the async waits as well
  using Clock = std::chrono::steady_clock;
private:
  const ceph::timespan duration;
  ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
  ceph::condition_variable cond;

  struct Waiter : boost::intrusive::list_base_hook<> {
    using Executor = boost::asio::io_context::executor_type;
    using Timer = boost::asio::basic_waitable_timer<Clock,
          boost::asio::wait_traits<Clock>, Executor>;
    Timer timer;
    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
  };
  boost::intrusive::list<Waiter> waiters;

  bool going_down{false};

public:
  RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
    : duration(duration) {}
  ~RGWReshardWait() {
    ceph_assert(going_down);
  }
  int wait(optional_yield y);
  // unblock any threads waiting on reshard
  void stop();
};
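
// A minimal sketch of the clock choice above (hypothetical standalone
// snippet, not part of this header): both the blocking and async
// paths measure the same duration on std::chrono::steady_clock, and
// an async waiter's timer can be armed on that clock like this.
//
//   boost::asio::io_context ioc;
//   boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer(ioc);
//   timer.expires_after(std::chrono::seconds(5));
//   timer.async_wait([](boost::system::error_code ec) {
//     // ec == boost::asio::error::operation_aborted if the timer is
//     // canceled, which is how stop() would wake async waiters early
//   });
//   ioc.run();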

#endif // RGW_RESHARD_H