// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef RGW_RESHARD_H
#define RGW_RESHARD_H

#include <vector>
#include <initializer_list>
#include <functional>
#include <iterator>
#include <algorithm>

#include <boost/intrusive/list.hpp>
#include <boost/asio/basic_waitable_timer.hpp>

#include "include/common_fwd.h"
#include "include/rados/librados.hpp"
#include "common/ceph_time.h"
#include "common/async/yield_context.h"

#include "cls/rgw/cls_rgw_types.h"
#include "cls/lock/cls_lock_client.h"

#include "rgw_common.h"


class RGWReshard;
namespace rgw { namespace sal {
  class RGWRadosStore;
} }

class RGWBucketReshardLock {
  using Clock = ceph::coarse_mono_clock;

  rgw::sal::RGWRadosStore* store;
  const std::string lock_oid;
  const bool ephemeral;
  rados::cls::lock::Lock internal_lock;
  std::chrono::seconds duration;

  Clock::time_point start_time;
  Clock::time_point renew_thresh;

  void reset_time(const Clock::time_point& now) {
    start_time = now;
    renew_thresh = start_time + duration / 2;
  }

public:
  RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
                       const std::string& reshard_lock_oid,
                       bool _ephemeral);
  RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
                       const RGWBucketInfo& bucket_info,
                       bool _ephemeral) :
    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
  {}

  int lock();
  void unlock();
  int renew(const Clock::time_point&);

  bool should_renew(const Clock::time_point& now) const {
    return now >= renew_thresh;
  }
}; // class RGWBucketReshardLock
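
// Rough usage sketch (not the actual implementation in rgw_reshard.cc; the
// loop structure and the have_work()/do_work() helpers are hypothetical): a
// long-running reshard takes the lock once and renews it after the
// half-duration threshold computed by reset_time(), as reported by
// should_renew().
//
//   RGWBucketReshardLock reshard_lock(store, bucket_info, false /* not ephemeral */);
//   if (reshard_lock.lock() < 0) {
//     return -EBUSY; // another worker already holds the reshard lock
//   }
//   while (have_work()) {
//     const auto now = ceph::coarse_mono_clock::now();
//     if (reshard_lock.should_renew(now)) {
//       if (reshard_lock.renew(now) < 0) {
//         break; // lost the lock; stop touching the bucket
//       }
//     }
//     do_work(); // hypothetical unit of reshard work
//   }
//   reshard_lock.unlock();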

class RGWBucketReshard {
public:

  friend class RGWReshard;

  using Clock = ceph::coarse_mono_clock;

private:

  rgw::sal::RGWRadosStore *store;
  RGWBucketInfo bucket_info;
  std::map<string, bufferlist> bucket_attrs;

  RGWBucketReshardLock reshard_lock;
  RGWBucketReshardLock* outer_reshard_lock;

  // using an initializer_list as an array in contiguous memory
  // allocated all at once
  static const std::initializer_list<uint16_t> reshard_primes;

  int create_new_bucket_instance(int new_num_shards,
                                 RGWBucketInfo& new_bucket_info);
  int do_reshard(int num_shards,
                 RGWBucketInfo& new_bucket_info,
                 int max_entries,
                 bool verbose,
                 ostream *os,
                 Formatter *formatter);
public:

  // pass nullptr for the final parameter if no outer reshard lock to
  // manage
  RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
                   const RGWBucketInfo& _bucket_info,
                   const std::map<string, bufferlist>& _bucket_attrs,
                   RGWBucketReshardLock* _outer_reshard_lock);
  int execute(int num_shards, int max_op_entries,
              bool verbose = false, ostream *out = nullptr,
              Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);
  int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
  int cancel();
  static int clear_resharding(rgw::sal::RGWRadosStore* store,
                              const RGWBucketInfo& bucket_info);
  int clear_resharding() {
    return clear_resharding(store, bucket_info);
  }
  static int clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store,
                                              const RGWBucketInfo& bucket_info);
  int clear_index_shard_reshard_status() {
    return clear_index_shard_reshard_status(store, bucket_info);
  }
  static int set_resharding_status(rgw::sal::RGWRadosStore* store,
                                   const RGWBucketInfo& bucket_info,
                                   const string& new_instance_id,
                                   int32_t num_shards,
                                   cls_rgw_reshard_status status);
  int set_resharding_status(const string& new_instance_id,
                            int32_t num_shards,
                            cls_rgw_reshard_status status) {
    return set_resharding_status(store, bucket_info,
                                 new_instance_id, num_shards, status);
  }

  static uint32_t get_max_prime_shards() {
    return *std::crbegin(reshard_primes);
  }

  // returns the prime in our list less than or equal to the
  // parameter; the lowest value that can be returned is 1
  static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
    auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
                               requested_shards);
    if (it == reshard_primes.begin()) {
      return 1;
    } else {
      return *(--it);
    }
  }

  // returns the prime in our list greater than or equal to the
  // parameter; if we do not have such a prime, 0 is returned
  static uint32_t get_prime_shards_greater_or_equal(
    uint32_t requested_shards)
  {
    auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
                               requested_shards);
    if (it == reshard_primes.end()) {
      return 0;
    } else {
      return *it;
    }
  }
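
  // Worked example for the two lookups above (hypothetical inputs; the actual
  // contents of reshard_primes are defined out of line):
  //   - if the list contains 11 and 13, then
  //       get_prime_shards_less_or_equal(12)    == 11
  //       get_prime_shards_greater_or_equal(12) == 13
  //   - get_prime_shards_less_or_equal() falls back to 1 when the requested
  //     value is below every listed prime
  //   - get_prime_shards_greater_or_equal() returns 0 when the requested
  //     value is above every listed prime, i.e. above get_max_prime_shards()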

  // returns a preferred number of shards given a calculated number of
  // shards based on max_dynamic_shards and the list of prime values
  static uint32_t get_preferred_shards(uint32_t suggested_shards,
                                       uint32_t max_dynamic_shards) {

    // use a prime if max is within our prime range, otherwise use
    // specified max
    const uint32_t absolute_max =
      max_dynamic_shards >= get_max_prime_shards() ?
      max_dynamic_shards :
      get_prime_shards_less_or_equal(max_dynamic_shards);

    // if we can use a prime number, use it, otherwise use suggested;
    // note get_prime_shards_greater_or_equal will return 0 if no prime in
    // prime range
    const uint32_t prime_ish_num_shards =
      std::max(get_prime_shards_greater_or_equal(suggested_shards),
               suggested_shards);

    // dynamic sharding cannot reshard more than defined maximum
    const uint32_t final_num_shards =
      std::min(prime_ish_num_shards, absolute_max);

    return final_num_shards;
  }
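
  // Worked example (hypothetical numbers): get_preferred_shards(12, 101),
  // assuming 13 and 101 both appear in reshard_primes and the largest listed
  // prime is above 101:
  //   absolute_max         = get_prime_shards_less_or_equal(101)        = 101
  //   prime_ish_num_shards = max(get_prime_shards_greater_or_equal(12), 12)
  //                        = max(13, 12)                                = 13
  //   final_num_shards     = min(13, 101)                               = 13
  // i.e. the suggested count is rounded up to the next listed prime, but never
  // past the configured maximum.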
}; // RGWBucketReshard


class RGWReshard {
public:
  using Clock = ceph::coarse_mono_clock;

private:
  rgw::sal::RGWRadosStore *store;
  string lock_name;
  rados::cls::lock::Lock instance_lock;
  int num_logshards;

  bool verbose;
  ostream *out;
  Formatter *formatter;

  void get_logshard_oid(int shard_num, string *shard);
protected:
  class ReshardWorker : public Thread {
    CephContext *cct;
    RGWReshard *reshard;
    ceph::mutex lock = ceph::make_mutex("ReshardWorker");
    ceph::condition_variable cond;

  public:
    ReshardWorker(CephContext * const _cct,
                  RGWReshard * const _reshard)
      : cct(_cct),
        reshard(_reshard) {
    }

    void *entry() override;
    void stop();
  };

  ReshardWorker *worker = nullptr;
  std::atomic<bool> down_flag = { false };

  string get_logshard_key(const string& tenant, const string& bucket_name);
  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);

public:
  RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
  int add(cls_rgw_reshard_entry& entry);
  int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
  int get(cls_rgw_reshard_entry& entry);
  int remove(cls_rgw_reshard_entry& entry);
  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);

  /* reshard thread */
  int process_single_logshard(int logshard_num);
  int process_all_logshards();
  bool going_down();
  void start_processor();
  void stop_processor();
};

class RGWReshardWait {
 public:
  // the blocking wait uses std::condition_variable::wait_for(), which uses the
  // std::chrono::steady_clock. use that for the async waits as well
  using Clock = std::chrono::steady_clock;
 private:
  const ceph::timespan duration;
  ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
  ceph::condition_variable cond;

  struct Waiter : boost::intrusive::list_base_hook<> {
    using Executor = boost::asio::io_context::executor_type;
    using Timer = boost::asio::basic_waitable_timer<Clock,
          boost::asio::wait_traits<Clock>, Executor>;
    Timer timer;
    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
  };
  boost::intrusive::list<Waiter> waiters;

  bool going_down{false};

public:
  RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
    : duration(duration) {}
  ~RGWReshardWait() {
    ceph_assert(going_down);
  }
  int wait(optional_yield y);
  // unblock any threads waiting on reshard
  void stop();
};
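
// Rough usage sketch for RGWReshardWait (the retry loop, the retry_index_op()
// helper, and the exact error handling are hypothetical): a caller whose
// bucket-index operation fails because the bucket is mid-reshard can block for
// roughly `duration` (or suspend the coroutine when optional_yield is set) and
// then retry, giving up once wait() reports that stop() was called.
//
//   RGWReshardWait reshard_wait;
//   int r = retry_index_op();
//   while (r == -ERR_BUSY_RESHARDING) { // assumed RGW "busy resharding" error
//     r = reshard_wait.wait(y);
//     if (r < 0) {
//       break; // shutting down
//     }
//     r = retry_index_op();
//   }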

#endif