1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
4 #include <fmt/format.h>
5 #include <fmt/ostream.h>
6 #include <seastar/core/future.hh>
7 #include <seastar/core/do_with.hh>
9 #include "crimson/osd/pg.h"
10 #include "crimson/osd/pg_backend.h"
11 #include "osd/osd_types_fmt.h"
12 #include "replicated_recovery_backend.h"
13 #include "msg/Message.h"
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
25 RecoveryBackend::interruptible_future
<>
26 ReplicatedRecoveryBackend::recover_object(
27 const hobject_t
& soid
,
30 logger().debug("{}: {}, {}", __func__
, soid
, need
);
31 // always add_recovering(soid) before recover_object(soid)
32 assert(is_recovering(soid
));
33 // start tracking the recovery of soid
34 return maybe_pull_missing_obj(soid
, need
).then_interruptible([this, soid
, need
] {
35 logger().debug("recover_object: loading obc: {}", soid
);
36 return pg
.obc_loader
.with_obc
<RWState::RWREAD
>(soid
,
37 [this, soid
, need
](auto obc
) {
38 logger().debug("recover_object: loaded obc: {}", obc
->obs
.oi
.soid
);
39 auto& recovery_waiter
= get_recovering(soid
);
40 recovery_waiter
.obc
= obc
;
41 recovery_waiter
.obc
->wait_recovery_read();
42 return maybe_push_shards(soid
, need
);
43 }).handle_error_interruptible(
44 crimson::osd::PG::load_obc_ertr::all_same_way([soid
](auto& code
) {
45 // TODO: may need eio handling?
46 logger().error("recover_object saw error code {}, ignoring object {}",
52 RecoveryBackend::interruptible_future
<>
53 ReplicatedRecoveryBackend::maybe_push_shards(
54 const hobject_t
& soid
,
57 return seastar::do_with(
58 get_shards_to_push(soid
),
59 [this, need
, soid
](auto &shards
) {
60 return interruptor::parallel_for_each(
62 [this, need
, soid
](auto shard
) {
63 return prep_push(soid
, need
, shard
).then_interruptible([this, soid
, shard
](auto push
) {
64 auto msg
= crimson::make_message
<MOSDPGPush
>();
65 msg
->from
= pg
.get_pg_whoami();
66 msg
->pgid
= pg
.get_pgid();
67 msg
->map_epoch
= pg
.get_osdmap_epoch();
68 msg
->min_epoch
= pg
.get_last_peering_reset();
69 msg
->pushes
.push_back(std::move(push
));
70 msg
->set_priority(pg
.get_recovery_op_priority());
71 return interruptor::make_interruptible(
72 shard_services
.send_to_osd(shard
.osd
,
74 pg
.get_osdmap_epoch()))
77 return get_recovering(soid
).wait_for_pushes(shard
);
81 }).then_interruptible([this, soid
] {
82 auto &recovery
= get_recovering(soid
);
83 if (auto push_info
= recovery
.pushing
.begin();
84 push_info
!= recovery
.pushing
.end()) {
85 pg
.get_recovery_handler()->on_global_recover(soid
,
86 push_info
->second
.stat
,
88 } else if (recovery
.pull_info
) {
89 // no push happened (empty get_shards_to_push()) but pull actually did
90 pg
.get_recovery_handler()->on_global_recover(soid
,
91 recovery
.pull_info
->stat
,
94 // no pulls, no pushes
96 return seastar::make_ready_future
<>();
97 }).handle_exception_interruptible([this, soid
](auto e
) {
98 auto &recovery
= get_recovering(soid
);
100 recovery
.obc
->drop_recovery_read();
102 recovering
.erase(soid
);
103 return seastar::make_exception_future
<>(e
);
107 RecoveryBackend::interruptible_future
<>
108 ReplicatedRecoveryBackend::maybe_pull_missing_obj(
109 const hobject_t
& soid
,
112 pg_missing_tracker_t local_missing
= pg
.get_local_missing();
113 if (!local_missing
.is_missing(soid
)) {
114 return seastar::make_ready_future
<>();
117 auto& recovery_waiter
= get_recovering(soid
);
118 recovery_waiter
.pull_info
=
119 std::make_optional
<RecoveryBackend::pull_info_t
>();
120 auto& pull_info
= *recovery_waiter
.pull_info
;
121 prepare_pull(pull_op
, pull_info
, soid
, need
);
122 auto msg
= crimson::make_message
<MOSDPGPull
>();
123 msg
->from
= pg
.get_pg_whoami();
124 msg
->set_priority(pg
.get_recovery_op_priority());
125 msg
->pgid
= pg
.get_pgid();
126 msg
->map_epoch
= pg
.get_osdmap_epoch();
127 msg
->min_epoch
= pg
.get_last_peering_reset();
128 msg
->set_pulls({std::move(pull_op
)});
129 return interruptor::make_interruptible(
130 shard_services
.send_to_osd(
133 pg
.get_osdmap_epoch()
134 )).then_interruptible([&recovery_waiter
] {
135 return recovery_waiter
.wait_for_pull();
139 RecoveryBackend::interruptible_future
<>
140 ReplicatedRecoveryBackend::push_delete(
141 const hobject_t
& soid
,
144 logger().debug("{}: {}, {}", __func__
, soid
, need
);
145 epoch_t min_epoch
= pg
.get_last_peering_reset();
147 assert(pg
.get_acting_recovery_backfill().size() > 0);
148 return interruptor::parallel_for_each(pg
.get_acting_recovery_backfill(),
149 [this, soid
, need
, min_epoch
](pg_shard_t shard
)
150 -> interruptible_future
<> {
151 if (shard
== pg
.get_pg_whoami())
152 return seastar::make_ready_future
<>();
153 auto iter
= pg
.get_shard_missing().find(shard
);
154 if (iter
== pg
.get_shard_missing().end())
155 return seastar::make_ready_future
<>();
156 if (iter
->second
.is_missing(soid
)) {
157 logger().debug("push_delete: will remove {} from {}", soid
, shard
);
158 pg
.begin_peer_recover(shard
, soid
);
159 spg_t
target_pg(pg
.get_info().pgid
.pgid
, shard
.shard
);
160 auto msg
= crimson::make_message
<MOSDPGRecoveryDelete
>(
161 pg
.get_pg_whoami(), target_pg
, pg
.get_osdmap_epoch(), min_epoch
);
162 msg
->set_priority(pg
.get_recovery_op_priority());
163 msg
->objects
.push_back(std::make_pair(soid
, need
));
164 return interruptor::make_interruptible(
165 shard_services
.send_to_osd(shard
.osd
, std::move(msg
),
166 pg
.get_osdmap_epoch())).then_interruptible(
167 [this, soid
, shard
] {
168 return get_recovering(soid
).wait_for_pushes(shard
);
171 return seastar::make_ready_future
<>();
175 RecoveryBackend::interruptible_future
<>
176 ReplicatedRecoveryBackend::handle_recovery_delete(
177 Ref
<MOSDPGRecoveryDelete
> m
)
179 logger().debug("{}: {}", __func__
, *m
);
181 auto& p
= m
->objects
.front(); //TODO: only one delete per message for now.
182 return local_recover_delete(p
.first
, p
.second
, pg
.get_osdmap_epoch())
185 auto reply
= crimson::make_message
<MOSDPGRecoveryDeleteReply
>();
186 reply
->from
= pg
.get_pg_whoami();
187 reply
->set_priority(m
->get_priority());
188 reply
->pgid
= spg_t(pg
.get_info().pgid
.pgid
, m
->from
.shard
);
189 reply
->map_epoch
= m
->map_epoch
;
190 reply
->min_epoch
= m
->min_epoch
;
191 reply
->objects
= m
->objects
;
192 return shard_services
.send_to_osd(m
->from
.osd
, std::move(reply
), pg
.get_osdmap_epoch());
196 RecoveryBackend::interruptible_future
<>
197 ReplicatedRecoveryBackend::on_local_recover_persist(
198 const hobject_t
& soid
,
199 const ObjectRecoveryInfo
& _recovery_info
,
201 epoch_t epoch_frozen
)
203 logger().debug("{}", __func__
);
204 ceph::os::Transaction t
;
205 pg
.get_recovery_handler()->on_local_recover(soid
, _recovery_info
, is_delete
, t
);
206 logger().debug("ReplicatedRecoveryBackend::on_local_recover_persist: do_transaction...");
207 return interruptor::make_interruptible(
208 shard_services
.get_store().do_transaction(coll
, std::move(t
)))
210 [this, epoch_frozen
, last_complete
= pg
.get_info().last_complete
] {
211 pg
.get_recovery_handler()->_committed_pushed_object(epoch_frozen
, last_complete
);
212 return seastar::make_ready_future
<>();
216 RecoveryBackend::interruptible_future
<>
217 ReplicatedRecoveryBackend::local_recover_delete(
218 const hobject_t
& soid
,
220 epoch_t epoch_to_freeze
)
222 logger().debug("{}: {}, {}", __func__
, soid
, need
);
223 return backend
->load_metadata(soid
).safe_then_interruptible([this]
224 (auto lomt
) -> interruptible_future
<> {
225 if (lomt
->os
.exists
) {
226 return seastar::do_with(ceph::os::Transaction(),
227 [this, lomt
= std::move(lomt
)](auto& txn
) {
228 return backend
->remove(lomt
->os
, txn
).then_interruptible(
229 [this, &txn
]() mutable {
230 logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
231 return shard_services
.get_store().do_transaction(coll
,
236 return seastar::make_ready_future
<>();
237 }).safe_then_interruptible([this, soid
, epoch_to_freeze
, need
] {
238 ObjectRecoveryInfo recovery_info
;
239 recovery_info
.soid
= soid
;
240 recovery_info
.version
= need
;
241 return on_local_recover_persist(soid
, recovery_info
,
242 true, epoch_to_freeze
);
243 }, PGBackend::load_metadata_ertr::all_same_way(
244 [this, soid
, epoch_to_freeze
, need
] (auto e
) {
245 ObjectRecoveryInfo recovery_info
;
246 recovery_info
.soid
= soid
;
247 recovery_info
.version
= need
;
248 return on_local_recover_persist(soid
, recovery_info
,
249 true, epoch_to_freeze
);
254 RecoveryBackend::interruptible_future
<>
255 ReplicatedRecoveryBackend::recover_delete(
256 const hobject_t
&soid
, eversion_t need
)
258 logger().debug("{}: {}, {}", __func__
, soid
, need
);
260 epoch_t cur_epoch
= pg
.get_osdmap_epoch();
261 return seastar::do_with(object_stat_sum_t(),
262 [this, soid
, need
, cur_epoch
](auto& stat_diff
) {
263 return local_recover_delete(soid
, need
, cur_epoch
).then_interruptible(
264 [this, &stat_diff
, cur_epoch
, soid
, need
]()
265 -> interruptible_future
<> {
266 if (!pg
.has_reset_since(cur_epoch
)) {
267 bool object_missing
= false;
268 for (const auto& shard
: pg
.get_acting_recovery_backfill()) {
269 if (shard
== pg
.get_pg_whoami())
271 if (pg
.get_shard_missing(shard
)->is_missing(soid
)) {
272 logger().debug("recover_delete: soid {} needs to deleted from replca {}",
274 object_missing
= true;
279 if (!object_missing
) {
280 stat_diff
.num_objects_recovered
= 1;
281 return seastar::make_ready_future
<>();
283 return push_delete(soid
, need
);
286 return seastar::make_ready_future
<>();
287 }).then_interruptible([this, soid
, &stat_diff
] {
288 pg
.get_recovery_handler()->on_global_recover(soid
, stat_diff
, true);
289 return seastar::make_ready_future
<>();
294 RecoveryBackend::interruptible_future
<PushOp
>
295 ReplicatedRecoveryBackend::prep_push(
296 const hobject_t
& soid
,
300 logger().debug("{}: {}, {}", __func__
, soid
, need
);
302 auto& recovery_waiter
= get_recovering(soid
);
303 auto& obc
= recovery_waiter
.obc
;
304 interval_set
<uint64_t> data_subset
;
305 if (obc
->obs
.oi
.size
) {
306 data_subset
.insert(0, obc
->obs
.oi
.size
);
308 const auto& missing
= pg
.get_shard_missing().find(pg_shard
)->second
;
309 const auto it
= missing
.get_items().find(soid
);
310 assert(it
!= missing
.get_items().end());
311 data_subset
.intersection_of(it
->second
.clean_regions
.get_dirty_regions());
312 logger().debug("prep_push: {} data_subset {} to {}",
313 soid
, data_subset
, pg_shard
);
315 auto& push_info
= recovery_waiter
.pushing
[pg_shard
];
316 pg
.begin_peer_recover(pg_shard
, soid
);
317 const auto pmissing_iter
= pg
.get_shard_missing().find(pg_shard
);
318 const auto missing_iter
= pmissing_iter
->second
.get_items().find(soid
);
319 assert(missing_iter
!= pmissing_iter
->second
.get_items().end());
322 push_info
.recovery_info
.size
= obc
->obs
.oi
.size
;
323 push_info
.recovery_info
.copy_subset
= data_subset
;
324 push_info
.recovery_info
.soid
= soid
;
325 push_info
.recovery_info
.oi
= obc
->obs
.oi
;
326 push_info
.recovery_info
.version
= obc
->obs
.oi
.version
;
327 push_info
.recovery_info
.object_exist
=
328 missing_iter
->second
.clean_regions
.object_is_exist();
329 push_info
.recovery_progress
.omap_complete
=
330 !missing_iter
->second
.clean_regions
.omap_is_dirty();
332 return build_push_op(push_info
.recovery_info
,
333 push_info
.recovery_progress
,
334 &push_info
.stat
).then_interruptible(
335 [this, soid
, pg_shard
](auto push_op
) {
336 auto& recovery_waiter
= get_recovering(soid
);
337 auto& push_info
= recovery_waiter
.pushing
[pg_shard
];
338 push_info
.recovery_progress
= push_op
.after_progress
;
343 void ReplicatedRecoveryBackend::prepare_pull(PullOp
& pull_op
,
344 pull_info_t
& pull_info
,
345 const hobject_t
& soid
,
347 logger().debug("{}: {}, {}", __func__
, soid
, need
);
349 pg_missing_tracker_t local_missing
= pg
.get_local_missing();
350 const auto missing_iter
= local_missing
.get_items().find(soid
);
351 auto m
= pg
.get_missing_loc_shards();
352 pg_shard_t fromshard
= *(m
[soid
].begin());
354 //TODO: skipped snap objects case for now
355 pull_op
.recovery_info
.copy_subset
.insert(0, (uint64_t) -1);
356 pull_op
.recovery_info
.copy_subset
.intersection_of(
357 missing_iter
->second
.clean_regions
.get_dirty_regions());
358 pull_op
.recovery_info
.size
= ((uint64_t) -1);
359 pull_op
.recovery_info
.object_exist
=
360 missing_iter
->second
.clean_regions
.object_is_exist();
361 pull_op
.recovery_info
.soid
= soid
;
363 pull_op
.recovery_progress
.data_complete
= false;
364 pull_op
.recovery_progress
.omap_complete
=
365 !missing_iter
->second
.clean_regions
.omap_is_dirty();
366 pull_op
.recovery_progress
.data_recovered_to
= 0;
367 pull_op
.recovery_progress
.first
= true;
369 pull_info
.from
= fromshard
;
370 pull_info
.soid
= soid
;
371 pull_info
.recovery_info
= pull_op
.recovery_info
;
372 pull_info
.recovery_progress
= pull_op
.recovery_progress
;
375 RecoveryBackend::interruptible_future
<PushOp
>
376 ReplicatedRecoveryBackend::build_push_op(
377 const ObjectRecoveryInfo
& recovery_info
,
378 const ObjectRecoveryProgress
& progress
,
379 object_stat_sum_t
* stat
)
381 logger().debug("{} {} @{}",
382 __func__
, recovery_info
.soid
, recovery_info
.version
);
383 return seastar::do_with(ObjectRecoveryProgress(progress
),
384 uint64_t(crimson::common::local_conf()
385 ->osd_recovery_max_chunk
),
386 recovery_info
.version
,
388 [this, &recovery_info
, &progress
, stat
]
389 (auto& new_progress
, auto& available
, auto& v
, auto& push_op
) {
390 return read_metadata_for_push_op(recovery_info
.soid
,
391 progress
, new_progress
,
393 ).then_interruptible([&](eversion_t local_ver
) mutable {
394 // If requestor didn't know the version, use ours
395 if (v
== eversion_t()) {
397 } else if (v
!= local_ver
) {
398 logger().error("build_push_op: {} push {} v{} failed because local copy is {}",
399 pg
.get_pgid(), recovery_info
.soid
, recovery_info
.version
, local_ver
);
402 return read_omap_for_push_op(recovery_info
.soid
,
405 available
, &push_op
);
406 }).then_interruptible([this, &recovery_info
, &progress
,
407 &available
, &push_op
]() mutable {
408 logger().debug("build_push_op: available: {}, copy_subset: {}",
409 available
, recovery_info
.copy_subset
);
410 return read_object_for_push_op(recovery_info
.soid
,
411 recovery_info
.copy_subset
,
412 progress
.data_recovered_to
,
413 available
, &push_op
);
414 }).then_interruptible([&recovery_info
, &v
, &progress
,
415 &new_progress
, stat
, &push_op
]
416 (uint64_t recovered_to
) mutable {
417 new_progress
.data_recovered_to
= recovered_to
;
418 if (new_progress
.is_complete(recovery_info
)) {
419 new_progress
.data_complete
= true;
421 stat
->num_objects_recovered
++;
422 } else if (progress
.first
&& progress
.omap_complete
) {
423 // If omap is not changed, we need recovery omap
424 // when recovery cannot be completed once
425 new_progress
.omap_complete
= false;
428 stat
->num_keys_recovered
+= push_op
.omap_entries
.size();
429 stat
->num_bytes_recovered
+= push_op
.data
.length();
432 push_op
.soid
= recovery_info
.soid
;
433 push_op
.recovery_info
= recovery_info
;
434 push_op
.after_progress
= new_progress
;
435 push_op
.before_progress
= progress
;
436 logger().debug("build_push_op: push_op version:"
437 " {}, push_op data length: {}",
438 push_op
.version
, push_op
.data
.length());
439 return seastar::make_ready_future
<PushOp
>(std::move(push_op
));
444 RecoveryBackend::interruptible_future
<eversion_t
>
445 ReplicatedRecoveryBackend::read_metadata_for_push_op(
446 const hobject_t
& oid
,
447 const ObjectRecoveryProgress
& progress
,
448 ObjectRecoveryProgress
& new_progress
,
452 logger().debug("{}, {}", __func__
, oid
);
453 if (!progress
.first
) {
454 return seastar::make_ready_future
<eversion_t
>(ver
);
456 return interruptor::make_interruptible(interruptor::when_all_succeed(
457 backend
->omap_get_header(coll
, ghobject_t(oid
)).handle_error_interruptible
<false>(
458 crimson::os::FuturizedStore::Shard::read_errorator::all_same_way(
459 [oid
] (const std::error_code
& e
) {
460 logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e
, oid
);
461 return seastar::make_ready_future
<bufferlist
>();
463 interruptor::make_interruptible(store
->get_attrs(coll
, ghobject_t(oid
)))
464 .handle_error_interruptible
<false>(
465 crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
466 [oid
] (const std::error_code
& e
) {
467 logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e
, oid
);
468 return seastar::make_ready_future
<crimson::os::FuturizedStore::Shard::attrs_t
>();
470 )).then_unpack_interruptible([&new_progress
, push_op
](auto bl
, auto attrs
) {
471 if (bl
.length() == 0) {
472 logger().warn("read_metadata_for_push_op: fail to read omap header");
473 } else if (attrs
.empty()) {
474 logger().error("read_metadata_for_push_op: fail to read attrs");
477 push_op
->omap_header
.claim_append(std::move(bl
));
478 for (auto&& [key
, val
] : attrs
) {
479 push_op
->attrset
.emplace(std::move(key
), std::move(val
));
481 logger().debug("read_metadata_for_push_op: {}", push_op
->attrset
[OI_ATTR
]);
483 oi
.decode_no_oid(push_op
->attrset
[OI_ATTR
]);
484 new_progress
.first
= false;
489 RecoveryBackend::interruptible_future
<uint64_t>
490 ReplicatedRecoveryBackend::read_object_for_push_op(
491 const hobject_t
& oid
,
492 const interval_set
<uint64_t>& copy_subset
,
497 if (max_len
== 0 || copy_subset
.empty()) {
498 push_op
->data_included
.clear();
499 return seastar::make_ready_future
<uint64_t>(offset
);
501 // 1. get the extents in the interested range
502 return interruptor::make_interruptible(backend
->fiemap(coll
, ghobject_t
{oid
},
503 0, copy_subset
.range_end())).safe_then_interruptible(
504 [=, this](auto&& fiemap_included
) mutable {
505 interval_set
<uint64_t> extents
;
507 extents
.intersection_of(copy_subset
, std::move(fiemap_included
));
508 } catch (std::exception
&) {
509 // if fiemap() fails, we will read nothing, as the intersection of
510 // copy_subset and an empty interval_set would be empty anyway
513 // 2. we can read up to "max_len" bytes from "offset", so truncate the
514 // extents down to this quota. no need to return the number of consumed
515 // bytes, as this is the last consumer of this quota
516 push_op
->data_included
.span_of(extents
, offset
, max_len
);
517 // 3. read the truncated extents
518 // TODO: check if the returned extents are pruned
519 return interruptor::make_interruptible(store
->readv(coll
, ghobject_t
{oid
},
520 push_op
->data_included
, 0));
521 }).safe_then_interruptible([push_op
, range_end
=copy_subset
.range_end()](auto &&bl
) {
522 push_op
->data
.claim_append(std::move(bl
));
523 uint64_t recovered_to
= 0;
524 if (push_op
->data_included
.empty()) {
525 // zero filled section, skip to end!
526 recovered_to
= range_end
;
528 // note down the progress, we will start from there next time
529 recovered_to
= push_op
->data_included
.range_end();
531 return seastar::make_ready_future
<uint64_t>(recovered_to
);
532 }, PGBackend::read_errorator::all_same_way([](auto e
) {
533 logger().debug("build_push_op: read exception");
534 return seastar::make_exception_future
<uint64_t>(e
);
// Map a string to std::optional<std::string>: an empty string becomes
// std::nullopt, anything else becomes a copy of the string.  Used below
// to turn an empty "omap_recovered_to" cursor into "no start key" for
// omap_get_values().
static std::optional<std::string> nullopt_if_empty(const std::string& s)
{
  return s.empty() ? std::nullopt : std::make_optional(s);
}
543 static bool is_too_many_entries_per_chunk(const PushOp
* push_op
)
545 const uint64_t entries_per_chunk
=
546 crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk
;
547 if (!entries_per_chunk
) {
548 // the limit is disabled
551 return push_op
->omap_entries
.size() >= entries_per_chunk
;
554 RecoveryBackend::interruptible_future
<>
555 ReplicatedRecoveryBackend::read_omap_for_push_op(
556 const hobject_t
& oid
,
557 const ObjectRecoveryProgress
& progress
,
558 ObjectRecoveryProgress
& new_progress
,
562 if (progress
.omap_complete
) {
563 return seastar::make_ready_future
<>();
565 return seastar::repeat([&new_progress
, &max_len
, push_op
, &oid
, this] {
566 return shard_services
.get_store().omap_get_values(
567 coll
, ghobject_t
{oid
}, nullopt_if_empty(new_progress
.omap_recovered_to
)
568 ).safe_then([&new_progress
, &max_len
, push_op
](const auto& ret
) {
569 const auto& [done
, kvs
] = ret
;
571 // assuming "values.empty() only if done" holds here!
572 for (const auto& [key
, value
] : kvs
) {
573 if (is_too_many_entries_per_chunk(push_op
)) {
577 if (const uint64_t entry_size
= key
.size() + value
.length();
578 entry_size
> max_len
) {
582 max_len
-= std::min(max_len
, entry_size
);
584 push_op
->omap_entries
.emplace(key
, value
);
586 if (!push_op
->omap_entries
.empty()) {
587 // we iterate in order
588 new_progress
.omap_recovered_to
= std::rbegin(push_op
->omap_entries
)->first
;
591 new_progress
.omap_complete
= true;
593 return seastar::make_ready_future
<seastar::stop_iteration
>(
594 stop
? seastar::stop_iteration::yes
: seastar::stop_iteration::no
596 }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all
{});
600 std::vector
<pg_shard_t
>
601 ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t
& soid
) const
603 std::vector
<pg_shard_t
> shards
;
604 assert(pg
.get_acting_recovery_backfill().size() > 0);
605 for (const auto& peer
: pg
.get_acting_recovery_backfill()) {
606 if (peer
== pg
.get_pg_whoami())
609 pg
.get_shard_missing().find(peer
);
610 assert(shard_missing
!= pg
.get_shard_missing().end());
611 if (shard_missing
->second
.is_missing(soid
)) {
612 shards
.push_back(shard_missing
->first
);
618 RecoveryBackend::interruptible_future
<>
619 ReplicatedRecoveryBackend::handle_pull(Ref
<MOSDPGPull
> m
)
621 logger().debug("{}: {}", __func__
, *m
);
622 if (pg
.can_discard_replica_op(*m
)) {
623 logger().debug("{}: discarding {}", __func__
, *m
);
624 return seastar::now();
626 return seastar::do_with(m
->take_pulls(), [this, from
=m
->from
](auto& pulls
) {
627 return interruptor::parallel_for_each(pulls
,
628 [this, from
](auto& pull_op
) {
629 const hobject_t
& soid
= pull_op
.soid
;
630 logger().debug("handle_pull: {}", soid
);
631 return backend
->stat(coll
, ghobject_t(soid
)).then_interruptible(
632 [this, &pull_op
](auto st
) {
633 ObjectRecoveryInfo
&recovery_info
= pull_op
.recovery_info
;
634 ObjectRecoveryProgress
&progress
= pull_op
.recovery_progress
;
635 if (progress
.first
&& recovery_info
.size
== ((uint64_t) -1)) {
636 // Adjust size and copy_subset
637 recovery_info
.size
= st
.st_size
;
639 interval_set
<uint64_t> object_range
;
640 object_range
.insert(0, st
.st_size
);
641 recovery_info
.copy_subset
.intersection_of(object_range
);
643 recovery_info
.copy_subset
.clear();
645 assert(recovery_info
.clone_subset
.empty());
647 return build_push_op(recovery_info
, progress
, 0);
648 }).then_interruptible([this, from
](auto push_op
) {
649 auto msg
= crimson::make_message
<MOSDPGPush
>();
650 msg
->from
= pg
.get_pg_whoami();
651 msg
->pgid
= pg
.get_pgid();
652 msg
->map_epoch
= pg
.get_osdmap_epoch();
653 msg
->min_epoch
= pg
.get_last_peering_reset();
654 msg
->set_priority(pg
.get_recovery_op_priority());
655 msg
->pushes
.push_back(std::move(push_op
));
656 return shard_services
.send_to_osd(from
.osd
, std::move(msg
),
657 pg
.get_osdmap_epoch());
663 RecoveryBackend::interruptible_future
<bool>
664 ReplicatedRecoveryBackend::_handle_pull_response(
668 ceph::os::Transaction
* t
)
670 logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
671 push_op
.recovery_info
, push_op
.after_progress
,
672 push_op
.data
.length(), push_op
.data_included
);
674 const hobject_t
&hoid
= push_op
.soid
;
675 auto& recovery_waiter
= get_recovering(hoid
);
676 auto& pull_info
= *recovery_waiter
.pull_info
;
677 if (pull_info
.recovery_info
.size
== (uint64_t(-1))) {
678 pull_info
.recovery_info
.size
= push_op
.recovery_info
.size
;
679 pull_info
.recovery_info
.copy_subset
.intersection_of(
680 push_op
.recovery_info
.copy_subset
);
683 // If primary doesn't have object info and didn't know version
684 if (pull_info
.recovery_info
.version
== eversion_t())
685 pull_info
.recovery_info
.version
= push_op
.version
;
687 auto prepare_waiter
= interruptor::make_interruptible(
688 seastar::make_ready_future
<>());
689 if (pull_info
.recovery_progress
.first
) {
690 prepare_waiter
= pg
.obc_loader
.with_obc
<RWState::RWNONE
>(
691 pull_info
.recovery_info
.soid
,
692 [&pull_info
, &recovery_waiter
, &push_op
](auto obc
) {
694 recovery_waiter
.obc
= obc
;
695 obc
->obs
.oi
.decode_no_oid(push_op
.attrset
.at(OI_ATTR
), push_op
.soid
);
696 pull_info
.recovery_info
.oi
= obc
->obs
.oi
;
697 return crimson::osd::PG::load_obc_ertr::now();
698 }).handle_error_interruptible(crimson::ct_error::assert_all
{});
700 return prepare_waiter
.then_interruptible(
701 [this, &pull_info
, &push_op
, t
, response
]() mutable {
702 const bool first
= pull_info
.recovery_progress
.first
;
703 pull_info
.recovery_progress
= push_op
.after_progress
;
704 logger().debug("new recovery_info {}, new progress {}",
705 pull_info
.recovery_info
, pull_info
.recovery_progress
);
706 interval_set
<uint64_t> data_zeros
;
708 uint64_t offset
= push_op
.before_progress
.data_recovered_to
;
709 uint64_t length
= (push_op
.after_progress
.data_recovered_to
-
710 push_op
.before_progress
.data_recovered_to
);
712 data_zeros
.insert(offset
, length
);
715 auto [usable_intervals
, data
] =
716 trim_pushed_data(pull_info
.recovery_info
.copy_subset
,
717 push_op
.data_included
, push_op
.data
);
718 bool complete
= pull_info
.is_complete();
719 bool clear_omap
= !push_op
.before_progress
.omap_complete
;
720 return submit_push_data(pull_info
.recovery_info
,
721 first
, complete
, clear_omap
,
722 std::move(data_zeros
), std::move(usable_intervals
),
723 std::move(data
), std::move(push_op
.omap_header
),
724 push_op
.attrset
, std::move(push_op
.omap_entries
), t
)
726 [this, response
, &pull_info
, &push_op
, complete
,
727 t
, bytes_recovered
=data
.length()] {
728 pull_info
.stat
.num_keys_recovered
+= push_op
.omap_entries
.size();
729 pull_info
.stat
.num_bytes_recovered
+= bytes_recovered
;
732 pull_info
.stat
.num_objects_recovered
++;
733 pg
.get_recovery_handler()->on_local_recover(
734 push_op
.soid
, get_recovering(push_op
.soid
).pull_info
->recovery_info
,
738 response
->soid
= push_op
.soid
;
739 response
->recovery_info
= pull_info
.recovery_info
;
740 response
->recovery_progress
= pull_info
.recovery_progress
;
747 RecoveryBackend::interruptible_future
<>
748 ReplicatedRecoveryBackend::handle_pull_response(
751 if (pg
.can_discard_replica_op(*m
)) {
752 logger().debug("{}: discarding {}", __func__
, *m
);
753 return seastar::now();
755 const PushOp
& push_op
= m
->pushes
[0]; //TODO: only one push per message for now.
756 if (push_op
.version
== eversion_t()) {
757 // replica doesn't have it!
758 pg
.get_recovery_handler()->on_failed_recover({ m
->from
}, push_op
.soid
,
759 get_recovering(push_op
.soid
).pull_info
->recovery_info
.version
);
760 return seastar::make_exception_future
<>(
761 std::runtime_error(fmt::format(
762 "Error on pushing side {} when pulling obj {}",
763 m
->from
, push_op
.soid
)));
766 logger().debug("{}: {}", __func__
, *m
);
767 return seastar::do_with(PullOp(), [this, m
](auto& response
) {
768 return seastar::do_with(ceph::os::Transaction(), m
.get(),
769 [this, &response
](auto& t
, auto& m
) {
770 pg_shard_t from
= m
->from
;
771 PushOp
& push_op
= m
->pushes
[0]; // only one push per message for now
772 return _handle_pull_response(from
, push_op
, &response
, &t
773 ).then_interruptible(
774 [this, &t
](bool complete
) {
775 epoch_t epoch_frozen
= pg
.get_osdmap_epoch();
776 logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
777 return shard_services
.get_store().do_transaction(coll
, std::move(t
))
778 .then([this, epoch_frozen
, complete
,
779 last_complete
= pg
.get_info().last_complete
] {
780 pg
.get_recovery_handler()->_committed_pushed_object(epoch_frozen
, last_complete
);
781 return seastar::make_ready_future
<bool>(complete
);
784 }).then_interruptible([this, m
, &response
](bool complete
) {
786 auto& push_op
= m
->pushes
[0];
787 get_recovering(push_op
.soid
).set_pulled();
788 return seastar::make_ready_future
<>();
790 auto reply
= crimson::make_message
<MOSDPGPull
>();
791 reply
->from
= pg
.get_pg_whoami();
792 reply
->set_priority(m
->get_priority());
793 reply
->pgid
= pg
.get_info().pgid
;
794 reply
->map_epoch
= m
->map_epoch
;
795 reply
->min_epoch
= m
->min_epoch
;
796 reply
->set_pulls({std::move(response
)});
797 return shard_services
.send_to_osd(m
->from
.osd
, std::move(reply
), pg
.get_osdmap_epoch());
803 RecoveryBackend::interruptible_future
<>
804 ReplicatedRecoveryBackend::_handle_push(
807 PushReplyOp
*response
,
808 ceph::os::Transaction
*t
)
810 logger().debug("{}", __func__
);
812 bool first
= push_op
.before_progress
.first
;
813 interval_set
<uint64_t> data_zeros
;
815 uint64_t offset
= push_op
.before_progress
.data_recovered_to
;
816 uint64_t length
= (push_op
.after_progress
.data_recovered_to
-
817 push_op
.before_progress
.data_recovered_to
);
819 data_zeros
.insert(offset
, length
);
822 bool complete
= (push_op
.after_progress
.data_complete
&&
823 push_op
.after_progress
.omap_complete
);
824 bool clear_omap
= !push_op
.before_progress
.omap_complete
;
825 response
->soid
= push_op
.recovery_info
.soid
;
827 return submit_push_data(push_op
.recovery_info
, first
, complete
, clear_omap
,
828 std::move(data_zeros
),
829 std::move(push_op
.data_included
),
830 std::move(push_op
.data
),
831 std::move(push_op
.omap_header
),
833 std::move(push_op
.omap_entries
), t
)
835 [this, complete
, &push_op
, t
] {
837 pg
.get_recovery_handler()->on_local_recover(
838 push_op
.recovery_info
.soid
, push_op
.recovery_info
,
844 RecoveryBackend::interruptible_future
<>
845 ReplicatedRecoveryBackend::handle_push(
848 if (pg
.can_discard_replica_op(*m
)) {
849 logger().debug("{}: discarding {}", __func__
, *m
);
850 return seastar::now();
852 if (pg
.is_primary()) {
853 return handle_pull_response(m
);
856 logger().debug("{}: {}", __func__
, *m
);
857 return seastar::do_with(PushReplyOp(), [this, m
](auto& response
) {
858 PushOp
& push_op
= m
->pushes
[0]; // TODO: only one push per message for now
859 return seastar::do_with(ceph::os::Transaction(),
860 [this, m
, &push_op
, &response
](auto& t
) {
861 return _handle_push(m
->from
, push_op
, &response
, &t
).then_interruptible(
863 epoch_t epoch_frozen
= pg
.get_osdmap_epoch();
864 logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
865 return interruptor::make_interruptible(
866 shard_services
.get_store().do_transaction(coll
, std::move(t
))).then_interruptible(
867 [this, epoch_frozen
, last_complete
= pg
.get_info().last_complete
] {
868 //TODO: this should be grouped with pg.on_local_recover somehow.
869 pg
.get_recovery_handler()->_committed_pushed_object(epoch_frozen
, last_complete
);
872 }).then_interruptible([this, m
, &response
]() mutable {
873 auto reply
= crimson::make_message
<MOSDPGPushReply
>();
874 reply
->from
= pg
.get_pg_whoami();
875 reply
->set_priority(m
->get_priority());
876 reply
->pgid
= pg
.get_info().pgid
;
877 reply
->map_epoch
= m
->map_epoch
;
878 reply
->min_epoch
= m
->min_epoch
;
879 std::vector
<PushReplyOp
> replies
= { std::move(response
) };
880 reply
->replies
.swap(replies
);
881 return shard_services
.send_to_osd(m
->from
.osd
,
882 std::move(reply
), pg
.get_osdmap_epoch());
887 RecoveryBackend::interruptible_future
<std::optional
<PushOp
>>
888 ReplicatedRecoveryBackend::_handle_push_reply(
890 const PushReplyOp
&op
)
892 const hobject_t
& soid
= op
.soid
;
893 logger().debug("{}, soid {}, from {}", __func__
, soid
, peer
);
894 auto recovering_iter
= recovering
.find(soid
);
895 if (recovering_iter
== recovering
.end()
896 || !recovering_iter
->second
->pushing
.count(peer
)) {
897 logger().debug("huh, i wasn't pushing {} to osd.{}", soid
, peer
);
898 return seastar::make_ready_future
<std::optional
<PushOp
>>();
900 auto& push_info
= recovering_iter
->second
->pushing
[peer
];
901 bool error
= push_info
.recovery_progress
.error
;
902 if (!push_info
.recovery_progress
.data_complete
&& !error
) {
903 return build_push_op(push_info
.recovery_info
, push_info
.recovery_progress
,
905 ).then_interruptible([&push_info
] (auto push_op
) {
906 push_info
.recovery_progress
= push_op
.after_progress
;
907 return seastar::make_ready_future
<std::optional
<PushOp
>>(
909 }).handle_exception_interruptible(
910 [recovering_iter
, &push_info
, peer
] (auto e
) {
911 push_info
.recovery_progress
.error
= true;
912 recovering_iter
->second
->set_push_failed(peer
, e
);
913 return seastar::make_ready_future
<std::optional
<PushOp
>>();
917 pg
.get_recovery_handler()->on_peer_recover(peer
,
919 push_info
.recovery_info
);
921 recovering_iter
->second
->set_pushed(peer
);
922 return seastar::make_ready_future
<std::optional
<PushOp
>>();
926 RecoveryBackend::interruptible_future
<>
927 ReplicatedRecoveryBackend::handle_push_reply(
928 Ref
<MOSDPGPushReply
> m
)
930 logger().debug("{}: {}", __func__
, *m
);
932 auto& push_reply
= m
->replies
[0]; //TODO: only one reply per message
934 return _handle_push_reply(from
, push_reply
).then_interruptible(
935 [this, from
](std::optional
<PushOp
> push_op
) {
937 auto msg
= crimson::make_message
<MOSDPGPush
>();
938 msg
->from
= pg
.get_pg_whoami();
939 msg
->pgid
= pg
.get_pgid();
940 msg
->map_epoch
= pg
.get_osdmap_epoch();
941 msg
->min_epoch
= pg
.get_last_peering_reset();
942 msg
->set_priority(pg
.get_recovery_op_priority());
943 msg
->pushes
.push_back(std::move(*push_op
));
944 return shard_services
.send_to_osd(from
.osd
,
946 pg
.get_osdmap_epoch());
948 return seastar::make_ready_future
<>();
953 std::pair
<interval_set
<uint64_t>,
955 ReplicatedRecoveryBackend::trim_pushed_data(
956 const interval_set
<uint64_t> ©_subset
,
957 const interval_set
<uint64_t> &intervals_received
,
958 ceph::bufferlist data_received
)
960 logger().debug("{}", __func__
);
961 // what i have is only a subset of what i want
962 if (intervals_received
.subset_of(copy_subset
)) {
963 return {intervals_received
, data_received
};
965 // only collect the extents included by copy_subset and intervals_received
966 interval_set
<uint64_t> intervals_usable
;
967 bufferlist data_usable
;
968 intervals_usable
.intersection_of(copy_subset
, intervals_received
);
969 uint64_t have_off
= 0;
970 for (auto [have_start
, have_len
] : intervals_received
) {
971 interval_set
<uint64_t> want
;
972 want
.insert(have_start
, have_len
);
973 want
.intersection_of(copy_subset
);
974 for (auto [want_start
, want_len
] : want
) {
976 uint64_t data_off
= have_off
+ (want_start
- have_start
);
977 sub
.substr_of(data_received
, data_off
, want_len
);
978 data_usable
.claim_append(sub
);
980 have_off
+= have_len
;
982 return {intervals_usable
, data_usable
};
985 RecoveryBackend::interruptible_future
<hobject_t
>
986 ReplicatedRecoveryBackend::prep_push_target(
987 const ObjectRecoveryInfo
& recovery_info
,
991 ObjectStore::Transaction
* t
,
992 const map
<string
, bufferlist
, less
<>>& attrs
,
993 bufferlist
&& omap_header
)
996 return seastar::make_ready_future
<hobject_t
>(
997 get_temp_recovery_object(recovery_info
.soid
,
998 recovery_info
.version
));
1001 ghobject_t target_oid
;
1003 // overwrite the original object
1004 target_oid
= ghobject_t(recovery_info
.soid
);
1006 target_oid
= ghobject_t(get_temp_recovery_object(recovery_info
.soid
,
1007 recovery_info
.version
));
1008 logger().debug("{}: Adding oid {} in the temp collection",
1009 __func__
, target_oid
);
1010 add_temp_obj(target_oid
.hobj
);
1012 // create a new object
1013 if (!complete
|| !recovery_info
.object_exist
) {
1014 t
->remove(coll
->get_cid(), target_oid
);
1015 t
->touch(coll
->get_cid(), target_oid
);
1017 oi
.decode_no_oid(attrs
.at(OI_ATTR
));
1018 t
->set_alloc_hint(coll
->get_cid(), target_oid
,
1019 oi
.expected_object_size
,
1020 oi
.expected_write_size
,
1021 oi
.alloc_hint_flags
);
1024 // remove xattr and update later if overwrite on original object
1025 t
->rmattrs(coll
->get_cid(), target_oid
);
1026 // if need update omap, clear the previous content first
1028 t
->omap_clear(coll
->get_cid(), target_oid
);
1031 t
->truncate(coll
->get_cid(), target_oid
, recovery_info
.size
);
1032 if (omap_header
.length()) {
1033 t
->omap_setheader(coll
->get_cid(), target_oid
, omap_header
);
1035 if (complete
|| !recovery_info
.object_exist
) {
1036 return seastar::make_ready_future
<hobject_t
>(target_oid
.hobj
);
1038 // clone overlap content in local object if using a new object
1039 return interruptor::make_interruptible(store
->stat(coll
, ghobject_t(recovery_info
.soid
)))
1040 .then_interruptible(
1041 [this, &recovery_info
, t
, target_oid
] (auto st
) {
1042 // TODO: pg num bytes counting
1043 uint64_t local_size
= std::min(recovery_info
.size
, (uint64_t)st
.st_size
);
1044 interval_set
<uint64_t> local_intervals_included
, local_intervals_excluded
;
1046 local_intervals_included
.insert(0, local_size
);
1047 local_intervals_excluded
.intersection_of(local_intervals_included
, recovery_info
.copy_subset
);
1048 local_intervals_included
.subtract(local_intervals_excluded
);
1050 for (auto [off
, len
] : local_intervals_included
) {
1051 logger().debug(" clone_range {} {}~{}",
1052 recovery_info
.soid
, off
, len
);
1053 t
->clone_range(coll
->get_cid(), ghobject_t(recovery_info
.soid
),
1054 target_oid
, off
, len
, off
);
1056 return seastar::make_ready_future
<hobject_t
>(target_oid
.hobj
);
1059 RecoveryBackend::interruptible_future
<>
1060 ReplicatedRecoveryBackend::submit_push_data(
1061 const ObjectRecoveryInfo
&recovery_info
,
1065 interval_set
<uint64_t>&& data_zeros
,
1066 interval_set
<uint64_t>&& intervals_included
,
1067 bufferlist
&& data_included
,
1068 bufferlist
&& omap_header
,
1069 const map
<string
, bufferlist
, less
<>> &attrs
,
1070 map
<string
, bufferlist
>&& omap_entries
,
1071 ObjectStore::Transaction
*t
)
1073 logger().debug("{}", __func__
);
1074 return prep_push_target(recovery_info
, first
, complete
,
1075 clear_omap
, t
, attrs
,
1076 std::move(omap_header
)).then_interruptible(
1080 data_zeros
=std::move(data_zeros
),
1081 intervals_included
=std::move(intervals_included
),
1082 data_included
=std::move(data_included
),
1083 omap_entries
=std::move(omap_entries
),
1084 &attrs
](auto target_oid
) mutable {
1086 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1087 // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
1088 if (!data_zeros
.empty()) {
1089 data_zeros
.intersection_of(recovery_info
.copy_subset
);
1090 assert(intervals_included
.subset_of(data_zeros
));
1091 data_zeros
.subtract(intervals_included
);
1093 logger().debug("submit_push_data recovering object {} copy_subset: {} "
1094 "intervals_included: {} data_zeros: {}",
1095 recovery_info
.soid
, recovery_info
.copy_subset
,
1096 intervals_included
, data_zeros
);
1098 for (auto [start
, len
] : data_zeros
) {
1099 t
->zero(coll
->get_cid(), ghobject_t(target_oid
), start
, len
);
1103 for (auto [start
, len
] : intervals_included
) {
1105 bit
.substr_of(data_included
, off
, len
);
1106 t
->write(coll
->get_cid(), ghobject_t(target_oid
),
1107 start
, len
, bit
, fadvise_flags
);
1111 if (!omap_entries
.empty())
1112 t
->omap_setkeys(coll
->get_cid(), ghobject_t(target_oid
), omap_entries
);
1114 t
->setattrs(coll
->get_cid(), ghobject_t(target_oid
), attrs
);
1118 logger().debug("submit_push_data: Removing oid {} from the temp collection",
1120 clear_temp_obj(target_oid
);
1121 t
->remove(coll
->get_cid(), ghobject_t(recovery_info
.soid
));
1122 t
->collection_move_rename(coll
->get_cid(), ghobject_t(target_oid
),
1123 coll
->get_cid(), ghobject_t(recovery_info
.soid
));
1125 submit_push_complete(recovery_info
, t
);
1127 logger().debug("submit_push_data: done");
1128 return seastar::make_ready_future
<>();
1132 void ReplicatedRecoveryBackend::submit_push_complete(
1133 const ObjectRecoveryInfo
&recovery_info
,
1134 ObjectStore::Transaction
*t
)
1136 for (const auto& [oid
, extents
] : recovery_info
.clone_subset
) {
1137 for (const auto& [off
, len
] : extents
) {
1138 logger().debug(" clone_range {} {}~{}", oid
, off
, len
);
1139 t
->clone_range(coll
->get_cid(), ghobject_t(oid
), ghobject_t(recovery_info
.soid
),
1145 RecoveryBackend::interruptible_future
<>
1146 ReplicatedRecoveryBackend::handle_recovery_delete_reply(
1147 Ref
<MOSDPGRecoveryDeleteReply
> m
)
1149 auto& p
= m
->objects
.front();
1150 hobject_t soid
= p
.first
;
1151 ObjectRecoveryInfo recovery_info
;
1152 recovery_info
.version
= p
.second
;
1153 pg
.get_recovery_handler()->on_peer_recover(m
->from
, soid
, recovery_info
);
1154 get_recovering(soid
).set_pushed(m
->from
);
1155 return seastar::now();
1158 RecoveryBackend::interruptible_future
<>
1159 ReplicatedRecoveryBackend::handle_recovery_op(
1160 Ref
<MOSDFastDispatchOp
> m
,
1161 crimson::net::ConnectionRef conn
)
1163 switch (m
->get_header().type
) {
1164 case MSG_OSD_PG_PULL
:
1165 return handle_pull(boost::static_pointer_cast
<MOSDPGPull
>(m
));
1166 case MSG_OSD_PG_PUSH
:
1167 return handle_push(boost::static_pointer_cast
<MOSDPGPush
>(m
));
1168 case MSG_OSD_PG_PUSH_REPLY
:
1169 return handle_push_reply(
1170 boost::static_pointer_cast
<MOSDPGPushReply
>(m
));
1171 case MSG_OSD_PG_RECOVERY_DELETE
:
1172 return handle_recovery_delete(
1173 boost::static_pointer_cast
<MOSDPGRecoveryDelete
>(m
));
1174 case MSG_OSD_PG_RECOVERY_DELETE_REPLY
:
1175 return handle_recovery_delete_reply(
1176 boost::static_pointer_cast
<MOSDPGRecoveryDeleteReply
>(m
));
1178 // delegate to parent class for handling backend-agnostic recovery ops.
1179 return RecoveryBackend::handle_recovery_op(std::move(m
), conn
);