]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/driver/rados/rgw_datalog.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_datalog.h
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
1e59de90 4#pragma once
f67539c2
TL
5
6#include <cstdint>
7#include <list>
8#include <memory>
9#include <string>
10#include <string_view>
11#include <variant>
12#include <vector>
13
14#include <boost/container/flat_map.hpp>
1e59de90 15#include <boost/container/flat_set.hpp>
f67539c2
TL
16#include <boost/smart_ptr/intrusive_ptr.hpp>
17#include <boost/smart_ptr/intrusive_ref_counter.hpp>
18
f67539c2
TL
19#include <fmt/format.h>
20
1e59de90 21#include "common/async/yield_context.h"
f67539c2
TL
22#include "include/buffer.h"
23#include "include/encoding.h"
24#include "include/function2.hpp"
25
26#include "include/rados/librados.hpp"
27
28#include "common/ceph_context.h"
29#include "common/ceph_json.h"
30#include "common/ceph_time.h"
31#include "common/Formatter.h"
32#include "common/lru_map.h"
33#include "common/RefCountedObj.h"
34
35#include "cls/log/cls_log_types.h"
36
37#include "rgw_basic_types.h"
38#include "rgw_log_backing.h"
39#include "rgw_sync_policy.h"
40#include "rgw_zone.h"
41#include "rgw_trim_bilog.h"
42
43namespace bc = boost::container;
44
/// Kind of entity a data-log change refers to.
/// rgw_data_change writes this value as a std::uint8_t when encoding, so the
/// numeric values below must stay stable.
enum DataLogEntityType {
  ENTITY_TYPE_UNKNOWN = 0,  ///< unknown entity type
  ENTITY_TYPE_BUCKET  = 1,  ///< the entity is a bucket
};
49
50struct rgw_data_change {
51 DataLogEntityType entity_type;
52 std::string key;
53 ceph::real_time timestamp;
1e59de90 54 uint64_t gen = 0;
f67539c2
TL
55
56 void encode(ceph::buffer::list& bl) const {
1e59de90
TL
57 // require decoders to recognize v2 when gen>0
58 const uint8_t compat = (gen == 0) ? 1 : 2;
59 ENCODE_START(2, compat, bl);
f67539c2
TL
60 auto t = std::uint8_t(entity_type);
61 encode(t, bl);
62 encode(key, bl);
63 encode(timestamp, bl);
1e59de90 64 encode(gen, bl);
f67539c2
TL
65 ENCODE_FINISH(bl);
66 }
67
68 void decode(bufferlist::const_iterator& bl) {
1e59de90 69 DECODE_START(2, bl);
f67539c2
TL
70 std::uint8_t t;
71 decode(t, bl);
72 entity_type = DataLogEntityType(t);
73 decode(key, bl);
74 decode(timestamp, bl);
1e59de90
TL
75 if (struct_v < 2) {
76 gen = 0;
77 } else {
78 decode(gen, bl);
79 }
f67539c2
TL
80 DECODE_FINISH(bl);
81 }
82
83 void dump(ceph::Formatter* f) const;
84 void decode_json(JSONObj* obj);
85};
86WRITE_CLASS_ENCODER(rgw_data_change)
87
88struct rgw_data_change_log_entry {
89 std::string log_id;
90 ceph::real_time log_timestamp;
91 rgw_data_change entry;
92
93 void encode(ceph::buffer::list& bl) const {
94 ENCODE_START(1, 1, bl);
95 encode(log_id, bl);
96 encode(log_timestamp, bl);
97 encode(entry, bl);
98 ENCODE_FINISH(bl);
99 }
100
101 void decode(ceph::buffer::list::const_iterator& bl) {
102 DECODE_START(1, bl);
103 decode(log_id, bl);
104 decode(log_timestamp, bl);
105 decode(entry, bl);
106 DECODE_FINISH(bl);
107 }
108
109 void dump(ceph::Formatter* f) const;
110 void decode_json(JSONObj* obj);
111};
112WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
113
114struct RGWDataChangesLogInfo {
115 std::string marker;
116 ceph::real_time last_update;
117
118 void dump(ceph::Formatter* f) const;
119 void decode_json(JSONObj* obj);
120};
121
/// Listing position: a shard index plus a marker string.
/// A default-constructed marker has shard 0 and an empty marker string.
struct RGWDataChangesLogMarker {
  int shard{0};
  std::string marker;

  RGWDataChangesLogMarker() = default;
};
128
129class RGWDataChangesLog;
130
1e59de90
TL
131struct rgw_data_notify_entry {
132 std::string key;
133 uint64_t gen = 0;
134
135 void dump(ceph::Formatter* f) const;
136 void decode_json(JSONObj* obj);
137
138 rgw_data_notify_entry& operator=(const rgw_data_notify_entry&) = default;
139
140 bool operator <(const rgw_data_notify_entry& d) const {
141 if (key < d.key) {
142 return true;
143 }
144 if (d.key < key) {
145 return false;
146 }
147 return gen < d.gen;
148 }
149 friend std::ostream& operator <<(std::ostream& m,
150 const rgw_data_notify_entry& e) {
151 return m << "[key: " << e.key << ", gen: " << e.gen << "]";
152 }
153};
154
f67539c2
TL
155class RGWDataChangesBE;
156
157class DataLogBackends final
158 : public logback_generations,
159 private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
160 friend class logback_generations;
161 friend class GenTrim;
162
163 std::mutex m;
164 RGWDataChangesLog& datalog;
165
166 DataLogBackends(librados::IoCtx& ioctx,
167 std::string oid,
168 fu2::unique_function<std::string(
169 uint64_t, int) const>&& get_oid,
170 int shards, RGWDataChangesLog& datalog) noexcept
171 : logback_generations(ioctx, oid, std::move(get_oid),
172 shards), datalog(datalog) {}
173public:
174
175 boost::intrusive_ptr<RGWDataChangesBE> head() {
176 std::unique_lock l(m);
177 auto i = end();
178 --i;
179 return i->second;
180 }
b3b6e05e 181 int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2 182 std::vector<rgw_data_change_log_entry>& entries,
1e59de90
TL
183 std::string_view marker, std::string* out_marker, bool* truncated,
184 optional_yield y);
185 int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
186 std::string_view marker, optional_yield y);
b3b6e05e 187 void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2
TL
188 librados::AioCompletion* c);
189 void set_zero(RGWDataChangesBE* be) {
190 emplace(0, be);
191 }
192
193 bs::error_code handle_init(entries_t e) noexcept override;
194 bs::error_code handle_new_gens(entries_t e) noexcept override;
195 bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
196
1e59de90
TL
197 int trim_generations(const DoutPrefixProvider *dpp,
198 std::optional<uint64_t>& through,
199 optional_yield y);
f67539c2
TL
200};
201
1e59de90
TL
202struct BucketGen {
203 rgw_bucket_shard shard;
204 uint64_t gen;
205
206 BucketGen(const rgw_bucket_shard& shard, uint64_t gen)
207 : shard(shard), gen(gen) {}
208
209 BucketGen(rgw_bucket_shard&& shard, uint64_t gen)
210 : shard(std::move(shard)), gen(gen) {}
211
212 BucketGen(const BucketGen&) = default;
213 BucketGen(BucketGen&&) = default;
214 BucketGen& operator =(const BucketGen&) = default;
215 BucketGen& operator =(BucketGen&&) = default;
216
217 ~BucketGen() = default;
218};
219
220inline bool operator ==(const BucketGen& l, const BucketGen& r) {
221 return (l.shard == r.shard) && (l.gen == r.gen);
222}
223
224inline bool operator <(const BucketGen& l, const BucketGen& r) {
225 if (l.shard < r.shard) {
226 return true;
227 } else if (l.shard == r.shard) {
228 return l.gen < r.gen;
229 } else {
230 return false;
231 }
232}
233
f67539c2
TL
234class RGWDataChangesLog {
235 friend DataLogBackends;
236 CephContext *cct;
237 librados::IoCtx ioctx;
238 rgw::BucketChangeObserver *observer = nullptr;
239 const RGWZone* zone;
240 std::unique_ptr<DataLogBackends> bes;
241
242 const int num_shards;
243 std::string get_prefix() {
244 auto prefix = cct->_conf->rgw_data_log_obj_prefix;
20effc67 245 return prefix.empty() ? prefix : "data_log";
f67539c2
TL
246 }
247 std::string metadata_log_oid() {
20effc67 248 return get_prefix() + "generations_metadata";
f67539c2
TL
249 }
250 std::string prefix;
251
252 ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
253 ceph::shared_mutex modified_lock =
254 ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
1e59de90 255 bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
f67539c2
TL
256
257 std::atomic<bool> down_flag = { false };
258
259 struct ChangeStatus {
260 std::shared_ptr<const rgw_sync_policy_info> sync_policy;
261 ceph::real_time cur_expiration;
262 ceph::real_time cur_sent;
263 bool pending = false;
264 RefCountedCond* cond = nullptr;
265 ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
266 };
267
268 using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
269
1e59de90 270 lru_map<BucketGen, ChangeStatusPtr> changes;
f67539c2 271
1e59de90 272 bc::flat_set<BucketGen> cur_cycle;
f67539c2 273
1e59de90
TL
274 ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen);
275 void register_renew(const rgw_bucket_shard& bs,
276 const rgw::bucket_log_layout_generation& gen);
277 void update_renewed(const rgw_bucket_shard& bs,
278 uint64_t gen,
279 ceph::real_time expiration);
f67539c2
TL
280
281 ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
282 ceph::condition_variable renew_cond;
20effc67 283 void renew_run() noexcept;
f67539c2
TL
284 void renew_stop();
285 std::thread renew_thread;
286
b3b6e05e 287 std::function<bool(const rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp)> bucket_filter;
f67539c2 288 bool going_down() const;
b3b6e05e
TL
289 bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const;
290 int renew_entries(const DoutPrefixProvider *dpp);
f67539c2
TL
291
292public:
293
294 RGWDataChangesLog(CephContext* cct);
295 ~RGWDataChangesLog();
296
b3b6e05e 297 int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
f67539c2 298 librados::Rados* lr);
1e59de90
TL
299 int choose_oid(const rgw_bucket_shard& bs);
300 int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
301 const rgw::bucket_log_layout_generation& gen, int shard_id,
302 optional_yield y);
f67539c2 303 int get_log_shard_id(rgw_bucket& bucket, int shard_id);
b3b6e05e 304 int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2 305 std::vector<rgw_data_change_log_entry>& entries,
1e59de90
TL
306 std::string_view marker, std::string* out_marker,
307 bool* truncated, optional_yield y);
308 int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
309 std::string_view marker, optional_yield y);
b3b6e05e 310 int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2 311 librados::AioCompletion* c); // :(
1e59de90
TL
312 int get_info(const DoutPrefixProvider *dpp, int shard_id,
313 RGWDataChangesLogInfo *info, optional_yield y);
f67539c2
TL
314
315 using LogMarker = RGWDataChangesLogMarker;
316
b3b6e05e 317 int list_entries(const DoutPrefixProvider *dpp, int max_entries,
f67539c2 318 std::vector<rgw_data_change_log_entry>& entries,
1e59de90
TL
319 LogMarker& marker, bool* ptruncated,
320 optional_yield y);
f67539c2 321
1e59de90 322 void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
f67539c2
TL
323 auto read_clear_modified() {
324 std::unique_lock wl{modified_lock};
325 decltype(modified_shards) modified;
326 modified.swap(modified_shards);
327 modified_shards.clear();
328 return modified;
329 }
330
331 void set_observer(rgw::BucketChangeObserver *observer) {
332 this->observer = observer;
333 }
334
335 void set_bucket_filter(decltype(bucket_filter)&& f) {
336 bucket_filter = std::move(f);
337 }
338 // a marker that compares greater than any other
339 std::string max_marker() const;
340 std::string get_oid(uint64_t gen_id, int shard_id) const;
341
342
b3b6e05e 343 int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y);
1e59de90
TL
344 int trim_generations(const DoutPrefixProvider *dpp,
345 std::optional<uint64_t>& through,
346 optional_yield y);
f67539c2
TL
347};
348
349class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
350protected:
351 librados::IoCtx& ioctx;
352 CephContext* const cct;
353 RGWDataChangesLog& datalog;
354
355 std::string get_oid(int shard_id) {
356 return datalog.get_oid(gen_id, shard_id);
357 }
358public:
359 using entries = std::variant<std::list<cls_log_entry>,
360 std::vector<ceph::buffer::list>>;
361
362 const uint64_t gen_id;
363
364 RGWDataChangesBE(librados::IoCtx& ioctx,
365 RGWDataChangesLog& datalog,
366 uint64_t gen_id)
367 : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())),
368 datalog(datalog), gen_id(gen_id) {}
369 virtual ~RGWDataChangesBE() = default;
370
371 virtual void prepare(ceph::real_time now,
372 const std::string& key,
373 ceph::buffer::list&& entry,
374 entries& out) = 0;
1e59de90
TL
375 virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items,
376 optional_yield y) = 0;
b3b6e05e 377 virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
1e59de90
TL
378 const std::string& key, ceph::buffer::list&& bl,
379 optional_yield y) = 0;
b3b6e05e 380 virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2
TL
381 std::vector<rgw_data_change_log_entry>& entries,
382 std::optional<std::string_view> marker,
1e59de90
TL
383 std::string* out_marker, bool* truncated,
384 optional_yield y) = 0;
385 virtual int get_info(const DoutPrefixProvider *dpp, int index,
386 RGWDataChangesLogInfo *info, optional_yield y) = 0;
387 virtual int trim(const DoutPrefixProvider *dpp, int index,
388 std::string_view marker, optional_yield y) = 0;
389 virtual int trim(const DoutPrefixProvider *dpp, int index,
390 std::string_view marker, librados::AioCompletion* c) = 0;
f67539c2
TL
391 virtual std::string_view max_marker() const = 0;
392 // 1 on empty, 0 on non-empty, negative on error.
1e59de90 393 virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
f67539c2 394};