]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "osd.h"
5
6 #include <sys/utsname.h>
7
8 #include <boost/iterator/counting_iterator.hpp>
9 #include <boost/range/join.hpp>
10 #include <boost/smart_ptr/make_local_shared.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <seastar/core/timer.hh>
14
15 #include "common/pick_address.h"
16 #include "include/util.h"
17
18 #include "messages/MCommand.h"
19 #include "messages/MOSDBeacon.h"
20 #include "messages/MOSDBoot.h"
21 #include "messages/MOSDMap.h"
22 #include "messages/MOSDMarkMeDown.h"
23 #include "messages/MOSDOp.h"
24 #include "messages/MOSDPeeringOp.h"
25 #include "messages/MOSDRepOpReply.h"
26 #include "messages/MOSDScrub2.h"
27 #include "messages/MPGStats.h"
28
29 #include "os/Transaction.h"
30 #include "osd/ClassHandler.h"
31 #include "osd/OSDCap.h"
32 #include "osd/PGPeeringEvent.h"
33 #include "osd/PeeringState.h"
34
35 #include "crimson/admin/osd_admin.h"
36 #include "crimson/admin/pg_commands.h"
37 #include "crimson/common/buffer_io.h"
38 #include "crimson/common/exception.h"
39 #include "crimson/mon/MonClient.h"
40 #include "crimson/net/Connection.h"
41 #include "crimson/net/Messenger.h"
42 #include "crimson/os/futurized_collection.h"
43 #include "crimson/os/futurized_store.h"
44 #include "crimson/osd/heartbeat.h"
45 #include "crimson/osd/osd_meta.h"
46 #include "crimson/osd/pg.h"
47 #include "crimson/osd/pg_backend.h"
48 #include "crimson/osd/pg_meta.h"
49 #include "crimson/osd/osd_operations/client_request.h"
50 #include "crimson/osd/osd_operations/compound_peering_request.h"
51 #include "crimson/osd/osd_operations/peering_event.h"
52 #include "crimson/osd/osd_operations/pg_advance_map.h"
53 #include "crimson/osd/osd_operations/recovery_subrequest.h"
54 #include "crimson/osd/osd_operations/replicated_request.h"
55
56 namespace {
57 seastar::logger& logger() {
58 return crimson::get_logger(ceph_subsys_osd);
59 }
60 static constexpr int TICK_INTERVAL = 1;
61 }
62
63 using std::make_unique;
64 using std::map;
65 using std::pair;
66 using std::string;
67 using std::unique_ptr;
68 using std::vector;
69
70 using crimson::common::local_conf;
71 using crimson::os::FuturizedStore;
72
73 namespace crimson::osd {
74
// Construct the OSD, wiring together its messengers, mon/mgr clients,
// shard services and heartbeat. No I/O happens here; the store is mounted
// and the daemon brought up later in start()/mkfs().
OSD::OSD(int id, uint32_t nonce,
         crimson::os::FuturizedStore& store,
         crimson::net::MessengerRef cluster_msgr,
         crimson::net::MessengerRef public_msgr,
         crimson::net::MessengerRef hb_front_msgr,
         crimson::net::MessengerRef hb_back_msgr)
  : whoami{id},
    nonce{nonce},
    // do this in background; the beacon future is deliberately discarded
    beacon_timer{[this] { (void)send_beacon(); }},
    cluster_msgr{cluster_msgr},
    public_msgr{public_msgr},
    monc{new crimson::mon::Client{*public_msgr, *this}},
    mgrc{new crimson::mgr::Client{*public_msgr, *this}},
    store{store},
    shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store},
    heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
    // do this in background: periodic tick refreshes heartbeat peers and stats
    tick_timer{[this] {
      update_heartbeat_peers();
      update_stats();
    }},
    asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))),
    log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
    clog(log_client.create_channel())
{
  // epoch 0 always maps to an empty OSDMap, so pre-boot lookups succeed
  osdmaps[0] = boost::make_local_shared<OSDMap>();
  // the mon client serves as both auth server and auth client for every
  // messenger this OSD owns
  for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
                    std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
    msgr.get()->set_auth_server(monc.get());
    msgr.get()->set_auth_client(monc.get());
  }

  if (local_conf()->osd_open_classes_on_start) {
    // preload rados object classes; failure is logged but non-fatal
    const int r = ClassHandler::get_instance().open_all_classes();
    if (r) {
      logger().warn("{} warning: got an error loading one or more classes: {}",
                    __func__, cpp_strerror(r));
    }
  }
  logger().info("{}: nonce is {}", __func__, nonce);
  monc->set_log_client(&log_client);
  clog->set_log_to_monitors(true);
}
120
121 OSD::~OSD() = default;
122
123 namespace {
124 // Initial features in new superblock.
125 // Features here are also automatically upgraded
126 CompatSet get_osd_initial_compat_set()
127 {
128 CompatSet::FeatureSet ceph_osd_feature_compat;
129 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
130 CompatSet::FeatureSet ceph_osd_feature_incompat;
131 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
132 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
133 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
134 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
135 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
136 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
137 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
138 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
139 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
140 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
141 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS);
142 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
143 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
144 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
145 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
146 return CompatSet(ceph_osd_feature_compat,
147 ceph_osd_feature_ro_compat,
148 ceph_osd_feature_incompat);
149 }
150 }
151
// Create a fresh object store and the metadata an OSD needs before its
// first boot: superblock, fsid/magic/whoami/key meta entries, and finally
// the "ready" marker. Fatal errors during mkfs/mount exit the process.
seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
{
  return store.start().then([this, osd_uuid] {
    return store.mkfs(osd_uuid).handle_error(
      crimson::stateful_ec::handle([] (const auto& ec) {
        logger().error("error creating empty object store in {}: ({}) {}",
                       local_conf().get_val<std::string>("osd_data"),
                       ec.value(), ec.message());
        std::exit(EXIT_FAILURE);
      }));
  }).then([this] {
    // mount the store we just created so the meta collection can be written
    return store.mount().handle_error(
      crimson::stateful_ec::handle([] (const auto& ec) {
        logger().error("error mounting object store in {}: ({}) {}",
                       local_conf().get_val<std::string>("osd_data"),
                       ec.value(), ec.message());
        std::exit(EXIT_FAILURE);
      }));
  }).then([cluster_fsid, this] {
    superblock.cluster_fsid = cluster_fsid;
    superblock.osd_fsid = store.get_fsid();
    superblock.whoami = whoami;
    superblock.compat_features = get_osd_initial_compat_set();
    return _write_superblock();
  }).then([cluster_fsid, this] {
    return store.write_meta("ceph_fsid", cluster_fsid.to_string());
  }).then([this] {
    return store.write_meta("magic", CEPH_OSD_ONDISK_MAGIC);
  }).then([this] {
    return store.write_meta("whoami", std::to_string(whoami));
  }).then([this] {
    return _write_key_meta();
  }).then([this] {
    // written last: marks the store as fully initialized
    return store.write_meta("ready", "ready");
  }).then([cluster_fsid, this] {
    fmt::print("created object store {} for osd.{} fsid {}\n",
               local_conf().get_val<std::string>("osd_data"),
               whoami, cluster_fsid);
    return seastar::now();
  });
}
193
// Persist the freshly initialized superblock. If the meta collection
// already exists (e.g. mkfs re-run on an existing store), validate that
// the stored superblock matches the requested fsid and osd id rather than
// overwriting it; a mismatch throws std::invalid_argument.
seastar::future<> OSD::_write_superblock()
{
  return store.open_collection(coll_t::meta()).then([this] (auto ch) {
    if (ch) {
      // if we already have superblock, check if it matches
      meta_coll = make_unique<OSDMeta>(ch, store);
      return meta_coll->load_superblock().then([this](OSDSuperblock&& sb) {
        if (sb.cluster_fsid != superblock.cluster_fsid) {
          logger().error("provided cluster fsid {} != superblock's {}",
                         sb.cluster_fsid, superblock.cluster_fsid);
          throw std::invalid_argument("mismatched fsid");
        }
        if (sb.whoami != superblock.whoami) {
          logger().error("provided osd id {} != superblock's {}",
                         sb.whoami, superblock.whoami);
          throw std::invalid_argument("mismatched osd id");
        }
      });
    } else {
      // meta collection does not exist yet; create it and store the superblock
      logger().info(
        "{} writing superblock cluster_fsid {} osd_fsid {}",
        "_write_superblock",
        superblock.cluster_fsid,
        superblock.osd_fsid);
      return store.create_new_collection(coll_t::meta()).then([this] (auto ch) {
        meta_coll = make_unique<OSDMeta>(ch, store);
        ceph::os::Transaction t;
        meta_coll->create(t);
        meta_coll->store_superblock(t, superblock);
        logger().debug("OSD::_write_superblock: do_transaction...");
        return store.do_transaction(meta_coll->collection(), std::move(t));
      });
    }
  });
}
230
231 // this `to_string` sits in the `crimson::osd` namespace, so we don't brake
232 // the language rule on not overloading in `std::`.
233 static std::string to_string(const seastar::temporary_buffer<char>& temp_buf)
234 {
235 return {temp_buf.get(), temp_buf.size()};
236 }
237
// Persist the OSD's auth key as "osd_key" store metadata. The key comes
// from the "key" config option when set, otherwise from the file named by
// "keyfile"; when neither is configured nothing is written. A failure to
// read the keyfile aborts the process.
seastar::future<> OSD::_write_key_meta()
{

  if (auto key = local_conf().get_val<std::string>("key"); !std::empty(key)) {
    return store.write_meta("osd_key", key);
  } else if (auto keyfile = local_conf().get_val<std::string>("keyfile");
             !std::empty(keyfile)) {
    return read_file(keyfile).then([this] (const auto& temp_buf) {
      // it's on a truly cold path, so don't worry about memcpy.
      return store.write_meta("osd_key", to_string(temp_buf));
    }).handle_exception([keyfile] (auto ep) {
      logger().error("_write_key_meta: failed to handle keyfile {}: {}",
                     keyfile, ep);
      ceph_abort();
    });
  } else {
    return seastar::now();
  }
}
257
258 namespace {
259 entity_addrvec_t pick_addresses(int what) {
260 entity_addrvec_t addrs;
261 crimson::common::CephContext cct;
262 // we're interested solely in v2; crimson doesn't do v1
263 const auto flags = what | CEPH_PICK_ADDRESS_MSGR2;
264 if (int r = ::pick_addresses(&cct, flags, &addrs, -1); r < 0) {
265 throw std::runtime_error("failed to pick address");
266 }
267 for (auto addr : addrs.v) {
268 logger().info("picked address {}", addr);
269 }
270 return addrs;
271 }
272 std::pair<entity_addrvec_t, bool>
273 replace_unknown_addrs(entity_addrvec_t maybe_unknowns,
274 const entity_addrvec_t& knowns) {
275 bool changed = false;
276 auto maybe_replace = [&](entity_addr_t addr) {
277 if (!addr.is_blank_ip()) {
278 return addr;
279 }
280 for (auto& b : knowns.v) {
281 if (addr.get_family() == b.get_family()) {
282 auto a = b;
283 a.set_nonce(addr.get_nonce());
284 a.set_type(addr.get_type());
285 a.set_port(addr.get_port());
286 changed = true;
287 return a;
288 }
289 }
290 throw std::runtime_error("failed to replace unknown address");
291 };
292 entity_addrvec_t replaced;
293 std::transform(maybe_unknowns.v.begin(),
294 maybe_unknowns.v.end(),
295 std::back_inserter(replaced.v),
296 maybe_replace);
297 return {replaced, changed};
298 }
299 }
300
// Bring the OSD up. The sequence is deliberate and must not be reordered:
//   1. mount the store, load superblock and current osdmap
//   2. load every PG from disk
//   3. configure messenger policies, then bind and start both messengers
//   4. start mon/mgr clients, add ourselves to crush, renew subscriptions
//   5. fix up unknown cluster addrs, start heartbeat and the admin socket
//   6. begin the boot handshake via start_boot()
// A bind failure on either messenger aborts the process.
seastar::future<> OSD::start()
{
  logger().info("start");

  startup_time = ceph::mono_clock::now();

  return store.start().then([this] {
    return store.mount().handle_error(
      crimson::stateful_ec::handle([] (const auto& ec) {
        logger().error("error mounting object store in {}: ({}) {}",
                       local_conf().get_val<std::string>("osd_data"),
                       ec.value(), ec.message());
        std::exit(EXIT_FAILURE);
      }));
  }).then([this] {
    return store.open_collection(coll_t::meta());
  }).then([this](auto ch) {
    meta_coll = make_unique<OSDMeta>(ch, store);
    return meta_coll->load_superblock();
  }).then([this](OSDSuperblock&& sb) {
    superblock = std::move(sb);
    return get_map(superblock.current_epoch);
  }).then([this](cached_map_t&& map) {
    // publish the map before loading PGs so they observe a valid epoch
    shard_services.update_map(map);
    osdmap_gate.got_map(map->get_epoch());
    osdmap = std::move(map);
    bind_epoch = osdmap->get_epoch();
    return load_pgs();
  }).then([this] {

    uint64_t osd_required =
      CEPH_FEATURE_UID |
      CEPH_FEATURE_PGID64 |
      CEPH_FEATURE_OSDENC;
    using crimson::net::SocketPolicy;

    // messenger policies: lossy client towards mon/mgr, stateless server
    // for clients, lossless peer for fellow OSDs on the cluster network
    public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
    public_msgr->set_policy(entity_name_t::TYPE_MON,
                            SocketPolicy::lossy_client(osd_required));
    public_msgr->set_policy(entity_name_t::TYPE_MGR,
                            SocketPolicy::lossy_client(osd_required));
    public_msgr->set_policy(entity_name_t::TYPE_OSD,
                            SocketPolicy::stateless_server(0));

    cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
    cluster_msgr->set_policy(entity_name_t::TYPE_MON,
                             SocketPolicy::lossy_client(0));
    cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
                             SocketPolicy::lossless_peer(osd_required));
    cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
                             SocketPolicy::stateless_server(0));

    crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
    // bind and start both messengers concurrently; abort on bind failure
    return seastar::when_all_succeed(
      cluster_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER))
        .safe_then([this, dispatchers]() mutable {
          return cluster_msgr->start(dispatchers);
        }, crimson::net::Messenger::bind_ertr::all_same_way(
          [] (const std::error_code& e) {
            logger().error("cluster messenger bind(): {}", e);
            ceph_abort();
          })),
      public_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC))
        .safe_then([this, dispatchers]() mutable {
          return public_msgr->start(dispatchers);
        }, crimson::net::Messenger::bind_ertr::all_same_way(
          [] (const std::error_code& e) {
            logger().error("public messenger bind(): {}", e);
            ceph_abort();
          })));
  }).then_unpack([this] {
    return seastar::when_all_succeed(monc->start(),
                                     mgrc->start());
  }).then_unpack([this] {
    return _add_me_to_crush();
  }).then([this] {
    monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
    monc->sub_want("mgrmap", 0, 0);
    monc->sub_want("osdmap", 0, 0);
    return monc->renew_subs();
  }).then([this] {
    // the cluster messenger may have bound to a wildcard address; fill it
    // in from the public messenger's concrete address if so
    if (auto [addrs, changed] =
          replace_unknown_addrs(cluster_msgr->get_myaddrs(),
                                public_msgr->get_myaddrs()); changed) {
      logger().debug("replacing unkwnown addrs of cluster messenger");
      return cluster_msgr->set_myaddrs(addrs);
    } else {
      return seastar::now();
    }
  }).then([this] {
    return heartbeat->start(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                            pick_addresses(CEPH_PICK_ADDRESS_CLUSTER));
  }).then([this] {
    // create the admin-socket server, and the objects that register
    // to handle incoming commands
    return start_asok_admin();
  }).then([this] {
    return log_client.set_fsid(monc->get_fsid());
  }).then([this] {
    return start_boot();
  });
}
403
404 seastar::future<> OSD::start_boot()
405 {
406 state.set_preboot();
407 return monc->get_version("osdmap").then([this](auto&& ret) {
408 auto [newest, oldest] = ret;
409 return _preboot(oldest, newest);
410 });
411 }
412
// Decide whether we may boot given the monitors' [oldest, newest] osdmap
// epoch range. If any precondition fails we (re)subscribe for newer maps
// and will retry when they arrive; otherwise proceed to _send_boot().
seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
{
  logger().info("osd.{}: _preboot", whoami);
  if (osdmap->get_epoch() == 0) {
    logger().info("waiting for initial osdmap");
  } else if (osdmap->is_destroyed(whoami)) {
    logger().warn("osdmap says I am destroyed");
    // provide a small margin so we don't livelock seeing if we
    // un-destroyed ourselves.
    if (osdmap->get_epoch() > newest - 1) {
      throw std::runtime_error("i am destroyed");
    }
  } else if (osdmap->is_noup(whoami)) {
    logger().warn("osdmap NOUP flag is set, waiting for it to clear");
  } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
  } else if (osdmap->require_osd_release < ceph_release_t::octopus) {
    logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
  } else if (false) {
    // TODO: update mon if current fullness state is different from osdmap
  } else if (version_t n = local_conf()->osd_map_message_max;
             osdmap->get_epoch() >= oldest - 1 &&
             osdmap->get_epoch() + n > newest) {
    // our map is recent enough that the mon can catch us up within a
    // single MOSDMap message: boot now
    return _send_boot();
  }
  // get all the latest maps
  if (osdmap->get_epoch() + 1 >= oldest) {
    return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
  } else {
    return shard_services.osdmap_subscribe(oldest - 1, true);
  }
}
445
// Transition to BOOTING and send MOSDBoot to the monitors, advertising all
// four address vectors. Blank ("unknown") addresses are backfilled from
// the already-known messenger addresses before composing the message.
seastar::future<> OSD::_send_boot()
{
  state.set_booting();

  entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
  entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
  entity_addrvec_t hb_back_addrs = heartbeat->get_back_addrs();
  entity_addrvec_t hb_front_addrs = heartbeat->get_front_addrs();
  // backfill any still-unknown address from its better-known counterpart
  if (cluster_msgr->set_addr_unknowns(public_addrs)) {
    cluster_addrs = cluster_msgr->get_myaddrs();
  }
  if (heartbeat->get_back_msgr()->set_addr_unknowns(cluster_addrs)) {
    hb_back_addrs = heartbeat->get_back_addrs();
  }
  if (heartbeat->get_front_msgr()->set_addr_unknowns(public_addrs)) {
    hb_front_addrs = heartbeat->get_front_addrs();
  }
  logger().info("hb_back_msgr: {}", hb_back_addrs);
  logger().info("hb_front_msgr: {}", hb_front_addrs);
  logger().info("cluster_msgr: {}", cluster_addrs);

  auto m = crimson::make_message<MOSDBoot>(superblock,
                                           osdmap->get_epoch(),
                                           boot_epoch,
                                           hb_back_addrs,
                                           hb_front_addrs,
                                           cluster_addrs,
                                           CEPH_FEATURES_ALL);
  collect_sys_info(&m->metadata, NULL);
  return monc->send_message(std::move(m));
}
477
// Register this OSD in the crush map via "osd crush create-or-move",
// unless osd_crush_update_on_start is disabled. The weight comes from
// osd_crush_initial_weight when non-negative, otherwise it is derived
// from the store's total capacity in TiB. Throws std::runtime_error when
// the mon command fails.
seastar::future<> OSD::_add_me_to_crush()
{
  if (!local_conf().get_val<bool>("osd_crush_update_on_start")) {
    return seastar::now();
  }
  auto get_weight = [this] {
    if (auto w = local_conf().get_val<double>("osd_crush_initial_weight");
        w >= 0) {
      return seastar::make_ready_future<double>(w);
    } else {
      // derive weight from store capacity, with a tiny floor so an OSD
      // never ends up with weight zero
      return store.stat().then([](auto st) {
        auto total = st.total;
        return seastar::make_ready_future<double>(
          std::max(.00001,
                   double(total) / double(1ull << 40))); // TB
      });
    }
  };
  return get_weight().then([this](auto weight) {
    const crimson::crush::CrushLocation loc{make_unique<CephContext>().get()};
    logger().info("{} crush location is {}", __func__, loc);
    string cmd = fmt::format(R"({{
      "prefix": "osd crush create-or-move",
      "id": {},
      "weight": {:.4f},
      "args": [{}]
    }})", whoami, weight, loc);
    return monc->run_command(std::move(cmd), {});
  }).then([](auto&& command_result) {
    [[maybe_unused]] auto [code, message, out] = std::move(command_result);
    if (code) {
      logger().warn("fail to add to crush: {} ({})", message, code);
      throw std::runtime_error("fail to add to crush");
    } else {
      logger().info("added to crush: {}", message);
    }
    return seastar::now();
  });
}
517
// MCommand entry point: delegate to the admin socket, which routes the
// command to whichever registered hook owns its prefix.
seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
                                      Ref<MCommand> m)
{
  return asok->handle_command(conn, std::move(m));
}
523
/*
  The OSD's admin socket object created here has two command servers
  (i.e. blocks of commands to handle) registered to it:
  - the OSD's own commands, handled by the OSD object;
  - a set of common commands registered to be handled directly by the
    AdminSocket object itself.
*/
// Start the admin-socket server at the configured path and register all
// command hooks: the generic AdminSocket commands, OSD-level hooks, and
// the PG-level commands.
seastar::future<> OSD::start_asok_admin()
{
  auto asok_path = local_conf().get_val<std::string>("admin_socket");
  using namespace crimson::admin;
  return asok->start(asok_path).then([this] {
    asok->register_admin_commands();
    // read-only hooks take *this as const to guarantee they can't mutate
    asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this)));
    asok->register_command(make_asok_hook<SendBeaconHook>(*this));
    asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
    asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this)));
    asok->register_command(make_asok_hook<DumpMetricsHook>());
    asok->register_command(make_asok_hook<DumpPerfCountersHook>());
    asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
    asok->register_command(make_asok_hook<InjectMDataErrorHook>(get_shard_services()));
    // PG commands
    asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
    asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
  });
}
550
// Tear the OSD down: cancel timers, tell the mon we're stopping, stop the
// messengers/admin socket/heartbeat/store, stop every PG, stop mon/mgr
// clients, then drain the operation gate and shut the messengers down.
// The ordering is intentional; see also OSD::shutdown().
seastar::future<> OSD::stop()
{
  logger().info("stop");
  beacon_timer.cancel();
  tick_timer.cancel();
  // see also OSD::shutdown()
  return prepare_to_stop().then([this] {
    state.set_stopping();
    logger().debug("prepared to stop");
    public_msgr->stop();
    cluster_msgr->stop();
    // start closing the gate now; the future is awaited near the end,
    // after the individual components have stopped
    auto gate_close_fut = gate.close();
    return asok->stop().then([this] {
      return heartbeat->stop();
    }).then([this] {
      return store.umount();
    }).then([this] {
      return store.stop();
    }).then([this] {
      return seastar::parallel_for_each(pg_map.get_pgs(),
        [](auto& p) {
          return p.second->stop();
        });
    }).then([this] {
      return monc->stop();
    }).then([this] {
      return mgrc->stop();
    }).then([fut=std::move(gate_close_fut)]() mutable {
      // wait for any in-flight gated operations to finish
      return std::move(fut);
    }).then([this] {
      return when_all_succeed(
        public_msgr->shutdown(),
        cluster_msgr->shutdown()).discard_result();
    }).handle_exception([](auto ep) {
      logger().error("error while stopping osd: {}", ep);
    });
  });
}
589
// Emit a one-object status summary (fsids, id, state, stored map range,
// PG count) into `f`; consumed by the "status" admin-socket hook.
void OSD::dump_status(Formatter* f) const
{
  f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
  f->dump_stream("osd_fsid") << superblock.osd_fsid;
  f->dump_unsigned("whoami", superblock.whoami);
  f->dump_string("state", state.to_string());
  f->dump_unsigned("oldest_map", superblock.oldest_map);
  f->dump_unsigned("newest_map", superblock.newest_map);
  f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
}
600
601 void OSD::dump_pg_state_history(Formatter* f) const
602 {
603 f->open_array_section("pgs");
604 for (auto [pgid, pg] : pg_map.get_pgs()) {
605 f->open_object_section("pg");
606 f->dump_stream("pg") << pgid;
607 const auto& peering_state = pg->get_peering_state();
608 f->dump_string("currently", peering_state.get_current_state());
609 peering_state.dump_history(f);
610 f->close_section();
611 }
612 f->close_section();
613 }
614
615 void OSD::print(std::ostream& out) const
616 {
617 out << "{osd." << superblock.whoami << " "
618 << superblock.osd_fsid << " [" << superblock.oldest_map
619 << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
620 << " pgs}";
621 }
622
// Scan the store's collections and load every PG collection found,
// registering each loaded PG with the pg_map. Temp collections are left
// in place (cleanup is a TODO); anything unrecognized is logged and
// skipped. PGs are loaded in parallel.
seastar::future<> OSD::load_pgs()
{
  return store.list_collections().then([this](auto colls) {
    return seastar::parallel_for_each(colls, [this](auto coll) {
      spg_t pgid;
      if (coll.is_pg(&pgid)) {
        return load_pg(pgid).then([pgid, this](auto&& pg) {
          logger().info("load_pgs: loaded {}", pgid);
          pg_map.pg_loaded(pgid, std::move(pg));
          shard_services.inc_pg_num();
          return seastar::now();
        });
      } else if (coll.is_temp(&pgid)) {
        // TODO: remove the collection
        return seastar::now();
      } else {
        logger().warn("ignoring unrecognized collection: {}", coll);
        return seastar::now();
      }
    });
  });
}
645
// Construct (but do not initialize) a PG object for `pgid` against the
// osdmap `create_map`. Pool info comes from that map when the pool still
// exists, otherwise from the final pool info stashed in the meta
// collection. `do_create` chooses between creating a new collection and
// opening the existing one.
seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
                                      spg_t pgid,
                                      bool do_create)
{
  using ec_profile_t = map<string,string>;
  auto get_pool_info = [create_map, pgid, this] {
    if (create_map->have_pg_pool(pgid.pool())) {
      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
      string name = create_map->get_pool_name(pgid.pool());
      ec_profile_t ec_profile;
      if (pi.is_erasure()) {
        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
      }
      return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
        std::make_tuple(std::move(pi),
                        std::move(name),
                        std::move(ec_profile)));
    } else {
      // pool was deleted; grab final pg_pool_t off disk.
      return meta_coll->load_final_pool_info(pgid.pool());
    }
  };
  auto get_collection = [pgid, do_create, this] {
    const coll_t cid{pgid};
    if (do_create) {
      return store.create_new_collection(cid);
    } else {
      return store.open_collection(cid);
    }
  };
  // resolve the pool info and the collection concurrently
  return seastar::when_all(
    std::move(get_pool_info),
    std::move(get_collection)
  ).then([pgid, create_map, this] (auto&& ret) {
    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
    auto coll = std::move(std::get<1>(ret).get0());
    return seastar::make_ready_future<Ref<PG>>(
      new PG{pgid,
             pg_shard_t{whoami, pgid.shard},
             std::move(coll),
             std::move(pool),
             std::move(name),
             create_map,
             shard_services,
             ec_profile});
  });
}
693
// Load an existing PG from disk: read its persisted epoch, fetch the
// matching osdmap, construct the PG object, then read its full state.
// Any failure is treated as fatal (ceph_abort), since a PG that cannot
// be loaded would otherwise be silently lost.
seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
{
  logger().debug("{}: {}", __func__, pgid);

  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
    return pg_meta.get_epoch();
  }).then([this](epoch_t e) {
    return get_map(e);
  }).then([pgid, this] (auto&& create_map) {
    return make_pg(std::move(create_map), pgid, false);
  }).then([this](Ref<PG> pg) {
    return pg->read_state(&store).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
    });
  }).handle_exception([pgid](auto ep) {
    logger().info("pg {} saw exception on load {}", pgid, ep);
    // old-style "message == 0" abort idiom: the string documents the reason
    ceph_abort("Could not load pg" == 0);
    return seastar::make_exception_future<Ref<PG>>(ep);
  });
}
714
// Fast-dispatch entry point for every message arriving on the public and
// cluster messengers. Returns std::nullopt when the message type is not
// handled here (so other dispatchers may claim it) and a ready future
// otherwise; the actual handling runs under the OSD's operation gate.
// NOTE(review): `m` and `dispatched` are captured by reference, which is
// only safe if dispatch_in_background invokes the lambda synchronously —
// confirm against the gate implementation.
std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
  if (state.is_stopping()) {
    return {};
  }
  bool dispatched = true;
  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_MAP:
      return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
    case CEPH_MSG_OSD_OP:
      return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
    case MSG_OSD_PG_CREATE2:
      shard_services.start_operation<CompoundPeeringRequest>(
        *this,
        conn,
        m);
      return seastar::now();
    case MSG_COMMAND:
      return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
    case MSG_OSD_MARK_ME_DOWN:
      return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
    // all recovery sub-requests share a single handler
    case MSG_OSD_PG_PULL:
      [[fallthrough]];
    case MSG_OSD_PG_PUSH:
      [[fallthrough]];
    case MSG_OSD_PG_PUSH_REPLY:
      [[fallthrough]];
    case MSG_OSD_PG_RECOVERY_DELETE:
      [[fallthrough]];
    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
      [[fallthrough]];
    case MSG_OSD_PG_SCAN:
      [[fallthrough]];
    case MSG_OSD_PG_BACKFILL:
      [[fallthrough]];
    case MSG_OSD_PG_BACKFILL_REMOVE:
      return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
    // likewise, all peering messages funnel into one handler
    case MSG_OSD_PG_LEASE:
      [[fallthrough]];
    case MSG_OSD_PG_LEASE_ACK:
      [[fallthrough]];
    case MSG_OSD_PG_NOTIFY2:
      [[fallthrough]];
    case MSG_OSD_PG_INFO2:
      [[fallthrough]];
    case MSG_OSD_PG_QUERY2:
      [[fallthrough]];
    case MSG_OSD_BACKFILL_RESERVE:
      [[fallthrough]];
    case MSG_OSD_RECOVERY_RESERVE:
      [[fallthrough]];
    case MSG_OSD_PG_LOG:
      return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
    case MSG_OSD_REPOP:
      return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
    case MSG_OSD_REPOPREPLY:
      return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
    case MSG_OSD_SCRUB2:
      return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
    default:
      dispatched = false;
      return seastar::now();
    }
  });
  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
783
// Messenger callback invoked when a connection is reset; currently only
// logs the event.
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
  // TODO: cleanup the session attached to this connection
  logger().warn("ms_handle_reset");
}
789
// Messenger callback invoked when the peer reports it has reset the
// session; currently only logs the event.
void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
{
  logger().warn("ms_handle_remote_reset");
}
794
795 void OSD::handle_authentication(const EntityName& name,
796 const AuthCapsInfo& caps_info)
797 {
798 // TODO: store the parsed cap and associate it with the connection
799 if (caps_info.allow_all) {
800 logger().debug("{} {} has all caps", __func__, name);
801 return;
802 }
803 if (caps_info.caps.length() > 0) {
804 auto p = caps_info.caps.cbegin();
805 string str;
806 try {
807 decode(str, p);
808 } catch (ceph::buffer::error& e) {
809 logger().warn("{} {} failed to decode caps string", __func__, name);
810 return;
811 }
812 OSDCap caps;
813 if (caps.parse(str)) {
814 logger().debug("{} {} has caps {}", __func__, name, str);
815 } else {
816 logger().warn("{} {} failed to parse caps {}", __func__, name, str);
817 }
818 }
819 }
820
// Refresh this OSD's osd_stat: bump the local sequence, record up_from and
// current heartbeat peers, and kick off a background statfs to refresh the
// store statistics. The seq encodes the up epoch in its high 32 bits so
// stats can be ordered across restarts.
void OSD::update_stats()
{
  osd_stat_seq++;
  osd_stat.up_from = get_up_epoch();
  osd_stat.hb_peers = heartbeat->get_peers();
  osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
  // statfs is asynchronous; osd_stat.statfs is updated whenever it completes
  gate.dispatch_in_background("statfs", *this, [this] {
    (void) store.stat().then([this](store_statfs_t&& st) {
      osd_stat.statfs = st;
    });
  });
}
833
834 MessageURef OSD::get_stats() const
835 {
836 // todo: m-to-n: collect stats using map-reduce
837 // MPGStats::had_map_for is not used since PGMonitor was removed
838 auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
839 m->osd_stat = osd_stat;
840 for (auto [pgid, pg] : pg_map.get_pgs()) {
841 if (pg->is_primary()) {
842 auto stats = pg->get_stats();
843 // todo: update reported_epoch,reported_seq,last_fresh
844 stats.reported_epoch = osdmap->get_epoch();
845 m->pg_stat.emplace(pgid.pgid, std::move(stats));
846 }
847 }
848 return m;
849 }
850
// Trigger a stats report to the mgr and return the sequence number of the
// stats being reported.
uint64_t OSD::send_pg_stats()
{
  // mgr client sends the report message in background
  mgrc->report();
  return osd_stat.seq;
}
857
// Return the currently active osdmap.
OSD::cached_map_t OSD::get_map() const
{
  return osdmap;
}
862
// Return the osdmap for epoch `e`: served from the in-memory cache when
// present, otherwise loaded from disk and inserted into the cache.
seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
{
  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
  if (auto found = osdmaps.find(e); found) {
    return seastar::make_ready_future<cached_map_t>(std::move(found));
  } else {
    return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
      return seastar::make_ready_future<cached_map_t>(
        osdmaps.insert(e, std::move(osdmap)));
    });
  }
}
875
// Queue the encoded map for epoch `e` into transaction `t` and remember it
// in the bufferlist cache. NOTE(review): `bl` is handed to store_map()
// before being moved into the cache — presumably store_map copies/shares
// the (refcounted) buffers; confirm against OSDMeta::store_map.
void OSD::store_map_bl(ceph::os::Transaction& t,
                       epoch_t e, bufferlist&& bl)
{
  meta_coll->store_map(t, e, bl);
  map_bl_cache.insert(e, std::move(bl));
}
882
883 seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
884 {
885 if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
886 return seastar::make_ready_future<bufferlist>(*found);
887 } else {
888 return meta_coll->load_map(e);
889 }
890 }
891
// Load the encoded osdmaps for every epoch in [first, last] (inclusive)
// concurrently, collecting them into an epoch -> bufferlist map.
seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
  epoch_t first,
  epoch_t last)
{
  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
                             boost::make_counting_iterator<epoch_t>(last + 1),
                             [this](epoch_t e) {
    return load_map_bl(e).then([e](auto&& bl) {
      return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
        std::make_pair(e, std::move(bl)));
    });
  },
  std::map<epoch_t, bufferlist>{},
  [](auto&& bls, auto&& epoch_bl) {
    bls.emplace(std::move(epoch_bl));
    return std::move(bls);
  });
}
910
911 seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
912 {
913 auto o = std::make_unique<OSDMap>();
914 if (e > 0) {
915 return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
916 o->decode(bl);
917 return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
918 });
919 } else {
920 return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
921 }
922 }
923
924 seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
925 epoch_t start, Ref<MOSDMap> m)
926 {
927 return seastar::do_for_each(boost::make_counting_iterator(start),
928 boost::make_counting_iterator(m->get_last() + 1),
929 [&t, m, this](epoch_t e) {
930 if (auto p = m->maps.find(e); p != m->maps.end()) {
931 auto o = std::make_unique<OSDMap>();
932 o->decode(p->second);
933 logger().info("store_maps osdmap.{}", e);
934 store_map_bl(t, e, std::move(std::move(p->second)));
935 osdmaps.insert(e, std::move(o));
936 return seastar::now();
937 } else if (auto p = m->incremental_maps.find(e);
938 p != m->incremental_maps.end()) {
939 return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
940 OSDMap::Incremental inc;
941 auto i = bl.cbegin();
942 inc.decode(i);
943 o->apply_incremental(inc);
944 bufferlist fbl;
945 o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
946 store_map_bl(t, e, std::move(fbl));
947 osdmaps.insert(e, std::move(o));
948 return seastar::now();
949 });
950 } else {
951 logger().error("MOSDMap lied about what maps it had?");
952 return seastar::now();
953 }
954 });
955 }
956
957 bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
958 {
959 if (!conn->peer_is_mon()) {
960 logger().info("{} received from non-mon {}, {}",
961 __func__,
962 conn->get_peer_addr(),
963 *m);
964 return false;
965 }
966 return true;
967 }
968
// Create and initialize a new PG described by `info`:
//  1. fetch the OSDMap of the creation epoch ("startmap"),
//  2. for mon-initiated creates, validate the pool still exists and is
//     still in its CREATING phase (drop the request otherwise),
//  3. construct the PG, write its collection/ondisk metadata into a fresh
//     PeeringCtx transaction, and initialize its peering state,
//  4. advance the PG from its creation epoch to the current osdmap epoch
//     via a PGAdvanceMap operation.
// Returns the new PG, or a null Ref<PG> if the create was dropped.
seastar::future<Ref<PG>> OSD::handle_pg_create_info(
  std::unique_ptr<PGCreateInfo> info) {
  return seastar::do_with(
    std::move(info),
    [this](auto &info) -> seastar::future<Ref<PG>> {
      return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap) ->
        seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
            // NOTE(review): pool existence is checked against the current
            // `osdmap`, while the rest of the setup below uses `startmap`
            // (the map of the creation epoch) — confirm this asymmetry is
            // intentional.
            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
              // null PG signals "dropped" to the caller below
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and pg are subsequently
              // allowed to split or merge).
              logger().debug(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
          }
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(std::move(pg), std::move(startmap)));
            });
        }).then([this, &info](auto&& ret) ->
                seastar::future<Ref<PG>> {
          auto [pg, startmap] = std::move(ret);
          // create was dropped above — propagate the null PG
          if (!pg)
            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

          // compute this OSD's role in the PG as of the creation epoch
          int up_primary, acting_primary;
          vector<int> up, acting;
          startmap->pg_to_up_acting_osds(
            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);

          int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
                                            acting);

          // stage the on-disk creation of the PG in a peering transaction
          PeeringCtx rctx;
          create_pg_collection(
            rctx.transaction,
            info->pgid,
            info->pgid.get_split_bits(pp->get_pg_num()));
          init_pg_ondisk(
            rctx.transaction,
            info->pgid,
            pp);

          pg->init(
            role,
            up,
            up_primary,
            acting,
            acting_primary,
            info->history,
            info->past_intervals,
            rctx.transaction);

          // catch the new PG up from its creation epoch to the current
          // epoch; the transaction is applied as part of this operation
          return shard_services.start_operation<PGAdvanceMap>(
            *this, pg, pg->get_osdmap_epoch(),
            osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
              return seastar::make_ready_future<Ref<PG>>(pg);
            });
        });
    });
}
1050
// Ingest an MOSDMap message: validate it, persist any new epochs it
// carries (resubscribing to the mon if the message leaves a gap we can
// still fill), update the superblock, and finally activate the new maps
// via committed_osd_maps().
seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                      Ref<MOSDMap> m)
{
  logger().info("handle_osd_map {}", *m);
  if (m->fsid != superblock.cluster_fsid) {
    // message is for a different cluster — ignore it
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  if (state.is_initializing()) {
    logger().warn("i am still initializing");
    return seastar::now();
  }

  const auto first = m->get_first();
  const auto last = m->get_last();
  logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
                first, last, superblock.newest_map, m->oldest_map, m->newest_map);
  // make sure there is something new, here, before we bother flushing
  // the queues and such
  if (last <= superblock.newest_map) {
    return seastar::now();
  }
  // missing some?
  bool skip_maps = false;
  epoch_t start = superblock.newest_map + 1;
  if (first > start) {
    logger().info("handle_osd_map message skips epochs {}..{}",
                  start, first - 1);
    if (m->oldest_map <= start) {
      // the mon still has the epochs we're missing — ask for them
      return shard_services.osdmap_subscribe(start, false);
    }
    // always try to get the full range of maps--as many as we can. this
    // 1- is good to have
    // 2- is at present the only way to ensure that we get a *full* map as
    //    the first map!
    if (m->oldest_map < first) {
      return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
    }
    // the missing epochs are gone for good; accept the gap and start
    // from the first epoch this message provides
    skip_maps = true;
    start = first;
  }

  return seastar::do_with(ceph::os::Transaction{},
                          [=](auto& t) {
    return store_maps(t, start, m).then([=, &t] {
      // even if this map isn't from a mon, we may have satisfied our subscription
      monc->sub_got("osdmap", last);
      if (!superblock.oldest_map || skip_maps) {
        superblock.oldest_map = first;
      }
      superblock.newest_map = last;
      superblock.current_epoch = last;

      // note in the superblock that we were clean thru the prior epoch
      if (boot_epoch && boot_epoch >= superblock.mounted) {
        superblock.mounted = boot_epoch;
        superblock.clean_thru = last;
      }
      meta_coll->store_superblock(t, superblock);
      logger().debug("OSD::handle_osd_map: do_transaction...");
      return store.do_transaction(meta_coll->collection(), std::move(t));
    });
  }).then([=] {
    // TODO: write to superblock and commit the transaction
    return committed_osd_maps(start, last, m);
  });
}
1118
// Activate the newly persisted maps [first, last]: advance `osdmap` one
// epoch at a time (recording up/boot epochs when the map first shows us
// up at our own address), then react to the final map — activate if the
// mon marked us up while booting, shut down/restart if needed, or
// continue the boot sequence.
seastar::future<> OSD::committed_osd_maps(version_t first,
                                          version_t last,
                                          Ref<MOSDMap> m)
{
  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
  // advance through the new maps
  return seastar::do_for_each(boost::make_counting_iterator(first),
                              boost::make_counting_iterator(last + 1),
                              [this](epoch_t cur) {
    return get_map(cur).then([this](cached_map_t&& o) {
      osdmap = std::move(o);
      shard_services.update_map(osdmap);
      // first epoch in which this map marks us up at our public address
      if (up_epoch == 0 &&
          osdmap->is_up(whoami) &&
          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
        up_epoch = osdmap->get_epoch();
        if (!boot_epoch) {
          boot_epoch = osdmap->get_epoch();
        }
      }
    });
  }).then([m, this] {
    if (osdmap->is_up(whoami)) {
      const auto up_from = osdmap->get_up_from(whoami);
      logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
                    whoami, osdmap->get_epoch(), up_from, bind_epoch, state);
      // we were marked up after our last rebind while booting: go active
      // and start the periodic beacon/tick timers
      if (bind_epoch < up_from &&
          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
          state.is_booting()) {
        logger().info("osd.{}: activating...", whoami);
        state.set_active();
        beacon_timer.arm_periodic(
          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
        tick_timer.arm_periodic(
          std::chrono::seconds(TICK_INTERVAL));
      }
    } else {
      if (state.is_prestop()) {
        // the map marking us down is the ack we were waiting for in
        // prepare_to_stop(); skip further processing
        got_stop_ack();
        return seastar::now();
      }
    }
    return check_osdmap_features().then([this] {
      // yay!
      return consume_map(osdmap->get_epoch());
    });
  }).then([m, this] {
    if (state.is_active()) {
      logger().info("osd.{}: now active", whoami);
      if (!osdmap->exists(whoami) ||
          osdmap->is_stop(whoami)) {
        return shutdown();
      }
      if (should_restart()) {
        return restart();
      } else {
        return seastar::now();
      }
    } else if (state.is_preboot()) {
      logger().info("osd.{}: now preboot", whoami);

      if (m->get_source().is_mon()) {
        return _preboot(m->oldest_map, m->newest_map);
      } else {
        // map came from a peer, not the mon — restart the boot handshake
        logger().info("osd.{}: start_boot", whoami);
        return start_boot();
      }
    } else {
      logger().info("osd.{}: now {}", whoami, state);
      // XXX
      return seastar::now();
    }
  });
}
1193
1194 seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
1195 Ref<MOSDOp> m)
1196 {
1197 (void) shard_services.start_operation<ClientRequest>(
1198 *this,
1199 conn,
1200 std::move(m));
1201 return seastar::now();
1202 }
1203
1204 seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
1205 epoch_t first)
1206 {
1207 if (first >= superblock.oldest_map) {
1208 return load_map_bls(first, superblock.newest_map)
1209 .then([this, conn, first](auto&& bls) {
1210 auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
1211 osdmap->get_encoding_features());
1212 m->oldest_map = first;
1213 m->newest_map = superblock.newest_map;
1214 m->maps = std::move(bls);
1215 return conn->send(std::move(m));
1216 });
1217 } else {
1218 return load_map_bl(osdmap->get_epoch())
1219 .then([this, conn](auto&& bl) mutable {
1220 auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
1221 osdmap->get_encoding_features());
1222 m->oldest_map = superblock.oldest_map;
1223 m->newest_map = superblock.newest_map;
1224 m->maps.emplace(osdmap->get_epoch(), std::move(bl));
1225 return conn->send(std::move(m));
1226 });
1227 }
1228 }
1229
1230 seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
1231 Ref<MOSDRepOp> m)
1232 {
1233 m->finish_decode();
1234 (void) shard_services.start_operation<RepRequest>(
1235 *this,
1236 std::move(conn),
1237 std::move(m));
1238 return seastar::now();
1239 }
1240
1241 seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
1242 Ref<MOSDRepOpReply> m)
1243 {
1244 const auto& pgs = pg_map.get_pgs();
1245 if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
1246 m->finish_decode();
1247 pg->second->handle_rep_op_reply(conn, *m);
1248 } else {
1249 logger().warn("stale reply: {}", *m);
1250 }
1251 return seastar::now();
1252 }
1253
1254 seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
1255 Ref<MOSDScrub2> m)
1256 {
1257 if (m->fsid != superblock.cluster_fsid) {
1258 logger().warn("fsid mismatched");
1259 return seastar::now();
1260 }
1261 return seastar::parallel_for_each(std::move(m->scrub_pgs),
1262 [m, conn, this](spg_t pgid) {
1263 pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
1264 pgid.shard};
1265 PeeringState::RequestScrub scrub_request{m->deep, m->repair};
1266 return shard_services.start_operation<RemotePeeringEvent>(
1267 *this,
1268 conn,
1269 shard_services,
1270 from_shard,
1271 pgid,
1272 PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
1273 });
1274 }
1275
1276 seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
1277 Ref<MOSDMarkMeDown> m)
1278 {
1279 if (state.is_prestop()) {
1280 got_stop_ack();
1281 }
1282 return seastar::now();
1283 }
1284
1285 seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
1286 Ref<MOSDFastDispatchOp> m)
1287 {
1288 (void) shard_services.start_operation<RecoverySubRequest>(
1289 *this,
1290 conn,
1291 std::move(m));
1292 return seastar::now();
1293 }
1294
1295 bool OSD::should_restart() const
1296 {
1297 if (!osdmap->is_up(whoami)) {
1298 logger().info("map e {} marked osd.{} down",
1299 osdmap->get_epoch(), whoami);
1300 return true;
1301 } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
1302 logger().error("map e {} had wrong client addr ({} != my {})",
1303 osdmap->get_epoch(),
1304 osdmap->get_addrs(whoami),
1305 public_msgr->get_myaddrs());
1306 return true;
1307 } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
1308 logger().error("map e {} had wrong cluster addr ({} != my {})",
1309 osdmap->get_epoch(),
1310 osdmap->get_cluster_addrs(whoami),
1311 cluster_msgr->get_myaddrs());
1312 return true;
1313 } else {
1314 return false;
1315 }
1316 }
1317
1318 seastar::future<> OSD::restart()
1319 {
1320 beacon_timer.cancel();
1321 tick_timer.cancel();
1322 up_epoch = 0;
1323 bind_epoch = osdmap->get_epoch();
1324 // TODO: promote to shutdown if being marked down for multiple times
1325 // rebind messengers
1326 return start_boot();
1327 }
1328
1329 seastar::future<> OSD::shutdown()
1330 {
1331 // TODO
1332 superblock.mounted = boot_epoch;
1333 superblock.clean_thru = osdmap->get_epoch();
1334 return seastar::now();
1335 }
1336
1337 seastar::future<> OSD::send_beacon()
1338 {
1339 if (!state.is_active()) {
1340 return seastar::now();
1341 }
1342 // FIXME: min lec should be calculated from pg_stat
1343 // and should set m->pgs
1344 epoch_t min_last_epoch_clean = osdmap->get_epoch();
1345 auto m = crimson::make_message<MOSDBeacon>(osdmap->get_epoch(),
1346 min_last_epoch_clean,
1347 superblock.last_purged_snaps_scrub,
1348 local_conf()->osd_beacon_report_interval);
1349 return monc->send_message(std::move(m));
1350 }
1351
1352 void OSD::update_heartbeat_peers()
1353 {
1354 if (!state.is_active()) {
1355 return;
1356 }
1357 for (auto& pg : pg_map.get_pgs()) {
1358 vector<int> up, acting;
1359 osdmap->pg_to_up_acting_osds(pg.first.pgid,
1360 &up, nullptr,
1361 &acting, nullptr);
1362 for (int osd : boost::join(up, acting)) {
1363 if (osd == CRUSH_ITEM_NONE || osd == whoami) {
1364 continue;
1365 } else {
1366 heartbeat->add_peer(osd, osdmap->get_epoch());
1367 }
1368 }
1369 }
1370 heartbeat->update_peers(whoami);
1371 }
1372
1373 seastar::future<> OSD::handle_peering_op(
1374 crimson::net::ConnectionRef conn,
1375 Ref<MOSDPeeringOp> m)
1376 {
1377 const int from = m->get_source().num();
1378 logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
1379 std::unique_ptr<PGPeeringEvent> evt(m->get_event());
1380 (void) shard_services.start_operation<RemotePeeringEvent>(
1381 *this,
1382 conn,
1383 shard_services,
1384 pg_shard_t{from, m->get_spg().shard},
1385 m->get_spg(),
1386 std::move(*evt));
1387 return seastar::now();
1388 }
1389
1390 seastar::future<> OSD::check_osdmap_features()
1391 {
1392 heartbeat->set_require_authorizer(true);
1393 return store.write_meta("require_osd_release",
1394 stringify((int)osdmap->require_osd_release));
1395 }
1396
1397 seastar::future<> OSD::consume_map(epoch_t epoch)
1398 {
1399 // todo: m-to-n: broadcast this news to all shards
1400 auto &pgs = pg_map.get_pgs();
1401 return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
1402 return shard_services.start_operation<PGAdvanceMap>(
1403 *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
1404 PeeringCtx{}, false).second;
1405 }).then([epoch, this] {
1406 osdmap_gate.got_map(epoch);
1407 return seastar::make_ready_future();
1408 });
1409 }
1410
1411
1412 blocking_future<Ref<PG>>
1413 OSD::get_or_create_pg(
1414 spg_t pgid,
1415 epoch_t epoch,
1416 std::unique_ptr<PGCreateInfo> info)
1417 {
1418 if (info) {
1419 auto [fut, creating] = pg_map.wait_for_pg(pgid);
1420 if (!creating) {
1421 pg_map.set_creating(pgid);
1422 (void)handle_pg_create_info(std::move(info));
1423 }
1424 return std::move(fut);
1425 } else {
1426 return make_ready_blocking_future<Ref<PG>>(pg_map.get_pg(pgid));
1427 }
1428 }
1429
1430 blocking_future<Ref<PG>> OSD::wait_for_pg(
1431 spg_t pgid)
1432 {
1433 return pg_map.wait_for_pg(pgid).first;
1434 }
1435
1436 Ref<PG> OSD::get_pg(spg_t pgid)
1437 {
1438 return pg_map.get_pg(pgid);
1439 }
1440
// Begin a clean shutdown: if the map currently shows us up, enter the
// prestop state, ask the mon to mark us down, and wait (bounded by
// osd_mon_shutdown_timeout) for the ack — delivered via got_stop_ack()
// when either the MOSDMarkMeDown reply or a map marking us down arrives.
// A timeout is swallowed so shutdown proceeds regardless.
seastar::future<> OSD::prepare_to_stop()
{
  if (osdmap && osdmap->is_up(whoami)) {
    state.set_prestop();
    // config value is seconds (double); convert to the milliseconds the
    // timeout clock arithmetic needs
    const auto timeout =
      std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::duration<double>(
          local_conf().get_val<double>("osd_mon_shutdown_timeout")));

    return seastar::with_timeout(
      seastar::timer<>::clock::now() + timeout,
      monc->send_message(
        crimson::make_message<MOSDMarkMeDown>(
          monc->get_fsid(),
          whoami,
          osdmap->get_addrs(whoami),
          osdmap->get_epoch(),
          true)).then([this] {
        // resolved by got_stop_ack()
        return stop_acked.get_future();
      })
    ).handle_exception_type(
      [](seastar::timed_out_error&) {
        // no ack within the timeout: stop anyway
        return seastar::now();
      });
  }
  return seastar::now();
}
1468
1469 }