]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/osd_operations/compound_peering_request.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd_operations / compound_peering_request.cc
index 46a6d1ca3947924dcbaa7d05b64a57e4bd8c0140..42e827600b2fc71ccfc36b5ee1b51e6a35f77c91 100644 (file)
@@ -5,11 +5,11 @@
 
 #include "osd/PeeringState.h"
 
-#include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGCreate2.h"
 
 #include "common/Formatter.h"
 
+#include "crimson/common/exception.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/osd.h"
 #include "crimson/osd/osd_operations/compound_peering_request.h"
@@ -27,7 +27,7 @@ struct compound_state {
   seastar::promise<BufferedRecoveryMessages> promise;
   // assuming crimson-osd won't need to be compatible with pre-octopus
   // releases
-  BufferedRecoveryMessages ctx{ceph_release_t::octopus};
+  BufferedRecoveryMessages ctx;
   compound_state() = default;
   ~compound_state() {
     promise.set_value(std::move(ctx));
@@ -42,7 +42,8 @@ public:
   PeeringSubEvent(compound_state_ref state, Args &&... args) :
     RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
 
-  seastar::future<> complete_rctx(Ref<crimson::osd::PG> pg) final {
+  PeeringEvent::interruptible_future<>
+  complete_rctx(Ref<crimson::osd::PG> pg) final {
     logger().debug("{}: submitting ctx transaction", *this);
     state->ctx.accept_buffered_messages(ctx);
     state = {};
@@ -56,34 +57,30 @@ public:
   }
 };
 
-std::vector<OperationRef> handle_pg_create(
+std::vector<crimson::OperationRef> handle_pg_create(
   OSD &osd,
   crimson::net::ConnectionRef conn,
   compound_state_ref state,
   Ref<MOSDPGCreate2> m)
 {
-  std::vector<OperationRef> ret;
-  for (auto &p : m->pgs) {
-    const spg_t &pgid = p.first;
-    const auto &[created, created_stamp] = p.second;
+  std::vector<crimson::OperationRef> ret;
+  for (auto& [pgid, when] : m->pgs) {
+    const auto &[created, created_stamp] = when;
     auto q = m->pg_extra.find(pgid);
     ceph_assert(q != m->pg_extra.end());
+    auto& [history, pi] = q->second;
     logger().debug(
-      "{}, {} {} e{} @{} history {} pi {}",
-      __func__,
-      pgid,
-      created,
-      created_stamp,
-      q->second.first,
-      q->second.second);
-    if (!q->second.second.empty() &&
-       m->epoch < q->second.second.get_bounds().second) {
+      "{}: {} e{} @{} "
+      "history {} pi {}",
+      __func__, pgid, created, created_stamp,
+      history, pi);
+    if (!pi.empty() &&
+       m->epoch < pi.get_bounds().second) {
       logger().error(
-       "got pg_create on {} epoch {} unmatched past_intervals (history {})",
-       pgid,
-       m->epoch,
-       q->second.second,
-       q->second.first);
+        "got pg_create on {} epoch {}  "
+        "unmatched past_intervals {} (history {})",
+        pgid, m->epoch,
+        pi, history);
     } else {
       auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
          state,
@@ -96,23 +93,19 @@ std::vector<OperationRef> handle_pg_create(
          m->epoch,
          NullEvt(),
          true,
-         new PGCreateInfo(
-           pgid,
-           m->epoch,
-           q->second.first,
-           q->second.second,
-           true)).first;
+         new PGCreateInfo(pgid, m->epoch, history, pi, true)).first;
       ret.push_back(op);
     }
   }
   return ret;
 }
 
-struct SubOpBlocker : BlockerT<SubOpBlocker> {
+struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
   static constexpr const char * type_name = "CompoundOpBlocker";
 
-  std::vector<OperationRef> subops;
-  SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {}
+  std::vector<crimson::OperationRef> subops;
+  SubOpBlocker(std::vector<crimson::OperationRef> &&subops)
+    : subops(subops) {}
 
   virtual void dump_detail(Formatter *f) const {
     f->open_array_section("dependent_operations");
@@ -162,13 +155,16 @@ seastar::future<> CompoundPeeringRequest::start()
 
   IRef ref = this;
   logger().info("{}: about to fork future", *this);
-  return with_blocking_future(
-    blocker->make_blocking_future(state->promise.get_future())
-  ).then([this, blocker=std::move(blocker)](auto &&ctx) {
-    logger().info("{}: sub events complete", *this);
-    return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
-  }).then([this, ref=std::move(ref)] {
-    logger().info("{}: complete", *this);
+  return crimson::common::handle_system_shutdown(
+    [this, ref, blocker=std::move(blocker), state]() mutable {
+    return with_blocking_future(
+      blocker->make_blocking_future(state->promise.get_future())
+    ).then([this, blocker=std::move(blocker)](auto &&ctx) {
+      logger().info("{}: sub events complete", *this);
+      return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+    }).then([this, ref=std::move(ref)] {
+      logger().info("{}: complete", *this);
+    });
   });
 }