1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_DATALOG_H
5 #define CEPH_RGW_DATALOG_H
11 #include <string_view>
15 #include <boost/container/flat_map.hpp>
16 #include <boost/smart_ptr/intrusive_ptr.hpp>
17 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
19 #undef FMT_HEADER_ONLY
20 #define FMT_HEADER_ONLY 1
21 #include <fmt/format.h>
23 #include "include/buffer.h"
24 #include "include/encoding.h"
25 #include "include/function2.hpp"
27 #include "include/rados/librados.hpp"
29 #include "common/ceph_context.h"
30 #include "common/ceph_json.h"
31 #include "common/ceph_time.h"
32 #include "common/Formatter.h"
33 #include "common/lru_map.h"
34 #include "common/RefCountedObj.h"
36 #include "cls/log/cls_log_types.h"
38 #include "rgw_basic_types.h"
39 #include "rgw_log_backing.h"
40 #include "rgw_sync_policy.h"
42 #include "rgw_trim_bilog.h"
44 namespace bc
= boost::container
;
46 enum DataLogEntityType
{
47 ENTITY_TYPE_UNKNOWN
= 0,
48 ENTITY_TYPE_BUCKET
= 1,
51 struct rgw_data_change
{
52 DataLogEntityType entity_type
;
54 ceph::real_time timestamp
;
56 void encode(ceph::buffer::list
& bl
) const {
57 ENCODE_START(1, 1, bl
);
58 auto t
= std::uint8_t(entity_type
);
61 encode(timestamp
, bl
);
65 void decode(bufferlist::const_iterator
& bl
) {
69 entity_type
= DataLogEntityType(t
);
71 decode(timestamp
, bl
);
75 void dump(ceph::Formatter
* f
) const;
76 void decode_json(JSONObj
* obj
);
78 WRITE_CLASS_ENCODER(rgw_data_change
)
80 struct rgw_data_change_log_entry
{
82 ceph::real_time log_timestamp
;
83 rgw_data_change entry
;
85 void encode(ceph::buffer::list
& bl
) const {
86 ENCODE_START(1, 1, bl
);
88 encode(log_timestamp
, bl
);
93 void decode(ceph::buffer::list::const_iterator
& bl
) {
96 decode(log_timestamp
, bl
);
101 void dump(ceph::Formatter
* f
) const;
102 void decode_json(JSONObj
* obj
);
104 WRITE_CLASS_ENCODER(rgw_data_change_log_entry
)
106 struct RGWDataChangesLogInfo
{
108 ceph::real_time last_update
;
110 void dump(ceph::Formatter
* f
) const;
111 void decode_json(JSONObj
* obj
);
114 struct RGWDataChangesLogMarker
{
118 RGWDataChangesLogMarker() = default;
121 class RGWDataChangesLog
;
123 class RGWDataChangesBE
;
125 class DataLogBackends final
126 : public logback_generations
,
127 private bc::flat_map
<uint64_t, boost::intrusive_ptr
<RGWDataChangesBE
>> {
128 friend class logback_generations
;
129 friend class GenTrim
;
132 RGWDataChangesLog
& datalog
;
134 DataLogBackends(librados::IoCtx
& ioctx
,
136 fu2::unique_function
<std::string(
137 uint64_t, int) const>&& get_oid
,
138 int shards
, RGWDataChangesLog
& datalog
) noexcept
139 : logback_generations(ioctx
, oid
, std::move(get_oid
),
140 shards
), datalog(datalog
) {}
143 boost::intrusive_ptr
<RGWDataChangesBE
> head() {
144 std::unique_lock
l(m
);
149 int list(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
150 std::vector
<rgw_data_change_log_entry
>& entries
,
151 std::string_view marker
,
152 std::string
* out_marker
, bool* truncated
);
153 int trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
);
154 void trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
155 librados::AioCompletion
* c
);
156 void set_zero(RGWDataChangesBE
* be
) {
160 bs::error_code
handle_init(entries_t e
) noexcept override
;
161 bs::error_code
handle_new_gens(entries_t e
) noexcept override
;
162 bs::error_code
handle_empty_to(uint64_t new_tail
) noexcept override
;
164 int trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
);
167 class RGWDataChangesLog
{
168 friend DataLogBackends
;
170 librados::IoCtx ioctx
;
171 rgw::BucketChangeObserver
*observer
= nullptr;
173 std::unique_ptr
<DataLogBackends
> bes
;
175 const int num_shards
;
176 std::string
get_prefix() {
177 auto prefix
= cct
->_conf
->rgw_data_log_obj_prefix
;
178 return prefix
.empty() ? prefix
: "data_log";
180 std::string
metadata_log_oid() {
181 return get_prefix() + "generations_metadata";
185 ceph::mutex lock
= ceph::make_mutex("RGWDataChangesLog::lock");
186 ceph::shared_mutex modified_lock
=
187 ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
188 bc::flat_map
<int, bc::flat_set
<std::string
>> modified_shards
;
190 std::atomic
<bool> down_flag
= { false };
192 struct ChangeStatus
{
193 std::shared_ptr
<const rgw_sync_policy_info
> sync_policy
;
194 ceph::real_time cur_expiration
;
195 ceph::real_time cur_sent
;
196 bool pending
= false;
197 RefCountedCond
* cond
= nullptr;
198 ceph::mutex lock
= ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
201 using ChangeStatusPtr
= std::shared_ptr
<ChangeStatus
>;
203 lru_map
<rgw_bucket_shard
, ChangeStatusPtr
> changes
;
205 bc::flat_set
<rgw_bucket_shard
> cur_cycle
;
207 void _get_change(const rgw_bucket_shard
& bs
, ChangeStatusPtr
& status
);
208 void register_renew(const rgw_bucket_shard
& bs
);
209 void update_renewed(const rgw_bucket_shard
& bs
, ceph::real_time expiration
);
211 ceph::mutex renew_lock
= ceph::make_mutex("ChangesRenewThread::lock");
212 ceph::condition_variable renew_cond
;
213 void renew_run() noexcept
;
215 std::thread renew_thread
;
217 std::function
<bool(const rgw_bucket
& bucket
, optional_yield y
, const DoutPrefixProvider
*dpp
)> bucket_filter
;
218 int choose_oid(const rgw_bucket_shard
& bs
);
219 bool going_down() const;
220 bool filter_bucket(const DoutPrefixProvider
*dpp
, const rgw_bucket
& bucket
, optional_yield y
) const;
221 int renew_entries(const DoutPrefixProvider
*dpp
);
225 RGWDataChangesLog(CephContext
* cct
);
226 ~RGWDataChangesLog();
228 int start(const DoutPrefixProvider
*dpp
, const RGWZone
* _zone
, const RGWZoneParams
& zoneparams
,
229 librados::Rados
* lr
);
231 int add_entry(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
);
232 int get_log_shard_id(rgw_bucket
& bucket
, int shard_id
);
233 int list_entries(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
234 std::vector
<rgw_data_change_log_entry
>& entries
,
235 std::string_view marker
,
236 std::string
* out_marker
, bool* truncated
);
237 int trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
);
238 int trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
239 librados::AioCompletion
* c
); // :(
240 int get_info(const DoutPrefixProvider
*dpp
, int shard_id
, RGWDataChangesLogInfo
*info
);
242 using LogMarker
= RGWDataChangesLogMarker
;
244 int list_entries(const DoutPrefixProvider
*dpp
, int max_entries
,
245 std::vector
<rgw_data_change_log_entry
>& entries
,
246 LogMarker
& marker
, bool* ptruncated
);
248 void mark_modified(int shard_id
, const rgw_bucket_shard
& bs
);
// Holds `modified_lock` through a std::unique_lock for the rest of the scope,
// then exchanges the contents of `modified_shards` with a freshly
// default-constructed local of the same type.
// NOTE(review): the end of this function is not visible in this excerpt;
// presumably the local `modified` is what gets returned -- confirm.
249 auto read_clear_modified() {
250 std::unique_lock wl
{modified_lock
};
251 decltype(modified_shards
) modified
;
252 modified
.swap(modified_shards
);
// `modified` was empty before the swap, so `modified_shards` is already empty
// at this point and the clear() below has nothing left to remove.
253 modified_shards
.clear();
257 void set_observer(rgw::BucketChangeObserver
*observer
) {
258 this->observer
= observer
;
261 void set_bucket_filter(decltype(bucket_filter
)&& f
) {
262 bucket_filter
= std::move(f
);
264 // a marker that compares greater than any other
265 std::string
max_marker() const;
266 std::string
get_oid(uint64_t gen_id
, int shard_id
) const;
269 int change_format(const DoutPrefixProvider
*dpp
, log_type type
, optional_yield y
);
270 int trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
);
273 class RGWDataChangesBE
: public boost::intrusive_ref_counter
<RGWDataChangesBE
> {
275 librados::IoCtx
& ioctx
;
276 CephContext
* const cct
;
277 RGWDataChangesLog
& datalog
;
279 std::string
get_oid(int shard_id
) {
280 return datalog
.get_oid(gen_id
, shard_id
);
283 using entries
= std::variant
<std::list
<cls_log_entry
>,
284 std::vector
<ceph::buffer::list
>>;
286 const uint64_t gen_id
;
288 RGWDataChangesBE(librados::IoCtx
& ioctx
,
289 RGWDataChangesLog
& datalog
,
291 : ioctx(ioctx
), cct(static_cast<CephContext
*>(ioctx
.cct())),
292 datalog(datalog
), gen_id(gen_id
) {}
293 virtual ~RGWDataChangesBE() = default;
295 virtual void prepare(ceph::real_time now
,
296 const std::string
& key
,
297 ceph::buffer::list
&& entry
,
// Pure-virtual operations every backend must provide. All of them return int
// except max_marker().
//
// push (first overload): takes an `entries` value by rvalue reference;
// `entries` is the std::variant alias declared earlier in this class.
// NOTE(review): presumably a batch assembled through prepare() -- confirm
// against the concrete backends.
299 virtual int push(const DoutPrefixProvider
*dpp
, int index
, entries
&& items
) = 0;
// push (second overload): takes one timestamp / key / buffer triple directly
// instead of an `entries` value.
300 virtual int push(const DoutPrefixProvider
*dpp
, int index
, ceph::real_time now
,
301 const std::string
& key
,
302 ceph::buffer::list
&& bl
) = 0;
// list: `marker` is a std::optional here. `entries`, `out_marker` and
// `truncated` are non-const reference/pointer parameters.
// NOTE(review): presumably these three are outputs -- confirm.
303 virtual int list(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
304 std::vector
<rgw_data_change_log_entry
>& entries
,
305 std::optional
<std::string_view
> marker
,
306 std::string
* out_marker
, bool* truncated
) = 0;
// get_info: NOTE(review): presumably fills `*info` for the given index --
// confirm.
307 virtual int get_info(const DoutPrefixProvider
*dpp
, int index
, RGWDataChangesLogInfo
*info
) = 0;
// trim: two overloads with the same leading parameters; the second one
// additionally takes a librados::AioCompletion*.
// NOTE(review): presumably the second is the asynchronous variant -- confirm.
308 virtual int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
) = 0;
309 virtual int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
,
310 librados::AioCompletion
* c
) = 0;
// max_marker: returns a non-owning std::string_view.
// NOTE(review): the lifetime of the viewed storage is not visible here --
// confirm it outlives callers' use.
311 virtual std::string_view
max_marker() const = 0;
312 // 1 on empty, 0 on non-empty, negative on error.
313 virtual int is_empty(const DoutPrefixProvider
*dpp
) = 0;