// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "osd.h"

#include <sys/utsname.h>

#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/join.hpp>
#include <boost/smart_ptr/make_local_shared.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>

#include "common/pick_address.h"
#include "include/util.h"

#include "messages/MOSDAlive.h"
#include "messages/MOSDBeacon.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MPGStats.h"

#include "os/Transaction.h"
#include "osd/ClassHandler.h"
#include "osd/PGPeeringEvent.h"
#include "osd/PeeringState.h"

#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/heartbeat.h"
#include "crimson/osd/osd_meta.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/replicated_request.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
  static constexpr int TICK_INTERVAL = 1;
}

using crimson::common::local_conf;
using crimson::os::FuturizedStore;

namespace crimson::osd {

OSD::OSD(int id, uint32_t nonce,
         crimson::net::MessengerRef cluster_msgr,
         crimson::net::MessengerRef public_msgr,
         crimson::net::MessengerRef hb_front_msgr,
         crimson::net::MessengerRef hb_back_msgr)
  : whoami{id},
    nonce{nonce},
    // do this in background
    beacon_timer{[this] { (void)send_beacon(); }},
    cluster_msgr{cluster_msgr},
    public_msgr{public_msgr},
    monc{new crimson::mon::Client{*public_msgr, *this}},
    mgrc{new crimson::mgr::Client{*public_msgr, *this}},
    store{crimson::os::FuturizedStore::create(
      local_conf().get_val<std::string>("osd_objectstore"),
      local_conf().get_val<std::string>("osd_data"),
      local_conf().get_config_values())},
    shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
    heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
    // do this in background
    heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
    asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
  osdmaps[0] = boost::make_local_shared<OSDMap>();
  for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
                    std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
    msgr.get()->set_auth_server(monc.get());
    msgr.get()->set_auth_client(monc.get());
  }

  if (local_conf()->osd_open_classes_on_start) {
    const int r = ClassHandler::get_instance().open_all_classes();
    if (r) {
      logger().warn("{} warning: got an error loading one or more classes: {}",
                    __func__, cpp_strerror(r));
    }
  }
}

OSD::~OSD() = default;

namespace {
  // Initial features in new superblock.
  // Features here are also automatically upgraded
  CompatSet get_osd_initial_compat_set()
  {
    CompatSet::FeatureSet ceph_osd_feature_compat;
    CompatSet::FeatureSet ceph_osd_feature_ro_compat;
    CompatSet::FeatureSet ceph_osd_feature_incompat;
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
    ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
    return CompatSet(ceph_osd_feature_compat,
                     ceph_osd_feature_ro_compat,
                     ceph_osd_feature_incompat);
  }
}

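// Create a fresh object store for this OSD: mkfs+mount the backing store,
// write an initial superblock into the meta collection, and record the
// cluster fsid and our id in the store's metadata.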
seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
{
  return store->start().then([this, osd_uuid] {
    return store->mkfs(osd_uuid);
  }).then([this] {
    return store->mount();
  }).then([cluster_fsid, this] {
    superblock.cluster_fsid = cluster_fsid;
    superblock.osd_fsid = store->get_fsid();
    superblock.whoami = whoami;
    superblock.compat_features = get_osd_initial_compat_set();

    logger().info(
      "{} writing superblock cluster_fsid {} osd_fsid {}",
      __func__,
      cluster_fsid,
      superblock.osd_fsid);
    return store->create_new_collection(coll_t::meta());
  }).then([this] (auto ch) {
    meta_coll = make_unique<OSDMeta>(ch, store.get());
    ceph::os::Transaction t;
    meta_coll->create(t);
    meta_coll->store_superblock(t, superblock);
    return store->do_transaction(meta_coll->collection(), std::move(t));
  }).then([cluster_fsid, this] {
    return when_all_succeed(
      store->write_meta("ceph_fsid", cluster_fsid.to_string()),
      store->write_meta("whoami", std::to_string(whoami)));
  }).then([cluster_fsid, this] {
    fmt::print("created object store {} for osd.{} fsid {}\n",
               local_conf().get_val<std::string>("osd_data"),
               whoami, cluster_fsid);
    return seastar::now();
  });
}

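// Address helpers: pick_addresses() resolves the configured public/cluster
// networks, and replace_unknown_addrs() fills blank (0.0.0.0) cluster
// addresses in from a known address of the same family.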
namespace {
  entity_addrvec_t pick_addresses(int what) {
    entity_addrvec_t addrs;
    crimson::common::CephContext cct;
    if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
      throw std::runtime_error("failed to pick address");
    }
    for (auto addr : addrs.v) {
      logger().info("picked address {}", addr);
    }
    return addrs;
  }
  std::pair<entity_addrvec_t, bool>
  replace_unknown_addrs(entity_addrvec_t maybe_unknowns,
                        const entity_addrvec_t& knowns) {
    bool changed = false;
    auto maybe_replace = [&](entity_addr_t addr) {
      if (!addr.is_blank_ip()) {
        return addr;
      }
      for (auto& b : knowns.v) {
        if (addr.get_family() == b.get_family()) {
          auto a = b;
          a.set_nonce(addr.get_nonce());
          a.set_type(addr.get_type());
          a.set_port(addr.get_port());
          changed = true;
          return a;
        }
      }
      throw std::runtime_error("failed to replace unknown address");
    };
    entity_addrvec_t replaced;
    std::transform(maybe_unknowns.v.begin(),
                   maybe_unknowns.v.end(),
                   std::back_inserter(replaced.v),
                   maybe_replace);
    return {replaced, changed};
  }
}

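// Bring the OSD up: mount the store, load the superblock and the current
// osdmap, load all PGs, configure and bind both messengers, connect to the
// mon and mgr, start the heartbeat and admin-socket servers, and finally
// kick off the boot handshake (see start_boot() below).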
seastar::future<> OSD::start()
{
  logger().info("start");

  startup_time = ceph::mono_clock::now();

  return store->start().then([this] {
    return store->mount();
  }).then([this] {
    return store->open_collection(coll_t::meta());
  }).then([this](auto ch) {
    meta_coll = make_unique<OSDMeta>(ch, store.get());
    return meta_coll->load_superblock();
  }).then([this](OSDSuperblock&& sb) {
    superblock = std::move(sb);
    return get_map(superblock.current_epoch);
  }).then([this](cached_map_t&& map) {
    shard_services.update_map(map);
    osdmap_gate.got_map(map->get_epoch());
    osdmap = std::move(map);
    return load_pgs();
  }).then([this] {

    uint64_t osd_required =
      CEPH_FEATURE_UID |
      CEPH_FEATURE_PGID64 |
      CEPH_FEATURE_OSDENC;
    using crimson::net::SocketPolicy;

    public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
    public_msgr->set_policy(entity_name_t::TYPE_MON,
                            SocketPolicy::lossy_client(osd_required));
    public_msgr->set_policy(entity_name_t::TYPE_MGR,
                            SocketPolicy::lossy_client(osd_required));
    public_msgr->set_policy(entity_name_t::TYPE_OSD,
                            SocketPolicy::stateless_server(0));

    cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
    cluster_msgr->set_policy(entity_name_t::TYPE_MON,
                             SocketPolicy::lossy_client(0));
    cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
                             SocketPolicy::lossless_peer(osd_required));
    cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
                             SocketPolicy::stateless_server(0));

    dispatchers.push_front(this);
    dispatchers.push_front(monc.get());
    dispatchers.push_front(mgrc.get());
    return seastar::when_all_succeed(
      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
        .then([this] { return cluster_msgr->start(&dispatchers); }),
      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                            local_conf()->ms_bind_port_min,
                            local_conf()->ms_bind_port_max)
        .then([this] { return public_msgr->start(&dispatchers); }));
  }).then([this] {
    return seastar::when_all_succeed(monc->start(),
                                     mgrc->start());
  }).then([this] {
    return _add_me_to_crush();
  }).then([this] {
    monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
    monc->sub_want("mgrmap", 0, 0);
    monc->sub_want("osdmap", 0, 0);
    return monc->renew_subs();
  }).then([this] {
    if (auto [addrs, changed] =
          replace_unknown_addrs(cluster_msgr->get_myaddrs(),
                                public_msgr->get_myaddrs()); changed) {
      return cluster_msgr->set_myaddrs(addrs);
    } else {
      return seastar::now();
    }
  }).then([this] {
    return heartbeat->start(public_msgr->get_myaddrs(),
                            cluster_msgr->get_myaddrs());
  }).then([this] {
    // create the admin-socket server, and the objects that register
    // to handle incoming commands
    return start_asok_admin();
  }).then([this] {
    return start_boot();
  });
}

seastar::future<> OSD::start_boot()
{
  state.set_preboot();
  return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
    return _preboot(oldest, newest);
  });
}

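// Decide whether we are ready to boot into the cluster. [oldest, newest] is
// the span of osdmap epochs the monitors currently hold. We only send
// MOSDBoot once our own map is recent enough (within osd_map_message_max of
// newest) and none of the blocking conditions below apply; otherwise we
// subscribe for the missing epochs and retry when they arrive via
// committed_osd_maps().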
seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
{
  logger().info("osd.{}: _preboot", whoami);
  if (osdmap->get_epoch() == 0) {
    logger().warn("waiting for initial osdmap");
  } else if (osdmap->is_destroyed(whoami)) {
    logger().warn("osdmap says I am destroyed");
    // provide a small margin so we don't livelock seeing if we
    // un-destroyed ourselves.
    if (osdmap->get_epoch() > newest - 1) {
      throw std::runtime_error("i am destroyed");
    }
  } else if (osdmap->is_noup(whoami)) {
    logger().warn("osdmap NOUP flag is set, waiting for it to clear");
  } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
  } else if (osdmap->require_osd_release < ceph_release_t::octopus) {
    logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
  } else if (false) {
    // TODO: update mon if current fullness state is different from osdmap
  } else if (version_t n = local_conf()->osd_map_message_max;
             osdmap->get_epoch() >= oldest - 1 &&
             osdmap->get_epoch() + n > newest) {
    return _send_boot();
  }
  // get all the latest maps
  if (osdmap->get_epoch() + 1 >= oldest) {
    return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
  } else {
    return shard_services.osdmap_subscribe(oldest - 1, true);
  }
}

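// Announce ourselves to the monitors with an MOSDBoot carrying our
// superblock, system metadata, and the addresses of the cluster and
// heartbeat messengers.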
seastar::future<> OSD::_send_boot()
{
  state.set_booting();

  logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
  logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
  auto m = make_message<MOSDBoot>(superblock,
                                  osdmap->get_epoch(),
                                  osdmap->get_epoch(),
                                  heartbeat->get_back_addrs(),
                                  heartbeat->get_front_addrs(),
                                  cluster_msgr->get_myaddrs(),
                                  CEPH_FEATURES_ALL);
  collect_sys_info(&m->metadata, NULL);
  return monc->send_message(m);
}

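// Insert (or move) this OSD in the crush map via the "osd crush
// create-or-move" mon command. The initial weight comes from
// osd_crush_initial_weight if set, otherwise from the store's total
// capacity in TiB.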
seastar::future<> OSD::_add_me_to_crush()
{
  if (!local_conf().get_val<bool>("osd_crush_update_on_start")) {
    return seastar::now();
  }
  auto get_weight = [this] {
    if (auto w = local_conf().get_val<double>("osd_crush_initial_weight");
        w >= 0) {
      return seastar::make_ready_future<double>(w);
    } else {
      return store->stat().then([](auto st) {
        auto total = st.total;
        return seastar::make_ready_future<double>(
          std::max(.00001,
                   double(total) / double(1ull << 40))); // TB
      });
    }
  };
  return get_weight().then([this](auto weight) {
    const crimson::crush::CrushLocation loc{make_unique<CephContext>().get()};
    logger().info("{} crush location is {}", __func__, loc);
    string cmd = fmt::format(R"({{
      "prefix": "osd crush create-or-move",
      "id": {},
      "weight": {:.4f},
      "args": [{}]
    }})", whoami, weight, loc);
    return monc->run_command({cmd}, {});
  }).then([](int32_t code, string message, bufferlist) {
    if (code) {
      logger().warn("failed to add to crush: {} ({})", message, code);
      throw std::runtime_error("failed to add to crush");
    } else {
      logger().info("added to crush: {}", message);
    }
    return seastar::now();
  });
}

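// Ask the monitors to advance our up_thru to the current epoch, but only
// if it would actually move forward; MOSDAlive is what lets peering record
// that we were alive through the interval.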
seastar::future<> OSD::_send_alive()
{
  auto want = osdmap->get_epoch();
  logger().info(
    "{} want {} up_thru_wanted {}",
    __func__,
    want,
    up_thru_wanted);
  if (!osdmap->exists(whoami)) {
    logger().warn("{} DNE", __func__);
    return seastar::now();
  } else if (want <= up_thru_wanted) {
    logger().debug("{} {} <= {}", __func__, want, up_thru_wanted);
    return seastar::now();
  } else {
    up_thru_wanted = want;
    auto m = make_message<MOSDAlive>(osdmap->get_epoch(), want);
    return monc->send_message(std::move(m));
  }
}

/*
  The admin-socket server created here has two groups of commands
  registered with it:
  - the OSD-specific commands, handled by the OSD object;
  - a set of common commands, handled directly by the AdminSocket
    object itself.
*/
seastar::future<> OSD::start_asok_admin()
{
  auto asok_path = local_conf().get_val<std::string>("admin_socket");
  using namespace crimson::admin;
  return asok->start(asok_path).then([this] {
    return seastar::when_all_succeed(
      asok->register_admin_commands(),
      asok->register_command(make_asok_hook<OsdStatusHook>(*this)),
      asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
      asok->register_command(make_asok_hook<ConfigShowHook>()),
      asok->register_command(make_asok_hook<ConfigGetHook>()),
      asok->register_command(make_asok_hook<ConfigSetHook>()));
  });
}

seastar::future<> OSD::stop()
{
  logger().info("stop");
  // see also OSD::shutdown()
  state.set_stopping();

  return gate.close().then([this] {
    return asok->stop();
  }).then([this] {
    return heartbeat->stop();
  }).then([this] {
    return monc->stop();
  }).then([this] {
    return when_all_succeed(
      public_msgr->shutdown(),
      cluster_msgr->shutdown());
  }).then([this] {
    return store->umount();
  }).then([this] {
    return store->stop();
  }).handle_exception([](auto ep) {
    logger().error("error while stopping osd: {}", ep);
  });
}

void OSD::dump_status(Formatter* f) const
{
  f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
  f->dump_stream("osd_fsid") << superblock.osd_fsid;
  f->dump_unsigned("whoami", superblock.whoami);
  f->dump_string("state", state.to_string());
  f->dump_unsigned("oldest_map", superblock.oldest_map);
  f->dump_unsigned("newest_map", superblock.newest_map);
  f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
}

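// Scan the store for PG collections and load each one in parallel. Temp
// collections are left for later cleanup; anything else is ignored with a
// warning.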
seastar::future<> OSD::load_pgs()
{
  return store->list_collections().then([this](auto colls) {
    return seastar::parallel_for_each(colls, [this](auto coll) {
      spg_t pgid;
      if (coll.is_pg(&pgid)) {
        return load_pg(pgid).then([pgid, this](auto&& pg) {
          logger().info("load_pgs: loaded {}", pgid);
          pg_map.pg_loaded(pgid, std::move(pg));
          shard_services.inc_pg_num();
          return seastar::now();
        });
      } else if (coll.is_temp(&pgid)) {
        // TODO: remove the collection
        return seastar::now();
      } else {
        logger().warn("ignoring unrecognized collection: {}", coll);
        return seastar::now();
      }
    });
  });
}

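// Construct an in-memory PG object for pgid as of create_map: look up the
// pool info (falling back to the final pg_pool_t stored on disk if the pool
// has since been deleted), open or create the PG's collection, then build
// the PG itself.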
seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
                                      spg_t pgid,
                                      bool do_create)
{
  using ec_profile_t = map<string,string>;
  auto get_pool_info = [create_map, pgid, this] {
    if (create_map->have_pg_pool(pgid.pool())) {
      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
      string name = create_map->get_pool_name(pgid.pool());
      ec_profile_t ec_profile;
      if (pi.is_erasure()) {
        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
      }
      return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
        std::move(pi),
        std::move(name),
        std::move(ec_profile));
    } else {
      // pool was deleted; grab final pg_pool_t off disk.
      return meta_coll->load_final_pool_info(pgid.pool());
    }
  };
  auto get_collection = [pgid, do_create, this] {
    const coll_t cid{pgid};
    if (do_create) {
      return store->create_new_collection(cid);
    } else {
      return store->open_collection(cid);
    }
  };
  return seastar::when_all_succeed(
    std::move(get_pool_info),
    std::move(get_collection)
  ).then([pgid, create_map, this] (auto info,
                                   auto coll) {
    auto [pool, name, ec_profile] = std::move(info);
    return seastar::make_ready_future<Ref<PG>>(
      new PG{pgid,
             pg_shard_t{whoami, pgid.shard},
             std::move(coll),
             std::move(pool),
             std::move(name),
             create_map,
             shard_services,
             ec_profile});
  });
}

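// Load a single PG from disk: read its last-written epoch from PGMeta,
// fetch the matching osdmap, construct the PG, and replay its persistent
// state. Failure to load a PG is fatal.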
seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
{
  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
    return get_map(e);
  }).then([pgid, this] (auto&& create_map) {
    return make_pg(std::move(create_map), pgid, false);
  }).then([this, pgid](Ref<PG> pg) {
    return pg->read_state(store.get()).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
    });
  }).handle_exception([pgid](auto ep) {
    logger().info("pg {} saw exception on load {}", pgid, ep);
    ceph_abort("Could not load pg" == 0);
    return seastar::make_exception_future<Ref<PG>>(ep);
  });
}

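// Central message dispatch: route each incoming message to its handler, or
// start the matching OSD operation. Anything unrecognized is logged and
// dropped.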
seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
  if (state.is_stopping()) {
    return seastar::now();
  }

  switch (m->get_type()) {
  case CEPH_MSG_OSD_MAP:
    return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
  case CEPH_MSG_OSD_OP:
    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
  case MSG_OSD_PG_CREATE2:
    shard_services.start_operation<CompoundPeeringRequest>(
      *this,
      conn->get_shared(),
      m);
    return seastar::now();
  case MSG_OSD_PG_LEASE:
    [[fallthrough]];
  case MSG_OSD_PG_LEASE_ACK:
    [[fallthrough]];
  case MSG_OSD_PG_NOTIFY2:
    [[fallthrough]];
  case MSG_OSD_PG_INFO2:
    [[fallthrough]];
  case MSG_OSD_PG_QUERY2:
    [[fallthrough]];
  case MSG_OSD_PG_LOG:
    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
  case MSG_OSD_REPOP:
    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
  case MSG_OSD_REPOPREPLY:
    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
  default:
    logger().info("{} unhandled message {}", __func__, *m);
    return seastar::now();
  }
}

seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
{
  if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
    return seastar::now();
  } else {
    return seastar::now();
  }
}

seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
{
  // TODO: cleanup the session attached to this connection
  logger().warn("ms_handle_reset");
  return seastar::now();
}

seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
{
  logger().warn("ms_handle_remote_reset");
  return seastar::now();
}

void OSD::handle_authentication(const EntityName& name,
                                const AuthCapsInfo& caps)
{
  // todo
}

MessageRef OSD::get_stats()
{
  // todo: m-to-n: collect stats using map-reduce
  // MPGStats::had_map_for is not used since PGMonitor was removed
  auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());

  for (auto [pgid, pg] : pg_map.get_pgs()) {
    if (pg->is_primary()) {
      auto stats = pg->get_stats();
      // todo: update reported_epoch,reported_seq,last_fresh
      stats.reported_epoch = osdmap->get_epoch();
      m->pg_stat.emplace(pgid.pgid, std::move(stats));
    }
  }
  return m;
}

OSD::cached_map_t OSD::get_map() const
{
  return osdmap;
}

seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
{
  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
  if (auto found = osdmaps.find(e); found) {
    return seastar::make_ready_future<cached_map_t>(std::move(found));
  } else {
    return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
      return seastar::make_ready_future<cached_map_t>(
        osdmaps.insert(e, std::move(osdmap)));
    });
  }
}

void OSD::store_map_bl(ceph::os::Transaction& t,
                       epoch_t e, bufferlist&& bl)
{
  meta_coll->store_map(t, e, bl);
  map_bl_cache.insert(e, std::move(bl));
}

seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
{
  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
    return seastar::make_ready_future<bufferlist>(*found);
  } else {
    return meta_coll->load_map(e);
  }
}

seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
{
  auto o = std::make_unique<OSDMap>();
  if (e > 0) {
    return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
      o->decode(bl);
      return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
    });
  } else {
    return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
  }
}

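// Persist each osdmap epoch in [start, m->get_last()] carried by an MOSDMap.
// Full maps are stored as-is; incremental maps are applied on top of the
// previous epoch, and the resulting full map is re-encoded and stored.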
seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
                                  epoch_t start, Ref<MOSDMap> m)
{
  return seastar::do_for_each(boost::make_counting_iterator(start),
                              boost::make_counting_iterator(m->get_last() + 1),
                              [&t, m, this](epoch_t e) {
    if (auto p = m->maps.find(e); p != m->maps.end()) {
      auto o = std::make_unique<OSDMap>();
      o->decode(p->second);
      logger().info("store_maps osdmap.{}", e);
      store_map_bl(t, e, std::move(p->second));
      osdmaps.insert(e, std::move(o));
      return seastar::now();
    } else if (auto p = m->incremental_maps.find(e);
               p != m->incremental_maps.end()) {
      return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
        OSDMap::Incremental inc;
        auto i = bl.cbegin();
        inc.decode(i);
        o->apply_incremental(inc);
        bufferlist fbl;
        o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
        store_map_bl(t, e, std::move(fbl));
        osdmaps.insert(e, std::move(o));
        return seastar::now();
      });
    } else {
      logger().error("MOSDMap lied about what maps it had?");
      return seastar::now();
    }
  });
}

bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
{
  if (!conn->peer_is_mon()) {
    logger().info("{} received from non-mon {}, {}",
                  __func__,
                  conn->get_peer_addr(),
                  *m);
    return false;
  }
  return true;
}

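// Instantiate a PG described by a PGCreateInfo (from a mon create message
// or a peering event). Mon-initiated creates are dropped if the pool no
// longer exists or its initial PG creation has already finished; otherwise
// the PG is created on disk, initialized, and advanced to the current map.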
seastar::future<Ref<PG>> OSD::handle_pg_create_info(
  std::unique_ptr<PGCreateInfo> info) {
  return seastar::do_with(
    std::move(info),
    [this](auto &info) -> seastar::future<Ref<PG>> {
      return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap) ->
        seastar::future<Ref<PG>, cached_map_t> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
                Ref<PG>(),
                startmap);
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and PGs are subsequently
              // allowed to split or merge).
              logger().debug(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
                Ref<PG>(),
                startmap);
            }
          }
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
                std::move(pg),
                std::move(startmap));
            });
      }).then([this, &info](auto pg, auto startmap) ->
              seastar::future<Ref<PG>> {
        if (!pg)
          return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
        PeeringCtx rctx{ceph_release_t::octopus};
        const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

        int up_primary, acting_primary;
        vector<int> up, acting;
        startmap->pg_to_up_acting_osds(
          info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);

        int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
                                          acting);

        create_pg_collection(
          rctx.transaction,
          info->pgid,
          info->pgid.get_split_bits(pp->get_pg_num()));
        init_pg_ondisk(
          rctx.transaction,
          info->pgid,
          pp);

        pg->init(
          role,
          up,
          up_primary,
          acting,
          acting_primary,
          info->history,
          info->past_intervals,
          false,
          rctx.transaction);

        return shard_services.start_operation<PGAdvanceMap>(
          *this, pg, pg->get_osdmap_epoch(),
          osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
            return seastar::make_ready_future<Ref<PG>>(pg);
          });
      });
    });
}

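// Handle an incoming MOSDMap: ignore it if it brings nothing new, subscribe
// for any epochs it skips, persist the new maps plus the updated superblock
// in a single transaction, and then activate them via committed_osd_maps().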
seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn,
                                      Ref<MOSDMap> m)
{
  logger().info("handle_osd_map {}", *m);
  if (m->fsid != superblock.cluster_fsid) {
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  if (state.is_initializing()) {
    logger().warn("i am still initializing");
    return seastar::now();
  }

  const auto first = m->get_first();
  const auto last = m->get_last();
  logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
                first, last, superblock.newest_map, m->oldest_map, m->newest_map);
  // make sure there is something new, here, before we bother flushing
  // the queues and such
  if (last <= superblock.newest_map) {
    return seastar::now();
  }
  // missing some?
  bool skip_maps = false;
  epoch_t start = superblock.newest_map + 1;
  if (first > start) {
    logger().info("handle_osd_map message skips epochs {}..{}",
                  start, first - 1);
    if (m->oldest_map <= start) {
      return shard_services.osdmap_subscribe(start, false);
    }
    // always try to get the full range of maps--as many as we can. this
    // 1- is good to have
    // 2- is at present the only way to ensure that we get a *full* map as
    //    the first map!
    if (m->oldest_map < first) {
      return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
    }
    skip_maps = true;
    start = first;
  }

  return seastar::do_with(ceph::os::Transaction{},
                          [=](auto& t) {
    return store_maps(t, start, m).then([=, &t] {
      // even if this map isn't from a mon, we may have satisfied our subscription
      monc->sub_got("osdmap", last);
      if (!superblock.oldest_map || skip_maps) {
        superblock.oldest_map = first;
      }
      superblock.newest_map = last;
      superblock.current_epoch = last;

      // note in the superblock that we were clean thru the prior epoch
      if (boot_epoch && boot_epoch >= superblock.mounted) {
        superblock.mounted = boot_epoch;
        superblock.clean_thru = last;
      }
      meta_coll->store_superblock(t, superblock);
      return store->do_transaction(meta_coll->collection(), std::move(t));
    });
  }).then([=] {
    // TODO: write to superblock and commit the transaction
    return committed_osd_maps(start, last, m);
  });
}

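// Activate freshly persisted maps: step the in-memory osdmap through each
// epoch in [first, last], mark ourselves up once a map shows our own
// address, have every PG consume the new map, and then react to the
// resulting state (activate, restart, shutdown, or retry the boot).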
seastar::future<> OSD::committed_osd_maps(version_t first,
                                          version_t last,
                                          Ref<MOSDMap> m)
{
  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
  // advance through the new maps
  return seastar::do_for_each(boost::make_counting_iterator(first),
                              boost::make_counting_iterator(last + 1),
                              [this](epoch_t cur) {
    return get_map(cur).then([this](cached_map_t&& o) {
      osdmap = std::move(o);
      shard_services.update_map(osdmap);
      if (up_epoch == 0 &&
          osdmap->is_up(whoami) &&
          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
        up_epoch = osdmap->get_epoch();
        if (!boot_epoch) {
          boot_epoch = osdmap->get_epoch();
        }
      }
    });
  }).then([m, this] {
    if (osdmap->is_up(whoami) &&
        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
        bind_epoch < osdmap->get_up_from(whoami)) {
      if (state.is_booting()) {
        logger().info("osd.{}: activating...", whoami);
        state.set_active();
        beacon_timer.arm_periodic(
          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
        heartbeat_timer.arm_periodic(
          std::chrono::seconds(TICK_INTERVAL));
      }
    }
    check_osdmap_features();
    // yay!
    return consume_map(osdmap->get_epoch());
  }).then([m, this] {
    if (state.is_active()) {
      logger().info("osd.{}: now active", whoami);
      if (!osdmap->exists(whoami)) {
        return shutdown();
      }
      if (should_restart()) {
        return restart();
      } else {
        return seastar::now();
      }
    } else if (state.is_preboot()) {
      logger().info("osd.{}: now preboot", whoami);

      if (m->get_source().is_mon()) {
        return _preboot(m->oldest_map, m->newest_map);
      } else {
        logger().info("osd.{}: start_boot", whoami);
        return start_boot();
      }
    } else {
      logger().info("osd.{}: now {}", whoami, state);
      // XXX
      return seastar::now();
    }
  });
}

seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
                                     Ref<MOSDOp> m)
{
  shard_services.start_operation<ClientRequest>(
    *this,
    conn->get_shared(),
    std::move(m));
  return seastar::now();
}

seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
                                     Ref<MOSDRepOp> m)
{
  m->finish_decode();
  shard_services.start_operation<RepRequest>(
    *this,
    conn->get_shared(),
    std::move(m));
  return seastar::now();
}

seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
                                           Ref<MOSDRepOpReply> m)
{
  const auto& pgs = pg_map.get_pgs();
  if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
    m->finish_decode();
    pg->second->handle_rep_op_reply(conn, *m);
  } else {
    logger().warn("stale reply: {}", *m);
  }
  return seastar::now();
}

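// We need to go back through boot if the map marks us down, or if it
// advertises addresses that no longer match our messengers.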
bool OSD::should_restart() const
{
  if (!osdmap->is_up(whoami)) {
    logger().info("map e {} marked osd.{} down",
                  osdmap->get_epoch(), whoami);
    return true;
  } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
    logger().error("map e {} had wrong client addr ({} != my {})",
                   osdmap->get_epoch(),
                   osdmap->get_addrs(whoami),
                   public_msgr->get_myaddrs());
    return true;
  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
    logger().error("map e {} had wrong cluster addr ({} != my {})",
                   osdmap->get_epoch(),
                   osdmap->get_cluster_addrs(whoami),
                   cluster_msgr->get_myaddrs());
    return true;
  } else {
    return false;
  }
}

seastar::future<> OSD::restart()
{
  beacon_timer.cancel();
  heartbeat_timer.cancel();
  up_epoch = 0;
  bind_epoch = osdmap->get_epoch();
  // TODO: promote to shutdown if being marked down for multiple times
  // rebind messengers
  return start_boot();
}

seastar::future<> OSD::shutdown()
{
  // TODO
  superblock.mounted = boot_epoch;
  superblock.clean_thru = osdmap->get_epoch();
  return seastar::now();
}

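// Periodic report to the monitors, armed in committed_osd_maps() once we go
// active; it carries our minimum last-epoch-clean, which the mons consult
// when trimming old osdmaps.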
seastar::future<> OSD::send_beacon()
{
  if (!state.is_active()) {
    return seastar::now();
  }
  // FIXME: min lec should be calculated from pg_stat
  //        and should set m->pgs
  epoch_t min_last_epoch_clean = osdmap->get_epoch();
  auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
                                    min_last_epoch_clean,
                                    superblock.last_purged_snaps_scrub);
  return monc->send_message(m);
}

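// Refresh the heartbeat peer set from the current map: every up/acting
// member of each of our PGs (other than ourselves) becomes a peer;
// Heartbeat::update_peers() then reconciles the set, presumably dropping
// peers that no longer qualify.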
seastar::future<> OSD::update_heartbeat_peers()
{
  if (!state.is_active()) {
    return seastar::now();
  }
  for (auto& pg : pg_map.get_pgs()) {
    vector<int> up, acting;
    osdmap->pg_to_up_acting_osds(pg.first.pgid,
                                 &up, nullptr,
                                 &acting, nullptr);
    for (int osd : boost::join(up, acting)) {
      if (osd == CRUSH_ITEM_NONE || osd == whoami) {
        continue;
      } else {
        heartbeat->add_peer(osd, osdmap->get_epoch());
      }
    }
  }
  return heartbeat->update_peers(whoami);
}

seastar::future<> OSD::handle_peering_op(
  crimson::net::Connection* conn,
  Ref<MOSDPeeringOp> m)
{
  const int from = m->get_source().num();
  logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
  shard_services.start_operation<RemotePeeringEvent>(
    *this,
    conn->get_shared(),
    shard_services,
    pg_shard_t{from, m->get_spg().shard},
    m->get_spg(),
    std::move(*m->get_event()));
  return seastar::now();
}

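// Unlike the classic OSD, which adjusts messenger feature bits based on the
// map here, this version appears to only force authorizers on the heartbeat
// sessions.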
void OSD::check_osdmap_features()
{
  heartbeat->set_require_authorizer(true);
}

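// Advance every PG to the given epoch via a PGAdvanceMap operation, then
// open the osdmap gate so operations waiting on this epoch can proceed.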
seastar::future<> OSD::consume_map(epoch_t epoch)
{
  // todo: m-to-n: broadcast this news to all shards
  auto &pgs = pg_map.get_pgs();
  return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
    return shard_services.start_operation<PGAdvanceMap>(
      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
      PeeringCtx{ceph_release_t::octopus}, false).second;
  }).then([epoch, this] {
    osdmap_gate.got_map(epoch);
    return seastar::make_ready_future();
  });
}

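// Look up a PG, optionally creating it: if a PGCreateInfo is supplied and
// the PG is not already being created, kick off creation in the background;
// the returned blocking_future resolves once the PG exists.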
blocking_future<Ref<PG>>
OSD::get_or_create_pg(
  spg_t pgid,
  epoch_t epoch,
  std::unique_ptr<PGCreateInfo> info)
{
  auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
  if (!creating && info) {
    pg_map.set_creating(pgid);
    (void)handle_pg_create_info(std::move(info));
  }
  return std::move(fut);
}

blocking_future<Ref<PG>> OSD::wait_for_pg(
  spg_t pgid)
{
  return pg_map.get_pg(pgid).first;
}

} // namespace crimson::osd