]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
1e59de90 | 4 | #pragma once |
f67539c2 TL |
5 | |
6 | #include <cstdint> | |
7 | #include <list> | |
8 | #include <memory> | |
9 | #include <string> | |
10 | #include <string_view> | |
11 | #include <variant> | |
12 | #include <vector> | |
13 | ||
14 | #include <boost/container/flat_map.hpp> | |
1e59de90 | 15 | #include <boost/container/flat_set.hpp> |
f67539c2 TL |
16 | #include <boost/smart_ptr/intrusive_ptr.hpp> |
17 | #include <boost/smart_ptr/intrusive_ref_counter.hpp> | |
18 | ||
f67539c2 TL |
19 | #include <fmt/format.h> |
20 | ||
1e59de90 | 21 | #include "common/async/yield_context.h" |
f67539c2 TL |
22 | #include "include/buffer.h" |
23 | #include "include/encoding.h" | |
24 | #include "include/function2.hpp" | |
25 | ||
26 | #include "include/rados/librados.hpp" | |
27 | ||
28 | #include "common/ceph_context.h" | |
29 | #include "common/ceph_json.h" | |
30 | #include "common/ceph_time.h" | |
31 | #include "common/Formatter.h" | |
32 | #include "common/lru_map.h" | |
33 | #include "common/RefCountedObj.h" | |
34 | ||
35 | #include "cls/log/cls_log_types.h" | |
36 | ||
37 | #include "rgw_basic_types.h" | |
38 | #include "rgw_log_backing.h" | |
39 | #include "rgw_sync_policy.h" | |
40 | #include "rgw_zone.h" | |
41 | #include "rgw_trim_bilog.h" | |
42 | ||
43 | namespace bc = boost::container; | |
44 | ||
// Kind of entity a datalog entry refers to. rgw_data_change::encode narrows
// the value to a std::uint8_t on the wire, so existing numeric values must not
// change or previously written entries would decode differently.
enum DataLogEntityType {
  ENTITY_TYPE_UNKNOWN = 0, ENTITY_TYPE_BUCKET = 1,
};
49 | ||
50 | struct rgw_data_change { | |
51 | DataLogEntityType entity_type; | |
52 | std::string key; | |
53 | ceph::real_time timestamp; | |
1e59de90 | 54 | uint64_t gen = 0; |
f67539c2 TL |
55 | |
56 | void encode(ceph::buffer::list& bl) const { | |
1e59de90 TL |
57 | // require decoders to recognize v2 when gen>0 |
58 | const uint8_t compat = (gen == 0) ? 1 : 2; | |
59 | ENCODE_START(2, compat, bl); | |
f67539c2 TL |
60 | auto t = std::uint8_t(entity_type); |
61 | encode(t, bl); | |
62 | encode(key, bl); | |
63 | encode(timestamp, bl); | |
1e59de90 | 64 | encode(gen, bl); |
f67539c2 TL |
65 | ENCODE_FINISH(bl); |
66 | } | |
67 | ||
68 | void decode(bufferlist::const_iterator& bl) { | |
1e59de90 | 69 | DECODE_START(2, bl); |
f67539c2 TL |
70 | std::uint8_t t; |
71 | decode(t, bl); | |
72 | entity_type = DataLogEntityType(t); | |
73 | decode(key, bl); | |
74 | decode(timestamp, bl); | |
1e59de90 TL |
75 | if (struct_v < 2) { |
76 | gen = 0; | |
77 | } else { | |
78 | decode(gen, bl); | |
79 | } | |
f67539c2 TL |
80 | DECODE_FINISH(bl); |
81 | } | |
82 | ||
83 | void dump(ceph::Formatter* f) const; | |
84 | void decode_json(JSONObj* obj); | |
85 | }; | |
86 | WRITE_CLASS_ENCODER(rgw_data_change) | |
87 | ||
88 | struct rgw_data_change_log_entry { | |
89 | std::string log_id; | |
90 | ceph::real_time log_timestamp; | |
91 | rgw_data_change entry; | |
92 | ||
93 | void encode(ceph::buffer::list& bl) const { | |
94 | ENCODE_START(1, 1, bl); | |
95 | encode(log_id, bl); | |
96 | encode(log_timestamp, bl); | |
97 | encode(entry, bl); | |
98 | ENCODE_FINISH(bl); | |
99 | } | |
100 | ||
101 | void decode(ceph::buffer::list::const_iterator& bl) { | |
102 | DECODE_START(1, bl); | |
103 | decode(log_id, bl); | |
104 | decode(log_timestamp, bl); | |
105 | decode(entry, bl); | |
106 | DECODE_FINISH(bl); | |
107 | } | |
108 | ||
109 | void dump(ceph::Formatter* f) const; | |
110 | void decode_json(JSONObj* obj); | |
111 | }; | |
112 | WRITE_CLASS_ENCODER(rgw_data_change_log_entry) | |
113 | ||
114 | struct RGWDataChangesLogInfo { | |
115 | std::string marker; | |
116 | ceph::real_time last_update; | |
117 | ||
118 | void dump(ceph::Formatter* f) const; | |
119 | void decode_json(JSONObj* obj); | |
120 | }; | |
121 | ||
/// Position used by the shard-less RGWDataChangesLog::list_entries() overload
/// (aliased there as LogMarker): a shard index plus a marker string within
/// that shard. Starts at shard 0 with an empty marker.
struct RGWDataChangesLogMarker {
  int shard = 0;
  std::string marker;

  // Explicitly user-declared default constructor (keeps this type a
  // non-aggregate).
  RGWDataChangesLogMarker() = default;
};

class RGWDataChangesLog;
130 | ||
1e59de90 TL |
131 | struct rgw_data_notify_entry { |
132 | std::string key; | |
133 | uint64_t gen = 0; | |
134 | ||
135 | void dump(ceph::Formatter* f) const; | |
136 | void decode_json(JSONObj* obj); | |
137 | ||
138 | rgw_data_notify_entry& operator=(const rgw_data_notify_entry&) = default; | |
139 | ||
140 | bool operator <(const rgw_data_notify_entry& d) const { | |
141 | if (key < d.key) { | |
142 | return true; | |
143 | } | |
144 | if (d.key < key) { | |
145 | return false; | |
146 | } | |
147 | return gen < d.gen; | |
148 | } | |
149 | friend std::ostream& operator <<(std::ostream& m, | |
150 | const rgw_data_notify_entry& e) { | |
151 | return m << "[key: " << e.key << ", gen: " << e.gen << "]"; | |
152 | } | |
153 | }; | |
154 | ||
f67539c2 TL |
155 | class RGWDataChangesBE; |
156 | ||
157 | class DataLogBackends final | |
158 | : public logback_generations, | |
159 | private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> { | |
160 | friend class logback_generations; | |
161 | friend class GenTrim; | |
162 | ||
163 | std::mutex m; | |
164 | RGWDataChangesLog& datalog; | |
165 | ||
166 | DataLogBackends(librados::IoCtx& ioctx, | |
167 | std::string oid, | |
168 | fu2::unique_function<std::string( | |
169 | uint64_t, int) const>&& get_oid, | |
170 | int shards, RGWDataChangesLog& datalog) noexcept | |
171 | : logback_generations(ioctx, oid, std::move(get_oid), | |
172 | shards), datalog(datalog) {} | |
173 | public: | |
174 | ||
175 | boost::intrusive_ptr<RGWDataChangesBE> head() { | |
176 | std::unique_lock l(m); | |
177 | auto i = end(); | |
178 | --i; | |
179 | return i->second; | |
180 | } | |
b3b6e05e | 181 | int list(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 | 182 | std::vector<rgw_data_change_log_entry>& entries, |
1e59de90 TL |
183 | std::string_view marker, std::string* out_marker, bool* truncated, |
184 | optional_yield y); | |
185 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, | |
186 | std::string_view marker, optional_yield y); | |
b3b6e05e | 187 | void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, |
f67539c2 TL |
188 | librados::AioCompletion* c); |
189 | void set_zero(RGWDataChangesBE* be) { | |
190 | emplace(0, be); | |
191 | } | |
192 | ||
193 | bs::error_code handle_init(entries_t e) noexcept override; | |
194 | bs::error_code handle_new_gens(entries_t e) noexcept override; | |
195 | bs::error_code handle_empty_to(uint64_t new_tail) noexcept override; | |
196 | ||
1e59de90 TL |
197 | int trim_generations(const DoutPrefixProvider *dpp, |
198 | std::optional<uint64_t>& through, | |
199 | optional_yield y); | |
f67539c2 TL |
200 | }; |
201 | ||
1e59de90 TL |
202 | struct BucketGen { |
203 | rgw_bucket_shard shard; | |
204 | uint64_t gen; | |
205 | ||
206 | BucketGen(const rgw_bucket_shard& shard, uint64_t gen) | |
207 | : shard(shard), gen(gen) {} | |
208 | ||
209 | BucketGen(rgw_bucket_shard&& shard, uint64_t gen) | |
210 | : shard(std::move(shard)), gen(gen) {} | |
211 | ||
212 | BucketGen(const BucketGen&) = default; | |
213 | BucketGen(BucketGen&&) = default; | |
214 | BucketGen& operator =(const BucketGen&) = default; | |
215 | BucketGen& operator =(BucketGen&&) = default; | |
216 | ||
217 | ~BucketGen() = default; | |
218 | }; | |
219 | ||
220 | inline bool operator ==(const BucketGen& l, const BucketGen& r) { | |
221 | return (l.shard == r.shard) && (l.gen == r.gen); | |
222 | } | |
223 | ||
224 | inline bool operator <(const BucketGen& l, const BucketGen& r) { | |
225 | if (l.shard < r.shard) { | |
226 | return true; | |
227 | } else if (l.shard == r.shard) { | |
228 | return l.gen < r.gen; | |
229 | } else { | |
230 | return false; | |
231 | } | |
232 | } | |
233 | ||
f67539c2 TL |
234 | class RGWDataChangesLog { |
235 | friend DataLogBackends; | |
236 | CephContext *cct; | |
237 | librados::IoCtx ioctx; | |
238 | rgw::BucketChangeObserver *observer = nullptr; | |
239 | const RGWZone* zone; | |
240 | std::unique_ptr<DataLogBackends> bes; | |
241 | ||
242 | const int num_shards; | |
243 | std::string get_prefix() { | |
244 | auto prefix = cct->_conf->rgw_data_log_obj_prefix; | |
20effc67 | 245 | return prefix.empty() ? prefix : "data_log"; |
f67539c2 TL |
246 | } |
247 | std::string metadata_log_oid() { | |
20effc67 | 248 | return get_prefix() + "generations_metadata"; |
f67539c2 TL |
249 | } |
250 | std::string prefix; | |
251 | ||
252 | ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); | |
253 | ceph::shared_mutex modified_lock = | |
254 | ceph::make_shared_mutex("RGWDataChangesLog::modified_lock"); | |
1e59de90 | 255 | bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards; |
f67539c2 TL |
256 | |
257 | std::atomic<bool> down_flag = { false }; | |
258 | ||
259 | struct ChangeStatus { | |
260 | std::shared_ptr<const rgw_sync_policy_info> sync_policy; | |
261 | ceph::real_time cur_expiration; | |
262 | ceph::real_time cur_sent; | |
263 | bool pending = false; | |
264 | RefCountedCond* cond = nullptr; | |
265 | ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus"); | |
266 | }; | |
267 | ||
268 | using ChangeStatusPtr = std::shared_ptr<ChangeStatus>; | |
269 | ||
1e59de90 | 270 | lru_map<BucketGen, ChangeStatusPtr> changes; |
f67539c2 | 271 | |
1e59de90 | 272 | bc::flat_set<BucketGen> cur_cycle; |
f67539c2 | 273 | |
1e59de90 TL |
274 | ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen); |
275 | void register_renew(const rgw_bucket_shard& bs, | |
276 | const rgw::bucket_log_layout_generation& gen); | |
277 | void update_renewed(const rgw_bucket_shard& bs, | |
278 | uint64_t gen, | |
279 | ceph::real_time expiration); | |
f67539c2 TL |
280 | |
281 | ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock"); | |
282 | ceph::condition_variable renew_cond; | |
20effc67 | 283 | void renew_run() noexcept; |
f67539c2 TL |
284 | void renew_stop(); |
285 | std::thread renew_thread; | |
286 | ||
b3b6e05e | 287 | std::function<bool(const rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp)> bucket_filter; |
f67539c2 | 288 | bool going_down() const; |
b3b6e05e TL |
289 | bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const; |
290 | int renew_entries(const DoutPrefixProvider *dpp); | |
f67539c2 TL |
291 | |
292 | public: | |
293 | ||
294 | RGWDataChangesLog(CephContext* cct); | |
295 | ~RGWDataChangesLog(); | |
296 | ||
b3b6e05e | 297 | int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, |
f67539c2 | 298 | librados::Rados* lr); |
1e59de90 TL |
299 | int choose_oid(const rgw_bucket_shard& bs); |
300 | int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, | |
301 | const rgw::bucket_log_layout_generation& gen, int shard_id, | |
302 | optional_yield y); | |
f67539c2 | 303 | int get_log_shard_id(rgw_bucket& bucket, int shard_id); |
b3b6e05e | 304 | int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 | 305 | std::vector<rgw_data_change_log_entry>& entries, |
1e59de90 TL |
306 | std::string_view marker, std::string* out_marker, |
307 | bool* truncated, optional_yield y); | |
308 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, | |
309 | std::string_view marker, optional_yield y); | |
b3b6e05e | 310 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, |
f67539c2 | 311 | librados::AioCompletion* c); // :( |
1e59de90 TL |
312 | int get_info(const DoutPrefixProvider *dpp, int shard_id, |
313 | RGWDataChangesLogInfo *info, optional_yield y); | |
f67539c2 TL |
314 | |
315 | using LogMarker = RGWDataChangesLogMarker; | |
316 | ||
b3b6e05e | 317 | int list_entries(const DoutPrefixProvider *dpp, int max_entries, |
f67539c2 | 318 | std::vector<rgw_data_change_log_entry>& entries, |
1e59de90 TL |
319 | LogMarker& marker, bool* ptruncated, |
320 | optional_yield y); | |
f67539c2 | 321 | |
1e59de90 | 322 | void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen); |
f67539c2 TL |
323 | auto read_clear_modified() { |
324 | std::unique_lock wl{modified_lock}; | |
325 | decltype(modified_shards) modified; | |
326 | modified.swap(modified_shards); | |
327 | modified_shards.clear(); | |
328 | return modified; | |
329 | } | |
330 | ||
331 | void set_observer(rgw::BucketChangeObserver *observer) { | |
332 | this->observer = observer; | |
333 | } | |
334 | ||
335 | void set_bucket_filter(decltype(bucket_filter)&& f) { | |
336 | bucket_filter = std::move(f); | |
337 | } | |
338 | // a marker that compares greater than any other | |
339 | std::string max_marker() const; | |
340 | std::string get_oid(uint64_t gen_id, int shard_id) const; | |
341 | ||
342 | ||
b3b6e05e | 343 | int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y); |
1e59de90 TL |
344 | int trim_generations(const DoutPrefixProvider *dpp, |
345 | std::optional<uint64_t>& through, | |
346 | optional_yield y); | |
f67539c2 TL |
347 | }; |
348 | ||
349 | class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> { | |
350 | protected: | |
351 | librados::IoCtx& ioctx; | |
352 | CephContext* const cct; | |
353 | RGWDataChangesLog& datalog; | |
354 | ||
355 | std::string get_oid(int shard_id) { | |
356 | return datalog.get_oid(gen_id, shard_id); | |
357 | } | |
358 | public: | |
359 | using entries = std::variant<std::list<cls_log_entry>, | |
360 | std::vector<ceph::buffer::list>>; | |
361 | ||
362 | const uint64_t gen_id; | |
363 | ||
364 | RGWDataChangesBE(librados::IoCtx& ioctx, | |
365 | RGWDataChangesLog& datalog, | |
366 | uint64_t gen_id) | |
367 | : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())), | |
368 | datalog(datalog), gen_id(gen_id) {} | |
369 | virtual ~RGWDataChangesBE() = default; | |
370 | ||
371 | virtual void prepare(ceph::real_time now, | |
372 | const std::string& key, | |
373 | ceph::buffer::list&& entry, | |
374 | entries& out) = 0; | |
1e59de90 TL |
375 | virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items, |
376 | optional_yield y) = 0; | |
b3b6e05e | 377 | virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now, |
1e59de90 TL |
378 | const std::string& key, ceph::buffer::list&& bl, |
379 | optional_yield y) = 0; | |
b3b6e05e | 380 | virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 TL |
381 | std::vector<rgw_data_change_log_entry>& entries, |
382 | std::optional<std::string_view> marker, | |
1e59de90 TL |
383 | std::string* out_marker, bool* truncated, |
384 | optional_yield y) = 0; | |
385 | virtual int get_info(const DoutPrefixProvider *dpp, int index, | |
386 | RGWDataChangesLogInfo *info, optional_yield y) = 0; | |
387 | virtual int trim(const DoutPrefixProvider *dpp, int index, | |
388 | std::string_view marker, optional_yield y) = 0; | |
389 | virtual int trim(const DoutPrefixProvider *dpp, int index, | |
390 | std::string_view marker, librados::AioCompletion* c) = 0; | |
f67539c2 TL |
391 | virtual std::string_view max_marker() const = 0; |
392 | // 1 on empty, 0 on non-empty, negative on error. | |
1e59de90 | 393 | virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0; |
f67539c2 | 394 | }; |