1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "cls/log/cls_log_client.h"
5 #include "cls/version/cls_version_client.h"
7 #include "rgw_log_backing.h"
9 #include "cls_fifo_legacy.h"
11 namespace cb
= ceph::buffer
;
13 static constexpr auto dout_subsys
= ceph_subsys_rgw
;
/// Outcome of probing a single log shard object.
enum class shard_check {
  dne,      ///< the shard object does not exist
  omap,     ///< the shard is (or must be treated as) OMAP-backed
  fifo,     ///< the shard is FIFO-backed
  corrupt   ///< probing the shard failed
};

/// Stream a human-readable name for a shard_check value.
///
/// Known enumerators print as "shard_check::<name>"; any other value
/// prints as "shard_check::UNKNOWN=<numeric value>".
inline std::ostream& operator <<(std::ostream& m, const shard_check& t) {
  const char* label = nullptr;
  // Deliberately no `default:` so the compiler can warn when a new
  // enumerator is added without a matching name here.
  switch (t) {
  case shard_check::dne:
    label = "shard_check::dne";
    break;
  case shard_check::omap:
    label = "shard_check::omap";
    break;
  case shard_check::fifo:
    label = "shard_check::fifo";
    break;
  case shard_check::corrupt:
    label = "shard_check::corrupt";
    break;
  }

  if (label != nullptr) {
    return m << label;
  }
  return m << "shard_check::UNKNOWN=" << static_cast<uint32_t>(t);
}
/// Probe one shard and return its type. Sets `fifo_unsupported` to true when the probe finds that FIFO is not supported.
34 probe_shard(const DoutPrefixProvider
*dpp
, librados::IoCtx
& ioctx
, const std::string
& oid
,
35 bool& fifo_unsupported
, optional_yield y
)
37 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
38 << " probing oid=" << oid
40 if (!fifo_unsupported
) {
41 std::unique_ptr
<rgw::cls::fifo::FIFO
> fifo
;
42 auto r
= rgw::cls::fifo::FIFO::open(dpp
, ioctx
, oid
,
47 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
48 << ": oid=" << oid
<< " is FIFO"
50 return shard_check::fifo
;
53 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
54 << ": oid=" << oid
<< " is empty and therefore OMAP"
56 return shard_check::omap
;
59 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
60 << ": oid=" << oid
<< " does not exist"
62 return shard_check::dne
;
65 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
66 << ": FIFO is unsupported, marking."
68 fifo_unsupported
= true;
69 return shard_check::omap
;
72 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
73 << ": error probing: r=" << r
74 << ", oid=" << oid
<< dendl
;
75 return shard_check::corrupt
;
78 // Since FIFO is unsupported, OMAP is the only alternative
79 return shard_check::omap
;
83 tl::expected
<log_type
, bs::error_code
>
84 handle_dne(const DoutPrefixProvider
*dpp
, librados::IoCtx
& ioctx
,
87 bool fifo_unsupported
,
90 if (def
== log_type::fifo
) {
91 if (fifo_unsupported
) {
92 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
93 << " WARNING: FIFO set as default but not supported by OSD. "
94 << "Falling back to OMAP." << dendl
;
95 return log_type::omap
;
97 std::unique_ptr
<rgw::cls::fifo::FIFO
> fifo
;
98 auto r
= rgw::cls::fifo::FIFO::create(dpp
, ioctx
, oid
,
102 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
103 << " error creating FIFO: r=" << r
104 << ", oid=" << oid
<< dendl
;
105 return tl::unexpected(bs::error_code(-r
, bs::system_category()));
112 tl::expected
<log_type
, bs::error_code
>
113 log_backing_type(const DoutPrefixProvider
*dpp
,
114 librados::IoCtx
& ioctx
,
117 const fu2::unique_function
<std::string(int) const>& get_oid
,
120 auto check
= shard_check::dne
;
121 bool fifo_unsupported
= false;
122 for (int i
= 0; i
< shards
; ++i
) {
123 auto c
= probe_shard(dpp
, ioctx
, get_oid(i
), fifo_unsupported
, y
);
124 if (c
== shard_check::corrupt
)
125 return tl::unexpected(bs::error_code(EIO
, bs::system_category()));
126 if (c
== shard_check::dne
) continue;
127 if (check
== shard_check::dne
) {
133 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
134 << " clashing types: check=" << check
135 << ", c=" << c
<< dendl
;
136 return tl::unexpected(bs::error_code(EIO
, bs::system_category()));
139 if (check
== shard_check::corrupt
) {
140 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
141 << " should be unreachable!" << dendl
;
142 return tl::unexpected(bs::error_code(EIO
, bs::system_category()));
145 if (check
== shard_check::dne
)
146 return handle_dne(dpp
, ioctx
,
152 return (check
== shard_check::fifo
? log_type::fifo
: log_type::omap
);
155 bs::error_code
log_remove(const DoutPrefixProvider
*dpp
,
156 librados::IoCtx
& ioctx
,
158 const fu2::unique_function
<std::string(int) const>& get_oid
,
163 for (int i
= 0; i
< shards
; ++i
) {
164 auto oid
= get_oid(i
);
165 rados::cls::fifo::info info
;
166 uint32_t part_header_size
= 0, part_entry_overhead
= 0;
168 auto r
= rgw::cls::fifo::get_meta(dpp
, ioctx
, oid
, nullopt
, &info
,
169 &part_header_size
, &part_entry_overhead
,
171 if (r
== -ENOENT
) continue;
172 if (r
== 0 && info
.head_part_num
> -1) {
173 for (auto j
= info
.tail_part_num
; j
<= info
.head_part_num
; ++j
) {
174 librados::ObjectWriteOperation op
;
176 auto part_oid
= info
.part_oid(j
);
177 auto subr
= rgw_rados_operate(dpp
, ioctx
, part_oid
, &op
, null_yield
);
178 if (subr
< 0 && subr
!= -ENOENT
) {
180 ec
= bs::error_code(-subr
, bs::system_category());
181 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
182 << ": failed removing FIFO part: part_oid=" << part_oid
183 << ", subr=" << subr
<< dendl
;
187 if (r
< 0 && r
!= -ENODATA
) {
189 ec
= bs::error_code(-r
, bs::system_category());
190 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
191 << ": failed checking FIFO part: oid=" << oid
192 << ", r=" << r
<< dendl
;
194 librados::ObjectWriteOperation op
;
195 if (i
== 0 && leave_zero
) {
196 // Leave shard 0 in existence, but remove contents and
197 // omap. cls_lock stores things in the xattrs. And sync needs to
198 // rendezvous with locks on generation 0 shard 0.
199 op
.omap_set_header({});
205 r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, null_yield
);
206 if (r
< 0 && r
!= -ENOENT
) {
208 ec
= bs::error_code(-r
, bs::system_category());
209 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
210 << ": failed removing shard: oid=" << oid
211 << ", r=" << r
<< dendl
;
217 logback_generations::~logback_generations() {
218 if (watchcookie
> 0) {
219 auto cct
= static_cast<CephContext
*>(ioctx
.cct());
220 auto r
= ioctx
.unwatch2(watchcookie
);
222 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
223 << ": failed unwatching oid=" << oid
224 << ", r=" << r
<< dendl
;
229 bs::error_code
logback_generations::setup(const DoutPrefixProvider
*dpp
,
231 optional_yield y
) noexcept
235 auto cct
= static_cast<CephContext
*>(ioctx
.cct());
236 auto res
= read(dpp
, y
);
237 if (!res
&& res
.error() != bs::errc::no_such_file_or_directory
) {
241 std::unique_lock
lock(m
);
242 std::tie(entries_
, version
) = std::move(*res
);
244 // Are we the first? Then create generation 0 and the generations
246 librados::ObjectWriteOperation op
;
247 auto type
= log_backing_type(dpp
, ioctx
, def
, shards
,
249 return this->get_oid(0, shard
);
254 logback_generation l
;
257 std::unique_lock
lock(m
);
259 static constexpr auto TAG_LEN
= 24;
261 append_rand_alpha(cct
, version
.tag
, version
.tag
, TAG_LEN
);
263 cls_version_set(op
, version
);
265 entries_
.emplace(0, std::move(l
));
266 encode(entries_
, bl
);
270 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
271 if (r
< 0 && r
!= -EEXIST
) {
272 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
273 << ": failed writing oid=" << oid
274 << ", r=" << r
<< dendl
;
275 bs::system_error(-r
, bs::system_category());
277 // Did someone race us? Then re-read.
282 if (res
->first
.empty())
283 return bs::error_code(EIO
, bs::system_category());
284 auto l
= res
->first
.begin()->second
;
285 // In the unlikely event that someone raced us, created
286 // generation zero, incremented, then erased generation zero,
287 // don't leave generation zero lying around.
289 auto ec
= log_remove(dpp
, ioctx
, shards
,
291 return this->get_oid(0, shard
);
295 std::unique_lock
lock(m
);
296 std::tie(entries_
, version
) = std::move(*res
);
299 // Pass all non-empty generations to the handler
300 std::unique_lock
lock(m
);
301 auto i
= lowest_nomempty(entries_
);
303 std::copy(i
, entries_
.cend(),
304 std::inserter(e
, e
.end()));
308 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
309 << ": failed to re-establish watch, unsafe to continue: oid="
310 << oid
<< ", ec=" << ec
.message() << dendl
;
312 return handle_init(std::move(e
));
313 } catch (const std::bad_alloc
&) {
314 return bs::error_code(ENOMEM
, bs::system_category());
318 bs::error_code
logback_generations::update(const DoutPrefixProvider
*dpp
, optional_yield y
) noexcept
321 auto res
= read(dpp
, y
);
326 std::unique_lock
l(m
);
327 auto& [es
, v
] = *res
;
333 // Check consistency and prepare update
335 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
336 << ": INCONSISTENCY! Read empty update." << dendl
;
337 return bs::error_code(EFAULT
, bs::system_category());
339 auto cur_lowest
= lowest_nomempty(entries_
);
340 // Straight up can't happen
341 assert(cur_lowest
!= entries_
.cend());
342 auto new_lowest
= lowest_nomempty(es
);
343 if (new_lowest
== es
.cend()) {
344 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
345 << ": INCONSISTENCY! Read update with no active head." << dendl
;
346 return bs::error_code(EFAULT
, bs::system_category());
348 if (new_lowest
->first
< cur_lowest
->first
) {
349 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
350 << ": INCONSISTENCY! Tail moved wrong way." << dendl
;
351 return bs::error_code(EFAULT
, bs::system_category());
354 std::optional
<uint64_t> highest_empty
;
355 if (new_lowest
->first
> cur_lowest
->first
&& new_lowest
!= es
.begin()) {
357 highest_empty
= new_lowest
->first
;
360 entries_t new_entries
;
362 if ((es
.end() - 1)->first
< (entries_
.end() - 1)->first
) {
363 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
364 << ": INCONSISTENCY! Head moved wrong way." << dendl
;
365 return bs::error_code(EFAULT
, bs::system_category());
368 if ((es
.end() - 1)->first
> (entries_
.end() - 1)->first
) {
369 auto ei
= es
.lower_bound((entries_
.end() - 1)->first
+ 1);
370 std::copy(ei
, es
.end(), std::inserter(new_entries
, new_entries
.end()));
373 // Everything checks out!
380 auto ec
= handle_empty_to(*highest_empty
);
384 if (!new_entries
.empty()) {
385 auto ec
= handle_new_gens(std::move(new_entries
));
388 } catch (const std::bad_alloc
&) {
389 return bs::error_code(ENOMEM
, bs::system_category());
394 auto logback_generations::read(const DoutPrefixProvider
*dpp
, optional_yield y
) noexcept
->
395 tl::expected
<std::pair
<entries_t
, obj_version
>, bs::error_code
>
398 librados::ObjectReadOperation op
;
399 std::unique_lock
l(m
);
400 cls_version_check(op
, version
, VER_COND_GE
);
403 cls_version_read(op
, &v2
);
405 op
.read(0, 0, &bl
, nullptr);
406 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
409 ldpp_dout(dpp
, 5) << __PRETTY_FUNCTION__
<< ":" << __LINE__
411 << " not found" << dendl
;
413 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
414 << ": failed reading oid=" << oid
415 << ", r=" << r
<< dendl
;
417 return tl::unexpected(bs::error_code(-r
, bs::system_category()));
419 auto bi
= bl
.cbegin();
423 } catch (const cb::error
& err
) {
424 return tl::unexpected(err
.code());
426 return std::pair
{ std::move(e
), std::move(v2
) };
427 } catch (const std::bad_alloc
&) {
428 return tl::unexpected(bs::error_code(ENOMEM
, bs::system_category()));
432 bs::error_code
logback_generations::write(const DoutPrefixProvider
*dpp
, entries_t
&& e
,
433 std::unique_lock
<std::mutex
>&& l_
,
434 optional_yield y
) noexcept
436 auto l
= std::move(l_
);
437 ceph_assert(l
.mutex() == &m
&&
440 librados::ObjectWriteOperation op
;
441 cls_version_check(op
, version
, VER_COND_GE
);
446 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
448 entries_
= std::move(e
);
453 if (r
< 0 && r
!= -ECANCELED
) {
454 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
455 << ": failed reading oid=" << oid
456 << ", r=" << r
<< dendl
;
457 return { -r
, bs::system_category() };
459 if (r
== -ECANCELED
) {
460 auto ec
= update(dpp
, y
);
464 return { ECANCELED
, bs::system_category() };
467 } catch (const std::bad_alloc
&) {
468 return { ENOMEM
, bs::system_category() };
474 bs::error_code
logback_generations::watch() noexcept
{
476 auto cct
= static_cast<CephContext
*>(ioctx
.cct());
477 auto r
= ioctx
.watch2(oid
, &watchcookie
, this);
479 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
480 << ": failed to set watch oid=" << oid
481 << ", r=" << r
<< dendl
;
482 return { -r
, bs::system_category() };
484 } catch (const std::bad_alloc
&) {
485 return bs::error_code(ENOMEM
, bs::system_category());
490 bs::error_code
logback_generations::new_backing(const DoutPrefixProvider
*dpp
,
492 optional_yield y
) noexcept
{
493 static constexpr auto max_tries
= 10;
495 auto ec
= update(dpp
, y
);
498 entries_t new_entries
;
500 std::unique_lock
l(m
);
501 auto last
= entries_
.end() - 1;
502 if (last
->second
.type
== type
) {
503 // Nothing to be done
506 auto newgenid
= last
->first
+ 1;
507 logback_generation newgen
;
508 newgen
.gen_id
= newgenid
;
510 new_entries
.emplace(newgenid
, newgen
);
512 es
.emplace(newgenid
, std::move(newgen
));
513 ec
= write(dpp
, std::move(es
), std::move(l
), y
);
515 } while (ec
== bs::errc::operation_canceled
&&
517 if (tries
>= max_tries
) {
518 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
519 << ": exhausted retry attempts." << dendl
;
524 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
525 << ": write failed with ec=" << ec
.message() << dendl
;
531 auto r
= rgw_rados_notify(dpp
, ioctx
, oid
, bl
, 10'000, &rbl
, y
);
533 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
534 << ": notify failed with r=" << r
<< dendl
;
535 return { -r
, bs::system_category() };
537 ec
= handle_new_gens(new_entries
);
538 } catch (const std::bad_alloc
&) {
539 return bs::error_code(ENOMEM
, bs::system_category());
544 bs::error_code
logback_generations::empty_to(const DoutPrefixProvider
*dpp
,
546 optional_yield y
) noexcept
{
547 static constexpr auto max_tries
= 10;
549 auto ec
= update(dpp
, y
);
552 uint64_t newtail
= 0;
554 std::unique_lock
l(m
);
556 auto last
= entries_
.end() - 1;
557 if (gen_id
>= last
->first
) {
558 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
559 << ": Attempt to trim beyond the possible." << dendl
;
560 return bs::error_code(EINVAL
, bs::system_category());
564 auto ei
= es
.upper_bound(gen_id
);
565 if (ei
== es
.begin()) {
566 // Nothing to be done.
569 for (auto i
= es
.begin(); i
< ei
; ++i
) {
571 i
->second
.pruned
= ceph::real_clock::now();
573 ec
= write(dpp
, std::move(es
), std::move(l
), y
);
575 } while (ec
== bs::errc::operation_canceled
&&
577 if (tries
>= max_tries
) {
578 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
579 << ": exhausted retry attempts." << dendl
;
584 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
585 << ": write failed with ec=" << ec
.message() << dendl
;
591 auto r
= rgw_rados_notify(dpp
, ioctx
, oid
, bl
, 10'000, &rbl
, y
);
593 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
594 << ": notify failed with r=" << r
<< dendl
;
595 return { -r
, bs::system_category() };
597 ec
= handle_empty_to(newtail
);
598 } catch (const std::bad_alloc
&) {
599 return bs::error_code(ENOMEM
, bs::system_category());
604 bs::error_code
logback_generations::remove_empty(const DoutPrefixProvider
*dpp
, optional_yield y
) noexcept
{
605 static constexpr auto max_tries
= 10;
607 auto ec
= update(dpp
, y
);
610 entries_t new_entries
;
611 std::unique_lock
l(m
);
612 ceph_assert(!entries_
.empty());
614 auto i
= lowest_nomempty(entries_
);
615 if (i
== entries_
.begin()) {
620 auto now
= ceph::real_clock::now();
623 std::copy_if(entries_
.cbegin(), entries_
.cend(),
624 std::inserter(es
, es
.end()),
625 [now
](const auto& e
) {
626 if (!e
.second
.pruned
)
629 auto pruned
= *e
.second
.pruned
;
630 return (now
- pruned
) >= 1h
;
633 for (const auto& [gen_id
, e
] : es
) {
634 ceph_assert(e
.pruned
);
635 auto ec
= log_remove(dpp
, ioctx
, shards
,
636 [this, gen_id
= gen_id
](int shard
) {
637 return this->get_oid(gen_id
, shard
);
638 }, (gen_id
== 0), y
);
640 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
641 << ": Error pruning: gen_id=" << gen_id
642 << " ec=" << ec
.message() << dendl
;
644 if (auto i
= es2
.find(gen_id
); i
!= es2
.end()) {
650 ec
= write(dpp
, std::move(es2
), std::move(l
), y
);
652 } while (ec
== bs::errc::operation_canceled
&&
654 if (tries
>= max_tries
) {
655 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
656 << ": exhausted retry attempts." << dendl
;
661 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
662 << ": write failed with ec=" << ec
.message() << dendl
;
665 } catch (const std::bad_alloc
&) {
666 return bs::error_code(ENOMEM
, bs::system_category());
671 void logback_generations::handle_notify(uint64_t notify_id
,
673 uint64_t notifier_id
,
676 auto cct
= static_cast<CephContext
*>(ioctx
.cct());
677 const DoutPrefix
dp(cct
, dout_subsys
, "logback generations handle_notify: ");
678 if (notifier_id
!= my_id
) {
679 auto ec
= update(&dp
, null_yield
);
682 << __PRETTY_FUNCTION__
<< ":" << __LINE__
683 << ": update failed, no one to report to and no safe way to continue."
689 ioctx
.notify_ack(oid
, notify_id
, watchcookie
, rbl
);
692 void logback_generations::handle_error(uint64_t cookie
, int err
) {
693 auto cct
= static_cast<CephContext
*>(ioctx
.cct());
694 auto r
= ioctx
.unwatch2(watchcookie
);
696 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
697 << ": failed to set unwatch oid=" << oid
698 << ", r=" << r
<< dendl
;
703 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
704 << ": failed to re-establish watch, unsafe to continue: oid="
705 << oid
<< ", ec=" << ec
.message() << dendl
;