1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include "common/debug.h"
7 #include "common/containers.h"
8 #include "common/errno.h"
9 #include "common/error_code.h"
11 #include "common/async/blocked_completion.h"
12 #include "common/async/librados_completion.h"
14 #include "cls/fifo/cls_fifo_types.h"
15 #include "cls/log/cls_log_client.h"
17 #include "cls_fifo_legacy.h"
18 #include "rgw_datalog.h"
19 #include "rgw_log_backing.h"
20 #include "rgw_tools.h"
22 #define dout_context g_ceph_context
23 static constexpr auto dout_subsys
= ceph_subsys_rgw
;
25 namespace bs
= boost::system
;
26 namespace lr
= librados
;
28 using ceph::containers::tiny_vector
;
// Emit this change record to the formatter as three JSON fields:
// "entity_type", "key" and "timestamp" (the timestamp is wrapped in a
// utime_t before encoding).
// NOTE(review): `type` is presumably assigned inside the switch on
// entity_type; those lines are not visible in this excerpt.
30 void rgw_data_change::dump(ceph::Formatter
*f
) const
33 switch (entity_type
) {
34 case ENTITY_TYPE_BUCKET
:
40 encode_json("entity_type", type
, f
);
41 encode_json("key", key
, f
);
42 utime_t
ut(timestamp
);
43 encode_json("timestamp", ut
, f
);
// Populate this change record from a JSON object: reads "entity_type",
// "key" and "timestamp".  entity_type ends up as ENTITY_TYPE_BUCKET or
// ENTITY_TYPE_UNKNOWN; the timestamp is converted with to_real_time().
// NOTE(review): the test on `s` that selects between the two entity types
// and the declarations of `s`/`ut` are not visible in this excerpt.
46 void rgw_data_change::decode_json(JSONObj
*obj
) {
48 JSONDecoder::decode_json("entity_type", s
, obj
);
50 entity_type
= ENTITY_TYPE_BUCKET
;
52 entity_type
= ENTITY_TYPE_UNKNOWN
;
54 JSONDecoder::decode_json("key", key
, obj
);
56 JSONDecoder::decode_json("timestamp", ut
, obj
);
57 timestamp
= ut
.to_real_time();
60 void rgw_data_change_log_entry::dump(Formatter
*f
) const
62 encode_json("log_id", log_id
, f
);
63 utime_t
ut(log_timestamp
);
64 encode_json("log_timestamp", ut
, f
);
65 encode_json("entry", entry
, f
);
// Populate this log entry from a JSON object: reads "log_id",
// "log_timestamp" (converted with to_real_time()) and "entry".
// NOTE(review): the declaration of `ut` is not visible in this excerpt.
68 void rgw_data_change_log_entry::decode_json(JSONObj
*obj
) {
69 JSONDecoder::decode_json("log_id", log_id
, obj
);
71 JSONDecoder::decode_json("log_timestamp", ut
, obj
);
72 log_timestamp
= ut
.to_real_time();
73 JSONDecoder::decode_json("entry", entry
, obj
);
76 class RGWDataChangesOmap final
: public RGWDataChangesBE
{
77 using centries
= std::list
<cls_log_entry
>;
78 std::vector
<std::string
> oids
;
// Constructor: forwards ioctx/datalog/gen_id to the RGWDataChangesBE base
// and fills `oids` with one object name per shard, obtained from
// get_oid(i), so the other methods can index `oids` by shard.
// NOTE(review): the remaining parameters (gen_id, num_shards) are declared
// on lines not visible in this excerpt.
81 RGWDataChangesOmap(lr::IoCtx
& ioctx
,
82 RGWDataChangesLog
& datalog
,
85 : RGWDataChangesBE(ioctx
, datalog
, gen_id
) {
86 oids
.reserve(num_shards
);
87 for (auto i
= 0; i
< num_shards
; ++i
) {
88 oids
.push_back(get_oid(i
));
// Destructor is defaulted.
91 ~RGWDataChangesOmap() override
= default;
// Append one entry to the batch `out`.
// If `out` does not currently hold this backend's `centries` alternative,
// whatever it holds must be empty (asserted).  A cls_log entry `e` is then
// filled from the timestamp, key and payload and moved onto the list.
// NOTE(review): the lines that switch `out` to `centries` and declare `e`
// are not visible in this excerpt.
93 void prepare(ceph::real_time ut
, const std::string
& key
,
94 ceph::buffer::list
&& entry
, entries
& out
) override
{
95 if (!std::holds_alternative
<centries
>(out
)) {
96 ceph_assert(std::visit([](const auto& v
) { return std::empty(v
); }, out
));
101 cls_log_add_prepare_entry(e
, utime_t(ut
), {}, key
, entry
);
102 std::get
<centries
>(out
).push_back(std::move(e
));
// Write a prepared batch of entries to the shard object oids[index] with a
// single cls_log_add write operation.  `items` must hold `centries`
// (std::get throws otherwise).
// NOTE(review): the failure message streams the oid and cpp_strerror(-r)
// back to back with no separator between them.
104 int push(const DoutPrefixProvider
*dpp
, int index
, entries
&& items
) override
{
105 lr::ObjectWriteOperation op
;
106 cls_log_add(op
, std::get
<centries
>(items
), true);
107 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
109 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
110 << ": failed to push to " << oids
[index
] << cpp_strerror(-r
)
// Write a single entry (timestamp `now`, `key`, payload `bl`) to the shard
// object oids[index] with one cls_log_add write operation, logging a
// message on failure.
// NOTE(review): as in the batch overload, the message has no separator
// between the oid and the error text.
115 int push(const DoutPrefixProvider
*dpp
, int index
, ceph::real_time now
,
116 const std::string
& key
,
117 ceph::buffer::list
&& bl
) override
{
118 lr::ObjectWriteOperation op
;
119 cls_log_add(op
, utime_t(now
), {}, key
, bl
);
120 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
122 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
123 << ": failed to push to " << oids
[index
]
124 << cpp_strerror(-r
) << dendl
;
// List up to max_entries entries of shard `index`, starting after `marker`
// (an absent marker is passed as the empty string).
// cls_log_list fills log_entries, *out_marker and *truncated; each raw
// entry is then converted into an rgw_data_change_log_entry (id, timestamp,
// decoded payload) and appended to `entries`.
// NOTE(review): the handling of `r` and the return statements sit on lines
// not visible in this excerpt.
128 int list(const DoutPrefixProvider
*dpp
, int index
, int max_entries
,
129 std::vector
<rgw_data_change_log_entry
>& entries
,
130 std::optional
<std::string_view
> marker
,
131 std::string
* out_marker
, bool* truncated
) override
{
132 std::list
<cls_log_entry
> log_entries
;
133 lr::ObjectReadOperation op
;
134 cls_log_list(op
, {}, {}, std::string(marker
.value_or("")),
135 max_entries
, log_entries
, out_marker
, truncated
);
136 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, nullptr, null_yield
);
142 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
143 << ": failed to list " << oids
[index
]
144 << cpp_strerror(-r
) << dendl
;
// Convert each cls_log_entry into the caller-facing entry type.
147 for (auto iter
= log_entries
.begin(); iter
!= log_entries
.end(); ++iter
) {
148 rgw_data_change_log_entry log_entry
;
149 log_entry
.log_id
= iter
->id
;
150 auto rt
= iter
->timestamp
.to_real_time();
151 log_entry
.log_timestamp
= rt
;
152 auto liter
= iter
->data
.cbegin();
// The payload is an encoded rgw_data_change; decode failures are logged.
154 decode(log_entry
.entry
, liter
);
155 } catch (ceph::buffer::error
& err
) {
156 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
157 << ": failed to decode data changes log entry: "
158 << err
.what() << dendl
;
161 entries
.push_back(log_entry
);
// Read the cls_log header of shard `index` and copy its max_marker and
// max_time into *info.  -ENOENT from the read is mapped to success, so a
// missing object yields the default-constructed header's values.
165 int get_info(const DoutPrefixProvider
*dpp
, int index
, RGWDataChangesLogInfo
*info
) override
{
166 cls_log_header header
;
167 lr::ObjectReadOperation op
;
168 cls_log_info(op
, &header
);
169 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, nullptr, null_yield
);
170 if (r
== -ENOENT
) r
= 0;
172 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
173 << ": failed to get info from " << oids
[index
]
174 << cpp_strerror(-r
) << dendl
;
176 info
->marker
= header
.max_marker
;
177 info
->last_update
= header
.max_time
.to_real_time();
// Synchronously trim shard `index` up to `marker` with cls_log_trim.
// -ENOENT is translated to -ENODATA, and -ENODATA itself is not logged as
// an error.
// NOTE(review): the log text says "failed to get info from" although this
// is the trim path -- it looks copied from get_info().
181 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
) override
{
182 lr::ObjectWriteOperation op
;
183 cls_log_trim(op
, {}, {}, {}, std::string(marker
));
184 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
185 if (r
== -ENOENT
) r
= -ENODATA
;
186 if (r
< 0 && r
!= -ENODATA
) {
187 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
188 << ": failed to get info from " << oids
[index
]
189 << cpp_strerror(-r
) << dendl
;
// Asynchronous trim: submits a cls_log_trim on shard `index` through
// ioctx.aio_operate with the caller's completion `c`.  The value examined
// here is the submission result `r`; -ENOENT is translated to -ENODATA.
// NOTE(review): the log text says "failed to get info from" although this
// is the trim path.  The condition guarding the log line is not visible.
193 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
,
194 lr::AioCompletion
* c
) override
{
195 lr::ObjectWriteOperation op
;
196 cls_log_trim(op
, {}, {}, {}, std::string(marker
));
197 auto r
= ioctx
.aio_operate(oids
[index
], c
, &op
, 0);
198 if (r
== -ENOENT
) r
= -ENODATA
;
200 lderr(cct
) << __PRETTY_FUNCTION__
201 << ": failed to get info from " << oids
[index
]
202 << cpp_strerror(-r
) << dendl
;
206 std::string_view
max_marker() const override
{
// Probe every shard object by listing at most one entry from it and
// checking whether anything came back.
// NOTE(review): the return statements and the handling of `r` are on lines
// not visible in this excerpt; the declaration of `truncated` is also
// missing here.
209 int is_empty(const DoutPrefixProvider
*dpp
) override
{
210 for (auto shard
= 0u; shard
< oids
.size(); ++shard
) {
211 std::list
<cls_log_entry
> log_entries
;
212 lr::ObjectReadOperation op
;
213 std::string out_marker
;
// Ask for a single entry: enough to tell whether the shard has any.
215 cls_log_list(op
, {}, {}, {}, 1, log_entries
, &out_marker
, &truncated
);
216 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[shard
], &op
, nullptr, null_yield
);
221 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
222 << ": failed to list " << oids
[shard
]
223 << cpp_strerror(-r
) << dendl
;
226 if (!log_entries
.empty()) {
234 class RGWDataChangesFIFO final
: public RGWDataChangesBE
{
235 using centries
= std::vector
<ceph::buffer::list
>;
236 tiny_vector
<LazyFIFO
> fifos
;
// Constructor: forwards ioctx/datalog/gen_id to the RGWDataChangesBE base
// and builds `fifos` with `shards` elements; element i is emplaced from
// (ioctx, get_oid(i)).
239 RGWDataChangesFIFO(lr::IoCtx
& ioctx
,
240 RGWDataChangesLog
& datalog
,
241 uint64_t gen_id
, int shards
)
242 : RGWDataChangesBE(ioctx
, datalog
, gen_id
),
243 fifos(shards
, [&ioctx
, this](std::size_t i
, auto emplacer
) {
244 emplacer
.emplace(ioctx
, get_oid(i
));
// Destructor is defaulted.
246 ~RGWDataChangesFIFO() override
= default;
// Append one payload to the batch `out`.  The timestamp and key parameters
// are unnamed and unused: only the buffer is stored.
// If `out` does not hold this backend's `centries` alternative, whatever
// it holds must be empty (asserted).
// NOTE(review): the line that presumably switches `out` to `centries` is
// not visible in this excerpt.
247 void prepare(ceph::real_time
, const std::string
&,
248 ceph::buffer::list
&& entry
, entries
& out
) override
{
249 if (!std::holds_alternative
<centries
>(out
)) {
250 ceph_assert(std::visit([](auto& v
) { return std::empty(v
); }, out
));
253 std::get
<centries
>(out
).push_back(std::move(entry
));
// Push a prepared batch of buffers to the FIFO of shard `index`.
// `items` must hold `centries` (std::get throws otherwise).  A message
// naming the shard object and the error is logged on failure.
255 int push(const DoutPrefixProvider
*dpp
, int index
, entries
&& items
) override
{
256 auto r
= fifos
[index
].push(dpp
, std::get
<centries
>(items
), null_yield
);
258 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
259 << ": unable to push to FIFO: " << get_oid(index
)
260 << ": " << cpp_strerror(-r
) << dendl
;
// Push a single buffer `bl` to the FIFO of shard `index`; the timestamp
// parameter is unnamed and unused.  A message naming the shard object and
// the error is logged on failure.
// NOTE(review): one parameter line (presumably the key) is not visible.
264 int push(const DoutPrefixProvider
*dpp
, int index
, ceph::real_time
,
266 ceph::buffer::list
&& bl
) override
{
267 auto r
= fifos
[index
].push(dpp
, std::move(bl
), null_yield
);
269 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
270 << ": unable to push to FIFO: " << get_oid(index
)
271 << ": " << cpp_strerror(-r
) << dendl
;
// List up to max_entries entries from the FIFO of shard `index`, starting
// at `marker`.  Each FIFO entry is converted into an
// rgw_data_change_log_entry (marker as log_id, mtime as timestamp, decoded
// payload) and appended to `entries`.  When out_marker is non-null and
// something was listed, *out_marker is set to the marker of the last entry.
// NOTE(review): the declaration of `more`, the propagation to *truncated
// and the return statements are on lines not visible in this excerpt.
275 int list(const DoutPrefixProvider
*dpp
, int index
, int max_entries
,
276 std::vector
<rgw_data_change_log_entry
>& entries
,
277 std::optional
<std::string_view
> marker
,
278 std::string
* out_marker
, bool* truncated
) override
{
279 std::vector
<rgw::cls::fifo::list_entry
> log_entries
;
281 auto r
= fifos
[index
].list(dpp
, max_entries
, marker
, &log_entries
, &more
,
284 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
285 << ": unable to list FIFO: " << get_oid(index
)
286 << ": " << cpp_strerror(-r
) << dendl
;
// Convert each FIFO entry into the caller-facing entry type.
289 for (const auto& entry
: log_entries
) {
290 rgw_data_change_log_entry log_entry
;
291 log_entry
.log_id
= entry
.marker
;
292 log_entry
.log_timestamp
= entry
.mtime
;
293 auto liter
= entry
.data
.cbegin();
295 decode(log_entry
.entry
, liter
);
296 } catch (const buffer::error
& err
) {
297 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
298 << ": failed to decode data changes log entry: "
299 << err
.what() << dendl
;
302 entries
.push_back(std::move(log_entry
));
// Report where the caller should resume from.
306 if (out_marker
&& !log_entries
.empty()) {
307 *out_marker
= log_entries
.back().marker
;
// Fill *info for shard `index` from the FIFO's metadata.
// Reads the FIFO meta, takes head_part_num as `p`, fetches that part's
// info `h`, and reports marker{p, h.last_ofs} as the marker and h.max_time
// as the last update.  One path sets last_update to real_clock::zero().
// NOTE(review): the condition selecting the zero() path and the checks on
// `r` are on lines not visible in this excerpt.
311 int get_info(const DoutPrefixProvider
*dpp
, int index
, RGWDataChangesLogInfo
*info
) override
{
312 auto& fifo
= fifos
[index
];
313 auto r
= fifo
.read_meta(dpp
, null_yield
);
315 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
316 << ": unable to get FIFO metadata: " << get_oid(index
)
317 << ": " << cpp_strerror(-r
) << dendl
;
320 rados::cls::fifo::info m
;
321 fifo
.meta(dpp
, m
, null_yield
);
322 auto p
= m
.head_part_num
;
325 info
->last_update
= ceph::real_clock::zero();
328 rgw::cls::fifo::part_info h
;
329 r
= fifo
.get_part_info(dpp
, p
, &h
, null_yield
);
331 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
332 << ": unable to get part info: " << get_oid(index
) << "/" << p
333 << ": " << cpp_strerror(-r
) << dendl
;
336 info
->marker
= rgw::cls::fifo::marker
{p
, h
.last_ofs
}.to_string();
337 info
->last_update
= h
.max_time
;
// Synchronously trim the FIFO of shard `index` up to `marker` (the third
// argument to trim() is passed as false), logging a message naming the
// shard object on failure.
// NOTE(review): the condition guarding the log line and the return are not
// visible in this excerpt.
340 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
) override
{
341 auto r
= fifos
[index
].trim(dpp
, marker
, false, null_yield
);
343 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
344 << ": unable to trim FIFO: " << get_oid(index
)
345 << ": " << cpp_strerror(-r
) << dendl
;
// Asynchronous trim of shard `index` up to `marker`.
// A marker equal to marker(0, 0) completes `c` with -ENODATA without
// touching the FIFO; the FIFO trim call is handed `c` to complete.
// NOTE(review): the else branch separating the two calls and the return
// statement are on lines not visible in this excerpt.
349 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
,
350 librados::AioCompletion
* c
) override
{
352 if (marker
== rgw::cls::fifo::marker(0, 0).to_string()) {
353 rgw_complete_aio_completion(c
, -ENODATA
);
355 fifos
[index
].trim(dpp
, marker
, false, c
, null_yield
);
359 std::string_view
max_marker() const override
{
360 static const std::string mm
=
361 rgw::cls::fifo::marker::max().to_string();
362 return std::string_view(mm
);
// Probe every shard's FIFO by listing at most one entry from it and
// checking whether anything came back.  log_entries is declared outside
// the loop, so it is shared across iterations.
// NOTE(review): the declaration of `more`, the handling of `r` and the
// return statements are on lines not visible in this excerpt.
364 int is_empty(const DoutPrefixProvider
*dpp
) override
{
365 std::vector
<rgw::cls::fifo::list_entry
> log_entries
;
367 for (auto shard
= 0u; shard
< fifos
.size(); ++shard
) {
368 auto r
= fifos
[shard
].list(dpp
, 1, {}, &log_entries
, &more
,
371 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
372 << ": unable to list FIFO: " << get_oid(shard
)
373 << ": " << cpp_strerror(-r
) << dendl
;
376 if (!log_entries
.empty()) {
// Constructor: reads rgw_data_log_num_shards and rgw_data_log_changes_size
// from the configuration to size the shard count and `changes`, and takes
// the object-name prefix from get_prefix().
// NOTE(review): the first member initializer is on a line not visible in
// this excerpt.
384 RGWDataChangesLog::RGWDataChangesLog(CephContext
* cct
)
386 num_shards(cct
->_conf
->rgw_data_log_num_shards
),
387 prefix(get_prefix()),
388 changes(cct
->_conf
->rgw_data_log_changes_size
) {}
// Instantiate a backend object for every generation in `e`, holding `m`
// for the whole call.
// An empty generation or a gen_id that already exists is reported as an
// error.  Otherwise an RGWDataChangesOmap or RGWDataChangesFIFO is emplaced
// under gen_id; an unrecognised log type returns EFAULT.  A
// bs::system_error thrown during setup is caught and logged.
// NOTE(review): the switch on the log type, the early returns of the first
// two error paths and the stream heads of the log statements are on lines
// not visible in this excerpt.  The "invalid log type" message prints
// ", type" immediately followed by the value, with no '=' in between.
390 bs::error_code
DataLogBackends::handle_init(entries_t e
) noexcept
{
391 std::unique_lock
l(m
);
393 for (const auto& [gen_id
, gen
] : e
) {
396 << __PRETTY_FUNCTION__
<< ":" << __LINE__
397 << ": ERROR: given empty generation: gen_id=" << gen_id
<< dendl
;
399 if (count(gen_id
) != 0) {
401 << __PRETTY_FUNCTION__
<< ":" << __LINE__
402 << ": ERROR: generation already exists: gen_id=" << gen_id
<< dendl
;
407 emplace(gen_id
, new RGWDataChangesOmap(ioctx
, datalog
, gen_id
, shards
));
410 emplace(gen_id
, new RGWDataChangesFIFO(ioctx
, datalog
, gen_id
, shards
));
414 << __PRETTY_FUNCTION__
<< ":" << __LINE__
415 << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
416 << ", type" << gen
.type
<< dendl
;
417 return bs::error_code(EFAULT
, bs::system_category());
419 } catch (const bs::system_error
& err
) {
421 << __PRETTY_FUNCTION__
<< ":" << __LINE__
422 << ": error setting up backend: gen_id=" << gen_id
423 << ", err=" << err
.what() << dendl
;
429 bs::error_code
DataLogBackends::handle_new_gens(entries_t e
) noexcept
{
430 return handle_init(std::move(e
));
// Drop backend objects for generations up to new_tail, holding `m`.
// A new_tail at or beyond the last (head) generation is rejected with
// EFAULT ("attempt to trim head"); otherwise the range from `i` to
// upper_bound(new_tail) is erased.
// NOTE(review): the initialisation of `i`, the body of the first `if` and
// the final return are on lines not visible in this excerpt.
432 bs::error_code
DataLogBackends::handle_empty_to(uint64_t new_tail
) noexcept
{
433 std::unique_lock
l(m
);
435 if (i
->first
< new_tail
) {
438 if (new_tail
>= (cend() - 1)->first
) {
440 << __PRETTY_FUNCTION__
<< ":" << __LINE__
441 << ": ERROR: attempt to trim head: new_tail=" << new_tail
<< dendl
;
442 return bs::error_code(EFAULT
, bs::system_category());
444 erase(i
, upper_bound(new_tail
));
// Bring the data-changes log up:
//  1. read the default backing type from "rgw_default_data_log_backing"
//     (asserted to parse),
//  2. open an IoCtx on the zone's log pool,
//  3. initialise the generation/backends object via
//     logback_generations::init<DataLogBackends>, and
//  4. start the renew thread running renew_run().
// A failure to initialise the backends is logged and converted with
// ceph::from_error_code().
// NOTE(review): the remaining parameters, the checks on `r`/`besr` and the
// final return are on lines not visible in this excerpt.
449 int RGWDataChangesLog::start(const DoutPrefixProvider
*dpp
,
450 const RGWZone
* _zone
,
451 const RGWZoneParams
& zoneparams
,
456 auto defbacking
= to_log_type(
457 cct
->_conf
.get_val
<std::string
>("rgw_default_data_log_backing"));
458 // Should be guaranteed by `set_enum_allowed`
459 ceph_assert(defbacking
);
460 auto log_pool
= zoneparams
.log_pool
;
461 auto r
= rgw_init_ioctx(dpp
, lr
, log_pool
, ioctx
, true, false);
463 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
464 << ": Failed to initialized ioctx, r=" << r
465 << ", pool=" << log_pool
<< dendl
;
// The lambda maps (generation, shard) to the shard's object name.
469 auto besr
= logback_generations::init
<DataLogBackends
>(
470 dpp
, ioctx
, metadata_log_oid(), [this](uint64_t gen_id
, int shard
) {
471 return get_oid(gen_id
, shard
);
473 num_shards
, *defbacking
, null_yield
, *this);
477 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
478 << ": Error initializing backends: "
479 << besr
.error().message() << dendl
;
480 return ceph::from_error_code(besr
.error());
483 bes
= std::move(*besr
);
484 renew_thread
= make_named_thread("rgw_dt_lg_renew",
485 &RGWDataChangesLog::renew_run
, this);
489 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard
& bs
) {
490 const auto& name
= bs
.bucket
.name
;
491 auto shard_shift
= (bs
.shard_id
> 0 ? bs
.shard_id
: 0);
492 auto r
= (ceph_str_hash_linux(name
.data(), name
.size()) +
493 shard_shift
) % num_shards
;
494 return static_cast<int>(r
);
// Re-log every bucket shard registered for the current cycle.
// cur_cycle is swapped out under `lock`.  A change record is then prepared
// for each bucket shard and grouped by log shard index, and each group is
// pushed to the head backend.  After a push, the renewed buckets get a new
// expiration of now + rgw_data_log_window.
// NOTE(review): the encoding of `change` into `bl`, the scope of the lock
// and the error/return handling are on lines not visible in this excerpt.
// The error message names svc.cls->timelog.add() although the call made
// here is be->push().
497 int RGWDataChangesLog::renew_entries(const DoutPrefixProvider
*dpp
)
502 /* we can't keep the bucket name as part of the cls_log_entry, and we need
503 * it later, so we keep two lists under the map */
504 bc::flat_map
<int, std::pair
<std::vector
<rgw_bucket_shard
>,
505 RGWDataChangesBE::entries
>> m
;
// Take ownership of the pending set so new registrations start afresh.
507 std::unique_lock
l(lock
);
508 decltype(cur_cycle
) entries
;
509 entries
.swap(cur_cycle
);
512 auto ut
= real_clock::now();
513 auto be
= bes
->head();
514 for (const auto& bs
: entries
) {
515 auto index
= choose_oid(bs
);
517 rgw_data_change change
;
519 change
.entity_type
= ENTITY_TYPE_BUCKET
;
520 change
.key
= bs
.get_key();
521 change
.timestamp
= ut
;
524 m
[index
].first
.push_back(bs
);
525 be
->prepare(ut
, change
.key
, std::move(bl
), m
[index
].second
);
// One push per log shard that has something queued.
528 for (auto& [index
, p
] : m
) {
529 auto& [buckets
, entries
] = p
;
531 auto now
= real_clock::now();
533 auto ret
= be
->push(dpp
, index
, std::move(entries
));
535 /* we don't really need to have a special handling for failed cases here,
536 * as this is just an optimization. */
537 ldpp_dout(dpp
, -1) << "ERROR: svc.cls->timelog.add() returned " << ret
<< dendl
;
541 auto expiration
= now
;
542 expiration
+= ceph::make_timespan(cct
->_conf
->rgw_data_log_window
);
543 for (auto& bs
: buckets
) {
544 update_renewed(bs
, expiration
);
551 void RGWDataChangesLog::_get_change(const rgw_bucket_shard
& bs
,
552 ChangeStatusPtr
& status
)
554 ceph_assert(ceph_mutex_is_locked(lock
));
555 if (!changes
.find(bs
, status
)) {
556 status
= ChangeStatusPtr(new ChangeStatus
);
557 changes
.add(bs
, status
);
561 void RGWDataChangesLog::register_renew(const rgw_bucket_shard
& bs
)
563 std::scoped_lock l
{lock
};
564 cur_cycle
.insert(bs
);
567 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard
& bs
,
568 real_time expiration
)
570 std::scoped_lock l
{lock
};
571 ChangeStatusPtr status
;
572 _get_change(bs
, status
);
574 ldout(cct
, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
575 << bs
.bucket
.name
<< " shard_id=" << bs
.shard_id
576 << " expiration=" << expiration
<< dendl
;
577 status
->cur_expiration
= expiration
;
580 int RGWDataChangesLog::get_log_shard_id(rgw_bucket
& bucket
, int shard_id
) {
581 rgw_bucket_shard
bs(bucket
, shard_id
);
582 return choose_oid(bs
);
// Decide whether changes to `bucket` should be logged by consulting the
// optional bucket_filter callback, which is invoked as
// bucket_filter(bucket, y, dpp) when set.
// NOTE(review): the value returned when no filter is installed is on a
// line not visible in this excerpt.
585 bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider
*dpp
,
586 const rgw_bucket
& bucket
,
587 optional_yield y
) const
589 if (!bucket_filter
) {
593 return bucket_filter(bucket
, y
, dpp
);
// Build the object name of log shard `i` in generation `gen_id`.
// Two formats exist: "<prefix>@G<gen_id>.<i>" and the plain "<prefix>.<i>".
// NOTE(review): the condition choosing between the two formats is on a
// line not visible in this excerpt.
596 std::string
RGWDataChangesLog::get_oid(uint64_t gen_id
, int i
) const {
598 fmt::format("{}@G{}.{}", prefix
, gen_id
, i
) :
599 fmt::format("{}.{}", prefix
, i
));
// Log a change for one shard of a bucket.
// Visible flow: filter the bucket, notify the observer, mark the shard
// modified, then look up the bucket shard's change status under `lock`.
// If the current time is before the status' cur_expiration nothing is
// sent.  If another sender is pending, this caller waits on its cond.
// Otherwise this caller marks the status pending, pushes a change record
// to the head backend in a do/while loop, then clears `pending` and sets
// cur_expiration to cur_sent + rgw_data_log_window.
// NOTE(review): many lines (lock hand-offs, early returns, encoding of
// `change` into `bl`, the head of the do/while loop) are not visible in
// this excerpt, so the exact locking order cannot be confirmed from here.
602 int RGWDataChangesLog::add_entry(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
) {
603 auto& bucket
= bucket_info
.bucket
;
605 if (!filter_bucket(dpp
, bucket
, null_yield
)) {
610 observer
->on_bucket_changed(bucket
.get_key());
613 rgw_bucket_shard
bs(bucket
, shard_id
);
615 int index
= choose_oid(bs
);
616 mark_modified(index
, bs
);
618 std::unique_lock
l(lock
);
620 ChangeStatusPtr status
;
621 _get_change(bs
, status
);
624 auto now
= real_clock::now();
// Per-status lock, distinct from the log-wide `lock` above.
626 std::unique_lock
sl(status
->lock
);
628 ldpp_dout(dpp
, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket
.name
629 << " shard_id=" << shard_id
<< " now=" << now
630 << " cur_expiration=" << status
->cur_expiration
<< dendl
;
632 if (now
< status
->cur_expiration
) {
633 /* no need to send, recently completed */
639 RefCountedCond
* cond
;
641 if (status
->pending
) {
649 int ret
= cond
->wait();
// This caller becomes the sender for this bucket shard.
657 status
->cond
= new RefCountedCond
;
658 status
->pending
= true;
660 ceph::real_time expiration
;
665 status
->cur_sent
= now
;
668 expiration
+= ceph::make_timespan(cct
->_conf
->rgw_data_log_window
);
672 ceph::buffer::list bl
;
673 rgw_data_change change
;
674 change
.entity_type
= ENTITY_TYPE_BUCKET
;
675 change
.key
= bs
.get_key();
676 change
.timestamp
= now
;
679 ldpp_dout(dpp
, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now
<< " cur_expiration=" << expiration
<< dendl
;
681 auto be
= bes
->head();
682 ret
= be
->push(dpp
, index
, now
, change
.key
, std::move(bl
));
684 now
= real_clock::now();
// Repeat while the push succeeded but the window elapsed in the meantime.
688 } while (!ret
&& real_clock::now() > expiration
);
692 status
->pending
= false;
693 /* time of when operation started, not completed */
694 status
->cur_expiration
= status
->cur_sent
;
695 status
->cur_expiration
+= make_timespan(cct
->_conf
->rgw_data_log_window
);
696 status
->cond
= nullptr;
// List entries of one shard across generations.
// `marker` is split by cursorgen() into a starting generation and a cursor
// within it.  While entries are still wanted, the backend at or after
// gen_id is located under `m` and asked to list.  Only the starting
// generation receives the cursor; later ones start from the empty string.
// Returned log ids and the out marker are re-wrapped with gencursor() so
// they carry the generation.  Returns 0 once no generation is left.
// NOTE(review): the advance of gen_id, the unlock before the backend call
// and the error returns are on lines not visible in this excerpt.
705 int DataLogBackends::list(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
706 std::vector
<rgw_data_change_log_entry
>& entries
,
707 std::string_view marker
,
708 std::string
* out_marker
,
711 const auto [start_id
, start_cursor
] = cursorgen(marker
);
712 auto gen_id
= start_id
;
713 std::string out_cursor
;
714 while (max_entries
> 0) {
715 std::vector
<rgw_data_change_log_entry
> gentries
;
716 std::unique_lock
l(m
);
717 auto i
= lower_bound(gen_id
);
718 if (i
== end()) return 0;
722 auto r
= be
->list(dpp
, shard
, max_entries
, gentries
,
723 gen_id
== start_id
? start_cursor
: std::string
{},
724 &out_cursor
, truncated
);
728 if (out_marker
&& !out_cursor
.empty()) {
729 *out_marker
= gencursor(gen_id
, out_cursor
);
// Prefix every returned id with its generation.
731 for (auto& g
: gentries
) {
732 g
.log_id
= gencursor(gen_id
, g
.log_id
);
// Sanity check on the number of entries the backend returned.
734 if (int s
= gentries
.size(); s
< 0 || s
> max_entries
)
737 max_entries
-= gentries
.size();
739 std::move(gentries
.begin(), gentries
.end(),
740 std::back_inserter(entries
));
746 int RGWDataChangesLog::list_entries(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
747 std::vector
<rgw_data_change_log_entry
>& entries
,
748 std::string_view marker
,
749 std::string
* out_marker
, bool* truncated
)
751 assert(shard
< num_shards
);
752 return bes
->list(dpp
, shard
, max_entries
, entries
, marker
, out_marker
, truncated
);
// List entries across shards, resuming from marker.shard / marker.marker.
// Shards are visited in order until max_entries have been collected.  Each
// step moves to the next shard and clears the in-shard marker.  -ENOENT
// from a shard gets its own branch.  On exit *ptruncated is true when
// shards remain to be visited.
// NOTE(review): the body of the -ENOENT branch, the declaration and use of
// `truncated`, and the returns are on lines not visible in this excerpt.
// The out-marker argument is passed as NULL here.
755 int RGWDataChangesLog::list_entries(const DoutPrefixProvider
*dpp
, int max_entries
,
756 std::vector
<rgw_data_change_log_entry
>& entries
,
757 LogMarker
& marker
, bool *ptruncated
)
761 for (; marker
.shard
< num_shards
&& int(entries
.size()) < max_entries
;
762 marker
.shard
++, marker
.marker
.clear()) {
763 int ret
= list_entries(dpp
, marker
.shard
, max_entries
- entries
.size(),
764 entries
, marker
.marker
, NULL
, &truncated
);
765 if (ret
== -ENOENT
) {
776 *ptruncated
= (marker
.shard
< num_shards
);
// Fetch marker / last-update information for one shard from the head
// backend.  A non-empty marker is re-wrapped with gencursor() so that it
// carries the head backend's generation id.
// shard_id must be below num_shards (checked with assert).
// NOTE(review): the return statement is on a line not visible in this
// excerpt.
780 int RGWDataChangesLog::get_info(const DoutPrefixProvider
*dpp
, int shard_id
, RGWDataChangesLogInfo
*info
)
782 assert(shard_id
< num_shards
);
783 auto be
= bes
->head();
784 auto r
= be
->get_info(dpp
, shard_id
, info
);
785 if (!info
->marker
.empty()) {
786 info
->marker
= gencursor(be
->gen_id
, info
->marker
);
// Synchronously trim one shard up to `marker`, generation by generation.
// `marker` is split into a target generation and a cursor.  A target older
// than the tail generation returns 0 immediately.  The loop walks the
// backends while gen_id <= target_gen, gen_id <= head_gen and r >= 0.
// Generations before the target are trimmed to their max_marker(); the
// target itself is trimmed to the cursor.
// NOTE(review): the loop steps with upper_bound(be->gen_id)->second.  If
// that can reach end() (for example target_gen beyond the head), the
// dereference would be invalid -- confirm against the loop-exit lines,
// which are not visible in this excerpt.  The declaration of `r`, the
// lock release and the final return are also missing here.
791 int DataLogBackends::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
)
793 auto [target_gen
, cursor
] = cursorgen(marker
);
794 std::unique_lock
l(m
);
795 const auto head_gen
= (end() - 1)->second
->gen_id
;
796 const auto tail_gen
= begin()->first
;
797 if (target_gen
< tail_gen
) return 0;
799 for (auto be
= lower_bound(0)->second
;
800 be
->gen_id
<= target_gen
&& be
->gen_id
<= head_gen
&& r
>= 0;
801 be
= upper_bound(be
->gen_id
)->second
) {
803 auto c
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
804 r
= be
->trim(dpp
, shard_id
, c
);
// -ENODATA from a generation before the target gets special handling.
807 if (r
== -ENODATA
&& be
->gen_id
< target_gen
)
809 if (be
->gen_id
== target_gen
)
816 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
)
818 assert(shard_id
< num_shards
);
819 return bes
->trim_entries(dpp
, shard_id
, marker
);
822 class GenTrim
: public rgw::cls::fifo::Completion
<GenTrim
> {
824 DataLogBackends
* const bes
;
826 const uint64_t target_gen
;
827 const std::string cursor
;
828 const uint64_t head_gen
;
829 const uint64_t tail_gen
;
830 boost::intrusive_ptr
<RGWDataChangesBE
> be
;
// Constructor: hands dpp and the caller's completion `super` to the
// Completion base and stores the trim parameters (backends object, shard,
// target generation, cursor, head/tail generation) in the members.
// NOTE(review): the initializer for `be` and the constructor body are on
// lines not visible in this excerpt.
832 GenTrim(const DoutPrefixProvider
*dpp
, DataLogBackends
* bes
, int shard_id
, uint64_t target_gen
,
833 std::string cursor
, uint64_t head_gen
, uint64_t tail_gen
,
834 boost::intrusive_ptr
<RGWDataChangesBE
> be
,
835 lr::AioCompletion
* super
)
836 : Completion(dpp
, super
), bes(bes
), shard_id(shard_id
), target_gen(target_gen
),
837 cursor(std::move(cursor
)), head_gen(head_gen
), tail_gen(tail_gen
),
// Completion handler for one generation's trim; `r` is that trim's result.
// Visible flow: -ENODATA from a generation before the target gets special
// handling; one path completes with `r`.  Otherwise the next generation
// after gen_id is looked up under bes->m.  If there is none, or it lies
// beyond target_gen or head_gen, the operation completes with -ENODATA.
// Otherwise the next trim is issued, with this object as its completion
// via call().
// NOTE(review): the assignment of the next backend to `be` and the
// conditions around the first complete() are on lines not visible in this
// excerpt.
840 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
841 auto gen_id
= be
->gen_id
;
845 if (r
== -ENODATA
&& gen_id
< target_gen
)
848 complete(std::move(p
), r
);
853 std::unique_lock
l(bes
->m
);
854 auto i
= bes
->upper_bound(gen_id
);
855 if (i
== bes
->end() || i
->first
> target_gen
|| i
->first
> head_gen
) {
857 complete(std::move(p
), -ENODATA
);
// Older generations are trimmed completely; the target only to the cursor.
862 auto c
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
863 be
->trim(dpp
, shard_id
, c
, call(std::move(p
)));
// Asynchronous trim of one shard up to `marker`.
// `marker` is split into a target generation and a cursor.  A target older
// than the tail generation completes `c` with -ENODATA.  Otherwise a
// GenTrim state object is created and the first trim is issued on the
// oldest backend: to the cursor when that backend is the target,
// otherwise to its max_marker().  GenTrim drives the later generations.
// NOTE(review): the early return after the -ENODATA completion, the lock
// release and the last GenTrim constructor arguments are on lines not
// visible in this excerpt.
867 void DataLogBackends::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
868 librados::AioCompletion
* c
)
870 auto [target_gen
, cursor
] = cursorgen(marker
);
871 std::unique_lock
l(m
);
872 const auto head_gen
= (end() - 1)->second
->gen_id
;
873 const auto tail_gen
= begin()->first
;
874 if (target_gen
< tail_gen
) {
876 rgw_complete_aio_completion(c
, -ENODATA
);
879 auto be
= begin()->second
;
881 auto gt
= std::make_unique
<GenTrim
>(dpp
, this, shard_id
, target_gen
,
882 std::string(cursor
), head_gen
, tail_gen
,
885 auto cc
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
886 be
->trim(dpp
, shard_id
, cc
, GenTrim::call(std::move(gt
)));
// Remove old generations that no longer hold entries.
// Candidate backends are copied out under `m` and each is probed with
// is_empty().  The highest qualifying generation is recorded in `highest`
// and passed to empty_to(); finally remove_empty() is called.
// NOTE(review): the upper bound `e` of the candidate range, the handling
// of is_empty()'s result, the assignment to `through` and the conditions
// around the two returns are on lines not visible in this excerpt.
889 int DataLogBackends::trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
) {
891 std::vector
<mapped_type
> candidates
;
893 std::scoped_lock
l(m
);
895 for (auto i
= cbegin(); i
< e
; ++i
) {
896 candidates
.push_back(i
->second
);
900 std::optional
<uint64_t> highest
;
901 for (auto& be
: candidates
) {
902 auto r
= be
->is_empty(dpp
);
906 highest
= be
->gen_id
;
916 auto ec
= empty_to(dpp
, *highest
, null_yield
);
918 return ceph::from_error_code(ec
);
922 return ceph::from_error_code(remove_empty(dpp
, null_yield
));
// Asynchronous trim of one log shard up to `marker`: forwards to the
// backends object together with the caller's completion `c`.
// shard_id must be below num_shards (checked with assert).
// NOTE(review): the return statement is on a line not visible in this
// excerpt.
926 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
927 librados::AioCompletion
* c
)
929 assert(shard_id
< num_shards
);
930 bes
->trim_entries(dpp
, shard_id
, marker
, c
);
934 bool RGWDataChangesLog::going_down() const
// Destructor: acts on the renew thread only when it is joinable.
// NOTE(review): the statements before the check and inside the branch
// (presumably stopping and joining the thread) are not visible in this
// excerpt.
939 RGWDataChangesLog::~RGWDataChangesLog() {
941 if (renew_thread
.joinable()) {
// Body of the renew thread.
// Each pass calls renew_entries() and logs an error if it fails.  When the
// pass counter `run` reaches runs_per_prune (150), old generations are
// pruned via trim_generations().  The thread then waits on renew_cond for
// three quarters of rgw_data_log_window seconds.
// NOTE(review): the trim_generations() call is a bare statement whose
// result is discarded, yet the following "failed pruning r=" message
// reports `r`, which still holds renew_entries()'s result -- confirm
// against the dropped lines and assign the result to `r` if so.
// NOTE(review): the outer loop, the shutdown checks and the updates of
// `run` are on lines not visible in this excerpt.
947 void RGWDataChangesLog::renew_run() {
948 static constexpr auto runs_per_prune
= 150;
951 const DoutPrefix
dp(cct
, dout_subsys
, "rgw data changes log: ");
952 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl
;
953 int r
= renew_entries(&dp
);
955 ldpp_dout(&dp
, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r
<< dendl
;
961 if (run
== runs_per_prune
) {
962 std::optional
<uint64_t> through
;
963 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl
;
964 trim_generations(&dp
, through
);
966 derr
<< "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
968 } else if (through
) {
969 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
970 << "through " << *through
<< "." << dendl
;
972 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
// Sleep until the next pass, or until renew_cond is notified.
980 int interval
= cct
->_conf
->rgw_data_log_window
* 3 / 4;
981 std::unique_lock locker
{renew_lock
};
982 renew_cond
.wait_for(locker
, std::chrono::seconds(interval
));
986 void RGWDataChangesLog::renew_stop()
988 std::lock_guard l
{renew_lock
};
989 renew_cond
.notify_all();
// Record that bucket shard `bs` was modified in log shard `shard_id`.
// A shared lock on modified_lock is taken first to check whether the key
// is already present; the insertion itself is done under a unique lock.
// NOTE(review): the early return for the already-present case and the
// scope that releases the shared lock before the unique lock is taken are
// on lines not visible in this excerpt.
992 void RGWDataChangesLog::mark_modified(int shard_id
, const rgw_bucket_shard
& bs
)
994 auto key
= bs
.get_key();
996 std::shared_lock rl
{modified_lock
}; // read lock to check for existence
997 auto shard
= modified_shards
.find(shard_id
);
998 if (shard
!= modified_shards
.end() && shard
->second
.count(key
)) {
1003 std::unique_lock wl
{modified_lock
}; // write lock for insertion
1004 modified_shards
[shard_id
].insert(key
);
1007 std::string
RGWDataChangesLog::max_marker() const {
1008 return gencursor(std::numeric_limits
<uint64_t>::max(),
1009 "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
1012 int RGWDataChangesLog::change_format(const DoutPrefixProvider
*dpp
, log_type type
, optional_yield y
) {
1013 return ceph::from_error_code(bes
->new_backing(dpp
, type
, y
));
1016 int RGWDataChangesLog::trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
) {
1017 return bes
->trim_generations(dpp
, through
);