]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <fmt/format.h> | |
5 | ||
6 | #include "crimson/common/exception.h" | |
7 | #include "crimson/osd/recovery_backend.h" | |
8 | #include "crimson/osd/pg.h" | |
9 | #include "crimson/osd/pg_backend.h" | |
10 | ||
11 | #include "messages/MOSDFastDispatchOp.h" | |
12 | #include "osd/osd_types.h" | |
13 | ||
14 | namespace { | |
15 | seastar::logger& logger() { | |
16 | return crimson::get_logger(ceph_subsys_osd); | |
17 | } | |
18 | } | |
19 | ||
20 | hobject_t RecoveryBackend::get_temp_recovery_object( | |
21 | const hobject_t& target, | |
22 | eversion_t version) const | |
23 | { | |
24 | hobject_t hoid = | |
25 | target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}", | |
26 | pg.get_info().pgid, | |
27 | version, | |
28 | pg.get_info().history.same_interval_since, | |
29 | target.snap)); | |
30 | logger().debug("{} {}", __func__, hoid); | |
31 | return hoid; | |
32 | } | |
33 | ||
34 | void RecoveryBackend::clean_up(ceph::os::Transaction& t, | |
35 | std::string_view why) | |
36 | { | |
37 | for (auto& soid : temp_contents) { | |
38 | t.remove(pg.get_collection_ref()->get_cid(), | |
39 | ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard)); | |
40 | } | |
41 | temp_contents.clear(); | |
42 | ||
43 | for (auto& [soid, recovery_waiter] : recovering) { | |
44 | if ((recovery_waiter.pi && recovery_waiter.pi->is_complete()) | |
45 | || (!recovery_waiter.pi | |
46 | && recovery_waiter.obc && recovery_waiter.obc->obs.exists)) { | |
47 | recovery_waiter.obc->interrupt( | |
48 | ::crimson::common::actingset_changed( | |
49 | pg.is_primary())); | |
50 | recovery_waiter.interrupt(why); | |
51 | } | |
52 | } | |
53 | recovering.clear(); | |
54 | } | |
55 | ||
56 | void RecoveryBackend::WaitForObjectRecovery::stop() { | |
57 | readable.set_exception( | |
58 | crimson::common::system_shutdown_exception()); | |
59 | recovered.set_exception( | |
60 | crimson::common::system_shutdown_exception()); | |
61 | pulled.set_exception( | |
62 | crimson::common::system_shutdown_exception()); | |
63 | for (auto& [pg_shard, pr] : pushes) { | |
64 | pr.set_exception( | |
65 | crimson::common::system_shutdown_exception()); | |
66 | } | |
67 | } | |
68 | ||
69 | void RecoveryBackend::handle_backfill_finish( | |
70 | MOSDPGBackfill& m) | |
71 | { | |
72 | logger().debug("{}", __func__); | |
73 | ceph_assert(!pg.is_primary()); | |
74 | ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1); | |
20effc67 | 75 | auto reply = crimson::make_message<MOSDPGBackfill>( |
f67539c2 TL |
76 | MOSDPGBackfill::OP_BACKFILL_FINISH_ACK, |
77 | pg.get_osdmap_epoch(), | |
78 | m.query_epoch, | |
79 | spg_t(pg.get_pgid().pgid, pg.get_primary().shard)); | |
80 | reply->set_priority(pg.get_recovery_op_priority()); | |
81 | std::ignore = m.get_connection()->send(std::move(reply)); | |
82 | shard_services.start_operation<crimson::osd::LocalPeeringEvent>( | |
83 | static_cast<crimson::osd::PG*>(&pg), | |
84 | shard_services, | |
85 | pg.get_pg_whoami(), | |
86 | pg.get_pgid(), | |
87 | pg.get_osdmap_epoch(), | |
88 | pg.get_osdmap_epoch(), | |
89 | RecoveryDone{}); | |
90 | } | |
91 | ||
20effc67 TL |
92 | RecoveryBackend::interruptible_future<> |
93 | RecoveryBackend::handle_backfill_progress( | |
f67539c2 TL |
94 | MOSDPGBackfill& m) |
95 | { | |
96 | logger().debug("{}", __func__); | |
97 | ceph_assert(!pg.is_primary()); | |
98 | ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2); | |
99 | ||
100 | ObjectStore::Transaction t; | |
101 | pg.get_peering_state().update_backfill_progress( | |
102 | m.last_backfill, | |
103 | m.stats, | |
104 | m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS, | |
105 | t); | |
20effc67 | 106 | logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction..."); |
f67539c2 | 107 | return shard_services.get_store().do_transaction( |
20effc67 | 108 | pg.get_collection_ref(), std::move(t)).or_terminate(); |
f67539c2 TL |
109 | } |
110 | ||
20effc67 TL |
111 | RecoveryBackend::interruptible_future<> |
112 | RecoveryBackend::handle_backfill_finish_ack( | |
f67539c2 TL |
113 | MOSDPGBackfill& m) |
114 | { | |
115 | logger().debug("{}", __func__); | |
116 | ceph_assert(pg.is_primary()); | |
117 | ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3); | |
118 | // TODO: | |
119 | // finish_recovery_op(hobject_t::get_max()); | |
120 | return seastar::now(); | |
121 | } | |
122 | ||
20effc67 TL |
123 | RecoveryBackend::interruptible_future<> |
124 | RecoveryBackend::handle_backfill( | |
f67539c2 TL |
125 | MOSDPGBackfill& m) |
126 | { | |
127 | logger().debug("{}", __func__); | |
20effc67 TL |
128 | if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { |
129 | logger().debug("{}: discarding {}", __func__, m); | |
130 | return seastar::now(); | |
131 | } | |
f67539c2 TL |
132 | switch (m.op) { |
133 | case MOSDPGBackfill::OP_BACKFILL_FINISH: | |
134 | handle_backfill_finish(m); | |
135 | [[fallthrough]]; | |
136 | case MOSDPGBackfill::OP_BACKFILL_PROGRESS: | |
137 | return handle_backfill_progress(m); | |
138 | case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK: | |
139 | return handle_backfill_finish_ack(m); | |
140 | default: | |
141 | ceph_assert("unknown op type for pg backfill"); | |
142 | return seastar::now(); | |
143 | } | |
144 | } | |
145 | ||
20effc67 TL |
146 | RecoveryBackend::interruptible_future<> |
147 | RecoveryBackend::handle_backfill_remove( | |
f67539c2 TL |
148 | MOSDPGBackfillRemove& m) |
149 | { | |
150 | logger().debug("{} m.ls={}", __func__, m.ls); | |
151 | assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE); | |
20effc67 TL |
152 | if (pg.can_discard_replica_op(m)) { |
153 | logger().debug("{}: discarding {}", __func__, m); | |
154 | return seastar::now(); | |
155 | } | |
f67539c2 TL |
156 | ObjectStore::Transaction t; |
157 | for ([[maybe_unused]] const auto& [soid, ver] : m.ls) { | |
158 | // TODO: the reserved space management. PG::try_reserve_recovery_space(). | |
159 | t.remove(pg.get_collection_ref()->get_cid(), | |
160 | ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard)); | |
161 | } | |
20effc67 | 162 | logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction..."); |
f67539c2 | 163 | return shard_services.get_store().do_transaction( |
20effc67 | 164 | pg.get_collection_ref(), std::move(t)).or_terminate(); |
f67539c2 TL |
165 | } |
166 | ||
20effc67 TL |
167 | RecoveryBackend::interruptible_future<BackfillInterval> |
168 | RecoveryBackend::scan_for_backfill( | |
f67539c2 TL |
169 | const hobject_t& start, |
170 | [[maybe_unused]] const std::int64_t min, | |
171 | const std::int64_t max) | |
172 | { | |
173 | logger().debug("{} starting from {}", __func__, start); | |
174 | auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>(); | |
20effc67 | 175 | return backend->list_objects(start, max).then_interruptible( |
f67539c2 TL |
176 | [this, start, version_map] (auto&& ret) { |
177 | auto&& [objects, next] = std::move(ret); | |
20effc67 TL |
178 | return interruptor::parallel_for_each(std::move(objects), |
179 | [this, version_map] (const hobject_t& object) | |
180 | -> interruptible_future<> { | |
f67539c2 TL |
181 | crimson::osd::ObjectContextRef obc; |
182 | if (pg.is_primary()) { | |
183 | obc = shard_services.obc_registry.maybe_get_cached_obc(object); | |
184 | } | |
185 | if (obc) { | |
186 | if (obc->obs.exists) { | |
187 | logger().debug("scan_for_backfill found (primary): {} {}", | |
188 | object, obc->obs.oi.version); | |
189 | version_map->emplace(object, obc->obs.oi.version); | |
190 | } else { | |
191 | // if the object does not exist here, it must have been removed | |
192 | // between the collection_list_partial and here. This can happen | |
193 | // for the first item in the range, which is usually last_backfill. | |
194 | } | |
195 | return seastar::now(); | |
196 | } else { | |
20effc67 | 197 | return backend->load_metadata(object).safe_then_interruptible( |
f67539c2 TL |
198 | [version_map, object] (auto md) { |
199 | if (md->os.exists) { | |
200 | logger().debug("scan_for_backfill found: {} {}", | |
201 | object, md->os.oi.version); | |
202 | version_map->emplace(object, md->os.oi.version); | |
203 | } | |
204 | return seastar::now(); | |
205 | }, PGBackend::load_metadata_ertr::assert_all{}); | |
206 | } | |
20effc67 | 207 | }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] { |
f67539c2 TL |
208 | BackfillInterval bi; |
209 | bi.begin = std::move(start); | |
210 | bi.end = std::move(next); | |
211 | bi.version = pg.get_info().last_update; | |
212 | bi.objects = std::move(*version_map); | |
213 | logger().debug("{} BackfillInterval filled, leaving", | |
214 | "scan_for_backfill"); | |
215 | return seastar::make_ready_future<BackfillInterval>(std::move(bi)); | |
216 | }); | |
217 | }); | |
218 | } | |
219 | ||
20effc67 TL |
220 | RecoveryBackend::interruptible_future<> |
221 | RecoveryBackend::handle_scan_get_digest( | |
f67539c2 TL |
222 | MOSDPGScan& m) |
223 | { | |
224 | logger().debug("{}", __func__); | |
225 | if (false /* FIXME: check for backfill too full */) { | |
226 | std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>( | |
227 | // TODO: abstract start_background_recovery | |
228 | static_cast<crimson::osd::PG*>(&pg), | |
229 | shard_services, | |
230 | pg.get_pg_whoami(), | |
231 | pg.get_pgid(), | |
232 | pg.get_osdmap_epoch(), | |
233 | pg.get_osdmap_epoch(), | |
234 | PeeringState::BackfillTooFull()); | |
235 | return seastar::now(); | |
236 | } | |
237 | return scan_for_backfill( | |
238 | std::move(m.begin), | |
239 | crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"), | |
240 | crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max") | |
20effc67 | 241 | ).then_interruptible([this, |
f67539c2 TL |
242 | query_epoch=m.query_epoch, |
243 | conn=m.get_connection()] (auto backfill_interval) { | |
20effc67 | 244 | auto reply = crimson::make_message<MOSDPGScan>( |
f67539c2 TL |
245 | MOSDPGScan::OP_SCAN_DIGEST, |
246 | pg.get_pg_whoami(), | |
247 | pg.get_osdmap_epoch(), | |
248 | query_epoch, | |
249 | spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard), | |
250 | backfill_interval.begin, | |
251 | backfill_interval.end); | |
252 | encode(backfill_interval.objects, reply->get_data()); | |
253 | return conn->send(std::move(reply)); | |
254 | }); | |
255 | } | |
256 | ||
20effc67 TL |
257 | RecoveryBackend::interruptible_future<> |
258 | RecoveryBackend::handle_scan_digest( | |
f67539c2 TL |
259 | MOSDPGScan& m) |
260 | { | |
261 | logger().debug("{}", __func__); | |
262 | // Check that from is in backfill_targets vector | |
263 | ceph_assert(pg.is_backfill_target(m.from)); | |
264 | ||
265 | BackfillInterval bi; | |
266 | bi.begin = m.begin; | |
267 | bi.end = m.end; | |
268 | { | |
269 | auto p = m.get_data().cbegin(); | |
270 | // take care to preserve ordering! | |
271 | bi.clear_objects(); | |
272 | ::decode_noclear(bi.objects, p); | |
273 | } | |
274 | shard_services.start_operation<crimson::osd::BackfillRecovery>( | |
275 | static_cast<crimson::osd::PG*>(&pg), | |
276 | shard_services, | |
277 | pg.get_osdmap_epoch(), | |
278 | crimson::osd::BackfillState::ReplicaScanned{ m.from, std::move(bi) }); | |
279 | return seastar::now(); | |
280 | } | |
281 | ||
20effc67 TL |
282 | RecoveryBackend::interruptible_future<> |
283 | RecoveryBackend::handle_scan( | |
f67539c2 TL |
284 | MOSDPGScan& m) |
285 | { | |
286 | logger().debug("{}", __func__); | |
20effc67 TL |
287 | if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { |
288 | logger().debug("{}: discarding {}", __func__, m); | |
289 | return seastar::now(); | |
290 | } | |
f67539c2 TL |
291 | switch (m.op) { |
292 | case MOSDPGScan::OP_SCAN_GET_DIGEST: | |
293 | return handle_scan_get_digest(m); | |
294 | case MOSDPGScan::OP_SCAN_DIGEST: | |
295 | return handle_scan_digest(m); | |
296 | default: | |
297 | // FIXME: move to errorator | |
298 | ceph_assert("unknown op type for pg scan"); | |
299 | return seastar::now(); | |
300 | } | |
301 | } | |
302 | ||
20effc67 TL |
303 | RecoveryBackend::interruptible_future<> |
304 | RecoveryBackend::handle_recovery_op( | |
f67539c2 TL |
305 | Ref<MOSDFastDispatchOp> m) |
306 | { | |
307 | switch (m->get_header().type) { | |
308 | case MSG_OSD_PG_BACKFILL: | |
309 | return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m)); | |
310 | case MSG_OSD_PG_BACKFILL_REMOVE: | |
311 | return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m)); | |
312 | case MSG_OSD_PG_SCAN: | |
313 | return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m)); | |
314 | default: | |
315 | return seastar::make_exception_future<>( | |
316 | std::invalid_argument(fmt::format("invalid request type: {}", | |
317 | m->get_header().type))); | |
318 | } | |
319 | } |