]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/recovery_backend.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / recovery_backend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <fmt/format.h>
5
6 #include "crimson/common/exception.h"
7 #include "crimson/osd/recovery_backend.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/pg_backend.h"
10
11 #include "messages/MOSDFastDispatchOp.h"
12 #include "osd/osd_types.h"
13
14 namespace {
15 seastar::logger& logger() {
16 return crimson::get_logger(ceph_subsys_osd);
17 }
18 }
19
20 hobject_t RecoveryBackend::get_temp_recovery_object(
21 const hobject_t& target,
22 eversion_t version) const
23 {
24 hobject_t hoid =
25 target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
26 pg.get_info().pgid,
27 version,
28 pg.get_info().history.same_interval_since,
29 target.snap));
30 logger().debug("{} {}", __func__, hoid);
31 return hoid;
32 }
33
34 void RecoveryBackend::clean_up(ceph::os::Transaction& t,
35 std::string_view why)
36 {
37 for (auto& soid : temp_contents) {
38 t.remove(pg.get_collection_ref()->get_cid(),
39 ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
40 }
41 temp_contents.clear();
42
43 for (auto& [soid, recovery_waiter] : recovering) {
44 if ((recovery_waiter.pi && recovery_waiter.pi->is_complete())
45 || (!recovery_waiter.pi
46 && recovery_waiter.obc && recovery_waiter.obc->obs.exists)) {
47 recovery_waiter.obc->interrupt(
48 ::crimson::common::actingset_changed(
49 pg.is_primary()));
50 recovery_waiter.interrupt(why);
51 }
52 }
53 recovering.clear();
54 }
55
56 void RecoveryBackend::WaitForObjectRecovery::stop() {
57 readable.set_exception(
58 crimson::common::system_shutdown_exception());
59 recovered.set_exception(
60 crimson::common::system_shutdown_exception());
61 pulled.set_exception(
62 crimson::common::system_shutdown_exception());
63 for (auto& [pg_shard, pr] : pushes) {
64 pr.set_exception(
65 crimson::common::system_shutdown_exception());
66 }
67 }
68
69 void RecoveryBackend::handle_backfill_finish(
70 MOSDPGBackfill& m)
71 {
72 logger().debug("{}", __func__);
73 ceph_assert(!pg.is_primary());
74 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1);
75 auto reply = crimson::make_message<MOSDPGBackfill>(
76 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
77 pg.get_osdmap_epoch(),
78 m.query_epoch,
79 spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
80 reply->set_priority(pg.get_recovery_op_priority());
81 std::ignore = m.get_connection()->send(std::move(reply));
82 shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
83 static_cast<crimson::osd::PG*>(&pg),
84 shard_services,
85 pg.get_pg_whoami(),
86 pg.get_pgid(),
87 pg.get_osdmap_epoch(),
88 pg.get_osdmap_epoch(),
89 RecoveryDone{});
90 }
91
92 RecoveryBackend::interruptible_future<>
93 RecoveryBackend::handle_backfill_progress(
94 MOSDPGBackfill& m)
95 {
96 logger().debug("{}", __func__);
97 ceph_assert(!pg.is_primary());
98 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2);
99
100 ObjectStore::Transaction t;
101 pg.get_peering_state().update_backfill_progress(
102 m.last_backfill,
103 m.stats,
104 m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
105 t);
106 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
107 return shard_services.get_store().do_transaction(
108 pg.get_collection_ref(), std::move(t)).or_terminate();
109 }
110
111 RecoveryBackend::interruptible_future<>
112 RecoveryBackend::handle_backfill_finish_ack(
113 MOSDPGBackfill& m)
114 {
115 logger().debug("{}", __func__);
116 ceph_assert(pg.is_primary());
117 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3);
118 // TODO:
119 // finish_recovery_op(hobject_t::get_max());
120 return seastar::now();
121 }
122
123 RecoveryBackend::interruptible_future<>
124 RecoveryBackend::handle_backfill(
125 MOSDPGBackfill& m)
126 {
127 logger().debug("{}", __func__);
128 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
129 logger().debug("{}: discarding {}", __func__, m);
130 return seastar::now();
131 }
132 switch (m.op) {
133 case MOSDPGBackfill::OP_BACKFILL_FINISH:
134 handle_backfill_finish(m);
135 [[fallthrough]];
136 case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
137 return handle_backfill_progress(m);
138 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
139 return handle_backfill_finish_ack(m);
140 default:
141 ceph_assert("unknown op type for pg backfill");
142 return seastar::now();
143 }
144 }
145
146 RecoveryBackend::interruptible_future<>
147 RecoveryBackend::handle_backfill_remove(
148 MOSDPGBackfillRemove& m)
149 {
150 logger().debug("{} m.ls={}", __func__, m.ls);
151 assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE);
152 if (pg.can_discard_replica_op(m)) {
153 logger().debug("{}: discarding {}", __func__, m);
154 return seastar::now();
155 }
156 ObjectStore::Transaction t;
157 for ([[maybe_unused]] const auto& [soid, ver] : m.ls) {
158 // TODO: the reserved space management. PG::try_reserve_recovery_space().
159 t.remove(pg.get_collection_ref()->get_cid(),
160 ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
161 }
162 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
163 return shard_services.get_store().do_transaction(
164 pg.get_collection_ref(), std::move(t)).or_terminate();
165 }
166
167 RecoveryBackend::interruptible_future<BackfillInterval>
168 RecoveryBackend::scan_for_backfill(
169 const hobject_t& start,
170 [[maybe_unused]] const std::int64_t min,
171 const std::int64_t max)
172 {
173 logger().debug("{} starting from {}", __func__, start);
174 auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
175 return backend->list_objects(start, max).then_interruptible(
176 [this, start, version_map] (auto&& ret) {
177 auto&& [objects, next] = std::move(ret);
178 return interruptor::parallel_for_each(std::move(objects),
179 [this, version_map] (const hobject_t& object)
180 -> interruptible_future<> {
181 crimson::osd::ObjectContextRef obc;
182 if (pg.is_primary()) {
183 obc = shard_services.obc_registry.maybe_get_cached_obc(object);
184 }
185 if (obc) {
186 if (obc->obs.exists) {
187 logger().debug("scan_for_backfill found (primary): {} {}",
188 object, obc->obs.oi.version);
189 version_map->emplace(object, obc->obs.oi.version);
190 } else {
191 // if the object does not exist here, it must have been removed
192 // between the collection_list_partial and here. This can happen
193 // for the first item in the range, which is usually last_backfill.
194 }
195 return seastar::now();
196 } else {
197 return backend->load_metadata(object).safe_then_interruptible(
198 [version_map, object] (auto md) {
199 if (md->os.exists) {
200 logger().debug("scan_for_backfill found: {} {}",
201 object, md->os.oi.version);
202 version_map->emplace(object, md->os.oi.version);
203 }
204 return seastar::now();
205 }, PGBackend::load_metadata_ertr::assert_all{});
206 }
207 }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
208 BackfillInterval bi;
209 bi.begin = std::move(start);
210 bi.end = std::move(next);
211 bi.version = pg.get_info().last_update;
212 bi.objects = std::move(*version_map);
213 logger().debug("{} BackfillInterval filled, leaving",
214 "scan_for_backfill");
215 return seastar::make_ready_future<BackfillInterval>(std::move(bi));
216 });
217 });
218 }
219
220 RecoveryBackend::interruptible_future<>
221 RecoveryBackend::handle_scan_get_digest(
222 MOSDPGScan& m)
223 {
224 logger().debug("{}", __func__);
225 if (false /* FIXME: check for backfill too full */) {
226 std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
227 // TODO: abstract start_background_recovery
228 static_cast<crimson::osd::PG*>(&pg),
229 shard_services,
230 pg.get_pg_whoami(),
231 pg.get_pgid(),
232 pg.get_osdmap_epoch(),
233 pg.get_osdmap_epoch(),
234 PeeringState::BackfillTooFull());
235 return seastar::now();
236 }
237 return scan_for_backfill(
238 std::move(m.begin),
239 crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
240 crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
241 ).then_interruptible([this,
242 query_epoch=m.query_epoch,
243 conn=m.get_connection()] (auto backfill_interval) {
244 auto reply = crimson::make_message<MOSDPGScan>(
245 MOSDPGScan::OP_SCAN_DIGEST,
246 pg.get_pg_whoami(),
247 pg.get_osdmap_epoch(),
248 query_epoch,
249 spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
250 backfill_interval.begin,
251 backfill_interval.end);
252 encode(backfill_interval.objects, reply->get_data());
253 return conn->send(std::move(reply));
254 });
255 }
256
257 RecoveryBackend::interruptible_future<>
258 RecoveryBackend::handle_scan_digest(
259 MOSDPGScan& m)
260 {
261 logger().debug("{}", __func__);
262 // Check that from is in backfill_targets vector
263 ceph_assert(pg.is_backfill_target(m.from));
264
265 BackfillInterval bi;
266 bi.begin = m.begin;
267 bi.end = m.end;
268 {
269 auto p = m.get_data().cbegin();
270 // take care to preserve ordering!
271 bi.clear_objects();
272 ::decode_noclear(bi.objects, p);
273 }
274 shard_services.start_operation<crimson::osd::BackfillRecovery>(
275 static_cast<crimson::osd::PG*>(&pg),
276 shard_services,
277 pg.get_osdmap_epoch(),
278 crimson::osd::BackfillState::ReplicaScanned{ m.from, std::move(bi) });
279 return seastar::now();
280 }
281
282 RecoveryBackend::interruptible_future<>
283 RecoveryBackend::handle_scan(
284 MOSDPGScan& m)
285 {
286 logger().debug("{}", __func__);
287 if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
288 logger().debug("{}: discarding {}", __func__, m);
289 return seastar::now();
290 }
291 switch (m.op) {
292 case MOSDPGScan::OP_SCAN_GET_DIGEST:
293 return handle_scan_get_digest(m);
294 case MOSDPGScan::OP_SCAN_DIGEST:
295 return handle_scan_digest(m);
296 default:
297 // FIXME: move to errorator
298 ceph_assert("unknown op type for pg scan");
299 return seastar::now();
300 }
301 }
302
303 RecoveryBackend::interruptible_future<>
304 RecoveryBackend::handle_recovery_op(
305 Ref<MOSDFastDispatchOp> m)
306 {
307 switch (m->get_header().type) {
308 case MSG_OSD_PG_BACKFILL:
309 return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m));
310 case MSG_OSD_PG_BACKFILL_REMOVE:
311 return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m));
312 case MSG_OSD_PG_SCAN:
313 return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m));
314 default:
315 return seastar::make_exception_future<>(
316 std::invalid_argument(fmt::format("invalid request type: {}",
317 m->get_header().type)));
318 }
319 }