// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "pg.h"

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"

#include "osd/OSDMap.h"

#include "os/Transaction.h"

#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/peering_event.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
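
// Formats a ceph::signedspan with nanosecond precision: a span of
// 1s + 500000000ns prints as "1.500000000s", while a whole number of
// seconds prints without the fractional part (e.g. "2s").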
namespace std::chrono {
std::ostream& operator<<(std::ostream& out, const signedspan& d)
{
  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
  auto ns = std::abs((d % 1s).count());
  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
  return out;
}
}

namespace crimson::osd {

using crimson::common::local_conf;
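
// Backend predicates handed to PeeringState: they tell the generic peering
// code whether a PG can still be recovered from the shards that have its
// data, and whether it is readable from this OSD's point of view.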
class RecoverablePredicate : public IsPGRecoverablePredicate {
public:
  bool operator()(const set<pg_shard_t> &have) const override {
    return !have.empty();
  }
};

class ReadablePredicate: public IsPGReadablePredicate {
  pg_shard_t whoami;
public:
  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
  bool operator()(const set<pg_shard_t> &have) const override {
    return have.count(whoami);
  }
};

PG::PG(
  spg_t pgid,
  pg_shard_t pg_shard,
  crimson::os::CollectionRef coll_ref,
  pg_pool_t&& pool,
  std::string&& name,
  cached_map_t osdmap,
  ShardServices &shard_services,
  ec_profile_t profile)
  : pgid{pgid},
    pg_whoami{pg_shard},
    coll_ref{coll_ref},
    pgmeta_oid{pgid.make_pgmeta_oid()},
    osdmap_gate("PG::osdmap_gate", std::nullopt),
    shard_services{shard_services},
    osdmap{osdmap},
    backend(
      PGBackend::create(
        pgid.pgid,
        pg_shard,
        pool,
        coll_ref,
        shard_services,
        profile)),
    peering_state(
      shard_services.get_cct(),
      pg_shard,
      pgid,
      PGPool(
        shard_services.get_cct(),
        osdmap,
        pgid.pool(),
        pool,
        name),
      osdmap,
      this,
      this),
    wait_for_active_blocker(this)
{
  peering_state.set_backend_predicates(
    new ReadablePredicate(pg_whoami),
    new RecoverablePredicate());
  osdmap_gate.got_map(osdmap->get_epoch());
}

PG::~PG() {}
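
// Flush in-flight store activity by queueing an empty transaction behind it.
// Returning false tells PeeringState the flush has not happened yet; the
// continuation delivers an IntervalFlush event once the store catches up.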
bool PG::try_flush_or_schedule_async() {
  (void)shard_services.get_store().do_transaction(
    coll_ref,
    ObjectStore::Transaction()).then(
      [this, epoch=get_osdmap_epoch()]() {
        return shard_services.start_operation<LocalPeeringEvent>(
          this,
          shard_services,
          pg_whoami,
          pgid,
          epoch,
          epoch,
          PeeringState::IntervalFlush());
      });
  return false;
}
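
// (Re)arm the check_readable timer; when it fires, a CheckReadable peering
// event is queued in the background for the interval identified by
// last_peering_reset.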
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  check_readable_timer.cancel();
  check_readable_timer.set_callback([last_peering_reset, this] {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      PeeringState::CheckReadable{});
    });
  check_readable_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}
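
// Re-evaluate PG_STATE_WAIT and PG_STATE_LAGGY against the monotonic clock:
// WAIT clears once the prior interval's readable_until upper bound has
// passed; LAGGY clears once a renewed lease makes us readable again.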
void PG::recheck_readable()
{
  bool changed = false;
  const auto mnow = shard_services.get_mnow();
  if (peering_state.state_test(PG_STATE_WAIT)) {
    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
    if (mnow < prior_readable_until_ub) {
      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
    } else {
      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
      peering_state.state_clear(PG_STATE_WAIT);
      peering_state.clear_prior_readable_until_ub();
      changed = true;
    }
  }
  if (peering_state.state_test(PG_STATE_LAGGY)) {
    auto readable_until = peering_state.get_readable_until();
    if (readable_until == readable_until.zero()) {
      logger().info("{} still laggy (mnow {}, readable_until zero)",
                    __func__, mnow);
    } else if (mnow >= readable_until) {
      logger().info("{} still laggy (mnow {} >= readable_until {})",
                    __func__, mnow, readable_until);
    } else {
      logger().info("{} no longer laggy (mnow {} < readable_until {})",
                    __func__, mnow, readable_until);
      peering_state.state_clear(PG_STATE_LAGGY);
      changed = true;
    }
  }
  if (changed) {
    publish_stats_to_osd();
    if (!peering_state.state_test(PG_STATE_WAIT) &&
        !peering_state.state_test(PG_STATE_LAGGY)) {
      // TODO: requeue ops waiting for readable
    }
  }
}
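
// Worked example (hypothetical values): with a per-OSD budget of
// osd_target_pg_log_entries_per_osd = 300000 and 100 PGs on this OSD, each
// PG is budgeted 3000 log entries, which is then clamped into the
// [osd_min_pg_log_entries, osd_max_pg_log_entries] range.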
unsigned PG::get_target_pg_log_entries() const
{
  const unsigned num_pgs = shard_services.get_pg_num();
  const unsigned target =
    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
  const unsigned min_pg_log_entries =
    local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
  if (num_pgs > 0 && target > 0) {
    // target an even spread of our budgeted log entries across all
    // PGs.  note that while we only get to control the entry count
    // for primary PGs, we'll normally be responsible for a mix of
    // primary and replica PGs (for the same pool(s) even), so this
    // will work out.
    const unsigned max_pg_log_entries =
      local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
    return std::clamp(target / num_pgs,
                      min_pg_log_entries,
                      max_pg_log_entries);
  } else {
    // fall back to a per-pg value.
    return min_pg_log_entries;
  }
}

void PG::on_activate(interval_set<snapid_t>)
{
  projected_last_update = peering_state.get_info().last_update;
}
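
// Once activation completes, unblock any client ops waiting for the PG to
// become active, then queue the appropriate next peering event: start
// recovery, request backfill, or declare all replicas recovered.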
void PG::on_activate_complete()
{
  wait_for_active_blocker.on_active();

  if (peering_state.needs_recovery()) {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::DoRecovery{});
  } else if (peering_state.needs_backfill()) {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::RequestBackfill{});
  } else {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::AllReplicasRecovered{});
  }
}
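
// Serialize dirty PG metadata: info/epoch keys (and past intervals) are
// staged into a key map, the PG log and missing set are appended, and both
// are written to the pgmeta object's omap within the same transaction.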
void PG::prepare_write(pg_info_t &info,
                       pg_info_t &last_written_info,
                       PastIntervals &past_intervals,
                       PGLog &pglog,
                       bool dirty_info,
                       bool dirty_big_info,
                       bool need_write_epoch,
                       ceph::os::Transaction &t)
{
  std::map<string,bufferlist> km;
  std::string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      shard_services.get_cct(),
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      true,
      nullptr,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll_ref->get_cid(), pgmeta_oid,
    peering_state.get_pool().info.require_rollback());
  if (!km.empty()) {
    t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
  }
  if (!key_to_remove.empty()) {
    t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
  }
}

void PG::do_delete_work(ceph::os::Transaction &t)
{
  shard_services.dec_pg_num();
}

void PG::log_state_enter(const char *state) {
  logger().info("Entering state: {}", state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  logger().info(
    "Exiting state: {}, entered at {}, {} spent on {} events",
    state_name, enter_time, event_dur, events);
}

ceph::signedspan PG::get_mnow()
{
  return shard_services.get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return shard_services.get_hb_stamps(peer);
}

void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  renew_lease_timer.cancel();
  renew_lease_timer.set_callback([last_peering_reset, this] {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      RenewLease{});
    });
  renew_lease_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}

void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  bool backfill,
  ObjectStore::Transaction &t)
{
  peering_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, backfill, t);
}
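
// Rehydrate this PG from disk: load pg_info and past intervals from the
// pgmeta object, replay the on-disk PG log, recompute up/acting and our role
// from the current OSDMap, and finally queue an Initialize peering event.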
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
  return PGMeta{store, pgid}.load(
  ).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
    return peering_state.init_from_disk_state(
      std::move(pg_info),
      std::move(past_intervals),
      [this, store] (PGLog &pglog) {
        return pglog.read_log_and_missing_crimson(
          *store,
          coll_ref,
          peering_state.get_info(),
          pgmeta_oid);
      });
  }).then([this]() {
    int primary, up_primary;
    vector<int> acting, up;
    peering_state.get_osdmap()->pg_to_up_acting_osds(
      pgid.pgid, &up, &up_primary, &acting, &primary);
    peering_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    int rr = OSDMap::calc_pg_role(pg_whoami, acting);
    peering_state.set_role(rr);

    epoch_t epoch = get_osdmap_epoch();
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      epoch,
      epoch,
      PeeringState::Initialize());

    return seastar::now();
  });
}
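
// Peering events arrive in two flavors: raw statechart events (handled
// directly) and PGPeeringEvents, which carry the epoch they were requested
// in and are dropped if the PG has reset since then.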
void PG::do_peering_event(
  const boost::statechart::event_base &evt,
  PeeringCtx &rctx)
{
  peering_state.handle_event(
    evt,
    &rctx);
  peering_state.write_if_dirty(rctx.transaction);
}

void PG::do_peering_event(
  PGPeeringEvent& evt, PeeringCtx &rctx)
{
  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
    logger().debug("{} handling {}", __func__, evt.get_desc());
    do_peering_event(evt.get_event(), rctx);
  } else {
    logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
  }
}
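
// OSDMap plumbing: handle_advance_map walks the PG forward one map epoch at
// a time, recomputing up/acting as it goes, and handle_activate_map publishes
// the final map to PeeringState once the walk is done.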
void PG::handle_advance_map(
  cached_map_t next_map, PeeringCtx &rctx)
{
  vector<int> newup, newacting;
  int up_primary, acting_primary;
  next_map->pg_to_up_acting_osds(
    pgid.pgid,
    &newup, &up_primary,
    &newacting, &acting_primary);
  peering_state.advance_map(
    next_map,
    peering_state.get_osdmap(),
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
  osdmap_gate.got_map(next_map->get_epoch());
}

void PG::handle_activate_map(PeeringCtx &rctx)
{
  peering_state.activate_map(rctx);
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  PeeringState::Initialize evt;
  peering_state.handle_event(evt, &rctx);
}

void PG::print(ostream& out) const
{
  out << peering_state << " ";
}

std::ostream& operator<<(std::ostream& os, const PG& pg)
{
  os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
  pg.print(os);
  return os;
}
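
// WaitForActiveBlocker parks client operations until the PG reaches the
// active state; on_active() fulfills the shared promise that wait() hands
// out to blocked operations.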
void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
{
  f->dump_stream("pgid") << pg->pgid;
}

void PG::WaitForActiveBlocker::on_active()
{
  p.set_value();
  p = {};
}

blocking_future<> PG::WaitForActiveBlocker::wait()
{
  if (pg->peering_state.is_active()) {
    return make_blocking_future(seastar::now());
  } else {
    return make_blocking_future(p.get_shared_future());
  }
}
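
// Submit a mutation to the backend: the new object version is stamped with
// the current map epoch and the next projected version, and as replicas ack,
// their last_complete_ondisk is recorded for peering bookkeeping.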
seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
                                         const MOSDOp& req)
{
  epoch_t map_epoch = get_osdmap_epoch();
  eversion_t at_version{map_epoch, projected_last_update.version + 1};
  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                std::move(obc),
                                std::move(txn),
                                req,
                                peering_state.get_last_peering_reset(),
                                map_epoch,
                                at_version).then([this](auto acked) {
    for (const auto& peer : acked) {
      peering_state.update_peer_last_complete_ondisk(
        peer.shard, peer.last_complete_ondisk);
    }
    return seastar::now();
  });
}
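
// The main client op path: execute each OSD op against the object context,
// submit whatever transaction the ops produced, and reply; errorated
// failures and stray exceptions are both translated into MOSDOpReply errors.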
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
  Ref<MOSDOp> m,
  ObjectContextRef obc)
{
  using osd_op_errorator = OpsExecuter::osd_op_errorator;
  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                   : m->get_hobj();
  auto ox =
    std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
  return crimson::do_for_each(
    m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
    logger().debug(
      "do_osd_ops: {} - object {} - handling op {}",
      *m,
      obc->obs.oi.soid,
      ceph_osd_op_name(osd_op.op.op));
    return ox->execute_osd_op(osd_op);
  }).safe_then([this, obc, m, ox = ox.get()] {
    logger().debug(
      "do_osd_ops: {} - object {} all operations successful",
      *m,
      obc->obs.oi.soid);
    return std::move(*ox).submit_changes(
      [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
        // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
        if (txn.empty()) {
          logger().debug(
            "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
            *m,
            obc->obs.oi.soid);
          return osd_op_errorator::now();
        } else {
          logger().debug(
            "do_osd_ops: {} - object {} submitting txn",
            *m,
            obc->obs.oi.soid);
          return submit_transaction(std::move(obc), std::move(txn), *m);
        }
      });
  }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                           0, false);
    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
    logger().debug(
      "do_osd_ops: {} - object {} sending reply",
      *m,
      obc->obs.oi.soid);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code& e) {
    assert(e.value() > 0);
    logger().debug(
      "do_osd_ops: {} - object {} got error code {}, {}",
      *m,
      obc->obs.oi.soid,
      e.value(),
      e.message());
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
    // we need this handler because some throwing paths aren't errorated yet
    logger().debug(
      "do_osd_ops: {} - object {} got unhandled exception {} ({})",
      *m,
      obc->obs.oi.soid,
      e.code(),
      e.what());
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  });
}
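
// PG-wide (rather than object-scoped) operations such as PGLS: executed
// through OpsExecuter without an object context, with the same
// error-to-reply translation as the object path.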
seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
  auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
  return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
    logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
    return ox->execute_pg_op(osd_op);
  }).then([m, this, ox = std::move(ox)] {
    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                           false);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }).handle_exception_type([=](const crimson::osd::error& e) {
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  });
}
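
// Derive the target object and the ordering lock an op needs: write-ordered
// ops that also read take the exclusive lock, write-only ops the write lock,
// and everything else (asserted to be a read) the read lock.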
std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
  const MOSDOp &m,
  const OpInfo &op_info)
{
  auto oid = m.get_snapid() == CEPH_SNAPDIR ?
    m.get_hobj().get_head() : m.get_hobj();

  RWState::State lock_type = RWState::RWNONE;
  if (op_info.rwordered() && op_info.may_read()) {
    lock_type = RWState::RWState::RWEXCL;
  } else if (op_info.rwordered()) {
    lock_type = RWState::RWState::RWWRITE;
  } else {
    ceph_assert(op_info.may_read());
    lock_type = RWState::RWState::RWREAD;
  }
  return std::make_pair(oid, lock_type);
}
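
// Map a snapshot read onto the clone that actually holds the data: a snap
// newer than the SnapSet's seq resolves to head; otherwise the first clone
// past the requested snap is checked against its clone_snaps entry.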
std::optional<hobject_t> PG::resolve_oid(
  const SnapSet &ss,
  const hobject_t &oid)
{
  if (oid.snap > ss.seq) {
    return oid.get_head();
  } else {
    // which clone would it be?
    auto clone = std::upper_bound(
      begin(ss.clones), end(ss.clones),
      oid.snap);
    if (clone == end(ss.clones)) {
      // Doesn't exist, > last clone, < ss.seq
      return std::nullopt;
    }
    auto citer = ss.clone_snaps.find(*clone);
    // TODO: how do we want to handle this kind of logic error?
    ceph_assert(citer != ss.clone_snaps.end());

    if (std::find(
          citer->second.begin(),
          citer->second.end(),
          *clone) == citer->second.end()) {
      return std::nullopt;
    } else {
      auto soid = oid;
      soid.snap = *clone;
      return std::optional<hobject_t>(soid);
    }
  }
}
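
// Object context loading: the returned bool tells the caller whether the obc
// was already cached. A freshly loaded obc is populated while holding its
// exclusive lock, which the locking caller below downgrades to the wanted mode.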
PG::load_obc_ertr::future<
  std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
{
  ceph_assert(!oid.is_head());
  using ObjectContextRef = crimson::osd::ObjectContextRef;
  auto coid = resolve_oid(head->get_ro_ss(), oid);
  if (!coid) {
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(ObjectContextRef(), true)
      );
  }
  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
  if (existed) {
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(obc, true)
      );
  } else {
    bool got = obc->maybe_get_excl();
    ceph_assert(got);
    return backend->load_metadata(*coid).safe_then(
      [oid, obc=std::move(obc), head](auto &&md) mutable {
        obc->set_clone_state(std::move(md->os), std::move(head));
        return load_obc_ertr::make_ready_future<
          std::pair<crimson::osd::ObjectContextRef, bool>>(
            std::make_pair(obc, false)
          );
      });
  }
}

PG::load_obc_ertr::future<
  std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_head_obc(hobject_t oid)
{
  ceph_assert(oid.is_head());
  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
  if (existed) {
    logger().debug(
      "{}: found {} in cache",
      __func__,
      oid);
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(std::move(obc), true)
      );
  } else {
    logger().debug(
      "{}: cache miss on {}",
      __func__,
      oid);
    bool got = obc->maybe_get_excl();
    ceph_assert(got);
    return backend->load_metadata(oid).safe_then(
      [oid, obc=std::move(obc)](auto md) ->
        load_obc_ertr::future<
          std::pair<crimson::osd::ObjectContextRef, bool>>
      {
        logger().debug(
          "{}: loaded obs {} for {}",
          __func__,
          md->os.oi,
          oid);
        if (!md->ss) {
          logger().error(
            "{}: oid {} missing snapset",
            __func__,
            oid);
          return crimson::ct_error::object_corrupted::make();
        }
        obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
        logger().debug(
          "{}: returning obc {} for {}",
          __func__,
          obc->obs.oi,
          obc->obs.oi.soid);
        return load_obc_ertr::make_ready_future<
          std::pair<crimson::osd::ObjectContextRef, bool>>(
            std::make_pair(obc, false)
          );
      });
  }
}

PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::get_locked_obc(
  Operation *op, const hobject_t &oid, RWState::State type)
{
  return get_or_load_head_obc(oid.get_head()).safe_then(
    [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef> {
      auto &[head_obc, head_existed] = p;
      if (oid.is_head()) {
        if (head_existed) {
          return head_obc->get_lock_type(op, type).then([head_obc=head_obc] {
            ceph_assert(head_obc->loaded);
            return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
          });
        } else {
          head_obc->degrade_excl_to(type);
          return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
        }
      } else {
        return head_obc->get_lock_type(op, RWState::RWREAD).then(
          [this, head_obc=head_obc, oid] {
            ceph_assert(head_obc->loaded);
            return get_or_load_clone_obc(oid, head_obc);
          }).safe_then([head_obc=head_obc, op, oid, type](auto p) {
            auto &[obc, existed] = p;
            if (existed) {
              return load_obc_ertr::future<>(
                obc->get_lock_type(op, type)).safe_then([obc=obc] {
                ceph_assert(obc->loaded);
                return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
              });
            } else {
              obc->degrade_excl_to(type);
              return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
            }
          }).safe_then([head_obc=head_obc](auto obc) {
            head_obc->put_lock_type(RWState::RWREAD);
            return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
          });
      }
    });
}
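
// Replica side of a write: decode and apply the primary's transaction, then
// ack with our last_complete_ondisk so the primary can advance peering state.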
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
  ceph::os::Transaction txn;
  auto encoded_txn = req->get_data().cbegin();
  decode(txn, encoded_txn);
  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
    .then([req, lcod=peering_state.get_info().last_complete, this] {
      peering_state.update_last_complete_ondisk(lcod);
      const auto map_epoch = get_osdmap_epoch();
      auto reply = make_message<MOSDRepOpReply>(
        req.get(), pg_whoami, 0,
        map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
      reply->set_last_complete_ondisk(lcod);
      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
    });
}

void PG::handle_rep_op_reply(crimson::net::Connection* conn,
                             const MOSDRepOpReply& m)
{
  backend->got_rep_op_reply(m);
}

}