#include "common/Formatter.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_connection_priv.h"
namespace crimson::osd {
-void PeeringEvent::print(std::ostream &lhs) const
+template <class T>
+void PeeringEvent<T>::print(std::ostream &lhs) const
{
lhs << "PeeringEvent("
<< "from=" << from
<< ")";
}
-void PeeringEvent::dump_detail(Formatter *f) const
+template <class T>
+void PeeringEvent<T>::dump_detail(Formatter *f) const
{
f->open_object_section("PeeringEvent");
f->dump_stream("from") << from;
f->dump_int("sent", evt.get_epoch_sent());
f->dump_int("requested", evt.get_epoch_requested());
f->dump_string("evt", evt.get_desc());
+ f->open_array_section("events");
+ {
+ std::apply([f](auto&... events) {
+ (..., events.dump(f));
+ }, static_cast<const T*>(this)->tracking_events);
+ }
+ f->close_section();
f->close_section();
}
-PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
+template <class T>
+PGPeeringPipeline &PeeringEvent<T>::pp(PG &pg)
{
return pg.peering_request_pg_pipeline;
}
-seastar::future<> PeeringEvent::start()
+template <class T>
+seastar::future<> PeeringEvent<T>::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
{
-
- logger().debug("{}: start", *this);
-
- IRef ref = this;
- auto maybe_delay = seastar::now();
- if (delay) {
- maybe_delay = seastar::sleep(
- std::chrono::milliseconds(std::lround(delay * 1000)));
+ if (!pg) {
+ logger().warn("{}: pg absent, did not create", *this);
+ on_pg_absent(shard_services);
+ that()->get_handle().exit();
+ return complete_rctx_no_pg(shard_services);
}
- return maybe_delay.then([this] {
- return get_pg();
- }).then([this](Ref<PG> pg) {
- if (!pg) {
- logger().warn("{}: pg absent, did not create", *this);
- on_pg_absent();
- handle.exit();
- return complete_rctx_no_pg();
- }
- return interruptor::with_interruption([this, pg] {
- logger().debug("{}: pg present", *this);
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).await_map)
- ).then_interruptible([this, pg] {
- return with_blocking_future_interruptible<interruptor::condition>(
- pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
- }).then_interruptible([this, pg](auto) {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).process));
- }).then_interruptible([this, pg] {
- // TODO: likely we should synchronize also with the pg log-based
- // recovery.
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(BackfillRecovery::bp(*pg).process));
- }).then_interruptible([this, pg] {
- pg->do_peering_event(evt, ctx);
- handle.exit();
- return complete_rctx(pg);
- }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
- if (!pg->get_need_up_thru()) {
- return seastar::now();
- }
- return shard_services.send_alive(pg->get_same_interval_since());
- }).then_interruptible([this] {
- return shard_services.send_pg_temp();
+
+ using interruptor = typename T::interruptor;
+ return interruptor::with_interruption([this, pg, &shard_services] {
+ logger().debug("{}: pg present", *this);
+ return this->template enter_stage<interruptor>(pp(*pg).await_map
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), evt.get_epoch_sent());
+ });
+ }).then_interruptible([this, pg](auto) {
+ return this->template enter_stage<interruptor>(pp(*pg).process);
+ }).then_interruptible([this, pg, &shard_services] {
+ return pg->do_peering_event(evt, ctx
+ ).then_interruptible([this, pg, &shard_services] {
+ that()->get_handle().exit();
+ return complete_rctx(shard_services, pg);
});
- },
- [this](std::exception_ptr ep) {
- logger().debug("{}: interrupted with {}", *this, ep);
- return seastar::now();
- },
- pg);
- }).finally([ref=std::move(ref)] {
- logger().debug("{}: complete", *ref);
- });
+ }).then_interruptible([pg, &shard_services]()
+ -> typename T::template interruptible_future<> {
+ if (!pg->get_need_up_thru()) {
+ return seastar::now();
+ }
+ return shard_services.send_alive(pg->get_same_interval_since());
+ }).then_interruptible([&shard_services] {
+ return shard_services.send_pg_temp();
+ });
+ }, [this](std::exception_ptr ep) {
+ logger().debug("{}: interrupted with {}", *this, ep);
+ }, pg);
}
-void PeeringEvent::on_pg_absent()
+template <class T>
+void PeeringEvent<T>::on_pg_absent(ShardServices &)
{
logger().debug("{}: pg absent, dropping", *this);
}
-PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+template <class T>
+typename PeeringEvent<T>::template interruptible_future<>
+PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
{
logger().debug("{}: submitting ctx", *this);
return shard_services.dispatch_context(
std::move(ctx));
}
-RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
+ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
{
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
-RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
-{
- return osd.peering_request_osd_pipeline;
-}
-
-void RemotePeeringEvent::on_pg_absent()
+void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
{
if (auto& e = get_event().get_event();
e.dynamic_type() == MQuery::static_type()) {
const auto map_epoch =
- shard_services.get_osdmap_service().get_map()->get_epoch();
+ shard_services.get_map()->get_epoch();
const auto& q = static_cast<const MQuery&>(e);
const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
if (q.query.type == q.query.LOG ||
}
}
-PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(
+ ShardServices &shard_services,
+ Ref<PG> pg)
{
if (pg) {
- return PeeringEvent::complete_rctx(pg);
+ return PeeringEvent::complete_rctx(shard_services, pg);
} else {
- logger().debug("{}: OSDState is {}", *this, osd.state);
- return osd.state.when_active().then([this] {
- assert(osd.state.is_active());
- return shard_services.dispatch_context_messages(std::move(ctx));
- });
+ return shard_services.dispatch_context_messages(std::move(ctx));
}
}
-seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
+seastar::future<> RemotePeeringEvent::complete_rctx_no_pg(
+ ShardServices &shard_services)
{
- logger().debug("{}: OSDState is {}", *this, osd.state);
- return osd.state.when_active().then([this] {
- assert(osd.state.is_active());
- return shard_services.dispatch_context_messages(std::move(ctx));
- });
+ return shard_services.dispatch_context_messages(std::move(ctx));
}
-seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
+seastar::future<> LocalPeeringEvent::start()
{
- return with_blocking_future(
- handle.enter(op().await_active)
- ).then([this] {
- return osd.state.when_active();
- }).then([this] {
- return with_blocking_future(handle.enter(cp().await_map));
- }).then([this] {
- return with_blocking_future(
- osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
- }).then([this](auto epoch) {
- logger().debug("{}: got map {}", *this, epoch);
- return with_blocking_future(handle.enter(cp().get_pg));
- }).then([this] {
- return with_blocking_future(
- osd.get_or_create_pg(
- pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ auto maybe_delay = seastar::now();
+ if (delay) {
+ maybe_delay = seastar::sleep(
+ std::chrono::milliseconds(std::lround(delay * 1000)));
+ }
+ return maybe_delay.then([this] {
+ return with_pg(pg->get_shard_services(), pg);
+ }).finally([ref=std::move(ref)] {
+ logger().debug("{}: complete", *ref);
});
}
-seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
- return seastar::make_ready_future<Ref<PG>>(pg);
-}
LocalPeeringEvent::~LocalPeeringEvent() {}
+template class PeeringEvent<RemotePeeringEvent>;
+template class PeeringEvent<LocalPeeringEvent>;
+
}