]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_datalog.h
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_datalog.h
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#ifndef CEPH_RGW_DATALOG_H
5#define CEPH_RGW_DATALOG_H
6
#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <variant>
#include <vector>

#include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
18
19#undef FMT_HEADER_ONLY
20#define FMT_HEADER_ONLY 1
21#include <fmt/format.h>
22
23#include "include/buffer.h"
24#include "include/encoding.h"
25#include "include/function2.hpp"
26
27#include "include/rados/librados.hpp"
28
29#include "common/ceph_context.h"
30#include "common/ceph_json.h"
31#include "common/ceph_time.h"
32#include "common/Formatter.h"
33#include "common/lru_map.h"
34#include "common/RefCountedObj.h"
35
36#include "cls/log/cls_log_types.h"
37
38#include "rgw_basic_types.h"
39#include "rgw_log_backing.h"
40#include "rgw_sync_policy.h"
41#include "rgw_zone.h"
42#include "rgw_trim_bilog.h"
43
44namespace bc = boost::container;
45
// Kind of entity a data-log change record refers to.
//
// rgw_data_change::encode() writes this value as a single byte, so the
// numeric values are part of the serialized format and must stay stable.
enum DataLogEntityType {
  ENTITY_TYPE_UNKNOWN = 0,
  ENTITY_TYPE_BUCKET = 1,
};
50
51struct rgw_data_change {
52 DataLogEntityType entity_type;
53 std::string key;
54 ceph::real_time timestamp;
55
56 void encode(ceph::buffer::list& bl) const {
57 ENCODE_START(1, 1, bl);
58 auto t = std::uint8_t(entity_type);
59 encode(t, bl);
60 encode(key, bl);
61 encode(timestamp, bl);
62 ENCODE_FINISH(bl);
63 }
64
65 void decode(bufferlist::const_iterator& bl) {
66 DECODE_START(1, bl);
67 std::uint8_t t;
68 decode(t, bl);
69 entity_type = DataLogEntityType(t);
70 decode(key, bl);
71 decode(timestamp, bl);
72 DECODE_FINISH(bl);
73 }
74
75 void dump(ceph::Formatter* f) const;
76 void decode_json(JSONObj* obj);
77};
78WRITE_CLASS_ENCODER(rgw_data_change)
79
80struct rgw_data_change_log_entry {
81 std::string log_id;
82 ceph::real_time log_timestamp;
83 rgw_data_change entry;
84
85 void encode(ceph::buffer::list& bl) const {
86 ENCODE_START(1, 1, bl);
87 encode(log_id, bl);
88 encode(log_timestamp, bl);
89 encode(entry, bl);
90 ENCODE_FINISH(bl);
91 }
92
93 void decode(ceph::buffer::list::const_iterator& bl) {
94 DECODE_START(1, bl);
95 decode(log_id, bl);
96 decode(log_timestamp, bl);
97 decode(entry, bl);
98 DECODE_FINISH(bl);
99 }
100
101 void dump(ceph::Formatter* f) const;
102 void decode_json(JSONObj* obj);
103};
104WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
105
106struct RGWDataChangesLogInfo {
107 std::string marker;
108 ceph::real_time last_update;
109
110 void dump(ceph::Formatter* f) const;
111 void decode_json(JSONObj* obj);
112};
113
// Position used by the cross-shard RGWDataChangesLog::list_entries()
// overload: a shard index plus an optional marker string. A
// default-constructed marker has shard 0 and no marker set.
struct RGWDataChangesLogMarker {
  int shard = 0;
  std::optional<std::string> marker;

  RGWDataChangesLogMarker() = default;
};
120
// Forward declarations: DataLogBackends refers to both classes before their
// definitions further down in this header.
class RGWDataChangesLog;

class RGWDataChangesBE;
124
125class DataLogBackends final
126 : public logback_generations,
127 private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
128 friend class logback_generations;
129 friend class GenTrim;
130
131 std::mutex m;
132 RGWDataChangesLog& datalog;
133
134 DataLogBackends(librados::IoCtx& ioctx,
135 std::string oid,
136 fu2::unique_function<std::string(
137 uint64_t, int) const>&& get_oid,
138 int shards, RGWDataChangesLog& datalog) noexcept
139 : logback_generations(ioctx, oid, std::move(get_oid),
140 shards), datalog(datalog) {}
141public:
142
143 boost::intrusive_ptr<RGWDataChangesBE> head() {
144 std::unique_lock l(m);
145 auto i = end();
146 --i;
147 return i->second;
148 }
b3b6e05e 149 int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2
TL
150 std::vector<rgw_data_change_log_entry>& entries,
151 std::optional<std::string_view> marker,
152 std::string* out_marker, bool* truncated);
b3b6e05e
TL
153 int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
154 void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2
TL
155 librados::AioCompletion* c);
156 void set_zero(RGWDataChangesBE* be) {
157 emplace(0, be);
158 }
159
160 bs::error_code handle_init(entries_t e) noexcept override;
161 bs::error_code handle_new_gens(entries_t e) noexcept override;
162 bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
163
b3b6e05e 164 int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
f67539c2
TL
165};
166
167class RGWDataChangesLog {
168 friend DataLogBackends;
169 CephContext *cct;
170 librados::IoCtx ioctx;
171 rgw::BucketChangeObserver *observer = nullptr;
172 const RGWZone* zone;
173 std::unique_ptr<DataLogBackends> bes;
174
175 const int num_shards;
176 std::string get_prefix() {
177 auto prefix = cct->_conf->rgw_data_log_obj_prefix;
178 return prefix.empty() ? prefix : "data_log"s;
179 }
180 std::string metadata_log_oid() {
181 return get_prefix() + "generations_metadata"s;
182 }
183 std::string prefix;
184
185 ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
186 ceph::shared_mutex modified_lock =
187 ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
188 bc::flat_map<int, bc::flat_set<std::string>> modified_shards;
189
190 std::atomic<bool> down_flag = { false };
191
192 struct ChangeStatus {
193 std::shared_ptr<const rgw_sync_policy_info> sync_policy;
194 ceph::real_time cur_expiration;
195 ceph::real_time cur_sent;
196 bool pending = false;
197 RefCountedCond* cond = nullptr;
198 ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
199 };
200
201 using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
202
203 lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
204
205 bc::flat_set<rgw_bucket_shard> cur_cycle;
206
207 void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
208 void register_renew(const rgw_bucket_shard& bs);
209 void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
210
211 ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
212 ceph::condition_variable renew_cond;
213 void renew_run();
214 void renew_stop();
215 std::thread renew_thread;
216
b3b6e05e 217 std::function<bool(const rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp)> bucket_filter;
f67539c2
TL
218 int choose_oid(const rgw_bucket_shard& bs);
219 bool going_down() const;
b3b6e05e
TL
220 bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const;
221 int renew_entries(const DoutPrefixProvider *dpp);
f67539c2
TL
222
223public:
224
225 RGWDataChangesLog(CephContext* cct);
226 ~RGWDataChangesLog();
227
b3b6e05e 228 int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
f67539c2
TL
229 librados::Rados* lr);
230
b3b6e05e 231 int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
f67539c2 232 int get_log_shard_id(rgw_bucket& bucket, int shard_id);
b3b6e05e 233 int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2
TL
234 std::vector<rgw_data_change_log_entry>& entries,
235 std::optional<std::string_view> marker,
236 std::string* out_marker, bool* truncated);
b3b6e05e
TL
237 int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
238 int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2 239 librados::AioCompletion* c); // :(
b3b6e05e 240 int get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info);
f67539c2
TL
241
242 using LogMarker = RGWDataChangesLogMarker;
243
b3b6e05e 244 int list_entries(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
245 std::vector<rgw_data_change_log_entry>& entries,
246 LogMarker& marker, bool* ptruncated);
247
248 void mark_modified(int shard_id, const rgw_bucket_shard& bs);
249 auto read_clear_modified() {
250 std::unique_lock wl{modified_lock};
251 decltype(modified_shards) modified;
252 modified.swap(modified_shards);
253 modified_shards.clear();
254 return modified;
255 }
256
257 void set_observer(rgw::BucketChangeObserver *observer) {
258 this->observer = observer;
259 }
260
261 void set_bucket_filter(decltype(bucket_filter)&& f) {
262 bucket_filter = std::move(f);
263 }
264 // a marker that compares greater than any other
265 std::string max_marker() const;
266 std::string get_oid(uint64_t gen_id, int shard_id) const;
267
268
b3b6e05e
TL
269 int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y);
270 int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
f67539c2
TL
271};
272
273class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
274protected:
275 librados::IoCtx& ioctx;
276 CephContext* const cct;
277 RGWDataChangesLog& datalog;
278
279 std::string get_oid(int shard_id) {
280 return datalog.get_oid(gen_id, shard_id);
281 }
282public:
283 using entries = std::variant<std::list<cls_log_entry>,
284 std::vector<ceph::buffer::list>>;
285
286 const uint64_t gen_id;
287
288 RGWDataChangesBE(librados::IoCtx& ioctx,
289 RGWDataChangesLog& datalog,
290 uint64_t gen_id)
291 : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())),
292 datalog(datalog), gen_id(gen_id) {}
293 virtual ~RGWDataChangesBE() = default;
294
295 virtual void prepare(ceph::real_time now,
296 const std::string& key,
297 ceph::buffer::list&& entry,
298 entries& out) = 0;
b3b6e05e
TL
299 virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items) = 0;
300 virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
f67539c2
TL
301 const std::string& key,
302 ceph::buffer::list&& bl) = 0;
b3b6e05e 303 virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2
TL
304 std::vector<rgw_data_change_log_entry>& entries,
305 std::optional<std::string_view> marker,
306 std::string* out_marker, bool* truncated) = 0;
b3b6e05e
TL
307 virtual int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) = 0;
308 virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) = 0;
309 virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
f67539c2
TL
310 librados::AioCompletion* c) = 0;
311 virtual std::string_view max_marker() const = 0;
312 // 1 on empty, 0 on non-empty, negative on error.
b3b6e05e 313 virtual int is_empty(const DoutPrefixProvider *dpp) = 0;
f67539c2
TL
314};
315
316
317#endif