]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/shard_services.h
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / osd / shard_services.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <memory>
7
8 #include <boost/intrusive_ptr.hpp>
9 #include <seastar/core/future.hh>
10
11 #include "include/common_fwd.h"
12 #include "osd_operation.h"
13 #include "msg/MessageRef.h"
14 #include "crimson/common/exception.h"
15 #include "crimson/common/shared_lru.h"
16 #include "crimson/os/futurized_collection.h"
17 #include "osd/PeeringState.h"
18 #include "crimson/osd/osdmap_service.h"
19 #include "crimson/osd/osdmap_gate.h"
20 #include "crimson/osd/osd_meta.h"
21 #include "crimson/osd/object_context.h"
22 #include "crimson/osd/pg_map.h"
23 #include "crimson/osd/state.h"
24 #include "common/AsyncReserver.h"
25
26 namespace crimson::net {
27 class Messenger;
28 }
29
30 namespace crimson::mgr {
31 class Client;
32 }
33
34 namespace crimson::mon {
35 class Client;
36 }
37
38 namespace crimson::os {
39 class FuturizedStore;
40 }
41
42 class OSDMap;
43 class PeeringCtx;
44 class BufferedRecoveryMessages;
45
46 namespace crimson::osd {
47
// seastar::sharded<T>::start_single places the single instance on core 0,
// so all singleton state must be accessed via this core (see
// ShardServices::with_singleton below).
constexpr core_id_t PRIMARY_CORE = 0;

class PGShardManager;
52
/**
 * PerShardState
 *
 * Per-shard state holding instances local to each shard.  Every member
 * may only be touched from the reactor core that owns this instance;
 * assert_core() enforces that invariant on each accessor.
 */
class PerShardState {
  friend class ShardServices;
  friend class PGShardManager;
  using cached_map_t = OSDMapService::cached_map_t;
  using local_cached_map_t = OSDMapService::local_cached_map_t;

  // core this instance was constructed on -- all accesses must happen there
  const core_id_t core = seastar::this_shard_id();
#define assert_core() ceph_assert(seastar::this_shard_id() == core);

  const int whoami;                           // id of this OSD
  crimson::os::FuturizedStore::Shard &store;  // shard-local store handle
  crimson::common::CephContext cct;

  // non-owning perf counter handles, supplied to the constructor
  PerfCounters *perf = nullptr;
  PerfCounters *recoverystate_perf = nullptr;

  // Op Management
  OSDOperationRegistry registry;
  OperationThrottler throttler;

  /// dump all in-flight operations tracked by the registry into f
  seastar::future<> dump_ops_in_flight(Formatter *f) const;

  epoch_t up_epoch = 0;  // most recent epoch set via set_up_epoch()
  OSDMapService::cached_map_t osdmap;
  /// shard-local view of the current osdmap
  const auto &get_osdmap() const {
    assert_core();
    return osdmap;
  }
  /// replace the shard-local osdmap reference
  void update_map(OSDMapService::cached_map_t new_osdmap) {
    assert_core();
    osdmap = std::move(new_osdmap);
  }
  void set_up_epoch(epoch_t epoch) {
    assert_core();
    up_epoch = epoch;
  }

  // prevent creating new osd operations when system is shutting down,
  // this is necessary because there are chances that a new operation
  // is created, after the interruption of all ongoing operations, and
  // creates and waits on a new and may-never-resolve future, in which
  // case the shutdown may never succeed.
  bool stopping = false;
  /// mark this shard as stopping and drain the operation registry
  seastar::future<> stop_registry() {
    assert_core();
    crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
    stopping = true;
    return registry.stop();
  }

  // PGMap state
  PGMap pg_map;

  seastar::future<> stop_pgs();
  std::map<pg_t, pg_stat_t> get_pg_stats() const;
  /// deliver the map at epoch to every PG hosted on this shard
  seastar::future<> broadcast_map_to_pgs(
    ShardServices &shard_services,
    epoch_t epoch);

  Ref<PG> get_pg(spg_t pgid);
  /// invoke f(pgid, pg) for each PG hosted on this shard
  template <typename F>
  void for_each_pg(F &&f) const {
    assert_core();
    for (auto &pg : pg_map.get_pgs()) {
      std::invoke(f, pg.first, pg.second);
    }
  }

  /**
   * start_operation
   *
   * Create an operation of type T in the registry and start it after a
   * yield.  Returns {op, future-resolving-on-completion}.  Throws
   * system_shutdown_exception once stop_registry() has been invoked.
   */
  template <typename T, typename... Args>
  auto start_operation(Args&&... args) {
    assert_core();
    if (__builtin_expect(stopping, false)) {
      throw crimson::common::system_shutdown_exception();
    }
    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
    crimson::get_logger(ceph_subsys_osd).info(
      "PerShardState::{}, {}", __func__, *op);
    auto fut = seastar::yield().then([op] {
      return op->start().finally([op /* by copy */] {
        // ensure the op's lifetime is appropriate. It is not enough to
        // guarantee it's alive at the scheduling stages (i.e. `then()`
        // calling) but also during the actual execution (i.e. when passed
        // lambdas are actually run).
      });
    });
    return std::make_pair(std::move(op), std::move(fut));
  }

  /**
   * start_operation_may_interrupt
   *
   * As start_operation, but the returned future is interruptible
   * through InterruptorT.
   */
  template <typename InterruptorT, typename T, typename... Args>
  auto start_operation_may_interrupt(Args&&... args) {
    assert_core();
    if (__builtin_expect(stopping, false)) {
      throw crimson::common::system_shutdown_exception();
    }
    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
    crimson::get_logger(ceph_subsys_osd).info(
      "PerShardState::{}, {}", __func__, *op);
    auto fut = InterruptorT::make_interruptible(
      seastar::yield()
    ).then_interruptible([op] {
      return op->start().finally([op /* by copy */] {
        // ensure the op's lifetime is appropriate. It is not enough to
        // guarantee it's alive at the scheduling stages (i.e. `then()`
        // calling) but also during the actual execution (i.e. when passed
        // lambdas are actually run).
      });
    });
    return std::make_pair(std::move(op), std::move(fut));
  }

  // tids for ops I issue, prefixed with core id to ensure uniqueness
  // NOTE(review): next_tid has no in-class initializer; presumably seeded
  // per-core by the constructor (defined out of line) -- confirm in the .cc
  ceph_tid_t next_tid;
  ceph_tid_t get_tid() {
    assert_core();
    return next_tid++;
  }

  HeartbeatStampsRef get_hb_stamps(int peer);
  std::map<int, HeartbeatStampsRef> heartbeat_stamps;

  // Time state
  const ceph::mono_time startup_time;
  /// monotonic time elapsed since this OSD started
  ceph::signedspan get_mnow() const {
    assert_core();
    return ceph::mono_clock::now() - startup_time;
  }

public:
  PerShardState(
    int whoami,
    ceph::mono_time startup_time,
    PerfCounters *perf,
    PerfCounters *recoverystate_perf,
    crimson::os::FuturizedStore &store);
};
193
/**
 * OSDSingletonState
 *
 * OSD-wide singleton holding instances that need to be accessible
 * from all PGs.  Lives on PRIMARY_CORE; other shards reach it through
 * ShardServices::with_singleton().  Observes config changes via the
 * md_config_obs_t interface.
 */
class OSDSingletonState : public md_config_obs_t {
  friend class ShardServices;
  friend class PGShardManager;
  using cached_map_t = OSDMapService::cached_map_t;
  using local_cached_map_t = OSDMapService::local_cached_map_t;

public:
  OSDSingletonState(
    int whoami,
    crimson::net::Messenger &cluster_msgr,
    crimson::net::Messenger &public_msgr,
    crimson::mon::Client &monc,
    crimson::mgr::Client &mgrc);

private:
  const int whoami;  // id of this OSD

  crimson::common::CephContext cct;
  // non-owning perf counter handles
  PerfCounters *perf = nullptr;
  PerfCounters *recoverystate_perf = nullptr;

  OSDState osd_state;

  // caches keyed by epoch: decoded OSDMaps and their encoded buffers
  SharedLRU<epoch_t, OSDMap> osdmaps;
  SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;

  cached_map_t osdmap;
  cached_map_t &get_osdmap() { return osdmap; }
  void update_map(cached_map_t new_osdmap) {
    osdmap = std::move(new_osdmap);
  }
  OSD_OSDMapGate osdmap_gate;

  crimson::net::Messenger &cluster_msgr;
  crimson::net::Messenger &public_msgr;

  /// send m to osd.peer, bounded by the map state as of from_epoch
  seastar::future<> send_to_osd(int peer, MessageURef m, epoch_t from_epoch);

  crimson::mon::Client &monc;
  /// (re-)subscribe to osdmap updates from the monitor starting at epoch
  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);

  crimson::mgr::Client &mgrc;

  // metadata collection; created lazily via init_meta_coll()
  std::unique_ptr<OSDMeta> meta_coll;
  template <typename... Args>
  void init_meta_coll(Args&&... args) {
    meta_coll = std::make_unique<OSDMeta>(std::forward<Args>(args)...);
  }
  /// precondition: init_meta_coll() has been called
  OSDMeta &get_meta_coll() {
    assert(meta_coll);
    return *meta_coll;
  }

  OSDSuperblock superblock;
  void set_superblock(OSDSuperblock _superblock) {
    superblock = std::move(_superblock);
  }

  /// stream incremental maps starting at epoch first to conn
  seastar::future<> send_incremental_map(
    crimson::net::Connection &conn,
    epoch_t first);

  seastar::future<> send_incremental_map_to_osd(int osd, epoch_t first);

  auto get_pool_info(int64_t poolid) {
    return get_meta_coll().load_final_pool_info(poolid);
  }

  // global pg temp state
  struct pg_temp_t {
    std::vector<int> acting;
    bool forced = false;
  };
  std::map<pg_t, pg_temp_t> pg_temp_wanted;
  std::map<pg_t, pg_temp_t> pg_temp_pending;
  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);

  void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
			  bool forced = false);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  seastar::future<> send_pg_temp();

  // TODO: add config to control mapping
  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};

  std::set<pg_t> pg_created;
  seastar::future<> send_pg_created(pg_t pgid);
  seastar::future<> send_pg_created();
  void prune_pg_created();

  // Finisher that completes contexts inline (no queueing) -- used as the
  // completion mechanism for the AsyncReservers below.
  struct DirectFinisher {
    void queue(Context *c) {
      c->complete(0);
    }
  } finisher;
  AsyncReserver<spg_t, DirectFinisher> local_reserver;
  AsyncReserver<spg_t, DirectFinisher> remote_reserver;
  AsyncReserver<spg_t, DirectFinisher> snap_reserver;

  epoch_t up_thru_wanted = 0;
  seastar::future<> send_alive(epoch_t want);

  // md_config_obs_t interface
  const char** get_tracked_conf_keys() const final;
  void handle_conf_change(
    const ConfigProxy& conf,
    const std::set <std::string> &changed) final;

  // map loading/storing; results are served from and fill the LRU caches
  seastar::future<local_cached_map_t> get_local_map(epoch_t e);
  seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
  seastar::future<bufferlist> load_map_bl(epoch_t e);
  seastar::future<std::map<epoch_t, bufferlist>>
  load_map_bls(epoch_t first, epoch_t last);
  void store_map_bl(ceph::os::Transaction& t,
                    epoch_t e, bufferlist&& bl);
  seastar::future<> store_maps(ceph::os::Transaction& t,
                               epoch_t start, Ref<MOSDMap> m);
};
318
/**
 * Represents services available to each PG
 *
 * Pairs the shard-local PerShardState with cross-core access to the
 * OSDSingletonState instance living on PRIMARY_CORE.
 */
class ShardServices : public OSDMapService {
  friend class PGShardManager;
  using cached_map_t = OSDMapService::cached_map_t;
  using local_cached_map_t = OSDMapService::local_cached_map_t;

  PerShardState local_state;
  seastar::sharded<OSDSingletonState> &osd_singleton_state;

  /// run f(singleton, args...) on PRIMARY_CORE, where the singleton lives
  template <typename F, typename... Args>
  auto with_singleton(F &&f, Args&&... args) {
    return osd_singleton_state.invoke_on(
      PRIMARY_CORE,
      std::forward<F>(f),
      std::forward<Args>(args)...
    );
  }

  // The FORWARD* macros generate perfect-forwarding wrapper methods:
  //  - FORWARD / FORWARD_CONST: call TARGET.TO_METHOD on this shard
  //  - FORWARD_TO_LOCAL(_CONST): forward to local_state (PerShardState)
  //  - FORWARD_TO_OSD_SINGLETON(_TARGET): hop to PRIMARY_CORE via
  //    with_singleton() and invoke the method on OSDSingletonState there.
  //    NB: that lambda's parameter is named `local_state` but it is in
  //    fact the singleton instance.
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
  template <typename... Args> \
  auto FROM_METHOD(Args&&... args) const { \
    return TARGET.TO_METHOD(std::forward<Args>(args)...); \
  }

#define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \
  template <typename... Args> \
  auto FROM_METHOD(Args&&... args) { \
    return TARGET.TO_METHOD(std::forward<Args>(args)...); \
  }

#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
#define FORWARD_TO_LOCAL_CONST(METHOD) FORWARD_CONST( \
  METHOD, METHOD, local_state) \

#define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET) \
  template <typename... Args> \
  auto METHOD(Args&&... args) { \
    return with_singleton( \
      [](auto &local_state, auto&&... args) { \
        return local_state.TARGET( \
          std::forward<decltype(args)>(args)...); \
      }, std::forward<Args>(args)...); \
  }
#define FORWARD_TO_OSD_SINGLETON(METHOD) \
  FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)

public:
  template <typename... PSSArgs>
  ShardServices(
    seastar::sharded<OSDSingletonState> &osd_singleton_state,
    PSSArgs&&... args)
    : local_state(std::forward<PSSArgs>(args)...),
      osd_singleton_state(osd_singleton_state) {}

  FORWARD_TO_OSD_SINGLETON(send_to_osd)

  /// shard-local object store handle
  crimson::os::FuturizedStore::Shard &get_store() {
    return local_state.store;
  }

  /// remove pgid from the local pg_map and the global pg-to-shard mapping
  auto remove_pg(spg_t pgid) {
    local_state.pg_map.remove_pg(pgid);
    return with_singleton(
      [pgid](auto &osstate) {
	osstate.pg_to_shard_mapping.remove_pg(pgid);
      });
  }

  crimson::common::CephContext *get_cct() {
    return &(local_state.cct);
  }

  /// start an operation on this shard; see PerShardState::start_operation
  template <typename T, typename... Args>
  auto start_operation(Args&&... args) {
    return local_state.start_operation<T>(std::forward<Args>(args)...);
  }

  template <typename InterruptorT, typename T, typename... Args>
  auto start_operation_may_interrupt(Args&&... args) {
    return local_state.start_operation_may_interrupt<
      InterruptorT, T>(std::forward<Args>(args)...);
  }

  auto &get_registry() { return local_state.registry; }

  // Loggers
  PerfCounters &get_recoverystate_perf_logger() {
    return *local_state.recoverystate_perf;
  }
  PerfCounters &get_perf_logger() {
    return *local_state.perf;
  }

  // Diagnostics
  FORWARD_TO_LOCAL_CONST(dump_ops_in_flight);

  // Local PG Management
  seastar::future<Ref<PG>> make_pg(
    cached_map_t create_map,
    spg_t pgid,
    bool do_create);
  seastar::future<Ref<PG>> handle_pg_create_info(
    std::unique_ptr<PGCreateInfo> info);

  using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
  using get_or_create_pg_ret = get_or_create_pg_ertr::future<Ref<PG>>;
  get_or_create_pg_ret get_or_create_pg(
    PGMap::PGCreationBlockingEvent::TriggerI&&,
    spg_t pgid,
    epoch_t epoch,
    std::unique_ptr<PGCreateInfo> info);

  using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
  using wait_for_pg_ret = wait_for_pg_ertr::future<Ref<PG>>;
  wait_for_pg_ret wait_for_pg(
    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
  seastar::future<Ref<PG>> load_pg(spg_t pgid);

  /// Dispatch and reset ctx transaction
  seastar::future<> dispatch_context_transaction(
    crimson::os::CollectionRef col, PeeringCtx &ctx);

  /// Dispatch and reset ctx messages
  seastar::future<> dispatch_context_messages(
    BufferedRecoveryMessages &&ctx);

  /// Dispatch ctx and dispose of context
  seastar::future<> dispatch_context(
    crimson::os::CollectionRef col,
    PeeringCtx &&ctx);

  /// Dispatch ctx and dispose of ctx, transaction must be empty
  seastar::future<> dispatch_context(
    PeeringCtx &&ctx) {
    return dispatch_context({}, std::move(ctx));
  }

  /// Return per-core tid
  ceph_tid_t get_tid() { return local_state.get_tid(); }

  /// Return the number of PGs hosted on this core
  unsigned get_num_local_pgs() const {
    return local_state.pg_map.get_pg_count();
  }

  // OSDMapService
  cached_map_t get_map() const final { return local_state.get_osdmap(); }
  epoch_t get_up_epoch() const final { return local_state.up_epoch; }
  /// fetch map e from the singleton's cache and wrap it for local use
  seastar::future<cached_map_t> get_map(epoch_t e) final {
    return with_singleton(
      [](auto &sstate, epoch_t e) {
	return sstate.get_local_map(
	  e
	).then([](auto lmap) {
	  // foreign_ptr so the reference can safely cross back to our core
	  return seastar::foreign_ptr<local_cached_map_t>(lmap);
	});
      }, e).then([](auto fmap) {
	return make_local_shared_foreign(std::move(fmap));
      });
  }

  FORWARD_TO_OSD_SINGLETON(get_pool_info)
  FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)

  FORWARD_TO_OSD_SINGLETON(send_incremental_map)
  FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)

  FORWARD_TO_OSD_SINGLETON(osdmap_subscribe)
  FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp)
  FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp)
  FORWARD_TO_OSD_SINGLETON(requeue_pg_temp)
  FORWARD_TO_OSD_SINGLETON(send_pg_created)
  FORWARD_TO_OSD_SINGLETON(send_alive)
  FORWARD_TO_OSD_SINGLETON(send_pg_temp)
  FORWARD_TO_LOCAL_CONST(get_mnow)
  FORWARD_TO_LOCAL(get_hb_stamps)

  FORWARD(pg_created, pg_created, local_state.pg_map)

  // reserver operations proxied to the singleton's AsyncReservers
  FORWARD_TO_OSD_SINGLETON_TARGET(
    local_update_priority,
    local_reserver.update_priority)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    local_cancel_reservation,
    local_reserver.cancel_reservation)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    local_dump_reservations,
    local_reserver.dump)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    remote_cancel_reservation,
    remote_reserver.cancel_reservation)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    remote_dump_reservations,
    remote_reserver.dump)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    snap_cancel_reservation,
    snap_reserver.cancel_reservation)
  FORWARD_TO_OSD_SINGLETON_TARGET(
    snap_dump_reservations,
    snap_reserver.dump)

  /**
   * invoke_context_on_core
   *
   * Wrap c so that completing the wrapper (on any core) schedules
   * c->complete(code) on the given core, fire-and-forget.  Used to
   * bounce reserver callbacks, which fire on PRIMARY_CORE, back to the
   * requesting shard.  Returns nullptr for a null input.
   */
  Context *invoke_context_on_core(core_id_t core, Context *c) {
    if (!c) return nullptr;
    return new LambdaContext([core, c](int code) {
      std::ignore = seastar::smp::submit_to(
	core,
	[c, code] {
	  c->complete(code);
	});
    });
  }
  /// request a reservation from the singleton's local_reserver;
  /// callbacks are re-dispatched onto this shard
  seastar::future<> local_request_reservation(
    spg_t item,
    Context *on_reserved,
    unsigned prio,
    Context *on_preempt) {
    return with_singleton(
      [item, prio](OSDSingletonState &singleton,
		   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
	return singleton.local_reserver.request_reservation(
	  item,
	  wrapped_on_reserved,
	  prio,
	  wrapped_on_preempt);
      },
      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
  }
  /// request a reservation from the singleton's remote_reserver;
  /// callbacks are re-dispatched onto this shard
  seastar::future<> remote_request_reservation(
    spg_t item,
    Context *on_reserved,
    unsigned prio,
    Context *on_preempt) {
    return with_singleton(
      [item, prio](OSDSingletonState &singleton,
		   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
	return singleton.remote_reserver.request_reservation(
	  item,
	  wrapped_on_reserved,
	  prio,
	  wrapped_on_preempt);
      },
      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
  }
  /// request a reservation from the singleton's snap_reserver (no
  /// preemption callback); on_reserved is re-dispatched onto this shard
  seastar::future<> snap_request_reservation(
    spg_t item,
    Context *on_reserved,
    unsigned prio) {
    return with_singleton(
      [item, prio](OSDSingletonState &singleton,
		   Context *wrapped_on_reserved) {
	return singleton.snap_reserver.request_reservation(
	  item,
	  wrapped_on_reserved,
	  prio);
      },
      invoke_context_on_core(seastar::this_shard_id(), on_reserved));
  }

  // NOTE(review): FORWARD_TO_OSD_SINGLETON_TARGET is not #undef'd here,
  // so it leaks to includers of this header
#undef FORWARD_CONST
#undef FORWARD
#undef FORWARD_TO_OSD_SINGLETON
#undef FORWARD_TO_LOCAL
#undef FORWARD_TO_LOCAL_CONST
};
587
588 }
589
#if FMT_VERSION >= 90000
// fmt v9 dropped implicit formatting via operator<<; opt pg_temp_t back in
// by deriving its formatter from fmt::ostream_formatter.
template <> struct fmt::formatter<crimson::osd::OSDSingletonState::pg_temp_t> : fmt::ostream_formatter {};
#endif