]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
11fdf7f2 TL |
4 | #include "osd.h" |
5 | ||
9f95a23c TL |
6 | #include <sys/utsname.h> |
7 | ||
8 | #include <boost/iterator/counting_iterator.hpp> | |
11fdf7f2 TL |
9 | #include <boost/range/join.hpp> |
10 | #include <boost/smart_ptr/make_local_shared.hpp> | |
9f95a23c TL |
11 | #include <fmt/format.h> |
12 | #include <fmt/ostream.h> | |
f67539c2 | 13 | #include <seastar/core/timer.hh> |
11fdf7f2 TL |
14 | |
15 | #include "common/pick_address.h" | |
9f95a23c TL |
16 | #include "include/util.h" |
17 | ||
f67539c2 | 18 | #include "messages/MCommand.h" |
11fdf7f2 TL |
19 | #include "messages/MOSDBeacon.h" |
20 | #include "messages/MOSDBoot.h" | |
21 | #include "messages/MOSDMap.h" | |
f67539c2 | 22 | #include "messages/MOSDMarkMeDown.h" |
9f95a23c TL |
23 | #include "messages/MOSDOp.h" |
24 | #include "messages/MOSDPGLog.h" | |
f67539c2 TL |
25 | #include "messages/MOSDPGPull.h" |
26 | #include "messages/MOSDPGPush.h" | |
27 | #include "messages/MOSDPGPushReply.h" | |
28 | #include "messages/MOSDPGRecoveryDelete.h" | |
29 | #include "messages/MOSDPGRecoveryDeleteReply.h" | |
9f95a23c | 30 | #include "messages/MOSDRepOpReply.h" |
f67539c2 | 31 | #include "messages/MOSDScrub2.h" |
9f95a23c TL |
32 | #include "messages/MPGStats.h" |
33 | ||
34 | #include "os/Transaction.h" | |
35 | #include "osd/ClassHandler.h" | |
f67539c2 | 36 | #include "osd/OSDCap.h" |
9f95a23c TL |
37 | #include "osd/PGPeeringEvent.h" |
38 | #include "osd/PeeringState.h" | |
39 | ||
f67539c2 TL |
40 | #include "crimson/admin/osd_admin.h" |
41 | #include "crimson/admin/pg_commands.h" | |
42 | #include "crimson/common/exception.h" | |
9f95a23c | 43 | #include "crimson/mon/MonClient.h" |
11fdf7f2 TL |
44 | #include "crimson/net/Connection.h" |
45 | #include "crimson/net/Messenger.h" | |
9f95a23c TL |
46 | #include "crimson/os/futurized_collection.h" |
47 | #include "crimson/os/futurized_store.h" | |
11fdf7f2 TL |
48 | #include "crimson/osd/heartbeat.h" |
49 | #include "crimson/osd/osd_meta.h" | |
50 | #include "crimson/osd/pg.h" | |
9f95a23c | 51 | #include "crimson/osd/pg_backend.h" |
11fdf7f2 | 52 | #include "crimson/osd/pg_meta.h" |
9f95a23c TL |
53 | #include "crimson/osd/osd_operations/client_request.h" |
54 | #include "crimson/osd/osd_operations/compound_peering_request.h" | |
55 | #include "crimson/osd/osd_operations/peering_event.h" | |
56 | #include "crimson/osd/osd_operations/pg_advance_map.h" | |
f67539c2 | 57 | #include "crimson/osd/osd_operations/recovery_subrequest.h" |
9f95a23c | 58 | #include "crimson/osd/osd_operations/replicated_request.h" |
11fdf7f2 TL |
59 | |
60 | namespace { | |
61 | seastar::logger& logger() { | |
9f95a23c | 62 | return crimson::get_logger(ceph_subsys_osd); |
11fdf7f2 TL |
63 | } |
64 | static constexpr int TICK_INTERVAL = 1; | |
65 | } | |
66 | ||
9f95a23c TL |
67 | using crimson::common::local_conf; |
68 | using crimson::os::FuturizedStore; | |
69 | ||
70 | namespace crimson::osd { | |
11fdf7f2 | 71 | |
9f95a23c TL |
72 | OSD::OSD(int id, uint32_t nonce, |
73 | crimson::net::MessengerRef cluster_msgr, | |
74 | crimson::net::MessengerRef public_msgr, | |
75 | crimson::net::MessengerRef hb_front_msgr, | |
76 | crimson::net::MessengerRef hb_back_msgr) | |
11fdf7f2 TL |
77 | : whoami{id}, |
78 | nonce{nonce}, | |
9f95a23c TL |
79 | // do this in background |
80 | beacon_timer{[this] { (void)send_beacon(); }}, | |
81 | cluster_msgr{cluster_msgr}, | |
82 | public_msgr{public_msgr}, | |
83 | monc{new crimson::mon::Client{*public_msgr, *this}}, | |
84 | mgrc{new crimson::mgr::Client{*public_msgr, *this}}, | |
85 | store{crimson::os::FuturizedStore::create( | |
86 | local_conf().get_val<std::string>("osd_objectstore"), | |
87 | local_conf().get_val<std::string>("osd_data"), | |
88 | local_conf().get_config_values())}, | |
f67539c2 TL |
89 | shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store}, |
90 | heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}}, | |
9f95a23c | 91 | // do this in background |
f67539c2 TL |
92 | tick_timer{[this] { |
93 | update_heartbeat_peers(); | |
94 | update_stats(); | |
95 | }}, | |
9f95a23c TL |
96 | asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()}, |
97 | osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))) | |
11fdf7f2 TL |
98 | { |
99 | osdmaps[0] = boost::make_local_shared<OSDMap>(); | |
9f95a23c TL |
100 | for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr), |
101 | std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) { | |
102 | msgr.get()->set_auth_server(monc.get()); | |
103 | msgr.get()->set_auth_client(monc.get()); | |
104 | } | |
105 | ||
106 | if (local_conf()->osd_open_classes_on_start) { | |
107 | const int r = ClassHandler::get_instance().open_all_classes(); | |
108 | if (r) { | |
109 | logger().warn("{} warning: got an error loading one or more classes: {}", | |
110 | __func__, cpp_strerror(r)); | |
111 | } | |
112 | } | |
11fdf7f2 TL |
113 | } |
114 | ||
115 | OSD::~OSD() = default; | |
116 | ||
117 | namespace { | |
118 | // Initial features in new superblock. | |
119 | // Features here are also automatically upgraded | |
120 | CompatSet get_osd_initial_compat_set() | |
121 | { | |
122 | CompatSet::FeatureSet ceph_osd_feature_compat; | |
123 | CompatSet::FeatureSet ceph_osd_feature_ro_compat; | |
124 | CompatSet::FeatureSet ceph_osd_feature_incompat; | |
125 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); | |
126 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO); | |
127 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC); | |
128 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC); | |
129 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES); | |
130 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL); | |
131 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO); | |
132 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO); | |
133 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG); | |
134 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER); | |
135 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS); | |
136 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA); | |
137 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING); | |
138 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO); | |
139 | ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES); | |
140 | return CompatSet(ceph_osd_feature_compat, | |
141 | ceph_osd_feature_ro_compat, | |
142 | ceph_osd_feature_incompat); | |
143 | } | |
144 | } | |
145 | ||
9f95a23c | 146 | seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid) |
11fdf7f2 | 147 | { |
9f95a23c TL |
148 | return store->start().then([this, osd_uuid] { |
149 | return store->mkfs(osd_uuid); | |
150 | }).then([this] { | |
11fdf7f2 | 151 | return store->mount(); |
9f95a23c | 152 | }).then([cluster_fsid, this] { |
11fdf7f2 | 153 | superblock.cluster_fsid = cluster_fsid; |
9f95a23c | 154 | superblock.osd_fsid = store->get_fsid(); |
11fdf7f2 TL |
155 | superblock.whoami = whoami; |
156 | superblock.compat_features = get_osd_initial_compat_set(); | |
157 | ||
9f95a23c TL |
158 | logger().info( |
159 | "{} writing superblock cluster_fsid {} osd_fsid {}", | |
160 | __func__, | |
161 | cluster_fsid, | |
162 | superblock.osd_fsid); | |
163 | return store->create_new_collection(coll_t::meta()); | |
164 | }).then([this] (auto ch) { | |
165 | meta_coll = make_unique<OSDMeta>(ch , store.get()); | |
11fdf7f2 TL |
166 | ceph::os::Transaction t; |
167 | meta_coll->create(t); | |
168 | meta_coll->store_superblock(t, superblock); | |
169 | return store->do_transaction(meta_coll->collection(), std::move(t)); | |
170 | }).then([cluster_fsid, this] { | |
9f95a23c TL |
171 | return when_all_succeed( |
172 | store->write_meta("ceph_fsid", cluster_fsid.to_string()), | |
173 | store->write_meta("whoami", std::to_string(whoami))); | |
f67539c2 | 174 | }).then_unpack([cluster_fsid, this] { |
9f95a23c TL |
175 | fmt::print("created object store {} for osd.{} fsid {}\n", |
176 | local_conf().get_val<std::string>("osd_data"), | |
177 | whoami, cluster_fsid); | |
11fdf7f2 TL |
178 | return seastar::now(); |
179 | }); | |
180 | } | |
181 | ||
182 | namespace { | |
183 | entity_addrvec_t pick_addresses(int what) { | |
184 | entity_addrvec_t addrs; | |
9f95a23c | 185 | crimson::common::CephContext cct; |
11fdf7f2 TL |
186 | if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) { |
187 | throw std::runtime_error("failed to pick address"); | |
188 | } | |
11fdf7f2 | 189 | for (auto addr : addrs.v) { |
11fdf7f2 TL |
190 | logger().info("picked address {}", addr); |
191 | } | |
192 | return addrs; | |
193 | } | |
194 | std::pair<entity_addrvec_t, bool> | |
195 | replace_unknown_addrs(entity_addrvec_t maybe_unknowns, | |
196 | const entity_addrvec_t& knowns) { | |
197 | bool changed = false; | |
198 | auto maybe_replace = [&](entity_addr_t addr) { | |
199 | if (!addr.is_blank_ip()) { | |
200 | return addr; | |
201 | } | |
202 | for (auto& b : knowns.v) { | |
203 | if (addr.get_family() == b.get_family()) { | |
204 | auto a = b; | |
205 | a.set_nonce(addr.get_nonce()); | |
206 | a.set_type(addr.get_type()); | |
207 | a.set_port(addr.get_port()); | |
208 | changed = true; | |
209 | return a; | |
210 | } | |
211 | } | |
212 | throw std::runtime_error("failed to replace unknown address"); | |
213 | }; | |
214 | entity_addrvec_t replaced; | |
215 | std::transform(maybe_unknowns.v.begin(), | |
216 | maybe_unknowns.v.end(), | |
217 | std::back_inserter(replaced.v), | |
218 | maybe_replace); | |
219 | return {replaced, changed}; | |
220 | } | |
221 | } | |
222 | ||
223 | seastar::future<> OSD::start() | |
224 | { | |
225 | logger().info("start"); | |
226 | ||
9f95a23c | 227 | startup_time = ceph::mono_clock::now(); |
11fdf7f2 | 228 | |
9f95a23c | 229 | return store->start().then([this] { |
11fdf7f2 TL |
230 | return store->mount(); |
231 | }).then([this] { | |
9f95a23c TL |
232 | return store->open_collection(coll_t::meta()); |
233 | }).then([this](auto ch) { | |
234 | meta_coll = make_unique<OSDMeta>(ch, store.get()); | |
11fdf7f2 TL |
235 | return meta_coll->load_superblock(); |
236 | }).then([this](OSDSuperblock&& sb) { | |
237 | superblock = std::move(sb); | |
238 | return get_map(superblock.current_epoch); | |
239 | }).then([this](cached_map_t&& map) { | |
9f95a23c TL |
240 | shard_services.update_map(map); |
241 | osdmap_gate.got_map(map->get_epoch()); | |
11fdf7f2 TL |
242 | osdmap = std::move(map); |
243 | return load_pgs(); | |
244 | }).then([this] { | |
9f95a23c TL |
245 | |
246 | uint64_t osd_required = | |
247 | CEPH_FEATURE_UID | | |
248 | CEPH_FEATURE_PGID64 | | |
249 | CEPH_FEATURE_OSDENC; | |
250 | using crimson::net::SocketPolicy; | |
251 | ||
252 | public_msgr->set_default_policy(SocketPolicy::stateless_server(0)); | |
253 | public_msgr->set_policy(entity_name_t::TYPE_MON, | |
254 | SocketPolicy::lossy_client(osd_required)); | |
255 | public_msgr->set_policy(entity_name_t::TYPE_MGR, | |
256 | SocketPolicy::lossy_client(osd_required)); | |
257 | public_msgr->set_policy(entity_name_t::TYPE_OSD, | |
258 | SocketPolicy::stateless_server(0)); | |
259 | ||
260 | cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0)); | |
261 | cluster_msgr->set_policy(entity_name_t::TYPE_MON, | |
262 | SocketPolicy::lossy_client(0)); | |
263 | cluster_msgr->set_policy(entity_name_t::TYPE_OSD, | |
264 | SocketPolicy::lossless_peer(osd_required)); | |
265 | cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT, | |
266 | SocketPolicy::stateless_server(0)); | |
267 | ||
f67539c2 | 268 | crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()}; |
11fdf7f2 TL |
269 | return seastar::when_all_succeed( |
270 | cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), | |
271 | local_conf()->ms_bind_port_min, | |
272 | local_conf()->ms_bind_port_max) | |
f67539c2 TL |
273 | .safe_then([this, dispatchers]() mutable { |
274 | return cluster_msgr->start(dispatchers); | |
275 | }, crimson::net::Messenger::bind_ertr::all_same_way( | |
276 | [] (const std::error_code& e) { | |
277 | logger().error("cluster messenger try_bind(): address range is unavailable."); | |
278 | ceph_abort(); | |
279 | })), | |
11fdf7f2 TL |
280 | public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), |
281 | local_conf()->ms_bind_port_min, | |
282 | local_conf()->ms_bind_port_max) | |
f67539c2 TL |
283 | .safe_then([this, dispatchers]() mutable { |
284 | return public_msgr->start(dispatchers); | |
285 | }, crimson::net::Messenger::bind_ertr::all_same_way( | |
286 | [] (const std::error_code& e) { | |
287 | logger().error("public messenger try_bind(): address range is unavailable."); | |
288 | ceph_abort(); | |
289 | }))); | |
290 | }).then_unpack([this] { | |
9f95a23c TL |
291 | return seastar::when_all_succeed(monc->start(), |
292 | mgrc->start()); | |
f67539c2 | 293 | }).then_unpack([this] { |
9f95a23c | 294 | return _add_me_to_crush(); |
11fdf7f2 TL |
295 | }).then([this] { |
296 | monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); | |
297 | monc->sub_want("mgrmap", 0, 0); | |
298 | monc->sub_want("osdmap", 0, 0); | |
299 | return monc->renew_subs(); | |
300 | }).then([this] { | |
301 | if (auto [addrs, changed] = | |
302 | replace_unknown_addrs(cluster_msgr->get_myaddrs(), | |
303 | public_msgr->get_myaddrs()); changed) { | |
9f95a23c TL |
304 | return cluster_msgr->set_myaddrs(addrs); |
305 | } else { | |
306 | return seastar::now(); | |
11fdf7f2 | 307 | } |
9f95a23c | 308 | }).then([this] { |
11fdf7f2 TL |
309 | return heartbeat->start(public_msgr->get_myaddrs(), |
310 | cluster_msgr->get_myaddrs()); | |
9f95a23c TL |
311 | }).then([this] { |
312 | // create the admin-socket server, and the objects that register | |
313 | // to handle incoming commands | |
314 | return start_asok_admin(); | |
11fdf7f2 TL |
315 | }).then([this] { |
316 | return start_boot(); | |
317 | }); | |
318 | } | |
319 | ||
320 | seastar::future<> OSD::start_boot() | |
321 | { | |
322 | state.set_preboot(); | |
f67539c2 TL |
323 | return monc->get_version("osdmap").then([this](auto&& ret) { |
324 | auto [newest, oldest] = ret; | |
11fdf7f2 TL |
325 | return _preboot(oldest, newest); |
326 | }); | |
327 | } | |
328 | ||
329 | seastar::future<> OSD::_preboot(version_t oldest, version_t newest) | |
330 | { | |
331 | logger().info("osd.{}: _preboot", whoami); | |
332 | if (osdmap->get_epoch() == 0) { | |
f67539c2 | 333 | logger().info("waiting for initial osdmap"); |
11fdf7f2 TL |
334 | } else if (osdmap->is_destroyed(whoami)) { |
335 | logger().warn("osdmap says I am destroyed"); | |
336 | // provide a small margin so we don't livelock seeing if we | |
337 | // un-destroyed ourselves. | |
338 | if (osdmap->get_epoch() > newest - 1) { | |
339 | throw std::runtime_error("i am destroyed"); | |
340 | } | |
81eedcae | 341 | } else if (osdmap->is_noup(whoami)) { |
11fdf7f2 TL |
342 | logger().warn("osdmap NOUP flag is set, waiting for it to clear"); |
343 | } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { | |
344 | logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); | |
9f95a23c TL |
345 | } else if (osdmap->require_osd_release < ceph_release_t::octopus) { |
346 | logger().error("osdmap require_osd_release < octopus; please upgrade to octopus"); | |
11fdf7f2 TL |
347 | } else if (false) { |
348 | // TODO: update mon if current fullness state is different from osdmap | |
349 | } else if (version_t n = local_conf()->osd_map_message_max; | |
350 | osdmap->get_epoch() >= oldest - 1 && | |
351 | osdmap->get_epoch() + n > newest) { | |
352 | return _send_boot(); | |
353 | } | |
354 | // get all the latest maps | |
355 | if (osdmap->get_epoch() + 1 >= oldest) { | |
9f95a23c | 356 | return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false); |
11fdf7f2 | 357 | } else { |
9f95a23c | 358 | return shard_services.osdmap_subscribe(oldest - 1, true); |
11fdf7f2 TL |
359 | } |
360 | } | |
361 | ||
362 | seastar::future<> OSD::_send_boot() | |
363 | { | |
364 | state.set_booting(); | |
365 | ||
366 | logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs()); | |
367 | logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs()); | |
368 | logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr()); | |
369 | auto m = make_message<MOSDBoot>(superblock, | |
370 | osdmap->get_epoch(), | |
371 | osdmap->get_epoch(), | |
372 | heartbeat->get_back_addrs(), | |
373 | heartbeat->get_front_addrs(), | |
374 | cluster_msgr->get_myaddrs(), | |
375 | CEPH_FEATURES_ALL); | |
9f95a23c | 376 | collect_sys_info(&m->metadata, NULL); |
11fdf7f2 TL |
377 | return monc->send_message(m); |
378 | } | |
379 | ||
9f95a23c TL |
380 | seastar::future<> OSD::_add_me_to_crush() |
381 | { | |
382 | if (!local_conf().get_val<bool>("osd_crush_update_on_start")) { | |
383 | return seastar::now(); | |
384 | } | |
385 | auto get_weight = [this] { | |
386 | if (auto w = local_conf().get_val<double>("osd_crush_initial_weight"); | |
387 | w >= 0) { | |
388 | return seastar::make_ready_future<double>(w); | |
389 | } else { | |
390 | return store->stat().then([](auto st) { | |
391 | auto total = st.total; | |
392 | return seastar::make_ready_future<double>( | |
393 | std::max(.00001, | |
394 | double(total) / double(1ull << 40))); // TB | |
395 | }); | |
396 | } | |
397 | }; | |
398 | return get_weight().then([this](auto weight) { | |
399 | const crimson::crush::CrushLocation loc{make_unique<CephContext>().get()}; | |
400 | logger().info("{} crush location is {}", __func__, loc); | |
401 | string cmd = fmt::format(R"({{ | |
402 | "prefix": "osd crush create-or-move", | |
403 | "id": {}, | |
404 | "weight": {:.4f}, | |
405 | "args": [{}] | |
406 | }})", whoami, weight, loc); | |
407 | return monc->run_command({cmd}, {}); | |
f67539c2 TL |
408 | }).then([](auto&& command_result) { |
409 | [[maybe_unused]] auto [code, message, out] = std::move(command_result); | |
9f95a23c TL |
410 | if (code) { |
411 | logger().warn("fail to add to crush: {} ({})", message, code); | |
412 | throw std::runtime_error("fail to add to crush"); | |
413 | } else { | |
414 | logger().info("added to crush: {}", message); | |
415 | } | |
416 | return seastar::now(); | |
417 | }); | |
418 | } | |
419 | ||
f67539c2 TL |
420 | seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn, |
421 | Ref<MCommand> m) | |
9f95a23c | 422 | { |
f67539c2 | 423 | return asok->handle_command(conn, std::move(m)); |
9f95a23c TL |
424 | } |
425 | ||
426 | /* | |
427 | The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands | |
428 | to handle) registered to it: | |
429 | - OSD's specific commands are handled by the OSD object; | |
430 | - there are some common commands registered to be directly handled by the AdminSocket object | |
431 | itself. | |
432 | */ | |
433 | seastar::future<> OSD::start_asok_admin() | |
434 | { | |
435 | auto asok_path = local_conf().get_val<std::string>("admin_socket"); | |
436 | using namespace crimson::admin; | |
437 | return asok->start(asok_path).then([this] { | |
438 | return seastar::when_all_succeed( | |
439 | asok->register_admin_commands(), | |
f67539c2 | 440 | asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))), |
9f95a23c | 441 | asok->register_command(make_asok_hook<SendBeaconHook>(*this)), |
f67539c2 TL |
442 | asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)), |
443 | asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))), | |
444 | asok->register_command(make_asok_hook<SeastarMetricsHook>()), | |
445 | // PG commands | |
446 | asok->register_command(make_asok_hook<pg::QueryCommand>(*this)), | |
447 | asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this))); | |
448 | }).then_unpack([] { | |
449 | return seastar::now(); | |
9f95a23c TL |
450 | }); |
451 | } | |
452 | ||
11fdf7f2 TL |
453 | seastar::future<> OSD::stop() |
454 | { | |
9f95a23c | 455 | logger().info("stop"); |
11fdf7f2 | 456 | // see also OSD::shutdown() |
f67539c2 TL |
457 | return prepare_to_stop().then([this] { |
458 | state.set_stopping(); | |
459 | logger().debug("prepared to stop"); | |
460 | public_msgr->stop(); | |
461 | cluster_msgr->stop(); | |
462 | auto gate_close_fut = gate.close(); | |
463 | return asok->stop().then([this] { | |
464 | return heartbeat->stop(); | |
465 | }).then([this] { | |
466 | return store->umount(); | |
467 | }).then([this] { | |
468 | return store->stop(); | |
469 | }).then([this] { | |
470 | return seastar::parallel_for_each(pg_map.get_pgs(), | |
471 | [](auto& p) { | |
472 | return p.second->stop(); | |
473 | }); | |
474 | }).then([this] { | |
475 | return monc->stop(); | |
476 | }).then([this] { | |
477 | return mgrc->stop(); | |
478 | }).then([fut=std::move(gate_close_fut)]() mutable { | |
479 | return std::move(fut); | |
480 | }).then([this] { | |
481 | return when_all_succeed( | |
482 | public_msgr->shutdown(), | |
483 | cluster_msgr->shutdown()); | |
484 | }).then_unpack([] { | |
485 | return seastar::now(); | |
486 | }).handle_exception([](auto ep) { | |
487 | logger().error("error while stopping osd: {}", ep); | |
488 | }); | |
11fdf7f2 TL |
489 | }); |
490 | } | |
491 | ||
9f95a23c TL |
492 | void OSD::dump_status(Formatter* f) const |
493 | { | |
494 | f->dump_stream("cluster_fsid") << superblock.cluster_fsid; | |
495 | f->dump_stream("osd_fsid") << superblock.osd_fsid; | |
496 | f->dump_unsigned("whoami", superblock.whoami); | |
497 | f->dump_string("state", state.to_string()); | |
498 | f->dump_unsigned("oldest_map", superblock.oldest_map); | |
499 | f->dump_unsigned("newest_map", superblock.newest_map); | |
500 | f->dump_unsigned("num_pgs", pg_map.get_pgs().size()); | |
501 | } | |
502 | ||
f67539c2 TL |
503 | void OSD::dump_pg_state_history(Formatter* f) const |
504 | { | |
505 | f->open_array_section("pgs"); | |
506 | for (auto [pgid, pg] : pg_map.get_pgs()) { | |
507 | f->open_object_section("pg"); | |
508 | f->dump_stream("pg") << pgid; | |
509 | const auto& peering_state = pg->get_peering_state(); | |
510 | f->dump_string("currently", peering_state.get_current_state()); | |
511 | peering_state.dump_history(f); | |
512 | f->close_section(); | |
513 | } | |
514 | f->close_section(); | |
515 | } | |
516 | ||
517 | void OSD::print(std::ostream& out) const | |
518 | { | |
519 | out << "{osd." << superblock.whoami << " " | |
520 | << superblock.osd_fsid << " [" << superblock.oldest_map | |
521 | << "," << superblock.newest_map << "] " << pg_map.get_pgs().size() | |
522 | << " pgs}"; | |
523 | } | |
524 | ||
11fdf7f2 TL |
525 | seastar::future<> OSD::load_pgs() |
526 | { | |
9f95a23c TL |
527 | return store->list_collections().then([this](auto colls) { |
528 | return seastar::parallel_for_each(colls, [this](auto coll) { | |
11fdf7f2 TL |
529 | spg_t pgid; |
530 | if (coll.is_pg(&pgid)) { | |
531 | return load_pg(pgid).then([pgid, this](auto&& pg) { | |
532 | logger().info("load_pgs: loaded {}", pgid); | |
9f95a23c TL |
533 | pg_map.pg_loaded(pgid, std::move(pg)); |
534 | shard_services.inc_pg_num(); | |
11fdf7f2 TL |
535 | return seastar::now(); |
536 | }); | |
537 | } else if (coll.is_temp(&pgid)) { | |
538 | // TODO: remove the collection | |
539 | return seastar::now(); | |
540 | } else { | |
541 | logger().warn("ignoring unrecognized collection: {}", coll); | |
542 | return seastar::now(); | |
543 | } | |
544 | }); | |
9f95a23c | 545 | }); |
11fdf7f2 TL |
546 | } |
547 | ||
9f95a23c TL |
548 | seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, |
549 | spg_t pgid, | |
550 | bool do_create) | |
11fdf7f2 TL |
551 | { |
552 | using ec_profile_t = map<string,string>; | |
9f95a23c | 553 | auto get_pool_info = [create_map, pgid, this] { |
11fdf7f2 TL |
554 | if (create_map->have_pg_pool(pgid.pool())) { |
555 | pg_pool_t pi = *create_map->get_pg_pool(pgid.pool()); | |
556 | string name = create_map->get_pool_name(pgid.pool()); | |
557 | ec_profile_t ec_profile; | |
558 | if (pi.is_erasure()) { | |
9f95a23c | 559 | ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile); |
11fdf7f2 | 560 | } |
f67539c2 TL |
561 | return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>( |
562 | std::make_tuple(std::move(pi), | |
563 | std::move(name), | |
564 | std::move(ec_profile))); | |
11fdf7f2 TL |
565 | } else { |
566 | // pool was deleted; grab final pg_pool_t off disk. | |
567 | return meta_coll->load_final_pool_info(pgid.pool()); | |
568 | } | |
9f95a23c TL |
569 | }; |
570 | auto get_collection = [pgid, do_create, this] { | |
571 | const coll_t cid{pgid}; | |
572 | if (do_create) { | |
573 | return store->create_new_collection(cid); | |
574 | } else { | |
575 | return store->open_collection(cid); | |
576 | } | |
577 | }; | |
f67539c2 | 578 | return seastar::when_all( |
9f95a23c TL |
579 | std::move(get_pool_info), |
580 | std::move(get_collection) | |
f67539c2 TL |
581 | ).then([pgid, create_map, this] (auto&& ret) { |
582 | auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0()); | |
583 | auto coll = std::move(std::get<1>(ret).get0()); | |
9f95a23c TL |
584 | return seastar::make_ready_future<Ref<PG>>( |
585 | new PG{pgid, | |
586 | pg_shard_t{whoami, pgid.shard}, | |
587 | std::move(coll), | |
588 | std::move(pool), | |
589 | std::move(name), | |
590 | create_map, | |
591 | shard_services, | |
592 | ec_profile}); | |
11fdf7f2 TL |
593 | }); |
594 | } | |
595 | ||
9f95a23c TL |
596 | seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid) |
597 | { | |
f67539c2 TL |
598 | logger().debug("{}: {}", __func__, pgid); |
599 | ||
600 | return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) { | |
601 | return pg_meta.get_epoch(); | |
602 | }).then([this](epoch_t e) { | |
9f95a23c TL |
603 | return get_map(e); |
604 | }).then([pgid, this] (auto&& create_map) { | |
605 | return make_pg(std::move(create_map), pgid, false); | |
f67539c2 | 606 | }).then([this](Ref<PG> pg) { |
9f95a23c | 607 | return pg->read_state(store.get()).then([pg] { |
f67539c2 | 608 | return seastar::make_ready_future<Ref<PG>>(std::move(pg)); |
9f95a23c TL |
609 | }); |
610 | }).handle_exception([pgid](auto ep) { | |
611 | logger().info("pg {} saw exception on load {}", pgid, ep); | |
612 | ceph_abort("Could not load pg" == 0); | |
613 | return seastar::make_exception_future<Ref<PG>>(ep); | |
614 | }); | |
615 | } | |
616 | ||
f67539c2 TL |
617 | std::optional<seastar::future<>> |
618 | OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) | |
11fdf7f2 | 619 | { |
11fdf7f2 | 620 | if (state.is_stopping()) { |
f67539c2 | 621 | return {}; |
11fdf7f2 | 622 | } |
f67539c2 TL |
623 | bool dispatched = true; |
624 | gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { | |
625 | switch (m->get_type()) { | |
626 | case CEPH_MSG_OSD_MAP: | |
627 | return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); | |
628 | case CEPH_MSG_OSD_OP: | |
629 | return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m)); | |
630 | case MSG_OSD_PG_CREATE2: | |
631 | shard_services.start_operation<CompoundPeeringRequest>( | |
632 | *this, | |
633 | conn, | |
634 | m); | |
635 | return seastar::now(); | |
636 | case MSG_COMMAND: | |
637 | return handle_command(conn, boost::static_pointer_cast<MCommand>(m)); | |
638 | case MSG_OSD_MARK_ME_DOWN: | |
639 | return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m)); | |
640 | case MSG_OSD_PG_PULL: | |
641 | [[fallthrough]]; | |
642 | case MSG_OSD_PG_PUSH: | |
643 | [[fallthrough]]; | |
644 | case MSG_OSD_PG_PUSH_REPLY: | |
645 | [[fallthrough]]; | |
646 | case MSG_OSD_PG_RECOVERY_DELETE: | |
647 | [[fallthrough]]; | |
648 | case MSG_OSD_PG_RECOVERY_DELETE_REPLY: | |
649 | [[fallthrough]]; | |
650 | case MSG_OSD_PG_SCAN: | |
651 | [[fallthrough]]; | |
652 | case MSG_OSD_PG_BACKFILL: | |
653 | [[fallthrough]]; | |
654 | case MSG_OSD_PG_BACKFILL_REMOVE: | |
655 | return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m)); | |
656 | case MSG_OSD_PG_LEASE: | |
657 | [[fallthrough]]; | |
658 | case MSG_OSD_PG_LEASE_ACK: | |
659 | [[fallthrough]]; | |
660 | case MSG_OSD_PG_NOTIFY2: | |
661 | [[fallthrough]]; | |
662 | case MSG_OSD_PG_INFO2: | |
663 | [[fallthrough]]; | |
664 | case MSG_OSD_PG_QUERY2: | |
665 | [[fallthrough]]; | |
666 | case MSG_OSD_BACKFILL_RESERVE: | |
667 | [[fallthrough]]; | |
668 | case MSG_OSD_RECOVERY_RESERVE: | |
669 | [[fallthrough]]; | |
670 | case MSG_OSD_PG_LOG: | |
671 | return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m)); | |
672 | case MSG_OSD_REPOP: | |
673 | return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m)); | |
674 | case MSG_OSD_REPOPREPLY: | |
675 | return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m)); | |
676 | case MSG_OSD_SCRUB2: | |
677 | return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m)); | |
678 | default: | |
679 | dispatched = false; | |
680 | return seastar::now(); | |
681 | } | |
682 | }); | |
683 | return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); | |
11fdf7f2 TL |
684 | } |
685 | ||
f67539c2 | 686 | void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) |
11fdf7f2 TL |
687 | { |
688 | // TODO: cleanup the session attached to this connection | |
689 | logger().warn("ms_handle_reset"); | |
11fdf7f2 TL |
690 | } |
691 | ||
f67539c2 | 692 | void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn) |
11fdf7f2 TL |
693 | { |
694 | logger().warn("ms_handle_remote_reset"); | |
11fdf7f2 TL |
695 | } |
696 | ||
9f95a23c | 697 | void OSD::handle_authentication(const EntityName& name, |
f67539c2 TL |
698 | const AuthCapsInfo& caps_info) |
699 | { | |
700 | // TODO: store the parsed cap and associate it with the connection | |
701 | if (caps_info.allow_all) { | |
702 | logger().debug("{} {} has all caps", __func__, name); | |
703 | return; | |
704 | } | |
705 | if (caps_info.caps.length() > 0) { | |
706 | auto p = caps_info.caps.cbegin(); | |
707 | string str; | |
708 | try { | |
709 | decode(str, p); | |
710 | } catch (ceph::buffer::error& e) { | |
711 | logger().warn("{} {} failed to decode caps string", __func__, name); | |
712 | return; | |
713 | } | |
714 | OSDCap caps; | |
715 | if (caps.parse(str)) { | |
716 | logger().debug("{} {} has caps {}", __func__, name, str); | |
717 | } else { | |
718 | logger().warn("{} {} failed to parse caps {}", __func__, name, str); | |
719 | } | |
720 | } | |
721 | } | |
722 | ||
723 | void OSD::update_stats() | |
9f95a23c | 724 | { |
f67539c2 TL |
725 | osd_stat_seq++; |
726 | osd_stat.up_from = get_up_epoch(); | |
727 | osd_stat.hb_peers = heartbeat->get_peers(); | |
728 | osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq; | |
729 | gate.dispatch_in_background("statfs", *this, [this] { | |
730 | (void) store->stat().then([this](store_statfs_t&& st) { | |
731 | osd_stat.statfs = st; | |
732 | }); | |
733 | }); | |
9f95a23c TL |
734 | } |
735 | ||
f67539c2 | 736 | MessageRef OSD::get_stats() const |
9f95a23c TL |
737 | { |
738 | // todo: m-to-n: collect stats using map-reduce | |
739 | // MPGStats::had_map_for is not used since PGMonitor was removed | |
740 | auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch()); | |
f67539c2 | 741 | m->osd_stat = osd_stat; |
9f95a23c TL |
742 | for (auto [pgid, pg] : pg_map.get_pgs()) { |
743 | if (pg->is_primary()) { | |
744 | auto stats = pg->get_stats(); | |
745 | // todo: update reported_epoch,reported_seq,last_fresh | |
746 | stats.reported_epoch = osdmap->get_epoch(); | |
747 | m->pg_stat.emplace(pgid.pgid, std::move(stats)); | |
748 | } | |
749 | } | |
750 | return m; | |
751 | } | |
752 | ||
// Ask the mgr client to send a stats report and return the osd_stat
// sequence number a caller can later match against acknowledgements.
uint64_t OSD::send_pg_stats()
{
  // mgr client sends the report message in background
  mgrc->report();
  return osd_stat.seq;
}
759 | ||
// Return the most recently committed OSDMap (shared, cached reference).
OSD::cached_map_t OSD::get_map() const
{
  return osdmap;
}
764 | ||
765 | seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e) | |
766 | { | |
767 | // TODO: use LRU cache for managing osdmap, fallback to disk if we have to | |
768 | if (auto found = osdmaps.find(e); found) { | |
769 | return seastar::make_ready_future<cached_map_t>(std::move(found)); | |
770 | } else { | |
9f95a23c | 771 | return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) { |
11fdf7f2 TL |
772 | return seastar::make_ready_future<cached_map_t>( |
773 | osdmaps.insert(e, std::move(osdmap))); | |
774 | }); | |
775 | } | |
776 | } | |
777 | ||
// Queue the encoded full map for epoch e into transaction t, and keep
// the encoding in the in-memory bufferlist cache for later senders.
void OSD::store_map_bl(ceph::os::Transaction& t,
                       epoch_t e, bufferlist&& bl)
{
  meta_coll->store_map(t, e, bl);
  map_bl_cache.insert(e, std::move(bl));
}
784 | ||
785 | seastar::future<bufferlist> OSD::load_map_bl(epoch_t e) | |
786 | { | |
787 | if (std::optional<bufferlist> found = map_bl_cache.find(e); found) { | |
788 | return seastar::make_ready_future<bufferlist>(*found); | |
789 | } else { | |
790 | return meta_coll->load_map(e); | |
791 | } | |
792 | } | |
793 | ||
// Load the encoded maps for every epoch in [first, last] concurrently
// and collect them into an epoch-indexed std::map.
seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
  epoch_t first,
  epoch_t last)
{
  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
                             boost::make_counting_iterator<epoch_t>(last + 1),
                             [this](epoch_t e) {
    // map step: load one epoch's bufferlist and tag it with its epoch
    return load_map_bl(e).then([e](auto&& bl) {
      return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
        std::make_pair(e, std::move(bl)));
    });
  },
  std::map<epoch_t, bufferlist>{},
  [](auto&& bls, auto&& epoch_bl) {
    // reduce step: fold each (epoch, bufferlist) pair into the result map
    bls.emplace(std::move(epoch_bl));
    return std::move(bls);
  });
}
812 | ||
9f95a23c TL |
813 | seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e) |
814 | { | |
815 | auto o = std::make_unique<OSDMap>(); | |
816 | if (e > 0) { | |
817 | return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable { | |
818 | o->decode(bl); | |
819 | return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o)); | |
820 | }); | |
821 | } else { | |
822 | return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o)); | |
823 | } | |
824 | } | |
825 | ||
11fdf7f2 TL |
826 | seastar::future<> OSD::store_maps(ceph::os::Transaction& t, |
827 | epoch_t start, Ref<MOSDMap> m) | |
828 | { | |
9f95a23c TL |
829 | return seastar::do_for_each(boost::make_counting_iterator(start), |
830 | boost::make_counting_iterator(m->get_last() + 1), | |
11fdf7f2 TL |
831 | [&t, m, this](epoch_t e) { |
832 | if (auto p = m->maps.find(e); p != m->maps.end()) { | |
833 | auto o = std::make_unique<OSDMap>(); | |
834 | o->decode(p->second); | |
835 | logger().info("store_maps osdmap.{}", e); | |
836 | store_map_bl(t, e, std::move(std::move(p->second))); | |
837 | osdmaps.insert(e, std::move(o)); | |
838 | return seastar::now(); | |
839 | } else if (auto p = m->incremental_maps.find(e); | |
840 | p != m->incremental_maps.end()) { | |
9f95a23c TL |
841 | return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) { |
842 | OSDMap::Incremental inc; | |
843 | auto i = bl.cbegin(); | |
844 | inc.decode(i); | |
845 | o->apply_incremental(inc); | |
846 | bufferlist fbl; | |
847 | o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED); | |
848 | store_map_bl(t, e, std::move(fbl)); | |
849 | osdmaps.insert(e, std::move(o)); | |
850 | return seastar::now(); | |
11fdf7f2 TL |
851 | }); |
852 | } else { | |
853 | logger().error("MOSDMap lied about what maps it had?"); | |
854 | return seastar::now(); | |
855 | } | |
856 | }); | |
857 | } | |
858 | ||
9f95a23c | 859 | bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m) |
11fdf7f2 | 860 | { |
9f95a23c TL |
861 | if (!conn->peer_is_mon()) { |
862 | logger().info("{} received from non-mon {}, {}", | |
863 | __func__, | |
864 | conn->get_peer_addr(), | |
865 | *m); | |
866 | return false; | |
11fdf7f2 | 867 | } |
9f95a23c | 868 | return true; |
11fdf7f2 TL |
869 | } |
870 | ||
// Instantiate (or reject) the PG described by a PGCreateInfo: fetch the
// map of the creation epoch, validate mon-initiated creates against the
// pool state, construct the PG plus its on-disk structures, and advance
// it to the current map.  Resolves with a null Ref<PG> when the create
// is dropped.
seastar::future<Ref<PG>> OSD::handle_pg_create_info(
  std::unique_ptr<PGCreateInfo> info) {
  return seastar::do_with(
    std::move(info),
    [this](auto &info) -> seastar::future<Ref<PG>> {
      return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap) ->
        seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            // mon-initiated creates are only honored while the pool
            // still exists and is still in its initial-creation phase
            int64_t pool_id = pgid.pgid.pool();
            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
              // null PG signals "dropped" to the next continuation
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and pg are subsequently
              // allowed to split or merge).
              logger().debug(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(Ref<PG>(), startmap));
            }
          }
          // construct the PG object against the creation-epoch map
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
              return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
                std::make_tuple(std::move(pg), std::move(startmap)));
            });
        }).then([this, &info](auto&& ret) ->
                seastar::future<Ref<PG>> {
          auto [pg, startmap] = std::move(ret);
          if (!pg)
            // create was dropped above; propagate the null result
            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
          PeeringCtx rctx{ceph_release_t::octopus};
          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

          // compute this OSD's role in the PG as of the creation map
          int up_primary, acting_primary;
          vector<int> up, acting;
          startmap->pg_to_up_acting_osds(
            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);

          int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
                                            acting);

          // queue the PG's collection and metadata into the peering
          // context's transaction
          create_pg_collection(
            rctx.transaction,
            info->pgid,
            info->pgid.get_split_bits(pp->get_pg_num()));
          init_pg_ondisk(
            rctx.transaction,
            info->pgid,
            pp);

          pg->init(
            role,
            up,
            up_primary,
            acting,
            acting_primary,
            info->history,
            info->past_intervals,
            false,
            rctx.transaction);

          // catch the new PG up from its creation epoch to the current
          // map before handing it back
          return shard_services.start_operation<PGAdvanceMap>(
            *this, pg, pg->get_osdmap_epoch(),
            osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
            return seastar::make_ready_future<Ref<PG>>(pg);
          });
        });
    });
}
953 | ||
// Ingest a batch of osdmaps [first..last] from a MOSDMap message:
// validate it, persist the new epochs together with the updated
// superblock in a single transaction, then activate the committed maps.
seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                      Ref<MOSDMap> m)
{
  logger().info("handle_osd_map {}", *m);
  if (m->fsid != superblock.cluster_fsid) {
    // message is for a different cluster
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  if (state.is_initializing()) {
    logger().warn("i am still initializing");
    return seastar::now();
  }

  const auto first = m->get_first();
  const auto last = m->get_last();
  logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
                first, last, superblock.newest_map, m->oldest_map, m->newest_map);
  // make sure there is something new, here, before we bother flushing
  // the queues and such
  if (last <= superblock.newest_map) {
    return seastar::now();
  }
  // missing some?
  bool skip_maps = false;
  epoch_t start = superblock.newest_map + 1;
  if (first > start) {
    logger().info("handle_osd_map message skips epochs {}..{}",
                  start, first - 1);
    if (m->oldest_map <= start) {
      // the sender still has the gap: resubscribe from `start` instead
      // of consuming this discontiguous batch
      return shard_services.osdmap_subscribe(start, false);
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
    }
    // gap cannot be filled: accept the discontinuity and start at `first`
    skip_maps = true;
    start = first;
  }

  return seastar::do_with(ceph::os::Transaction{},
                          [=](auto& t) {
    return store_maps(t, start, m).then([=, &t] {
      // even if this map isn't from a mon, we may have satisfied our subscription
      monc->sub_got("osdmap", last);
      if (!superblock.oldest_map || skip_maps) {
        superblock.oldest_map = first;
      }
      superblock.newest_map = last;
      superblock.current_epoch = last;

      // note in the superblock that we were clean thru the prior epoch
      if (boot_epoch && boot_epoch >= superblock.mounted) {
        superblock.mounted = boot_epoch;
        superblock.clean_thru = last;
      }
      // superblock goes into the same transaction as the maps so both
      // commit atomically
      meta_coll->store_superblock(t, superblock);
      return store->do_transaction(meta_coll->collection(), std::move(t));
    });
  }).then([=] {
    // TODO: write to superblock and commit the transaction
    return committed_osd_maps(start, last, m);
  });
}
1020 | ||
// Walk the newly persisted maps one epoch at a time, updating this
// OSD's view and lifecycle state, then react to the final state:
// activate, preboot, restart or shut down as appropriate.
seastar::future<> OSD::committed_osd_maps(version_t first,
                                          version_t last,
                                          Ref<MOSDMap> m)
{
  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
  // advance through the new maps
  return seastar::do_for_each(boost::make_counting_iterator(first),
                              boost::make_counting_iterator(last + 1),
                              [this](epoch_t cur) {
    return get_map(cur).then([this](cached_map_t&& o) {
      osdmap = std::move(o);
      shard_services.update_map(osdmap);
      // remember the first epoch in which the map shows us up at our
      // current public address
      if (up_epoch == 0 &&
          osdmap->is_up(whoami) &&
          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
        up_epoch = osdmap->get_epoch();
        if (!boot_epoch) {
          boot_epoch = osdmap->get_epoch();
        }
      }
    });
  }).then([m, this] {
    if (osdmap->is_up(whoami) &&
        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
        bind_epoch < osdmap->get_up_from(whoami)) {
      if (state.is_booting()) {
        // our boot request was accepted: go active and start the
        // periodic beacon and tick timers
        logger().info("osd.{}: activating...", whoami);
        state.set_active();
        beacon_timer.arm_periodic(
          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
        tick_timer.arm_periodic(
          std::chrono::seconds(TICK_INTERVAL));
      }
    } else if (!osdmap->is_up(whoami)) {
      if (state.is_prestop()) {
        // the mark-me-down we requested has been committed; treat the
        // map as our stop ack and skip further map consumption
        got_stop_ack();
        return seastar::now();
      }
    }
    check_osdmap_features();
    // yay!
    return consume_map(osdmap->get_epoch());
  }).then([m, this] {
    if (state.is_active()) {
      logger().info("osd.{}: now active", whoami);
      if (!osdmap->exists(whoami)) {
        // we were removed from the cluster entirely
        return shutdown();
      }
      if (should_restart()) {
        return restart();
      } else {
        return seastar::now();
      }
    } else if (state.is_preboot()) {
      logger().info("osd.{}: now preboot", whoami);

      if (m->get_source().is_mon()) {
        return _preboot(m->oldest_map, m->newest_map);
      } else {
        // maps relayed by a peer OSD do not complete preboot; retry boot
        logger().info("osd.{}: start_boot", whoami);
        return start_boot();
      }
    } else {
      logger().info("osd.{}: now {}", whoami, state);
      // XXX
      return seastar::now();
    }
  });
}
1090 | ||
// Hand an incoming client MOSDOp to a new ClientRequest operation.  The
// operation tracker owns the operation's lifetime, so the returned
// handle is deliberately discarded.
seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
                                     Ref<MOSDOp> m)
{
  (void) shard_services.start_operation<ClientRequest>(
    *this,
    conn,
    std::move(m));
  return seastar::now();
}
1100 | ||
// Send a peer the osdmaps it is missing, starting at `first`.  If our
// store still holds that epoch we ship the whole [first, newest] range;
// otherwise we fall back to sending just our current full map.
seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
                                            epoch_t first)
{
  if (first >= superblock.oldest_map) {
    // every requested epoch is still on disk: ship the full range
    return load_map_bls(first, superblock.newest_map)
    .then([this, conn, first](auto&& bls) {
      auto m = make_message<MOSDMap>(monc->get_fsid(),
                                     osdmap->get_encoding_features());
      m->oldest_map = first;
      m->newest_map = superblock.newest_map;
      m->maps = std::move(bls);
      return conn->send(m);
    });
  } else {
    // requested epoch predates our oldest stored map: send the current
    // full map so the peer can restart its subscription from there.
    // NOTE(review): only osdmap->get_epoch() is attached even though
    // newest_map is advertised — confirm the peer handles that gap.
    return load_map_bl(osdmap->get_epoch())
    .then([this, conn](auto&& bl) mutable {
      auto m = make_message<MOSDMap>(monc->get_fsid(),
                                     osdmap->get_encoding_features());
      m->oldest_map = superblock.oldest_map;
      m->newest_map = superblock.newest_map;
      m->maps.emplace(osdmap->get_epoch(), std::move(bl));
      return conn->send(m);
    });
  }
}
1126 | ||
// Wrap a replicated-write request from a primary in a RepRequest
// operation; the operation tracker owns its lifetime.
seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
                                     Ref<MOSDRepOp> m)
{
  // complete the deferred payload decode before dispatch
  m->finish_decode();
  (void) shard_services.start_operation<RepRequest>(
    *this,
    std::move(conn),
    std::move(m));
  return seastar::now();
}
1137 | ||
f67539c2 | 1138 | seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn, |
9f95a23c TL |
1139 | Ref<MOSDRepOpReply> m) |
1140 | { | |
1141 | const auto& pgs = pg_map.get_pgs(); | |
1142 | if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) { | |
1143 | m->finish_decode(); | |
1144 | pg->second->handle_rep_op_reply(conn, *m); | |
1145 | } else { | |
1146 | logger().warn("stale reply: {}", *m); | |
1147 | } | |
1148 | return seastar::now(); | |
1149 | } | |
1150 | ||
// Fan a scrub request out to every PG it names, delivering each as a
// RemotePeeringEvent carrying a RequestScrub.
seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
                                    Ref<MOSDScrub2> m)
{
  if (m->fsid != superblock.cluster_fsid) {
    // message is for a different cluster
    logger().warn("fsid mismatched");
    return seastar::now();
  }
  // m is captured by the lambda to keep the message alive while the
  // per-PG events are dispatched in parallel
  return seastar::parallel_for_each(std::move(m->scrub_pgs),
    [m, conn, this](spg_t pgid) {
    // sender's osd id plus the target shard identifies the requester
    pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
                          pgid.shard};
    PeeringState::RequestScrub scrub_request{m->deep, m->repair};
    return shard_services.start_operation<RemotePeeringEvent>(
      *this,
      conn,
      shard_services,
      from_shard,
      pgid,
      PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
  });
}
1172 | ||
// The mon acknowledged our mark-me-down request.  If we are waiting in
// prestop, treat it as the stop ack; the message content is unused.
seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
                                           Ref<MOSDMarkMeDown> m)
{
  if (state.is_prestop()) {
    got_stop_ack();
  }
  return seastar::now();
}
1181 | ||
// Wrap an incoming recovery sub-request (push/pull/delete and replies)
// in a RecoverySubRequest operation owned by the tracker.
seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
                                              Ref<MOSDFastDispatchOp> m)
{
  (void) shard_services.start_operation<RecoverySubRequest>(
    *this,
    conn,
    std::move(m));
  return seastar::now();
}
1191 | ||
11fdf7f2 TL |
1192 | bool OSD::should_restart() const |
1193 | { | |
1194 | if (!osdmap->is_up(whoami)) { | |
1195 | logger().info("map e {} marked osd.{} down", | |
1196 | osdmap->get_epoch(), whoami); | |
1197 | return true; | |
1198 | } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { | |
1199 | logger().error("map e {} had wrong client addr ({} != my {})", | |
1200 | osdmap->get_epoch(), | |
1201 | osdmap->get_addrs(whoami), | |
1202 | public_msgr->get_myaddrs()); | |
1203 | return true; | |
1204 | } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { | |
1205 | logger().error("map e {} had wrong cluster addr ({} != my {})", | |
1206 | osdmap->get_epoch(), | |
1207 | osdmap->get_cluster_addrs(whoami), | |
1208 | cluster_msgr->get_myaddrs()); | |
1209 | return true; | |
1210 | } else { | |
1211 | return false; | |
1212 | } | |
1213 | } | |
1214 | ||
// Tear down "up" state after being marked down (or detecting an address
// mismatch) and rejoin the cluster via the boot sequence.
seastar::future<> OSD::restart()
{
  beacon_timer.cancel();
  tick_timer.cancel();
  up_epoch = 0;
  bind_epoch = osdmap->get_epoch();
  // TODO: promote to shutdown if being marked down for multiple times
  // rebind messengers
  return start_boot();
}
1225 | ||
// Minimal shutdown path: record in the superblock that we were clean
// through the current epoch.  Full teardown is still TODO.
seastar::future<> OSD::shutdown()
{
  // TODO
  superblock.mounted = boot_epoch;
  superblock.clean_thru = osdmap->get_epoch();
  return seastar::now();
}
1233 | ||
// Periodically reassure the monitors that we are alive; a no-op unless
// the OSD is active.
seastar::future<> OSD::send_beacon()
{
  if (!state.is_active()) {
    return seastar::now();
  }
  // FIXME: min lec should be calculated from pg_stat
  //        and should set m->pgs
  epoch_t min_last_epoch_clean = osdmap->get_epoch();
  auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
                                    min_last_epoch_clean,
                                    superblock.last_purged_snaps_scrub,
                                    local_conf()->osd_beacon_report_interval);
  return monc->send_message(m);
}
1248 | ||
f67539c2 | 1249 | void OSD::update_heartbeat_peers() |
11fdf7f2 TL |
1250 | { |
1251 | if (!state.is_active()) { | |
f67539c2 | 1252 | return; |
11fdf7f2 | 1253 | } |
9f95a23c | 1254 | for (auto& pg : pg_map.get_pgs()) { |
11fdf7f2 TL |
1255 | vector<int> up, acting; |
1256 | osdmap->pg_to_up_acting_osds(pg.first.pgid, | |
1257 | &up, nullptr, | |
1258 | &acting, nullptr); | |
9f95a23c TL |
1259 | for (int osd : boost::join(up, acting)) { |
1260 | if (osd == CRUSH_ITEM_NONE || osd == whoami) { | |
1261 | continue; | |
1262 | } else { | |
11fdf7f2 TL |
1263 | heartbeat->add_peer(osd, osdmap->get_epoch()); |
1264 | } | |
1265 | } | |
1266 | } | |
f67539c2 | 1267 | heartbeat->update_peers(whoami); |
9f95a23c TL |
1268 | } |
1269 | ||
// Convert an incoming peering message into a RemotePeeringEvent aimed
// at the PG it names; the operation tracker owns the resulting future.
seastar::future<> OSD::handle_peering_op(
  crimson::net::ConnectionRef conn,
  Ref<MOSDPeeringOp> m)
{
  const int from = m->get_source().num();
  logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
  // get_event() hands us ownership of a heap-allocated event; the
  // unique_ptr frees it once its contents have been moved out
  std::unique_ptr<PGPeeringEvent> evt(m->get_event());
  (void) shard_services.start_operation<RemotePeeringEvent>(
    *this,
    conn,
    shard_services,
    pg_shard_t{from, m->get_spg().shard},
    m->get_spg(),
    std::move(*evt));
  return seastar::now();
}
1286 | ||
// React to feature-relevant osdmap changes; crimson currently always
// requires authenticated heartbeat connections regardless of the map.
void OSD::check_osdmap_features()
{
  heartbeat->set_require_authorizer(true);
}
1291 | ||
// Advance every resident PG to `epoch` in parallel, then release any
// operations blocked in the osdmap gate waiting for that epoch.
seastar::future<> OSD::consume_map(epoch_t epoch)
{
  // todo: m-to-n: broadcast this news to all shards
  auto &pgs = pg_map.get_pgs();
  return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
    // each PG gets its own PGAdvanceMap operation and peering context
    return shard_services.start_operation<PGAdvanceMap>(
      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
      PeeringCtx{ceph_release_t::octopus}, false).second;
  }).then([epoch, this] {
    osdmap_gate.got_map(epoch);
    return seastar::make_ready_future();
  });
}
1305 | ||
1306 | ||
// Resolve pgid to a PG.  When create info is supplied, wait for the PG
// to appear and, if no creation is already racing, kick one off; when
// it is not, return whatever is currently mapped (possibly null).
blocking_future<Ref<PG>>
OSD::get_or_create_pg(
  spg_t pgid,
  epoch_t epoch,
  std::unique_ptr<PGCreateInfo> info)
{
  if (info) {
    auto [fut, creating] = pg_map.wait_for_pg(pgid);
    if (!creating) {
      // first caller for this pgid triggers the actual creation; the
      // discarded future completes waiters through the pg_map wakeup
      pg_map.set_creating(pgid);
      (void)handle_pg_create_info(std::move(info));
    }
    return std::move(fut);
  } else {
    return make_ready_blocking_future<Ref<PG>>(pg_map.get_pg(pgid));
  }
}
1324 | ||
// Block until the PG for pgid exists locally (created by another path).
blocking_future<Ref<PG>> OSD::wait_for_pg(
  spg_t pgid)
{
  return pg_map.wait_for_pg(pgid).first;
}
1330 | ||
// Return the resident PG for pgid, or null if none is instantiated here.
Ref<PG> OSD::get_pg(spg_t pgid)
{
  return pg_map.get_pg(pgid);
}
1335 | ||
// Before stopping, ask the monitors to mark us down so peers stop
// directing work at us, and wait — bounded by osd_mon_shutdown_timeout —
// for the map commit that confirms it.
seastar::future<> OSD::prepare_to_stop()
{
  if (osdmap && osdmap->is_up(whoami)) {
    state.set_prestop();
    const auto timeout =
      std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::duration<double>(
          local_conf().get_val<double>("osd_mon_shutdown_timeout")));

    return seastar::with_timeout(
      seastar::timer<>::clock::now() + timeout,
      monc->send_message(
        make_message<MOSDMarkMeDown>(
          monc->get_fsid(),
          whoami,
          osdmap->get_addrs(whoami),
          osdmap->get_epoch(),
          true)).then([this] {
        // resolved by got_stop_ack() once the down map (or mon ack)
        // arrives
        return stop_acked.get_future();
      })
    ).handle_exception_type(
      [](seastar::timed_out_error&) {
        // ack never came: proceed with shutdown anyway
        return seastar::now();
      });
  }
  // not marked up in the map: nothing to announce
  return seastar::now();
}
1363 | ||
11fdf7f2 | 1364 | } |