1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <seastar/core/future.hh>
6 #include "messages/MOSDPGLog.h"
8 #include "common/Formatter.h"
9 #include "crimson/osd/pg.h"
10 #include "crimson/osd/osd.h"
11 #include "crimson/osd/osd_operations/peering_event.h"
12 #include "crimson/osd/osd_connection_priv.h"
15 seastar::logger
& logger() {
16 return crimson::get_logger(ceph_subsys_osd
);
20 namespace crimson::osd
{
// Streams a textual description of this event into `lhs`: the literal
// "PeeringEvent(" plus the labelled sent epoch, requested epoch and
// description, all read from `evt`.
// NOTE(review): the embedded listing numbers jump from 24 to 27 and from 29
// to 33, and neither the braces nor the terminating ';' are visible here, so
// parts of this function appear to be missing from this copy -- restore them
// from the original file before relying on this text.
22 void PeeringEvent::print(std::ostream
&lhs
) const
24 lhs
<< "PeeringEvent("
27 << " sent=" << evt
.get_epoch_sent()
28 << " requested=" << evt
.get_epoch_requested()
29 << " evt=" << evt
.get_desc()
// Dumps this event into the Formatter `f`: opens an object section named
// "PeeringEvent" and emits the fields "from", "pgid", "sent", "requested"
// and "evt".
// NOTE(review): the embedded listing numbers jump from 40 to 45 and no braces
// are visible, so whatever follows the last dump_string() call (presumably
// the call that closes the section -- TODO confirm) is missing from this copy.
33 void PeeringEvent::dump_detail(Formatter
*f
) const
35 f
->open_object_section("PeeringEvent");
// "from" and "pgid" are written through the formatter's stream interface.
36 f
->dump_stream("from") << from
;
37 f
->dump_stream("pgid") << pgid
;
// The epochs and the description come from the wrapped event `evt`.
38 f
->dump_int("sent", evt
.get_epoch_sent());
39 f
->dump_int("requested", evt
.get_epoch_requested());
40 f
->dump_string("evt", evt
.get_desc());
45 PeeringEvent::PGPipeline
&PeeringEvent::pp(PG
&pg
)
47 return pg
.peering_request_pg_pipeline
;
// Runs the peering event as a chain of continuations. From the statements
// visible here: log the start, obtain the PG via get_pg(), then either warn
// that the PG is absent and call complete_rctx(pg), or enter the PG
// pipeline's await_map stage, wait on the PG's osdmap_gate for the epoch the
// event was sent in, enter the process stage, hand the event to
// pg->do_peering_event(evt, ctx) and call complete_rctx(pg). The final
// continuation logs completion.
// NOTE(review): the embedded listing numbers skip many lines (51-52, 54-55,
// 57, 59-60, 62, 65, 70, 72, 74-75, 78-80). The conditions that select between
// the branches and some continuation headers are therefore not visible in
// this copy -- confirm against the original file.
50 seastar::future
<> PeeringEvent::start()
53 logger().debug("{}: start", *this);
56 return get_pg().then([this](Ref
<PG
> pg
) {
// PG-absent path (the guarding condition is not visible in this copy).
58 logger().warn("{}: pg absent, did not create", *this);
61 return complete_rctx(pg
);
// PG-present path: first pipeline stage is await_map.
63 logger().debug("{}: pg present", *this);
64 return with_blocking_future(handle
.enter(pp(*pg
).await_map
)
// Block until the PG's osdmap gate has reached the epoch the event was sent in.
66 return with_blocking_future(
67 pg
->osdmap_gate
.wait_for_map(evt
.get_epoch_sent()));
68 }).then([this, pg
](auto) {
69 return with_blocking_future(handle
.enter(pp(*pg
).process
));
// Deliver the event to the PG; `ctx` is passed along with it.
71 pg
->do_peering_event(evt
, ctx
);
73 return complete_rctx(pg
);
// `ref` is moved into the last continuation; presumably it keeps this
// operation alive until the chain finishes -- TODO confirm where it is created.
76 }).then([this, ref
=std::move(ref
)] {
77 logger().debug("{}: complete", *this);
81 void PeeringEvent::on_pg_absent()
83 logger().debug("{}: pg absent, dropping", *this);
// Logs that the context is being submitted and returns the future produced by
// shard_services.dispatch_context(), whose first argument is the PG's
// collection reference.
// NOTE(review): the embedded listing numbers jump from 90 to 94, so the
// remaining dispatch_context() arguments and the closing of the call are
// missing from this copy -- confirm against the original file.
86 seastar::future
<> PeeringEvent::complete_rctx(Ref
<PG
> pg
)
88 logger().debug("{}: submitting ctx", *this);
89 return shard_services
.dispatch_context(
90 pg
->get_collection_ref(),
94 RemotePeeringEvent::ConnectionPipeline
&RemotePeeringEvent::cp()
96 return get_osd_priv(conn
.get()).peering_request_conn_pipeline
;
// Handles a remote peering event whose PG is absent. When the wrapped event's
// dynamic type is MQuery, the visible code reads the current osdmap epoch,
// builds an empty pg_info_t for spg_t{pgid.pgid, q.query.to}, and queues a
// reply on `ctx` addressed to q.from.osd: an MOSDPGLog message when the query
// type is LOG or FULLLOG, and a notify via ctx.send_notify() (presumably on
// the other branch -- TODO confirm).
// NOTE(review): the embedded listing numbers skip 100, 110-111, 113 and
// 115-121, so the remaining constructor/notify arguments and the branch
// structure are missing from this copy -- confirm against the original file.
99 void RemotePeeringEvent::on_pg_absent()
101 if (auto& e
= get_event().get_event();
102 e
.dynamic_type() == MQuery::static_type()) {
103 const auto map_epoch
=
104 shard_services
.get_osdmap_service().get_map()->get_epoch();
// The dynamic_type() check above is what makes this static_cast valid.
105 const auto& q
= static_cast<const MQuery
&>(e
);
106 const pg_info_t empty
{spg_t
{pgid
.pgid
, q
.query
.to
}};
107 if (q
.query
.type
== q
.query
.LOG
||
108 q
.query
.type
== q
.query
.FULLLOG
) {
109 auto m
= ceph::make_message
<MOSDPGLog
>(q
.query
.from
, q
.query
.to
,
112 ctx
.send_osd_message(q
.from
.osd
, std::move(m
));
114 ctx
.send_notify(q
.from
.osd
, {q
.query
.from
, q
.query
.to
,
// Completes the event's context. Two return paths are visible: one delegates
// to PeeringEvent::complete_rctx(pg), the other returns
// shard_services.dispatch_context_messages() with `ctx` moved into it.
// NOTE(review): the embedded listing numbers skip 123-124, 126 and 128-130,
// so the condition that chooses between the two paths is not visible in this
// copy (presumably it tests whether `pg` is set -- TODO confirm).
122 seastar::future
<> RemotePeeringEvent::complete_rctx(Ref
<PG
> pg
)
125 return PeeringEvent::complete_rctx(pg
);
127 return shard_services
.dispatch_context_messages(std::move(ctx
));
// Produces the PG this remote event targets. The visible steps, in order:
// enter the connection pipeline's await_map stage, wait on the OSD's
// osdmap_gate for the epoch the event was sent in, log the epoch obtained,
// enter the connection pipeline's get_pg stage, and call
// osd.get_or_create_pg() with the pgid, the sent epoch and the event's
// create_info (moved out of `evt`).
// NOTE(review): the embedded listing numbers skip 132, 135, 141 and 145-147,
// so the continuation headers joining these stages and the end of the
// function are missing from this copy -- confirm against the original file.
131 seastar::future
<Ref
<PG
>> RemotePeeringEvent::get_pg()
133 return with_blocking_future(
134 handle
.enter(cp().await_map
)
136 return with_blocking_future(
137 osd
.osdmap_gate
.wait_for_map(evt
.get_epoch_sent()));
138 }).then([this](auto epoch
) {
139 logger().debug("{}: got map {}", *this, epoch
);
140 return with_blocking_future(handle
.enter(cp().get_pg
));
// create_info is moved out of `evt` here, so it must not be read again afterwards.
142 return with_blocking_future(
143 osd
.get_or_create_pg(
144 pgid
, evt
.get_epoch_sent(), std::move(evt
.create_info
)));
148 seastar::future
<Ref
<PG
>> LocalPeeringEvent::get_pg() {
149 return seastar::make_ready_future
<Ref
<PG
>>(pg
);
152 LocalPeeringEvent::~LocalPeeringEvent() {}