1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <sys/utsname.h>
8 #include <boost/iterator/counting_iterator.hpp>
9 #include <boost/range/join.hpp>
10 #include <boost/smart_ptr/make_local_shared.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <seastar/core/timer.hh>
15 #include "common/pick_address.h"
16 #include "include/util.h"
18 #include "messages/MCommand.h"
19 #include "messages/MOSDBeacon.h"
20 #include "messages/MOSDBoot.h"
21 #include "messages/MOSDMap.h"
22 #include "messages/MOSDMarkMeDown.h"
23 #include "messages/MOSDOp.h"
24 #include "messages/MOSDPeeringOp.h"
25 #include "messages/MOSDRepOpReply.h"
26 #include "messages/MOSDScrub2.h"
27 #include "messages/MPGStats.h"
29 #include "os/Transaction.h"
30 #include "osd/ClassHandler.h"
31 #include "osd/OSDCap.h"
32 #include "osd/PGPeeringEvent.h"
33 #include "osd/PeeringState.h"
35 #include "crimson/admin/osd_admin.h"
36 #include "crimson/admin/pg_commands.h"
37 #include "crimson/common/buffer_io.h"
38 #include "crimson/common/exception.h"
39 #include "crimson/mon/MonClient.h"
40 #include "crimson/net/Connection.h"
41 #include "crimson/net/Messenger.h"
42 #include "crimson/os/futurized_collection.h"
43 #include "crimson/os/futurized_store.h"
44 #include "crimson/osd/heartbeat.h"
45 #include "crimson/osd/osd_meta.h"
46 #include "crimson/osd/pg.h"
47 #include "crimson/osd/pg_backend.h"
48 #include "crimson/osd/pg_meta.h"
49 #include "crimson/osd/osd_operations/client_request.h"
50 #include "crimson/osd/osd_operations/compound_peering_request.h"
51 #include "crimson/osd/osd_operations/peering_event.h"
52 #include "crimson/osd/osd_operations/pg_advance_map.h"
53 #include "crimson/osd/osd_operations/recovery_subrequest.h"
54 #include "crimson/osd/osd_operations/replicated_request.h"
57 seastar::logger
& logger() {
58 return crimson::get_logger(ceph_subsys_osd
);
60 static constexpr int TICK_INTERVAL
= 1;
63 using std::make_unique
;
67 using std::unique_ptr
;
70 using crimson::common::local_conf
;
71 using crimson::os::FuturizedStore
;
73 namespace crimson::osd
{
75 OSD::OSD(int id
, uint32_t nonce
,
76 crimson::os::FuturizedStore
& store
,
77 crimson::net::MessengerRef cluster_msgr
,
78 crimson::net::MessengerRef public_msgr
,
79 crimson::net::MessengerRef hb_front_msgr
,
80 crimson::net::MessengerRef hb_back_msgr
)
83 // do this in background
84 beacon_timer
{[this] { (void)send_beacon(); }},
85 cluster_msgr
{cluster_msgr
},
86 public_msgr
{public_msgr
},
87 monc
{new crimson::mon::Client
{*public_msgr
, *this}},
88 mgrc
{new crimson::mgr::Client
{*public_msgr
, *this}},
90 shard_services
{*this, whoami
, *cluster_msgr
, *public_msgr
, *monc
, *mgrc
, store
},
91 heartbeat
{new Heartbeat
{whoami
, shard_services
, *monc
, hb_front_msgr
, hb_back_msgr
}},
92 // do this in background
94 update_heartbeat_peers();
97 asok
{seastar::make_lw_shared
<crimson::admin::AdminSocket
>()},
98 osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services
))),
99 log_client(cluster_msgr
.get(), LogClient::NO_FLAGS
),
100 clog(log_client
.create_channel())
102 osdmaps
[0] = boost::make_local_shared
<OSDMap
>();
103 for (auto msgr
: {std::ref(cluster_msgr
), std::ref(public_msgr
),
104 std::ref(hb_front_msgr
), std::ref(hb_back_msgr
)}) {
105 msgr
.get()->set_auth_server(monc
.get());
106 msgr
.get()->set_auth_client(monc
.get());
109 if (local_conf()->osd_open_classes_on_start
) {
110 const int r
= ClassHandler::get_instance().open_all_classes();
112 logger().warn("{} warning: got an error loading one or more classes: {}",
113 __func__
, cpp_strerror(r
));
116 logger().info("{}: nonce is {}", __func__
, nonce
);
117 monc
->set_log_client(&log_client
);
118 clog
->set_log_to_monitors(true);
121 OSD::~OSD() = default;
124 // Initial features in new superblock.
125 // Features here are also automatically upgraded
126 CompatSet
get_osd_initial_compat_set()
128 CompatSet::FeatureSet ceph_osd_feature_compat
;
129 CompatSet::FeatureSet ceph_osd_feature_ro_compat
;
130 CompatSet::FeatureSet ceph_osd_feature_incompat
;
131 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE
);
132 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO
);
133 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC
);
134 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC
);
135 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES
);
136 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL
);
137 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO
);
138 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO
);
139 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG
);
140 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER
);
141 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS
);
142 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA
);
143 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING
);
144 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO
);
145 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES
);
146 return CompatSet(ceph_osd_feature_compat
,
147 ceph_osd_feature_ro_compat
,
148 ceph_osd_feature_incompat
);
152 seastar::future
<> OSD::mkfs(uuid_d osd_uuid
, uuid_d cluster_fsid
)
154 return store
.start().then([this, osd_uuid
] {
155 return store
.mkfs(osd_uuid
).handle_error(
156 crimson::stateful_ec::handle([] (const auto& ec
) {
157 logger().error("error creating empty object store in {}: ({}) {}",
158 local_conf().get_val
<std::string
>("osd_data"),
159 ec
.value(), ec
.message());
160 std::exit(EXIT_FAILURE
);
163 return store
.mount().handle_error(
164 crimson::stateful_ec::handle([] (const auto& ec
) {
165 logger().error("error mounting object store in {}: ({}) {}",
166 local_conf().get_val
<std::string
>("osd_data"),
167 ec
.value(), ec
.message());
168 std::exit(EXIT_FAILURE
);
170 }).then([cluster_fsid
, this] {
171 superblock
.cluster_fsid
= cluster_fsid
;
172 superblock
.osd_fsid
= store
.get_fsid();
173 superblock
.whoami
= whoami
;
174 superblock
.compat_features
= get_osd_initial_compat_set();
175 return _write_superblock();
176 }).then([cluster_fsid
, this] {
177 return store
.write_meta("ceph_fsid", cluster_fsid
.to_string());
179 return store
.write_meta("magic", CEPH_OSD_ONDISK_MAGIC
);
181 return store
.write_meta("whoami", std::to_string(whoami
));
183 return _write_key_meta();
185 return store
.write_meta("ready", "ready");
186 }).then([cluster_fsid
, this] {
187 fmt::print("created object store {} for osd.{} fsid {}\n",
188 local_conf().get_val
<std::string
>("osd_data"),
189 whoami
, cluster_fsid
);
190 return seastar::now();
194 seastar::future
<> OSD::_write_superblock()
196 return store
.open_collection(coll_t::meta()).then([this] (auto ch
) {
198 // if we already have superblock, check if it matches
199 meta_coll
= make_unique
<OSDMeta
>(ch
, store
);
200 return meta_coll
->load_superblock().then([this](OSDSuperblock
&& sb
) {
201 if (sb
.cluster_fsid
!= superblock
.cluster_fsid
) {
202 logger().error("provided cluster fsid {} != superblock's {}",
203 sb
.cluster_fsid
, superblock
.cluster_fsid
);
204 throw std::invalid_argument("mismatched fsid");
206 if (sb
.whoami
!= superblock
.whoami
) {
207 logger().error("provided osd id {} != superblock's {}",
208 sb
.whoami
, superblock
.whoami
);
209 throw std::invalid_argument("mismatched osd id");
213 // meta collection does not yet, create superblock
215 "{} writing superblock cluster_fsid {} osd_fsid {}",
217 superblock
.cluster_fsid
,
218 superblock
.osd_fsid
);
219 return store
.create_new_collection(coll_t::meta()).then([this] (auto ch
) {
220 meta_coll
= make_unique
<OSDMeta
>(ch
, store
);
221 ceph::os::Transaction t
;
222 meta_coll
->create(t
);
223 meta_coll
->store_superblock(t
, superblock
);
224 logger().debug("OSD::_write_superblock: do_transaction...");
225 return store
.do_transaction(meta_coll
->collection(), std::move(t
));
// this `to_string` sits in the `crimson::osd` namespace, so we don't break
// the language rule on not overloading in `std::`.
233 static std::string
to_string(const seastar::temporary_buffer
<char>& temp_buf
)
235 return {temp_buf
.get(), temp_buf
.size()};
// Persist the OSD's auth key as "osd_key" store metadata: taken verbatim
// from the "key" config option if set, otherwise read from "keyfile".
238 seastar::future
<> OSD::_write_key_meta()
241 if (auto key
= local_conf().get_val
<std::string
>("key"); !std::empty(key
)) {
242 return store
.write_meta("osd_key", key
);
243 } else if (auto keyfile
= local_conf().get_val
<std::string
>("keyfile");
244 !std::empty(keyfile
)) {
245 return read_file(keyfile
).then([this] (const auto& temp_buf
) {
246 // it's on a truly cold path, so don't worry about memcpy.
247 return store
.write_meta("osd_key", to_string(temp_buf
));
248 }).handle_exception([keyfile
] (auto ep
) {
249 logger().error("_write_key_meta: failed to handle keyfile {}: {}",
// NOTE(review): the error-log arguments and the close of this branch
// (original lines ~250-253) are missing from this chunk; verify upstream.
254 return seastar::now();
// Resolve the local addresses to bind for `what` (public/cluster picks),
// always restricted to msgr v2; throws std::runtime_error on failure.
259 entity_addrvec_t
pick_addresses(int what
) {
260 entity_addrvec_t addrs
;
// throw-away CephContext solely to satisfy ::pick_addresses()'s interface
261 crimson::common::CephContext cct
;
262 // we're interested solely in v2; crimson doesn't do v1
263 const auto flags
= what
| CEPH_PICK_ADDRESS_MSGR2
;
264 if (int r
= ::pick_addresses(&cct
, flags
, &addrs
, -1); r
< 0) {
265 throw std::runtime_error("failed to pick address");
267 for (auto addr
: addrs
.v
) {
268 logger().info("picked address {}", addr
);
// NOTE(review): the final `return addrs;` (original line ~270) is not
// visible in this chunk; confirm against upstream.
// For each address in `maybe_unknowns` that is a blank/unknown IP, synthesize
// a concrete address from a family-matching entry of `knowns` (preserving
// nonce/type/port).  Returns the rewritten vector plus a flag saying whether
// anything was replaced.
272 std::pair
<entity_addrvec_t
, bool>
273 replace_unknown_addrs(entity_addrvec_t maybe_unknowns
,
274 const entity_addrvec_t
& knowns
) {
275 bool changed
= false;
// per-address rewrite helper applied via std::transform below
276 auto maybe_replace
= [&](entity_addr_t addr
) {
277 if (!addr
.is_blank_ip()) {
280 for (auto& b
: knowns
.v
) {
281 if (addr
.get_family() == b
.get_family()) {
// NOTE(review): the declaration of `a` (copy of `b`?) and the early-return
// paths (original lines ~278-279, 282, 286-289) are missing from this chunk.
283 a
.set_nonce(addr
.get_nonce());
284 a
.set_type(addr
.get_type());
285 a
.set_port(addr
.get_port());
290 throw std::runtime_error("failed to replace unknown address");
292 entity_addrvec_t replaced
;
293 std::transform(maybe_unknowns
.v
.begin(),
294 maybe_unknowns
.v
.end(),
295 std::back_inserter(replaced
.v
),
// NOTE(review): the transform's final argument (presumably maybe_replace,
// original line ~296) is missing here; verify upstream.
297 return {replaced
, changed
};
301 seastar::future
<> OSD::start()
303 logger().info("start");
305 startup_time
= ceph::mono_clock::now();
307 return store
.start().then([this] {
308 return store
.mount().handle_error(
309 crimson::stateful_ec::handle([] (const auto& ec
) {
310 logger().error("error mounting object store in {}: ({}) {}",
311 local_conf().get_val
<std::string
>("osd_data"),
312 ec
.value(), ec
.message());
313 std::exit(EXIT_FAILURE
);
316 return store
.open_collection(coll_t::meta());
317 }).then([this](auto ch
) {
318 meta_coll
= make_unique
<OSDMeta
>(ch
, store
);
319 return meta_coll
->load_superblock();
320 }).then([this](OSDSuperblock
&& sb
) {
321 superblock
= std::move(sb
);
322 return get_map(superblock
.current_epoch
);
323 }).then([this](cached_map_t
&& map
) {
324 shard_services
.update_map(map
);
325 osdmap_gate
.got_map(map
->get_epoch());
326 osdmap
= std::move(map
);
327 bind_epoch
= osdmap
->get_epoch();
331 uint64_t osd_required
=
333 CEPH_FEATURE_PGID64
|
335 using crimson::net::SocketPolicy
;
337 public_msgr
->set_default_policy(SocketPolicy::stateless_server(0));
338 public_msgr
->set_policy(entity_name_t::TYPE_MON
,
339 SocketPolicy::lossy_client(osd_required
));
340 public_msgr
->set_policy(entity_name_t::TYPE_MGR
,
341 SocketPolicy::lossy_client(osd_required
));
342 public_msgr
->set_policy(entity_name_t::TYPE_OSD
,
343 SocketPolicy::stateless_server(0));
345 cluster_msgr
->set_default_policy(SocketPolicy::stateless_server(0));
346 cluster_msgr
->set_policy(entity_name_t::TYPE_MON
,
347 SocketPolicy::lossy_client(0));
348 cluster_msgr
->set_policy(entity_name_t::TYPE_OSD
,
349 SocketPolicy::lossless_peer(osd_required
));
350 cluster_msgr
->set_policy(entity_name_t::TYPE_CLIENT
,
351 SocketPolicy::stateless_server(0));
353 crimson::net::dispatchers_t dispatchers
{this, monc
.get(), mgrc
.get()};
354 return seastar::when_all_succeed(
355 cluster_msgr
->bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER
))
356 .safe_then([this, dispatchers
]() mutable {
357 return cluster_msgr
->start(dispatchers
);
358 }, crimson::net::Messenger::bind_ertr::all_same_way(
359 [] (const std::error_code
& e
) {
360 logger().error("cluster messenger bind(): {}", e
);
363 public_msgr
->bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC
))
364 .safe_then([this, dispatchers
]() mutable {
365 return public_msgr
->start(dispatchers
);
366 }, crimson::net::Messenger::bind_ertr::all_same_way(
367 [] (const std::error_code
& e
) {
368 logger().error("public messenger bind(): {}", e
);
371 }).then_unpack([this] {
372 return seastar::when_all_succeed(monc
->start(),
374 }).then_unpack([this] {
375 return _add_me_to_crush();
377 monc
->sub_want("osd_pg_creates", last_pg_create_epoch
, 0);
378 monc
->sub_want("mgrmap", 0, 0);
379 monc
->sub_want("osdmap", 0, 0);
380 return monc
->renew_subs();
382 if (auto [addrs
, changed
] =
383 replace_unknown_addrs(cluster_msgr
->get_myaddrs(),
384 public_msgr
->get_myaddrs()); changed
) {
385 logger().debug("replacing unkwnown addrs of cluster messenger");
386 return cluster_msgr
->set_myaddrs(addrs
);
388 return seastar::now();
391 return heartbeat
->start(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC
),
392 pick_addresses(CEPH_PICK_ADDRESS_CLUSTER
));
394 // create the admin-socket server, and the objects that register
395 // to handle incoming commands
396 return start_asok_admin();
398 return log_client
.set_fsid(monc
->get_fsid());
// Begin booting: ask the monitors which osdmap versions they hold, then
// continue in _preboot() with that [oldest, newest] span.
404 seastar::future
<> OSD::start_boot()
407 return monc
->get_version("osdmap").then([this](auto&& ret
) {
// get_version() yields {newest, oldest} -- note the binding order
408 auto [newest
, oldest
] = ret
;
409 return _preboot(oldest
, newest
);
// Pre-boot sanity gate: refuse or defer booting while the current osdmap
// marks us destroyed/noup, lacks SORTBITWISE, or predates octopus; otherwise
// subscribe to whatever map range we still need from the monitors.
413 seastar::future
<> OSD::_preboot(version_t oldest
, version_t newest
)
415 logger().info("osd.{}: _preboot", whoami
);
416 if (osdmap
->get_epoch() == 0) {
417 logger().info("waiting for initial osdmap");
418 } else if (osdmap
->is_destroyed(whoami
)) {
419 logger().warn("osdmap says I am destroyed");
420 // provide a small margin so we don't livelock seeing if we
421 // un-destroyed ourselves.
422 if (osdmap
->get_epoch() > newest
- 1) {
423 throw std::runtime_error("i am destroyed");
425 } else if (osdmap
->is_noup(whoami
)) {
426 logger().warn("osdmap NOUP flag is set, waiting for it to clear");
427 } else if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
428 logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
429 } else if (osdmap
->require_osd_release
< ceph_release_t::octopus
) {
430 logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
432 // TODO: update mon if current fullness state is different from osdmap
// if our map is close enough to the monitors' newest, we can boot now
// (NOTE(review): the _send_boot() call expected in this branch, original
// lines ~436-437, is missing from this chunk)
433 } else if (version_t n
= local_conf()->osd_map_message_max
;
434 osdmap
->get_epoch() >= oldest
- 1 &&
435 osdmap
->get_epoch() + n
> newest
) {
438 // get all the latest maps
439 if (osdmap
->get_epoch() + 1 >= oldest
) {
440 return shard_services
.osdmap_subscribe(osdmap
->get_epoch() + 1, false);
// our map is too old for an incremental catch-up; ask for a full map
442 return shard_services
.osdmap_subscribe(oldest
- 1, true);
// Announce ourselves to the monitors with an MOSDBoot carrying our public,
// cluster and heartbeat addresses (filling unknown addrs from known ones).
446 seastar::future
<> OSD::_send_boot()
450 entity_addrvec_t public_addrs
= public_msgr
->get_myaddrs();
451 entity_addrvec_t cluster_addrs
= cluster_msgr
->get_myaddrs();
452 entity_addrvec_t hb_back_addrs
= heartbeat
->get_back_addrs();
453 entity_addrvec_t hb_front_addrs
= heartbeat
->get_front_addrs();
// if the cluster messenger bound to a blank address, borrow the public one
454 if (cluster_msgr
->set_addr_unknowns(public_addrs
)) {
455 cluster_addrs
= cluster_msgr
->get_myaddrs();
457 if (heartbeat
->get_back_msgr()->set_addr_unknowns(cluster_addrs
)) {
458 hb_back_addrs
= heartbeat
->get_back_addrs();
460 if (heartbeat
->get_front_msgr()->set_addr_unknowns(public_addrs
)) {
461 hb_front_addrs
= heartbeat
->get_front_addrs();
463 logger().info("hb_back_msgr: {}", hb_back_addrs
);
464 logger().info("hb_front_msgr: {}", hb_front_addrs
);
465 logger().info("cluster_msgr: {}", cluster_addrs
);
// NOTE(review): the remaining MOSDBoot constructor arguments (original
// lines ~468-473: boot epoch and the address vectors) are missing here.
467 auto m
= crimson::make_message
<MOSDBoot
>(superblock
,
474 collect_sys_info(&m
->metadata
, NULL
);
475 return monc
->send_message(std::move(m
));
478 seastar::future
<> OSD::_add_me_to_crush()
480 if (!local_conf().get_val
<bool>("osd_crush_update_on_start")) {
481 return seastar::now();
483 auto get_weight
= [this] {
484 if (auto w
= local_conf().get_val
<double>("osd_crush_initial_weight");
486 return seastar::make_ready_future
<double>(w
);
488 return store
.stat().then([](auto st
) {
489 auto total
= st
.total
;
490 return seastar::make_ready_future
<double>(
492 double(total
) / double(1ull << 40))); // TB
496 return get_weight().then([this](auto weight
) {
497 const crimson::crush::CrushLocation loc
{make_unique
<CephContext
>().get()};
498 logger().info("{} crush location is {}", __func__
, loc
);
499 string cmd
= fmt::format(R
"({{
500 "prefix
": "osd crush create
-or-move
",
504 }})", whoami
, weight
, loc
);
505 return monc
->run_command(std::move(cmd
), {});
506 }).then([](auto&& command_result
) {
507 [[maybe_unused
]] auto [code
, message
, out
] = std::move(command_result
);
509 logger().warn("fail to add to crush: {} ({})", message
, code
);
510 throw std::runtime_error("fail to add to crush");
512 logger().info("added to crush: {}", message
);
514 return seastar::now();
// Delegate an incoming MCommand to the admin-socket command machinery.
518 seastar::future
<> OSD::handle_command(crimson::net::ConnectionRef conn
,
// NOTE(review): the second parameter (presumably Ref<MCommand> m, original
// lines ~519-520) is missing from this chunk.
521 return asok
->handle_command(conn
, std::move(m
));
525 The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands
526 to handle) registered to it:
527 - OSD's specific commands are handled by the OSD object;
528 - there are some common commands registered to be directly handled by the AdminSocket object
531 seastar::future
<> OSD::start_asok_admin()
533 auto asok_path
= local_conf().get_val
<std::string
>("admin_socket");
534 using namespace crimson::admin
;
535 return asok
->start(asok_path
).then([this] {
536 asok
->register_admin_commands();
537 asok
->register_command(make_asok_hook
<OsdStatusHook
>(std::as_const(*this)));
538 asok
->register_command(make_asok_hook
<SendBeaconHook
>(*this));
539 asok
->register_command(make_asok_hook
<FlushPgStatsHook
>(*this));
540 asok
->register_command(make_asok_hook
<DumpPGStateHistory
>(std::as_const(*this)));
541 asok
->register_command(make_asok_hook
<DumpMetricsHook
>());
542 asok
->register_command(make_asok_hook
<DumpPerfCountersHook
>());
543 asok
->register_command(make_asok_hook
<InjectDataErrorHook
>(get_shard_services()));
544 asok
->register_command(make_asok_hook
<InjectMDataErrorHook
>(get_shard_services()));
546 asok
->register_command(make_asok_hook
<pg::QueryCommand
>(*this));
547 asok
->register_command(make_asok_hook
<pg::MarkUnfoundLostCommand
>(*this));
// Orderly shutdown: stop timers, flag the stopping state, then tear down
// admin socket, heartbeat, PGs, the store and both messengers in sequence.
551 seastar::future
<> OSD::stop()
553 logger().info("stop");
554 beacon_timer
.cancel();
556 // see also OSD::shutdown()
557 return prepare_to_stop().then([this] {
558 state
.set_stopping();
559 logger().debug("prepared to stop");
561 cluster_msgr
->stop();
// start draining the dispatch gate; awaited further down the chain
562 auto gate_close_fut
= gate
.close();
563 return asok
->stop().then([this] {
564 return heartbeat
->stop();
566 return store
.umount();
// stop every PG we host in parallel
570 return seastar::parallel_for_each(pg_map
.get_pgs(),
572 return p
.second
->stop();
578 }).then([fut
=std::move(gate_close_fut
)]() mutable {
579 return std::move(fut
);
581 return when_all_succeed(
582 public_msgr
->shutdown(),
583 cluster_msgr
->shutdown()).discard_result();
584 }).handle_exception([](auto ep
) {
585 logger().error("error while stopping osd: {}", ep
);
590 void OSD::dump_status(Formatter
* f
) const
592 f
->dump_stream("cluster_fsid") << superblock
.cluster_fsid
;
593 f
->dump_stream("osd_fsid") << superblock
.osd_fsid
;
594 f
->dump_unsigned("whoami", superblock
.whoami
);
595 f
->dump_string("state", state
.to_string());
596 f
->dump_unsigned("oldest_map", superblock
.oldest_map
);
597 f
->dump_unsigned("newest_map", superblock
.newest_map
);
598 f
->dump_unsigned("num_pgs", pg_map
.get_pgs().size());
// Dump, for every PG this OSD hosts, its current peering-state name and the
// recorded peering-state history (one "pg" object per entry).
601 void OSD::dump_pg_state_history(Formatter
* f
) const
603 f
->open_array_section("pgs");
604 for (auto [pgid
, pg
] : pg_map
.get_pgs()) {
605 f
->open_object_section("pg");
606 f
->dump_stream("pg") << pgid
;
607 const auto& peering_state
= pg
->get_peering_state();
608 f
->dump_string("currently", peering_state
.get_current_state());
609 peering_state
.dump_history(f
);
// NOTE(review): the matching close_section() calls (original lines ~610-613)
// are not visible in this chunk; verify against upstream.
// Stream a compact one-line description of this OSD: id, fsid, map span and
// PG count.
615 void OSD::print(std::ostream
& out
) const
617 out
<< "{osd." << superblock
.whoami
<< " "
618 << superblock
.osd_fsid
<< " [" << superblock
.oldest_map
619 << "," << superblock
.newest_map
<< "] " << pg_map
.get_pgs().size()
// NOTE(review): the closing of the streamed line (original lines ~620-621)
// is missing from this chunk.
// Scan the store's collections at startup and load every PG collection
// found, registering each with pg_map; temp collections are skipped (TODO),
// anything else is logged and ignored.
623 seastar::future
<> OSD::load_pgs()
625 return store
.list_collections().then([this](auto colls
) {
626 return seastar::parallel_for_each(colls
, [this](auto coll
) {
// NOTE(review): the `spg_t pgid;` declaration (original line ~627) is
// missing from this chunk.
628 if (coll
.is_pg(&pgid
)) {
629 return load_pg(pgid
).then([pgid
, this](auto&& pg
) {
630 logger().info("load_pgs: loaded {}", pgid
);
631 pg_map
.pg_loaded(pgid
, std::move(pg
));
632 shard_services
.inc_pg_num();
633 return seastar::now();
635 } else if (coll
.is_temp(&pgid
)) {
636 // TODO: remove the collection
637 return seastar::now();
639 logger().warn("ignoring unrecognized collection: {}", coll
);
640 return seastar::now();
646 seastar::future
<Ref
<PG
>> OSD::make_pg(cached_map_t create_map
,
650 using ec_profile_t
= map
<string
,string
>;
651 auto get_pool_info
= [create_map
, pgid
, this] {
652 if (create_map
->have_pg_pool(pgid
.pool())) {
653 pg_pool_t pi
= *create_map
->get_pg_pool(pgid
.pool());
654 string name
= create_map
->get_pool_name(pgid
.pool());
655 ec_profile_t ec_profile
;
656 if (pi
.is_erasure()) {
657 ec_profile
= create_map
->get_erasure_code_profile(pi
.erasure_code_profile
);
659 return seastar::make_ready_future
<std::tuple
<pg_pool_t
, string
, ec_profile_t
>>(
660 std::make_tuple(std::move(pi
),
662 std::move(ec_profile
)));
664 // pool was deleted; grab final pg_pool_t off disk.
665 return meta_coll
->load_final_pool_info(pgid
.pool());
668 auto get_collection
= [pgid
, do_create
, this] {
669 const coll_t cid
{pgid
};
671 return store
.create_new_collection(cid
);
673 return store
.open_collection(cid
);
676 return seastar::when_all(
677 std::move(get_pool_info
),
678 std::move(get_collection
)
679 ).then([pgid
, create_map
, this] (auto&& ret
) {
680 auto [pool
, name
, ec_profile
] = std::move(std::get
<0>(ret
).get0());
681 auto coll
= std::move(std::get
<1>(ret
).get0());
682 return seastar::make_ready_future
<Ref
<PG
>>(
684 pg_shard_t
{whoami
, pgid
.shard
},
// Load a single PG from disk: read its on-disk epoch via PGMeta, fetch the
// matching osdmap, construct the PG object (no create), then read its state.
// A failure to load aborts the OSD (ceph_abort).
694 seastar::future
<Ref
<PG
>> OSD::load_pg(spg_t pgid
)
696 logger().debug("{}: {}", __func__
, pgid
);
698 return seastar::do_with(PGMeta(store
, pgid
), [](auto& pg_meta
) {
699 return pg_meta
.get_epoch();
700 }).then([this](epoch_t e
) {
// NOTE(review): the continuation body here (presumably `return get_map(e);`,
// original line ~701) is missing from this chunk.
702 }).then([pgid
, this] (auto&& create_map
) {
703 return make_pg(std::move(create_map
), pgid
, false);
704 }).then([this](Ref
<PG
> pg
) {
705 return pg
->read_state(&store
).then([pg
] {
706 return seastar::make_ready_future
<Ref
<PG
>>(std::move(pg
));
708 }).handle_exception([pgid
](auto ep
) {
709 logger().info("pg {} saw exception on load {}", pgid
, ep
);
// abort on load failure; the return below is unreachable but satisfies types
710 ceph_abort("Could not load pg" == 0);
711 return seastar::make_exception_future
<Ref
<PG
>>(ep
);
715 std::optional
<seastar::future
<>>
716 OSD::ms_dispatch(crimson::net::ConnectionRef conn
, MessageRef m
)
718 if (state
.is_stopping()) {
721 bool dispatched
= true;
722 gate
.dispatch_in_background(__func__
, *this, [this, conn
, &m
, &dispatched
] {
723 switch (m
->get_type()) {
724 case CEPH_MSG_OSD_MAP
:
725 return handle_osd_map(conn
, boost::static_pointer_cast
<MOSDMap
>(m
));
726 case CEPH_MSG_OSD_OP
:
727 return handle_osd_op(conn
, boost::static_pointer_cast
<MOSDOp
>(m
));
728 case MSG_OSD_PG_CREATE2
:
729 shard_services
.start_operation
<CompoundPeeringRequest
>(
733 return seastar::now();
735 return handle_command(conn
, boost::static_pointer_cast
<MCommand
>(m
));
736 case MSG_OSD_MARK_ME_DOWN
:
737 return handle_mark_me_down(conn
, boost::static_pointer_cast
<MOSDMarkMeDown
>(m
));
738 case MSG_OSD_PG_PULL
:
740 case MSG_OSD_PG_PUSH
:
742 case MSG_OSD_PG_PUSH_REPLY
:
744 case MSG_OSD_PG_RECOVERY_DELETE
:
746 case MSG_OSD_PG_RECOVERY_DELETE_REPLY
:
748 case MSG_OSD_PG_SCAN
:
750 case MSG_OSD_PG_BACKFILL
:
752 case MSG_OSD_PG_BACKFILL_REMOVE
:
753 return handle_recovery_subreq(conn
, boost::static_pointer_cast
<MOSDFastDispatchOp
>(m
));
754 case MSG_OSD_PG_LEASE
:
756 case MSG_OSD_PG_LEASE_ACK
:
758 case MSG_OSD_PG_NOTIFY2
:
760 case MSG_OSD_PG_INFO2
:
762 case MSG_OSD_PG_QUERY2
:
764 case MSG_OSD_BACKFILL_RESERVE
:
766 case MSG_OSD_RECOVERY_RESERVE
:
769 return handle_peering_op(conn
, boost::static_pointer_cast
<MOSDPeeringOp
>(m
));
771 return handle_rep_op(conn
, boost::static_pointer_cast
<MOSDRepOp
>(m
));
772 case MSG_OSD_REPOPREPLY
:
773 return handle_rep_op_reply(conn
, boost::static_pointer_cast
<MOSDRepOpReply
>(m
));
775 return handle_scrub(conn
, boost::static_pointer_cast
<MOSDScrub2
>(m
));
778 return seastar::now();
781 return (dispatched
? std::make_optional(seastar::now()) : std::nullopt
);
784 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn
, bool is_replace
)
786 // TODO: cleanup the session attached to this connection
787 logger().warn("ms_handle_reset");
790 void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn
)
792 logger().warn("ms_handle_remote_reset");
// Auth callback: log the capabilities granted to an authenticated entity --
// either blanket allow_all or a caps string that is decoded and parsed.
795 void OSD::handle_authentication(const EntityName
& name
,
796 const AuthCapsInfo
& caps_info
)
798 // TODO: store the parsed cap and associate it with the connection
799 if (caps_info
.allow_all
) {
800 logger().debug("{} {} has all caps", __func__
, name
);
803 if (caps_info
.caps
.length() > 0) {
804 auto p
= caps_info
.caps
.cbegin();
// NOTE(review): the declarations of `str`/`caps` and the enclosing try block
// (original lines ~801-802, 805-807) are missing from this chunk.
808 } catch (ceph::buffer::error
& e
) {
809 logger().warn("{} {} failed to decode caps string", __func__
, name
);
813 if (caps
.parse(str
)) {
814 logger().debug("{} {} has caps {}", __func__
, name
, str
);
816 logger().warn("{} {} failed to parse caps {}", __func__
, name
, str
);
// Refresh this OSD's osd_stat: up epoch, heartbeat peers and a monotonic
// sequence; store statfs is gathered asynchronously in the background gate.
821 void OSD::update_stats()
824 osd_stat
.up_from
= get_up_epoch();
825 osd_stat
.hb_peers
= heartbeat
->get_peers();
// high 32 bits: up epoch; low 32 bits: per-epoch sequence counter
826 osd_stat
.seq
= (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq
;
827 gate
.dispatch_in_background("statfs", *this, [this] {
828 (void) store
.stat().then([this](store_statfs_t
&& st
) {
829 osd_stat
.statfs
= st
;
// NOTE(review): the closing of this lambda chain (original lines ~830-832)
// is missing from this chunk.
// Build an MPGStats report for the mgr: our osd_stat plus the per-PG stats
// of every PG we are primary for, stamped with the current osdmap epoch.
834 MessageURef
OSD::get_stats() const
836 // todo: m-to-n: collect stats using map-reduce
837 // MPGStats::had_map_for is not used since PGMonitor was removed
838 auto m
= crimson::make_message
<MPGStats
>(monc
->get_fsid(), osdmap
->get_epoch());
839 m
->osd_stat
= osd_stat
;
840 for (auto [pgid
, pg
] : pg_map
.get_pgs()) {
// only the primary reports a PG's stats
841 if (pg
->is_primary()) {
842 auto stats
= pg
->get_stats();
843 // todo: update reported_epoch,reported_seq,last_fresh
844 stats
.reported_epoch
= osdmap
->get_epoch();
845 m
->pg_stat
.emplace(pgid
.pgid
, std::move(stats
));
// NOTE(review): the `return m;` tail (original lines ~846-848) is missing
// from this chunk.
851 uint64_t OSD::send_pg_stats()
853 // mgr client sends the report message in background
858 OSD::cached_map_t
OSD::get_map() const
// Return the OSDMap for epoch `e`, served from the in-memory osdmaps cache
// when possible, otherwise loaded from disk and inserted into the cache.
863 seastar::future
<OSD::cached_map_t
> OSD::get_map(epoch_t e
)
865 // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
866 if (auto found
= osdmaps
.find(e
); found
) {
867 return seastar::make_ready_future
<cached_map_t
>(std::move(found
));
// cache miss: load from the meta collection and cache the result
869 return load_map(e
).then([e
, this](unique_ptr
<OSDMap
> osdmap
) {
870 return seastar::make_ready_future
<cached_map_t
>(
871 osdmaps
.insert(e
, std::move(osdmap
)));
876 void OSD::store_map_bl(ceph::os::Transaction
& t
,
877 epoch_t e
, bufferlist
&& bl
)
879 meta_coll
->store_map(t
, e
, bl
);
880 map_bl_cache
.insert(e
, std::move(bl
));
883 seastar::future
<bufferlist
> OSD::load_map_bl(epoch_t e
)
885 if (std::optional
<bufferlist
> found
= map_bl_cache
.find(e
); found
) {
886 return seastar::make_ready_future
<bufferlist
>(*found
);
888 return meta_coll
->load_map(e
);
// Load the encoded OSDMaps for an inclusive epoch range in parallel and
// reduce them into an epoch -> bufferlist map.
892 seastar::future
<std::map
<epoch_t
, bufferlist
>> OSD::load_map_bls(
// NOTE(review): the parameter list (presumably `epoch_t first, epoch_t last`,
// original lines ~893-895) and the mapper lambda header (~898) are missing
// from this chunk.
896 return seastar::map_reduce(boost::make_counting_iterator
<epoch_t
>(first
),
897 boost::make_counting_iterator
<epoch_t
>(last
+ 1),
899 return load_map_bl(e
).then([e
](auto&& bl
) {
900 return seastar::make_ready_future
<pair
<epoch_t
, bufferlist
>>(
901 std::make_pair(e
, std::move(bl
)));
904 std::map
<epoch_t
, bufferlist
>{},
// reducer: accumulate each (epoch, bl) pair into the result map
905 [](auto&& bls
, auto&& epoch_bl
) {
906 bls
.emplace(std::move(epoch_bl
));
907 return std::move(bls
);
// Materialize the OSDMap for epoch `e` by decoding its stored bufferlist;
// for epoch 0 an empty default-constructed map is returned.
911 seastar::future
<std::unique_ptr
<OSDMap
>> OSD::load_map(epoch_t e
)
913 auto o
= std::make_unique
<OSDMap
>();
// NOTE(review): the `if (e > 0)` guard and the `o->decode(bl);` call
// (original lines ~914, 916) are missing from this chunk.
915 return load_map_bl(e
).then([o
=std::move(o
)](bufferlist bl
) mutable {
917 return seastar::make_ready_future
<unique_ptr
<OSDMap
>>(std::move(o
));
920 return seastar::make_ready_future
<unique_ptr
<OSDMap
>>(std::move(o
));
924 seastar::future
<> OSD::store_maps(ceph::os::Transaction
& t
,
925 epoch_t start
, Ref
<MOSDMap
> m
)
927 return seastar::do_for_each(boost::make_counting_iterator(start
),
928 boost::make_counting_iterator(m
->get_last() + 1),
929 [&t
, m
, this](epoch_t e
) {
930 if (auto p
= m
->maps
.find(e
); p
!= m
->maps
.end()) {
931 auto o
= std::make_unique
<OSDMap
>();
932 o
->decode(p
->second
);
933 logger().info("store_maps osdmap.{}", e
);
934 store_map_bl(t
, e
, std::move(std::move(p
->second
)));
935 osdmaps
.insert(e
, std::move(o
));
936 return seastar::now();
937 } else if (auto p
= m
->incremental_maps
.find(e
);
938 p
!= m
->incremental_maps
.end()) {
939 return load_map(e
- 1).then([e
, bl
=p
->second
, &t
, this](auto o
) {
940 OSDMap::Incremental inc
;
941 auto i
= bl
.cbegin();
943 o
->apply_incremental(inc
);
945 o
->encode(fbl
, inc
.encode_features
| CEPH_FEATURE_RESERVED
);
946 store_map_bl(t
, e
, std::move(fbl
));
947 osdmaps
.insert(e
, std::move(o
));
948 return seastar::now();
951 logger().error("MOSDMap lied about what maps it had?");
952 return seastar::now();
// Guard helper: reject (log and, per the visible intent, return false for)
// messages whose sending connection is not a monitor.
957 bool OSD::require_mon_peer(crimson::net::Connection
*conn
, Ref
<Message
> m
)
959 if (!conn
->peer_is_mon()) {
960 logger().info("{} received from non-mon {}, {}",
962 conn
->get_peer_addr(),
// NOTE(review): the remaining log arguments and the function's return
// statements (original lines ~961, 963-966) are missing from this chunk.
// Create and start a PG described by `info`.
// Flow (as visible here): look up the OSDMap at info->epoch, validate the
// pool, instantiate the PG via make_pg(), then queue a PGAdvanceMap operation
// to bring it up to the current osdmap epoch. Resolves to the new PG Ref, or
// a null Ref when the create is dropped (pool gone / CREATING flag unset).
// NOTE(review): this chunk is extraction-mangled; several original lines
// (e.g. the logger().debug calls and intermediate braces) are missing here.
969 seastar::future
<Ref
<PG
>> OSD::handle_pg_create_info(
970 std::unique_ptr
<PGCreateInfo
> info
) {
971 return seastar::do_with(
973 [this](auto &info
) -> seastar::future
<Ref
<PG
>> {
// Fetch the map as of the epoch the create was issued in, not the
// current one — placement is computed against that "startmap".
974 return get_map(info
->epoch
).then(
975 [&info
, this](cached_map_t startmap
) ->
976 seastar::future
<std::tuple
<Ref
<PG
>, cached_map_t
>> {
977 const spg_t
&pgid
= info
->pgid
;
979 int64_t pool_id
= pgid
.pgid
.pool();
980 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
// Pool no longer exists in the current map: drop the create and
// yield a null PG Ref to the caller.
983 "{} ignoring pgid {}, pool dne",
986 return seastar::make_ready_future
<std::tuple
<Ref
<PG
>, cached_map_t
>>(
987 std::make_tuple(Ref
<PG
>(), startmap
));
989 ceph_assert(osdmap
->require_osd_release
>= ceph_release_t::octopus
);
990 if (!pool
->has_flag(pg_pool_t::FLAG_CREATING
)) {
991 // this ensures we do not process old creating messages after the
992 // pool's initial pgs have been created (and pg are subsequently
993 // allowed to split or merge).
995 "{} dropping {} create, pool does not have CREATING flag set",
998 return seastar::make_ready_future
<std::tuple
<Ref
<PG
>, cached_map_t
>>(
999 std::make_tuple(Ref
<PG
>(), startmap
));
// Validation passed: construct the PG object against the startmap.
1002 return make_pg(startmap
, pgid
, true).then(
1003 [startmap
=std::move(startmap
)](auto pg
) mutable {
1004 return seastar::make_ready_future
<std::tuple
<Ref
<PG
>, cached_map_t
>>(
1005 std::make_tuple(std::move(pg
), std::move(startmap
)));
1007 }).then([this, &info
](auto&& ret
) ->
1008 seastar::future
<Ref
<PG
>> {
1009 auto [pg
, startmap
] = std::move(ret
);
// Null pg means the create was dropped above; propagate the null Ref.
1011 return seastar::make_ready_future
<Ref
<PG
>>(Ref
<PG
>());
1012 const pg_pool_t
* pp
= startmap
->get_pg_pool(info
->pgid
.pool());
// Compute up/acting sets and this OSD's role as of the startmap.
1014 int up_primary
, acting_primary
;
1015 vector
<int> up
, acting
;
1016 startmap
->pg_to_up_acting_osds(
1017 info
->pgid
.pgid
, &up
, &up_primary
, &acting
, &acting_primary
);
1019 int role
= startmap
->calc_pg_role(pg_shard_t(whoami
, info
->pgid
.shard
),
// Create the backing collection for the PG in the object store.
// NOTE(review): lines orig 1027-1038 (pg->init(...) arguments, presumably)
// are missing from this extraction — confirm against upstream before edits.
1023 create_pg_collection(
1026 info
->pgid
.get_split_bits(pp
->get_pg_num()));
1039 info
->past_intervals
,
// Advance the freshly-created PG from its creation epoch to the
// current osdmap epoch before handing it back.
1042 return shard_services
.start_operation
<PGAdvanceMap
>(
1043 *this, pg
, pg
->get_osdmap_epoch(),
1044 osdmap
->get_epoch(), std::move(rctx
), true).second
.then([pg
=pg
] {
1045 return seastar::make_ready_future
<Ref
<PG
>>(pg
);
// Handle an incoming MOSDMap: validate it, persist any new epochs to the
// meta collection, update the superblock, then activate the maps via
// committed_osd_maps(). Returns immediately (no-op) on fsid mismatch, while
// still initializing, or when the message contains nothing newer than what
// the superblock already records.
1051 seastar::future
<> OSD::handle_osd_map(crimson::net::ConnectionRef conn
,
1054 logger().info("handle_osd_map {}", *m
);
// Reject maps from a different cluster.
1055 if (m
->fsid
!= superblock
.cluster_fsid
) {
1056 logger().warn("fsid mismatched");
1057 return seastar::now();
1059 if (state
.is_initializing()) {
1060 logger().warn("i am still initializing");
1061 return seastar::now();
1064 const auto first
= m
->get_first();
1065 const auto last
= m
->get_last();
1066 logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
1067 first
, last
, superblock
.newest_map
, m
->oldest_map
, m
->newest_map
);
1068 // make sure there is something new, here, before we bother flushing
1069 // the queues and such
1070 if (last
<= superblock
.newest_map
) {
1071 return seastar::now();
// The message starts beyond the next epoch we need: re-subscribe so the
// mon fills the gap, rather than applying a discontiguous range.
1074 bool skip_maps
= false;
1075 epoch_t start
= superblock
.newest_map
+ 1;
1076 if (first
> start
) {
1077 logger().info("handle_osd_map message skips epochs {}..{}",
1079 if (m
->oldest_map
<= start
) {
1080 return shard_services
.osdmap_subscribe(start
, false);
1082 // always try to get the full range of maps--as many as we can. this
1083 // 1- is good to have
1084 // 2- is at present the only way to ensure that we get a *full* map as
1086 if (m
->oldest_map
< first
) {
1087 return shard_services
.osdmap_subscribe(m
->oldest_map
- 1, true);
// Persist the new map epochs and the updated superblock in one
// transaction against the meta collection.
1093 return seastar::do_with(ceph::os::Transaction
{},
1095 return store_maps(t
, start
, m
).then([=, &t
] {
1096 // even if this map isn't from a mon, we may have satisfied our subscription
1097 monc
->sub_got("osdmap", last
);
1098 if (!superblock
.oldest_map
|| skip_maps
) {
1099 superblock
.oldest_map
= first
;
1101 superblock
.newest_map
= last
;
1102 superblock
.current_epoch
= last
;
1104 // note in the superblock that we were clean thru the prior epoch
1105 if (boot_epoch
&& boot_epoch
>= superblock
.mounted
) {
1106 superblock
.mounted
= boot_epoch
;
1107 superblock
.clean_thru
= last
;
1109 meta_coll
->store_superblock(t
, superblock
);
1110 logger().debug("OSD::handle_osd_map: do_transaction...");
1111 return store
.do_transaction(meta_coll
->collection(), std::move(t
));
1114 // TODO: write to superblock and commit the transaction
1115 return committed_osd_maps(start
, last
, m
);
// Activate newly persisted osdmap epochs [first..last]: step through them in
// order, installing each as the current `osdmap`, updating shard_services,
// and driving state transitions (booting -> active, preboot -> boot, restart
// on address mismatch). Finishes by pushing the final map to all PGs via
// consume_map().
1119 seastar::future
<> OSD::committed_osd_maps(version_t first
,
1123 logger().info("osd.{}: committed_osd_maps({}, {})", whoami
, first
, last
);
1124 // advance through the new maps
1125 return seastar::do_for_each(boost::make_counting_iterator(first
),
1126 boost::make_counting_iterator(last
+ 1),
1127 [this](epoch_t cur
) {
1128 return get_map(cur
).then([this](cached_map_t
&& o
) {
1129 osdmap
= std::move(o
);
1130 shard_services
.update_map(osdmap
);
// First epoch in which the map shows us up at our current public
// address: record up_epoch (and boot_epoch, per the line below).
1131 if (up_epoch
== 0 &&
1132 osdmap
->is_up(whoami
) &&
1133 osdmap
->get_addrs(whoami
) == public_msgr
->get_myaddrs()) {
1134 up_epoch
= osdmap
->get_epoch();
1136 boot_epoch
= osdmap
->get_epoch();
// The map marks us up: if we were booting and the mon acked our boot
// (up_from is newer than our bind_epoch), activate and start the
// periodic beacon and tick timers.
1141 if (osdmap
->is_up(whoami
)) {
1142 const auto up_from
= osdmap
->get_up_from(whoami
);
1143 logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
1144 whoami
, osdmap
->get_epoch(), up_from
, bind_epoch
, state
);
1145 if (bind_epoch
< up_from
&&
1146 osdmap
->get_addrs(whoami
) == public_msgr
->get_myaddrs() &&
1147 state
.is_booting()) {
1148 logger().info("osd.{}: activating...", whoami
);
1150 beacon_timer
.arm_periodic(
1151 std::chrono::seconds(local_conf()->osd_beacon_report_interval
));
1152 tick_timer
.arm_periodic(
1153 std::chrono::seconds(TICK_INTERVAL
));
1156 if (state
.is_prestop()) {
1158 return seastar::now();
1161 return check_osdmap_features().then([this] {
1163 return consume_map(osdmap
->get_epoch());
// Post-activation bookkeeping on the final map.
// NOTE(review): lines orig 1170-1176 (actions on exists/is_stop and
// should_restart) are missing from this extraction.
1166 if (state
.is_active()) {
1167 logger().info("osd.{}: now active", whoami
);
1168 if (!osdmap
->exists(whoami
) ||
1169 osdmap
->is_stop(whoami
)) {
1172 if (should_restart()) {
1175 return seastar::now();
1177 } else if (state
.is_preboot()) {
1178 logger().info("osd.{}: now preboot", whoami
);
1180 if (m
->get_source().is_mon()) {
1181 return _preboot(m
->oldest_map
, m
->newest_map
);
1183 logger().info("osd.{}: start_boot", whoami
);
1184 return start_boot();
1187 logger().info("osd.{}: now {}", whoami
, state
);
1189 return seastar::now();
// Dispatch a client MOSDOp by starting a ClientRequest operation on
// shard_services; the operation's future is intentionally discarded
// ((void) cast) — completion is tracked by the operation machinery, and
// this handler returns immediately.
1194 seastar::future
<> OSD::handle_osd_op(crimson::net::ConnectionRef conn
,
1197 (void) shard_services
.start_operation
<ClientRequest
>(
1201 return seastar::now();
// Send osdmaps to a peer starting at epoch `first`. If `first` is still
// within our stored range, ship all map buffers [first..newest_map];
// otherwise (requested epoch already trimmed) fall back to sending just the
// current full map so the peer can resynchronize.
1204 seastar::future
<> OSD::send_incremental_map(crimson::net::ConnectionRef conn
,
1207 if (first
>= superblock
.oldest_map
) {
1208 return load_map_bls(first
, superblock
.newest_map
)
1209 .then([this, conn
, first
](auto&& bls
) {
1210 auto m
= crimson::make_message
<MOSDMap
>(monc
->get_fsid(),
1211 osdmap
->get_encoding_features());
1212 m
->oldest_map
= first
;
1213 m
->newest_map
= superblock
.newest_map
;
1214 m
->maps
= std::move(bls
);
1215 return conn
->send(std::move(m
));
// Requested range no longer stored: send only the current epoch's full map.
1218 return load_map_bl(osdmap
->get_epoch())
1219 .then([this, conn
](auto&& bl
) mutable {
1220 auto m
= crimson::make_message
<MOSDMap
>(monc
->get_fsid(),
1221 osdmap
->get_encoding_features());
1222 m
->oldest_map
= superblock
.oldest_map
;
1223 m
->newest_map
= superblock
.newest_map
;
1224 m
->maps
.emplace(osdmap
->get_epoch(), std::move(bl
));
1225 return conn
->send(std::move(m
));
// Dispatch a replicated-write request from the primary by starting a
// RepRequest operation; the returned future is deliberately dropped — the
// operation subsystem owns its lifetime.
1230 seastar::future
<> OSD::handle_rep_op(crimson::net::ConnectionRef conn
,
1234 (void) shard_services
.start_operation
<RepRequest
>(
1238 return seastar::now();
// Route a replica's MOSDRepOpReply to the owning PG, looked up by spg_t in
// pg_map. Replies for PGs we no longer have are logged as stale and dropped.
1241 seastar::future
<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn
,
1242 Ref
<MOSDRepOpReply
> m
)
1244 const auto& pgs
= pg_map
.get_pgs();
// C++17 if-initializer keeps the iterator scoped to this statement.
1245 if (auto pg
= pgs
.find(m
->get_spg()); pg
!= pgs
.end()) {
1247 pg
->second
->handle_rep_op_reply(conn
, *m
);
// No such PG locally — reply outlived the PG mapping.
1249 logger().warn("stale reply: {}", *m
);
1251 return seastar::now();
// Handle an MOSDScrub2: after an fsid check, fan out one RemotePeeringEvent
// carrying a PeeringState::RequestScrub (deep/repair as requested) to every
// PG listed in the message, in parallel.
1254 seastar::future
<> OSD::handle_scrub(crimson::net::ConnectionRef conn
,
1257 if (m
->fsid
!= superblock
.cluster_fsid
) {
1258 logger().warn("fsid mismatched");
1259 return seastar::now();
1261 return seastar::parallel_for_each(std::move(m
->scrub_pgs
),
1262 [m
, conn
, this](spg_t pgid
) {
// Identify the sender as a pg_shard_t for the peering event.
1263 pg_shard_t from_shard
{static_cast<int>(m
->get_source().num()),
1265 PeeringState::RequestScrub scrub_request
{m
->deep
, m
->repair
};
1266 return shard_services
.start_operation
<RemotePeeringEvent
>(
1272 PGPeeringEvent
{m
->epoch
, m
->epoch
, scrub_request
}).second
;
// Handle the mon's MOSDMarkMeDown ack. Only meaningful while in prestop
// (i.e. we asked to be marked down during shutdown).
// NOTE(review): the prestop-branch body (orig lines 1280-1281) is missing
// from this extraction — presumably it signals the pending stop; confirm
// against upstream.
1276 seastar::future
<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn
,
1277 Ref
<MOSDMarkMeDown
> m
)
1279 if (state
.is_prestop()) {
1282 return seastar::now();
// Dispatch a recovery sub-request (MOSDFastDispatchOp) by starting a
// RecoverySubRequest operation; the future is dropped on purpose — the
// operation tracker owns completion.
1285 seastar::future
<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn
,
1286 Ref
<MOSDFastDispatchOp
> m
)
1288 (void) shard_services
.start_operation
<RecoverySubRequest
>(
1292 return seastar::now();
// Decide whether this OSD should restart (rebind + reboot) rather than keep
// running: true-ish cases are the map marking us down, or the map recording
// a public/cluster address different from what our messengers are actually
// bound to.
// NOTE(review): the return statements for each branch (orig 1300, 1306,
// 1312-1316) are missing from this extraction.
1295 bool OSD::should_restart() const
1297 if (!osdmap
->is_up(whoami
)) {
1298 logger().info("map e {} marked osd.{} down",
1299 osdmap
->get_epoch(), whoami
);
// Map's public address for us disagrees with our live messenger binding.
1301 } else if (osdmap
->get_addrs(whoami
) != public_msgr
->get_myaddrs()) {
1302 logger().error("map e {} had wrong client addr ({} != my {})",
1303 osdmap
->get_epoch(),
1304 osdmap
->get_addrs(whoami
),
1305 public_msgr
->get_myaddrs());
// Same check for the cluster-network address.
1307 } else if (osdmap
->get_cluster_addrs(whoami
) != cluster_msgr
->get_myaddrs()) {
1308 logger().error("map e {} had wrong cluster addr ({} != my {})",
1309 osdmap
->get_epoch(),
1310 osdmap
->get_cluster_addrs(whoami
),
1311 cluster_msgr
->get_myaddrs());
// Restart the boot sequence after being marked down / detecting a bad bind:
// stop the periodic beacon and tick timers, remember the epoch we are
// rebinding at, and re-enter start_boot().
1318 seastar::future
<> OSD::restart()
1320 beacon_timer
.cancel();
1321 tick_timer
.cancel();
1323 bind_epoch
= osdmap
->get_epoch();
1324 // TODO: promote to shutdown if being marked down for multiple times
1325 // rebind messengers
1326 return start_boot();
// Shut down: record in the superblock that we were mounted at boot_epoch and
// clean through the current map epoch.
// NOTE(review): no persistence of the superblock is visible here — lines
// appear to be missing from this extraction (orig 1330-1331, 1335).
1329 seastar::future
<> OSD::shutdown()
1332 superblock
.mounted
= boot_epoch
;
1333 superblock
.clean_thru
= osdmap
->get_epoch();
1334 return seastar::now();
// Periodically tell the mon we are alive via MOSDBeacon. No-op unless the
// OSD is in the active state.
1337 seastar::future
<> OSD::send_beacon()
1339 if (!state
.is_active()) {
1340 return seastar::now();
1342 // FIXME: min lec should be calculated from pg_stat
1343 // and should set m->pgs
// Placeholder: current epoch stands in for the real min last-epoch-clean.
1344 epoch_t min_last_epoch_clean
= osdmap
->get_epoch();
1345 auto m
= crimson::make_message
<MOSDBeacon
>(osdmap
->get_epoch(),
1346 min_last_epoch_clean
,
1347 superblock
.last_purged_snaps_scrub
,
1348 local_conf()->osd_beacon_report_interval
);
1349 return monc
->send_message(std::move(m
));
// Recompute the heartbeat peer set: for every locally-hosted PG, collect the
// up and acting OSDs from the current map and register each (except
// ourselves and CRUSH_ITEM_NONE placeholders) with the heartbeat subsystem,
// then let it drop peers that are no longer referenced. Active state only.
1352 void OSD::update_heartbeat_peers()
1354 if (!state
.is_active()) {
1357 for (auto& pg
: pg_map
.get_pgs()) {
1358 vector
<int> up
, acting
;
1359 osdmap
->pg_to_up_acting_osds(pg
.first
.pgid
,
// boost::join iterates up then acting without materializing a new vector.
1362 for (int osd
: boost::join(up
, acting
)) {
1363 if (osd
== CRUSH_ITEM_NONE
|| osd
== whoami
) {
1366 heartbeat
->add_peer(osd
, osdmap
->get_epoch());
// Prune peers not re-added above.
1370 heartbeat
->update_peers(whoami
);
// Dispatch an MOSDPeeringOp from peer `from` as a RemotePeeringEvent
// operation targeted at the message's spg; the event payload is taken from
// the message itself. The operation future is intentionally dropped.
1373 seastar::future
<> OSD::handle_peering_op(
1374 crimson::net::ConnectionRef conn
,
1375 Ref
<MOSDPeeringOp
> m
)
1377 const int from
= m
->get_source().num();
1378 logger().debug("handle_peering_op on {} from {}", m
->get_spg(), from
);
// get_event() transfers ownership of the event to us.
1379 std::unique_ptr
<PGPeeringEvent
> evt(m
->get_event());
1380 (void) shard_services
.start_operation
<RemotePeeringEvent
>(
1384 pg_shard_t
{from
, m
->get_spg().shard
},
1387 return seastar::now();
// React to feature-relevant map changes: require authorizers on heartbeat
// connections and persist the map's require_osd_release value as store
// metadata.
1390 seastar::future
<> OSD::check_osdmap_features()
1392 heartbeat
->set_require_authorizer(true);
1393 return store
.write_meta("require_osd_release",
1394 stringify((int)osdmap
->require_osd_release
));
// Push map epoch `epoch` to every local PG by starting a PGAdvanceMap
// operation per PG in parallel, then release waiters blocked on that epoch
// through osdmap_gate.
1397 seastar::future
<> OSD::consume_map(epoch_t epoch
)
1399 // todo: m-to-n: broadcast this news to all shards
1400 auto &pgs
= pg_map
.get_pgs();
1401 return seastar::parallel_for_each(pgs
.begin(), pgs
.end(), [=](auto& pg
) {
// Each PG advances from its own current epoch up to `epoch`.
1402 return shard_services
.start_operation
<PGAdvanceMap
>(
1403 *this, pg
.second
, pg
.second
->get_osdmap_epoch(), epoch
,
1404 PeeringCtx
{}, false).second
;
1405 }).then([epoch
, this] {
1406 osdmap_gate
.got_map(epoch
);
1407 return seastar::make_ready_future();
// Return the PG for `pgid`, creating it if necessary. If pg_map reports the
// PG is not yet present (wait_for_pg returns a pending future), mark it as
// creating and kick off handle_pg_create_info asynchronously (its future is
// deliberately dropped), returning the blocking future that resolves when
// the PG materializes. Otherwise return the already-known PG immediately.
1412 blocking_future
<Ref
<PG
>>
1413 OSD::get_or_create_pg(
1416 std::unique_ptr
<PGCreateInfo
> info
)
1419 auto [fut
, creating
] = pg_map
.wait_for_pg(pgid
);
1421 pg_map
.set_creating(pgid
);
1422 (void)handle_pg_create_info(std::move(info
));
1424 return std::move(fut
);
1426 return make_ready_blocking_future
<Ref
<PG
>>(pg_map
.get_pg(pgid
));
// Return a blocking future that resolves once the PG for `pgid` exists
// locally (first element of pg_map.wait_for_pg's pair); unlike
// get_or_create_pg this never initiates creation.
1430 blocking_future
<Ref
<PG
>> OSD::wait_for_pg(
1433 return pg_map
.wait_for_pg(pgid
).first
;
// Look up the local PG for `pgid` in pg_map; thin accessor with no
// creation or waiting semantics.
1436 Ref
<PG
> OSD::get_pg(spg_t pgid
)
1438 return pg_map
.get_pg(pgid
);
1441 seastar::future
<> OSD::prepare_to_stop()
1443 if (osdmap
&& osdmap
->is_up(whoami
)) {
1444 state
.set_prestop();
1445 const auto timeout
=
1446 std::chrono::duration_cast
<std::chrono::milliseconds
>(
1447 std::chrono::duration
<double>(
1448 local_conf().get_val
<double>("osd_mon_shutdown_timeout")));
1450 return seastar::with_timeout(
1451 seastar::timer
<>::clock::now() + timeout
,
1453 crimson::make_message
<MOSDMarkMeDown
>(
1456 osdmap
->get_addrs(whoami
),
1457 osdmap
->get_epoch(),
1458 true)).then([this] {
1459 return stop_acked
.get_future();
1461 ).handle_exception_type(
1462 [](seastar::timed_out_error
&) {
1463 return seastar::now();
1466 return seastar::now();