1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <seastar/core/future.hh>
5 #include <seastar/core/sleep.hh>
7 #include "messages/MOSDPGLog.h"
9 #include "common/Formatter.h"
10 #include "crimson/osd/pg.h"
11 #include "crimson/osd/osd.h"
12 #include "crimson/osd/osd_operations/peering_event.h"
13 #include "crimson/osd/osd_connection_priv.h"
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
21 namespace crimson::osd
{
// Streams a textual summary of this event into `lhs`: the literal
// "PeeringEvent(" followed by the " sent=", " requested=" and " evt=" fields,
// taken from evt.get_epoch_sent(), evt.get_epoch_requested() and
// evt.get_desc() respectively.
// NOTE(review): this view of the definition is incomplete (the statement is
// not terminated and the function braces are not visible) -- confirm the
// exact output format against the full file.
23 void PeeringEvent::print(std::ostream
&lhs
) const
25 lhs
<< "PeeringEvent("
28 << " sent=" << evt
.get_epoch_sent()
29 << " requested=" << evt
.get_epoch_requested()
30 << " evt=" << evt
.get_desc()
// Dumps this event through the Formatter `f`. An object section named
// "PeeringEvent" is opened, then "from" and "pgid" are written as streamed
// values, "sent" and "requested" as integers (the event's sent / requested
// epochs), and "evt" as a string (the event description).
// NOTE(review): no matching close of the "PeeringEvent" section is visible in
// this view -- confirm it exists in the full file.
34 void PeeringEvent::dump_detail(Formatter
*f
) const
36 f
->open_object_section("PeeringEvent");
37 f
->dump_stream("from") << from
;
38 f
->dump_stream("pgid") << pgid
;
39 f
->dump_int("sent", evt
.get_epoch_sent());
40 f
->dump_int("requested", evt
.get_epoch_requested());
41 f
->dump_string("evt", evt
.get_desc());
46 PeeringEvent::PGPipeline
&PeeringEvent::pp(PG
&pg
)
48 return pg
.peering_request_pg_pipeline
;
// Runs this peering event and returns a future for its completion.
// Steps visible in this view, in order:
//   - log the start, then chain on `maybe_delay` (either an immediately ready
//     future or a sleep of `delay * 1000` rounded to milliseconds);
//   - receive a Ref<PG>; one path warns that the PG is absent and finishes
//     via complete_rctx_no_pg();
//   - otherwise, under interruptor::with_interruption: enter
//     pp(*pg).await_map, wait for the osdmap at evt.get_epoch_sent(), enter
//     pp(*pg).process, enter BackfillRecovery::bp(*pg).process, call
//     pg->do_peering_event(evt, ctx), submit the context with
//     complete_rctx(pg), then send alive (if needed) and pg_temp messages;
//   - log completion in a finally() continuation.
// NOTE(review): several lines of this definition are missing from this view
// (e.g. the condition guarding the sleep, the step that produces `pg`, the
// test selecting the "pg absent" path, and the declaration of `ref`);
// verify the control flow against the full file.
51 seastar::future
<> PeeringEvent::start()
54 logger().debug("{}: start", *this);
// Starts out as an already-resolved future; may be reassigned to a sleep below.
57 auto maybe_delay
= seastar::now();
59 maybe_delay
= seastar::sleep(
60 std::chrono::milliseconds(std::lround(delay
* 1000)));
62 return maybe_delay
.then([this] {
64 }).then([this](Ref
<PG
> pg
) {
// Path for a missing PG: warn and finish without a PG.
66 logger().warn("{}: pg absent, did not create", *this);
69 return complete_rctx_no_pg();
// Path for an existing PG: the remaining work runs under with_interruption.
71 return interruptor::with_interruption([this, pg
] {
72 logger().debug("{}: pg present", *this);
73 return with_blocking_future_interruptible
<interruptor::condition
>(
74 handle
.enter(pp(*pg
).await_map
)
75 ).then_interruptible([this, pg
] {
76 return with_blocking_future_interruptible
<interruptor::condition
>(
77 pg
->osdmap_gate
.wait_for_map(evt
.get_epoch_sent()));
78 }).then_interruptible([this, pg
](auto) {
79 return with_blocking_future_interruptible
<interruptor::condition
>(
80 handle
.enter(pp(*pg
).process
));
81 }).then_interruptible([this, pg
] {
82 // TODO: likely we should synchronize also with the pg log-based
84 return with_blocking_future_interruptible
<interruptor::condition
>(
85 handle
.enter(BackfillRecovery::bp(*pg
).process
));
86 }).then_interruptible([this, pg
] {
// Deliver the event to the PG, then submit whatever it queued in `ctx`.
87 pg
->do_peering_event(evt
, ctx
);
89 return complete_rctx(pg
);
90 }).then_interruptible([this, pg
] () -> PeeringEvent::interruptible_future
<> {
// Nothing to send when the PG reports it does not need up_thru.
91 if (!pg
->get_need_up_thru()) {
92 return seastar::now();
94 return shard_services
.send_alive(pg
->get_same_interval_since());
95 }).then_interruptible([this] {
96 return shard_services
.send_pg_temp();
// Handler receiving an exception_ptr: log it and resolve normally.
99 [this](std::exception_ptr ep
) {
100 logger().debug("{}: interrupted with {}", *this, ep
);
101 return seastar::now();
// `ref` is moved into the final continuation, which logs completion through it.
104 }).finally([ref
=std::move(ref
)] {
105 logger().debug("{}: complete", *ref
);
109 void PeeringEvent::on_pg_absent()
111 logger().debug("{}: pg absent, dropping", *this);
// Logs that the context is being submitted and returns the result of
// shard_services.dispatch_context(), whose first argument is the PG's
// collection ref.
// NOTE(review): the remaining arguments of the dispatch_context() call and
// the end of the function are cut off in this view.
114 PeeringEvent::interruptible_future
<> PeeringEvent::complete_rctx(Ref
<PG
> pg
)
116 logger().debug("{}: submitting ctx", *this);
117 return shard_services
.dispatch_context(
118 pg
->get_collection_ref(),
122 RemotePeeringEvent::ConnectionPipeline
&RemotePeeringEvent::cp()
124 return get_osd_priv(conn
.get()).peering_request_conn_pipeline
;
127 RemotePeeringEvent::OSDPipeline
&RemotePeeringEvent::op()
129 return osd
.peering_request_osd_pipeline
;
// Handles a remote peering event for which no PG exists. When the wrapped
// event is an MQuery, the code reads the current osdmap epoch, builds a
// pg_info_t from only the spg_t {pgid.pgid, q.query.to}, and answers the
// querying OSD (q.from.osd): for LOG / FULLLOG queries an MOSDPGLog message
// is created and queued with ctx.send_osd_message(); a ctx.send_notify()
// call is also present, presumably on the alternative branch.
// NOTE(review): parts of both calls' argument lists and the branch structure
// between them are not visible in this view -- confirm against the full file.
132 void RemotePeeringEvent::on_pg_absent()
// The dynamic-type check guards the static_cast to MQuery below.
134 if (auto& e
= get_event().get_event();
135 e
.dynamic_type() == MQuery::static_type()) {
136 const auto map_epoch
=
137 shard_services
.get_osdmap_service().get_map()->get_epoch();
138 const auto& q
= static_cast<const MQuery
&>(e
);
// Info object constructed from the spg_t alone.
139 const pg_info_t empty
{spg_t
{pgid
.pgid
, q
.query
.to
}};
// Log-type queries are answered with an MOSDPGLog message.
140 if (q
.query
.type
== q
.query
.LOG
||
141 q
.query
.type
== q
.query
.FULLLOG
) {
142 auto m
= crimson::make_message
<MOSDPGLog
>(q
.query
.from
, q
.query
.to
,
145 ctx
.send_osd_message(q
.from
.osd
, std::move(m
));
147 ctx
.send_notify(q
.from
.osd
, {q
.query
.from
, q
.query
.to
,
// Completes the request context for a remote event. Two paths are visible:
//   - delegate to PeeringEvent::complete_rctx(pg);
//   - log the OSD state, wait until the OSD is active, then hand the
//     messages held in `ctx` to shard_services.dispatch_context_messages().
// NOTE(review): the condition selecting between the two paths is not visible
// in this view -- presumably whether `pg` is non-null; confirm.
155 PeeringEvent::interruptible_future
<> RemotePeeringEvent::complete_rctx(Ref
<PG
> pg
)
158 return PeeringEvent::complete_rctx(pg
);
160 logger().debug("{}: OSDState is {}", *this, osd
.state
);
161 return osd
.state
.when_active().then([this] {
// Runs after when_active() resolves; the assert re-checks the active state.
162 assert(osd
.state
.is_active());
163 return shard_services
.dispatch_context_messages(std::move(ctx
));
168 seastar::future
<> RemotePeeringEvent::complete_rctx_no_pg()
170 logger().debug("{}: OSDState is {}", *this, osd
.state
);
171 return osd
.state
.when_active().then([this] {
172 assert(osd
.state
.is_active());
173 return shard_services
.dispatch_context_messages(std::move(ctx
));
// Produces the PG targeted by this remote event. Steps visible in this view,
// in order: enter op().await_active, wait for osd.state.when_active(), enter
// cp().await_map, wait on osd.osdmap_gate for the map at
// evt.get_epoch_sent(), log the epoch obtained, enter cp().get_pg, and call
// osd.get_or_create_pg() with pgid, the sent epoch and evt.create_info
// (moved out of `evt`).
// NOTE(review): most continuation boundaries between these steps are missing
// from this view -- verify the chaining against the full file.
177 seastar::future
<Ref
<PG
>> RemotePeeringEvent::get_pg()
179 return with_blocking_future(
180 handle
.enter(op().await_active
)
182 return osd
.state
.when_active();
184 return with_blocking_future(handle
.enter(cp().await_map
));
186 return with_blocking_future(
187 osd
.osdmap_gate
.wait_for_map(evt
.get_epoch_sent()));
188 }).then([this](auto epoch
) {
189 logger().debug("{}: got map {}", *this, epoch
);
190 return with_blocking_future(handle
.enter(cp().get_pg
));
// create_info is moved out of the event here.
192 return with_blocking_future(
193 osd
.get_or_create_pg(
194 pgid
, evt
.get_epoch_sent(), std::move(evt
.create_info
)));
198 seastar::future
<Ref
<PG
>> LocalPeeringEvent::get_pg() {
199 return seastar::make_ready_future
<Ref
<PG
>>(pg
);
202 LocalPeeringEvent::~LocalPeeringEvent() {}