1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include "common/debug.h"
7 #include "common/containers.h"
8 #include "common/errno.h"
9 #include "common/error_code.h"
11 #include "common/async/blocked_completion.h"
12 #include "common/async/librados_completion.h"
14 #include "cls/fifo/cls_fifo_types.h"
15 #include "cls/log/cls_log_client.h"
17 #include "cls_fifo_legacy.h"
18 #include "rgw_datalog.h"
19 #include "rgw_log_backing.h"
20 #include "rgw_tools.h"
// Debug/logging plumbing for the dout macros used throughout this file.
22 #define dout_context g_ceph_context
23 static constexpr auto dout_subsys
= ceph_subsys_rgw
;
// Short namespace aliases used below: bs = boost::system, lr = librados.
25 namespace bs
= boost::system
;
26 namespace lr
= librados
;
// tiny_vector is used for the per-shard LazyFIFO table in RGWDataChangesFIFO.
28 using ceph::containers::tiny_vector
;
// Serialize this data-log change record to JSON on the given Formatter.
// NOTE(review): several original lines are absent from this extraction
// (the other switch cases, the declaration of `type`, closing braces) —
// consult the full file before editing.
30 void rgw_data_change::dump(ceph::Formatter
*f
) const
// Map the entity_type enum to a printable value (visible case: BUCKET).
33 switch (entity_type
) {
34 case ENTITY_TYPE_BUCKET
:
40 encode_json("entity_type", type
, f
);
41 encode_json("key", key
, f
);
// Timestamps are emitted through utime_t for JSON formatting.
42 utime_t
ut(timestamp
);
43 encode_json("timestamp", ut
, f
);
// Populate this change record from JSON. The "entity_type" string `s` is
// mapped to ENTITY_TYPE_BUCKET, with anything unrecognized becoming
// ENTITY_TYPE_UNKNOWN; the timestamp round-trips through utime_t.
// NOTE(review): the declarations of `s`/`ut` and the comparison lines are
// missing from this extraction.
46 void rgw_data_change::decode_json(JSONObj
*obj
) {
48 JSONDecoder::decode_json("entity_type", s
, obj
);
50 entity_type
= ENTITY_TYPE_BUCKET
;
52 entity_type
= ENTITY_TYPE_UNKNOWN
;
54 JSONDecoder::decode_json("key", key
, obj
);
56 JSONDecoder::decode_json("timestamp", ut
, obj
);
// Convert decoded utime_t back to ceph::real_time for storage.
57 timestamp
= ut
.to_real_time();
// Serialize a complete log entry (marker id, timestamp, embedded change)
// to JSON.
60 void rgw_data_change_log_entry::dump(Formatter
*f
) const
62 encode_json("log_id", log_id
, f
);
// Timestamp is emitted via utime_t, matching rgw_data_change::dump.
63 utime_t
ut(log_timestamp
);
64 encode_json("log_timestamp", ut
, f
);
65 encode_json("entry", entry
, f
);
// Decode a complete log entry from JSON; the timestamp round-trips through
// utime_t back to real_time. NOTE(review): the declaration of `ut` is
// missing from this extraction.
68 void rgw_data_change_log_entry::decode_json(JSONObj
*obj
) {
69 JSONDecoder::decode_json("log_id", log_id
, obj
);
71 JSONDecoder::decode_json("log_timestamp", ut
, obj
);
72 log_timestamp
= ut
.to_real_time();
73 JSONDecoder::decode_json("entry", entry
, obj
);
// Omap/cls_log-based backend for the data changes log: one RADOS object per
// shard, entries stored through the cls_log object class.
// NOTE(review): many original lines are missing from this extraction
// (returns, closing braces, some declarations); compare with the full file
// before modifying any method body.
76 class RGWDataChangesOmap final
: public RGWDataChangesBE
{
// Batch type carried inside the RGWDataChangesBE::entries variant.
77 using centries
= std::list
<cls_log_entry
>;
// Pre-computed per-shard object names.
78 std::vector
<std::string
> oids
;
81 RGWDataChangesOmap(lr::IoCtx
& ioctx
,
82 RGWDataChangesLog
& datalog
,
85 : RGWDataChangesBE(ioctx
, datalog
, gen_id
) {
// Build the shard-object name table up front.
86 oids
.reserve(num_shards
);
87 for (auto i
= 0; i
< num_shards
; ++i
) {
88 oids
.push_back(get_oid(i
));
91 ~RGWDataChangesOmap() override
= default;
// Append one prepared cls_log entry to the omap-flavoured batch in `out`;
// asserts the variant is not already holding the other backend's type.
93 void prepare(ceph::real_time ut
, const std::string
& key
,
94 ceph::buffer::list
&& entry
, entries
& out
) override
{
95 if (!std::holds_alternative
<centries
>(out
)) {
96 ceph_assert(std::visit([](const auto& v
) { return std::empty(v
); }, out
));
101 cls_log_add_prepare_entry(e
, utime_t(ut
), {}, key
, entry
);
102 std::get
<centries
>(out
).push_back(std::move(e
));
// Push a prepared batch to the shard object via cls_log_add (synchronous).
104 int push(const DoutPrefixProvider
*dpp
, int index
, entries
&& items
) override
{
105 lr::ObjectWriteOperation op
;
106 cls_log_add(op
, std::get
<centries
>(items
), true);
107 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
109 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
110 << ": failed to push to " << oids
[index
] << cpp_strerror(-r
)
// Push a single raw entry (key + encoded payload) to the shard object.
115 int push(const DoutPrefixProvider
*dpp
, int index
, ceph::real_time now
,
116 const std::string
& key
,
117 ceph::buffer::list
&& bl
) override
{
118 lr::ObjectWriteOperation op
;
119 cls_log_add(op
, utime_t(now
), {}, key
, bl
);
120 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
122 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
123 << ": failed to push to " << oids
[index
]
124 << cpp_strerror(-r
) << dendl
;
// List up to max_entries from one shard, decoding each cls_log_entry's
// data blob into an rgw_data_change; undecodable entries are logged
// (decode is wrapped in a try/catch whose `try` line is missing here).
128 int list(const DoutPrefixProvider
*dpp
, int index
, int max_entries
,
129 std::vector
<rgw_data_change_log_entry
>& entries
,
130 std::optional
<std::string_view
> marker
,
131 std::string
* out_marker
, bool* truncated
) override
{
132 std::list
<cls_log_entry
> log_entries
;
133 lr::ObjectReadOperation op
;
134 cls_log_list(op
, {}, {}, std::string(marker
.value_or("")),
135 max_entries
, log_entries
, out_marker
, truncated
);
136 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, nullptr, null_yield
);
142 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
143 << ": failed to list " << oids
[index
]
144 << cpp_strerror(-r
) << dendl
;
147 for (auto iter
= log_entries
.begin(); iter
!= log_entries
.end(); ++iter
) {
148 rgw_data_change_log_entry log_entry
;
149 log_entry
.log_id
= iter
->id
;
150 auto rt
= iter
->timestamp
.to_real_time();
151 log_entry
.log_timestamp
= rt
;
152 auto liter
= iter
->data
.cbegin();
154 decode(log_entry
.entry
, liter
);
155 } catch (ceph::buffer::error
& err
) {
156 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
157 << ": failed to decode data changes log entry: "
158 << err
.what() << dendl
;
161 entries
.push_back(log_entry
);
// Read the cls_log header to report the shard's max marker and last
// update time; a missing object (-ENOENT) is treated as success.
165 int get_info(const DoutPrefixProvider
*dpp
, int index
, RGWDataChangesLogInfo
*info
) override
{
166 cls_log_header header
;
167 lr::ObjectReadOperation op
;
168 cls_log_info(op
, &header
);
169 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, nullptr, null_yield
);
170 if (r
== -ENOENT
) r
= 0;
172 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
173 << ": failed to get info from " << oids
[index
]
174 << cpp_strerror(-r
) << dendl
;
176 info
->marker
= header
.max_marker
;
177 info
->last_update
= header
.max_time
.to_real_time();
// Synchronous trim up to `marker`; -ENOENT is remapped to -ENODATA so
// callers can distinguish "nothing left to trim".
// NOTE(review): the error text says "failed to get info" though this is
// trim — looks like copy/paste; runtime string left untouched here.
181 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
) override
{
182 lr::ObjectWriteOperation op
;
183 cls_log_trim(op
, {}, {}, {}, std::string(marker
));
184 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[index
], &op
, null_yield
);
185 if (r
== -ENOENT
) r
= -ENODATA
;
186 if (r
< 0 && r
!= -ENODATA
) {
187 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
188 << ": failed to get info from " << oids
[index
]
189 << cpp_strerror(-r
) << dendl
;
// Asynchronous trim: same semantics, completion delivered via `c`.
193 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
,
194 lr::AioCompletion
* c
) override
{
195 lr::ObjectWriteOperation op
;
196 cls_log_trim(op
, {}, {}, {}, std::string(marker
));
197 auto r
= ioctx
.aio_operate(oids
[index
], c
, &op
, 0);
198 if (r
== -ENOENT
) r
= -ENODATA
;
200 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
201 << ": failed to get info from " << oids
[index
]
202 << cpp_strerror(-r
) << dendl
;
// Largest possible marker for this backend (body missing from extraction).
206 std::string_view
max_marker() const override
{
// Probe each shard with a 1-entry list; the log is empty only if every
// shard returns no entries.
209 int is_empty(const DoutPrefixProvider
*dpp
) override
{
210 for (auto shard
= 0u; shard
< oids
.size(); ++shard
) {
211 std::list
<cls_log_entry
> log_entries
;
212 lr::ObjectReadOperation op
;
213 std::string out_marker
;
215 cls_log_list(op
, {}, {}, {}, 1, log_entries
, &out_marker
, &truncated
);
216 auto r
= rgw_rados_operate(dpp
, ioctx
, oids
[shard
], &op
, nullptr, null_yield
);
221 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
222 << ": failed to list " << oids
[shard
]
223 << cpp_strerror(-r
) << dendl
;
226 if (!log_entries
.empty()) {
// FIFO-based backend for the data changes log: one lazily-opened FIFO
// (LazyFIFO) per shard, entries stored as raw buffer::lists.
// NOTE(review): many original lines are missing from this extraction
// (returns, closing braces, some declarations); compare with the full file
// before modifying any method body.
234 class RGWDataChangesFIFO final
: public RGWDataChangesBE
{
// Batch type carried inside the RGWDataChangesBE::entries variant.
235 using centries
= std::vector
<ceph::buffer::list
>;
236 tiny_vector
<LazyFIFO
> fifos
;
239 RGWDataChangesFIFO(lr::IoCtx
& ioctx
,
240 RGWDataChangesLog
& datalog
,
241 uint64_t gen_id
, int shards
)
242 : RGWDataChangesBE(ioctx
, datalog
, gen_id
),
// Construct one LazyFIFO per shard in place.
243 fifos(shards
, [&ioctx
, this](std::size_t i
, auto emplacer
) {
244 emplacer
.emplace(ioctx
, get_oid(i
));
246 ~RGWDataChangesFIFO() override
= default;
// Append the already-encoded entry to the FIFO-flavoured batch in `out`;
// timestamp and key are unused by this backend (FIFO stores raw blobs).
247 void prepare(ceph::real_time
, const std::string
&,
248 ceph::buffer::list
&& entry
, entries
& out
) override
{
249 if (!std::holds_alternative
<centries
>(out
)) {
250 ceph_assert(std::visit([](auto& v
) { return std::empty(v
); }, out
));
253 std::get
<centries
>(out
).push_back(std::move(entry
));
// Push a prepared batch onto the shard's FIFO (synchronous).
255 int push(const DoutPrefixProvider
*dpp
, int index
, entries
&& items
) override
{
256 auto r
= fifos
[index
].push(dpp
, std::get
<centries
>(items
), null_yield
);
258 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
259 << ": unable to push to FIFO: " << get_oid(index
)
260 << ": " << cpp_strerror(-r
) << dendl
;
// Push a single entry; the real_time/key parameters are ignored here.
264 int push(const DoutPrefixProvider
*dpp
, int index
, ceph::real_time
,
266 ceph::buffer::list
&& bl
) override
{
267 auto r
= fifos
[index
].push(dpp
, std::move(bl
), null_yield
);
269 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
270 << ": unable to push to FIFO: " << get_oid(index
)
271 << ": " << cpp_strerror(-r
) << dendl
;
// List entries from one shard's FIFO and decode each blob into an
// rgw_data_change; the FIFO marker doubles as the log id. Undecodable
// entries are logged (decode sits in a try/catch whose `try` line is
// missing from this extraction).
275 int list(const DoutPrefixProvider
*dpp
, int index
, int max_entries
,
276 std::vector
<rgw_data_change_log_entry
>& entries
,
277 std::optional
<std::string_view
> marker
,
278 std::string
* out_marker
, bool* truncated
) override
{
279 std::vector
<rgw::cls::fifo::list_entry
> log_entries
;
281 auto r
= fifos
[index
].list(dpp
, max_entries
, marker
, &log_entries
, &more
,
284 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
285 << ": unable to list FIFO: " << get_oid(index
)
286 << ": " << cpp_strerror(-r
) << dendl
;
289 for (const auto& entry
: log_entries
) {
290 rgw_data_change_log_entry log_entry
;
291 log_entry
.log_id
= entry
.marker
;
292 log_entry
.log_timestamp
= entry
.mtime
;
293 auto liter
= entry
.data
.cbegin();
295 decode(log_entry
.entry
, liter
);
296 } catch (const buffer::error
& err
) {
297 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
298 << ": failed to decode data changes log entry: "
299 << err
.what() << dendl
;
302 entries
.push_back(std::move(log_entry
));
// Resume marker is the FIFO marker of the last entry returned.
306 if (out_marker
&& !log_entries
.empty()) {
307 *out_marker
= log_entries
.back().marker
;
// Report shard state: read FIFO metadata, then the head part's info to
// derive the max marker and last-update time.
311 int get_info(const DoutPrefixProvider
*dpp
, int index
, RGWDataChangesLogInfo
*info
) override
{
312 auto& fifo
= fifos
[index
];
313 auto r
= fifo
.read_meta(dpp
, null_yield
);
315 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
316 << ": unable to get FIFO metadata: " << get_oid(index
)
317 << ": " << cpp_strerror(-r
) << dendl
;
320 rados::cls::fifo::info m
;
321 fifo
.meta(dpp
, m
, null_yield
);
322 auto p
= m
.head_part_num
;
// No head part yet => log has never been updated.
325 info
->last_update
= ceph::real_clock::zero();
328 rgw::cls::fifo::part_info h
;
329 r
= fifo
.get_part_info(dpp
, p
, &h
, null_yield
);
331 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
332 << ": unable to get part info: " << get_oid(index
) << "/" << p
333 << ": " << cpp_strerror(-r
) << dendl
;
// Marker is (head part, last offset) rendered as a string.
336 info
->marker
= rgw::cls::fifo::marker
{p
, h
.last_ofs
}.to_string();
337 info
->last_update
= h
.max_time
;
// Synchronous trim up to `marker` (non-exclusive: false flag).
340 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
) override
{
341 auto r
= fifos
[index
].trim(dpp
, marker
, false, null_yield
);
343 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
344 << ": unable to trim FIFO: " << get_oid(index
)
345 << ": " << cpp_strerror(-r
) << dendl
;
// Asynchronous trim. A marker equal to marker(0,0) means "nothing before
// this" — complete immediately with -ENODATA instead of issuing an op.
349 int trim(const DoutPrefixProvider
*dpp
, int index
, std::string_view marker
,
350 librados::AioCompletion
* c
) override
{
352 if (marker
== rgw::cls::fifo::marker(0, 0).to_string()) {
353 rgw_complete_aio_completion(c
, -ENODATA
);
355 fifos
[index
].trim(dpp
, marker
, false, c
, null_yield
);
// Largest possible FIFO marker, computed once and cached.
359 std::string_view
max_marker() const override
{
360 static const std::string mm
=
361 rgw::cls::fifo::marker::max().to_string();
362 return std::string_view(mm
);
// Probe each shard's FIFO with a 1-entry list; empty only if all shards
// return no entries.
364 int is_empty(const DoutPrefixProvider
*dpp
) override
{
365 std::vector
<rgw::cls::fifo::list_entry
> log_entries
;
367 for (auto shard
= 0u; shard
< fifos
.size(); ++shard
) {
368 auto r
= fifos
[shard
].list(dpp
, 1, {}, &log_entries
, &more
,
371 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
372 << ": unable to list FIFO: " << get_oid(shard
)
373 << ": " << cpp_strerror(-r
) << dendl
;
376 if (!log_entries
.empty()) {
// Construct the datalog front-end: shard count and the bounded change
// cache size both come from configuration.
384 RGWDataChangesLog::RGWDataChangesLog(CephContext
* cct
)
386 num_shards(cct
->_conf
->rgw_data_log_num_shards
),
387 prefix(get_prefix()),
388 changes(cct
->_conf
->rgw_data_log_changes_size
) {}
// Instantiate a backend object for every generation described in `e` and
// insert it into this map (keyed by gen_id), under the lock. Empty or
// duplicate generations are logged as errors; an unrecognized log type is
// reported as EFAULT. NOTE(review): switch/brace lines are missing from
// this extraction.
390 bs::error_code
DataLogBackends::handle_init(entries_t e
) noexcept
{
391 std::unique_lock
l(m
);
393 for (const auto& [gen_id
, gen
] : e
) {
396 << __PRETTY_FUNCTION__
<< ":" << __LINE__
397 << ": ERROR: given empty generation: gen_id=" << gen_id
<< dendl
;
399 if (count(gen_id
) != 0) {
401 << __PRETTY_FUNCTION__
<< ":" << __LINE__
402 << ": ERROR: generation already exists: gen_id=" << gen_id
<< dendl
;
// Omap-backed generation.
407 emplace(gen_id
, new RGWDataChangesOmap(ioctx
, datalog
, gen_id
, shards
));
// FIFO-backed generation.
410 emplace(gen_id
, new RGWDataChangesFIFO(ioctx
, datalog
, gen_id
, shards
));
414 << __PRETTY_FUNCTION__
<< ":" << __LINE__
415 << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
416 << ", type" << gen
.type
<< dendl
;
417 return bs::error_code(EFAULT
, bs::system_category());
// Backend construction may throw boost::system errors; surface them.
419 } catch (const bs::system_error
& err
) {
421 << __PRETTY_FUNCTION__
<< ":" << __LINE__
422 << ": error setting up backend: gen_id=" << gen_id
423 << ", err=" << err
.what() << dendl
;
// New generations are set up the same way as at initialization.
429 bs::error_code
DataLogBackends::handle_new_gens(entries_t e
) noexcept
{
430 return handle_init(std::move(e
));
// Drop all generations strictly below `new_tail`. Refuses (EFAULT) to
// erase the head generation. NOTE(review): the iterator setup line is
// missing from this extraction.
432 bs::error_code
DataLogBackends::handle_empty_to(uint64_t new_tail
) noexcept
{
433 std::unique_lock
l(m
);
435 if (i
->first
< new_tail
) {
// Never trim the newest (head) generation.
438 if (new_tail
>= (cend() - 1)->first
) {
440 << __PRETTY_FUNCTION__
<< ":" << __LINE__
441 << ": ERROR: attempt to trim head: new_tail=" << new_tail
<< dendl
;
442 return bs::error_code(EFAULT
, bs::system_category());
444 erase(i
, upper_bound(new_tail
));
// Start the datalog: open the log pool ioctx, initialize the generations
// manager (DataLogBackends) with the configured default backing, and spawn
// the renew thread. NOTE(review): some lines (return statements, zone
// handling) are missing from this extraction.
449 int RGWDataChangesLog::start(const DoutPrefixProvider
*dpp
, const RGWZone
* _zone
,
450 const RGWZoneParams
& zoneparams
,
// Default backend type (fifo/omap) comes from configuration.
455 auto defbacking
= to_log_type(
456 cct
->_conf
.get_val
<std::string
>("rgw_default_data_log_backing"));
457 // Should be guaranteed by `set_enum_allowed`
458 ceph_assert(defbacking
);
459 auto log_pool
= zoneparams
.log_pool
;
460 auto r
= rgw_init_ioctx(dpp
, lr
, log_pool
, ioctx
, true, false);
462 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
463 << ": Failed to initialized ioctx, r=" << r
464 << ", pool=" << log_pool
<< dendl
;
// Set up the per-generation backends; the lambda maps (gen, shard) to an
// object name.
468 auto besr
= logback_generations::init
<DataLogBackends
>(
469 dpp
, ioctx
, metadata_log_oid(), [this](uint64_t gen_id
, int shard
) {
470 return get_oid(gen_id
, shard
);
472 num_shards
, *defbacking
, null_yield
, *this);
476 lderr(cct
) << __PRETTY_FUNCTION__
477 << ": Error initializing backends: "
478 << besr
.error().message() << dendl
;
479 return ceph::from_error_code(besr
.error());
482 bes
= std::move(*besr
);
// Background thread that periodically renews pending entries.
483 renew_thread
= make_named_thread("rgw_dt_lg_renew",
484 &RGWDataChangesLog::renew_run
, this);
// Map a bucket shard to a datalog shard index: hash the bucket name, add
// the (non-negative) shard id, and reduce modulo num_shards.
488 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard
& bs
) {
489 const auto& name
= bs
.bucket
.name
;
// shard_id may be negative for non-sharded buckets; clamp to 0.
490 auto shard_shift
= (bs
.shard_id
> 0 ? bs
.shard_id
: 0);
491 auto r
= (ceph_str_hash_linux(name
.data(), name
.size()) +
492 shard_shift
) % num_shards
;
493 return static_cast<int>(r
);
// Re-publish every bucket shard registered in the current cycle: group the
// shards by datalog shard index, push each group through the head backend,
// then extend each shard's renewal expiration.
496 int RGWDataChangesLog::renew_entries(const DoutPrefixProvider
*dpp
)
501 /* we can't keep the bucket name as part of the cls_log_entry, and we need
502 * it later, so we keep two lists under the map */
503 bc::flat_map
<int, std::pair
<std::vector
<rgw_bucket_shard
>,
504 RGWDataChangesBE::entries
>> m
;
// Swap out cur_cycle under the lock so new registrations can continue.
506 std::unique_lock
l(lock
);
507 decltype(cur_cycle
) entries
;
508 entries
.swap(cur_cycle
);
511 auto ut
= real_clock::now();
512 auto be
= bes
->head();
513 for (const auto& bs
: entries
) {
514 auto index
= choose_oid(bs
);
516 rgw_data_change change
;
518 change
.entity_type
= ENTITY_TYPE_BUCKET
;
519 change
.key
= bs
.get_key();
520 change
.timestamp
= ut
;
// Keep the bucket shard alongside the encoded entry for the later
// update_renewed pass.
523 m
[index
].first
.push_back(bs
);
524 be
->prepare(ut
, change
.key
, std::move(bl
), m
[index
].second
);
527 for (auto& [index
, p
] : m
) {
528 auto& [buckets
, entries
] = p
;
530 auto now
= real_clock::now();
532 auto ret
= be
->push(dpp
, index
, std::move(entries
));
534 /* we don't really need to have a special handling for failed cases here,
535 * as this is just an optimization. */
536 ldpp_dout(dpp
, -1) << "ERROR: svc.cls->timelog.add() returned " << ret
<< dendl
;
// Push succeeded: extend each bucket shard's renewal window.
540 auto expiration
= now
;
541 expiration
+= ceph::make_timespan(cct
->_conf
->rgw_data_log_window
);
542 for (auto& bs
: buckets
) {
543 update_renewed(bs
, expiration
);
// Look up (or lazily create) the ChangeStatus cache entry for a bucket
// shard. Caller must hold `lock` (asserted).
550 void RGWDataChangesLog::_get_change(const rgw_bucket_shard
& bs
,
551 ChangeStatusPtr
& status
)
553 ceph_assert(ceph_mutex_is_locked(lock
));
554 if (!changes
.find(bs
, status
)) {
555 status
= ChangeStatusPtr(new ChangeStatus
);
556 changes
.add(bs
, status
);
// Record a bucket shard for re-publication in the next renew cycle.
560 void RGWDataChangesLog::register_renew(const rgw_bucket_shard
& bs
)
562 std::scoped_lock l
{lock
};
563 cur_cycle
.insert(bs
);
// Push a bucket shard's cached expiration forward after a successful renew
// push. NOTE(review): the log text "update_renewd" is a typo in a runtime
// string; left untouched by this comment-only pass.
566 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard
& bs
,
567 real_time expiration
)
569 std::unique_lock l
{lock
};
570 ChangeStatusPtr status
;
571 _get_change(bs
, status
);
575 ldout(cct
, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
576 << bs
.bucket
.name
<< " shard_id=" << bs
.shard_id
577 << " expiration=" << expiration
<< dendl
;
// Per-status lock guards cur_expiration.
579 std::unique_lock
sl(status
->lock
);
580 status
->cur_expiration
= expiration
;
// Public helper: which datalog shard would this bucket/shard map to?
583 int RGWDataChangesLog::get_log_shard_id(rgw_bucket
& bucket
, int shard_id
) {
584 rgw_bucket_shard
bs(bucket
, shard_id
);
585 return choose_oid(bs
);
// Decide whether changes for this bucket should be logged; with no filter
// installed, the missing branch presumably defaults to logging — confirm
// against the full file.
588 bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider
*dpp
,
589 const rgw_bucket
& bucket
,
590 optional_yield y
) const
592 if (!bucket_filter
) {
596 return bucket_filter(bucket
, y
, dpp
);
// Object name for a (generation, shard) pair. Generation 0 keeps the
// legacy "<prefix>.<shard>" form; later generations embed "@G<gen>".
// NOTE(review): the gen_id selector line is missing from this extraction.
599 std::string
RGWDataChangesLog::get_oid(uint64_t gen_id
, int i
) const {
601 fmt::format("{}@G{}.{}", prefix
, gen_id
, i
) :
602 fmt::format("{}.{}", prefix
, i
));
// Record a change for a bucket shard. Applies the bucket filter, notifies
// the observer, then uses the per-shard ChangeStatus to coalesce writes:
// if a recent entry already covers this window it only registers a renew;
// if another writer is in flight it waits on its cond; otherwise it pushes
// a new entry through the head backend. NOTE(review): several lines
// (returns, loop/brace structure) are missing from this extraction.
605 int RGWDataChangesLog::add_entry(const DoutPrefixProvider
*dpp
, const RGWBucketInfo
& bucket_info
, int shard_id
) {
606 auto& bucket
= bucket_info
.bucket
;
// Skip buckets the filter rejects.
608 if (!filter_bucket(dpp
, bucket
, null_yield
)) {
613 observer
->on_bucket_changed(bucket
.get_key());
616 rgw_bucket_shard
bs(bucket
, shard_id
);
618 int index
= choose_oid(bs
);
619 mark_modified(index
, bs
);
621 std::unique_lock
l(lock
);
623 ChangeStatusPtr status
;
624 _get_change(bs
, status
);
627 auto now
= real_clock::now();
629 std::unique_lock
sl(status
->lock
);
631 ldpp_dout(dpp
, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket
.name
632 << " shard_id=" << shard_id
<< " now=" << now
633 << " cur_expiration=" << status
->cur_expiration
<< dendl
;
// Still within the coalescing window: just queue for renewal.
635 if (now
< status
->cur_expiration
) {
636 /* no need to send, recently completed */
642 RefCountedCond
* cond
;
// Another thread is already writing this shard's entry: wait for it.
644 if (status
->pending
) {
652 int ret
= cond
->wait();
// We are the writer: create the cond others will wait on.
660 status
->cond
= new RefCountedCond
;
661 status
->pending
= true;
663 ceph::real_time expiration
;
668 status
->cur_sent
= now
;
671 expiration
+= ceph::make_timespan(cct
->_conf
->rgw_data_log_window
);
675 ceph::buffer::list bl
;
676 rgw_data_change change
;
677 change
.entity_type
= ENTITY_TYPE_BUCKET
;
678 change
.key
= bs
.get_key();
679 change
.timestamp
= now
;
682 ldpp_dout(dpp
, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now
<< " cur_expiration=" << expiration
<< dendl
;
684 auto be
= bes
->head();
685 ret
= be
->push(dpp
, index
, now
, change
.key
, std::move(bl
));
687 now
= real_clock::now();
// Retry while the push succeeded but the window has already lapsed.
691 } while (!ret
&& real_clock::now() > expiration
);
// Publish completion: reset pending, advance the expiration from the
// send time, and release waiters.
695 status
->pending
= false;
696 /* time of when operation started, not completed */
697 status
->cur_expiration
= status
->cur_sent
;
698 status
->cur_expiration
+= make_timespan(cct
->_conf
->rgw_data_log_window
);
699 status
->cond
= nullptr;
// List across generations: decompose the caller's marker into
// (generation, per-backend cursor), then walk generations from start_id,
// pulling up to max_entries in total. Markers handed back to the caller
// are re-encoded with their generation via gencursor(). NOTE(review):
// several lines (backend lookup, loop closure) are missing from this
// extraction.
708 int DataLogBackends::list(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
709 std::vector
<rgw_data_change_log_entry
>& entries
,
710 std::string_view marker
,
711 std::string
* out_marker
,
714 const auto [start_id
, start_cursor
] = cursorgen(marker
);
715 auto gen_id
= start_id
;
716 std::string out_cursor
;
717 while (max_entries
> 0) {
718 std::vector
<rgw_data_change_log_entry
> gentries
;
719 std::unique_lock
l(m
);
720 auto i
= lower_bound(gen_id
);
721 if (i
== end()) return 0;
// The start cursor only applies within the starting generation.
725 auto r
= be
->list(dpp
, shard
, max_entries
, gentries
,
726 gen_id
== start_id
? start_cursor
: std::string
{},
727 &out_cursor
, truncated
);
731 if (out_marker
&& !out_cursor
.empty()) {
732 *out_marker
= gencursor(gen_id
, out_cursor
);
// Prefix every returned log_id with its generation.
734 for (auto& g
: gentries
) {
735 g
.log_id
= gencursor(gen_id
, g
.log_id
);
737 if (int s
= gentries
.size(); s
< 0 || s
> max_entries
)
740 max_entries
-= gentries
.size();
742 std::move(gentries
.begin(), gentries
.end(),
743 std::back_inserter(entries
));
// Single-shard listing: validate the shard index and delegate to the
// generation-aware DataLogBackends::list.
749 int RGWDataChangesLog::list_entries(const DoutPrefixProvider
*dpp
, int shard
, int max_entries
,
750 std::vector
<rgw_data_change_log_entry
>& entries
,
751 std::string_view marker
,
752 std::string
* out_marker
, bool* truncated
)
754 assert(shard
< num_shards
);
755 return bes
->list(dpp
, shard
, max_entries
, entries
, marker
, out_marker
, truncated
);
// Multi-shard listing: iterate shards via the LogMarker, resetting the
// per-shard marker on each advance; -ENOENT on a shard is tolerated.
// Output is truncated overall if any shard remains unvisited.
758 int RGWDataChangesLog::list_entries(const DoutPrefixProvider
*dpp
, int max_entries
,
759 std::vector
<rgw_data_change_log_entry
>& entries
,
760 LogMarker
& marker
, bool *ptruncated
)
764 for (; marker
.shard
< num_shards
&& int(entries
.size()) < max_entries
;
765 marker
.shard
++, marker
.marker
.clear()) {
766 int ret
= list_entries(dpp
, marker
.shard
, max_entries
- entries
.size(),
767 entries
, marker
.marker
, NULL
, &truncated
);
// Missing shard objects are not an error for a sweep.
768 if (ret
== -ENOENT
) {
779 *ptruncated
= (marker
.shard
< num_shards
);
// Shard info from the head backend; a non-empty marker is re-encoded with
// the head generation so it matches list() markers.
783 int RGWDataChangesLog::get_info(const DoutPrefixProvider
*dpp
, int shard_id
, RGWDataChangesLogInfo
*info
)
785 assert(shard_id
< num_shards
);
786 auto be
= bes
->head();
787 auto r
= be
->get_info(dpp
, shard_id
, info
);
788 if (!info
->marker
.empty()) {
789 info
->marker
= gencursor(be
->gen_id
, info
->marker
);
// Synchronous cross-generation trim: walk generations from the tail up to
// the target (and no further than the head), trimming each fully except
// the target generation, which is trimmed only to the decoded cursor.
// -ENODATA from a fully-trimmed older generation is tolerated.
// NOTE(review): some lines (r's declaration, ENODATA reset, loop exit)
// are missing from this extraction.
794 int DataLogBackends::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
)
796 auto [target_gen
, cursor
] = cursorgen(marker
);
797 std::unique_lock
l(m
);
798 const auto head_gen
= (end() - 1)->second
->gen_id
;
799 const auto tail_gen
= begin()->first
;
// Nothing below the tail to trim.
800 if (target_gen
< tail_gen
) return 0;
802 for (auto be
= lower_bound(0)->second
;
803 be
->gen_id
<= target_gen
&& be
->gen_id
<= head_gen
&& r
>= 0;
804 be
= upper_bound(be
->gen_id
)->second
) {
// Full trim for pre-target generations; cursor-bounded for the target.
806 auto c
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
807 r
= be
->trim(dpp
, shard_id
, c
);
810 if (r
== -ENODATA
&& be
->gen_id
< target_gen
)
812 if (be
->gen_id
== target_gen
)
// Synchronous trim entry point: validate the shard and delegate to the
// generation-aware DataLogBackends::trim_entries.
819 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
)
821 assert(shard_id
< num_shards
);
822 return bes
->trim_entries(dpp
, shard_id
, marker
);
// Async completion that chains trims across generations: each handle()
// either finishes (target/head reached, or hard error) or re-arms itself
// against the next generation's backend. NOTE(review): several lines
// (constructor tail, early-return branches) are missing from this
// extraction.
825 class GenTrim
: public rgw::cls::fifo::Completion
<GenTrim
> {
827 DataLogBackends
* const bes
;
829 const uint64_t target_gen
;
830 const std::string cursor
;
831 const uint64_t head_gen
;
832 const uint64_t tail_gen
;
// Backend currently being trimmed.
833 boost::intrusive_ptr
<RGWDataChangesBE
> be
;
835 GenTrim(const DoutPrefixProvider
*dpp
, DataLogBackends
* bes
, int shard_id
, uint64_t target_gen
,
836 std::string cursor
, uint64_t head_gen
, uint64_t tail_gen
,
837 boost::intrusive_ptr
<RGWDataChangesBE
> be
,
838 lr::AioCompletion
* super
)
839 : Completion(dpp
, super
), bes(bes
), shard_id(shard_id
), target_gen(target_gen
),
840 cursor(std::move(cursor
)), head_gen(head_gen
), tail_gen(tail_gen
),
843 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
844 auto gen_id
= be
->gen_id
;
// -ENODATA from an already-empty pre-target generation is not fatal.
848 if (r
== -ENODATA
&& gen_id
< target_gen
)
851 complete(std::move(p
), r
);
// Advance to the next generation under the backends lock.
856 std::unique_lock
l(bes
->m
);
857 auto i
= bes
->upper_bound(gen_id
);
858 if (i
== bes
->end() || i
->first
> target_gen
|| i
->first
> head_gen
) {
860 complete(std::move(p
), -ENODATA
);
// Full trim for pre-target generations; cursor-bounded for the target.
865 auto c
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
866 be
->trim(dpp
, shard_id
, c
, call(std::move(p
)));
// Asynchronous cross-generation trim: starts at the tail generation and
// hands continuation across generations to a GenTrim completion. Targets
// below the tail complete immediately with -ENODATA.
870 void DataLogBackends::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
871 librados::AioCompletion
* c
)
873 auto [target_gen
, cursor
] = cursorgen(marker
);
874 std::unique_lock
l(m
);
875 const auto head_gen
= (end() - 1)->second
->gen_id
;
876 const auto tail_gen
= begin()->first
;
877 if (target_gen
< tail_gen
) {
879 rgw_complete_aio_completion(c
, -ENODATA
);
882 auto be
= begin()->second
;
884 auto gt
= std::make_unique
<GenTrim
>(dpp
, this, shard_id
, target_gen
,
885 std::string(cursor
), head_gen
, tail_gen
,
// First hop: trim the tail generation (cursor-bounded if it is also the
// target).
888 auto cc
= be
->gen_id
== target_gen
? cursor
: be
->max_marker();
889 be
->trim(dpp
, shard_id
, cc
, GenTrim::call(std::move(gt
)));
// Retire fully-empty older generations: snapshot candidate backends under
// the lock, find the highest generation whose shards are all empty, then
// mark the log empty up to (and including) it and remove the emptied
// generations. NOTE(review): some lines (size guard, is_empty result
// handling) are missing from this extraction.
892 int DataLogBackends::trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
) {
894 std::vector
<mapped_type
> candidates
;
896 std::scoped_lock
l(m
);
898 for (auto i
= cbegin(); i
< e
; ++i
) {
899 candidates
.push_back(i
->second
);
903 std::optional
<uint64_t> highest
;
904 for (auto& be
: candidates
) {
905 auto r
= be
->is_empty(dpp
);
909 highest
= be
->gen_id
;
919 auto ec
= empty_to(dpp
, *highest
, null_yield
);
921 return ceph::from_error_code(ec
);
925 return ceph::from_error_code(remove_empty(dpp
, null_yield
));
// Asynchronous trim entry point: validate the shard and delegate; the
// result is delivered via the AioCompletion.
929 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider
*dpp
, int shard_id
, std::string_view marker
,
930 librados::AioCompletion
* c
)
932 assert(shard_id
< num_shards
);
933 bes
->trim_entries(dpp
, shard_id
, marker
, c
);
// Shutdown-in-progress predicate (body missing from this extraction).
937 bool RGWDataChangesLog::going_down() const
// Destructor: stop and join the renew thread if it was started.
// NOTE(review): the shutdown/stop lines are missing from this extraction.
942 RGWDataChangesLog::~RGWDataChangesLog() {
944 if (renew_thread
.joinable()) {
// Renew-thread main loop: periodically re-publish pending entries
// (renew_entries) and, every runs_per_prune iterations, prune empty older
// generations; sleeps on renew_cond for 3/4 of the data-log window between
// rounds. NOTE(review): loop braces and the going_down check are missing
// from this extraction.
950 void RGWDataChangesLog::renew_run() noexcept
{
951 static constexpr auto runs_per_prune
= 150;
954 const DoutPrefix
dp(cct
, dout_subsys
, "rgw data changes log: ");
955 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl
;
956 int r
= renew_entries(&dp
);
958 ldpp_dout(&dp
, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r
<< dendl
;
// Periodic generation pruning.
964 if (run
== runs_per_prune
) {
965 std::optional
<uint64_t> through
;
966 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl
;
967 trim_generations(&dp
, through
);
969 derr
<< "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
971 } else if (through
) {
972 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
973 << "through " << *through
<< "." << dendl
;
975 ldpp_dout(&dp
, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
// Sleep, interruptible by renew_stop().
983 int interval
= cct
->_conf
->rgw_data_log_window
* 3 / 4;
984 std::unique_lock locker
{renew_lock
};
985 renew_cond
.wait_for(locker
, std::chrono::seconds(interval
));
// Wake the renew thread so it can observe shutdown promptly.
989 void RGWDataChangesLog::renew_stop()
991 std::lock_guard l
{renew_lock
};
992 renew_cond
.notify_all();
// Record that a bucket shard changed, for the data-notify mechanism.
// No-op when notifications are disabled; takes a read lock first to skip
// the write lock on the common already-present path.
995 void RGWDataChangesLog::mark_modified(int shard_id
, const rgw_bucket_shard
& bs
)
997 if (!cct
->_conf
->rgw_data_notify_interval_msec
) {
1001 auto key
= bs
.get_key();
1003 std::shared_lock rl
{modified_lock
}; // read lock to check for existence
1004 auto shard
= modified_shards
.find(shard_id
);
1005 if (shard
!= modified_shards
.end() && shard
->second
.count(key
)) {
1010 std::unique_lock wl
{modified_lock
}; // write lock for insertion
1011 modified_shards
[shard_id
].insert(key
);
// Largest possible composite marker: max generation id plus a
// lexicographically-maximal cursor string.
1014 std::string
RGWDataChangesLog::max_marker() const {
1015 return gencursor(std::numeric_limits
<uint64_t>::max(),
1016 "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
// Switch the log's backing format (omap/fifo) by starting a new backing
// generation of the requested type.
1019 int RGWDataChangesLog::change_format(const DoutPrefixProvider
*dpp
, log_type type
, optional_yield y
) {
1020 return ceph::from_error_code(bes
->new_backing(dpp
, type
, y
));
// Forward generation pruning to the backends; `through` reports the
// highest generation pruned, if any.
1023 int RGWDataChangesLog::trim_generations(const DoutPrefixProvider
*dpp
, std::optional
<uint64_t>& through
) {
1024 return bes
->trim_generations(dpp
, through
);
// Serialize shard info (marker + last update, via utime_t) to JSON.
1027 void RGWDataChangesLogInfo::dump(Formatter
*f
) const
1029 encode_json("marker", marker
, f
);
1030 utime_t
ut(last_update
);
1031 encode_json("last_update", ut
, f
);
1034 void RGWDataChangesLogInfo::decode_json(JSONObj
*obj
)
1036 JSONDecoder::decode_json("marker", marker
, obj
);
1038 JSONDecoder::decode_json("last_update", ut
, obj
);
1039 last_update
= ut
.to_real_time();