1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <sys/utsname.h>
8 #include <boost/iterator/counting_iterator.hpp>
9 #include <boost/range/join.hpp>
10 #include <boost/smart_ptr/make_local_shared.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
14 #include "common/pick_address.h"
15 #include "include/util.h"
17 #include "messages/MOSDAlive.h"
18 #include "messages/MOSDBeacon.h"
19 #include "messages/MOSDBoot.h"
20 #include "messages/MOSDMap.h"
21 #include "messages/MOSDOp.h"
22 #include "messages/MOSDPGLog.h"
23 #include "messages/MOSDRepOpReply.h"
24 #include "messages/MPGStats.h"
26 #include "os/Transaction.h"
27 #include "osd/ClassHandler.h"
28 #include "osd/PGPeeringEvent.h"
29 #include "osd/PeeringState.h"
31 #include "crimson/mon/MonClient.h"
32 #include "crimson/net/Connection.h"
33 #include "crimson/net/Messenger.h"
34 #include "crimson/os/cyanstore/cyan_object.h"
35 #include "crimson/os/futurized_collection.h"
36 #include "crimson/os/futurized_store.h"
37 #include "crimson/osd/heartbeat.h"
38 #include "crimson/osd/osd_meta.h"
39 #include "crimson/osd/pg.h"
40 #include "crimson/osd/pg_backend.h"
41 #include "crimson/osd/pg_meta.h"
42 #include "crimson/osd/osd_operations/client_request.h"
43 #include "crimson/osd/osd_operations/compound_peering_request.h"
44 #include "crimson/osd/osd_operations/peering_event.h"
45 #include "crimson/osd/osd_operations/pg_advance_map.h"
46 #include "crimson/osd/osd_operations/replicated_request.h"
49 seastar::logger
& logger() {
50 return crimson::get_logger(ceph_subsys_osd
);
// Period, in seconds, of the recurring heartbeat-peer maintenance timer.
static constexpr int TICK_INTERVAL{1};
55 using crimson::common::local_conf
;
56 using crimson::os::FuturizedStore
;
58 namespace crimson::osd
{
60 OSD::OSD(int id
, uint32_t nonce
,
61 crimson::net::MessengerRef cluster_msgr
,
62 crimson::net::MessengerRef public_msgr
,
63 crimson::net::MessengerRef hb_front_msgr
,
64 crimson::net::MessengerRef hb_back_msgr
)
67 // do this in background
68 beacon_timer
{[this] { (void)send_beacon(); }},
69 cluster_msgr
{cluster_msgr
},
70 public_msgr
{public_msgr
},
71 monc
{new crimson::mon::Client
{*public_msgr
, *this}},
72 mgrc
{new crimson::mgr::Client
{*public_msgr
, *this}},
73 store
{crimson::os::FuturizedStore::create(
74 local_conf().get_val
<std::string
>("osd_objectstore"),
75 local_conf().get_val
<std::string
>("osd_data"),
76 local_conf().get_config_values())},
77 shard_services
{*this, *cluster_msgr
, *public_msgr
, *monc
, *mgrc
, *store
},
78 heartbeat
{new Heartbeat
{shard_services
, *monc
, hb_front_msgr
, hb_back_msgr
}},
79 // do this in background
80 heartbeat_timer
{[this] { (void)update_heartbeat_peers(); }},
81 asok
{seastar::make_lw_shared
<crimson::admin::AdminSocket
>()},
82 osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services
)))
84 osdmaps
[0] = boost::make_local_shared
<OSDMap
>();
85 for (auto msgr
: {std::ref(cluster_msgr
), std::ref(public_msgr
),
86 std::ref(hb_front_msgr
), std::ref(hb_back_msgr
)}) {
87 msgr
.get()->set_auth_server(monc
.get());
88 msgr
.get()->set_auth_client(monc
.get());
91 if (local_conf()->osd_open_classes_on_start
) {
92 const int r
= ClassHandler::get_instance().open_all_classes();
94 logger().warn("{} warning: got an error loading one or more classes: {}",
95 __func__
, cpp_strerror(r
));
100 OSD::~OSD() = default;
103 // Initial features in new superblock.
104 // Features here are also automatically upgraded
105 CompatSet
get_osd_initial_compat_set()
107 CompatSet::FeatureSet ceph_osd_feature_compat
;
108 CompatSet::FeatureSet ceph_osd_feature_ro_compat
;
109 CompatSet::FeatureSet ceph_osd_feature_incompat
;
110 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE
);
111 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO
);
112 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC
);
113 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC
);
114 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES
);
115 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL
);
116 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO
);
117 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO
);
118 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG
);
119 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER
);
120 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS
);
121 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA
);
122 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING
);
123 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO
);
124 ceph_osd_feature_incompat
.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES
);
125 return CompatSet(ceph_osd_feature_compat
,
126 ceph_osd_feature_ro_compat
,
127 ceph_osd_feature_incompat
);
131 seastar::future
<> OSD::mkfs(uuid_d osd_uuid
, uuid_d cluster_fsid
)
133 return store
->start().then([this, osd_uuid
] {
134 return store
->mkfs(osd_uuid
);
136 return store
->mount();
137 }).then([cluster_fsid
, this] {
138 superblock
.cluster_fsid
= cluster_fsid
;
139 superblock
.osd_fsid
= store
->get_fsid();
140 superblock
.whoami
= whoami
;
141 superblock
.compat_features
= get_osd_initial_compat_set();
144 "{} writing superblock cluster_fsid {} osd_fsid {}",
147 superblock
.osd_fsid
);
148 return store
->create_new_collection(coll_t::meta());
149 }).then([this] (auto ch
) {
150 meta_coll
= make_unique
<OSDMeta
>(ch
, store
.get());
151 ceph::os::Transaction t
;
152 meta_coll
->create(t
);
153 meta_coll
->store_superblock(t
, superblock
);
154 return store
->do_transaction(meta_coll
->collection(), std::move(t
));
155 }).then([cluster_fsid
, this] {
156 return when_all_succeed(
157 store
->write_meta("ceph_fsid", cluster_fsid
.to_string()),
158 store
->write_meta("whoami", std::to_string(whoami
)));
159 }).then([cluster_fsid
, this] {
160 fmt::print("created object store {} for osd.{} fsid {}\n",
161 local_conf().get_val
<std::string
>("osd_data"),
162 whoami
, cluster_fsid
);
163 return seastar::now();
168 entity_addrvec_t
pick_addresses(int what
) {
169 entity_addrvec_t addrs
;
170 crimson::common::CephContext cct
;
171 if (int r
= ::pick_addresses(&cct
, what
, &addrs
, -1); r
< 0) {
172 throw std::runtime_error("failed to pick address");
174 for (auto addr
: addrs
.v
) {
175 logger().info("picked address {}", addr
);
179 std::pair
<entity_addrvec_t
, bool>
180 replace_unknown_addrs(entity_addrvec_t maybe_unknowns
,
181 const entity_addrvec_t
& knowns
) {
182 bool changed
= false;
183 auto maybe_replace
= [&](entity_addr_t addr
) {
184 if (!addr
.is_blank_ip()) {
187 for (auto& b
: knowns
.v
) {
188 if (addr
.get_family() == b
.get_family()) {
190 a
.set_nonce(addr
.get_nonce());
191 a
.set_type(addr
.get_type());
192 a
.set_port(addr
.get_port());
197 throw std::runtime_error("failed to replace unknown address");
199 entity_addrvec_t replaced
;
200 std::transform(maybe_unknowns
.v
.begin(),
201 maybe_unknowns
.v
.end(),
202 std::back_inserter(replaced
.v
),
204 return {replaced
, changed
};
208 seastar::future
<> OSD::start()
210 logger().info("start");
212 startup_time
= ceph::mono_clock::now();
214 return store
->start().then([this] {
215 return store
->mount();
217 return store
->open_collection(coll_t::meta());
218 }).then([this](auto ch
) {
219 meta_coll
= make_unique
<OSDMeta
>(ch
, store
.get());
220 return meta_coll
->load_superblock();
221 }).then([this](OSDSuperblock
&& sb
) {
222 superblock
= std::move(sb
);
223 return get_map(superblock
.current_epoch
);
224 }).then([this](cached_map_t
&& map
) {
225 shard_services
.update_map(map
);
226 osdmap_gate
.got_map(map
->get_epoch());
227 osdmap
= std::move(map
);
231 uint64_t osd_required
=
233 CEPH_FEATURE_PGID64
|
235 using crimson::net::SocketPolicy
;
237 public_msgr
->set_default_policy(SocketPolicy::stateless_server(0));
238 public_msgr
->set_policy(entity_name_t::TYPE_MON
,
239 SocketPolicy::lossy_client(osd_required
));
240 public_msgr
->set_policy(entity_name_t::TYPE_MGR
,
241 SocketPolicy::lossy_client(osd_required
));
242 public_msgr
->set_policy(entity_name_t::TYPE_OSD
,
243 SocketPolicy::stateless_server(0));
245 cluster_msgr
->set_default_policy(SocketPolicy::stateless_server(0));
246 cluster_msgr
->set_policy(entity_name_t::TYPE_MON
,
247 SocketPolicy::lossy_client(0));
248 cluster_msgr
->set_policy(entity_name_t::TYPE_OSD
,
249 SocketPolicy::lossless_peer(osd_required
));
250 cluster_msgr
->set_policy(entity_name_t::TYPE_CLIENT
,
251 SocketPolicy::stateless_server(0));
253 dispatchers
.push_front(this);
254 dispatchers
.push_front(monc
.get());
255 dispatchers
.push_front(mgrc
.get());
256 return seastar::when_all_succeed(
257 cluster_msgr
->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER
),
258 local_conf()->ms_bind_port_min
,
259 local_conf()->ms_bind_port_max
)
260 .then([this] { return cluster_msgr
->start(&dispatchers
); }),
261 public_msgr
->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC
),
262 local_conf()->ms_bind_port_min
,
263 local_conf()->ms_bind_port_max
)
264 .then([this] { return public_msgr
->start(&dispatchers
); }));
266 return seastar::when_all_succeed(monc
->start(),
269 return _add_me_to_crush();
271 monc
->sub_want("osd_pg_creates", last_pg_create_epoch
, 0);
272 monc
->sub_want("mgrmap", 0, 0);
273 monc
->sub_want("osdmap", 0, 0);
274 return monc
->renew_subs();
276 if (auto [addrs
, changed
] =
277 replace_unknown_addrs(cluster_msgr
->get_myaddrs(),
278 public_msgr
->get_myaddrs()); changed
) {
279 return cluster_msgr
->set_myaddrs(addrs
);
281 return seastar::now();
284 return heartbeat
->start(public_msgr
->get_myaddrs(),
285 cluster_msgr
->get_myaddrs());
287 // create the admin-socket server, and the objects that register
288 // to handle incoming commands
289 return start_asok_admin();
295 seastar::future
<> OSD::start_boot()
298 return monc
->get_version("osdmap").then([this](version_t newest
, version_t oldest
) {
299 return _preboot(oldest
, newest
);
303 seastar::future
<> OSD::_preboot(version_t oldest
, version_t newest
)
305 logger().info("osd.{}: _preboot", whoami
);
306 if (osdmap
->get_epoch() == 0) {
307 logger().warn("waiting for initial osdmap");
308 } else if (osdmap
->is_destroyed(whoami
)) {
309 logger().warn("osdmap says I am destroyed");
310 // provide a small margin so we don't livelock seeing if we
311 // un-destroyed ourselves.
312 if (osdmap
->get_epoch() > newest
- 1) {
313 throw std::runtime_error("i am destroyed");
315 } else if (osdmap
->is_noup(whoami
)) {
316 logger().warn("osdmap NOUP flag is set, waiting for it to clear");
317 } else if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
318 logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
319 } else if (osdmap
->require_osd_release
< ceph_release_t::octopus
) {
320 logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
322 // TODO: update mon if current fullness state is different from osdmap
323 } else if (version_t n
= local_conf()->osd_map_message_max
;
324 osdmap
->get_epoch() >= oldest
- 1 &&
325 osdmap
->get_epoch() + n
> newest
) {
328 // get all the latest maps
329 if (osdmap
->get_epoch() + 1 >= oldest
) {
330 return shard_services
.osdmap_subscribe(osdmap
->get_epoch() + 1, false);
332 return shard_services
.osdmap_subscribe(oldest
- 1, true);
336 seastar::future
<> OSD::_send_boot()
340 logger().info("hb_back_msgr: {}", heartbeat
->get_back_addrs());
341 logger().info("hb_front_msgr: {}", heartbeat
->get_front_addrs());
342 logger().info("cluster_msgr: {}", cluster_msgr
->get_myaddr());
343 auto m
= make_message
<MOSDBoot
>(superblock
,
346 heartbeat
->get_back_addrs(),
347 heartbeat
->get_front_addrs(),
348 cluster_msgr
->get_myaddrs(),
350 collect_sys_info(&m
->metadata
, NULL
);
351 return monc
->send_message(m
);
354 seastar::future
<> OSD::_add_me_to_crush()
356 if (!local_conf().get_val
<bool>("osd_crush_update_on_start")) {
357 return seastar::now();
359 auto get_weight
= [this] {
360 if (auto w
= local_conf().get_val
<double>("osd_crush_initial_weight");
362 return seastar::make_ready_future
<double>(w
);
364 return store
->stat().then([](auto st
) {
365 auto total
= st
.total
;
366 return seastar::make_ready_future
<double>(
368 double(total
) / double(1ull << 40))); // TB
372 return get_weight().then([this](auto weight
) {
373 const crimson::crush::CrushLocation loc
{make_unique
<CephContext
>().get()};
374 logger().info("{} crush location is {}", __func__
, loc
);
375 string cmd
= fmt::format(R
"({{
376 "prefix
": "osd crush create
-or-move
",
380 }})", whoami
, weight
, loc
);
381 return monc
->run_command({cmd
}, {});
382 }).then([](int32_t code
, string message
, bufferlist
) {
384 logger().warn("fail to add to crush: {} ({})", message
, code
);
385 throw std::runtime_error("fail to add to crush");
387 logger().info("added to crush: {}", message
);
389 return seastar::now();
393 seastar::future
<> OSD::_send_alive()
395 auto want
= osdmap
->get_epoch();
397 "{} want {} up_thru_wanted {}",
401 if (!osdmap
->exists(whoami
)) {
402 logger().warn("{} DNE", __func__
);
403 return seastar::now();
404 } else if (want
<= up_thru_wanted
) {
405 logger().debug("{} {} <= {}", __func__
, want
, up_thru_wanted
);
406 return seastar::now();
408 up_thru_wanted
= want
;
409 auto m
= make_message
<MOSDAlive
>(osdmap
->get_epoch(), want
);
410 return monc
->send_message(std::move(m
));
 The OSD's Admin Socket object created here has two servers (i.e. blocks
 of commands to handle) registered to it:
 - OSD-specific commands are handled by the OSD object;
 - some common commands are registered to be handled directly by the
   AdminSocket object itself.
421 seastar::future
<> OSD::start_asok_admin()
423 auto asok_path
= local_conf().get_val
<std::string
>("admin_socket");
424 using namespace crimson::admin
;
425 return asok
->start(asok_path
).then([this] {
426 return seastar::when_all_succeed(
427 asok
->register_admin_commands(),
428 asok
->register_command(make_asok_hook
<OsdStatusHook
>(*this)),
429 asok
->register_command(make_asok_hook
<SendBeaconHook
>(*this)),
430 asok
->register_command(make_asok_hook
<ConfigShowHook
>()),
431 asok
->register_command(make_asok_hook
<ConfigGetHook
>()),
432 asok
->register_command(make_asok_hook
<ConfigSetHook
>()));
436 seastar::future
<> OSD::stop()
438 logger().info("stop");
439 // see also OSD::shutdown()
440 state
.set_stopping();
442 return gate
.close().then([this] {
445 return heartbeat
->stop();
449 return when_all_succeed(
450 public_msgr
->shutdown(),
451 cluster_msgr
->shutdown());
453 return store
->umount();
455 return store
->stop();
456 }).handle_exception([](auto ep
) {
457 logger().error("error while stopping osd: {}", ep
);
461 void OSD::dump_status(Formatter
* f
) const
463 f
->dump_stream("cluster_fsid") << superblock
.cluster_fsid
;
464 f
->dump_stream("osd_fsid") << superblock
.osd_fsid
;
465 f
->dump_unsigned("whoami", superblock
.whoami
);
466 f
->dump_string("state", state
.to_string());
467 f
->dump_unsigned("oldest_map", superblock
.oldest_map
);
468 f
->dump_unsigned("newest_map", superblock
.newest_map
);
469 f
->dump_unsigned("num_pgs", pg_map
.get_pgs().size());
472 seastar::future
<> OSD::load_pgs()
474 return store
->list_collections().then([this](auto colls
) {
475 return seastar::parallel_for_each(colls
, [this](auto coll
) {
477 if (coll
.is_pg(&pgid
)) {
478 return load_pg(pgid
).then([pgid
, this](auto&& pg
) {
479 logger().info("load_pgs: loaded {}", pgid
);
480 pg_map
.pg_loaded(pgid
, std::move(pg
));
481 shard_services
.inc_pg_num();
482 return seastar::now();
484 } else if (coll
.is_temp(&pgid
)) {
485 // TODO: remove the collection
486 return seastar::now();
488 logger().warn("ignoring unrecognized collection: {}", coll
);
489 return seastar::now();
495 seastar::future
<Ref
<PG
>> OSD::make_pg(cached_map_t create_map
,
499 using ec_profile_t
= map
<string
,string
>;
500 auto get_pool_info
= [create_map
, pgid
, this] {
501 if (create_map
->have_pg_pool(pgid
.pool())) {
502 pg_pool_t pi
= *create_map
->get_pg_pool(pgid
.pool());
503 string name
= create_map
->get_pool_name(pgid
.pool());
504 ec_profile_t ec_profile
;
505 if (pi
.is_erasure()) {
506 ec_profile
= create_map
->get_erasure_code_profile(pi
.erasure_code_profile
);
508 return seastar::make_ready_future
<pg_pool_t
, string
, ec_profile_t
>(
511 std::move(ec_profile
));
513 // pool was deleted; grab final pg_pool_t off disk.
514 return meta_coll
->load_final_pool_info(pgid
.pool());
517 auto get_collection
= [pgid
, do_create
, this] {
518 const coll_t cid
{pgid
};
520 return store
->create_new_collection(cid
);
522 return store
->open_collection(cid
);
525 return seastar::when_all_succeed(
526 std::move(get_pool_info
),
527 std::move(get_collection
)
528 ).then([pgid
, create_map
, this] (auto info
,
530 auto [pool
, name
, ec_profile
] = std::move(info
);
531 return seastar::make_ready_future
<Ref
<PG
>>(
533 pg_shard_t
{whoami
, pgid
.shard
},
543 seastar::future
<Ref
<PG
>> OSD::load_pg(spg_t pgid
)
545 return PGMeta
{store
.get(), pgid
}.get_epoch().then([this](epoch_t e
) {
547 }).then([pgid
, this] (auto&& create_map
) {
548 return make_pg(std::move(create_map
), pgid
, false);
549 }).then([this, pgid
](Ref
<PG
> pg
) {
550 return pg
->read_state(store
.get()).then([pg
] {
551 return seastar::make_ready_future
<Ref
<PG
>>(std::move(pg
));
553 }).handle_exception([pgid
](auto ep
) {
554 logger().info("pg {} saw exception on load {}", pgid
, ep
);
555 ceph_abort("Could not load pg" == 0);
556 return seastar::make_exception_future
<Ref
<PG
>>(ep
);
560 seastar::future
<> OSD::ms_dispatch(crimson::net::Connection
* conn
, MessageRef m
)
562 if (state
.is_stopping()) {
563 return seastar::now();
566 switch (m
->get_type()) {
567 case CEPH_MSG_OSD_MAP
:
568 return handle_osd_map(conn
, boost::static_pointer_cast
<MOSDMap
>(m
));
569 case CEPH_MSG_OSD_OP
:
570 return handle_osd_op(conn
, boost::static_pointer_cast
<MOSDOp
>(m
));
571 case MSG_OSD_PG_CREATE2
:
572 shard_services
.start_operation
<CompoundPeeringRequest
>(
576 return seastar::now();
577 case MSG_OSD_PG_LEASE
:
579 case MSG_OSD_PG_LEASE_ACK
:
581 case MSG_OSD_PG_NOTIFY2
:
583 case MSG_OSD_PG_INFO2
:
585 case MSG_OSD_PG_QUERY2
:
588 return handle_peering_op(conn
, boost::static_pointer_cast
<MOSDPeeringOp
>(m
));
590 return handle_rep_op(conn
, boost::static_pointer_cast
<MOSDRepOp
>(m
));
591 case MSG_OSD_REPOPREPLY
:
592 return handle_rep_op_reply(conn
, boost::static_pointer_cast
<MOSDRepOpReply
>(m
));
594 logger().info("{} unhandled message {}", __func__
, *m
);
595 return seastar::now();
599 seastar::future
<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn
)
601 if (conn
->get_peer_type() != CEPH_ENTITY_TYPE_MON
) {
602 return seastar::now();
604 return seastar::now();
608 seastar::future
<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn
)
610 // TODO: cleanup the session attached to this connection
611 logger().warn("ms_handle_reset");
612 return seastar::now();
615 seastar::future
<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn
)
617 logger().warn("ms_handle_remote_reset");
618 return seastar::now();
621 void OSD::handle_authentication(const EntityName
& name
,
622 const AuthCapsInfo
& caps
)
627 MessageRef
OSD::get_stats()
629 // todo: m-to-n: collect stats using map-reduce
630 // MPGStats::had_map_for is not used since PGMonitor was removed
631 auto m
= make_message
<MPGStats
>(monc
->get_fsid(), osdmap
->get_epoch());
633 for (auto [pgid
, pg
] : pg_map
.get_pgs()) {
634 if (pg
->is_primary()) {
635 auto stats
= pg
->get_stats();
636 // todo: update reported_epoch,reported_seq,last_fresh
637 stats
.reported_epoch
= osdmap
->get_epoch();
638 m
->pg_stat
.emplace(pgid
.pgid
, std::move(stats
));
644 OSD::cached_map_t
OSD::get_map() const
649 seastar::future
<OSD::cached_map_t
> OSD::get_map(epoch_t e
)
651 // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
652 if (auto found
= osdmaps
.find(e
); found
) {
653 return seastar::make_ready_future
<cached_map_t
>(std::move(found
));
655 return load_map(e
).then([e
, this](unique_ptr
<OSDMap
> osdmap
) {
656 return seastar::make_ready_future
<cached_map_t
>(
657 osdmaps
.insert(e
, std::move(osdmap
)));
662 void OSD::store_map_bl(ceph::os::Transaction
& t
,
663 epoch_t e
, bufferlist
&& bl
)
665 meta_coll
->store_map(t
, e
, bl
);
666 map_bl_cache
.insert(e
, std::move(bl
));
669 seastar::future
<bufferlist
> OSD::load_map_bl(epoch_t e
)
671 if (std::optional
<bufferlist
> found
= map_bl_cache
.find(e
); found
) {
672 return seastar::make_ready_future
<bufferlist
>(*found
);
674 return meta_coll
->load_map(e
);
678 seastar::future
<std::unique_ptr
<OSDMap
>> OSD::load_map(epoch_t e
)
680 auto o
= std::make_unique
<OSDMap
>();
682 return load_map_bl(e
).then([o
=std::move(o
)](bufferlist bl
) mutable {
684 return seastar::make_ready_future
<unique_ptr
<OSDMap
>>(std::move(o
));
687 return seastar::make_ready_future
<unique_ptr
<OSDMap
>>(std::move(o
));
691 seastar::future
<> OSD::store_maps(ceph::os::Transaction
& t
,
692 epoch_t start
, Ref
<MOSDMap
> m
)
694 return seastar::do_for_each(boost::make_counting_iterator(start
),
695 boost::make_counting_iterator(m
->get_last() + 1),
696 [&t
, m
, this](epoch_t e
) {
697 if (auto p
= m
->maps
.find(e
); p
!= m
->maps
.end()) {
698 auto o
= std::make_unique
<OSDMap
>();
699 o
->decode(p
->second
);
700 logger().info("store_maps osdmap.{}", e
);
701 store_map_bl(t
, e
, std::move(std::move(p
->second
)));
702 osdmaps
.insert(e
, std::move(o
));
703 return seastar::now();
704 } else if (auto p
= m
->incremental_maps
.find(e
);
705 p
!= m
->incremental_maps
.end()) {
706 return load_map(e
- 1).then([e
, bl
=p
->second
, &t
, this](auto o
) {
707 OSDMap::Incremental inc
;
708 auto i
= bl
.cbegin();
710 o
->apply_incremental(inc
);
712 o
->encode(fbl
, inc
.encode_features
| CEPH_FEATURE_RESERVED
);
713 store_map_bl(t
, e
, std::move(fbl
));
714 osdmaps
.insert(e
, std::move(o
));
715 return seastar::now();
718 logger().error("MOSDMap lied about what maps it had?");
719 return seastar::now();
724 bool OSD::require_mon_peer(crimson::net::Connection
*conn
, Ref
<Message
> m
)
726 if (!conn
->peer_is_mon()) {
727 logger().info("{} received from non-mon {}, {}",
729 conn
->get_peer_addr(),
736 seastar::future
<Ref
<PG
>> OSD::handle_pg_create_info(
737 std::unique_ptr
<PGCreateInfo
> info
) {
738 return seastar::do_with(
740 [this](auto &info
) -> seastar::future
<Ref
<PG
>> {
741 return get_map(info
->epoch
).then(
742 [&info
, this](cached_map_t startmap
) ->
743 seastar::future
<Ref
<PG
>, cached_map_t
> {
744 const spg_t
&pgid
= info
->pgid
;
746 int64_t pool_id
= pgid
.pgid
.pool();
747 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
750 "{} ignoring pgid {}, pool dne",
753 return seastar::make_ready_future
<Ref
<PG
>, cached_map_t
>(
757 ceph_assert(osdmap
->require_osd_release
>= ceph_release_t::octopus
);
758 if (!pool
->has_flag(pg_pool_t::FLAG_CREATING
)) {
759 // this ensures we do not process old creating messages after the
760 // pool's initial pgs have been created (and pg are subsequently
761 // allowed to split or merge).
763 "{} dropping {} create, pool does not have CREATING flag set",
766 return seastar::make_ready_future
<Ref
<PG
>, cached_map_t
>(
771 return make_pg(startmap
, pgid
, true).then(
772 [startmap
=std::move(startmap
)](auto pg
) mutable {
773 return seastar::make_ready_future
<Ref
<PG
>, cached_map_t
>(
775 std::move(startmap
));
777 }).then([this, &info
](auto pg
, auto startmap
) ->
778 seastar::future
<Ref
<PG
>> {
780 return seastar::make_ready_future
<Ref
<PG
>>(Ref
<PG
>());
781 PeeringCtx rctx
{ceph_release_t::octopus
};
782 const pg_pool_t
* pp
= startmap
->get_pg_pool(info
->pgid
.pool());
784 int up_primary
, acting_primary
;
785 vector
<int> up
, acting
;
786 startmap
->pg_to_up_acting_osds(
787 info
->pgid
.pgid
, &up
, &up_primary
, &acting
, &acting_primary
);
789 int role
= startmap
->calc_pg_role(pg_shard_t(whoami
, info
->pgid
.shard
),
792 create_pg_collection(
795 info
->pgid
.get_split_bits(pp
->get_pg_num()));
808 info
->past_intervals
,
812 return shard_services
.start_operation
<PGAdvanceMap
>(
813 *this, pg
, pg
->get_osdmap_epoch(),
814 osdmap
->get_epoch(), std::move(rctx
), true).second
.then([pg
] {
815 return seastar::make_ready_future
<Ref
<PG
>>(pg
);
821 seastar::future
<> OSD::handle_osd_map(crimson::net::Connection
* conn
,
824 logger().info("handle_osd_map {}", *m
);
825 if (m
->fsid
!= superblock
.cluster_fsid
) {
826 logger().warn("fsid mismatched");
827 return seastar::now();
829 if (state
.is_initializing()) {
830 logger().warn("i am still initializing");
831 return seastar::now();
834 const auto first
= m
->get_first();
835 const auto last
= m
->get_last();
836 logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
837 first
, last
, superblock
.newest_map
, m
->oldest_map
, m
->newest_map
);
838 // make sure there is something new, here, before we bother flushing
839 // the queues and such
840 if (last
<= superblock
.newest_map
) {
841 return seastar::now();
844 bool skip_maps
= false;
845 epoch_t start
= superblock
.newest_map
+ 1;
847 logger().info("handle_osd_map message skips epochs {}..{}",
849 if (m
->oldest_map
<= start
) {
850 return shard_services
.osdmap_subscribe(start
, false);
852 // always try to get the full range of maps--as many as we can. this
853 // 1- is good to have
854 // 2- is at present the only way to ensure that we get a *full* map as
856 if (m
->oldest_map
< first
) {
857 return shard_services
.osdmap_subscribe(m
->oldest_map
- 1, true);
863 return seastar::do_with(ceph::os::Transaction
{},
865 return store_maps(t
, start
, m
).then([=, &t
] {
866 // even if this map isn't from a mon, we may have satisfied our subscription
867 monc
->sub_got("osdmap", last
);
868 if (!superblock
.oldest_map
|| skip_maps
) {
869 superblock
.oldest_map
= first
;
871 superblock
.newest_map
= last
;
872 superblock
.current_epoch
= last
;
874 // note in the superblock that we were clean thru the prior epoch
875 if (boot_epoch
&& boot_epoch
>= superblock
.mounted
) {
876 superblock
.mounted
= boot_epoch
;
877 superblock
.clean_thru
= last
;
879 meta_coll
->store_superblock(t
, superblock
);
880 return store
->do_transaction(meta_coll
->collection(), std::move(t
));
883 // TODO: write to superblock and commit the transaction
884 return committed_osd_maps(start
, last
, m
);
888 seastar::future
<> OSD::committed_osd_maps(version_t first
,
892 logger().info("osd.{}: committed_osd_maps({}, {})", whoami
, first
, last
);
893 // advance through the new maps
894 return seastar::do_for_each(boost::make_counting_iterator(first
),
895 boost::make_counting_iterator(last
+ 1),
896 [this](epoch_t cur
) {
897 return get_map(cur
).then([this](cached_map_t
&& o
) {
898 osdmap
= std::move(o
);
899 shard_services
.update_map(osdmap
);
901 osdmap
->is_up(whoami
) &&
902 osdmap
->get_addrs(whoami
) == public_msgr
->get_myaddrs()) {
903 up_epoch
= osdmap
->get_epoch();
905 boot_epoch
= osdmap
->get_epoch();
910 if (osdmap
->is_up(whoami
) &&
911 osdmap
->get_addrs(whoami
) == public_msgr
->get_myaddrs() &&
912 bind_epoch
< osdmap
->get_up_from(whoami
)) {
913 if (state
.is_booting()) {
914 logger().info("osd.{}: activating...", whoami
);
916 beacon_timer
.arm_periodic(
917 std::chrono::seconds(local_conf()->osd_beacon_report_interval
));
918 heartbeat_timer
.arm_periodic(
919 std::chrono::seconds(TICK_INTERVAL
));
922 check_osdmap_features();
924 return consume_map(osdmap
->get_epoch());
926 if (state
.is_active()) {
927 logger().info("osd.{}: now active", whoami
);
928 if (!osdmap
->exists(whoami
)) {
931 if (should_restart()) {
934 return seastar::now();
936 } else if (state
.is_preboot()) {
937 logger().info("osd.{}: now preboot", whoami
);
939 if (m
->get_source().is_mon()) {
940 return _preboot(m
->oldest_map
, m
->newest_map
);
942 logger().info("osd.{}: start_boot", whoami
);
946 logger().info("osd.{}: now {}", whoami
, state
);
948 return seastar::now();
953 seastar::future
<> OSD::handle_osd_op(crimson::net::Connection
* conn
,
956 shard_services
.start_operation
<ClientRequest
>(
960 return seastar::now();
963 seastar::future
<> OSD::handle_rep_op(crimson::net::Connection
* conn
,
967 shard_services
.start_operation
<RepRequest
>(
971 return seastar::now();
974 seastar::future
<> OSD::handle_rep_op_reply(crimson::net::Connection
* conn
,
975 Ref
<MOSDRepOpReply
> m
)
977 const auto& pgs
= pg_map
.get_pgs();
978 if (auto pg
= pgs
.find(m
->get_spg()); pg
!= pgs
.end()) {
980 pg
->second
->handle_rep_op_reply(conn
, *m
);
982 logger().warn("stale reply: {}", *m
);
984 return seastar::now();
987 bool OSD::should_restart() const
989 if (!osdmap
->is_up(whoami
)) {
990 logger().info("map e {} marked osd.{} down",
991 osdmap
->get_epoch(), whoami
);
993 } else if (osdmap
->get_addrs(whoami
) != public_msgr
->get_myaddrs()) {
994 logger().error("map e {} had wrong client addr ({} != my {})",
996 osdmap
->get_addrs(whoami
),
997 public_msgr
->get_myaddrs());
999 } else if (osdmap
->get_cluster_addrs(whoami
) != cluster_msgr
->get_myaddrs()) {
1000 logger().error("map e {} had wrong cluster addr ({} != my {})",
1001 osdmap
->get_epoch(),
1002 osdmap
->get_cluster_addrs(whoami
),
1003 cluster_msgr
->get_myaddrs());
1010 seastar::future
<> OSD::restart()
1012 beacon_timer
.cancel();
1013 heartbeat_timer
.cancel();
1015 bind_epoch
= osdmap
->get_epoch();
1016 // TODO: promote to shutdown if being marked down for multiple times
1017 // rebind messengers
1018 return start_boot();
1021 seastar::future
<> OSD::shutdown()
1024 superblock
.mounted
= boot_epoch
;
1025 superblock
.clean_thru
= osdmap
->get_epoch();
1026 return seastar::now();
1029 seastar::future
<> OSD::send_beacon()
1031 if (!state
.is_active()) {
1032 return seastar::now();
1034 // FIXME: min lec should be calculated from pg_stat
1035 // and should set m->pgs
1036 epoch_t min_last_epoch_clean
= osdmap
->get_epoch();
1037 auto m
= make_message
<MOSDBeacon
>(osdmap
->get_epoch(),
1038 min_last_epoch_clean
,
1039 superblock
.last_purged_snaps_scrub
);
1040 return monc
->send_message(m
);
1043 seastar::future
<> OSD::update_heartbeat_peers()
1045 if (!state
.is_active()) {
1046 return seastar::now();
1048 for (auto& pg
: pg_map
.get_pgs()) {
1049 vector
<int> up
, acting
;
1050 osdmap
->pg_to_up_acting_osds(pg
.first
.pgid
,
1053 for (int osd
: boost::join(up
, acting
)) {
1054 if (osd
== CRUSH_ITEM_NONE
|| osd
== whoami
) {
1057 heartbeat
->add_peer(osd
, osdmap
->get_epoch());
1061 return heartbeat
->update_peers(whoami
);
1064 seastar::future
<> OSD::handle_peering_op(
1065 crimson::net::Connection
* conn
,
1066 Ref
<MOSDPeeringOp
> m
)
1068 const int from
= m
->get_source().num();
1069 logger().debug("handle_peering_op on {} from {}", m
->get_spg(), from
);
1070 shard_services
.start_operation
<RemotePeeringEvent
>(
1074 pg_shard_t
{from
, m
->get_spg().shard
},
1076 std::move(*m
->get_event()));
1077 return seastar::now();
1080 void OSD::check_osdmap_features()
1082 heartbeat
->set_require_authorizer(true);
1085 seastar::future
<> OSD::consume_map(epoch_t epoch
)
1087 // todo: m-to-n: broadcast this news to all shards
1088 auto &pgs
= pg_map
.get_pgs();
1089 return seastar::parallel_for_each(pgs
.begin(), pgs
.end(), [=](auto& pg
) {
1090 return shard_services
.start_operation
<PGAdvanceMap
>(
1091 *this, pg
.second
, pg
.second
->get_osdmap_epoch(), epoch
,
1092 PeeringCtx
{ceph_release_t::octopus
}, false).second
;
1093 }).then([epoch
, this] {
1094 osdmap_gate
.got_map(epoch
);
1095 return seastar::make_ready_future();
1100 blocking_future
<Ref
<PG
>>
1101 OSD::get_or_create_pg(
1104 std::unique_ptr
<PGCreateInfo
> info
)
1106 auto [fut
, creating
] = pg_map
.get_pg(pgid
, bool(info
));
1107 if (!creating
&& info
) {
1108 pg_map
.set_creating(pgid
);
1109 (void)handle_pg_create_info(std::move(info
));
1111 return std::move(fut
);
1114 blocking_future
<Ref
<PG
>> OSD::wait_for_pg(
1117 return pg_map
.get_pg(pgid
).first
;