// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
4 | #include "crimson/osd/pg_shard_manager.h" | |
5 | #include "crimson/osd/pg.h" | |
6 | ||
7 | namespace { | |
8 | seastar::logger& logger() { | |
9 | return crimson::get_logger(ceph_subsys_osd); | |
10 | } | |
11 | } | |
12 | ||
13 | namespace crimson::osd { | |
14 | ||
15 | seastar::future<> PGShardManager::start( | |
16 | const int whoami, | |
17 | crimson::net::Messenger &cluster_msgr, | |
18 | crimson::net::Messenger &public_msgr, | |
19 | crimson::mon::Client &monc, | |
20 | crimson::mgr::Client &mgrc, | |
21 | crimson::os::FuturizedStore &store) | |
22 | { | |
23 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
24 | return osd_singleton_state.start_single( | |
25 | whoami, std::ref(cluster_msgr), std::ref(public_msgr), | |
26 | std::ref(monc), std::ref(mgrc) | |
27 | ).then([this, whoami, &store] { | |
28 | ceph::mono_time startup_time = ceph::mono_clock::now(); | |
29 | return shard_services.start( | |
30 | std::ref(osd_singleton_state), | |
31 | whoami, | |
32 | startup_time, | |
33 | osd_singleton_state.local().perf, | |
34 | osd_singleton_state.local().recoverystate_perf, | |
35 | std::ref(store)); | |
36 | }); | |
37 | } | |
38 | ||
39 | seastar::future<> PGShardManager::stop() | |
40 | { | |
41 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
42 | return shard_services.stop( | |
43 | ).then([this] { | |
44 | return osd_singleton_state.stop(); | |
45 | }); | |
46 | } | |
47 | ||
48 | seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store) | |
49 | { | |
50 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
51 | return store.list_collections( | |
52 | ).then([this](auto colls_cores) { | |
53 | return seastar::parallel_for_each( | |
54 | colls_cores, | |
55 | [this](auto coll_core) { | |
56 | auto[coll, shard_core] = coll_core; | |
57 | spg_t pgid; | |
58 | if (coll.is_pg(&pgid)) { | |
59 | auto core = get_osd_singleton_state( | |
60 | ).pg_to_shard_mapping.maybe_create_pg( | |
61 | pgid, shard_core); | |
62 | return with_remote_shard_state( | |
63 | core, | |
64 | [pgid]( | |
65 | PerShardState &per_shard_state, | |
66 | ShardServices &shard_services) { | |
67 | return shard_services.load_pg( | |
68 | pgid | |
69 | ).then([pgid, &per_shard_state](auto &&pg) { | |
70 | logger().info("load_pgs: loaded {}", pgid); | |
71 | per_shard_state.pg_map.pg_loaded(pgid, std::move(pg)); | |
72 | return seastar::now(); | |
73 | }); | |
74 | }); | |
75 | } else if (coll.is_temp(&pgid)) { | |
76 | logger().warn( | |
77 | "found temp collection on crimson osd, should be impossible: {}", | |
78 | coll); | |
79 | ceph_assert(0 == "temp collection on crimson osd, should be impossible"); | |
80 | return seastar::now(); | |
81 | } else { | |
82 | logger().warn("ignoring unrecognized collection: {}", coll); | |
83 | return seastar::now(); | |
84 | } | |
85 | }); | |
86 | }); | |
87 | } | |
88 | ||
89 | seastar::future<> PGShardManager::stop_pgs() | |
90 | { | |
91 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
92 | return shard_services.invoke_on_all([](auto &local_service) { | |
93 | return local_service.local_state.stop_pgs(); | |
94 | }); | |
95 | } | |
96 | ||
97 | seastar::future<std::map<pg_t, pg_stat_t>> | |
98 | PGShardManager::get_pg_stats() const | |
99 | { | |
100 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
101 | return shard_services.map_reduce0( | |
102 | [](auto &local) { | |
103 | return local.local_state.get_pg_stats(); | |
104 | }, | |
105 | std::map<pg_t, pg_stat_t>(), | |
106 | [](auto &&left, auto &&right) { | |
107 | left.merge(std::move(right)); | |
108 | return std::move(left); | |
109 | }); | |
110 | } | |
111 | ||
112 | seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch) | |
113 | { | |
114 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
115 | return shard_services.invoke_on_all([epoch](auto &local_service) { | |
116 | return local_service.local_state.broadcast_map_to_pgs( | |
117 | local_service, epoch | |
118 | ); | |
119 | }).then([this, epoch] { | |
120 | get_osd_singleton_state().osdmap_gate.got_map(epoch); | |
121 | return seastar::now(); | |
122 | }); | |
123 | } | |
124 | ||
125 | seastar::future<> PGShardManager::set_up_epoch(epoch_t e) { | |
126 | ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); | |
127 | return shard_services.invoke_on_all( | |
128 | seastar::smp_submit_to_options{}, | |
129 | [e](auto &local_service) { | |
130 | local_service.local_state.set_up_epoch(e); | |
131 | return seastar::now(); | |
132 | }); | |
133 | } | |
134 | ||
135 | } |