]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.h
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_reshard.h
CommitLineData
31f18b77
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef RGW_RESHARD_H
5#define RGW_RESHARD_H
6
7#include <vector>
f64942e4
AA
8#include <functional>
9
11fdf7f2
TL
10#include <boost/intrusive/list.hpp>
11
31f18b77 12#include "include/rados/librados.hpp"
f64942e4 13#include "common/ceph_time.h"
31f18b77
FG
14#include "cls/rgw/cls_rgw_types.h"
15#include "cls/lock/cls_lock_client.h"
16#include "rgw_bucket.h"
17
f64942e4 18
31f18b77
FG
19class CephContext;
20class RGWRados;
21
f64942e4
AA
22class RGWBucketReshardLock {
23 using Clock = ceph::coarse_mono_clock;
24
25 RGWRados* store;
26 const std::string lock_oid;
27 const bool ephemeral;
28 rados::cls::lock::Lock internal_lock;
29 std::chrono::seconds duration;
30
31 Clock::time_point start_time;
32 Clock::time_point renew_thresh;
33
34 void reset_time(const Clock::time_point& now) {
35 start_time = now;
36 renew_thresh = start_time + duration / 2;
37 }
38
39public:
40 RGWBucketReshardLock(RGWRados* _store,
41 const std::string& reshard_lock_oid,
42 bool _ephemeral);
43 RGWBucketReshardLock(RGWRados* _store,
44 const RGWBucketInfo& bucket_info,
45 bool _ephemeral) :
46 RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
47 {}
48
49 int lock();
50 void unlock();
51 int renew(const Clock::time_point&);
52
53 bool should_renew(const Clock::time_point& now) const {
54 return now >= renew_thresh;
55 }
56}; // class RGWBucketReshardLock
31f18b77
FG
57
58class RGWBucketReshard {
f64942e4
AA
59public:
60
31f18b77
FG
61 friend class RGWReshard;
62
f64942e4
AA
63 using Clock = ceph::coarse_mono_clock;
64
65private:
66
31f18b77
FG
67 RGWRados *store;
68 RGWBucketInfo bucket_info;
69 std::map<string, bufferlist> bucket_attrs;
70
f64942e4
AA
71 RGWBucketReshardLock reshard_lock;
72 RGWBucketReshardLock* outer_reshard_lock;
31f18b77 73
f64942e4
AA
74 int create_new_bucket_instance(int new_num_shards,
75 RGWBucketInfo& new_bucket_info);
31f18b77 76 int do_reshard(int num_shards,
b32b8144 77 RGWBucketInfo& new_bucket_info,
31f18b77
FG
78 int max_entries,
79 bool verbose,
80 ostream *os,
81 Formatter *formatter);
82public:
31f18b77 83
f64942e4
AA
84 // pass nullptr for the final parameter if no outer reshard lock to
85 // manage
86 RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
87 const std::map<string, bufferlist>& _bucket_attrs,
88 RGWBucketReshardLock* _outer_reshard_lock);
31f18b77
FG
89 int execute(int num_shards, int max_op_entries,
90 bool verbose = false, ostream *out = nullptr,
91 Formatter *formatter = nullptr,
92 RGWReshard *reshard_log = nullptr);
31f18b77 93 int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
94b18763 94 int cancel();
f64942e4
AA
95 static int clear_resharding(RGWRados* store,
96 const RGWBucketInfo& bucket_info);
97 int clear_resharding() {
98 return clear_resharding(store, bucket_info);
99 }
100 static int clear_index_shard_reshard_status(RGWRados* store,
101 const RGWBucketInfo& bucket_info);
102 int clear_index_shard_reshard_status() {
103 return clear_index_shard_reshard_status(store, bucket_info);
104 }
105 static int set_resharding_status(RGWRados* store,
106 const RGWBucketInfo& bucket_info,
107 const string& new_instance_id,
108 int32_t num_shards,
109 cls_rgw_reshard_status status);
110 int set_resharding_status(const string& new_instance_id,
111 int32_t num_shards,
112 cls_rgw_reshard_status status) {
113 return set_resharding_status(store, bucket_info,
114 new_instance_id, num_shards, status);
115 }
116}; // RGWBucketReshard
31f18b77
FG
117
118class RGWReshard {
f64942e4
AA
119public:
120 using Clock = ceph::coarse_mono_clock;
121
122private:
31f18b77
FG
123 RGWRados *store;
124 string lock_name;
125 rados::cls::lock::Lock instance_lock;
126 int num_logshards;
127
128 bool verbose;
129 ostream *out;
130 Formatter *formatter;
131
132 void get_logshard_oid(int shard_num, string *shard);
133protected:
134 class ReshardWorker : public Thread {
135 CephContext *cct;
136 RGWReshard *reshard;
137 Mutex lock;
138 Cond cond;
139
140 public:
141 ReshardWorker(CephContext * const _cct,
142 RGWReshard * const _reshard)
143 : cct(_cct),
144 reshard(_reshard),
145 lock("ReshardWorker") {
146 }
147
148 void *entry() override;
149 void stop();
150 };
151
224ce89b 152 ReshardWorker *worker = nullptr;
31f18b77
FG
153 std::atomic<bool> down_flag = { false };
154
155 string get_logshard_key(const string& tenant, const string& bucket_name);
156 void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
157
158public:
159 RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
160 int add(cls_rgw_reshard_entry& entry);
161 int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
162 int get(cls_rgw_reshard_entry& entry);
163 int remove(cls_rgw_reshard_entry& entry);
164 int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
165 int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
166
167 /* reshard thread */
168 int process_single_logshard(int logshard_num);
169 int process_all_logshards();
170 bool going_down();
171 void start_processor();
172 void stop_processor();
173};
174
31f18b77 175class RGWReshardWait {
81eedcae
TL
176 public:
177 // the blocking wait uses std::condition_variable::wait_for(), which uses the
178 // std::chrono::steady_clock. use that for the async waits as well
179 using Clock = std::chrono::steady_clock;
180 private:
11fdf7f2
TL
181 const ceph::timespan duration;
182 ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
183 ceph::condition_variable cond;
184
11fdf7f2 185 struct Waiter : boost::intrusive::list_base_hook<> {
92f5a8d4
TL
186#if BOOST_VERSION < 107000
187 using Timer = boost::asio::basic_waitable_timer<Clock>;
188#else
189 using Executor = boost::asio::io_context::executor_type;
190 using Timer = boost::asio::basic_waitable_timer<Clock,
191 boost::asio::wait_traits<Clock>, Executor>;
192#endif
193 Timer timer;
11fdf7f2
TL
194 explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
195 };
196 boost::intrusive::list<Waiter> waiters;
31f18b77
FG
197
198 bool going_down{false};
199
31f18b77 200public:
11fdf7f2
TL
201 RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
202 : duration(duration) {}
31f18b77 203 ~RGWReshardWait() {
11fdf7f2 204 ceph_assert(going_down);
31f18b77 205 }
11fdf7f2
TL
206 int wait(optional_yield y);
207 // unblock any threads waiting on reshard
208 void stop();
31f18b77
FG
209};
210
211#endif