]>
Commit | Line | Data |
---|---|---|
31f18b77 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef RGW_RESHARD_H | |
5 | #define RGW_RESHARD_H | |
6 | ||
7 | #include <vector> | |
f64942e4 AA |
8 | #include <functional> |
9 | ||
11fdf7f2 TL |
10 | #include <boost/intrusive/list.hpp> |
11 | ||
31f18b77 | 12 | #include "include/rados/librados.hpp" |
f64942e4 | 13 | #include "common/ceph_time.h" |
31f18b77 FG |
14 | #include "cls/rgw/cls_rgw_types.h" |
15 | #include "cls/lock/cls_lock_client.h" | |
16 | #include "rgw_bucket.h" | |
17 | ||
f64942e4 | 18 | |
31f18b77 FG |
19 | class CephContext; |
20 | class RGWRados; | |
21 | ||
f64942e4 AA |
// Manages the RADOS cls_lock used to serialize resharding of a single
// bucket. The lock is taken on a per-bucket oid and must be renewed
// periodically (before half the lock duration elapses) while a long
// reshard operation runs.
class RGWBucketReshardLock {
  using Clock = ceph::coarse_mono_clock;

  RGWRados* store;
  const std::string lock_oid;        // RADOS object the cls lock lives on
  const bool ephemeral;              // if true, lock object is removed on unlock
  rados::cls::lock::Lock internal_lock;
  std::chrono::seconds duration;     // total lock lifetime granted by cls_lock

  Clock::time_point start_time;      // when the lock was (last) taken/renewed
  Clock::time_point renew_thresh;    // time after which renew() should be called

  // Record a fresh acquisition/renewal time; the renewal threshold is set
  // to the midpoint of the lock duration so there is slack before expiry.
  void reset_time(const Clock::time_point& now) {
    start_time = now;
    renew_thresh = start_time + duration / 2;
  }

public:
  RGWBucketReshardLock(RGWRados* _store,
		       const std::string& reshard_lock_oid,
		       bool _ephemeral);
  // Convenience overload: derives the lock oid from the bucket's key
  // (tenant:name:instance form, ':'-separated).
  RGWBucketReshardLock(RGWRados* _store,
		       const RGWBucketInfo& bucket_info,
		       bool _ephemeral) :
    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
  {}

  int lock();                                // acquire; returns 0 or -errno
  void unlock();
  int renew(const Clock::time_point&);       // extend the lock; returns 0 or -errno

  // True once we are past the renewal threshold and renew() should run.
  bool should_renew(const Clock::time_point& now) const {
    return now >= renew_thresh;
  }
}; // class RGWBucketReshardLock
31f18b77 FG |
57 | |
// Drives resharding of one bucket: creates a new bucket instance with the
// target shard count, copies index entries into it, and updates/clears the
// resharding status markers on the old index shards. Holds (and renews) a
// RGWBucketReshardLock for the duration of the operation.
class RGWBucketReshard {
public:

  friend class RGWReshard;

  using Clock = ceph::coarse_mono_clock;

private:

  RGWRados *store;
  RGWBucketInfo bucket_info;                      // bucket being resharded
  std::map<string, bufferlist> bucket_attrs;      // attrs carried to the new instance

  RGWBucketReshardLock reshard_lock;              // per-bucket lock owned by this op
  RGWBucketReshardLock* outer_reshard_lock;       // optional logshard-level lock to renew; may be nullptr

  // Create the replacement bucket instance with new_num_shards index shards.
  int create_new_bucket_instance(int new_num_shards,
                                 RGWBucketInfo& new_bucket_info);
  // Copy index entries from the old shards into new_bucket_info's shards,
  // listing max_entries at a time; optional verbose progress to *os/formatter.
  int do_reshard(int num_shards,
		 RGWBucketInfo& new_bucket_info,
		 int max_entries,
                 bool verbose,
                 ostream *os,
		 Formatter *formatter);
public:

  // pass nullptr for the final parameter if no outer reshard lock to
  // manage
  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                   const std::map<string, bufferlist>& _bucket_attrs,
		   RGWBucketReshardLock* _outer_reshard_lock);
  // Run the full reshard to num_shards; returns 0 or -errno.
  int execute(int num_shards, int max_op_entries,
              bool verbose = false, ostream *out = nullptr,
              Formatter *formatter = nullptr,
	      RGWReshard *reshard_log = nullptr);
  // Fetch the per-shard resharding status entries for this bucket.
  int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
  int cancel();                                   // abort an in-progress reshard
  // Clear the reshard marker on the bucket info (static form for callers
  // without an RGWBucketReshard instance).
  static int clear_resharding(RGWRados* store,
			      const RGWBucketInfo& bucket_info);
  int clear_resharding() {
    return clear_resharding(store, bucket_info);
  }
  // Reset the resharding status stored on each index shard object.
  static int clear_index_shard_reshard_status(RGWRados* store,
					      const RGWBucketInfo& bucket_info);
  int clear_index_shard_reshard_status() {
    return clear_index_shard_reshard_status(store, bucket_info);
  }
  // Mark every index shard with the given reshard status / target instance.
  static int set_resharding_status(RGWRados* store,
				   const RGWBucketInfo& bucket_info,
				   const string& new_instance_id,
				   int32_t num_shards,
				   cls_rgw_reshard_status status);
  int set_resharding_status(const string& new_instance_id,
			    int32_t num_shards,
			    cls_rgw_reshard_status status) {
    return set_resharding_status(store, bucket_info,
				 new_instance_id, num_shards, status);
  }
}; // RGWBucketReshard
31f18b77 FG |
117 | |
// Maintains the cluster-wide reshard queue, which is spread over a number
// of "logshard" objects, and runs the background thread that scans those
// logshards and executes pending bucket reshards.
class RGWReshard {
public:
  using Clock = ceph::coarse_mono_clock;

private:
  RGWRados *store;
  string lock_name;                      // name used for the per-logshard cls lock
  rados::cls::lock::Lock instance_lock;
  int num_logshards;                     // how many logshard objects the queue is split across

  bool verbose;
  ostream *out;
  Formatter *formatter;

  // Compute the RADOS oid for a given logshard number.
  void get_logshard_oid(int shard_num, string *shard);
protected:
  // Background thread that periodically calls process_all_logshards()
  // until stop() is requested.
  class ReshardWorker : public Thread {
    CephContext *cct;
    RGWReshard *reshard;
    Mutex lock;      // protects cond below
    Cond cond;       // signaled by stop() to wake the sleeping worker

  public:
    ReshardWorker(CephContext * const _cct,
		  RGWReshard * const _reshard)
      : cct(_cct),
        reshard(_reshard),
        lock("ReshardWorker") {
    }

    void *entry() override;   // thread main loop
    void stop();              // wake the thread so it can notice shutdown
  };

  ReshardWorker *worker = nullptr;
  std::atomic<bool> down_flag = { false };   // set during shutdown; checked by going_down()

  // Hash key used to assign a bucket to a logshard (tenant + bucket name).
  string get_logshard_key(const string& tenant, const string& bucket_name);
  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);

public:
  RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
  int add(cls_rgw_reshard_entry& entry);       // queue a bucket for resharding
  // Replace the queue entry for bucket_info with one referring to new_bucket_info.
  int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
  int get(cls_rgw_reshard_entry& entry);       // look up a queued entry
  int remove(cls_rgw_reshard_entry& entry);    // dequeue an entry
  // List up to max entries from one logshard, starting after marker.
  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);

  /* reshard thread */
  int process_single_logshard(int logshard_num);
  int process_all_logshards();
  bool going_down();            // true once stop_processor() has begun
  void start_processor();       // spawn the ReshardWorker thread
  void stop_processor();        // signal, join and delete the worker
};
174 | ||
31f18b77 | 175 | class RGWReshardWait { |
81eedcae TL |
176 | public: |
177 | // the blocking wait uses std::condition_variable::wait_for(), which uses the | |
178 | // std::chrono::steady_clock. use that for the async waits as well | |
179 | using Clock = std::chrono::steady_clock; | |
180 | private: | |
11fdf7f2 TL |
181 | const ceph::timespan duration; |
182 | ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock"); | |
183 | ceph::condition_variable cond; | |
184 | ||
11fdf7f2 | 185 | struct Waiter : boost::intrusive::list_base_hook<> { |
92f5a8d4 TL |
186 | #if BOOST_VERSION < 107000 |
187 | using Timer = boost::asio::basic_waitable_timer<Clock>; | |
188 | #else | |
189 | using Executor = boost::asio::io_context::executor_type; | |
190 | using Timer = boost::asio::basic_waitable_timer<Clock, | |
191 | boost::asio::wait_traits<Clock>, Executor>; | |
192 | #endif | |
193 | Timer timer; | |
11fdf7f2 TL |
194 | explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {} |
195 | }; | |
196 | boost::intrusive::list<Waiter> waiters; | |
31f18b77 FG |
197 | |
198 | bool going_down{false}; | |
199 | ||
31f18b77 | 200 | public: |
11fdf7f2 TL |
201 | RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5)) |
202 | : duration(duration) {} | |
31f18b77 | 203 | ~RGWReshardWait() { |
11fdf7f2 | 204 | ceph_assert(going_down); |
31f18b77 | 205 | } |
11fdf7f2 TL |
206 | int wait(optional_yield y); |
207 | // unblock any threads waiting on reshard | |
208 | void stop(); | |
31f18b77 FG |
209 | }; |
210 | ||
211 | #endif |