1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <seastar/core/future.hh>
6 #include "osd/PeeringState.h"
8 #include "messages/MOSDPGCreate2.h"
10 #include "common/Formatter.h"
12 #include "crimson/common/exception.h"
13 #include "crimson/osd/pg.h"
14 #include "crimson/osd/osd.h"
15 #include "crimson/osd/osd_operations/compound_peering_request.h"
// File-local logging accessor: returns the seastar::logger that
// crimson::get_logger() yields for the ceph_subsys_osd subsystem.
// NOTE(review): the closing brace of this function is not present in this
// view of the file.
18 seastar::logger
& logger() {
19 return crimson::get_logger(ceph_subsys_osd
);
24 using namespace crimson::osd
;
// State object pairing a seastar::promise<BufferedRecoveryMessages> with the
// BufferedRecoveryMessages value (ctx) that is eventually used to fulfil it.
26 struct compound_state
{
// Promise that is resolved with the contents of ctx (see set_value below).
27 seastar::promise
<BufferedRecoveryMessages
> promise
;
28 // assuming crimson-osd won't need to be compatible with pre-octopus
// Message buffer that is moved into the promise when it is fulfilled.
30 BufferedRecoveryMessages ctx
;
31 compound_state() = default;
// Fulfils the promise by moving ctx into it.
// NOTE(review): the header of the member function enclosing this statement
// is not visible here -- presumably the destructor; TODO confirm.
33 promise
.set_value(std::move(ctx
));
// Handle type used to pass a compound_state around: a
// seastar::lw_shared_ptr to compound_state.
36 using compound_state_ref
= seastar::lw_shared_ptr
<compound_state
>;
// A RemotePeeringEvent that additionally holds a compound_state_ref and, in
// complete_rctx(), passes its own ctx to state->ctx.
38 class PeeringSubEvent
: public RemotePeeringEvent
{
// compound_state handle supplied at construction.
39 compound_state_ref state
;
// Stores `state` and perfectly forwards every remaining argument to the
// RemotePeeringEvent constructor.
41 template <typename
... Args
>
42 PeeringSubEvent(compound_state_ref state
, Args
&&... args
) :
43 RemotePeeringEvent(std::forward
<Args
>(args
)...), state(state
) {}
// Final override of complete_rctx(): logs, then hands this event's ctx to
// state->ctx via accept_buffered_messages().
45 PeeringEvent::interruptible_future
<>
46 complete_rctx(Ref
<crimson::osd::PG
> pg
) final
{
47 logger().debug("{}: submitting ctx transaction", *this);
48 state
->ctx
.accept_buffered_messages(ctx
);
// NOTE(review): two return paths are visible below, but the condition that
// selects between them is not present in this view. One path asserts that
// ctx.transaction is empty and returns an already-resolved future; the
// other dispatches ctx via dispatch_context_transaction() using the pg's
// collection ref.
51 ceph_assert(ctx
.transaction
.empty());
52 return seastar::now();
54 return osd
.get_shard_services().dispatch_context_transaction(
55 pg
->get_collection_ref(), ctx
);
// Walks the entries of m->pgs and starts a PeeringSubEvent operation for
// them through the OSD's shard services; declares a vector of
// crimson::OperationRef (`ret`) as its result container.
// NOTE(review): only the `conn` and `state` parameters are visible here; the
// declarations of `osd` and `m`, the heads of the logging calls, the head of
// the epoch condition and the final return are not present in this view.
60 std::vector
<crimson::OperationRef
> handle_pg_create(
62 crimson::net::ConnectionRef conn
,
63 compound_state_ref state
,
66 std::vector
<crimson::OperationRef
> ret
;
// Each entry of m->pgs is unpacked into (pgid, when); `when` is further
// unpacked into (created, created_stamp).
67 for (auto& [pgid
, when
] : m
->pgs
) {
68 const auto &[created
, created_stamp
] = when
;
// Every pgid in m->pgs must also have an entry in m->pg_extra; its mapped
// value is unpacked into (history, pi).
69 auto q
= m
->pg_extra
.find(pgid
);
70 ceph_assert(q
!= m
->pg_extra
.end());
71 auto& [history
, pi
] = q
->second
;
// Trailing arguments of a call whose head is not visible in this view.
75 __func__
, pgid
, created
, created_stamp
,
// Tail of a condition comparing m->epoch with the second bound of pi; the
// start of the condition is not visible in this view.
78 m
->epoch
< pi
.get_bounds().second
) {
80 "got pg_create on {} epoch {} "
81 "unmatched past_intervals {} (history {})",
// Start a PeeringSubEvent through the shard services; the visible arguments
// include the shard services themselves and a newly allocated PGCreateInfo
// built from (pgid, m->epoch, history, pi, true). `.first` of the result of
// start_operation() is kept as `op`.
85 auto op
= osd
.get_shard_services().start_operation
<PeeringSubEvent
>(
89 osd
.get_shard_services(),
96 new PGCreateInfo(pgid
, m
->epoch
, history
, pi
, true)).first
;
// Blocker type (derives from crimson::BlockerT<SubOpBlocker>) that holds a
// vector of crimson::OperationRef sub-operations.
103 struct SubOpBlocker
: crimson::BlockerT
<SubOpBlocker
> {
// Name under which this blocker type is identified.
104 static constexpr const char * type_name
= "CompoundOpBlocker";
// The sub-operations tracked by this blocker.
106 std::vector
<crimson::OperationRef
> subops
;
// Takes the sub-operation vector by rvalue reference.
// NOTE(review): the initializer list / body is not visible in this view.
107 SubOpBlocker(std::vector
<crimson::OperationRef
> &&subops
)
// Opens an array section named "dependent_operations" on the formatter and
// iterates over subops.
// NOTE(review): the loop body and the closing of the section are not visible
// in this view.
110 virtual void dump_detail(Formatter
*f
) const {
111 f
->open_array_section("dependent_operations");
113 for (auto &i
: subops
) {
123 namespace crimson::osd
{
// Constructor taking the OSD by reference, the connection and the message
// (Ref<Message>).
// NOTE(review): the member-initializer list and body are not visible in this
// view.
125 CompoundPeeringRequest::CompoundPeeringRequest(
126 OSD
&osd
, crimson::net::ConnectionRef conn
, Ref
<Message
> m
)
// Const member taking an output stream (`lhs`).
// NOTE(review): the body is not visible in this view, so what is written to
// the stream cannot be confirmed from here.
132 void CompoundPeeringRequest::print(std::ostream
&lhs
) const
// Streams the held message (*m) into the formatter's "message" stream.
// NOTE(review): the braces of this function are not visible in this view.
137 void CompoundPeeringRequest::dump_detail(Formatter
*f
) const
139 f
->dump_stream("message") << *m
;
// Runs the compound request: creates a shared compound_state and a
// SubOpBlocker, then waits (as a blocking future) on the state's promise,
// dispatches the resulting context messages through the shard services, and
// logs completion. Returns a seastar::future<>.
// NOTE(review): several lines of this function (including the definition of
// `ref` and the wrapper around the handle_pg_create() call) are not visible
// in this view.
142 seastar::future
<> CompoundPeeringRequest::start()
144 logger().info("{}: starting", *this);
// State handed to sub-events; its promise is awaited further below.
145 auto state
= seastar::make_lw_shared
<compound_state
>();
// NOTE(review): the visible fragments suggest the blocker is constructed
// from the result of handle_pg_create(); the lines connecting them are
// missing here -- TODO confirm.
146 auto blocker
= std::make_unique
<SubOpBlocker
>(
// The message is asserted to be of type MSG_OSD_PG_CREATE2 before being
// cast to MOSDPGCreate2 for handle_pg_create().
148 assert((m
->get_type() == MSG_OSD_PG_CREATE2
));
149 return handle_pg_create(
153 boost::static_pointer_cast
<MOSDPGCreate2
>(m
));
157 logger().info("{}: about to fork future", *this);
// The continuation chain runs inside handle_system_shutdown(). Ownership of
// `blocker` is moved into the first lambda and then into the continuation;
// `ref` is moved into the last continuation.
158 return crimson::common::handle_system_shutdown(
159 [this, ref
, blocker
=std::move(blocker
), state
]() mutable {
160 return with_blocking_future(
161 blocker
->make_blocking_future(state
->promise
.get_future())
162 ).then([this, blocker
=std::move(blocker
)](auto &&ctx
) {
163 logger().info("{}: sub events complete", *this);
// Send out the buffered messages delivered through the promise.
164 return osd
.get_shard_services().dispatch_context_messages(std::move(ctx
));
165 }).then([this, ref
=std::move(ref
)] {
166 logger().info("{}: complete", *this);
171 } // namespace crimson::osd