git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/osd.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / osd / osd.cc
index 735b6d777ca913ef0c2334a9ea8e4df6d17c61a0..cfe4f54ab2e5e9aa49769f45f9d28fb651998dac 100644 (file)
@@ -95,6 +95,9 @@ OSD::OSD(int id, uint32_t nonce,
     monc{new crimson::mon::Client{*public_msgr, *this}},
     mgrc{new crimson::mgr::Client{*public_msgr, *this}},
     store{store},
+    pg_shard_manager{osd_singleton_state,
+                     shard_services,
+                     pg_to_shard_mappings},
     // do this in background -- continuation rearms timer when complete
     tick_timer{[this] {
       std::ignore = update_heartbeat_peers(
@@ -108,6 +111,7 @@ OSD::OSD(int id, uint32_t nonce,
     log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
     clog(log_client.create_channel())
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
                     std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
     msgr.get()->set_auth_server(monc.get());
@@ -159,6 +163,7 @@ CompatSet get_osd_initial_compat_set()
 
 seastar::future<> OSD::open_meta_coll()
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.get_sharded_store().open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
@@ -354,11 +359,27 @@ seastar::future<> OSD::start()
   logger().info("start");
 
   startup_time = ceph::mono_clock::now();
-
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return pg_shard_manager.start(
-    whoami, *cluster_msgr,
-    *public_msgr, *monc, *mgrc, store);
+    return pg_to_shard_mappings.start(0, seastar::smp::count
+    ).then([this] {
+      return osd_singleton_state.start_single(
+        whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+        std::ref(*monc), std::ref(*mgrc));
+    }).then([this] {
+      return osd_states.start();
+    }).then([this] {
+      ceph::mono_time startup_time = ceph::mono_clock::now();
+      return shard_services.start(
+        std::ref(osd_singleton_state),
+        std::ref(pg_to_shard_mappings),
+        whoami,
+        startup_time,
+        osd_singleton_state.local().perf,
+        osd_singleton_state.local().recoverystate_perf,
+        std::ref(store),
+        std::ref(osd_states));
+    });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
        whoami, get_shard_services(),
@@ -385,11 +406,13 @@ seastar::future<> OSD::start()
     osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
     return pg_shard_manager.update_map(std::move(map));
   }).then([this] {
-    pg_shard_manager.got_map(osdmap->get_epoch());
+    return shard_services.invoke_on_all([this](auto &local_service) {
+      local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch());
+    });
+  }).then([this] {
     bind_epoch = osdmap->get_epoch();
     return pg_shard_manager.load_pgs(store);
   }).then([this] {
-
     uint64_t osd_required =
       CEPH_FEATURE_UID |
       CEPH_FEATURE_PGID64 |
@@ -584,9 +607,11 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
-                                     Ref<MCommand> m)
+seastar::future<> OSD::handle_command(
+  crimson::net::ConnectionRef conn,
+  Ref<MCommand> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return asok->handle_command(conn, std::move(m));
 }
 
@@ -637,7 +662,8 @@ seastar::future<> OSD::stop()
   tick_timer.cancel();
   // see also OSD::shutdown()
   return prepare_to_stop().then([this] {
-    pg_shard_manager.set_stopping();
+    return pg_shard_manager.set_stopping();
+  }).then([this] {
     logger().debug("prepared to stop");
     public_msgr->stop();
     cluster_msgr->stop();
@@ -657,7 +683,13 @@ seastar::future<> OSD::stop()
     }).then([this] {
       return mgrc->stop();
     }).then([this] {
-      return pg_shard_manager.stop();
+      return shard_services.stop();
+    }).then([this] {
+      return osd_states.stop();
+    }).then([this] {
+      return osd_singleton_state.stop();
+    }).then([this] {
+      return pg_to_shard_mappings.stop();
     }).then([fut=std::move(gate_close_fut)]() mutable {
       return std::move(fut);
     }).then([this] {
@@ -697,77 +729,104 @@ std::optional<seastar::future<>>
 OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   if (pg_shard_manager.is_stopping()) {
-    return {};
+    return seastar::now();
   }
-  // XXX: we're assuming the `switch` part is executed immediately, and thus
-  // we won't smash the stack. Taking into account how `seastar::with_gate`
-  // is currently implemented, this seems to be the case (Summer 2022).
-  bool dispatched = true;
-  gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
-                                                m=std::move(m), &dispatched] {
+  auto maybe_ret = do_ms_dispatch(conn, std::move(m));
+  if (!maybe_ret.has_value()) {
+    return std::nullopt;
+  }
+
+  gate.dispatch_in_background(
+      __func__, *this, [ret=std::move(maybe_ret.value())]() mutable {
+    return std::move(ret);
+  });
+  return seastar::now();
+}
+
+std::optional<seastar::future<>>
+OSD::do_ms_dispatch(
+   crimson::net::ConnectionRef conn,
+   MessageRef m)
+{
+  if (seastar::this_shard_id() != PRIMARY_CORE) {
     switch (m->get_type()) {
     case CEPH_MSG_OSD_MAP:
-      return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
-    case CEPH_MSG_OSD_OP:
-      return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
-    case MSG_OSD_PG_CREATE2:
-      return handle_pg_create(
-       conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
-      return seastar::now();
     case MSG_COMMAND:
-      return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
     case MSG_OSD_MARK_ME_DOWN:
-      return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
-    case MSG_OSD_PG_PULL:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_SCAN:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL_REMOVE:
-      return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
-    case MSG_OSD_PG_LEASE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LEASE_ACK:
-      [[fallthrough]];
-    case MSG_OSD_PG_NOTIFY2:
-      [[fallthrough]];
-    case MSG_OSD_PG_INFO2:
-      [[fallthrough]];
-    case MSG_OSD_PG_QUERY2:
-      [[fallthrough]];
-    case MSG_OSD_BACKFILL_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_RECOVERY_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LOG:
-      return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
-    case MSG_OSD_REPOP:
-      return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
-    case MSG_OSD_REPOPREPLY:
-      return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
-    case MSG_OSD_SCRUB2:
-      return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING:
-      return handle_update_log_missing(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissing>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-      return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissingReply>(m));
-    default:
-      dispatched = false;
-      return seastar::now();
+      // FIXME: order is not guaranteed in this path
+      return conn.get_foreign(
+      ).then([this, m=std::move(m)](auto f_conn) {
+        return seastar::smp::submit_to(PRIMARY_CORE,
+            [f_conn=std::move(f_conn), m=std::move(m), this]() mutable {
+          auto conn = make_local_shared_foreign(std::move(f_conn));
+          auto ret = do_ms_dispatch(conn, std::move(m));
+          assert(ret.has_value());
+          return std::move(ret.value());
+        });
+      });
     }
-  });
-  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
+  }
+
+  switch (m->get_type()) {
+  case CEPH_MSG_OSD_MAP:
+    return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
+  case CEPH_MSG_OSD_OP:
+    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+  case MSG_OSD_PG_CREATE2:
+    return handle_pg_create(
+      conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
+    return seastar::now();
+  case MSG_COMMAND:
+    return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+  case MSG_OSD_MARK_ME_DOWN:
+    return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+  case MSG_OSD_PG_PULL:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_SCAN:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL_REMOVE:
+    return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+  case MSG_OSD_PG_LEASE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LEASE_ACK:
+    [[fallthrough]];
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
+  case MSG_OSD_BACKFILL_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_RECOVERY_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LOG:
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+  case MSG_OSD_REPOP:
+    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+  case MSG_OSD_REPOPREPLY:
+    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+  case MSG_OSD_SCRUB2:
+    return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING:
+    return handle_update_log_missing(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissing>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissingReply>(m));
+  default:
+    return std::nullopt;
+  }
 }
 
 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
@@ -841,20 +900,26 @@ uint64_t OSD::send_pg_stats()
   return osd_stat.seq;
 }
 
-bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
+seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
 {
-  if (!conn->peer_is_mon()) {
-    logger().info("{} received from non-mon {}, {}",
-                 __func__,
-                 conn->get_peer_addr(),
-                 *m);
-    return false;
-  }
-  return true;
+  /* Ensure that only one MOSDMap is processed at a time.  Allowing concurrent
+  * processing may eventually be worthwhile, but such an implementation would
+  * need to ensure (among other things)
+  * 1. any particular map is only processed once
+  * 2. PGAdvanceMap operations are processed in order for each PG
+  * As map handling is not presently a bottleneck, we stick to this
+  * simpler invariant for now.
+  * See https://tracker.ceph.com/issues/59165
+  */
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return handle_osd_map_lock.lock().then([this, m] {
+    return _handle_osd_map(m);
+  }).finally([this] {
+    return handle_osd_map_lock.unlock();
+  });
 }
 
-seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
-                                      Ref<MOSDMap> m)
+seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
   if (m->fsid != superblock.cluster_fsid) {
@@ -926,10 +991,12 @@ seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
   });
 }
 
-seastar::future<> OSD::committed_osd_maps(version_t first,
-                                          version_t last,
-                                          Ref<MOSDMap> m)
+seastar::future<> OSD::committed_osd_maps(
+  version_t first,
+  version_t last,
+  Ref<MOSDMap> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
   // advance through the new maps
   return seastar::do_for_each(boost::make_counting_iterator(first),
@@ -956,6 +1023,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       }
     });
   }).then([m, this] {
+    auto fut = seastar::now();
     if (osdmap->is_up(whoami)) {
       const auto up_from = osdmap->get_up_from(whoami);
       logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
@@ -965,12 +1033,13 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
           osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
           pg_shard_manager.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
-        pg_shard_manager.set_active();
-        beacon_timer.arm_periodic(
-          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
-       // timer continuation rearms when complete
-        tick_timer.arm(
-          std::chrono::seconds(TICK_INTERVAL));
+        fut = pg_shard_manager.set_active().then([this] {
+          beacon_timer.arm_periodic(
+            std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+         // timer continuation rearms when complete
+          tick_timer.arm(
+            std::chrono::seconds(TICK_INTERVAL));
+        });
       }
     } else {
       if (pg_shard_manager.is_prestop()) {
@@ -978,9 +1047,13 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
        return seastar::now();
       }
     }
-    return check_osdmap_features().then([this] {
-      // yay!
-      return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+    return fut.then([this] {
+      return check_osdmap_features().then([this] {
+        // yay!
+        logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+                      " to {} epoch to pgs", whoami, osdmap->get_epoch());
+        return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+      });
     });
   }).then([m, this] {
     if (pg_shard_manager.is_active()) {
@@ -1013,20 +1086,22 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
   });
 }
 
-seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
-                                     Ref<MOSDOp> m)
+seastar::future<> OSD::handle_osd_op(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDOp> m)
 {
-  (void) pg_shard_manager.start_pg_operation<ClientRequest>(
+  return pg_shard_manager.start_pg_operation<ClientRequest>(
     get_shard_services(),
     conn,
-    std::move(m));
-  return seastar::now();
+    std::move(m)).second;
 }
 
-seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
-                                       Ref<MOSDPGCreate2> m)
+seastar::future<> OSD::handle_pg_create(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDPGCreate2> m)
 {
-  for (auto& [pgid, when] : m->pgs) {
+  return seastar::do_for_each(m->pgs, [this, conn, m](auto& pg) {
+    auto& [pgid, when] = pg;
     const auto &[created, created_stamp] = when;
     auto q = m->pg_extra.find(pgid);
     ceph_assert(q != m->pg_extra.end());
@@ -1043,8 +1118,9 @@ seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
         "unmatched past_intervals {} (history {})",
         pgid, m->epoch,
         pi, history);
+        return seastar::now();
     } else {
-      std::ignore = pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+      return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
          conn,
          pg_shard_t(),
          pgid,
@@ -1052,10 +1128,9 @@ seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
          m->epoch,
          NullEvt(),
          true,
-         new PGCreateInfo(pgid, m->epoch, history, pi, true));
+         new PGCreateInfo(pgid, m->epoch, history, pi, true)).second;
     }
-  }
-  return seastar::now();
+  });
 }
 
 seastar::future<> OSD::handle_update_log_missing(
@@ -1063,10 +1138,9 @@ seastar::future<> OSD::handle_update_log_missing(
   Ref<MOSDPGUpdateLogMissing> m)
 {
   m->decode_payload();
-  (void) pg_shard_manager.start_pg_operation<LogMissingRequest>(
+  return pg_shard_manager.start_pg_operation<LogMissingRequest>(
     std::move(conn),
-    std::move(m));
-  return seastar::now();
+    std::move(m)).second;
 }
 
 seastar::future<> OSD::handle_update_log_missing_reply(
@@ -1074,24 +1148,24 @@ seastar::future<> OSD::handle_update_log_missing_reply(
   Ref<MOSDPGUpdateLogMissingReply> m)
 {
   m->decode_payload();
-  (void) pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
+  return pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
     std::move(conn),
-    std::move(m));
-  return seastar::now();
+    std::move(m)).second;
 }
 
-seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
-                                    Ref<MOSDRepOp> m)
+seastar::future<> OSD::handle_rep_op(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDRepOp> m)
 {
   m->finish_decode();
-  std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
+  return pg_shard_manager.start_pg_operation<RepRequest>(
     std::move(conn),
-    std::move(m));
-  return seastar::now();
+    std::move(m)).second;
 }
 
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDRepOpReply> m)
+seastar::future<> OSD::handle_rep_op_reply(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDRepOpReply> m)
 {
   spg_t pgid = m->get_spg();
   return pg_shard_manager.with_pg(
@@ -1107,8 +1181,9 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
     });
 }
 
-seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
-                                   Ref<MOSDScrub2> m)
+seastar::future<> OSD::handle_scrub(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDScrub2> m)
 {
   if (m->fsid != superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
@@ -1127,21 +1202,23 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
   });
 }
 
-seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDMarkMeDown> m)
+seastar::future<> OSD::handle_mark_me_down(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDMarkMeDown> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   if (pg_shard_manager.is_prestop()) {
     got_stop_ack();
   }
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
-                                  Ref<MOSDFastDispatchOp> m)
+seastar::future<> OSD::handle_recovery_subreq(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDFastDispatchOp> m)
 {
-  std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
-    conn, std::move(m));
-  return seastar::now();
+  return pg_shard_manager.start_pg_operation<RecoverySubRequest>(
+    conn, std::move(m)).second;
 }
 
 bool OSD::should_restart() const
@@ -1234,18 +1311,19 @@ seastar::future<> OSD::handle_peering_op(
   logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
   m->set_features(conn->get_features());
   std::unique_ptr<PGPeeringEvent> evt(m->get_event());
-  (void) pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+  return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
     conn,
     pg_shard_t{from, m->get_spg().shard},
     m->get_spg(),
-    std::move(*evt));
-  return seastar::now();
+    std::move(*evt)).second;
 }
 
 seastar::future<> OSD::check_osdmap_features()
 {
-  return store.write_meta("require_osd_release",
-                          stringify((int)osdmap->require_osd_release));
+  assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return store.write_meta(
+      "require_osd_release",
+      stringify((int)osdmap->require_osd_release));
 }
 
 seastar::future<> OSD::prepare_to_stop()