// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"

#include "osd/OSDMap.h"

#include "os/Transaction.h"

#include "crimson/common/exception.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"
namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
namespace std::chrono {
std::ostream& operator<<(std::ostream& out, const signedspan& d)
{
  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
  auto ns = std::abs((d % 1s).count());
  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
  return out;
}
}
namespace crimson::osd {

using crimson::common::local_conf;
class RecoverablePredicate : public IsPGRecoverablePredicate {
public:
  bool operator()(const set<pg_shard_t> &have) const override {
    return !have.empty();
  }
};
class ReadablePredicate : public IsPGReadablePredicate {
  pg_shard_t whoami;
public:
  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
  bool operator()(const set<pg_shard_t> &have) const override {
    return have.count(whoami);
  }
};
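// Note: these two predicates are handed to PeeringState in the PG
// constructor below (set_backend_predicates). Roughly: a replicated PG is
// deemed recoverable as long as at least one shard still holds the data,
// and readable on this OSD only when our own shard is among the ones
// available.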
PG::PG(
  /* ... */
  crimson::os::CollectionRef coll_ref,
  /* ... */
  ShardServices &shard_services,
  /* ... */)
  : /* ... */
    pgmeta_oid{pgid.make_pgmeta_oid()},
    osdmap_gate("PG::osdmap_gate", std::nullopt),
    shard_services{shard_services},
    /* ... */
    recovery_backend(
      std::make_unique<ReplicatedRecoveryBackend>(
        *this, shard_services, coll_ref, backend.get())),
    recovery_handler(
      std::make_unique<PGRecovery>(this)),
    peering_state(
      shard_services.get_cct(),
      /* ... */),
    wait_for_active_blocker(this)
{
  peering_state.set_backend_predicates(
    new ReadablePredicate(pg_whoami),
    new RecoverablePredicate());
  osdmap_gate.got_map(osdmap->get_epoch());
}
bool PG::try_flush_or_schedule_async() {
  (void)shard_services.get_store().do_transaction(
    coll_ref,
    ObjectStore::Transaction()).then(
      [this, epoch=get_osdmap_epoch()]() {
        return shard_services.start_operation<LocalPeeringEvent>(
          /* ... */
          PeeringState::IntervalFlush());
      });
  return false;
}
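// Note on the pattern above: the empty transaction serves as a commit
// barrier -- by the time it completes, previously submitted transactions
// have been flushed -- and the (void) cast deliberately discards the
// future, so the IntervalFlush peering event is delivered in the
// background once the flush is done.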
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  check_readable_timer.cancel();
  check_readable_timer.set_callback([last_peering_reset, this] {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */
      PeeringState::CheckReadable{});
  });
  check_readable_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}
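// The timer callback above re-delivers PeeringState::CheckReadable as a
// background LocalPeeringEvent; since cancel() runs first, re-queueing
// always supersedes a previously armed check. Handling that event ends up
// in recheck_readable() below, which does the actual WAIT/LAGGY evaluation.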
void PG::recheck_readable()
{
  bool changed = false;
  const auto mnow = shard_services.get_mnow();
  if (peering_state.state_test(PG_STATE_WAIT)) {
    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
    if (mnow < prior_readable_until_ub) {
      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
    } else {
      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
      peering_state.state_clear(PG_STATE_WAIT);
      peering_state.clear_prior_readable_until_ub();
      changed = true;
    }
  }
  if (peering_state.state_test(PG_STATE_LAGGY)) {
    auto readable_until = peering_state.get_readable_until();
    if (readable_until == readable_until.zero()) {
      logger().info("{} still laggy (mnow {}, readable_until zero)",
                    __func__, mnow);
    } else if (mnow >= readable_until) {
      logger().info("{} still laggy (mnow {} >= readable_until {})",
                    __func__, mnow, readable_until);
    } else {
      logger().info("{} no longer laggy (mnow {} < readable_until {})",
                    __func__, mnow, readable_until);
      peering_state.state_clear(PG_STATE_LAGGY);
      changed = true;
    }
  }
  if (changed) {
    publish_stats_to_osd();
    if (!peering_state.state_test(PG_STATE_WAIT) &&
        !peering_state.state_test(PG_STATE_LAGGY)) {
      // TODO: requeue ops waiting for readable
    }
  }
}
unsigned PG::get_target_pg_log_entries() const
{
  const unsigned num_pgs = shard_services.get_pg_num();
  const unsigned target =
    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
  const unsigned min_pg_log_entries =
    local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
  if (num_pgs > 0 && target > 0) {
    // target an even spread of our budgeted log entries across all
    // PGs. note that while we only get to control the entry count
    // for primary PGs, we'll normally be responsible for a mix of
    // primary and replica PGs (for the same pool(s) even), so this
    // will work out.
    const unsigned max_pg_log_entries =
      local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
    return std::clamp(target / num_pgs,
                      min_pg_log_entries,
                      max_pg_log_entries);
  } else {
    // fall back to a per-pg value.
    return min_pg_log_entries;
  }
}
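// Worked example (illustrative numbers, not defaults from this tree): with
// osd_target_pg_log_entries_per_osd = 300000 and 100 PGs hosted on this
// OSD, each PG is budgeted 300000 / 100 = 3000 log entries, which is then
// clamped into [osd_min_pg_log_entries, osd_max_pg_log_entries].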
void PG::on_activate(interval_set<snapid_t>)
{
  projected_last_update = peering_state.get_info().last_update;
}
void PG::on_activate_complete()
{
  wait_for_active_blocker.on_active();

  if (peering_state.needs_recovery()) {
    logger().info("{}: requesting recovery",
                  __func__);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */
      PeeringState::DoRecovery{});
  } else if (peering_state.needs_backfill()) {
    logger().info("{}: requesting backfill",
                  __func__);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */
      PeeringState::RequestBackfill{});
  } else {
    logger().debug("{}: no need to recover or backfill, AllReplicasRecovered"
                   " for pg: {}", __func__, pgid);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */
      PeeringState::AllReplicasRecovered{});
  }
  backend->on_activate_complete();
}
void PG::prepare_write(pg_info_t &info,
                       pg_info_t &last_written_info,
                       PastIntervals &past_intervals,
                       PGLog &pglog,
                       bool dirty_info,
                       bool dirty_big_info,
                       bool need_write_epoch,
                       ceph::os::Transaction &t)
{
  std::map<string,bufferlist> km;
  std::string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      shard_services.get_cct(),
      /* ... */);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll_ref->get_cid(), pgmeta_oid,
    peering_state.get_pool().info.require_rollback());
  if (!km.empty()) {
    t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
  }
  if (!key_to_remove.empty()) {
    t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
  }
}
std::pair<ghobject_t, bool>
PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
{
  shard_services.dec_pg_num();
  return {_next, false};
}
void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
  // TODO: should update the stats upon finishing the scrub
  peering_state.update_stats([scrub_level, this](auto& history, auto& stats) {
    const utime_t now = ceph_clock_now();
    history.last_scrub = peering_state.get_info().last_update;
    history.last_scrub_stamp = now;
    history.last_clean_scrub_stamp = now;
    if (scrub_level == scrub_level_t::deep) {
      history.last_deep_scrub = history.last_scrub;
      history.last_deep_scrub_stamp = now;
    }
    // yes, please publish the stats
    return true;
  });
}
void PG::log_state_enter(const char *state) {
  logger().info("Entering state: {}", state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  logger().info(
    "Exiting state: {}, entered at {}, {} spent on {} events",
    state_name, enter_time, event_dur, events);
}
ceph::signedspan PG::get_mnow()
{
  return shard_services.get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return shard_services.get_hb_stamps(peer);
}
void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  renew_lease_timer.cancel();
  renew_lease_timer.set_callback([last_peering_reset, this] {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */);
  });
  renew_lease_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}
void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  bool backfill,
  ObjectStore::Transaction &t)
{
  peering_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, backfill, t);
}
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
    return pg_meta.load();
  }).then([this, store](auto&& ret) {
    auto [pg_info, past_intervals] = std::move(ret);
    return peering_state.init_from_disk_state(
      std::move(pg_info),
      std::move(past_intervals),
      [this, store] (PGLog &pglog) {
        return pglog.read_log_and_missing_crimson(
          /* ... */
          peering_state.get_info(),
          /* ... */);
      });
  }).then([this] {
    int primary, up_primary;
    vector<int> acting, up;
    peering_state.get_osdmap()->pg_to_up_acting_osds(
      pgid.pgid, &up, &up_primary, &acting, &primary);
    peering_state.init_primary_up_acting(
      up, acting, up_primary, primary);
    int rr = OSDMap::calc_pg_role(pg_whoami, acting);
    peering_state.set_role(rr);

    epoch_t epoch = get_osdmap_epoch();
    (void) shard_services.start_operation<LocalPeeringEvent>(
      /* ... */
      PeeringState::Initialize());

    return seastar::now();
  });
}
void PG::do_peering_event(
  const boost::statechart::event_base &evt,
  PeeringCtx &rctx)
{
  peering_state.handle_event(
    evt,
    &rctx);
  peering_state.write_if_dirty(rctx.transaction);
}
void PG::do_peering_event(
  PGPeeringEvent& evt, PeeringCtx &rctx)
{
  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
    do_peering_event(evt.get_event(), rctx);
  } else {
    logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
  }
}
void PG::handle_advance_map(
  cached_map_t next_map, PeeringCtx &rctx)
{
  vector<int> newup, newacting;
  int up_primary, acting_primary;
  next_map->pg_to_up_acting_osds(
    pgid.pgid,
    &newup, &up_primary,
    &newacting, &acting_primary);
  peering_state.advance_map(
    next_map,
    peering_state.get_osdmap(),
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
  osdmap_gate.got_map(next_map->get_epoch());
}
void PG::handle_activate_map(PeeringCtx &rctx)
{
  peering_state.activate_map(rctx);
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  PeeringState::Initialize evt;
  peering_state.handle_event(evt, &rctx);
}
void PG::print(ostream& out) const
{
  out << peering_state << " ";
}

void PG::dump_primary(Formatter* f)
{
  peering_state.dump_peering_state(f);

  f->open_array_section("recovery_state");
  PeeringState::QueryState q(f);
  peering_state.handle_event(q, 0);
  f->close_section();

  // TODO: scrubber state
}
std::ostream& operator<<(std::ostream& os, const PG& pg)
{
  os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
  pg.print(os);
  return os;
}
void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
{
  f->dump_stream("pgid") << pg->pgid;
}

void PG::WaitForActiveBlocker::on_active()
{
  p.set_value();
  p = {};
}

blocking_future<> PG::WaitForActiveBlocker::wait()
{
  if (pg->peering_state.is_active()) {
    return make_blocking_future(seastar::now());
  } else {
    return make_blocking_future(p.get_shared_future());
  }
}

seastar::future<> PG::WaitForActiveBlocker::stop()
{
  p.set_exception(crimson::common::system_shutdown_exception());
  return seastar::now();
}
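// WaitForActiveBlocker in short: wait() completes immediately once the PG
// is active; otherwise callers share a single promise that on_active()
// fulfills, and stop() fails that promise so pending waiters unwind
// cleanly on shutdown.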
seastar::future<> PG::submit_transaction(const OpInfo& op_info,
                                         const std::vector<OSDOp>& ops,
                                         ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
                                         const osd_op_params_t& osd_op_p)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  epoch_t map_epoch = get_osdmap_epoch();

  if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
    throw crimson::common::actingset_changed(is_primary());
  }

  std::vector<pg_log_entry_t> log_entries;
  log_entries.emplace_back(obc->obs.exists ?
                             pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
                           obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
                           osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
                           osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
                           op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
  // TODO: refactor the submit_transaction
  if (op_info.allows_returnvec()) {
    // also the per-op values are recorded in the pg log
    log_entries.back().set_op_returns(ops);
    logger().debug("{} op_returns: {}",
                   __func__, log_entries.back().op_returns);
  }
  log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
  peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                /* ... */);
  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                /* ... */
                                peering_state.get_last_peering_reset(),
                                /* ... */
                                std::move(log_entries)).then(
    [this, last_complete=peering_state.get_info().last_complete,
     at_version=osd_op_p.at_version](auto acked) {
      for (const auto& peer : acked) {
        peering_state.update_peer_last_complete_ondisk(
          peer.shard, peer.last_complete_ondisk);
      }
      peering_state.complete_write(at_version, last_complete);
      return seastar::now();
    });
}
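// submit_transaction above is the primary-side write path in a nutshell:
// build the pg_log entry describing the mutation, let PeeringState fold it
// into the log (with trimming), then hand log and transaction to the
// backend for replication. Each ack carries the peer's
// last_complete_ondisk, which we record before completing the write.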
osd_op_params_t&& PG::fill_op_params_bump_pg_version(
  osd_op_params_t&& osd_op_p,
  Ref<MOSDOp> m,
  const bool user_modify)
{
  osd_op_p.req = std::move(m);
  osd_op_p.at_version = next_version();
  osd_op_p.pg_trim_to = get_pg_trim_to();
  osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
  osd_op_p.last_complete = get_info().last_complete;
  if (user_modify) {
    osd_op_p.user_at_version = osd_op_p.at_version.version;
  }
  return std::move(osd_op_p);
}
seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
  const std::error_code& e,
  ObjectContextRef obc,
  const OpsExecuter& ox,
  const MOSDOp& m) const
{
  // Oops, an operation had failed. do_osd_ops() together with
  // OpsExecuter already dropped the ObjectStore::Transaction if
  // there was any. However, this is not enough to completely
  // roll back, as we gave OpsExecuter the very single copy of `obc`
  // we maintain, and we did so for both reading and writing.
  // Now all modifications must be reverted.
  //
  // Let's just reload from the store. Evicting from the shared
  // LRU would be tricky, as the next MOSDOp (the one at the `get_obc`
  // phase) could actually have already finished the lookup. Fortunately,
  // this is supposed to live on cold paths, so performance is not
  // a concern -- simplicity wins.
  //
  // The conditional's purpose is to efficiently handle hot errors
  // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
  // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
  // typically append them before any write. If OpsExecuter hasn't
  // seen any modifying operation, `obc` is supposed to be kept
  // unchanged.
  assert(e.value() > 0);
  const bool need_reload_obc = ox.has_seen_write();
  logger().debug(
    "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
    /* ... */);
  return (need_reload_obc ? reload_obc(*obc)
                          : load_obc_ertr::now()
  ).safe_then([&e, &m, obc = std::move(obc), this] {
    auto reply = make_message<MOSDOpReply>(
      &m, -e.value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(
      peering_state.get_info().last_update,
      peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
}
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
  Ref<MOSDOp> m,
  ObjectContextRef obc,
  const OpInfo &op_info)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }

  using osd_op_errorator = OpsExecuter::osd_op_errorator;
  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                   : m->get_hobj();
  auto ox = std::make_unique<OpsExecuter>(
    obc, op_info, get_pool().info, get_backend(), *m);
  return crimson::do_for_each(
    m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
    logger().debug(
      "do_osd_ops: {} - object {} - handling op {}",
      /* ... */
      ceph_osd_op_name(osd_op.op.op));
    return ox->execute_op(osd_op);
  }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
    logger().debug(
      "do_osd_ops: {} - object {} all operations successful",
      /* ... */);
    return std::move(*ox).flush_changes(
      [m] (auto&& obc) -> osd_op_errorator::future<> {
        logger().debug(
          "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
          /* ... */);
        return osd_op_errorator::now();
      },
      [this, m, &op_info] (auto&& txn,
                           /* ... */
                           bool user_modify) -> osd_op_errorator::future<> {
        logger().debug(
          "do_osd_ops: {} - object {} submitting txn",
          /* ... */);
        auto filled_osd_op_p = fill_op_params_bump_pg_version(
          /* ... */);
        return submit_transaction(
          /* ... */
          filled_osd_op_p.req->ops,
          /* ... */
          std::move(filled_osd_op_p));
      });
  }).safe_then([this, m, obc,
                rvec = op_info.allows_returnvec()] {
    // TODO: should stop at the first op which returns a negative retval,
    // cmpext uses it for returning the index of first unmatched byte
    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
    if (result > 0 && !rvec) {
      result = 0;
    }
    auto reply = make_message<MOSDOpReply>(m.get(),
                                           /* ... */);
    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
    logger().debug(
      "do_osd_ops: {} - object {} sending reply",
      /* ... */);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, osd_op_errorator::all_same_way([ox = ox.get(),
                                     /* ... */
                                     this] (const std::error_code& e) {
    return handle_failed_op(e, std::move(obc), *ox, *m);
  })).handle_exception_type([ox_deleter = std::move(ox),
                             /* ... */
                             this] (const crimson::osd::error& e) {
    // we need this handler because some throwing paths aren't errorated yet.
    logger().debug("encountered the legacy error handling path!");
    return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
  });
}
seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }

  auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
                                            std::as_const(*m));
  return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
    logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
    return ox->execute_op(osd_op);
  }).then([m, this, ox = std::move(ox)] {
    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                           false);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }).handle_exception_type([=](const crimson::osd::error& e) {
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  });
}
hobject_t PG::get_oid(const MOSDOp &m)
{
  return (m.get_snapid() == CEPH_SNAPDIR ?
          m.get_hobj().get_head() :
          m.get_hobj());
}
RWState::State PG::get_lock_type(const OpInfo &op_info)
{
  if (op_info.rwordered() && op_info.may_read()) {
    return RWState::RWEXCL;
  } else if (op_info.rwordered()) {
    return RWState::RWWRITE;
  } else {
    ceph_assert(op_info.may_read());
    return RWState::RWREAD;
  }
}
std::optional<hobject_t> PG::resolve_oid(
  const SnapSet &ss,
  const hobject_t &oid)
{
  if (oid.snap > ss.seq) {
    return oid.get_head();
  } else {
    // which clone would it be?
    auto clone = std::upper_bound(
      begin(ss.clones), end(ss.clones),
      oid.snap);
    if (clone == end(ss.clones)) {
      // Doesn't exist, > last clone, < ss.seq
      return std::nullopt;
    }
    auto citer = ss.clone_snaps.find(*clone);
    // TODO: how do we want to handle this kind of logic error?
    ceph_assert(citer != ss.clone_snaps.end());

    if (std::find(
          citer->second.begin(),
          citer->second.end(),
          *clone) == citer->second.end()) {
      return std::nullopt;
    }

    auto soid = oid;
    soid.snap = *clone;
    return std::optional<hobject_t>(soid);
  }
}
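// Snapshot resolution in short: reads of snap ids newer than ss.seq are
// served from the head object; otherwise upper_bound picks the oldest
// clone whose id is greater than the requested snap (informal example:
// with ss.clones = [2, 5], a read of snap 4 resolves to clone 5), subject
// to the clone_snaps sanity check above.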
831 template<RWState::State State
>
832 PG::load_obc_ertr::future
<>
833 PG::with_head_obc(hobject_t oid
, with_obc_func_t
&& func
)
835 assert(oid
.is_head());
836 auto [obc
, existed
] = shard_services
.obc_registry
.get_cached_obc(oid
);
837 return obc
->with_lock
<State
>(
838 [oid
=std::move(oid
), existed
=existed
, obc
=std::move(obc
),
839 func
=std::move(func
), this] {
840 auto loaded
= load_obc_ertr::make_ready_future
<ObjectContextRef
>(obc
);
842 logger().debug("with_head_obc: found {} in cache", oid
);
844 logger().debug("with_head_obc: cache miss on {}", oid
);
845 loaded
= obc
->with_promoted_lock
<State
>([this, obc
] {
846 return load_head_obc(obc
);
849 return loaded
.safe_then([func
=std::move(func
)](auto obc
) {
850 return func(std::move(obc
));
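// Locking pattern used here and in with_clone_obc below: obc_registry
// returns a cached ObjectContext plus a flag saying whether it already
// existed. On a cache miss, with_promoted_lock (as the name suggests)
// briefly upgrades the lock so the metadata load can populate the obc,
// after which func runs under the originally requested State.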
855 template<RWState::State State
>
856 PG::load_obc_ertr::future
<>
857 PG::with_clone_obc(hobject_t oid
, with_obc_func_t
&& func
)
859 assert(!oid
.is_head());
860 return with_head_obc
<RWState::RWREAD
>(oid
.get_head(),
861 [oid
, func
=std::move(func
), this](auto head
) -> load_obc_ertr::future
<> {
862 auto coid
= resolve_oid(head
->get_ro_ss(), oid
);
864 // TODO: return crimson::ct_error::enoent::make();
865 logger().error("with_clone_obc: {} clone not found", coid
);
866 return load_obc_ertr::make_ready_future
<>();
868 auto [clone
, existed
] = shard_services
.obc_registry
.get_cached_obc(*coid
);
869 return clone
->template with_lock
<State
>(
870 [coid
=*coid
, existed
=existed
,
871 head
=std::move(head
), clone
=std::move(clone
),
872 func
=std::move(func
), this]() -> load_obc_ertr::future
<> {
873 auto loaded
= load_obc_ertr::make_ready_future
<ObjectContextRef
>(clone
);
875 logger().debug("with_clone_obc: found {} in cache", coid
);
877 logger().debug("with_clone_obc: cache miss on {}", coid
);
878 loaded
= clone
->template with_promoted_lock
<State
>(
879 [coid
, clone
, head
, this] {
880 return backend
->load_metadata(coid
).safe_then(
881 [coid
, clone
=std::move(clone
), head
=std::move(head
)](auto md
) mutable {
882 clone
->set_clone_state(std::move(md
->os
), std::move(head
));
887 return loaded
.safe_then([func
=std::move(func
)](auto clone
) {
888 return func(std::move(clone
));
// explicitly instantiate the used instantiations
template PG::load_obc_ertr::future<>
PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::load_head_obc(ObjectContextRef obc)
{
  hobject_t oid = obc->get_oid();
  return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
    -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
    const hobject_t& oid = md->os.oi.soid;
    logger().debug(
      "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
    if (!md->ss) {
      logger().error(
        "load_head_obc: oid {} missing snapset", oid);
      return crimson::ct_error::object_corrupted::make();
    }
    obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
    logger().debug(
      "load_head_obc: returning obc {} for {}",
      obc->obs.oi, obc->obs.oi.soid);
    return load_obc_ertr::make_ready_future<
      crimson::osd::ObjectContextRef>(obc);
  });
}
PG::load_obc_ertr::future<>
PG::reload_obc(crimson::osd::ObjectContext& obc) const
{
  assert(obc.is_head());
  return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
    -> load_obc_ertr::future<> {
    logger().debug(
      "{}: reloaded obs {} for {}",
      /* ... */);
    if (!md->ss) {
      logger().error(
        "{}: oid {} missing snapset",
        /* ... */);
      return crimson::ct_error::object_corrupted::make();
    }
    obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
    return load_obc_ertr::now();
  });
}
PG::load_obc_ertr::future<>
PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
                    Operation *op, PG::with_obc_func_t &&f)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }
  const hobject_t oid = get_oid(*m);
  switch (get_lock_type(op_info)) {
  case RWState::RWREAD:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWREAD>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
    }
  case RWState::RWWRITE:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
    }
  case RWState::RWEXCL:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
    }
  default:
    ceph_abort();
  }
}
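// Replica-side write path below in short: decode the transaction and log
// entries shipped by the primary, append them to our pg log, commit the
// transaction to the local store, then reply with ONDISK plus our
// last_complete_ondisk.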
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  if (can_discard_replica_op(*req)) {
    return seastar::now();
  }

  ceph::os::Transaction txn;
  auto encoded_txn = req->get_data().cbegin();
  decode(txn, encoded_txn);
  auto p = req->logbl.cbegin();
  std::vector<pg_log_entry_t> log_entries;
  decode(log_entries, p);
  peering_state.append_log(std::move(log_entries), req->pg_trim_to,
      req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
    .then([req, lcod=peering_state.get_info().last_complete, this] {
      peering_state.update_last_complete_ondisk(lcod);
      const auto map_epoch = get_osdmap_epoch();
      auto reply = make_message<MOSDRepOpReply>(
        req.get(), pg_whoami, 0,
        map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
      reply->set_last_complete_ondisk(lcod);
      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
    });
}
void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                             const MOSDRepOpReply& m)
{
  if (!can_discard_replica_op(m)) {
    backend->got_rep_op_reply(m);
  }
}
1015 template <typename MsgType
>
1016 bool PG::can_discard_replica_op(const MsgType
& m
) const
1018 // if a repop is replied after a replica goes down in a new osdmap, and
1019 // before the pg advances to this new osdmap, the repop replies before this
1020 // repop can be discarded by that replica OSD, because the primary resets the
1021 // connection to it when handling the new osdmap marking it down, and also
1022 // resets the messenger sesssion when the replica reconnects. to avoid the
1023 // out-of-order replies, the messages from that replica should be discarded.
1024 const auto osdmap
= peering_state
.get_osdmap();
1025 const int from_osd
= m
.get_source().num();
1026 if (osdmap
->is_down(from_osd
)) {
1029 // Mostly, this overlaps with the old_peering_msg
1030 // condition. An important exception is pushes
1031 // sent by replicas not in the acting set, since
1032 // if such a replica goes down it does not cause
1034 if (osdmap
->get_down_at(from_osd
) >= m
.map_epoch
) {
1038 // if pg changes *at all*, we reset and repeer!
1039 if (epoch_t lpr
= peering_state
.get_last_peering_reset();
1040 lpr
> m
.map_epoch
) {
1041 logger().debug("{}: pg changed {} after {}, dropping",
1042 __func__
, get_info().history
, m
.map_epoch
);
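// Concrete scenario for the checks above (informal): the primary sends a
// repop to a replica in epoch E; that replica is marked down in epoch E+1
// while its reply is still in flight. Since get_down_at() for the sender is
// then >= the message's map_epoch, the stale reply is dropped rather than
// allowed to race with the post-reconnect session.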
seastar::future<> PG::stop()
{
  logger().info("PG {} {}", pgid, __func__);
  stopping = true;
  return osdmap_gate.stop().then([this] {
    return wait_for_active_blocker.stop();
  }).then([this] {
    return recovery_handler->stop();
  }).then([this] {
    return recovery_backend->stop();
  }).then([this] {
    return backend->stop();
  });
}
&t
) {
1064 recovery_backend
->on_peering_interval_change(t
);
1065 backend
->on_actingset_changed({ is_primary() });
bool PG::can_discard_op(const MOSDOp& m) const {
  return __builtin_expect(m.get_map_epoch()
      < peering_state.get_info().history.same_primary_since, false);
}
bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
  /* The conditions below may clear (on_local_recover, before we queue
   * the transaction) before we actually requeue the degraded waiters
   * in on_global_recover after the transaction completes.
   */
  if (peering_state.get_pg_log().get_missing().get_items().count(soid))
    return true;
  ceph_assert(!get_acting_recovery_backfill().empty());
  for (auto& peer : get_acting_recovery_backfill()) {
    if (peer == get_primary()) continue;
    auto peer_missing_entry = peering_state.get_peer_missing().find(peer);
    // If an object is missing on an async_recovery_target, return false.
    // This will not block the op and the object is async recovered later.
    if (peer_missing_entry != peering_state.get_peer_missing().end() &&
        peer_missing_entry->second.get_items().count(soid)) {
      return true;
    }
    // Object is degraded if after last_backfill AND
    // we are backfilling it
    if (is_backfill_target(peer) &&
        peering_state.get_peer_info(peer).last_backfill <= soid &&
        recovery_handler->backfill_state->get_last_backfill_started() >= soid &&
        recovery_backend->is_recovering(soid)) {
      return true;
    }
  }
  return false;
}