// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <fmt/format.h>

#include "crimson/common/exception.h"
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/osd_operations/background_recovery.h"

#include "messages/MOSDFastDispatchOp.h"
#include "osd/osd_types.h"
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
21 hobject_t
RecoveryBackend::get_temp_recovery_object(
22 const hobject_t
& target
,
23 eversion_t version
) const
26 target
.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
29 pg
.get_info().history
.same_interval_since
,
31 logger().debug("{} {}", __func__
, hoid
);
35 void RecoveryBackend::clean_up(ceph::os::Transaction
& t
,
38 for (auto& soid
: temp_contents
) {
39 t
.remove(pg
.get_collection_ref()->get_cid(),
40 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
42 temp_contents
.clear();
44 for (auto& [soid
, recovery_waiter
] : recovering
) {
45 if ((recovery_waiter
->pull_info
46 && recovery_waiter
->pull_info
->is_complete())
47 || (!recovery_waiter
->pull_info
48 && recovery_waiter
->obc
&& recovery_waiter
->obc
->obs
.exists
)) {
49 recovery_waiter
->obc
->interrupt(
50 ::crimson::common::actingset_changed(
52 recovery_waiter
->interrupt(why
);
58 void RecoveryBackend::WaitForObjectRecovery::stop() {
59 readable
.set_exception(
60 crimson::common::system_shutdown_exception());
61 recovered
.set_exception(
62 crimson::common::system_shutdown_exception());
64 crimson::common::system_shutdown_exception());
65 for (auto& [pg_shard
, pr
] : pushes
) {
67 crimson::common::system_shutdown_exception());
71 void RecoveryBackend::handle_backfill_finish(
73 crimson::net::ConnectionRef conn
)
75 logger().debug("{}", __func__
);
76 ceph_assert(!pg
.is_primary());
77 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 1);
78 auto reply
= crimson::make_message
<MOSDPGBackfill
>(
79 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
,
80 pg
.get_osdmap_epoch(),
82 spg_t(pg
.get_pgid().pgid
, pg
.get_primary().shard
));
83 reply
->set_priority(pg
.get_recovery_op_priority());
84 std::ignore
= conn
->send(std::move(reply
));
85 shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
86 static_cast<crimson::osd::PG
*>(&pg
),
89 pg
.get_osdmap_epoch(),
90 pg
.get_osdmap_epoch(),
94 RecoveryBackend::interruptible_future
<>
95 RecoveryBackend::handle_backfill_progress(
98 logger().debug("{}", __func__
);
99 ceph_assert(!pg
.is_primary());
100 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 2);
102 ObjectStore::Transaction t
;
103 pg
.get_peering_state().update_backfill_progress(
106 m
.op
== MOSDPGBackfill::OP_BACKFILL_PROGRESS
,
108 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
109 return shard_services
.get_store().do_transaction(
110 pg
.get_collection_ref(), std::move(t
)).or_terminate();
113 RecoveryBackend::interruptible_future
<>
114 RecoveryBackend::handle_backfill_finish_ack(
117 logger().debug("{}", __func__
);
118 ceph_assert(pg
.is_primary());
119 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 3);
121 // finish_recovery_op(hobject_t::get_max());
122 return seastar::now();
125 RecoveryBackend::interruptible_future
<>
126 RecoveryBackend::handle_backfill(
128 crimson::net::ConnectionRef conn
)
130 logger().debug("{}", __func__
);
131 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
132 logger().debug("{}: discarding {}", __func__
, m
);
133 return seastar::now();
136 case MOSDPGBackfill::OP_BACKFILL_FINISH
:
137 handle_backfill_finish(m
, conn
);
139 case MOSDPGBackfill::OP_BACKFILL_PROGRESS
:
140 return handle_backfill_progress(m
);
141 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
:
142 return handle_backfill_finish_ack(m
);
144 ceph_assert("unknown op type for pg backfill");
145 return seastar::now();
149 RecoveryBackend::interruptible_future
<>
150 RecoveryBackend::handle_backfill_remove(
151 MOSDPGBackfillRemove
& m
)
153 logger().debug("{} m.ls={}", __func__
, m
.ls
);
154 assert(m
.get_type() == MSG_OSD_PG_BACKFILL_REMOVE
);
155 if (pg
.can_discard_replica_op(m
)) {
156 logger().debug("{}: discarding {}", __func__
, m
);
157 return seastar::now();
159 ObjectStore::Transaction t
;
160 for ([[maybe_unused
]] const auto& [soid
, ver
] : m
.ls
) {
161 // TODO: the reserved space management. PG::try_reserve_recovery_space().
162 t
.remove(pg
.get_collection_ref()->get_cid(),
163 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
165 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
166 return shard_services
.get_store().do_transaction(
167 pg
.get_collection_ref(), std::move(t
)).or_terminate();
170 RecoveryBackend::interruptible_future
<BackfillInterval
>
171 RecoveryBackend::scan_for_backfill(
172 const hobject_t
& start
,
173 [[maybe_unused
]] const std::int64_t min
,
174 const std::int64_t max
)
176 logger().debug("{} starting from {}", __func__
, start
);
177 auto version_map
= seastar::make_lw_shared
<std::map
<hobject_t
, eversion_t
>>();
178 return backend
->list_objects(start
, max
).then_interruptible(
179 [this, start
, version_map
] (auto&& ret
) {
180 auto&& [objects
, next
] = std::move(ret
);
181 return seastar::do_with(
183 [this, version_map
](auto &objects
) {
184 return interruptor::parallel_for_each(objects
,
185 [this, version_map
] (const hobject_t
& object
)
186 -> interruptible_future
<> {
187 crimson::osd::ObjectContextRef obc
;
188 if (pg
.is_primary()) {
189 obc
= pg
.obc_registry
.maybe_get_cached_obc(object
);
192 if (obc
->obs
.exists
) {
193 logger().debug("scan_for_backfill found (primary): {} {}",
194 object
, obc
->obs
.oi
.version
);
195 version_map
->emplace(object
, obc
->obs
.oi
.version
);
197 // if the object does not exist here, it must have been removed
198 // between the collection_list_partial and here. This can happen
199 // for the first item in the range, which is usually last_backfill.
201 return seastar::now();
203 return backend
->load_metadata(object
).safe_then_interruptible(
204 [version_map
, object
] (auto md
) {
206 logger().debug("scan_for_backfill found: {} {}",
207 object
, md
->os
.oi
.version
);
208 version_map
->emplace(object
, md
->os
.oi
.version
);
210 return seastar::now();
211 }, PGBackend::load_metadata_ertr::assert_all
{});
214 }).then_interruptible([version_map
, start
=std::move(start
), next
=std::move(next
), this] {
216 bi
.begin
= std::move(start
);
217 bi
.end
= std::move(next
);
218 bi
.version
= pg
.get_info().last_update
;
219 bi
.objects
= std::move(*version_map
);
220 logger().debug("{} BackfillInterval filled, leaving",
221 "scan_for_backfill");
222 return seastar::make_ready_future
<BackfillInterval
>(std::move(bi
));
227 RecoveryBackend::interruptible_future
<>
228 RecoveryBackend::handle_scan_get_digest(
230 crimson::net::ConnectionRef conn
)
232 logger().debug("{}", __func__
);
233 if (false /* FIXME: check for backfill too full */) {
234 std::ignore
= shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
235 // TODO: abstract start_background_recovery
236 static_cast<crimson::osd::PG
*>(&pg
),
239 pg
.get_osdmap_epoch(),
240 pg
.get_osdmap_epoch(),
241 PeeringState::BackfillTooFull());
242 return seastar::now();
244 return scan_for_backfill(
246 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_min"),
247 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_max")
248 ).then_interruptible(
249 [this, query_epoch
=m
.query_epoch
, conn
250 ](auto backfill_interval
) {
251 auto reply
= crimson::make_message
<MOSDPGScan
>(
252 MOSDPGScan::OP_SCAN_DIGEST
,
254 pg
.get_osdmap_epoch(),
256 spg_t(pg
.get_info().pgid
.pgid
, pg
.get_primary().shard
),
257 backfill_interval
.begin
,
258 backfill_interval
.end
);
259 encode(backfill_interval
.objects
, reply
->get_data());
260 return conn
->send(std::move(reply
));
264 RecoveryBackend::interruptible_future
<>
265 RecoveryBackend::handle_scan_digest(
268 logger().debug("{}", __func__
);
269 // Check that from is in backfill_targets vector
270 ceph_assert(pg
.is_backfill_target(m
.from
));
276 auto p
= m
.get_data().cbegin();
277 // take care to preserve ordering!
279 ::decode_noclear(bi
.objects
, p
);
281 shard_services
.start_operation
<crimson::osd::BackfillRecovery
>(
282 static_cast<crimson::osd::PG
*>(&pg
),
284 pg
.get_osdmap_epoch(),
285 crimson::osd::BackfillState::ReplicaScanned
{ m
.from
, std::move(bi
) });
286 return seastar::now();
289 RecoveryBackend::interruptible_future
<>
290 RecoveryBackend::handle_scan(
292 crimson::net::ConnectionRef conn
)
294 logger().debug("{}", __func__
);
295 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
296 logger().debug("{}: discarding {}", __func__
, m
);
297 return seastar::now();
300 case MOSDPGScan::OP_SCAN_GET_DIGEST
:
301 return handle_scan_get_digest(m
, conn
);
302 case MOSDPGScan::OP_SCAN_DIGEST
:
303 return handle_scan_digest(m
);
305 // FIXME: move to errorator
306 ceph_assert("unknown op type for pg scan");
307 return seastar::now();
311 RecoveryBackend::interruptible_future
<>
312 RecoveryBackend::handle_recovery_op(
313 Ref
<MOSDFastDispatchOp
> m
,
314 crimson::net::ConnectionRef conn
)
316 switch (m
->get_header().type
) {
317 case MSG_OSD_PG_BACKFILL
:
318 return handle_backfill(*boost::static_pointer_cast
<MOSDPGBackfill
>(m
), conn
);
319 case MSG_OSD_PG_BACKFILL_REMOVE
:
320 return handle_backfill_remove(*boost::static_pointer_cast
<MOSDPGBackfillRemove
>(m
));
321 case MSG_OSD_PG_SCAN
:
322 return handle_scan(*boost::static_pointer_cast
<MOSDPGScan
>(m
), conn
);
324 return seastar::make_exception_future
<>(
325 std::invalid_argument(fmt::format("invalid request type: {}",
326 m
->get_header().type
)));