1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
8 #include <boost/range/adaptor/filtered.hpp>
9 #include <boost/range/adaptor/map.hpp>
10 #include <boost/range/adaptor/transformed.hpp>
11 #include <boost/range/algorithm/copy.hpp>
12 #include <boost/range/algorithm/max_element.hpp>
13 #include <boost/range/numeric.hpp>
14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
17 #include "messages/MOSDOp.h"
18 #include "messages/MOSDOpReply.h"
19 #include "messages/MOSDPGInfo.h"
20 #include "messages/MOSDPGLog.h"
21 #include "messages/MOSDPGNotify.h"
22 #include "messages/MOSDPGQuery.h"
23 #include "messages/MOSDRepOp.h"
24 #include "messages/MOSDRepOpReply.h"
26 #include "osd/OSDMap.h"
28 #include "os/Transaction.h"
30 #include "crimson/net/Connection.h"
31 #include "crimson/net/Messenger.h"
32 #include "crimson/os/cyanstore/cyan_store.h"
33 #include "crimson/os/futurized_collection.h"
34 #include "crimson/osd/exceptions.h"
35 #include "crimson/osd/pg_meta.h"
36 #include "crimson/osd/pg_backend.h"
37 #include "crimson/osd/ops_executer.h"
38 #include "crimson/osd/osd_operations/peering_event.h"
// File-local accessor returning the seastar logger for the OSD subsystem.
// NOTE(review): extraction artifact — leading numerals are residual source
// line numbers and the closing lines of this definition were dropped.
41 seastar::logger
& logger() {
42 return crimson::get_logger(ceph_subsys_osd
);
// Stream-insertion for a signed duration: prints whole seconds, and — only
// when there is a sub-second remainder — a 9-digit zero-padded nanosecond
// fraction (e.g. "3.000000500s" vs "3s").
// NOTE(review): garbled extraction — opening brace and `return out;` of this
// operator were dropped; code below kept byte-identical.
46 namespace std::chrono
{
47 std::ostream
& operator<<(std::ostream
& out
, const signedspan
& d
)
// Whole-second part (may be negative).
49 auto s
= std::chrono::duration_cast
<std::chrono::seconds
>(d
).count();
// Absolute sub-second remainder in the duration's native ticks.
50 auto ns
= std::abs((d
% 1s
).count());
51 fmt::print(out
, "{}{}s", s
, ns
? fmt::format(".{:0>9}", ns
) : "");
56 namespace crimson::osd
{
58 using crimson::common::local_conf
;
// Predicate handed to PeeringState deciding whether the PG is recoverable
// given the set of shards we have. Body dropped by the extraction; the
// visible operator() signature is kept byte-identical.
60 class RecoverablePredicate
: public IsPGRecoverablePredicate
{
62 bool operator()(const set
<pg_shard_t
> &have
) const override
{
// Predicate handed to PeeringState deciding whether the PG is readable:
// true iff our own shard (whoami) is among the shards we have.
// NOTE(review): closing braces dropped by the extraction.
67 class ReadablePredicate
: public IsPGReadablePredicate
{
70 explicit ReadablePredicate(pg_shard_t whoami
) : whoami(whoami
) {}
71 bool operator()(const set
<pg_shard_t
> &have
) const override
{
// Readable only when this OSD's shard is present in `have`.
72 return have
.count(whoami
);
// PG constructor fragment (most of the member-init list was dropped by the
// extraction). Visible: pgmeta oid derived from the pgid, the osdmap gate,
// stored shard_services reference, and the wait-for-active blocker. The
// constructor body installs the readable/recoverable predicates into
// peering_state and primes the osdmap gate with the boot-time map epoch.
79 crimson::os::CollectionRef coll_ref
,
83 ShardServices
&shard_services
,
88 pgmeta_oid
{pgid
.make_pgmeta_oid()},
89 osdmap_gate("PG::osdmap_gate", std::nullopt
),
90 shard_services
{shard_services
},
101 shard_services
.get_cct(),
105 shard_services
.get_cct(),
113 wait_for_active_blocker(this)
// Ownership of both predicates transfers to peering_state.
115 peering_state
.set_backend_predicates(
116 new ReadablePredicate(pg_whoami
),
117 new RecoverablePredicate());
118 osdmap_gate
.got_map(osdmap
->get_epoch());
// Submits an empty transaction to the store and, once it commits, queues an
// IntervalFlush peering event in the background (result future deliberately
// discarded via the (void) cast). Return value lines were dropped by the
// extraction — presumably false, i.e. "flush scheduled asynchronously";
// TODO confirm against upstream.
123 bool PG::try_flush_or_schedule_async() {
124 (void)shard_services
.get_store().do_transaction(
126 ObjectStore::Transaction()).then(
127 [this, epoch
=get_osdmap_epoch()]() {
128 return shard_services
.start_operation
<LocalPeeringEvent
>(
135 PeeringState::IntervalFlush());
// Re-arms check_readable_timer to fire after `delay`, at which point a
// CheckReadable peering event (tagged with last_peering_reset) is queued
// in the background. Any previously-armed callback is cancelled first so
// only the latest schedule survives.
140 void PG::queue_check_readable(epoch_t last_peering_reset
, ceph::timespan delay
)
142 // handle the peering event in the background
143 check_readable_timer
.cancel();
144 check_readable_timer
.set_callback([last_peering_reset
, this] {
145 shard_services
.start_operation
<LocalPeeringEvent
>(
152 PeeringState::CheckReadable
{});
// Convert ceph::timespan to the lowres-clock duration the timer expects.
154 check_readable_timer
.arm(
155 std::chrono::duration_cast
<seastar::lowres_clock::duration
>(delay
));
// Re-evaluates the WAIT and LAGGY readability states against the current
// monotonic now (mnow) from shard_services:
//  - WAIT clears once mnow has passed the prior-interval readable_until
//    upper bound;
//  - LAGGY clears once mnow is again below our own readable_until (a zero
//    readable_until means we are still laggy).
// Afterwards stats are published; when neither state remains, waiting ops
// would be requeued (still a TODO in the code).
// NOTE(review): several dropped lines (braces/else/log args) — `changed` is
// presumably set in the dropped branches and gates publish_stats_to_osd();
// confirm against upstream.
158 void PG::recheck_readable()
160 bool changed
= false;
161 const auto mnow
= shard_services
.get_mnow();
162 if (peering_state
.state_test(PG_STATE_WAIT
)) {
163 auto prior_readable_until_ub
= peering_state
.get_prior_readable_until_ub();
164 if (mnow
< prior_readable_until_ub
) {
165 logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
166 __func__
, mnow
, prior_readable_until_ub
);
168 logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
169 __func__
, mnow
, prior_readable_until_ub
);
170 peering_state
.state_clear(PG_STATE_WAIT
);
171 peering_state
.clear_prior_readable_until_ub();
175 if (peering_state
.state_test(PG_STATE_LAGGY
)) {
176 auto readable_until
= peering_state
.get_readable_until();
177 if (readable_until
== readable_until
.zero()) {
178 logger().info("{} still laggy (mnow {}, readable_until zero)",
180 } else if (mnow
>= readable_until
) {
181 logger().info("{} still laggy (mnow {} >= readable_until {})",
182 __func__
, mnow
, readable_until
);
184 logger().info("{} no longer laggy (mnow {} < readable_until {})",
185 __func__
, mnow
, readable_until
);
186 peering_state
.state_clear(PG_STATE_LAGGY
);
191 publish_stats_to_osd();
192 if (!peering_state
.state_test(PG_STATE_WAIT
) &&
193 !peering_state
.state_test(PG_STATE_LAGGY
)) {
194 // TODO: requeue ops waiting for readable
// Computes the per-PG log-entry budget: the OSD-wide target
// (osd_target_pg_log_entries_per_osd) divided evenly across the OSD's PG
// count, clamped to [osd_min_pg_log_entries, osd_max_pg_log_entries];
// falls back to the per-pg minimum when the target or pg count is zero.
// NOTE(review): the clamp's min/max argument lines were dropped by the
// extraction.
199 unsigned PG::get_target_pg_log_entries() const
201 const unsigned num_pgs
= shard_services
.get_pg_num();
202 const unsigned target
=
203 local_conf().get_val
<uint64_t>("osd_target_pg_log_entries_per_osd");
204 const unsigned min_pg_log_entries
=
205 local_conf().get_val
<uint64_t>("osd_min_pg_log_entries");
206 if (num_pgs
> 0 && target
> 0) {
207 // target an even spread of our budgeted log entries across all
208 // PGs. note that while we only get to control the entry count
209 // for primary PGs, we'll normally be responsible for a mix of
210 // primary and replica PGs (for the same pool(s) even), so this
212 const unsigned max_pg_log_entries
=
213 local_conf().get_val
<uint64_t>("osd_max_pg_log_entries");
214 return std::clamp(target
/ num_pgs
,
218 // fall back to a per-pg value.
219 return min_pg_log_entries
;
// PeeringListener hook: on activation, seed projected_last_update from the
// on-disk info's last_update (the snap interval_set argument is unused here).
223 void PG::on_activate(interval_set
<snapid_t
>)
225 projected_last_update
= peering_state
.get_info().last_update
;
// PeeringListener hook: once activation completes, unblock ops waiting for
// the PG to become active, then queue exactly one background peering event
// depending on what is still outstanding: DoRecovery if recovery is needed,
// else RequestBackfill if backfill is needed, else AllReplicasRecovered.
// NOTE(review): the start_operation argument lines and else/brace lines were
// dropped by the extraction.
228 void PG::on_activate_complete()
230 wait_for_active_blocker
.on_active();
232 if (peering_state
.needs_recovery()) {
233 shard_services
.start_operation
<LocalPeeringEvent
>(
240 PeeringState::DoRecovery
{});
241 } else if (peering_state
.needs_backfill()) {
242 shard_services
.start_operation
<LocalPeeringEvent
>(
249 PeeringState::RequestBackfill
{});
251 shard_services
.start_operation
<LocalPeeringEvent
>(
258 PeeringState::AllReplicasRecovered
{});
// Serializes dirty PG metadata into transaction `t` against the pgmeta
// object: when info is dirty, prepare_info_keymap fills `km` (and possibly
// `key_to_remove`); the pg log/missing set is always appended via
// pglog.write_log_and_missing; finally the key map is written with
// omap_setkeys and any obsolete key removed with omap_rmkey.
// NOTE(review): several parameter and argument lines were dropped by the
// extraction.
262 void PG::prepare_write(pg_info_t
&info
,
263 pg_info_t
&last_written_info
,
264 PastIntervals
&past_intervals
,
268 bool need_write_epoch
,
269 ceph::os::Transaction
&t
)
271 std::map
<string
,bufferlist
> km
;
272 std::string key_to_remove
;
273 if (dirty_big_info
|| dirty_info
) {
274 int ret
= prepare_info_keymap(
275 shard_services
.get_cct(),
287 ceph_assert(ret
== 0);
289 pglog
.write_log_and_missing(
290 t
, &km
, coll_ref
->get_cid(), pgmeta_oid
,
291 peering_state
.get_pool().info
.require_rollback());
293 t
.omap_setkeys(coll_ref
->get_cid(), pgmeta_oid
, km
);
295 if (!key_to_remove
.empty()) {
296 t
.omap_rmkey(coll_ref
->get_cid(), pgmeta_oid
, key_to_remove
);
// PG-deletion work fragment (body largely dropped by the extraction); the
// visible effect is decrementing the shard's PG count.
300 ghobject_t
PG::do_delete_work(ceph::os::Transaction
&t
,
304 shard_services
.dec_pg_num();
// PeeringState machine hook: log entry into a peering state.
307 void PG::log_state_enter(const char *state
) {
308 logger().info("Entering state: {}", state
);
// PeeringState machine hook: log exit from a peering state together with
// entry time, event count and time spent (the logger call's argument lines
// were dropped by the extraction).
311 void PG::log_state_exit(
312 const char *state_name
, utime_t enter_time
,
313 uint64_t events
, utime_t event_dur
) {
315 "Exiting state: {}, entered at {}, {} spent on {} events",
// Monotonic "now" for readability-lease arithmetic, delegated to
// shard_services.
322 ceph::signedspan
PG::get_mnow()
324 return shard_services
.get_mnow();
// Heartbeat timestamps for peer OSD `peer`, delegated to shard_services.
327 HeartbeatStampsRef
PG::get_hb_stamps(int peer
)
329 return shard_services
.get_hb_stamps(peer
);
// Re-arms renew_lease_timer to fire after `delay`, queueing a background
// peering event tagged with last_peering_reset (the event payload line was
// dropped by the extraction — presumably RenewLease; confirm upstream).
// Mirrors queue_check_readable: cancel any pending callback first.
332 void PG::schedule_renew_lease(epoch_t last_peering_reset
, ceph::timespan delay
)
334 // handle the peering event in the background
335 renew_lease_timer
.cancel();
336 renew_lease_timer
.set_callback([last_peering_reset
, this] {
337 shard_services
.start_operation
<LocalPeeringEvent
>(
346 renew_lease_timer
.arm(
347 std::chrono::duration_cast
<seastar::lowres_clock::duration
>(delay
));
// Fragment of a PG initialization method — the signature's opening line was
// dropped by the extraction (presumably PG::init; TODO confirm). It forwards
// the new up/acting sets, primaries, history, past intervals, backfill flag
// and transaction to the delegate call whose name line was also dropped
// (presumably peering_state.init).
353 const vector
<int>& newup
, int new_up_primary
,
354 const vector
<int>& newacting
, int new_acting_primary
,
355 const pg_history_t
& history
,
356 const PastIntervals
& pi
,
358 ObjectStore::Transaction
&t
)
361 role
, newup
, new_up_primary
, newacting
,
362 new_acting_primary
, history
, pi
, backfill
, t
);
// Loads this PG's persistent state at startup: reads pg_info/past_intervals
// via PGMeta, rebuilds peering_state from disk (including the pg log and
// missing set via read_log_and_missing_crimson), derives up/acting sets and
// primaries from the current osdmap, sets our role, and finally queues an
// Initialize peering event in the background before resolving.
// NOTE(review): several argument/brace lines were dropped by the extraction.
365 seastar::future
<> PG::read_state(crimson::os::FuturizedStore
* store
)
367 return PGMeta
{store
, pgid
}.load(
368 ).then([this, store
](pg_info_t pg_info
, PastIntervals past_intervals
) {
369 return peering_state
.init_from_disk_state(
371 std::move(past_intervals
),
372 [this, store
] (PGLog
&pglog
) {
373 return pglog
.read_log_and_missing_crimson(
376 peering_state
.get_info(),
// Map the pgid through the osdmap to the current up/acting sets.
380 int primary
, up_primary
;
381 vector
<int> acting
, up
;
382 peering_state
.get_osdmap()->pg_to_up_acting_osds(
383 pgid
.pgid
, &up
, &up_primary
, &acting
, &primary
);
384 peering_state
.init_primary_up_acting(
389 int rr
= OSDMap::calc_pg_role(pg_whoami
, acting
);
390 peering_state
.set_role(rr
);
// Kick off peering asynchronously; read_state itself completes now.
392 epoch_t epoch
= get_osdmap_epoch();
393 shard_services
.start_operation
<LocalPeeringEvent
>(
400 PeeringState::Initialize());
402 return seastar::now();
// Low-level peering-event dispatch: feed the statechart event into
// peering_state, then persist any dirtied state into rctx's transaction.
// NOTE(review): the handle_event argument lines were dropped by the
// extraction.
406 void PG::do_peering_event(
407 const boost::statechart::event_base
&evt
,
410 peering_state
.handle_event(
413 peering_state
.write_if_dirty(rctx
.transaction
);
// Epoch-checked peering-event dispatch: forwards to the statechart overload
// only if the PG has not been reset since the event's requested epoch;
// otherwise the event is logged and dropped.
416 void PG::do_peering_event(
417 PGPeeringEvent
& evt
, PeeringCtx
&rctx
)
419 if (!peering_state
.pg_has_reset_since(evt
.get_epoch_requested())) {
420 logger().debug("{} handling {}", __func__
, evt
.get_desc());
421 do_peering_event(evt
.get_event(), rctx
);
423 logger().debug("{} ignoring {} -- pg has reset", __func__
, evt
.get_desc());
// Advances the PG to `next_map`: recomputes up/acting sets and primaries
// from the new map, feeds them (plus the previous osdmap) into
// peering_state.advance_map, and finally releases any ops gated on the new
// map's epoch. Argument lines of the two multi-line calls were dropped by
// the extraction.
427 void PG::handle_advance_map(
428 cached_map_t next_map
, PeeringCtx
&rctx
)
430 vector
<int> newup
, newacting
;
431 int up_primary
, acting_primary
;
432 next_map
->pg_to_up_acting_osds(
435 &newacting
, &acting_primary
);
436 peering_state
.advance_map(
438 peering_state
.get_osdmap(),
444 osdmap_gate
.got_map(next_map
->get_epoch());
// Forwards map activation to peering_state (ActMap processing).
447 void PG::handle_activate_map(PeeringCtx
&rctx
)
449 peering_state
.activate_map(rctx
);
// Injects the synthetic Initialize event into the peering state machine.
452 void PG::handle_initialize(PeeringCtx
&rctx
)
454 PeeringState::Initialize evt
;
455 peering_state
.handle_event(evt
, &rctx
);
// Streams a human-readable summary of the PG (delegates to peering_state's
// operator<<).
459 void PG::print(ostream
& out
) const
461 out
<< peering_state
<< " ";
// Free-function stream insertion for PG: prints the current osdmap epoch
// (additional output lines were dropped by the extraction).
465 std::ostream
& operator<<(std::ostream
& os
, const PG
& pg
)
467 os
<< " pg_epoch " << pg
.get_osdmap_epoch() << " ";
// Blocker introspection: emit the blocked PG's id into the formatter.
472 void PG::WaitForActiveBlocker::dump_detail(Formatter
*f
) const
474 f
->dump_stream("pgid") << pg
->pgid
;
// on_active(): body dropped by the extraction — presumably resolves the
// shared promise `p` that wait() hands out; confirm upstream.
477 void PG::WaitForActiveBlocker::on_active()
// wait(): immediately ready when the PG is already active, otherwise a
// blocking future tied to the shared promise resolved by on_active().
483 blocking_future
<> PG::WaitForActiveBlocker::wait()
485 if (pg
->peering_state
.is_active()) {
486 return make_blocking_future(seastar::now());
488 return make_blocking_future(p
.get_shared_future());
// Replicates and applies a client mutation: computes the next projected
// version (map epoch, projected_last_update.version + 1), hands object,
// transaction and version to the backend's mutate_object across the acting/
// recovery/backfill set, and on acknowledgement records each peer's
// last_complete_ondisk in peering_state.
// NOTE(review): some parameter and argument lines were dropped by the
// extraction.
492 seastar::future
<> PG::submit_transaction(ObjectContextRef
&& obc
,
493 ceph::os::Transaction
&& txn
,
496 epoch_t map_epoch
= get_osdmap_epoch();
497 eversion_t at_version
{map_epoch
, projected_last_update
.version
+ 1};
498 return backend
->mutate_object(peering_state
.get_acting_recovery_backfill(),
502 peering_state
.get_last_peering_reset(),
504 at_version
).then([this](auto acked
) {
505 for (const auto& peer
: acked
) {
506 peering_state
.update_peer_last_complete_ondisk(
507 peer
.shard
, peer
.last_complete_ondisk
);
509 return seastar::now();
// Executes a client MOSDOp against an object:
//  1. resolve the target oid (CEPH_SNAPDIR maps to the head object);
//  2. run each OSDOp through an OpsExecuter sequentially (crimson::
//     do_for_each), keeping `ox` alive via the raw-pointer capture while the
//     unique_ptr is moved into the final continuation as `ox_deleter`;
//  3. on success, submit_changes either bypasses mutation for an empty txn
//     or calls submit_transaction, then an ACK|ONDISK MOSDOpReply is sent;
//  4. errorated failures build a reply carrying -e.value() plus enoent
//     versions; non-errorated crimson::osd::error exceptions get the same
//     treatment in handle_exception_type (kept, per the in-code comment,
//     for throwing paths not yet converted to errorators).
// NOTE(review): many log-argument and brace lines were dropped by the
// extraction; code kept byte-identical.
513 seastar::future
<Ref
<MOSDOpReply
>> PG::do_osd_ops(
515 ObjectContextRef obc
)
517 using osd_op_errorator
= OpsExecuter::osd_op_errorator
;
518 const auto oid
= m
->get_snapid() == CEPH_SNAPDIR
? m
->get_hobj().get_head()
521 std::make_unique
<OpsExecuter
>(obc
, *this/* as const& */, m
);
522 return crimson::do_for_each(
523 m
->ops
, [obc
, m
, ox
= ox
.get()](OSDOp
& osd_op
) {
525 "do_osd_ops: {} - object {} - handling op {}",
528 ceph_osd_op_name(osd_op
.op
.op
));
529 return ox
->execute_osd_op(osd_op
);
530 }).safe_then([this, obc
, m
, ox
= ox
.get()] {
532 "do_osd_ops: {} - object {} all operations successful",
535 return std::move(*ox
).submit_changes(
536 [this, m
] (auto&& txn
, auto&& obc
) -> osd_op_errorator::future
<> {
537 // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
540 "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
543 return osd_op_errorator::now();
546 "do_osd_ops: {} - object {} submitting txn",
549 return submit_transaction(std::move(obc
), std::move(txn
), *m
);
552 }).safe_then([m
, obc
, this, ox_deleter
= std::move(ox
)] {
553 auto reply
= make_message
<MOSDOpReply
>(m
.get(), 0, get_osdmap_epoch(),
555 reply
->add_flags(CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
);
557 "do_osd_ops: {} - object {} sending reply",
560 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
561 }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code
& e
) {
562 assert(e
.value() > 0);
564 "do_osd_ops: {} - object {} got error code {}, {}",
569 auto reply
= make_message
<MOSDOpReply
>(
570 m
.get(), -e
.value(), get_osdmap_epoch(), 0, false);
571 reply
->set_enoent_reply_versions(peering_state
.get_info().last_update
,
572 peering_state
.get_info().last_user_version
);
573 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
574 })).handle_exception_type([=,&oid
](const crimson::osd::error
& e
) {
575 // we need this handler because throwing path which aren't errorated yet.
577 "do_osd_ops: {} - object {} got unhandled exception {} ({})",
582 auto reply
= make_message
<MOSDOpReply
>(
583 m
.get(), -e
.code().value(), get_osdmap_epoch(), 0, false);
584 reply
->set_enoent_reply_versions(peering_state
.get_info().last_update
,
585 peering_state
.get_info().last_user_version
);
586 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
// Executes PG-scoped (not object-scoped) ops from an MOSDOp: each OSDOp is
// run sequentially through OpsExecuter::execute_pg_op; success yields an
// ACK|ONDISK MOSDOpReply, while a crimson::osd::error exception yields a
// reply carrying -e.code().value() plus enoent versions. The OpsExecuter is
// kept alive by the raw-pointer capture during iteration and by moving the
// unique_ptr into the success continuation.
590 seastar::future
<Ref
<MOSDOpReply
>> PG::do_pg_ops(Ref
<MOSDOp
> m
)
592 auto ox
= std::make_unique
<OpsExecuter
>(*this/* as const& */, m
);
593 return seastar::do_for_each(m
->ops
, [ox
= ox
.get()](OSDOp
& osd_op
) {
594 logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op
.op
.op
));
595 return ox
->execute_pg_op(osd_op
);
596 }).then([m
, this, ox
= std::move(ox
)] {
597 auto reply
= make_message
<MOSDOpReply
>(m
.get(), 0, get_osdmap_epoch(),
598 CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
,
600 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
601 }).handle_exception_type([=](const crimson::osd::error
& e
) {
602 auto reply
= make_message
<MOSDOpReply
>(
603 m
.get(), -e
.code().value(), get_osdmap_epoch(), 0, false);
604 reply
->set_enoent_reply_versions(peering_state
.get_info().last_update
,
605 peering_state
.get_info().last_user_version
);
606 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
// Derives from a client op (a) the effective target oid — CEPH_SNAPDIR is
// redirected to the head object — and (b) the lock mode required:
//   rw-ordered + may_read -> RWEXCL
//   rw-ordered only       -> RWWRITE
//   otherwise (must read) -> RWREAD
// NOTE(review): the else line before the read branch was dropped by the
// extraction.
610 std::pair
<hobject_t
, RWState::State
> PG::get_oid_and_lock(
612 const OpInfo
&op_info
)
614 auto oid
= m
.get_snapid() == CEPH_SNAPDIR
?
615 m
.get_hobj().get_head() : m
.get_hobj();
617 RWState::State lock_type
= RWState::RWNONE
;
618 if (op_info
.rwordered() && op_info
.may_read()) {
619 lock_type
= RWState::RWState::RWEXCL
;
620 } else if (op_info
.rwordered()) {
621 lock_type
= RWState::RWState::RWWRITE
;
623 ceph_assert(op_info
.may_read());
624 lock_type
= RWState::RWState::RWREAD
;
626 return std::make_pair(oid
, lock_type
);
// Resolves a snapshotted oid against a snapset `ss` (parameter line dropped
// by the extraction): a snap newer than ss.seq reads the head; otherwise the
// first clone with coverage >= the requested snap is located via
// upper_bound over ss.clones. No such clone, or a clone whose clone_snaps
// entry does not contain the snap, means the object did not exist at that
// snap (the nullopt-return lines were dropped); otherwise the clone's oid
// `soid` is returned.
629 std::optional
<hobject_t
> PG::resolve_oid(
631 const hobject_t
&oid
)
633 if (oid
.snap
> ss
.seq
) {
634 return oid
.get_head();
636 // which clone would it be?
637 auto clone
= std::upper_bound(
638 begin(ss
.clones
), end(ss
.clones
),
640 if (clone
== end(ss
.clones
)) {
641 // Doesn't exist, > last clone, < ss.seq
644 auto citer
= ss
.clone_snaps
.find(*clone
);
645 // TODO: how do we want to handle this kind of logic error?
646 ceph_assert(citer
!= ss
.clone_snaps
.end());
649 citer
->second
.begin(),
651 *clone
) == citer
->second
.end()) {
656 return std::optional
<hobject_t
>(soid
);
// Obtains the ObjectContext for a clone of `head`'s object. The oid must be
// a non-head object. resolve_oid maps (snapset, oid) to the concrete clone;
// an unresolvable oid yields a null ObjectContextRef flagged `true`. A
// registry hit returns the cached obc (flag `true` = already existed);
// otherwise exclusive access is taken and the backend's metadata load
// populates the obc's clone state before returning it with flag `false`
// (= freshly loaded). Several brace/branch lines were dropped by the
// extraction.
661 PG::load_obc_ertr::future
<
662 std::pair
<crimson::osd::ObjectContextRef
, bool>>
663 PG::get_or_load_clone_obc(hobject_t oid
, ObjectContextRef head
)
665 ceph_assert(!oid
.is_head());
666 using ObjectContextRef
= crimson::osd::ObjectContextRef
;
667 auto coid
= resolve_oid(head
->get_ro_ss(), oid
);
669 return load_obc_ertr::make_ready_future
<
670 std::pair
<crimson::osd::ObjectContextRef
, bool>>(
671 std::make_pair(ObjectContextRef(), true)
674 auto [obc
, existed
] = shard_services
.obc_registry
.get_cached_obc(*coid
);
676 return load_obc_ertr::make_ready_future
<
677 std::pair
<crimson::osd::ObjectContextRef
, bool>>(
678 std::make_pair(obc
, true)
681 bool got
= obc
->maybe_get_excl();
683 return backend
->load_metadata(*coid
).safe_then(
684 [oid
, obc
=std::move(obc
), head
](auto &&md
) mutable {
685 obc
->set_clone_state(std::move(md
->os
), std::move(head
));
686 return load_obc_ertr::make_ready_future
<
687 std::pair
<crimson::osd::ObjectContextRef
, bool>>(
688 std::make_pair(obc
, false)
// Obtains the ObjectContext for a head object (oid must be a head). A
// registry hit returns the cached obc with flag `true`; on a miss,
// exclusive access is taken and the backend's load_metadata fills the obc —
// a head without a snapset is treated as corruption
// (ct_error::object_corrupted) — then the obc is returned with flag `false`
// (= freshly loaded). Log-argument and brace lines were dropped by the
// extraction.
694 PG::load_obc_ertr::future
<
695 std::pair
<crimson::osd::ObjectContextRef
, bool>>
696 PG::get_or_load_head_obc(hobject_t oid
)
698 ceph_assert(oid
.is_head());
699 auto [obc
, existed
] = shard_services
.obc_registry
.get_cached_obc(oid
);
702 "{}: found {} in cache",
705 return load_obc_ertr::make_ready_future
<
706 std::pair
<crimson::osd::ObjectContextRef
, bool>>(
707 std::make_pair(std::move(obc
), true)
711 "{}: cache miss on {}",
714 bool got
= obc
->maybe_get_excl();
716 return backend
->load_metadata(oid
).safe_then(
717 [oid
, obc
=std::move(obc
)](auto md
) ->
718 load_obc_ertr::future
<
719 std::pair
<crimson::osd::ObjectContextRef
, bool>>
722 "{}: loaded obs {} for {}",
728 "{}: oid {} missing snapset",
731 return crimson::ct_error::object_corrupted::make();
733 obc
->set_head_state(std::move(md
->os
), std::move(*(md
->ss
)));
735 "{}: returning obc {} for {}",
739 return load_obc_ertr::make_ready_future
<
740 std::pair
<crimson::osd::ObjectContextRef
, bool>>(
741 std::make_pair(obc
, false)
// Obtains an ObjectContext with the requested lock type held. The method's
// name line was dropped by the extraction (presumably get_locked_obc; TODO
// confirm). Flow: load/fetch the head obc; for a head target, take the
// requested lock directly (degrade_excl_to covers the freshly-loaded,
// exclusively-held case); for a clone target, take RWREAD on the head, load
// the clone obc, lock it with the requested type (again with the
// degrade path for fresh loads), and finally drop the head's RWREAD before
// returning the clone obc. Branch/brace lines were dropped by the
// extraction.
747 PG::load_obc_ertr::future
<crimson::osd::ObjectContextRef
>
749 Operation
*op
, const hobject_t
&oid
, RWState::State type
)
751 return get_or_load_head_obc(oid
.get_head()).safe_then(
752 [this, op
, oid
, type
](auto p
) -> load_obc_ertr::future
<ObjectContextRef
>{
753 auto &[head_obc
, head_existed
] = p
;
756 return head_obc
->get_lock_type(op
, type
).then([head_obc
=head_obc
] {
757 ceph_assert(head_obc
->loaded
);
758 return load_obc_ertr::make_ready_future
<ObjectContextRef
>(head_obc
);
761 head_obc
->degrade_excl_to(type
);
762 return load_obc_ertr::make_ready_future
<ObjectContextRef
>(head_obc
);
765 return head_obc
->get_lock_type(op
, RWState::RWREAD
).then(
766 [this, head_obc
=head_obc
, oid
] {
767 ceph_assert(head_obc
->loaded
);
768 return get_or_load_clone_obc(oid
, head_obc
);
769 }).safe_then([head_obc
=head_obc
, op
, oid
, type
](auto p
) {
770 auto &[obc
, existed
] = p
;
772 return load_obc_ertr::future
<>(
773 obc
->get_lock_type(op
, type
)).safe_then([obc
=obc
] {
774 ceph_assert(obc
->loaded
);
775 return load_obc_ertr::make_ready_future
<ObjectContextRef
>(obc
);
778 obc
->degrade_excl_to(type
);
779 return load_obc_ertr::make_ready_future
<ObjectContextRef
>(obc
);
781 }).safe_then([head_obc
=head_obc
](auto obc
) {
782 head_obc
->put_lock_type(RWState::RWREAD
);
783 return load_obc_ertr::make_ready_future
<ObjectContextRef
>(obc
);
// Replica-side handling of a primary's MOSDRepOp: decode the embedded
// transaction from the message payload, apply it via the store, then record
// the pre-apply last_complete as last_complete_ondisk and send an ONDISK
// MOSDRepOpReply (carrying that lcod) back to the originating OSD.
// NOTE(review): lcod is captured *before* the transaction is applied —
// presumably intentional (ondisk watermark semantics); confirm upstream.
789 seastar::future
<> PG::handle_rep_op(Ref
<MOSDRepOp
> req
)
791 ceph::os::Transaction txn
;
792 auto encoded_txn
= req
->get_data().cbegin();
793 decode(txn
, encoded_txn
);
794 return shard_services
.get_store().do_transaction(coll_ref
, std::move(txn
))
795 .then([req
, lcod
=peering_state
.get_info().last_complete
, this] {
796 peering_state
.update_last_complete_ondisk(lcod
);
797 const auto map_epoch
= get_osdmap_epoch();
798 auto reply
= make_message
<MOSDRepOpReply
>(
799 req
.get(), pg_whoami
, 0,
800 map_epoch
, req
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
801 reply
->set_last_complete_ondisk(lcod
);
802 return shard_services
.send_to_osd(req
->from
.osd
, reply
, map_epoch
);
806 void PG::handle_rep_op_reply(crimson::net::Connection
* conn
,
807 const MOSDRepOpReply
& m
)
809 backend
->got_rep_op_reply(m
);