]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/recovery_backend.cc
c55d3150850330181908744dd7c04b74e4e5951c
[ceph.git] / ceph / src / crimson / osd / recovery_backend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <fmt/format.h>
5
6 #include "crimson/common/exception.h"
7 #include "crimson/osd/recovery_backend.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/pg_backend.h"
10 #include "crimson/osd/osd_operations/background_recovery.h"
11
12 #include "messages/MOSDFastDispatchOp.h"
13 #include "osd/osd_types.h"
14
namespace {
  // File-local accessor for the OSD-subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
20
21 hobject_t RecoveryBackend::get_temp_recovery_object(
22 const hobject_t& target,
23 eversion_t version) const
24 {
25 hobject_t hoid =
26 target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
27 pg.get_info().pgid,
28 version,
29 pg.get_info().history.same_interval_since,
30 target.snap));
31 logger().debug("{} {}", __func__, hoid);
32 return hoid;
33 }
34
35 void RecoveryBackend::add_temp_obj(const hobject_t &oid)
36 {
37 backend->add_temp_obj(oid);
38 }
39
40 void RecoveryBackend::clear_temp_obj(const hobject_t &oid)
41 {
42 backend->clear_temp_obj(oid);
43 }
44
// Abort all in-flight recovery for this pg: queue removal of every temp
// recovery object into `t` and interrupt waiters on objects that are
// already usable locally. `why` is a human-readable reason propagated to
// the interrupted waiters.
void RecoveryBackend::clean_up(ceph::os::Transaction& t,
                               std::string_view why)
{
  // Remove all partially-recovered temp objects from the store.
  for_each_temp_obj([&](auto &soid) {
    t.remove(pg.get_collection_ref()->get_cid(),
             ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
  });
  clear_temp_objs();

  for (auto& [soid, recovery_waiter] : recovering) {
    // Interrupt only waiters whose object is already complete locally:
    // either its pull finished, or there was no pull and the cached obc
    // says the object exists.
    // NOTE(review): the completed-pull branch dereferences
    // recovery_waiter->obc without a null check — presumably a finished
    // pull guarantees the obc is populated; confirm against the pull path.
    if ((recovery_waiter->pull_info
         && recovery_waiter->pull_info->is_complete())
        || (!recovery_waiter->pull_info
            && recovery_waiter->obc && recovery_waiter->obc->obs.exists)) {
      recovery_waiter->obc->interrupt(
        ::crimson::common::actingset_changed(
          pg.is_primary()));
      recovery_waiter->interrupt(why);
    }
  }
  recovering.clear();
}
67
68 void RecoveryBackend::WaitForObjectRecovery::stop() {
69 if (readable) {
70 readable->set_exception(
71 crimson::common::system_shutdown_exception());
72 readable.reset();
73 }
74 if (recovered) {
75 recovered->set_exception(
76 crimson::common::system_shutdown_exception());
77 recovered.reset();
78 }
79 if (pulled) {
80 pulled->set_exception(
81 crimson::common::system_shutdown_exception());
82 pulled.reset();
83 }
84 for (auto& [pg_shard, pr] : pushes) {
85 pr.set_exception(
86 crimson::common::system_shutdown_exception());
87 }
88 pushes.clear();
89 }
90
91 void RecoveryBackend::handle_backfill_finish(
92 MOSDPGBackfill& m,
93 crimson::net::ConnectionXcoreRef conn)
94 {
95 logger().debug("{}", __func__);
96 ceph_assert(!pg.is_primary());
97 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1);
98 auto reply = crimson::make_message<MOSDPGBackfill>(
99 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
100 pg.get_osdmap_epoch(),
101 m.query_epoch,
102 spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
103 reply->set_priority(pg.get_recovery_op_priority());
104 std::ignore = conn->send(std::move(reply));
105 shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
106 static_cast<crimson::osd::PG*>(&pg),
107 pg.get_pg_whoami(),
108 pg.get_pgid(),
109 pg.get_osdmap_epoch(),
110 pg.get_osdmap_epoch(),
111 RecoveryDone{});
112 }
113
114 RecoveryBackend::interruptible_future<>
115 RecoveryBackend::handle_backfill_progress(
116 MOSDPGBackfill& m)
117 {
118 logger().debug("{}", __func__);
119 ceph_assert(!pg.is_primary());
120 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2);
121
122 ObjectStore::Transaction t;
123 pg.get_peering_state().update_backfill_progress(
124 m.last_backfill,
125 m.stats,
126 m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
127 t);
128 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
129 return shard_services.get_store().do_transaction(
130 pg.get_collection_ref(), std::move(t)).or_terminate();
131 }
132
133 RecoveryBackend::interruptible_future<>
134 RecoveryBackend::handle_backfill_finish_ack(
135 MOSDPGBackfill& m)
136 {
137 logger().debug("{}", __func__);
138 ceph_assert(pg.is_primary());
139 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3);
140 // TODO:
141 // finish_recovery_op(hobject_t::get_max());
142 return seastar::now();
143 }
144
145 RecoveryBackend::interruptible_future<>
146 RecoveryBackend::handle_backfill(
147 MOSDPGBackfill& m,
148 crimson::net::ConnectionXcoreRef conn)
149 {
150 logger().debug("{}", __func__);
151 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
152 logger().debug("{}: discarding {}", __func__, m);
153 return seastar::now();
154 }
155 switch (m.op) {
156 case MOSDPGBackfill::OP_BACKFILL_FINISH:
157 handle_backfill_finish(m, conn);
158 [[fallthrough]];
159 case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
160 return handle_backfill_progress(m);
161 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
162 return handle_backfill_finish_ack(m);
163 default:
164 ceph_assert("unknown op type for pg backfill");
165 return seastar::now();
166 }
167 }
168
169 RecoveryBackend::interruptible_future<>
170 RecoveryBackend::handle_backfill_remove(
171 MOSDPGBackfillRemove& m)
172 {
173 logger().debug("{} m.ls={}", __func__, m.ls);
174 assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE);
175
176 ObjectStore::Transaction t;
177 for ([[maybe_unused]] const auto& [soid, ver] : m.ls) {
178 // TODO: the reserved space management. PG::try_reserve_recovery_space().
179 t.remove(pg.get_collection_ref()->get_cid(),
180 ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
181 }
182 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
183 return shard_services.get_store().do_transaction(
184 pg.get_collection_ref(), std::move(t)).or_terminate();
185 }
186
// List objects from `start` (up to `max` entries) and collect each object's
// version into a BackfillInterval. On the primary the cached obc is
// consulted first; otherwise metadata is loaded from the store.
// `min` is currently unused (kept for interface parity with classic OSD).
RecoveryBackend::interruptible_future<BackfillInterval>
RecoveryBackend::scan_for_backfill(
  const hobject_t& start,
  [[maybe_unused]] const std::int64_t min,
  const std::int64_t max)
{
  logger().debug("{} starting from {}", __func__, start);
  // Shared across the parallel lookups below; each branch emplaces into it.
  auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
  return backend->list_objects(start, max).then_interruptible(
    [this, start, version_map] (auto&& ret) {
    auto&& [objects, next] = std::move(ret);
    return seastar::do_with(
      std::move(objects),
      [this, version_map](auto &objects) {
      // Resolve every listed object's version concurrently.
      return interruptor::parallel_for_each(objects,
        [this, version_map] (const hobject_t& object)
        -> interruptible_future<> {
        crimson::osd::ObjectContextRef obc;
        if (pg.is_primary()) {
          obc = pg.obc_registry.maybe_get_cached_obc(object);
        }
        if (obc) {
          if (obc->obs.exists) {
            logger().debug("scan_for_backfill found (primary): {} {}",
                           object, obc->obs.oi.version);
            version_map->emplace(object, obc->obs.oi.version);
          } else {
            // if the object does not exist here, it must have been removed
            // between the collection_list_partial and here. This can happen
            // for the first item in the range, which is usually last_backfill.
          }
          return seastar::now();
        } else {
          // No cached obc (replica, or cache miss): read object metadata
          // from the store. Any load error is fatal (assert_all).
          return backend->load_metadata(object).safe_then_interruptible(
            [version_map, object] (auto md) {
            if (md->os.exists) {
              logger().debug("scan_for_backfill found: {} {}",
                             object, md->os.oi.version);
              version_map->emplace(object, md->os.oi.version);
            }
            return seastar::now();
          }, PGBackend::load_metadata_ertr::assert_all{});
        }
      });
    }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
      // Assemble the interval: [begin, end) plus the collected versions.
      BackfillInterval bi;
      bi.begin = std::move(start);
      bi.end = std::move(next);
      bi.version = pg.get_info().last_update;
      bi.objects = std::move(*version_map);
      logger().debug("{} BackfillInterval filled, leaving",
                     "scan_for_backfill");
      return seastar::make_ready_future<BackfillInterval>(std::move(bi));
    });
  });
}
243
// Replica-side handler for OP_SCAN_GET_DIGEST: scan local objects starting
// at m.begin and reply to the primary with an OP_SCAN_DIGEST carrying the
// interval bounds and the encoded object->version map.
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan_get_digest(
  MOSDPGScan& m,
  crimson::net::ConnectionXcoreRef conn)
{
  logger().debug("{}", __func__);
  // Dead branch until the too-full check is implemented: would bounce the
  // pg into BackfillTooFull instead of scanning.
  if (false /* FIXME: check for backfill too full */) {
    std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
      // TODO: abstract start_background_recovery
      static_cast<crimson::osd::PG*>(&pg),
      pg.get_pg_whoami(),
      pg.get_pgid(),
      pg.get_osdmap_epoch(),
      pg.get_osdmap_epoch(),
      PeeringState::BackfillTooFull());
    return seastar::now();
  }
  return scan_for_backfill(
    std::move(m.begin),
    crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
    crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
  ).then_interruptible(
    // query_epoch is copied (m may be gone by the time the scan resolves);
    // conn is captured to send the reply on the originating connection.
    [this, query_epoch=m.query_epoch, conn
    ](auto backfill_interval) {
    auto reply = crimson::make_message<MOSDPGScan>(
      MOSDPGScan::OP_SCAN_DIGEST,
      pg.get_pg_whoami(),
      pg.get_osdmap_epoch(),
      query_epoch,
      spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
      backfill_interval.begin,
      backfill_interval.end);
    encode(backfill_interval.objects, reply->get_data());
    return conn->send(std::move(reply));
  });
}
280
281 RecoveryBackend::interruptible_future<>
282 RecoveryBackend::handle_scan_digest(
283 MOSDPGScan& m)
284 {
285 logger().debug("{}", __func__);
286 // Check that from is in backfill_targets vector
287 ceph_assert(pg.is_backfill_target(m.from));
288
289 BackfillInterval bi;
290 bi.begin = m.begin;
291 bi.end = m.end;
292 {
293 auto p = m.get_data().cbegin();
294 // take care to preserve ordering!
295 bi.clear_objects();
296 ::decode_noclear(bi.objects, p);
297 }
298 shard_services.start_operation<crimson::osd::BackfillRecovery>(
299 static_cast<crimson::osd::PG*>(&pg),
300 shard_services,
301 pg.get_osdmap_epoch(),
302 crimson::osd::BackfillState::ReplicaScanned{ m.from, std::move(bi) });
303 return seastar::now();
304 }
305
306 RecoveryBackend::interruptible_future<>
307 RecoveryBackend::handle_scan(
308 MOSDPGScan& m,
309 crimson::net::ConnectionXcoreRef conn)
310 {
311 logger().debug("{}", __func__);
312 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
313 logger().debug("{}: discarding {}", __func__, m);
314 return seastar::now();
315 }
316 switch (m.op) {
317 case MOSDPGScan::OP_SCAN_GET_DIGEST:
318 return handle_scan_get_digest(m, conn);
319 case MOSDPGScan::OP_SCAN_DIGEST:
320 return handle_scan_digest(m);
321 default:
322 // FIXME: move to errorator
323 ceph_assert("unknown op type for pg scan");
324 return seastar::now();
325 }
326 }
327
328 RecoveryBackend::interruptible_future<>
329 RecoveryBackend::handle_recovery_op(
330 Ref<MOSDFastDispatchOp> m,
331 crimson::net::ConnectionXcoreRef conn)
332 {
333 switch (m->get_header().type) {
334 case MSG_OSD_PG_BACKFILL:
335 return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m), conn);
336 case MSG_OSD_PG_BACKFILL_REMOVE:
337 return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m));
338 case MSG_OSD_PG_SCAN:
339 return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m), conn);
340 default:
341 return seastar::make_exception_future<>(
342 std::invalid_argument(fmt::format("invalid request type: {}",
343 m->get_header().type)));
344 }
345 }