]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/osd.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / osd.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
11fdf7f2
TL
4#include "osd.h"
5
9f95a23c
TL
6#include <sys/utsname.h>
7
8#include <boost/iterator/counting_iterator.hpp>
11fdf7f2
TL
9#include <boost/range/join.hpp>
10#include <boost/smart_ptr/make_local_shared.hpp>
9f95a23c
TL
11#include <fmt/format.h>
12#include <fmt/ostream.h>
f67539c2 13#include <seastar/core/timer.hh>
11fdf7f2
TL
14
15#include "common/pick_address.h"
9f95a23c
TL
16#include "include/util.h"
17
f67539c2 18#include "messages/MCommand.h"
11fdf7f2
TL
19#include "messages/MOSDBeacon.h"
20#include "messages/MOSDBoot.h"
21#include "messages/MOSDMap.h"
f67539c2 22#include "messages/MOSDMarkMeDown.h"
9f95a23c
TL
23#include "messages/MOSDOp.h"
24#include "messages/MOSDPGLog.h"
f67539c2
TL
25#include "messages/MOSDPGPull.h"
26#include "messages/MOSDPGPush.h"
27#include "messages/MOSDPGPushReply.h"
28#include "messages/MOSDPGRecoveryDelete.h"
29#include "messages/MOSDPGRecoveryDeleteReply.h"
9f95a23c 30#include "messages/MOSDRepOpReply.h"
f67539c2 31#include "messages/MOSDScrub2.h"
9f95a23c
TL
32#include "messages/MPGStats.h"
33
34#include "os/Transaction.h"
35#include "osd/ClassHandler.h"
f67539c2 36#include "osd/OSDCap.h"
9f95a23c
TL
37#include "osd/PGPeeringEvent.h"
38#include "osd/PeeringState.h"
39
f67539c2
TL
40#include "crimson/admin/osd_admin.h"
41#include "crimson/admin/pg_commands.h"
42#include "crimson/common/exception.h"
9f95a23c 43#include "crimson/mon/MonClient.h"
11fdf7f2
TL
44#include "crimson/net/Connection.h"
45#include "crimson/net/Messenger.h"
9f95a23c
TL
46#include "crimson/os/futurized_collection.h"
47#include "crimson/os/futurized_store.h"
11fdf7f2
TL
48#include "crimson/osd/heartbeat.h"
49#include "crimson/osd/osd_meta.h"
50#include "crimson/osd/pg.h"
9f95a23c 51#include "crimson/osd/pg_backend.h"
11fdf7f2 52#include "crimson/osd/pg_meta.h"
9f95a23c
TL
53#include "crimson/osd/osd_operations/client_request.h"
54#include "crimson/osd/osd_operations/compound_peering_request.h"
55#include "crimson/osd/osd_operations/peering_event.h"
56#include "crimson/osd/osd_operations/pg_advance_map.h"
f67539c2 57#include "crimson/osd/osd_operations/recovery_subrequest.h"
9f95a23c 58#include "crimson/osd/osd_operations/replicated_request.h"
11fdf7f2
TL
59
60namespace {
61 seastar::logger& logger() {
9f95a23c 62 return crimson::get_logger(ceph_subsys_osd);
11fdf7f2
TL
63 }
64 static constexpr int TICK_INTERVAL = 1;
65}
66
9f95a23c
TL
67using crimson::common::local_conf;
68using crimson::os::FuturizedStore;
69
70namespace crimson::osd {
11fdf7f2 71
9f95a23c
TL
72OSD::OSD(int id, uint32_t nonce,
73 crimson::net::MessengerRef cluster_msgr,
74 crimson::net::MessengerRef public_msgr,
75 crimson::net::MessengerRef hb_front_msgr,
76 crimson::net::MessengerRef hb_back_msgr)
11fdf7f2
TL
77 : whoami{id},
78 nonce{nonce},
9f95a23c
TL
79 // do this in background
80 beacon_timer{[this] { (void)send_beacon(); }},
81 cluster_msgr{cluster_msgr},
82 public_msgr{public_msgr},
83 monc{new crimson::mon::Client{*public_msgr, *this}},
84 mgrc{new crimson::mgr::Client{*public_msgr, *this}},
85 store{crimson::os::FuturizedStore::create(
86 local_conf().get_val<std::string>("osd_objectstore"),
87 local_conf().get_val<std::string>("osd_data"),
88 local_conf().get_config_values())},
f67539c2
TL
89 shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
90 heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
9f95a23c 91 // do this in background
f67539c2
TL
92 tick_timer{[this] {
93 update_heartbeat_peers();
94 update_stats();
95 }},
9f95a23c
TL
96 asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
97 osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
11fdf7f2
TL
98{
99 osdmaps[0] = boost::make_local_shared<OSDMap>();
9f95a23c
TL
100 for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
101 std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
102 msgr.get()->set_auth_server(monc.get());
103 msgr.get()->set_auth_client(monc.get());
104 }
105
106 if (local_conf()->osd_open_classes_on_start) {
107 const int r = ClassHandler::get_instance().open_all_classes();
108 if (r) {
109 logger().warn("{} warning: got an error loading one or more classes: {}",
110 __func__, cpp_strerror(r));
111 }
112 }
11fdf7f2
TL
113}
114
115OSD::~OSD() = default;
116
117namespace {
118// Initial features in new superblock.
119// Features here are also automatically upgraded
120CompatSet get_osd_initial_compat_set()
121{
122 CompatSet::FeatureSet ceph_osd_feature_compat;
123 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
124 CompatSet::FeatureSet ceph_osd_feature_incompat;
125 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
126 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
127 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
128 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
129 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
130 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
131 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
132 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
133 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
134 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
135 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS);
136 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
137 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
138 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
139 ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
140 return CompatSet(ceph_osd_feature_compat,
141 ceph_osd_feature_ro_compat,
142 ceph_osd_feature_incompat);
143}
144}
145
9f95a23c 146seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
11fdf7f2 147{
9f95a23c
TL
148 return store->start().then([this, osd_uuid] {
149 return store->mkfs(osd_uuid);
150 }).then([this] {
11fdf7f2 151 return store->mount();
9f95a23c 152 }).then([cluster_fsid, this] {
11fdf7f2 153 superblock.cluster_fsid = cluster_fsid;
9f95a23c 154 superblock.osd_fsid = store->get_fsid();
11fdf7f2
TL
155 superblock.whoami = whoami;
156 superblock.compat_features = get_osd_initial_compat_set();
157
9f95a23c
TL
158 logger().info(
159 "{} writing superblock cluster_fsid {} osd_fsid {}",
160 __func__,
161 cluster_fsid,
162 superblock.osd_fsid);
163 return store->create_new_collection(coll_t::meta());
164 }).then([this] (auto ch) {
165 meta_coll = make_unique<OSDMeta>(ch , store.get());
11fdf7f2
TL
166 ceph::os::Transaction t;
167 meta_coll->create(t);
168 meta_coll->store_superblock(t, superblock);
169 return store->do_transaction(meta_coll->collection(), std::move(t));
170 }).then([cluster_fsid, this] {
9f95a23c
TL
171 return when_all_succeed(
172 store->write_meta("ceph_fsid", cluster_fsid.to_string()),
173 store->write_meta("whoami", std::to_string(whoami)));
f67539c2 174 }).then_unpack([cluster_fsid, this] {
9f95a23c
TL
175 fmt::print("created object store {} for osd.{} fsid {}\n",
176 local_conf().get_val<std::string>("osd_data"),
177 whoami, cluster_fsid);
11fdf7f2
TL
178 return seastar::now();
179 });
180}
181
182namespace {
183 entity_addrvec_t pick_addresses(int what) {
184 entity_addrvec_t addrs;
9f95a23c 185 crimson::common::CephContext cct;
11fdf7f2
TL
186 if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
187 throw std::runtime_error("failed to pick address");
188 }
11fdf7f2 189 for (auto addr : addrs.v) {
11fdf7f2
TL
190 logger().info("picked address {}", addr);
191 }
192 return addrs;
193 }
194 std::pair<entity_addrvec_t, bool>
195 replace_unknown_addrs(entity_addrvec_t maybe_unknowns,
196 const entity_addrvec_t& knowns) {
197 bool changed = false;
198 auto maybe_replace = [&](entity_addr_t addr) {
199 if (!addr.is_blank_ip()) {
200 return addr;
201 }
202 for (auto& b : knowns.v) {
203 if (addr.get_family() == b.get_family()) {
204 auto a = b;
205 a.set_nonce(addr.get_nonce());
206 a.set_type(addr.get_type());
207 a.set_port(addr.get_port());
208 changed = true;
209 return a;
210 }
211 }
212 throw std::runtime_error("failed to replace unknown address");
213 };
214 entity_addrvec_t replaced;
215 std::transform(maybe_unknowns.v.begin(),
216 maybe_unknowns.v.end(),
217 std::back_inserter(replaced.v),
218 maybe_replace);
219 return {replaced, changed};
220 }
221}
222
223seastar::future<> OSD::start()
224{
225 logger().info("start");
226
9f95a23c 227 startup_time = ceph::mono_clock::now();
11fdf7f2 228
9f95a23c 229 return store->start().then([this] {
11fdf7f2
TL
230 return store->mount();
231 }).then([this] {
9f95a23c
TL
232 return store->open_collection(coll_t::meta());
233 }).then([this](auto ch) {
234 meta_coll = make_unique<OSDMeta>(ch, store.get());
11fdf7f2
TL
235 return meta_coll->load_superblock();
236 }).then([this](OSDSuperblock&& sb) {
237 superblock = std::move(sb);
238 return get_map(superblock.current_epoch);
239 }).then([this](cached_map_t&& map) {
9f95a23c
TL
240 shard_services.update_map(map);
241 osdmap_gate.got_map(map->get_epoch());
11fdf7f2
TL
242 osdmap = std::move(map);
243 return load_pgs();
244 }).then([this] {
9f95a23c
TL
245
246 uint64_t osd_required =
247 CEPH_FEATURE_UID |
248 CEPH_FEATURE_PGID64 |
249 CEPH_FEATURE_OSDENC;
250 using crimson::net::SocketPolicy;
251
252 public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
253 public_msgr->set_policy(entity_name_t::TYPE_MON,
254 SocketPolicy::lossy_client(osd_required));
255 public_msgr->set_policy(entity_name_t::TYPE_MGR,
256 SocketPolicy::lossy_client(osd_required));
257 public_msgr->set_policy(entity_name_t::TYPE_OSD,
258 SocketPolicy::stateless_server(0));
259
260 cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
261 cluster_msgr->set_policy(entity_name_t::TYPE_MON,
262 SocketPolicy::lossy_client(0));
263 cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
264 SocketPolicy::lossless_peer(osd_required));
265 cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
266 SocketPolicy::stateless_server(0));
267
f67539c2 268 crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
11fdf7f2
TL
269 return seastar::when_all_succeed(
270 cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
271 local_conf()->ms_bind_port_min,
272 local_conf()->ms_bind_port_max)
f67539c2
TL
273 .safe_then([this, dispatchers]() mutable {
274 return cluster_msgr->start(dispatchers);
275 }, crimson::net::Messenger::bind_ertr::all_same_way(
276 [] (const std::error_code& e) {
277 logger().error("cluster messenger try_bind(): address range is unavailable.");
278 ceph_abort();
279 })),
11fdf7f2
TL
280 public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
281 local_conf()->ms_bind_port_min,
282 local_conf()->ms_bind_port_max)
f67539c2
TL
283 .safe_then([this, dispatchers]() mutable {
284 return public_msgr->start(dispatchers);
285 }, crimson::net::Messenger::bind_ertr::all_same_way(
286 [] (const std::error_code& e) {
287 logger().error("public messenger try_bind(): address range is unavailable.");
288 ceph_abort();
289 })));
290 }).then_unpack([this] {
9f95a23c
TL
291 return seastar::when_all_succeed(monc->start(),
292 mgrc->start());
f67539c2 293 }).then_unpack([this] {
9f95a23c 294 return _add_me_to_crush();
11fdf7f2
TL
295 }).then([this] {
296 monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
297 monc->sub_want("mgrmap", 0, 0);
298 monc->sub_want("osdmap", 0, 0);
299 return monc->renew_subs();
300 }).then([this] {
301 if (auto [addrs, changed] =
302 replace_unknown_addrs(cluster_msgr->get_myaddrs(),
303 public_msgr->get_myaddrs()); changed) {
9f95a23c
TL
304 return cluster_msgr->set_myaddrs(addrs);
305 } else {
306 return seastar::now();
11fdf7f2 307 }
9f95a23c 308 }).then([this] {
11fdf7f2
TL
309 return heartbeat->start(public_msgr->get_myaddrs(),
310 cluster_msgr->get_myaddrs());
9f95a23c
TL
311 }).then([this] {
312 // create the admin-socket server, and the objects that register
313 // to handle incoming commands
314 return start_asok_admin();
11fdf7f2
TL
315 }).then([this] {
316 return start_boot();
317 });
318}
319
320seastar::future<> OSD::start_boot()
321{
322 state.set_preboot();
f67539c2
TL
323 return monc->get_version("osdmap").then([this](auto&& ret) {
324 auto [newest, oldest] = ret;
11fdf7f2
TL
325 return _preboot(oldest, newest);
326 });
327}
328
329seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
330{
331 logger().info("osd.{}: _preboot", whoami);
332 if (osdmap->get_epoch() == 0) {
f67539c2 333 logger().info("waiting for initial osdmap");
11fdf7f2
TL
334 } else if (osdmap->is_destroyed(whoami)) {
335 logger().warn("osdmap says I am destroyed");
336 // provide a small margin so we don't livelock seeing if we
337 // un-destroyed ourselves.
338 if (osdmap->get_epoch() > newest - 1) {
339 throw std::runtime_error("i am destroyed");
340 }
81eedcae 341 } else if (osdmap->is_noup(whoami)) {
11fdf7f2
TL
342 logger().warn("osdmap NOUP flag is set, waiting for it to clear");
343 } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
344 logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
9f95a23c
TL
345 } else if (osdmap->require_osd_release < ceph_release_t::octopus) {
346 logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
11fdf7f2
TL
347 } else if (false) {
348 // TODO: update mon if current fullness state is different from osdmap
349 } else if (version_t n = local_conf()->osd_map_message_max;
350 osdmap->get_epoch() >= oldest - 1 &&
351 osdmap->get_epoch() + n > newest) {
352 return _send_boot();
353 }
354 // get all the latest maps
355 if (osdmap->get_epoch() + 1 >= oldest) {
9f95a23c 356 return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
11fdf7f2 357 } else {
9f95a23c 358 return shard_services.osdmap_subscribe(oldest - 1, true);
11fdf7f2
TL
359 }
360}
361
362seastar::future<> OSD::_send_boot()
363{
364 state.set_booting();
365
366 logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
367 logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
368 logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
369 auto m = make_message<MOSDBoot>(superblock,
370 osdmap->get_epoch(),
371 osdmap->get_epoch(),
372 heartbeat->get_back_addrs(),
373 heartbeat->get_front_addrs(),
374 cluster_msgr->get_myaddrs(),
375 CEPH_FEATURES_ALL);
9f95a23c 376 collect_sys_info(&m->metadata, NULL);
11fdf7f2
TL
377 return monc->send_message(m);
378}
379
9f95a23c
TL
380seastar::future<> OSD::_add_me_to_crush()
381{
382 if (!local_conf().get_val<bool>("osd_crush_update_on_start")) {
383 return seastar::now();
384 }
385 auto get_weight = [this] {
386 if (auto w = local_conf().get_val<double>("osd_crush_initial_weight");
387 w >= 0) {
388 return seastar::make_ready_future<double>(w);
389 } else {
390 return store->stat().then([](auto st) {
391 auto total = st.total;
392 return seastar::make_ready_future<double>(
393 std::max(.00001,
394 double(total) / double(1ull << 40))); // TB
395 });
396 }
397 };
398 return get_weight().then([this](auto weight) {
399 const crimson::crush::CrushLocation loc{make_unique<CephContext>().get()};
400 logger().info("{} crush location is {}", __func__, loc);
401 string cmd = fmt::format(R"({{
402 "prefix": "osd crush create-or-move",
403 "id": {},
404 "weight": {:.4f},
405 "args": [{}]
406 }})", whoami, weight, loc);
407 return monc->run_command({cmd}, {});
f67539c2
TL
408 }).then([](auto&& command_result) {
409 [[maybe_unused]] auto [code, message, out] = std::move(command_result);
9f95a23c
TL
410 if (code) {
411 logger().warn("fail to add to crush: {} ({})", message, code);
412 throw std::runtime_error("fail to add to crush");
413 } else {
414 logger().info("added to crush: {}", message);
415 }
416 return seastar::now();
417 });
418}
419
f67539c2
TL
420seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
421 Ref<MCommand> m)
9f95a23c 422{
f67539c2 423 return asok->handle_command(conn, std::move(m));
9f95a23c
TL
424}
425
426/*
427 The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands
428 to handle) registered to it:
429 - OSD's specific commands are handled by the OSD object;
430 - there are some common commands registered to be directly handled by the AdminSocket object
431 itself.
432*/
433seastar::future<> OSD::start_asok_admin()
434{
435 auto asok_path = local_conf().get_val<std::string>("admin_socket");
436 using namespace crimson::admin;
437 return asok->start(asok_path).then([this] {
438 return seastar::when_all_succeed(
439 asok->register_admin_commands(),
f67539c2 440 asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))),
9f95a23c 441 asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
f67539c2
TL
442 asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)),
443 asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))),
444 asok->register_command(make_asok_hook<SeastarMetricsHook>()),
445 // PG commands
446 asok->register_command(make_asok_hook<pg::QueryCommand>(*this)),
447 asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)));
448 }).then_unpack([] {
449 return seastar::now();
9f95a23c
TL
450 });
451}
452
11fdf7f2
TL
453seastar::future<> OSD::stop()
454{
9f95a23c 455 logger().info("stop");
11fdf7f2 456 // see also OSD::shutdown()
f67539c2
TL
457 return prepare_to_stop().then([this] {
458 state.set_stopping();
459 logger().debug("prepared to stop");
460 public_msgr->stop();
461 cluster_msgr->stop();
462 auto gate_close_fut = gate.close();
463 return asok->stop().then([this] {
464 return heartbeat->stop();
465 }).then([this] {
466 return store->umount();
467 }).then([this] {
468 return store->stop();
469 }).then([this] {
470 return seastar::parallel_for_each(pg_map.get_pgs(),
471 [](auto& p) {
472 return p.second->stop();
473 });
474 }).then([this] {
475 return monc->stop();
476 }).then([this] {
477 return mgrc->stop();
478 }).then([fut=std::move(gate_close_fut)]() mutable {
479 return std::move(fut);
480 }).then([this] {
481 return when_all_succeed(
482 public_msgr->shutdown(),
483 cluster_msgr->shutdown());
484 }).then_unpack([] {
485 return seastar::now();
486 }).handle_exception([](auto ep) {
487 logger().error("error while stopping osd: {}", ep);
488 });
11fdf7f2
TL
489 });
490}
491
9f95a23c
TL
492void OSD::dump_status(Formatter* f) const
493{
494 f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
495 f->dump_stream("osd_fsid") << superblock.osd_fsid;
496 f->dump_unsigned("whoami", superblock.whoami);
497 f->dump_string("state", state.to_string());
498 f->dump_unsigned("oldest_map", superblock.oldest_map);
499 f->dump_unsigned("newest_map", superblock.newest_map);
500 f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
501}
502
f67539c2
TL
503void OSD::dump_pg_state_history(Formatter* f) const
504{
505 f->open_array_section("pgs");
506 for (auto [pgid, pg] : pg_map.get_pgs()) {
507 f->open_object_section("pg");
508 f->dump_stream("pg") << pgid;
509 const auto& peering_state = pg->get_peering_state();
510 f->dump_string("currently", peering_state.get_current_state());
511 peering_state.dump_history(f);
512 f->close_section();
513 }
514 f->close_section();
515}
516
517void OSD::print(std::ostream& out) const
518{
519 out << "{osd." << superblock.whoami << " "
520 << superblock.osd_fsid << " [" << superblock.oldest_map
521 << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
522 << " pgs}";
523}
524
11fdf7f2
TL
525seastar::future<> OSD::load_pgs()
526{
9f95a23c
TL
527 return store->list_collections().then([this](auto colls) {
528 return seastar::parallel_for_each(colls, [this](auto coll) {
11fdf7f2
TL
529 spg_t pgid;
530 if (coll.is_pg(&pgid)) {
531 return load_pg(pgid).then([pgid, this](auto&& pg) {
532 logger().info("load_pgs: loaded {}", pgid);
9f95a23c
TL
533 pg_map.pg_loaded(pgid, std::move(pg));
534 shard_services.inc_pg_num();
11fdf7f2
TL
535 return seastar::now();
536 });
537 } else if (coll.is_temp(&pgid)) {
538 // TODO: remove the collection
539 return seastar::now();
540 } else {
541 logger().warn("ignoring unrecognized collection: {}", coll);
542 return seastar::now();
543 }
544 });
9f95a23c 545 });
11fdf7f2
TL
546}
547
9f95a23c
TL
548seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
549 spg_t pgid,
550 bool do_create)
11fdf7f2
TL
551{
552 using ec_profile_t = map<string,string>;
9f95a23c 553 auto get_pool_info = [create_map, pgid, this] {
11fdf7f2
TL
554 if (create_map->have_pg_pool(pgid.pool())) {
555 pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
556 string name = create_map->get_pool_name(pgid.pool());
557 ec_profile_t ec_profile;
558 if (pi.is_erasure()) {
9f95a23c 559 ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
11fdf7f2 560 }
f67539c2
TL
561 return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
562 std::make_tuple(std::move(pi),
563 std::move(name),
564 std::move(ec_profile)));
11fdf7f2
TL
565 } else {
566 // pool was deleted; grab final pg_pool_t off disk.
567 return meta_coll->load_final_pool_info(pgid.pool());
568 }
9f95a23c
TL
569 };
570 auto get_collection = [pgid, do_create, this] {
571 const coll_t cid{pgid};
572 if (do_create) {
573 return store->create_new_collection(cid);
574 } else {
575 return store->open_collection(cid);
576 }
577 };
f67539c2 578 return seastar::when_all(
9f95a23c
TL
579 std::move(get_pool_info),
580 std::move(get_collection)
f67539c2
TL
581 ).then([pgid, create_map, this] (auto&& ret) {
582 auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
583 auto coll = std::move(std::get<1>(ret).get0());
9f95a23c
TL
584 return seastar::make_ready_future<Ref<PG>>(
585 new PG{pgid,
586 pg_shard_t{whoami, pgid.shard},
587 std::move(coll),
588 std::move(pool),
589 std::move(name),
590 create_map,
591 shard_services,
592 ec_profile});
11fdf7f2
TL
593 });
594}
595
9f95a23c
TL
596seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
597{
f67539c2
TL
598 logger().debug("{}: {}", __func__, pgid);
599
600 return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) {
601 return pg_meta.get_epoch();
602 }).then([this](epoch_t e) {
9f95a23c
TL
603 return get_map(e);
604 }).then([pgid, this] (auto&& create_map) {
605 return make_pg(std::move(create_map), pgid, false);
f67539c2 606 }).then([this](Ref<PG> pg) {
9f95a23c 607 return pg->read_state(store.get()).then([pg] {
f67539c2 608 return seastar::make_ready_future<Ref<PG>>(std::move(pg));
9f95a23c
TL
609 });
610 }).handle_exception([pgid](auto ep) {
611 logger().info("pg {} saw exception on load {}", pgid, ep);
612 ceph_abort("Could not load pg" == 0);
613 return seastar::make_exception_future<Ref<PG>>(ep);
614 });
615}
616
f67539c2
TL
617std::optional<seastar::future<>>
618OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
11fdf7f2 619{
11fdf7f2 620 if (state.is_stopping()) {
f67539c2 621 return {};
11fdf7f2 622 }
f67539c2
TL
623 bool dispatched = true;
624 gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
625 switch (m->get_type()) {
626 case CEPH_MSG_OSD_MAP:
627 return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
628 case CEPH_MSG_OSD_OP:
629 return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
630 case MSG_OSD_PG_CREATE2:
631 shard_services.start_operation<CompoundPeeringRequest>(
632 *this,
633 conn,
634 m);
635 return seastar::now();
636 case MSG_COMMAND:
637 return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
638 case MSG_OSD_MARK_ME_DOWN:
639 return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
640 case MSG_OSD_PG_PULL:
641 [[fallthrough]];
642 case MSG_OSD_PG_PUSH:
643 [[fallthrough]];
644 case MSG_OSD_PG_PUSH_REPLY:
645 [[fallthrough]];
646 case MSG_OSD_PG_RECOVERY_DELETE:
647 [[fallthrough]];
648 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
649 [[fallthrough]];
650 case MSG_OSD_PG_SCAN:
651 [[fallthrough]];
652 case MSG_OSD_PG_BACKFILL:
653 [[fallthrough]];
654 case MSG_OSD_PG_BACKFILL_REMOVE:
655 return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
656 case MSG_OSD_PG_LEASE:
657 [[fallthrough]];
658 case MSG_OSD_PG_LEASE_ACK:
659 [[fallthrough]];
660 case MSG_OSD_PG_NOTIFY2:
661 [[fallthrough]];
662 case MSG_OSD_PG_INFO2:
663 [[fallthrough]];
664 case MSG_OSD_PG_QUERY2:
665 [[fallthrough]];
666 case MSG_OSD_BACKFILL_RESERVE:
667 [[fallthrough]];
668 case MSG_OSD_RECOVERY_RESERVE:
669 [[fallthrough]];
670 case MSG_OSD_PG_LOG:
671 return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
672 case MSG_OSD_REPOP:
673 return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
674 case MSG_OSD_REPOPREPLY:
675 return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
676 case MSG_OSD_SCRUB2:
677 return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
678 default:
679 dispatched = false;
680 return seastar::now();
681 }
682 });
683 return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
11fdf7f2
TL
684}
685
f67539c2 686void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
11fdf7f2
TL
687{
688 // TODO: cleanup the session attached to this connection
689 logger().warn("ms_handle_reset");
11fdf7f2
TL
690}
691
f67539c2 692void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
11fdf7f2
TL
693{
694 logger().warn("ms_handle_remote_reset");
11fdf7f2
TL
695}
696
9f95a23c 697void OSD::handle_authentication(const EntityName& name,
f67539c2
TL
698 const AuthCapsInfo& caps_info)
699{
700 // TODO: store the parsed cap and associate it with the connection
701 if (caps_info.allow_all) {
702 logger().debug("{} {} has all caps", __func__, name);
703 return;
704 }
705 if (caps_info.caps.length() > 0) {
706 auto p = caps_info.caps.cbegin();
707 string str;
708 try {
709 decode(str, p);
710 } catch (ceph::buffer::error& e) {
711 logger().warn("{} {} failed to decode caps string", __func__, name);
712 return;
713 }
714 OSDCap caps;
715 if (caps.parse(str)) {
716 logger().debug("{} {} has caps {}", __func__, name, str);
717 } else {
718 logger().warn("{} {} failed to parse caps {}", __func__, name, str);
719 }
720 }
721}
722
723void OSD::update_stats()
9f95a23c 724{
f67539c2
TL
725 osd_stat_seq++;
726 osd_stat.up_from = get_up_epoch();
727 osd_stat.hb_peers = heartbeat->get_peers();
728 osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
729 gate.dispatch_in_background("statfs", *this, [this] {
730 (void) store->stat().then([this](store_statfs_t&& st) {
731 osd_stat.statfs = st;
732 });
733 });
9f95a23c
TL
734}
735
f67539c2 736MessageRef OSD::get_stats() const
9f95a23c
TL
737{
738 // todo: m-to-n: collect stats using map-reduce
739 // MPGStats::had_map_for is not used since PGMonitor was removed
740 auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
f67539c2 741 m->osd_stat = osd_stat;
9f95a23c
TL
742 for (auto [pgid, pg] : pg_map.get_pgs()) {
743 if (pg->is_primary()) {
744 auto stats = pg->get_stats();
745 // todo: update reported_epoch,reported_seq,last_fresh
746 stats.reported_epoch = osdmap->get_epoch();
747 m->pg_stat.emplace(pgid.pgid, std::move(stats));
748 }
749 }
750 return m;
751}
752
f67539c2
TL
753uint64_t OSD::send_pg_stats()
754{
755 // mgr client sends the report message in background
756 mgrc->report();
757 return osd_stat.seq;
758}
759
11fdf7f2
TL
760OSD::cached_map_t OSD::get_map() const
761{
762 return osdmap;
763}
764
765seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
766{
767 // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
768 if (auto found = osdmaps.find(e); found) {
769 return seastar::make_ready_future<cached_map_t>(std::move(found));
770 } else {
9f95a23c 771 return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
11fdf7f2
TL
772 return seastar::make_ready_future<cached_map_t>(
773 osdmaps.insert(e, std::move(osdmap)));
774 });
775 }
776}
777
778void OSD::store_map_bl(ceph::os::Transaction& t,
779 epoch_t e, bufferlist&& bl)
780{
781 meta_coll->store_map(t, e, bl);
782 map_bl_cache.insert(e, std::move(bl));
783}
784
785seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
786{
787 if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
788 return seastar::make_ready_future<bufferlist>(*found);
789 } else {
790 return meta_coll->load_map(e);
791 }
792}
793
f67539c2
TL
794seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
795 epoch_t first,
796 epoch_t last)
797{
798 return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
799 boost::make_counting_iterator<epoch_t>(last + 1),
800 [this](epoch_t e) {
801 return load_map_bl(e).then([e](auto&& bl) {
802 return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
803 std::make_pair(e, std::move(bl)));
804 });
805 },
806 std::map<epoch_t, bufferlist>{},
807 [](auto&& bls, auto&& epoch_bl) {
808 bls.emplace(std::move(epoch_bl));
809 return std::move(bls);
810 });
811}
812
9f95a23c
TL
813seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
814{
815 auto o = std::make_unique<OSDMap>();
816 if (e > 0) {
817 return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
818 o->decode(bl);
819 return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
820 });
821 } else {
822 return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
823 }
824}
825
11fdf7f2
TL
826seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
827 epoch_t start, Ref<MOSDMap> m)
828{
9f95a23c
TL
829 return seastar::do_for_each(boost::make_counting_iterator(start),
830 boost::make_counting_iterator(m->get_last() + 1),
11fdf7f2
TL
831 [&t, m, this](epoch_t e) {
832 if (auto p = m->maps.find(e); p != m->maps.end()) {
833 auto o = std::make_unique<OSDMap>();
834 o->decode(p->second);
835 logger().info("store_maps osdmap.{}", e);
836 store_map_bl(t, e, std::move(std::move(p->second)));
837 osdmaps.insert(e, std::move(o));
838 return seastar::now();
839 } else if (auto p = m->incremental_maps.find(e);
840 p != m->incremental_maps.end()) {
9f95a23c
TL
841 return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
842 OSDMap::Incremental inc;
843 auto i = bl.cbegin();
844 inc.decode(i);
845 o->apply_incremental(inc);
846 bufferlist fbl;
847 o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
848 store_map_bl(t, e, std::move(fbl));
849 osdmaps.insert(e, std::move(o));
850 return seastar::now();
11fdf7f2
TL
851 });
852 } else {
853 logger().error("MOSDMap lied about what maps it had?");
854 return seastar::now();
855 }
856 });
857}
858
9f95a23c 859bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
11fdf7f2 860{
9f95a23c
TL
861 if (!conn->peer_is_mon()) {
862 logger().info("{} received from non-mon {}, {}",
863 __func__,
864 conn->get_peer_addr(),
865 *m);
866 return false;
11fdf7f2 867 }
9f95a23c 868 return true;
11fdf7f2
TL
869}
870
// Instantiate and initialize a PG described by a PGCreateInfo, persist
// its collection and initial metadata through a fresh PeeringCtx
// transaction, and start a PGAdvanceMap operation to bring it from its
// creation epoch up to the current map epoch.
//
// Mon-initiated creations (info->by_mon) are dropped — resolving to a
// null Ref<PG> — when the pool no longer exists or the pool's
// FLAG_CREATING window has closed.
seastar::future<Ref<PG>> OSD::handle_pg_create_info(
  std::unique_ptr<PGCreateInfo> info) {
  return seastar::do_with(
    std::move(info),
    [this](auto &info) -> seastar::future<Ref<PG>> {
      return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap) ->
        seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
            // NOTE(review): pool existence/flags are checked against the
            // OSD's *current* osdmap, while the PG itself is built from
            // startmap (the map at info->epoch)
            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
              // drop the create: signal with a null PG ref
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and pg are subsequently
              // allowed to split or merge).
              logger().debug(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
          }
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
            return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
              std::make_tuple(std::move(pg), std::move(startmap)));
          });
      }).then([this, &info](auto&& ret) ->
              seastar::future<Ref<PG>> {
        auto [pg, startmap] = std::move(ret);
        // creation was dropped above — propagate the null ref to the caller
        if (!pg)
          return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
        PeeringCtx rctx{ceph_release_t::octopus};
        const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

        // compute our role in this PG from the map the PG starts at
        int up_primary, acting_primary;
        vector<int> up, acting;
        startmap->pg_to_up_acting_osds(
          info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);

        int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
                                          acting);

        // stage the new collection and the PG's initial on-disk metadata
        // into the peering context's transaction
        create_pg_collection(
          rctx.transaction,
          info->pgid,
          info->pgid.get_split_bits(pp->get_pg_num()));
        init_pg_ondisk(
          rctx.transaction,
          info->pgid,
          pp);

        pg->init(
          role,
          up,
          up_primary,
          acting,
          acting_primary,
          info->history,
          info->past_intervals,
          false,
          rctx.transaction);

        // advance the new PG to the current epoch; PGAdvanceMap carries
        // rctx (and its transaction) along with it
        return shard_services.start_operation<PGAdvanceMap>(
          *this, pg, pg->get_osdmap_epoch(),
          osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
          return seastar::make_ready_future<Ref<PG>>(pg);
        });
    });
  });
}
953
// Ingest an MOSDMap message: validate it, resubscribe for any epoch gap
// the message cannot fill, persist the new maps plus the updated
// superblock in a single transaction, then activate the committed
// epochs via committed_osd_maps().
seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                      Ref<MOSDMap> m)
{
  logger().info("handle_osd_map {}", *m);
  if (m->fsid != superblock.cluster_fsid) {
    // message is for a different cluster
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  if (state.is_initializing()) {
    logger().warn("i am still initializing");
    return seastar::now();
  }

  const auto first = m->get_first();
  const auto last = m->get_last();
  logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
                first, last, superblock.newest_map, m->oldest_map, m->newest_map);
  // make sure there is something new, here, before we bother flushing
  // the queues and such
  if (last <= superblock.newest_map) {
    return seastar::now();
  }
  // missing some?
  bool skip_maps = false;
  epoch_t start = superblock.newest_map + 1;
  if (first > start) {
    logger().info("handle_osd_map message skips epochs {}..{}",
                  start, first - 1);
    if (m->oldest_map <= start) {
      // sender still has the maps we're missing: ask for them
      return shard_services.osdmap_subscribe(start, false);
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
    }
    skip_maps = true;
    start = first;
  }

  return seastar::do_with(ceph::os::Transaction{},
                          [=](auto& t) {
    return store_maps(t, start, m).then([=, &t] {
      // even if this map isn't from a mon, we may have satisfied our subscription
      monc->sub_got("osdmap", last);
      if (!superblock.oldest_map || skip_maps) {
        superblock.oldest_map = first;
      }
      superblock.newest_map = last;
      superblock.current_epoch = last;

      // note in the superblock that we were clean thru the prior epoch
      if (boot_epoch && boot_epoch >= superblock.mounted) {
        superblock.mounted = boot_epoch;
        superblock.clean_thru = last;
      }
      // maps and superblock go to disk in the same transaction
      meta_coll->store_superblock(t, superblock);
      return store->do_transaction(meta_coll->collection(), std::move(t));
    });
  }).then([=] {
    // TODO: write to superblock and commit the transaction
    return committed_osd_maps(start, last, m);
  });
}
1020
// Apply freshly persisted epochs [first, last]: advance the in-memory
// osdmap one epoch at a time (tracking up_epoch/boot_epoch), then act on
// the final map — arm beacon/tick timers on activation, acknowledge a
// pending stop if we were marked down, feed the new epoch to every PG
// via consume_map(), and finally react to the resulting state
// (shutdown / restart / preboot).
seastar::future<> OSD::committed_osd_maps(version_t first,
                                          version_t last,
                                          Ref<MOSDMap> m)
{
  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
  // advance through the new maps
  return seastar::do_for_each(boost::make_counting_iterator(first),
                              boost::make_counting_iterator(last + 1),
                              [this](epoch_t cur) {
    return get_map(cur).then([this](cached_map_t&& o) {
      osdmap = std::move(o);
      shard_services.update_map(osdmap);
      // record the first epoch in which the map shows us up at our
      // current public address
      if (up_epoch == 0 &&
          osdmap->is_up(whoami) &&
          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
        up_epoch = osdmap->get_epoch();
        if (!boot_epoch) {
          boot_epoch = osdmap->get_epoch();
        }
      }
    });
  }).then([m, this] {
    if (osdmap->is_up(whoami) &&
        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
        bind_epoch < osdmap->get_up_from(whoami)) {
      if (state.is_booting()) {
        logger().info("osd.{}: activating...", whoami);
        state.set_active();
        beacon_timer.arm_periodic(
          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
        tick_timer.arm_periodic(
          std::chrono::seconds(TICK_INTERVAL));
      }
    } else if (!osdmap->is_up(whoami)) {
      if (state.is_prestop()) {
        // we asked to be marked down and the map now reflects it;
        // skip consume_map and unblock prepare_to_stop()
        got_stop_ack();
        return seastar::now();
      }
    }
    check_osdmap_features();
    // yay!
    return consume_map(osdmap->get_epoch());
  }).then([m, this] {
    if (state.is_active()) {
      logger().info("osd.{}: now active", whoami);
      if (!osdmap->exists(whoami)) {
        // we were removed from the map entirely
        return shutdown();
      }
      if (should_restart()) {
        return restart();
      } else {
        return seastar::now();
      }
    } else if (state.is_preboot()) {
      logger().info("osd.{}: now preboot", whoami);

      if (m->get_source().is_mon()) {
        return _preboot(m->oldest_map, m->newest_map);
      } else {
        logger().info("osd.{}: start_boot", whoami);
        return start_boot();
      }
    } else {
      logger().info("osd.{}: now {}", whoami, state);
      // XXX
      return seastar::now();
    }
  });
}
1090
f67539c2 1091seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
9f95a23c
TL
1092 Ref<MOSDOp> m)
1093{
f67539c2 1094 (void) shard_services.start_operation<ClientRequest>(
9f95a23c 1095 *this,
f67539c2 1096 conn,
9f95a23c
TL
1097 std::move(m));
1098 return seastar::now();
1099}
1100
f67539c2
TL
1101seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
1102 epoch_t first)
1103{
1104 if (first >= superblock.oldest_map) {
1105 return load_map_bls(first, superblock.newest_map)
1106 .then([this, conn, first](auto&& bls) {
1107 auto m = make_message<MOSDMap>(monc->get_fsid(),
1108 osdmap->get_encoding_features());
1109 m->oldest_map = first;
1110 m->newest_map = superblock.newest_map;
1111 m->maps = std::move(bls);
1112 return conn->send(m);
1113 });
1114 } else {
1115 return load_map_bl(osdmap->get_epoch())
1116 .then([this, conn](auto&& bl) mutable {
1117 auto m = make_message<MOSDMap>(monc->get_fsid(),
1118 osdmap->get_encoding_features());
1119 m->oldest_map = superblock.oldest_map;
1120 m->newest_map = superblock.newest_map;
1121 m->maps.emplace(osdmap->get_epoch(), std::move(bl));
1122 return conn->send(m);
1123 });
1124 }
1125}
1126
1127seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
9f95a23c
TL
1128 Ref<MOSDRepOp> m)
1129{
1130 m->finish_decode();
f67539c2 1131 (void) shard_services.start_operation<RepRequest>(
9f95a23c 1132 *this,
f67539c2 1133 std::move(conn),
9f95a23c
TL
1134 std::move(m));
1135 return seastar::now();
1136}
1137
f67539c2 1138seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
9f95a23c
TL
1139 Ref<MOSDRepOpReply> m)
1140{
1141 const auto& pgs = pg_map.get_pgs();
1142 if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
1143 m->finish_decode();
1144 pg->second->handle_rep_op_reply(conn, *m);
1145 } else {
1146 logger().warn("stale reply: {}", *m);
1147 }
1148 return seastar::now();
1149}
1150
// Handle an MOSDScrub2 request: for every pg listed in the message,
// deliver a PeeringState::RequestScrub event (deep/repair flags taken
// from the message) to that PG as a RemotePeeringEvent.  All PGs are
// dispatched in parallel; the returned future resolves when every
// event has been delivered.
seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
                                    Ref<MOSDScrub2> m)
{
  if (m->fsid != superblock.cluster_fsid) {
    // request is for a different cluster — ignore
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  return seastar::parallel_for_each(std::move(m->scrub_pgs),
    [m, conn, this](spg_t pgid) {
    pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
                          pgid.shard};
    PeeringState::RequestScrub scrub_request{m->deep, m->repair};
    return shard_services.start_operation<RemotePeeringEvent>(
      *this,
      conn,
      shard_services,
      from_shard,
      pgid,
      PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
  });
}
1172
1173seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
1174 Ref<MOSDMarkMeDown> m)
1175{
1176 if (state.is_prestop()) {
1177 got_stop_ack();
1178 }
1179 return seastar::now();
1180}
1181
1182seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
1183 Ref<MOSDFastDispatchOp> m)
1184{
1185 (void) shard_services.start_operation<RecoverySubRequest>(
1186 *this,
1187 conn,
1188 std::move(m));
1189 return seastar::now();
1190}
1191
11fdf7f2
TL
1192bool OSD::should_restart() const
1193{
1194 if (!osdmap->is_up(whoami)) {
1195 logger().info("map e {} marked osd.{} down",
1196 osdmap->get_epoch(), whoami);
1197 return true;
1198 } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
1199 logger().error("map e {} had wrong client addr ({} != my {})",
1200 osdmap->get_epoch(),
1201 osdmap->get_addrs(whoami),
1202 public_msgr->get_myaddrs());
1203 return true;
1204 } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
1205 logger().error("map e {} had wrong cluster addr ({} != my {})",
1206 osdmap->get_epoch(),
1207 osdmap->get_cluster_addrs(whoami),
1208 cluster_msgr->get_myaddrs());
1209 return true;
1210 } else {
1211 return false;
1212 }
1213}
1214
1215seastar::future<> OSD::restart()
1216{
9f95a23c 1217 beacon_timer.cancel();
f67539c2 1218 tick_timer.cancel();
11fdf7f2
TL
1219 up_epoch = 0;
1220 bind_epoch = osdmap->get_epoch();
1221 // TODO: promote to shutdown if being marked down for multiple times
1222 // rebind messengers
1223 return start_boot();
1224}
1225
1226seastar::future<> OSD::shutdown()
1227{
1228 // TODO
1229 superblock.mounted = boot_epoch;
1230 superblock.clean_thru = osdmap->get_epoch();
1231 return seastar::now();
1232}
1233
1234seastar::future<> OSD::send_beacon()
1235{
9f95a23c
TL
1236 if (!state.is_active()) {
1237 return seastar::now();
1238 }
11fdf7f2
TL
1239 // FIXME: min lec should be calculated from pg_stat
1240 // and should set m->pgs
1241 epoch_t min_last_epoch_clean = osdmap->get_epoch();
1242 auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
9f95a23c 1243 min_last_epoch_clean,
f67539c2
TL
1244 superblock.last_purged_snaps_scrub,
1245 local_conf()->osd_beacon_report_interval);
11fdf7f2
TL
1246 return monc->send_message(m);
1247}
1248
f67539c2 1249void OSD::update_heartbeat_peers()
11fdf7f2
TL
1250{
1251 if (!state.is_active()) {
f67539c2 1252 return;
11fdf7f2 1253 }
9f95a23c 1254 for (auto& pg : pg_map.get_pgs()) {
11fdf7f2
TL
1255 vector<int> up, acting;
1256 osdmap->pg_to_up_acting_osds(pg.first.pgid,
1257 &up, nullptr,
1258 &acting, nullptr);
9f95a23c
TL
1259 for (int osd : boost::join(up, acting)) {
1260 if (osd == CRUSH_ITEM_NONE || osd == whoami) {
1261 continue;
1262 } else {
11fdf7f2
TL
1263 heartbeat->add_peer(osd, osdmap->get_epoch());
1264 }
1265 }
1266 }
f67539c2 1267 heartbeat->update_peers(whoami);
9f95a23c
TL
1268}
1269
1270seastar::future<> OSD::handle_peering_op(
f67539c2 1271 crimson::net::ConnectionRef conn,
9f95a23c
TL
1272 Ref<MOSDPeeringOp> m)
1273{
1274 const int from = m->get_source().num();
1275 logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
f67539c2
TL
1276 std::unique_ptr<PGPeeringEvent> evt(m->get_event());
1277 (void) shard_services.start_operation<RemotePeeringEvent>(
9f95a23c 1278 *this,
f67539c2 1279 conn,
9f95a23c
TL
1280 shard_services,
1281 pg_shard_t{from, m->get_spg().shard},
1282 m->get_spg(),
f67539c2 1283 std::move(*evt));
9f95a23c
TL
1284 return seastar::now();
1285}
1286
// React to feature-relevant osdmap changes.  At present this only
// forces heartbeat connections to require an authorizer.
void OSD::check_osdmap_features()
{
  heartbeat->set_require_authorizer(true);
}
1291
1292seastar::future<> OSD::consume_map(epoch_t epoch)
1293{
1294 // todo: m-to-n: broadcast this news to all shards
1295 auto &pgs = pg_map.get_pgs();
1296 return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
1297 return shard_services.start_operation<PGAdvanceMap>(
1298 *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
1299 PeeringCtx{ceph_release_t::octopus}, false).second;
1300 }).then([epoch, this] {
1301 osdmap_gate.got_map(epoch);
1302 return seastar::make_ready_future();
1303 });
1304}
1305
1306
1307blocking_future<Ref<PG>>
1308OSD::get_or_create_pg(
1309 spg_t pgid,
1310 epoch_t epoch,
1311 std::unique_ptr<PGCreateInfo> info)
1312{
f67539c2
TL
1313 if (info) {
1314 auto [fut, creating] = pg_map.wait_for_pg(pgid);
1315 if (!creating) {
1316 pg_map.set_creating(pgid);
1317 (void)handle_pg_create_info(std::move(info));
1318 }
1319 return std::move(fut);
1320 } else {
1321 return make_ready_blocking_future<Ref<PG>>(pg_map.get_pg(pgid));
9f95a23c 1322 }
9f95a23c
TL
1323}
1324
// Block until the PG with the given id is registered in pg_map,
// discarding the "creating" flag that PGMap::wait_for_pg() also
// returns.
blocking_future<Ref<PG>> OSD::wait_for_pg(
  spg_t pgid)
{
  return pg_map.wait_for_pg(pgid).first;
}
1330
// Direct pg_map lookup by id.
// NOTE(review): behavior for an unknown pgid (null Ref vs. throw) is
// determined by PGMap::get_pg() — confirm against its implementation.
Ref<PG> OSD::get_pg(spg_t pgid)
{
  return pg_map.get_pg(pgid);
}
1335
// Graceful-stop handshake with the monitor.  If the map shows us up:
// enter the prestop state, ask the mon to mark us down, and wait for
// the acknowledgment (delivered via got_stop_ack(), which fulfills
// stop_acked) — but no longer than osd_mon_shutdown_timeout seconds.
// A timeout is swallowed so shutdown can proceed regardless.
seastar::future<> OSD::prepare_to_stop()
{
  if (osdmap && osdmap->is_up(whoami)) {
    state.set_prestop();
    // config value is in (fractional) seconds; convert to a
    // millisecond duration for the timer
    const auto timeout =
      std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::duration<double>(
          local_conf().get_val<double>("osd_mon_shutdown_timeout")));

    return seastar::with_timeout(
      seastar::timer<>::clock::now() + timeout,
      monc->send_message(
        make_message<MOSDMarkMeDown>(
          monc->get_fsid(),
          whoami,
          osdmap->get_addrs(whoami),
          osdmap->get_epoch(),
          true)).then([this] {
        // resolves when the mark-me-down ack (or the map marking us
        // down) triggers got_stop_ack()
        return stop_acked.get_future();
      })
    ).handle_exception_type(
      [](seastar::timed_out_error&) {
      // mon never acked in time — continue stopping anyway
      return seastar::now();
    });
  }
  return seastar::now();
}
1363
11fdf7f2 1364}