1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <fmt/format.h>
6 #include "crimson/common/exception.h"
7 #include "crimson/osd/recovery_backend.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/pg_backend.h"
10 #include "crimson/osd/osd_operations/background_recovery.h"
12 #include "messages/MOSDFastDispatchOp.h"
13 #include "osd/osd_types.h"
// File-local helper returning the OSD-subsystem logger used throughout
// this translation unit.
// NOTE(review): the enclosing anonymous-namespace braces and this
// function's closing brace are not visible here (embedded line numbers
// jump 17 -> 21) — confirm against the full source file.
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
// Build the name of a temporary object used while recovering `target`
// at `version`.  The name is derived via make_temp_hobject() from a
// fmt::format() pattern with four placeholders; of its arguments only
// pg.get_info().history.same_interval_since is visible in this extract.
// NOTE(review): original lines 24-25, 27-28, 30 and 32-33 (the remaining
// format arguments, the `hoid` declaration, the return statement and the
// braces) are missing from this extract — verify against the full file.
21 hobject_t
RecoveryBackend::get_temp_recovery_object(
22 const hobject_t
& target
,
23 eversion_t version
) const
26 target
.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
29 pg
.get_info().history
.same_interval_since
,
// Logs the computed temp object id (`hoid` is declared on a line not
// visible in this extract).
31 logger().debug("{} {}", __func__
, hoid
);
// Thin delegation: registers `oid` as a temporary object with the
// underlying PGBackend.
// NOTE(review): the function's braces (original lines 36/38) are not
// visible in this extract.
35 void RecoveryBackend::add_temp_obj(const hobject_t
&oid
)
37 backend
->add_temp_obj(oid
);
// Thin delegation: removes `oid` from the PGBackend's set of temporary
// objects (inverse of add_temp_obj above).
// NOTE(review): the function's braces (original lines 41/43) are not
// visible in this extract.
40 void RecoveryBackend::clear_temp_obj(const hobject_t
&oid
)
42 backend
->clear_temp_obj(oid
);
// Tear down in-flight recovery state:
//   1) queue removal of every temp recovery object into transaction `t`;
//   2) interrupt every waiter in `recovering` whose pull either
//      completed or (for local-only work) whose object context exists.
// NOTE(review): this extract is missing the rest of the parameter list
// (a `why` argument is used below), the lambda/loop closing braces and
// the arguments to actingset_changed() (original lines 46-47, 51-53, 61,
// 63-67) — verify against the full file.
45 void RecoveryBackend::clean_up(ceph::os::Transaction
& t
,
// Enqueue removal of each temporary recovery object from this PG's
// collection; ghobject_t is built with NO_GEN and this OSD's shard.
48 for_each_temp_obj([&](auto &soid
) {
49 t
.remove(pg
.get_collection_ref()->get_cid(),
50 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
// Interrupt recovery waiters: either the pull finished, or there is no
// pull but the cached object context says the object already exists.
54 for (auto& [soid
, recovery_waiter
] : recovering
) {
55 if ((recovery_waiter
->pull_info
56 && recovery_waiter
->pull_info
->is_complete())
57 || (!recovery_waiter
->pull_info
58 && recovery_waiter
->obc
&& recovery_waiter
->obc
->obs
.exists
)) {
// Signal the obc waiters that the acting set changed (constructor
// arguments are on lines not visible in this extract).
59 recovery_waiter
->obc
->interrupt(
60 ::crimson::common::actingset_changed(
// Finally interrupt the waiter itself with the caller-provided reason.
62 recovery_waiter
->interrupt(why
);
// Shut down an object-recovery waiter: fail the `readable`, `recovered`
// and `pulled` promises, plus every per-shard promise in `pushes`, with
// system_shutdown_exception so blocked continuations unwind cleanly.
// NOTE(review): the guard checks around each promise (e.g. original
// lines 69, 72-74, 77-79, 82-83, 85) and the closing braces are missing
// from this extract — presumably each promise is only failed when it is
// engaged; verify against the full file.
68 void RecoveryBackend::WaitForObjectRecovery::stop() {
70 readable
->set_exception(
71 crimson::common::system_shutdown_exception());
75 recovered
->set_exception(
76 crimson::common::system_shutdown_exception());
80 pulled
->set_exception(
81 crimson::common::system_shutdown_exception());
// Fail every outstanding per-peer push promise as well.
84 for (auto& [pg_shard
, pr
] : pushes
) {
86 crimson::common::system_shutdown_exception());
// Replica-side handler for OP_BACKFILL_FINISH: acknowledge the primary
// with OP_BACKFILL_FINISH_ACK and queue a local peering event.
// Asserts we are NOT the primary, and honors the osd_kill_backfill_at
// debug knob (value 1 aborts here).
// NOTE(review): the message parameter (original line 92), braces, the
// MOSDPGBackfill constructor's remaining argument (line 101), and the
// LocalPeeringEvent arguments (lines 107-108, 111-113) are missing from
// this extract — verify against the full file.
91 void RecoveryBackend::handle_backfill_finish(
93 crimson::net::ConnectionXcoreRef conn
)
95 logger().debug("{}", __func__
);
96 ceph_assert(!pg
.is_primary());
97 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 1);
// Build the ACK addressed to the primary shard of this PG.
98 auto reply
= crimson::make_message
<MOSDPGBackfill
>(
99 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
,
100 pg
.get_osdmap_epoch(),
102 spg_t(pg
.get_pgid().pgid
, pg
.get_primary().shard
));
103 reply
->set_priority(pg
.get_recovery_op_priority());
// Fire-and-forget send; the returned future is deliberately discarded.
104 std::ignore
= conn
->send(std::move(reply
));
// Queue a LocalPeeringEvent for this PG at the current OSDMap epoch
// (event payload is on lines not visible in this extract).
105 shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
106 static_cast<crimson::osd::PG
*>(&pg
),
109 pg
.get_osdmap_epoch(),
110 pg
.get_osdmap_epoch(),
// Replica-side handler for OP_BACKFILL_PROGRESS/OP_BACKFILL_FINISH:
// records backfill progress into the peering state and persists it via
// a transaction on this PG's collection.  Asserts we are NOT the
// primary; osd_kill_backfill_at == 2 aborts here (debug knob).
// NOTE(review): the message parameter (original lines 116-117), braces,
// and the remaining update_backfill_progress() arguments (lines 124-125,
// 127) are missing from this extract — verify against the full file.
114 RecoveryBackend::interruptible_future
<>
115 RecoveryBackend::handle_backfill_progress(
118 logger().debug("{}", __func__
);
119 ceph_assert(!pg
.is_primary());
120 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 2);
122 ObjectStore::Transaction t
;
// The bool distinguishes a PROGRESS update from a FINISH update.
123 pg
.get_peering_state().update_backfill_progress(
126 m
.op
== MOSDPGBackfill::OP_BACKFILL_PROGRESS
,
128 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
// Persist; or_terminate() turns store errors into process termination.
129 return shard_services
.get_store().do_transaction(
130 pg
.get_collection_ref(), std::move(t
)).or_terminate();
// Primary-side handler for OP_BACKFILL_FINISH_ACK.  Currently only
// validates preconditions (we ARE the primary; osd_kill_backfill_at
// debug knob != 3) — the recovery-op bookkeeping is still commented out.
// NOTE(review): the message parameter (original lines 135-136) and the
// body between lines 139 and 141 are missing from this extract.
133 RecoveryBackend::interruptible_future
<>
134 RecoveryBackend::handle_backfill_finish_ack(
137 logger().debug("{}", __func__
);
138 ceph_assert(pg
.is_primary());
139 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 3);
141 // finish_recovery_op(hobject_t::get_max());
142 return seastar::now();
// Dispatch an MOSDPGBackfill message: drop stale messages from old
// peering intervals, then route by op to the finish/progress/ack
// handlers above.
// NOTE(review): the message parameter (original line 147), the `switch`
// header (~line 155), the fallthrough/return after the FINISH case and
// the closing braces are missing from this extract — in particular the
// visible code shows handle_backfill_finish(m, conn) is called WITHOUT
// `return`, presumably falling through to the PROGRESS case; verify
// against the full file.
145 RecoveryBackend::interruptible_future
<>
146 RecoveryBackend::handle_backfill(
148 crimson::net::ConnectionXcoreRef conn
)
150 logger().debug("{}", __func__
);
// Discard messages that predate the current peering interval.
151 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
152 logger().debug("{}: discarding {}", __func__
, m
);
153 return seastar::now();
156 case MOSDPGBackfill::OP_BACKFILL_FINISH
:
157 handle_backfill_finish(m
, conn
);
159 case MOSDPGBackfill::OP_BACKFILL_PROGRESS
:
160 return handle_backfill_progress(m
);
161 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
:
162 return handle_backfill_finish_ack(m
);
// NOTE(review): ceph_assert on a non-null string literal is always true
// and can never fire; an unknown op silently returns seastar::now().
// Consider ceph_abort_msg()/ceph_assert(0 == "...") instead.
164 ceph_assert("unknown op type for pg backfill");
165 return seastar::now();
// Handle MOSDPGBackfillRemove: delete every object listed in m.ls from
// this PG's collection in one transaction and persist it.
// NOTE(review): braces and the loop-closing line (original lines 172,
// 175, 181, 185) are missing from this extract.  Also note the bare
// `assert` here where the rest of the file uses `ceph_assert` — an
// inconsistency worth confirming upstream.
169 RecoveryBackend::interruptible_future
<>
170 RecoveryBackend::handle_backfill_remove(
171 MOSDPGBackfillRemove
& m
)
173 logger().debug("{} m.ls={}", __func__
, m
.ls
);
174 assert(m
.get_type() == MSG_OSD_PG_BACKFILL_REMOVE
);
176 ObjectStore::Transaction t
;
// `ver` is intentionally unused — only the object id matters for remove.
177 for ([[maybe_unused
]] const auto& [soid
, ver
] : m
.ls
) {
178 // TODO: the reserved space management. PG::try_reserve_recovery_space().
179 t
.remove(pg
.get_collection_ref()->get_cid(),
180 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
182 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
// Persist; or_terminate() turns store errors into process termination.
183 return shard_services
.get_store().do_transaction(
184 pg
.get_collection_ref(), std::move(t
)).or_terminate();
// Scan this PG's objects starting at `start` (up to `max` entries) and
// build a BackfillInterval: a map object -> version plus the [begin,
// end) range covered.  On the primary, versions come from cached object
// contexts when available; otherwise object metadata is loaded from the
// backend.  `min` is currently unused ([[maybe_unused]]).
// NOTE(review): several lines are missing from this extract (e.g. the
// opening brace, do_with's first argument at original line 199, the
// else-branches around lines 207-208/213/217/219, the `bi` declaration
// around line 232, and the chain's closing lines 240-241) — verify the
// exact control flow against the full file.
187 RecoveryBackend::interruptible_future
<BackfillInterval
>
188 RecoveryBackend::scan_for_backfill(
189 const hobject_t
& start
,
190 [[maybe_unused
]] const std::int64_t min
,
191 const std::int64_t max
)
193 logger().debug("{} starting from {}", __func__
, start
);
// Shared across the parallel lambdas below; lw_shared keeps it alive
// for the whole continuation chain.
194 auto version_map
= seastar::make_lw_shared
<std::map
<hobject_t
, eversion_t
>>();
195 return backend
->list_objects(start
, max
).then_interruptible(
196 [this, start
, version_map
] (auto&& ret
) {
197 auto&& [objects
, next
] = std::move(ret
);
198 return seastar::do_with(
200 [this, version_map
](auto &objects
) {
// Resolve each object's version concurrently.
201 return interruptor::parallel_for_each(objects
,
202 [this, version_map
] (const hobject_t
& object
)
203 -> interruptible_future
<> {
204 crimson::osd::ObjectContextRef obc
;
205 if (pg
.is_primary()) {
206 obc
= pg
.obc_registry
.maybe_get_cached_obc(object
);
// Primary with a cached obc: take the version straight from it.
209 if (obc
->obs
.exists
) {
210 logger().debug("scan_for_backfill found (primary): {} {}",
211 object
, obc
->obs
.oi
.version
);
212 version_map
->emplace(object
, obc
->obs
.oi
.version
);
214 // if the object does not exist here, it must have been removed
215 // between the collection_list_partial and here. This can happen
216 // for the first item in the range, which is usually last_backfill.
218 return seastar::now();
// No cached obc (or not primary): read the version from on-disk
// metadata; errors are treated as fatal via assert_all.
220 return backend
->load_metadata(object
).safe_then_interruptible(
221 [version_map
, object
] (auto md
) {
223 logger().debug("scan_for_backfill found: {} {}",
224 object
, md
->os
.oi
.version
);
225 version_map
->emplace(object
, md
->os
.oi
.version
);
227 return seastar::now();
228 }, PGBackend::load_metadata_ertr::assert_all
{});
// All versions collected: assemble and return the BackfillInterval
// (`bi` is declared on a line not visible in this extract).
231 }).then_interruptible([version_map
, start
=std::move(start
), next
=std::move(next
), this] {
233 bi
.begin
= std::move(start
);
234 bi
.end
= std::move(next
);
235 bi
.version
= pg
.get_info().last_update
;
236 bi
.objects
= std::move(*version_map
);
237 logger().debug("{} BackfillInterval filled, leaving",
238 "scan_for_backfill");
239 return seastar::make_ready_future
<BackfillInterval
>(std::move(bi
));
// Handle MOSDPGScan OP_SCAN_GET_DIGEST from the primary: scan our local
// objects (bounded by the osd_backfill_scan_min/max config values) and
// reply with an OP_SCAN_DIGEST message carrying the interval's begin,
// end and encoded object->version map.
// NOTE(review): the `if (false ...)` branch is deliberately dead — the
// backfill-too-full check is an acknowledged FIXME; its LocalPeeringEvent
// arguments (original lines 254-255) plus the message parameter, scan
// start argument (line 262), and several closing lines are missing from
// this extract.  `query_epoch` is captured but its use is not visible
// here — confirm against the full file.
244 RecoveryBackend::interruptible_future
<>
245 RecoveryBackend::handle_scan_get_digest(
247 crimson::net::ConnectionXcoreRef conn
)
249 logger().debug("{}", __func__
);
250 if (false /* FIXME: check for backfill too full */) {
251 std::ignore
= shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
252 // TODO: abstract start_background_recovery
253 static_cast<crimson::osd::PG
*>(&pg
),
256 pg
.get_osdmap_epoch(),
257 pg
.get_osdmap_epoch(),
258 PeeringState::BackfillTooFull());
259 return seastar::now();
// Scan bounds come from the cluster configuration.
261 return scan_for_backfill(
263 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_min"),
264 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_max")
265 ).then_interruptible(
266 [this, query_epoch
=m
.query_epoch
, conn
267 ](auto backfill_interval
) {
// Build the digest reply addressed to the primary shard.
268 auto reply
= crimson::make_message
<MOSDPGScan
>(
269 MOSDPGScan::OP_SCAN_DIGEST
,
271 pg
.get_osdmap_epoch(),
273 spg_t(pg
.get_info().pgid
.pgid
, pg
.get_primary().shard
),
274 backfill_interval
.begin
,
275 backfill_interval
.end
);
// The object->version map travels in the message data payload.
276 encode(backfill_interval
.objects
, reply
->get_data());
277 return conn
->send(std::move(reply
));
// Primary-side handler for OP_SCAN_DIGEST from a backfill target:
// decode the replica's BackfillInterval from the message payload and
// feed it to the backfill state machine as a ReplicaScanned event via a
// BackfillRecovery operation.
// NOTE(review): the message parameter (original lines 283-284), the `bi`
// declaration and the decoding of its begin/end/version fields (lines
// 288-292, 295, 297), and the BackfillRecovery argument at line 300 are
// missing from this extract — verify against the full file.
281 RecoveryBackend::interruptible_future
<>
282 RecoveryBackend::handle_scan_digest(
285 logger().debug("{}", __func__
);
286 // Check that from is in backfill_targets vector
287 ceph_assert(pg
.is_backfill_target(m
.from
));
293 auto p
= m
.get_data().cbegin();
294 // take care to preserve ordering!
296 ::decode_noclear(bi
.objects
, p
);
// Hand the scanned interval to the backfill machinery.
298 shard_services
.start_operation
<crimson::osd::BackfillRecovery
>(
299 static_cast<crimson::osd::PG
*>(&pg
),
301 pg
.get_osdmap_epoch(),
302 crimson::osd::BackfillState::ReplicaScanned
{ m
.from
, std::move(bi
) });
303 return seastar::now();
// Dispatch an MOSDPGScan message: drop stale messages from old peering
// intervals, then route GET_DIGEST to the replica-side handler and
// DIGEST to the primary-side handler.
// NOTE(review): the message parameter (original line 308), the `switch`
// header (~line 316) and closing braces are missing from this extract.
306 RecoveryBackend::interruptible_future
<>
307 RecoveryBackend::handle_scan(
309 crimson::net::ConnectionXcoreRef conn
)
311 logger().debug("{}", __func__
);
// Discard messages that predate the current peering interval.
312 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
313 logger().debug("{}: discarding {}", __func__
, m
);
314 return seastar::now();
317 case MOSDPGScan::OP_SCAN_GET_DIGEST
:
318 return handle_scan_get_digest(m
, conn
);
319 case MOSDPGScan::OP_SCAN_DIGEST
:
320 return handle_scan_digest(m
);
322 // FIXME: move to errorator
// NOTE(review): ceph_assert on a non-null string literal is always true
// and can never fire; an unknown op silently returns seastar::now().
// Consider ceph_abort_msg()/ceph_assert(0 == "...") instead.
323 ceph_assert("unknown op type for pg scan");
324 return seastar::now();
// Top-level entry point for recovery-related fast-dispatch messages:
// downcasts by header type and routes to handle_backfill /
// handle_backfill_remove / handle_scan; any other type yields an
// exceptional future carrying std::invalid_argument.
// NOTE(review): the `default:` label (original line 340) and the
// function's closing brace are missing from this extract.  The
// static_pointer_casts are unchecked — safe only because the switch has
// already matched the corresponding message type.
328 RecoveryBackend::interruptible_future
<>
329 RecoveryBackend::handle_recovery_op(
330 Ref
<MOSDFastDispatchOp
> m
,
331 crimson::net::ConnectionXcoreRef conn
)
333 switch (m
->get_header().type
) {
334 case MSG_OSD_PG_BACKFILL
:
335 return handle_backfill(*boost::static_pointer_cast
<MOSDPGBackfill
>(m
), conn
);
336 case MSG_OSD_PG_BACKFILL_REMOVE
:
337 return handle_backfill_remove(*boost::static_pointer_cast
<MOSDPGBackfillRemove
>(m
));
338 case MSG_OSD_PG_SCAN
:
339 return handle_scan(*boost::static_pointer_cast
<MOSDPGScan
>(m
), conn
);
341 return seastar::make_exception_future
<>(
342 std::invalid_argument(fmt::format("invalid request type: {}",
343 m
->get_header().type
)));