]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #ifndef CEPH_RGW_DATALOG_H | |
5 | #define CEPH_RGW_DATALOG_H | |
6 | ||
7 | #include <cstdint> | |
8 | #include <list> | |
9 | #include <memory> | |
10 | #include <string> | |
11 | #include <string_view> | |
12 | #include <variant> | |
13 | #include <vector> | |
14 | ||
15 | #include <boost/container/flat_map.hpp> | |
16 | #include <boost/smart_ptr/intrusive_ptr.hpp> | |
17 | #include <boost/smart_ptr/intrusive_ref_counter.hpp> | |
18 | ||
19 | #undef FMT_HEADER_ONLY | |
20 | #define FMT_HEADER_ONLY 1 | |
21 | #include <fmt/format.h> | |
22 | ||
23 | #include "include/buffer.h" | |
24 | #include "include/encoding.h" | |
25 | #include "include/function2.hpp" | |
26 | ||
27 | #include "include/rados/librados.hpp" | |
28 | ||
29 | #include "common/ceph_context.h" | |
30 | #include "common/ceph_json.h" | |
31 | #include "common/ceph_time.h" | |
32 | #include "common/Formatter.h" | |
33 | #include "common/lru_map.h" | |
34 | #include "common/RefCountedObj.h" | |
35 | ||
36 | #include "cls/log/cls_log_types.h" | |
37 | ||
38 | #include "rgw_basic_types.h" | |
39 | #include "rgw_log_backing.h" | |
40 | #include "rgw_sync_policy.h" | |
41 | #include "rgw_zone.h" | |
42 | #include "rgw_trim_bilog.h" | |
43 | ||
44 | namespace bc = boost::container; | |
45 | ||
/// Kind of entity a data-log entry refers to.
///
/// rgw_data_change::encode narrows this to a single std::uint8_t,
/// so every enumerator must fit in one byte.
enum DataLogEntityType {
  ENTITY_TYPE_UNKNOWN = 0,  // no specific entity type
  ENTITY_TYPE_BUCKET  = 1,  // entry describes a bucket change
};
50 | ||
51 | struct rgw_data_change { | |
52 | DataLogEntityType entity_type; | |
53 | std::string key; | |
54 | ceph::real_time timestamp; | |
55 | ||
56 | void encode(ceph::buffer::list& bl) const { | |
57 | ENCODE_START(1, 1, bl); | |
58 | auto t = std::uint8_t(entity_type); | |
59 | encode(t, bl); | |
60 | encode(key, bl); | |
61 | encode(timestamp, bl); | |
62 | ENCODE_FINISH(bl); | |
63 | } | |
64 | ||
65 | void decode(bufferlist::const_iterator& bl) { | |
66 | DECODE_START(1, bl); | |
67 | std::uint8_t t; | |
68 | decode(t, bl); | |
69 | entity_type = DataLogEntityType(t); | |
70 | decode(key, bl); | |
71 | decode(timestamp, bl); | |
72 | DECODE_FINISH(bl); | |
73 | } | |
74 | ||
75 | void dump(ceph::Formatter* f) const; | |
76 | void decode_json(JSONObj* obj); | |
77 | }; | |
78 | WRITE_CLASS_ENCODER(rgw_data_change) | |
79 | ||
80 | struct rgw_data_change_log_entry { | |
81 | std::string log_id; | |
82 | ceph::real_time log_timestamp; | |
83 | rgw_data_change entry; | |
84 | ||
85 | void encode(ceph::buffer::list& bl) const { | |
86 | ENCODE_START(1, 1, bl); | |
87 | encode(log_id, bl); | |
88 | encode(log_timestamp, bl); | |
89 | encode(entry, bl); | |
90 | ENCODE_FINISH(bl); | |
91 | } | |
92 | ||
93 | void decode(ceph::buffer::list::const_iterator& bl) { | |
94 | DECODE_START(1, bl); | |
95 | decode(log_id, bl); | |
96 | decode(log_timestamp, bl); | |
97 | decode(entry, bl); | |
98 | DECODE_FINISH(bl); | |
99 | } | |
100 | ||
101 | void dump(ceph::Formatter* f) const; | |
102 | void decode_json(JSONObj* obj); | |
103 | }; | |
104 | WRITE_CLASS_ENCODER(rgw_data_change_log_entry) | |
105 | ||
106 | struct RGWDataChangesLogInfo { | |
107 | std::string marker; | |
108 | ceph::real_time last_update; | |
109 | ||
110 | void dump(ceph::Formatter* f) const; | |
111 | void decode_json(JSONObj* obj); | |
112 | }; | |
113 | ||
/// Cursor passed by reference to RGWDataChangesLog::list_entries when
/// listing across shards.
struct RGWDataChangesLogMarker {
  // Shard index; starts at 0.
  int shard{0};
  // Position within `shard`. Starts out empty; presumably "empty" means
  // "from the beginning of the shard" — confirm in list_entries().
  std::optional<std::string> marker;

  RGWDataChangesLogMarker() = default;
};
120 | ||
121 | class RGWDataChangesLog; | |
122 | ||
123 | class RGWDataChangesBE; | |
124 | ||
125 | class DataLogBackends final | |
126 | : public logback_generations, | |
127 | private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> { | |
128 | friend class logback_generations; | |
129 | friend class GenTrim; | |
130 | ||
131 | std::mutex m; | |
132 | RGWDataChangesLog& datalog; | |
133 | ||
134 | DataLogBackends(librados::IoCtx& ioctx, | |
135 | std::string oid, | |
136 | fu2::unique_function<std::string( | |
137 | uint64_t, int) const>&& get_oid, | |
138 | int shards, RGWDataChangesLog& datalog) noexcept | |
139 | : logback_generations(ioctx, oid, std::move(get_oid), | |
140 | shards), datalog(datalog) {} | |
141 | public: | |
142 | ||
143 | boost::intrusive_ptr<RGWDataChangesBE> head() { | |
144 | std::unique_lock l(m); | |
145 | auto i = end(); | |
146 | --i; | |
147 | return i->second; | |
148 | } | |
b3b6e05e | 149 | int list(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 TL |
150 | std::vector<rgw_data_change_log_entry>& entries, |
151 | std::optional<std::string_view> marker, | |
152 | std::string* out_marker, bool* truncated); | |
b3b6e05e TL |
153 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker); |
154 | void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, | |
f67539c2 TL |
155 | librados::AioCompletion* c); |
156 | void set_zero(RGWDataChangesBE* be) { | |
157 | emplace(0, be); | |
158 | } | |
159 | ||
160 | bs::error_code handle_init(entries_t e) noexcept override; | |
161 | bs::error_code handle_new_gens(entries_t e) noexcept override; | |
162 | bs::error_code handle_empty_to(uint64_t new_tail) noexcept override; | |
163 | ||
b3b6e05e | 164 | int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through); |
f67539c2 TL |
165 | }; |
166 | ||
167 | class RGWDataChangesLog { | |
168 | friend DataLogBackends; | |
169 | CephContext *cct; | |
170 | librados::IoCtx ioctx; | |
171 | rgw::BucketChangeObserver *observer = nullptr; | |
172 | const RGWZone* zone; | |
173 | std::unique_ptr<DataLogBackends> bes; | |
174 | ||
175 | const int num_shards; | |
176 | std::string get_prefix() { | |
177 | auto prefix = cct->_conf->rgw_data_log_obj_prefix; | |
178 | return prefix.empty() ? prefix : "data_log"s; | |
179 | } | |
180 | std::string metadata_log_oid() { | |
181 | return get_prefix() + "generations_metadata"s; | |
182 | } | |
183 | std::string prefix; | |
184 | ||
185 | ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); | |
186 | ceph::shared_mutex modified_lock = | |
187 | ceph::make_shared_mutex("RGWDataChangesLog::modified_lock"); | |
188 | bc::flat_map<int, bc::flat_set<std::string>> modified_shards; | |
189 | ||
190 | std::atomic<bool> down_flag = { false }; | |
191 | ||
192 | struct ChangeStatus { | |
193 | std::shared_ptr<const rgw_sync_policy_info> sync_policy; | |
194 | ceph::real_time cur_expiration; | |
195 | ceph::real_time cur_sent; | |
196 | bool pending = false; | |
197 | RefCountedCond* cond = nullptr; | |
198 | ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus"); | |
199 | }; | |
200 | ||
201 | using ChangeStatusPtr = std::shared_ptr<ChangeStatus>; | |
202 | ||
203 | lru_map<rgw_bucket_shard, ChangeStatusPtr> changes; | |
204 | ||
205 | bc::flat_set<rgw_bucket_shard> cur_cycle; | |
206 | ||
207 | void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); | |
208 | void register_renew(const rgw_bucket_shard& bs); | |
209 | void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration); | |
210 | ||
211 | ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock"); | |
212 | ceph::condition_variable renew_cond; | |
213 | void renew_run(); | |
214 | void renew_stop(); | |
215 | std::thread renew_thread; | |
216 | ||
b3b6e05e | 217 | std::function<bool(const rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp)> bucket_filter; |
f67539c2 TL |
218 | int choose_oid(const rgw_bucket_shard& bs); |
219 | bool going_down() const; | |
b3b6e05e TL |
220 | bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const; |
221 | int renew_entries(const DoutPrefixProvider *dpp); | |
f67539c2 TL |
222 | |
223 | public: | |
224 | ||
225 | RGWDataChangesLog(CephContext* cct); | |
226 | ~RGWDataChangesLog(); | |
227 | ||
b3b6e05e | 228 | int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, |
f67539c2 TL |
229 | librados::Rados* lr); |
230 | ||
b3b6e05e | 231 | int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id); |
f67539c2 | 232 | int get_log_shard_id(rgw_bucket& bucket, int shard_id); |
b3b6e05e | 233 | int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 TL |
234 | std::vector<rgw_data_change_log_entry>& entries, |
235 | std::optional<std::string_view> marker, | |
236 | std::string* out_marker, bool* truncated); | |
b3b6e05e TL |
237 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker); |
238 | int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, | |
f67539c2 | 239 | librados::AioCompletion* c); // :( |
b3b6e05e | 240 | int get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info); |
f67539c2 TL |
241 | |
242 | using LogMarker = RGWDataChangesLogMarker; | |
243 | ||
b3b6e05e | 244 | int list_entries(const DoutPrefixProvider *dpp, int max_entries, |
f67539c2 TL |
245 | std::vector<rgw_data_change_log_entry>& entries, |
246 | LogMarker& marker, bool* ptruncated); | |
247 | ||
248 | void mark_modified(int shard_id, const rgw_bucket_shard& bs); | |
249 | auto read_clear_modified() { | |
250 | std::unique_lock wl{modified_lock}; | |
251 | decltype(modified_shards) modified; | |
252 | modified.swap(modified_shards); | |
253 | modified_shards.clear(); | |
254 | return modified; | |
255 | } | |
256 | ||
257 | void set_observer(rgw::BucketChangeObserver *observer) { | |
258 | this->observer = observer; | |
259 | } | |
260 | ||
261 | void set_bucket_filter(decltype(bucket_filter)&& f) { | |
262 | bucket_filter = std::move(f); | |
263 | } | |
264 | // a marker that compares greater than any other | |
265 | std::string max_marker() const; | |
266 | std::string get_oid(uint64_t gen_id, int shard_id) const; | |
267 | ||
268 | ||
b3b6e05e TL |
269 | int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y); |
270 | int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through); | |
f67539c2 TL |
271 | }; |
272 | ||
273 | class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> { | |
274 | protected: | |
275 | librados::IoCtx& ioctx; | |
276 | CephContext* const cct; | |
277 | RGWDataChangesLog& datalog; | |
278 | ||
279 | std::string get_oid(int shard_id) { | |
280 | return datalog.get_oid(gen_id, shard_id); | |
281 | } | |
282 | public: | |
283 | using entries = std::variant<std::list<cls_log_entry>, | |
284 | std::vector<ceph::buffer::list>>; | |
285 | ||
286 | const uint64_t gen_id; | |
287 | ||
288 | RGWDataChangesBE(librados::IoCtx& ioctx, | |
289 | RGWDataChangesLog& datalog, | |
290 | uint64_t gen_id) | |
291 | : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())), | |
292 | datalog(datalog), gen_id(gen_id) {} | |
293 | virtual ~RGWDataChangesBE() = default; | |
294 | ||
295 | virtual void prepare(ceph::real_time now, | |
296 | const std::string& key, | |
297 | ceph::buffer::list&& entry, | |
298 | entries& out) = 0; | |
b3b6e05e TL |
299 | virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items) = 0; |
300 | virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now, | |
f67539c2 TL |
301 | const std::string& key, |
302 | ceph::buffer::list&& bl) = 0; | |
b3b6e05e | 303 | virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 TL |
304 | std::vector<rgw_data_change_log_entry>& entries, |
305 | std::optional<std::string_view> marker, | |
306 | std::string* out_marker, bool* truncated) = 0; | |
b3b6e05e TL |
307 | virtual int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) = 0; |
308 | virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) = 0; | |
309 | virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, | |
f67539c2 TL |
310 | librados::AioCompletion* c) = 0; |
311 | virtual std::string_view max_marker() const = 0; | |
312 | // 1 on empty, 0 on non-empty, negative on error. | |
b3b6e05e | 313 | virtual int is_empty(const DoutPrefixProvider *dpp) = 0; |
f67539c2 TL |
314 | }; |
315 | ||
316 | ||
317 | #endif |