]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/recovery_backend.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / recovery_backend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <fmt/format.h>
5
6 #include "crimson/common/exception.h"
7 #include "crimson/osd/recovery_backend.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/pg_backend.h"
10 #include "crimson/osd/osd_operations/background_recovery.h"
11
12 #include "messages/MOSDFastDispatchOp.h"
13 #include "osd/osd_types.h"
14
15 namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_osd);
18 }
19 }
20
21 hobject_t RecoveryBackend::get_temp_recovery_object(
22 const hobject_t& target,
23 eversion_t version) const
24 {
25 hobject_t hoid =
26 target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
27 pg.get_info().pgid,
28 version,
29 pg.get_info().history.same_interval_since,
30 target.snap));
31 logger().debug("{} {}", __func__, hoid);
32 return hoid;
33 }
34
35 void RecoveryBackend::clean_up(ceph::os::Transaction& t,
36 std::string_view why)
37 {
38 for (auto& soid : temp_contents) {
39 t.remove(pg.get_collection_ref()->get_cid(),
40 ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
41 }
42 temp_contents.clear();
43
44 for (auto& [soid, recovery_waiter] : recovering) {
45 if ((recovery_waiter->pull_info
46 && recovery_waiter->pull_info->is_complete())
47 || (!recovery_waiter->pull_info
48 && recovery_waiter->obc && recovery_waiter->obc->obs.exists)) {
49 recovery_waiter->obc->interrupt(
50 ::crimson::common::actingset_changed(
51 pg.is_primary()));
52 recovery_waiter->interrupt(why);
53 }
54 }
55 recovering.clear();
56 }
57
58 void RecoveryBackend::WaitForObjectRecovery::stop() {
59 readable.set_exception(
60 crimson::common::system_shutdown_exception());
61 recovered.set_exception(
62 crimson::common::system_shutdown_exception());
63 pulled.set_exception(
64 crimson::common::system_shutdown_exception());
65 for (auto& [pg_shard, pr] : pushes) {
66 pr.set_exception(
67 crimson::common::system_shutdown_exception());
68 }
69 }
70
71 void RecoveryBackend::handle_backfill_finish(
72 MOSDPGBackfill& m,
73 crimson::net::ConnectionRef conn)
74 {
75 logger().debug("{}", __func__);
76 ceph_assert(!pg.is_primary());
77 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1);
78 auto reply = crimson::make_message<MOSDPGBackfill>(
79 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
80 pg.get_osdmap_epoch(),
81 m.query_epoch,
82 spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
83 reply->set_priority(pg.get_recovery_op_priority());
84 std::ignore = conn->send(std::move(reply));
85 shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
86 static_cast<crimson::osd::PG*>(&pg),
87 pg.get_pg_whoami(),
88 pg.get_pgid(),
89 pg.get_osdmap_epoch(),
90 pg.get_osdmap_epoch(),
91 RecoveryDone{});
92 }
93
94 RecoveryBackend::interruptible_future<>
95 RecoveryBackend::handle_backfill_progress(
96 MOSDPGBackfill& m)
97 {
98 logger().debug("{}", __func__);
99 ceph_assert(!pg.is_primary());
100 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2);
101
102 ObjectStore::Transaction t;
103 pg.get_peering_state().update_backfill_progress(
104 m.last_backfill,
105 m.stats,
106 m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
107 t);
108 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
109 return shard_services.get_store().do_transaction(
110 pg.get_collection_ref(), std::move(t)).or_terminate();
111 }
112
113 RecoveryBackend::interruptible_future<>
114 RecoveryBackend::handle_backfill_finish_ack(
115 MOSDPGBackfill& m)
116 {
117 logger().debug("{}", __func__);
118 ceph_assert(pg.is_primary());
119 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3);
120 // TODO:
121 // finish_recovery_op(hobject_t::get_max());
122 return seastar::now();
123 }
124
125 RecoveryBackend::interruptible_future<>
126 RecoveryBackend::handle_backfill(
127 MOSDPGBackfill& m,
128 crimson::net::ConnectionRef conn)
129 {
130 logger().debug("{}", __func__);
131 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
132 logger().debug("{}: discarding {}", __func__, m);
133 return seastar::now();
134 }
135 switch (m.op) {
136 case MOSDPGBackfill::OP_BACKFILL_FINISH:
137 handle_backfill_finish(m, conn);
138 [[fallthrough]];
139 case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
140 return handle_backfill_progress(m);
141 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
142 return handle_backfill_finish_ack(m);
143 default:
144 ceph_assert("unknown op type for pg backfill");
145 return seastar::now();
146 }
147 }
148
149 RecoveryBackend::interruptible_future<>
150 RecoveryBackend::handle_backfill_remove(
151 MOSDPGBackfillRemove& m)
152 {
153 logger().debug("{} m.ls={}", __func__, m.ls);
154 assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE);
155 if (pg.can_discard_replica_op(m)) {
156 logger().debug("{}: discarding {}", __func__, m);
157 return seastar::now();
158 }
159 ObjectStore::Transaction t;
160 for ([[maybe_unused]] const auto& [soid, ver] : m.ls) {
161 // TODO: the reserved space management. PG::try_reserve_recovery_space().
162 t.remove(pg.get_collection_ref()->get_cid(),
163 ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
164 }
165 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
166 return shard_services.get_store().do_transaction(
167 pg.get_collection_ref(), std::move(t)).or_terminate();
168 }
169
170 RecoveryBackend::interruptible_future<BackfillInterval>
171 RecoveryBackend::scan_for_backfill(
172 const hobject_t& start,
173 [[maybe_unused]] const std::int64_t min,
174 const std::int64_t max)
175 {
176 logger().debug("{} starting from {}", __func__, start);
177 auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
178 return backend->list_objects(start, max).then_interruptible(
179 [this, start, version_map] (auto&& ret) {
180 auto&& [objects, next] = std::move(ret);
181 return seastar::do_with(
182 std::move(objects),
183 [this, version_map](auto &objects) {
184 return interruptor::parallel_for_each(objects,
185 [this, version_map] (const hobject_t& object)
186 -> interruptible_future<> {
187 crimson::osd::ObjectContextRef obc;
188 if (pg.is_primary()) {
189 obc = pg.obc_registry.maybe_get_cached_obc(object);
190 }
191 if (obc) {
192 if (obc->obs.exists) {
193 logger().debug("scan_for_backfill found (primary): {} {}",
194 object, obc->obs.oi.version);
195 version_map->emplace(object, obc->obs.oi.version);
196 } else {
197 // if the object does not exist here, it must have been removed
198 // between the collection_list_partial and here. This can happen
199 // for the first item in the range, which is usually last_backfill.
200 }
201 return seastar::now();
202 } else {
203 return backend->load_metadata(object).safe_then_interruptible(
204 [version_map, object] (auto md) {
205 if (md->os.exists) {
206 logger().debug("scan_for_backfill found: {} {}",
207 object, md->os.oi.version);
208 version_map->emplace(object, md->os.oi.version);
209 }
210 return seastar::now();
211 }, PGBackend::load_metadata_ertr::assert_all{});
212 }
213 });
214 }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
215 BackfillInterval bi;
216 bi.begin = std::move(start);
217 bi.end = std::move(next);
218 bi.version = pg.get_info().last_update;
219 bi.objects = std::move(*version_map);
220 logger().debug("{} BackfillInterval filled, leaving",
221 "scan_for_backfill");
222 return seastar::make_ready_future<BackfillInterval>(std::move(bi));
223 });
224 });
225 }
226
227 RecoveryBackend::interruptible_future<>
228 RecoveryBackend::handle_scan_get_digest(
229 MOSDPGScan& m,
230 crimson::net::ConnectionRef conn)
231 {
232 logger().debug("{}", __func__);
233 if (false /* FIXME: check for backfill too full */) {
234 std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
235 // TODO: abstract start_background_recovery
236 static_cast<crimson::osd::PG*>(&pg),
237 pg.get_pg_whoami(),
238 pg.get_pgid(),
239 pg.get_osdmap_epoch(),
240 pg.get_osdmap_epoch(),
241 PeeringState::BackfillTooFull());
242 return seastar::now();
243 }
244 return scan_for_backfill(
245 std::move(m.begin),
246 crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
247 crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
248 ).then_interruptible(
249 [this, query_epoch=m.query_epoch, conn
250 ](auto backfill_interval) {
251 auto reply = crimson::make_message<MOSDPGScan>(
252 MOSDPGScan::OP_SCAN_DIGEST,
253 pg.get_pg_whoami(),
254 pg.get_osdmap_epoch(),
255 query_epoch,
256 spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
257 backfill_interval.begin,
258 backfill_interval.end);
259 encode(backfill_interval.objects, reply->get_data());
260 return conn->send(std::move(reply));
261 });
262 }
263
264 RecoveryBackend::interruptible_future<>
265 RecoveryBackend::handle_scan_digest(
266 MOSDPGScan& m)
267 {
268 logger().debug("{}", __func__);
269 // Check that from is in backfill_targets vector
270 ceph_assert(pg.is_backfill_target(m.from));
271
272 BackfillInterval bi;
273 bi.begin = m.begin;
274 bi.end = m.end;
275 {
276 auto p = m.get_data().cbegin();
277 // take care to preserve ordering!
278 bi.clear_objects();
279 ::decode_noclear(bi.objects, p);
280 }
281 shard_services.start_operation<crimson::osd::BackfillRecovery>(
282 static_cast<crimson::osd::PG*>(&pg),
283 shard_services,
284 pg.get_osdmap_epoch(),
285 crimson::osd::BackfillState::ReplicaScanned{ m.from, std::move(bi) });
286 return seastar::now();
287 }
288
289 RecoveryBackend::interruptible_future<>
290 RecoveryBackend::handle_scan(
291 MOSDPGScan& m,
292 crimson::net::ConnectionRef conn)
293 {
294 logger().debug("{}", __func__);
295 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
296 logger().debug("{}: discarding {}", __func__, m);
297 return seastar::now();
298 }
299 switch (m.op) {
300 case MOSDPGScan::OP_SCAN_GET_DIGEST:
301 return handle_scan_get_digest(m, conn);
302 case MOSDPGScan::OP_SCAN_DIGEST:
303 return handle_scan_digest(m);
304 default:
305 // FIXME: move to errorator
306 ceph_assert("unknown op type for pg scan");
307 return seastar::now();
308 }
309 }
310
311 RecoveryBackend::interruptible_future<>
312 RecoveryBackend::handle_recovery_op(
313 Ref<MOSDFastDispatchOp> m,
314 crimson::net::ConnectionRef conn)
315 {
316 switch (m->get_header().type) {
317 case MSG_OSD_PG_BACKFILL:
318 return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m), conn);
319 case MSG_OSD_PG_BACKFILL_REMOVE:
320 return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m));
321 case MSG_OSD_PG_SCAN:
322 return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m), conn);
323 default:
324 return seastar::make_exception_future<>(
325 std::invalid_argument(fmt::format("invalid request type: {}",
326 m->get_header().type)));
327 }
328 }