]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/osd_operations/peering_event.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / osd_operations / peering_event.cc
index a43fb8d2c3ed5550fd440c66d85af6f3dabf51e2..b323b4a817bf29262c75b38001bb154cd8d6976a 100644 (file)
@@ -9,6 +9,7 @@
 #include "common/Formatter.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_connection_priv.h"
 
@@ -20,7 +21,8 @@ namespace {
 
 namespace crimson::osd {
 
-void PeeringEvent::print(std::ostream &lhs) const
+template <class T>
+void PeeringEvent<T>::print(std::ostream &lhs) const
 {
   lhs << "PeeringEvent("
       << "from=" << from
@@ -31,7 +33,8 @@ void PeeringEvent::print(std::ostream &lhs) const
       << ")";
 }
 
-void PeeringEvent::dump_detail(Formatter *f) const
+template <class T>
+void PeeringEvent<T>::dump_detail(Formatter *f) const
 {
   f->open_object_section("PeeringEvent");
   f->dump_stream("from") << from;
@@ -39,79 +42,76 @@ void PeeringEvent::dump_detail(Formatter *f) const
   f->dump_int("sent", evt.get_epoch_sent());
   f->dump_int("requested", evt.get_epoch_requested());
   f->dump_string("evt", evt.get_desc());
+  f->open_array_section("events");
+  {
+    std::apply([f](auto&... events) {
+      (..., events.dump(f));
+    }, static_cast<const T*>(this)->tracking_events);
+  }
+  f->close_section();
   f->close_section();
 }
 
 
-PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
+template <class T>
+PGPeeringPipeline &PeeringEvent<T>::pp(PG &pg)
 {
   return pg.peering_request_pg_pipeline;
 }
 
-seastar::future<> PeeringEvent::start()
+template <class T>
+seastar::future<> PeeringEvent<T>::with_pg(
+  ShardServices &shard_services, Ref<PG> pg)
 {
-
-  logger().debug("{}: start", *this);
-
-  IRef ref = this;
-  auto maybe_delay = seastar::now();
-  if (delay) {
-    maybe_delay = seastar::sleep(
-      std::chrono::milliseconds(std::lround(delay * 1000)));
+  if (!pg) {
+    logger().warn("{}: pg absent, did not create", *this);
+    on_pg_absent(shard_services);
+    that()->get_handle().exit();
+    return complete_rctx_no_pg(shard_services);
   }
-  return maybe_delay.then([this] {
-    return get_pg();
-  }).then([this](Ref<PG> pg) {
-    if (!pg) {
-      logger().warn("{}: pg absent, did not create", *this);
-      on_pg_absent();
-      handle.exit();
-      return complete_rctx_no_pg();
-    }
-    return interruptor::with_interruption([this, pg] {
-      logger().debug("{}: pg present", *this);
-      return with_blocking_future_interruptible<interruptor::condition>(
-        handle.enter(pp(*pg).await_map)
-      ).then_interruptible([this, pg] {
-        return with_blocking_future_interruptible<interruptor::condition>(
-          pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
-      }).then_interruptible([this, pg](auto) {
-        return with_blocking_future_interruptible<interruptor::condition>(
-          handle.enter(pp(*pg).process));
-      }).then_interruptible([this, pg] {
-        // TODO: likely we should synchronize also with the pg log-based
-        // recovery.
-        return with_blocking_future_interruptible<interruptor::condition>(
-          handle.enter(BackfillRecovery::bp(*pg).process));
-      }).then_interruptible([this, pg] {
-        pg->do_peering_event(evt, ctx);
-        handle.exit();
-        return complete_rctx(pg);
-      }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
-        if (!pg->get_need_up_thru()) {
-          return seastar::now();
-        }
-        return shard_services.send_alive(pg->get_same_interval_since());
-      }).then_interruptible([this] {
-        return shard_services.send_pg_temp();
+
+  using interruptor = typename T::interruptor;
+  return interruptor::with_interruption([this, pg, &shard_services] {
+    logger().debug("{}: pg present", *this);
+    return this->template enter_stage<interruptor>(pp(*pg).await_map
+    ).then_interruptible([this, pg] {
+      return this->template with_blocking_event<
+       PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+       >([this, pg](auto &&trigger) {
+         return pg->osdmap_gate.wait_for_map(
+           std::move(trigger), evt.get_epoch_sent());
+       });
+    }).then_interruptible([this, pg](auto) {
+      return this->template enter_stage<interruptor>(pp(*pg).process);
+    }).then_interruptible([this, pg, &shard_services] {
+      return pg->do_peering_event(evt, ctx
+      ).then_interruptible([this, pg, &shard_services] {
+       that()->get_handle().exit();
+       return complete_rctx(shard_services, pg);
       });
-    },
-    [this](std::exception_ptr ep) {
-      logger().debug("{}: interrupted with {}", *this, ep);
-      return seastar::now();
-    },
-    pg);
-  }).finally([ref=std::move(ref)] {
-    logger().debug("{}: complete", *ref);
-  });
+    }).then_interruptible([pg, &shard_services]()
+                         -> typename T::template interruptible_future<> {
+      if (!pg->get_need_up_thru()) {
+       return seastar::now();
+      }
+      return shard_services.send_alive(pg->get_same_interval_since());
+    }).then_interruptible([&shard_services] {
+      return shard_services.send_pg_temp();
+    });
+  }, [this](std::exception_ptr ep) {
+    logger().debug("{}: interrupted with {}", *this, ep);
+  }, pg);
 }
 
-void PeeringEvent::on_pg_absent()
+template <class T>
+void PeeringEvent<T>::on_pg_absent(ShardServices &)
 {
   logger().debug("{}: pg absent, dropping", *this);
 }
 
-PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+template <class T>
+typename PeeringEvent<T>::template interruptible_future<>
+PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
 {
   logger().debug("{}: submitting ctx", *this);
   return shard_services.dispatch_context(
@@ -119,22 +119,17 @@ PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
     std::move(ctx));
 }
 
-RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
+ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
 {
   return get_osd_priv(conn.get()).peering_request_conn_pipeline;
 }
 
-RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
-{
-  return osd.peering_request_osd_pipeline;
-}
-
-void RemotePeeringEvent::on_pg_absent()
+void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
 {
   if (auto& e = get_event().get_event();
       e.dynamic_type() == MQuery::static_type()) {
     const auto map_epoch =
-      shard_services.get_osdmap_service().get_map()->get_epoch();
+      shard_services.get_map()->get_epoch();
     const auto& q = static_cast<const MQuery&>(e);
     const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
     if (q.query.type == q.query.LOG ||
@@ -152,53 +147,44 @@ void RemotePeeringEvent::on_pg_absent()
   }
 }
 
-PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(
+  ShardServices &shard_services,
+  Ref<PG> pg)
 {
   if (pg) {
-    return PeeringEvent::complete_rctx(pg);
+    return PeeringEvent::complete_rctx(shard_services, pg);
   } else {
-    logger().debug("{}: OSDState is {}", *this, osd.state);
-    return osd.state.when_active().then([this] {
-      assert(osd.state.is_active());
-      return shard_services.dispatch_context_messages(std::move(ctx));
-    });
+    return shard_services.dispatch_context_messages(std::move(ctx));
   }
 }
 
-seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
+seastar::future<> RemotePeeringEvent::complete_rctx_no_pg(
+  ShardServices &shard_services)
 {
-  logger().debug("{}: OSDState is {}", *this, osd.state);
-  return osd.state.when_active().then([this] {
-    assert(osd.state.is_active());
-    return shard_services.dispatch_context_messages(std::move(ctx));
-  });
+  return shard_services.dispatch_context_messages(std::move(ctx));
 }
 
-seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
+seastar::future<> LocalPeeringEvent::start()
 {
-  return with_blocking_future(
-    handle.enter(op().await_active)
-  ).then([this] {
-    return osd.state.when_active();
-  }).then([this] {
-    return with_blocking_future(handle.enter(cp().await_map));
-  }).then([this] {
-    return with_blocking_future(
-      osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
-  }).then([this](auto epoch) {
-    logger().debug("{}: got map {}", *this, epoch);
-    return with_blocking_future(handle.enter(cp().get_pg));
-  }).then([this] {
-    return with_blocking_future(
-      osd.get_or_create_pg(
-       pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  auto maybe_delay = seastar::now();
+  if (delay) {
+    maybe_delay = seastar::sleep(
+      std::chrono::milliseconds(std::lround(delay * 1000)));
+  }
+  return maybe_delay.then([this] {
+    return with_pg(pg->get_shard_services(), pg);
+  }).finally([ref=std::move(ref)] {
+    logger().debug("{}: complete", *ref);
   });
 }
 
-seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
-  return seastar::make_ready_future<Ref<PG>>(pg);
-}
 
 LocalPeeringEvent::~LocalPeeringEvent() {}
 
+template class PeeringEvent<RemotePeeringEvent>;
+template class PeeringEvent<LocalPeeringEvent>;
+
 }