1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <fmt/format.h>
6 #include "crimson/common/exception.h"
7 #include "crimson/osd/recovery_backend.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/pg_backend.h"
11 #include "messages/MOSDFastDispatchOp.h"
12 #include "osd/osd_types.h"
// Logger for the OSD subsystem (ceph_subsys_osd); all debug output in
// this file goes through it.
15 seastar::logger
& logger() {
16 return crimson::get_logger(ceph_subsys_osd
);
// Derives a temporary object from `target` (via target.make_temp_hobject())
// for recovering `target` at `version`. The temp object's name is
// formatted as "temp_recovering_{}_{}_{}_{}"; one field is the PG's
// history.same_interval_since, and the remaining fields presumably include
// `version` -- TODO confirm. Including the interval start presumably keeps
// temp objects from different intervals apart. The resulting object
// (`hoid`) is logged at debug level.
20 hobject_t
RecoveryBackend::get_temp_recovery_object(
21 const hobject_t
& target
,
22 eversion_t version
) const
25 target
.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
28 pg
.get_info().history
.same_interval_since
,
30 logger().debug("{} {}", __func__
, hoid
);
// Discards recovery state held by this backend.
//
// 1. For every object recorded in `temp_contents`, a remove op is queued
//    into the caller's transaction `t` (generation NO_GEN, this PG's
//    shard). `temp_contents` is then cleared. This function only adds ops
//    to `t`; committing it is up to the caller.
// 2. For entries in `recovering`, a waiter is selected when either:
//    - its `pi` is set and reports is_complete(), or
//    - it has no `pi` but holds an obc whose object exists.
//    A selected waiter's obc is interrupted with an actingset_changed
//    error, and the waiter is interrupted with the `why` argument.
//    NOTE(review): confirm that `interrupt(why)` is only applied to the
//    selected waiters, and how the remaining waiters are released.
34 void RecoveryBackend::clean_up(ceph::os::Transaction
& t
,
37 for (auto& soid
: temp_contents
) {
38 t
.remove(pg
.get_collection_ref()->get_cid(),
39 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
41 temp_contents
.clear();
// Walk in-flight recoveries and interrupt those matching the condition.
43 for (auto& [soid
, recovery_waiter
] : recovering
) {
44 if ((recovery_waiter
.pi
&& recovery_waiter
.pi
->is_complete())
45 || (!recovery_waiter
.pi
46 && recovery_waiter
.obc
&& recovery_waiter
.obc
->obs
.exists
)) {
47 recovery_waiter
.obc
->interrupt(
48 ::crimson::common::actingset_changed(
50 recovery_waiter
.interrupt(why
);
// Shutdown path for one object-recovery waiter. Every promise-like member
// touched here is failed with crimson::common::system_shutdown_exception:
// `readable`, `recovered`, one further member, and each per-shard entry
// of `pushes`. Anything waiting on this object's recovery therefore
// resolves with that error.
56 void RecoveryBackend::WaitForObjectRecovery::stop() {
57 readable
.set_exception(
58 crimson::common::system_shutdown_exception());
59 recovered
.set_exception(
60 crimson::common::system_shutdown_exception());
62 crimson::common::system_shutdown_exception());
63 for (auto& [pg_shard
, pr
] : pushes
) {
65 crimson::common::system_shutdown_exception());
// Handles MOSDPGBackfill::OP_BACKFILL_FINISH on a non-primary (asserts
// !pg.is_primary()).
// ceph_assert aborts when osd_kill_backfill_at == 1; this is presumably a
// fault-injection hook for tests.
// Steps:
//   1. Builds an OP_BACKFILL_FINISH_ACK addressed with the current osdmap
//      epoch and spg_t(pgid, primary shard), at the PG's recovery-op
//      priority.
//   2. Sends the ack on the request's connection. The send future is
//      deliberately dropped (std::ignore), so a send failure is not
//      reported here.
//   3. Starts a LocalPeeringEvent against this PG at the current osdmap
//      epoch.
69 void RecoveryBackend::handle_backfill_finish(
72 logger().debug("{}", __func__
);
73 ceph_assert(!pg
.is_primary());
74 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 1);
75 auto reply
= crimson::make_message
<MOSDPGBackfill
>(
76 MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
,
77 pg
.get_osdmap_epoch(),
79 spg_t(pg
.get_pgid().pgid
, pg
.get_primary().shard
));
80 reply
->set_priority(pg
.get_recovery_op_priority());
81 std::ignore
= m
.get_connection()->send(std::move(reply
));
82 shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
83 static_cast<crimson::osd::PG
*>(&pg
),
87 pg
.get_osdmap_epoch(),
88 pg
.get_osdmap_epoch(),
// Records backfill progress on a non-primary. It asserts !pg.is_primary()
// and aborts when osd_kill_backfill_at == 2.
// PeeringState::update_backfill_progress() is told whether the message is
// OP_BACKFILL_PROGRESS. Presumably the OP_BACKFILL_FINISH path also lands
// here with that flag false -- confirm.
// The local transaction `t` (presumably filled by that call) is then
// committed to this PG's collection. Store errors are routed through
// or_terminate() instead of being returned to the caller.
92 RecoveryBackend::interruptible_future
<>
93 RecoveryBackend::handle_backfill_progress(
96 logger().debug("{}", __func__
);
97 ceph_assert(!pg
.is_primary());
98 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 2);
100 ObjectStore::Transaction t
;
101 pg
.get_peering_state().update_backfill_progress(
104 m
.op
== MOSDPGBackfill::OP_BACKFILL_PROGRESS
,
106 logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
107 return shard_services
.get_store().do_transaction(
108 pg
.get_collection_ref(), std::move(t
)).or_terminate();
// Handles OP_BACKFILL_FINISH_ACK on the primary. It asserts pg.is_primary()
// and aborts when osd_kill_backfill_at == 3.
// Apart from logging and the assertions, it currently does nothing: the
// finish_recovery_op() call is commented out and a ready future is
// returned.
// TODO(review): decide whether finish_recovery_op(hobject_t::get_max())
// should be restored.
111 RecoveryBackend::interruptible_future
<>
112 RecoveryBackend::handle_backfill_finish_ack(
115 logger().debug("{}", __func__
);
116 ceph_assert(pg
.is_primary());
117 ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at
!= 3);
119 // finish_recovery_op(hobject_t::get_max());
120 return seastar::now();
// Entry point for MOSDPGBackfill messages. Messages that
// pg.old_peering_msg(map_epoch, query_epoch) reports as stale are logged
// and dropped. Otherwise it dispatches on m.op:
//   OP_BACKFILL_FINISH     -> handle_backfill_finish(). That statement does
//                             not return, so control presumably falls
//                             through to the progress case -- confirm.
//   OP_BACKFILL_PROGRESS   -> handle_backfill_progress()
//   OP_BACKFILL_FINISH_ACK -> handle_backfill_finish_ack()
123 RecoveryBackend::interruptible_future
<>
124 RecoveryBackend::handle_backfill(
127 logger().debug("{}", __func__
);
128 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
129 logger().debug("{}: discarding {}", __func__
, m
);
130 return seastar::now();
133 case MOSDPGBackfill::OP_BACKFILL_FINISH
:
134 handle_backfill_finish(m
);
136 case MOSDPGBackfill::OP_BACKFILL_PROGRESS
:
137 return handle_backfill_progress(m
);
138 case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK
:
139 return handle_backfill_finish_ack(m
);
// NOTE(review): ceph_assert() on a string literal can never fire. The
// literal decays to a non-null pointer, so unknown op types are silently
// accepted. If aborting is intended, use ceph_abort_msg() instead.
141 ceph_assert("unknown op type for pg backfill");
142 return seastar::now();
// Handles MOSDPGBackfillRemove. Every object listed in m.ls is removed
// from this PG's collection (this PG's shard, NO_GEN) in one transaction.
// The per-object version carried in m.ls is ignored.
// Ops that pg.can_discard_replica_op() rejects are logged and dropped.
// Store errors are routed through or_terminate().
// NOTE(review): the message-type check uses plain assert(), which is
// compiled out under NDEBUG, unlike the ceph_assert() calls used elsewhere
// in this file.
146 RecoveryBackend::interruptible_future
<>
147 RecoveryBackend::handle_backfill_remove(
148 MOSDPGBackfillRemove
& m
)
150 logger().debug("{} m.ls={}", __func__
, m
.ls
);
151 assert(m
.get_type() == MSG_OSD_PG_BACKFILL_REMOVE
);
152 if (pg
.can_discard_replica_op(m
)) {
153 logger().debug("{}: discarding {}", __func__
, m
);
154 return seastar::now();
156 ObjectStore::Transaction t
;
157 for ([[maybe_unused
]] const auto& [soid
, ver
] : m
.ls
) {
158 // TODO: the reserved space management. PG::try_reserve_recovery_space().
159 t
.remove(pg
.get_collection_ref()->get_cid(),
160 ghobject_t(soid
, ghobject_t::NO_GEN
, pg
.get_pg_whoami().shard
));
162 logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
163 return shard_services
.get_store().do_transaction(
164 pg
.get_collection_ref(), std::move(t
)).or_terminate();
// Builds a BackfillInterval describing this PG's objects from `start`.
//
// backend->list_objects(start, max) yields a batch of objects (presumably
// at most `max`) and `next`, the position to resume from. `min` is
// currently unused.
// Each listed object's version is looked up concurrently
// (parallel_for_each):
//   * on the primary, from the cached ObjectContext when one is found;
//   * for objects not resolved from the cache, via
//     backend->load_metadata(). Any load_metadata error is handled with
//     assert_all{}, i.e. treated as an assertion failure.
// The returned interval has:
//   begin   = start
//   end     = next
//   version = the PG's last_update
//   objects = the collected object->version map
167 RecoveryBackend::interruptible_future
<BackfillInterval
>
168 RecoveryBackend::scan_for_backfill(
169 const hobject_t
& start
,
170 [[maybe_unused
]] const std::int64_t min
,
171 const std::int64_t max
)
173 logger().debug("{} starting from {}", __func__
, start
);
// Shared so every per-object continuation adds to the same map. std::map
// keeps entries ordered by hobject_t whatever order the lookups finish in.
174 auto version_map
= seastar::make_lw_shared
<std::map
<hobject_t
, eversion_t
>>();
175 return backend
->list_objects(start
, max
).then_interruptible(
176 [this, start
, version_map
] (auto&& ret
) {
177 auto&& [objects
, next
] = std::move(ret
);
178 return interruptor::parallel_for_each(std::move(objects
),
179 [this, version_map
] (const hobject_t
& object
)
180 -> interruptible_future
<> {
181 crimson::osd::ObjectContextRef obc
;
182 if (pg
.is_primary()) {
183 obc
= shard_services
.obc_registry
.maybe_get_cached_obc(object
);
186 if (obc
->obs
.exists
) {
187 logger().debug("scan_for_backfill found (primary): {} {}",
188 object
, obc
->obs
.oi
.version
);
189 version_map
->emplace(object
, obc
->obs
.oi
.version
);
191 // if the object does not exist here, it must have been removed
192 // between the collection_list_partial and here. This can happen
193 // for the first item in the range, which is usually last_backfill.
195 return seastar::now();
197 return backend
->load_metadata(object
).safe_then_interruptible(
198 [version_map
, object
] (auto md
) {
200 logger().debug("scan_for_backfill found: {} {}",
201 object
, md
->os
.oi
.version
);
202 version_map
->emplace(object
, md
->os
.oi
.version
);
204 return seastar::now();
205 }, PGBackend::load_metadata_ertr::assert_all
{});
207 }).then_interruptible([version_map
, start
=std::move(start
), next
=std::move(next
), this] {
// NOTE(review): these lambdas are not `mutable`, so the captured
// `start`/`next` are const here and the std::move() calls below copy
// rather than move.
209 bi
.begin
= std::move(start
);
210 bi
.end
= std::move(next
);
211 bi
.version
= pg
.get_info().last_update
;
212 bi
.objects
= std::move(*version_map
);
213 logger().debug("{} BackfillInterval filled, leaving",
214 "scan_for_backfill");
215 return seastar::make_ready_future
<BackfillInterval
>(std::move(bi
));
// Handles MOSDPGScan::OP_SCAN_GET_DIGEST.
// Scans local objects with scan_for_backfill(), bounded by the
// osd_backfill_scan_min / osd_backfill_scan_max config values. It then
// replies on the request's connection with an OP_SCAN_DIGEST carrying:
//   - the scanned interval's begin and end;
//   - its object->version map, encoded into the message data.
// The "backfill too full" branch is currently dead code (`if (false ...)`,
// see the FIXME). If enabled, it would start a BackfillTooFull peering
// event and return without sending a digest.
220 RecoveryBackend::interruptible_future
<>
221 RecoveryBackend::handle_scan_get_digest(
224 logger().debug("{}", __func__
);
225 if (false /* FIXME: check for backfill too full */) {
226 std::ignore
= shard_services
.start_operation
<crimson::osd::LocalPeeringEvent
>(
227 // TODO: abstract start_background_recovery
228 static_cast<crimson::osd::PG
*>(&pg
),
232 pg
.get_osdmap_epoch(),
233 pg
.get_osdmap_epoch(),
234 PeeringState::BackfillTooFull());
235 return seastar::now();
237 return scan_for_backfill(
239 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_min"),
240 crimson::common::local_conf().get_val
<std::int64_t>("osd_backfill_scan_max")
241 ).then_interruptible([this,
242 query_epoch
=m
.query_epoch
,
243 conn
=m
.get_connection()] (auto backfill_interval
) {
244 auto reply
= crimson::make_message
<MOSDPGScan
>(
245 MOSDPGScan::OP_SCAN_DIGEST
,
247 pg
.get_osdmap_epoch(),
249 spg_t(pg
.get_info().pgid
.pgid
, pg
.get_primary().shard
),
250 backfill_interval
.begin
,
251 backfill_interval
.end
);
252 encode(backfill_interval
.objects
, reply
->get_data());
253 return conn
->send(std::move(reply
));
// Handles MOSDPGScan::OP_SCAN_DIGEST, the reply produced by
// handle_scan_get_digest().
// Steps:
//   1. Asserts that the sender (m.from) is one of this PG's backfill
//      targets.
//   2. Decodes the object->version entries from the message data into the
//      interval's `objects`.
//   3. Hands the interval to the backfill state machine by starting a
//      BackfillRecovery operation with a ReplicaScanned event.
// That operation is not awaited: this returns a ready future immediately.
257 RecoveryBackend::interruptible_future
<>
258 RecoveryBackend::handle_scan_digest(
261 logger().debug("{}", __func__
);
262 // Check that from is in backfill_targets vector
263 ceph_assert(pg
.is_backfill_target(m
.from
));
269 auto p
= m
.get_data().cbegin();
270 // take care to preserve ordering!
272 ::decode_noclear(bi
.objects
, p
);
274 shard_services
.start_operation
<crimson::osd::BackfillRecovery
>(
275 static_cast<crimson::osd::PG
*>(&pg
),
277 pg
.get_osdmap_epoch(),
278 crimson::osd::BackfillState::ReplicaScanned
{ m
.from
, std::move(bi
) });
279 return seastar::now();
// Entry point for MOSDPGScan messages. Messages that
// pg.old_peering_msg(map_epoch, query_epoch) reports as stale are logged
// and dropped. Otherwise it dispatches on the scan op:
//   OP_SCAN_GET_DIGEST -> handle_scan_get_digest()
//   OP_SCAN_DIGEST     -> handle_scan_digest()
282 RecoveryBackend::interruptible_future
<>
283 RecoveryBackend::handle_scan(
286 logger().debug("{}", __func__
);
287 if (pg
.old_peering_msg(m
.map_epoch
, m
.query_epoch
)) {
288 logger().debug("{}: discarding {}", __func__
, m
);
289 return seastar::now();
292 case MOSDPGScan::OP_SCAN_GET_DIGEST
:
293 return handle_scan_get_digest(m
);
294 case MOSDPGScan::OP_SCAN_DIGEST
:
295 return handle_scan_digest(m
);
// NOTE(review): ceph_assert() on a string literal can never fire. The
// literal decays to a non-null pointer, so unknown op types are silently
// accepted. Use ceph_abort_msg(), or return an error future per the FIXME.
297 // FIXME: move to errorator
298 ceph_assert("unknown op type for pg scan");
299 return seastar::now();
// Dispatches a recovery-related message to its handler by header type:
//   MSG_OSD_PG_BACKFILL        -> handle_backfill()
//   MSG_OSD_PG_BACKFILL_REMOVE -> handle_backfill_remove()
//   MSG_OSD_PG_SCAN            -> handle_scan()
// The downcasts are unchecked boost::static_pointer_casts, so they rely on
// the header type matching the concrete message class.
// Any other type yields an exceptional future carrying
// std::invalid_argument.
303 RecoveryBackend::interruptible_future
<>
304 RecoveryBackend::handle_recovery_op(
305 Ref
<MOSDFastDispatchOp
> m
)
307 switch (m
->get_header().type
) {
308 case MSG_OSD_PG_BACKFILL
:
309 return handle_backfill(*boost::static_pointer_cast
<MOSDPGBackfill
>(m
));
310 case MSG_OSD_PG_BACKFILL_REMOVE
:
311 return handle_backfill_remove(*boost::static_pointer_cast
<MOSDPGBackfillRemove
>(m
));
312 case MSG_OSD_PG_SCAN
:
313 return handle_scan(*boost::static_pointer_cast
<MOSDPGScan
>(m
));
315 return seastar::make_exception_future
<>(
316 std::invalid_argument(fmt::format("invalid request type: {}",
317 m
->get_header().type
)));