]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.h
buildsys: change download over to reef release
[ceph.git] / ceph / src / rgw / rgw_reshard.h
CommitLineData
31f18b77 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
31f18b77
FG
3
4#ifndef RGW_RESHARD_H
5#define RGW_RESHARD_H
6
7#include <vector>
9f95a23c 8#include <initializer_list>
f64942e4 9#include <functional>
9f95a23c
TL
10#include <iterator>
11#include <algorithm>
f64942e4 12
11fdf7f2 13#include <boost/intrusive/list.hpp>
9f95a23c 14#include <boost/asio/basic_waitable_timer.hpp>
11fdf7f2 15
9f95a23c 16#include "include/common_fwd.h"
31f18b77 17#include "include/rados/librados.hpp"
f64942e4 18#include "common/ceph_time.h"
9f95a23c 19#include "common/async/yield_context.h"
31f18b77
FG
20#include "cls/rgw/cls_rgw_types.h"
21#include "cls/lock/cls_lock_client.h"
31f18b77 22
9f95a23c 23#include "rgw_common.h"
f64942e4 24
9f95a23c
TL
25
26class RGWReshard;
27namespace rgw { namespace sal {
20effc67 28 class RadosStore;
9f95a23c 29} }
31f18b77 30
f64942e4
AA
31class RGWBucketReshardLock {
32 using Clock = ceph::coarse_mono_clock;
33
20effc67 34 rgw::sal::RadosStore* store;
f64942e4
AA
35 const std::string lock_oid;
36 const bool ephemeral;
37 rados::cls::lock::Lock internal_lock;
38 std::chrono::seconds duration;
39
40 Clock::time_point start_time;
41 Clock::time_point renew_thresh;
42
43 void reset_time(const Clock::time_point& now) {
44 start_time = now;
45 renew_thresh = start_time + duration / 2;
46 }
47
48public:
20effc67 49 RGWBucketReshardLock(rgw::sal::RadosStore* _store,
f64942e4
AA
50 const std::string& reshard_lock_oid,
51 bool _ephemeral);
20effc67 52 RGWBucketReshardLock(rgw::sal::RadosStore* _store,
f64942e4
AA
53 const RGWBucketInfo& bucket_info,
54 bool _ephemeral) :
55 RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
56 {}
57
20effc67 58 int lock(const DoutPrefixProvider *dpp);
f64942e4
AA
59 void unlock();
60 int renew(const Clock::time_point&);
61
62 bool should_renew(const Clock::time_point& now) const {
63 return now >= renew_thresh;
64 }
65}; // class RGWBucketReshardLock
31f18b77
FG
66
67class RGWBucketReshard {
f64942e4
AA
68public:
69
31f18b77
FG
70 friend class RGWReshard;
71
f64942e4
AA
72 using Clock = ceph::coarse_mono_clock;
73
74private:
75
20effc67 76 rgw::sal::RadosStore* store;
31f18b77 77 RGWBucketInfo bucket_info;
20effc67 78 std::map<std::string, bufferlist> bucket_attrs;
31f18b77 79
f64942e4
AA
80 RGWBucketReshardLock reshard_lock;
81 RGWBucketReshardLock* outer_reshard_lock;
31f18b77 82
9f95a23c
TL
83 // using an initializer_list as an array in contiguous memory
84 // allocated in at once
85 static const std::initializer_list<uint16_t> reshard_primes;
86
f64942e4 87 int create_new_bucket_instance(int new_num_shards,
b3b6e05e
TL
88 RGWBucketInfo& new_bucket_info,
89 const DoutPrefixProvider *dpp);
31f18b77 90 int do_reshard(int num_shards,
b32b8144 91 RGWBucketInfo& new_bucket_info,
31f18b77
FG
92 int max_entries,
93 bool verbose,
20effc67 94 std::ostream *os,
b3b6e05e
TL
95 Formatter *formatter,
96 const DoutPrefixProvider *dpp);
31f18b77 97public:
31f18b77 98
f64942e4
AA
99 // pass nullptr for the final parameter if no outer reshard lock to
100 // manage
20effc67 101 RGWBucketReshard(rgw::sal::RadosStore* _store,
9f95a23c 102 const RGWBucketInfo& _bucket_info,
20effc67 103 const std::map<std::string, bufferlist>& _bucket_attrs,
f64942e4 104 RGWBucketReshardLock* _outer_reshard_lock);
31f18b77 105 int execute(int num_shards, int max_op_entries,
b3b6e05e 106 const DoutPrefixProvider *dpp,
20effc67 107 bool verbose = false, std::ostream *out = nullptr,
31f18b77
FG
108 Formatter *formatter = nullptr,
109 RGWReshard *reshard_log = nullptr);
b3b6e05e
TL
110 int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
111 int cancel(const DoutPrefixProvider *dpp);
20effc67 112 static int clear_resharding(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store,
f64942e4 113 const RGWBucketInfo& bucket_info);
b3b6e05e
TL
114 int clear_resharding(const DoutPrefixProvider *dpp) {
115 return clear_resharding(dpp, store, bucket_info);
f64942e4 116 }
b3b6e05e 117 static int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
20effc67 118 rgw::sal::RadosStore* store,
f64942e4 119 const RGWBucketInfo& bucket_info);
b3b6e05e
TL
120 int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp) {
121 return clear_index_shard_reshard_status(dpp, store, bucket_info);
f64942e4 122 }
b3b6e05e 123 static int set_resharding_status(const DoutPrefixProvider *dpp,
20effc67 124 rgw::sal::RadosStore* store,
f64942e4 125 const RGWBucketInfo& bucket_info,
20effc67 126 const std::string& new_instance_id,
f64942e4
AA
127 int32_t num_shards,
128 cls_rgw_reshard_status status);
20effc67 129 int set_resharding_status(const DoutPrefixProvider *dpp, const std::string& new_instance_id,
f64942e4
AA
130 int32_t num_shards,
131 cls_rgw_reshard_status status) {
b3b6e05e 132 return set_resharding_status(dpp, store, bucket_info,
f64942e4
AA
133 new_instance_id, num_shards, status);
134 }
9f95a23c
TL
135
136 static uint32_t get_max_prime_shards() {
137 return *std::crbegin(reshard_primes);
138 }
139
140 // returns the prime in our list less than or equal to the
141 // parameter; the lowest value that can be returned is 1
142 static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
143 auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
144 requested_shards);
145 if (it == reshard_primes.begin()) {
146 return 1;
147 } else {
148 return *(--it);
149 }
150 }
151
152 // returns the prime in our list greater than or equal to the
153 // parameter; if we do not have such a prime, 0 is returned
154 static uint32_t get_prime_shards_greater_or_equal(
155 uint32_t requested_shards)
156 {
157 auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
158 requested_shards);
159 if (it == reshard_primes.end()) {
160 return 0;
161 } else {
162 return *it;
163 }
164 }
165
166 // returns a preferred number of shards given a calculated number of
167 // shards based on max_dynamic_shards and the list of prime values
168 static uint32_t get_preferred_shards(uint32_t suggested_shards,
169 uint32_t max_dynamic_shards) {
170
171 // use a prime if max is within our prime range, otherwise use
172 // specified max
173 const uint32_t absolute_max =
174 max_dynamic_shards >= get_max_prime_shards() ?
175 max_dynamic_shards :
176 get_prime_shards_less_or_equal(max_dynamic_shards);
177
178 // if we can use a prime number, use it, otherwise use suggested;
179 // note get_prime_shards_greater_or_equal will return 0 if no prime in
180 // prime range
181 const uint32_t prime_ish_num_shards =
182 std::max(get_prime_shards_greater_or_equal(suggested_shards),
183 suggested_shards);
184
185 // dynamic sharding cannot reshard more than defined maximum
186 const uint32_t final_num_shards =
187 std::min(prime_ish_num_shards, absolute_max);
188
189 return final_num_shards;
190 }
f64942e4 191}; // RGWBucketReshard
31f18b77 192
9f95a23c 193
31f18b77 194class RGWReshard {
f64942e4
AA
195public:
196 using Clock = ceph::coarse_mono_clock;
197
198private:
20effc67
TL
199 rgw::sal::RadosStore* store;
200 std::string lock_name;
31f18b77
FG
201 rados::cls::lock::Lock instance_lock;
202 int num_logshards;
203
204 bool verbose;
20effc67 205 std::ostream *out;
31f18b77
FG
206 Formatter *formatter;
207
20effc67 208 void get_logshard_oid(int shard_num, std::string *shard);
31f18b77 209protected:
b3b6e05e 210 class ReshardWorker : public Thread, public DoutPrefixProvider {
31f18b77
FG
211 CephContext *cct;
212 RGWReshard *reshard;
9f95a23c
TL
213 ceph::mutex lock = ceph::make_mutex("ReshardWorker");
214 ceph::condition_variable cond;
31f18b77
FG
215
216 public:
217 ReshardWorker(CephContext * const _cct,
9f95a23c 218 RGWReshard * const _reshard)
31f18b77 219 : cct(_cct),
b3b6e05e 220 reshard(_reshard) {}
31f18b77
FG
221
222 void *entry() override;
223 void stop();
b3b6e05e
TL
224
225 CephContext *get_cct() const override;
226 unsigned get_subsys() const;
227 std::ostream& gen_prefix(std::ostream& out) const;
31f18b77
FG
228 };
229
224ce89b 230 ReshardWorker *worker = nullptr;
31f18b77
FG
231 std::atomic<bool> down_flag = { false };
232
20effc67
TL
233 std::string get_logshard_key(const std::string& tenant, const std::string& bucket_name);
234 void get_bucket_logshard_oid(const std::string& tenant, const std::string& bucket_name, std::string *oid);
31f18b77
FG
235
236public:
20effc67 237 RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
b3b6e05e
TL
238 int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
239 int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
20effc67 240 int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
b3b6e05e 241 int remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
20effc67
TL
242 int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
243 int clear_bucket_resharding(const DoutPrefixProvider *dpp, const std::string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
31f18b77
FG
244
245 /* reshard thread */
b3b6e05e
TL
246 int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp);
247 int process_all_logshards(const DoutPrefixProvider *dpp);
31f18b77
FG
248 bool going_down();
249 void start_processor();
250 void stop_processor();
251};
252
// Lets callers pause while a bucket is mid-reshard: wait() sleeps for
// `duration` either by blocking on the condition variable or, for
// coroutine callers (optional_yield), via a per-waiter asio timer.
// stop() wakes/cancels everything for shutdown.
class RGWReshardWait {
 public:
  // the blocking wait uses std::condition_variable::wait_for(), which uses the
  // std::chrono::steady_clock. use that for the async waits as well
  using Clock = std::chrono::steady_clock;
 private:
  // how long a single wait() sleeps before the caller retries
  const ceph::timespan duration;
  ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
  ceph::condition_variable cond;

  // one entry per pending async waiter; an intrusive hook lets stop()
  // walk and cancel outstanding timers without extra allocation
  struct Waiter : boost::intrusive::list_base_hook<> {
    using Executor = boost::asio::io_context::executor_type;
    using Timer = boost::asio::basic_waitable_timer<Clock,
                                                    boost::asio::wait_traits<Clock>, Executor>;
    Timer timer;
    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
  };
  boost::intrusive::list<Waiter> waiters;

  // set by stop(); guards against new waits during shutdown
  bool going_down{false};

public:
  RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
    : duration(duration) {}
  ~RGWReshardWait() {
    // stop() must have been called before destruction
    ceph_assert(going_down);
  }
  int wait(optional_yield y);
  // unblock any threads waiting on reshard
  void stop();
};
284
285#endif