// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab

#include <fmt/format.h>
#include <fmt/ostream.h>
#include <seastar/core/future.hh>
#include <seastar/core/do_with.hh>

#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "osd/osd_types_fmt.h"
#include "replicated_recovery_backend.h"
#include "msg/Message.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

using std::less;
using std::map;
using std::string;
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::recover_object(
  const hobject_t& soid,
  eversion_t need)
{
  logger().debug("{}: {}, {}", __func__, soid, need);
  // always add_recovering(soid) before recover_object(soid)
  assert(is_recovering(soid));
  // start tracking the recovery of soid
  return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
    logger().debug("recover_object: loading obc: {}", soid);
    return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
      [this, soid, need](auto obc) {
      logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
      auto& recovery_waiter = get_recovering(soid);
      recovery_waiter.obc = obc;
      recovery_waiter.obc->wait_recovery_read();
      return maybe_push_shards(soid, need);
    }).handle_error_interruptible(
      crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
      // TODO: may need eio handling?
      logger().error("recover_object saw error code {}, ignoring object {}",
                     code, soid);
    }));
  });
}
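
// Push `soid` to every shard returned by get_shards_to_push() in parallel,
// then mark the object globally recovered; on failure, drop the recovery
// read lock and stop tracking the object.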
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_push_shards(
  const hobject_t& soid,
  eversion_t need)
{
  return seastar::do_with(
    get_shards_to_push(soid),
    [this, need, soid](auto &shards) {
    return interruptor::parallel_for_each(
      shards,
      [this, need, soid](auto shard) {
      return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
        auto msg = crimson::make_message<MOSDPGPush>();
        msg->from = pg.get_pg_whoami();
        msg->pgid = pg.get_pgid();
        msg->map_epoch = pg.get_osdmap_epoch();
        msg->min_epoch = pg.get_last_peering_reset();
        msg->pushes.push_back(std::move(push));
        msg->set_priority(pg.get_recovery_op_priority());
        return interruptor::make_interruptible(
            shard_services.send_to_osd(shard.osd,
                                       std::move(msg),
                                       pg.get_osdmap_epoch()))
        .then_interruptible(
          [this, soid, shard] {
          return get_recovering(soid).wait_for_pushes(shard);
        });
      });
    });
  }).then_interruptible([this, soid] {
    auto &recovery = get_recovering(soid);
    if (auto push_info = recovery.pushing.begin();
        push_info != recovery.pushing.end()) {
      pg.get_recovery_handler()->on_global_recover(soid,
                                                   push_info->second.stat,
                                                   false);
    } else if (recovery.pull_info) {
      // no push happened (empty get_shards_to_push()) but pull actually did
      pg.get_recovery_handler()->on_global_recover(soid,
                                                   recovery.pull_info->stat,
                                                   false);
    } else {
      // no pulls, no pushes
    }
    return seastar::make_ready_future<>();
  }).handle_exception_interruptible([this, soid](auto e) {
    auto &recovery = get_recovering(soid);
    if (recovery.obc) {
      recovery.obc->drop_recovery_read();
    }
    recovering.erase(soid);
    return seastar::make_exception_future<>(e);
  });
}
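
// If the object is missing locally, send an MOSDPGPull to the chosen
// source shard and wait until the pull completes.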
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_pull_missing_obj(
  const hobject_t& soid,
  eversion_t need)
{
  pg_missing_tracker_t local_missing = pg.get_local_missing();
  if (!local_missing.is_missing(soid)) {
    return seastar::make_ready_future<>();
  }
  PullOp pull_op;
  auto& recovery_waiter = get_recovering(soid);
  recovery_waiter.pull_info =
    std::make_optional<RecoveryBackend::pull_info_t>();
  auto& pull_info = *recovery_waiter.pull_info;
  prepare_pull(pull_op, pull_info, soid, need);
  auto msg = crimson::make_message<MOSDPGPull>();
  msg->from = pg.get_pg_whoami();
  msg->set_priority(pg.get_recovery_op_priority());
  msg->pgid = pg.get_pgid();
  msg->map_epoch = pg.get_osdmap_epoch();
  msg->min_epoch = pg.get_last_peering_reset();
  msg->set_pulls({std::move(pull_op)});
  return interruptor::make_interruptible(
    shard_services.send_to_osd(
      pull_info.from.osd,
      std::move(msg),
      pg.get_osdmap_epoch()
  )).then_interruptible([&recovery_waiter] {
    return recovery_waiter.wait_for_pull();
  });
}
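
// Ask every acting/backfill shard that still has `soid` in its missing set
// to delete it, and wait for their acknowledgements.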
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::push_delete(
  const hobject_t& soid,
  eversion_t need)
{
  logger().debug("{}: {}, {}", __func__, soid, need);
  epoch_t min_epoch = pg.get_last_peering_reset();

  assert(pg.get_acting_recovery_backfill().size() > 0);
  return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
    [this, soid, need, min_epoch](pg_shard_t shard)
    -> interruptible_future<> {
    if (shard == pg.get_pg_whoami())
      return seastar::make_ready_future<>();
    auto iter = pg.get_shard_missing().find(shard);
    if (iter == pg.get_shard_missing().end())
      return seastar::make_ready_future<>();
    if (iter->second.is_missing(soid)) {
      logger().debug("push_delete: will remove {} from {}", soid, shard);
      pg.begin_peer_recover(shard, soid);
      spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
      auto msg = crimson::make_message<MOSDPGRecoveryDelete>(
        pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
      msg->set_priority(pg.get_recovery_op_priority());
      msg->objects.push_back(std::make_pair(soid, need));
      return interruptor::make_interruptible(
          shard_services.send_to_osd(shard.osd, std::move(msg),
                                     pg.get_osdmap_epoch())).then_interruptible(
        [this, soid, shard] {
        return get_recovering(soid).wait_for_pushes(shard);
      });
    }
    return seastar::make_ready_future<>();
  });
}
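
// Replica-side handler for MOSDPGRecoveryDelete: perform the local delete
// and acknowledge it with an MOSDPGRecoveryDeleteReply.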
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_recovery_delete(
  Ref<MOSDPGRecoveryDelete> m)
{
  logger().debug("{}: {}", __func__, *m);

  auto& p = m->objects.front(); //TODO: only one delete per message for now.
  return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
  .then_interruptible(
    [this, m] {
    auto reply = crimson::make_message<MOSDPGRecoveryDeleteReply>();
    reply->from = pg.get_pg_whoami();
    reply->set_priority(m->get_priority());
    reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard);
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->objects = m->objects;
    return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
  });
}

RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::on_local_recover_persist(
  const hobject_t& soid,
  const ObjectRecoveryInfo& _recovery_info,
  bool is_delete,
  epoch_t epoch_frozen)
{
  logger().debug("{}", __func__);
  ceph::os::Transaction t;
  pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
  logger().debug("ReplicatedRecoveryBackend::on_local_recover_persist: do_transaction...");
  return interruptor::make_interruptible(
      shard_services.get_store().do_transaction(coll, std::move(t)))
  .then_interruptible(
    [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
    pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
    return seastar::make_ready_future<>();
  });
}
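
// Remove the object locally (if it exists) and persist the deletion; the
// deletion is recorded even when the object's metadata cannot be loaded.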
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::local_recover_delete(
  const hobject_t& soid,
  eversion_t need,
  epoch_t epoch_to_freeze)
{
  logger().debug("{}: {}, {}", __func__, soid, need);
  return backend->load_metadata(soid).safe_then_interruptible([this]
    (auto lomt) -> interruptible_future<> {
    if (lomt->os.exists) {
      return seastar::do_with(ceph::os::Transaction(),
        [this, lomt = std::move(lomt)](auto& txn) {
        return backend->remove(lomt->os, txn).then_interruptible(
          [this, &txn]() mutable {
          logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
          return shard_services.get_store().do_transaction(coll,
                                                           std::move(txn));
        });
      });
    }
    return seastar::make_ready_future<>();
  }).safe_then_interruptible([this, soid, epoch_to_freeze, need] {
    ObjectRecoveryInfo recovery_info;
    recovery_info.soid = soid;
    recovery_info.version = need;
    return on_local_recover_persist(soid, recovery_info,
                                    true, epoch_to_freeze);
  }, PGBackend::load_metadata_ertr::all_same_way(
    [this, soid, epoch_to_freeze, need] (auto e) {
    ObjectRecoveryInfo recovery_info;
    recovery_info.soid = soid;
    recovery_info.version = need;
    return on_local_recover_persist(soid, recovery_info,
                                    true, epoch_to_freeze);
  })
  );
}
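
// Primary-side delete recovery: delete locally, then propagate the delete
// to any replicas that are still missing the object.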
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::recover_delete(
  const hobject_t &soid, eversion_t need)
{
  logger().debug("{}: {}, {}", __func__, soid, need);

  epoch_t cur_epoch = pg.get_osdmap_epoch();
  return seastar::do_with(object_stat_sum_t(),
    [this, soid, need, cur_epoch](auto& stat_diff) {
    return local_recover_delete(soid, need, cur_epoch).then_interruptible(
      [this, &stat_diff, cur_epoch, soid, need]()
      -> interruptible_future<> {
      if (!pg.has_reset_since(cur_epoch)) {
        bool object_missing = false;
        for (const auto& shard : pg.get_acting_recovery_backfill()) {
          if (shard == pg.get_pg_whoami())
            continue;
          if (pg.get_shard_missing(shard)->is_missing(soid)) {
            logger().debug("recover_delete: soid {} needs to be deleted from replica {}",
                           soid, shard);
            object_missing = true;
            break;
          }
        }

        if (!object_missing) {
          stat_diff.num_objects_recovered = 1;
          return seastar::make_ready_future<>();
        } else {
          return push_delete(soid, need);
        }
      }
      return seastar::make_ready_future<>();
    }).then_interruptible([this, soid, &stat_diff] {
      pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true);
      return seastar::make_ready_future<>();
    });
  });
}
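
// Build the PushOp for one shard: intersect the object's extents with the
// shard's dirty regions, record the push bookkeeping in push_info, and read
// out the first chunk via build_push_op().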
RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::prep_push(
  const hobject_t& soid,
  eversion_t need,
  pg_shard_t pg_shard)
{
  logger().debug("{}: {}, {}", __func__, soid, need);

  auto& recovery_waiter = get_recovering(soid);
  auto& obc = recovery_waiter.obc;
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size) {
    data_subset.insert(0, obc->obs.oi.size);
  }
  const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
  const auto it = missing.get_items().find(soid);
  assert(it != missing.get_items().end());
  data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
  logger().debug("prep_push: {} data_subset {} to {}",
                 soid, data_subset, pg_shard);

  auto& push_info = recovery_waiter.pushing[pg_shard];
  pg.begin_peer_recover(pg_shard, soid);
  const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
  assert(missing_iter != pmissing_iter->second.get_items().end());

  push_info.obc = obc;
  push_info.recovery_info.size = obc->obs.oi.size;
  push_info.recovery_info.copy_subset = data_subset;
  push_info.recovery_info.soid = soid;
  push_info.recovery_info.oi = obc->obs.oi;
  push_info.recovery_info.version = obc->obs.oi.version;
  push_info.recovery_info.object_exist =
    missing_iter->second.clean_regions.object_is_exist();
  push_info.recovery_progress.omap_complete =
    !missing_iter->second.clean_regions.omap_is_dirty();

  return build_push_op(push_info.recovery_info,
                       push_info.recovery_progress,
                       &push_info.stat).then_interruptible(
    [this, soid, pg_shard](auto push_op) {
    auto& recovery_waiter = get_recovering(soid);
    auto& push_info = recovery_waiter.pushing[pg_shard];
    push_info.recovery_progress = push_op.after_progress;
    return push_op;
  });
}
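
// Fill in a PullOp and the matching local pull_info for an object we are
// about to pull; the size is left unknown ((uint64_t)-1) until the source
// shard reports it back.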
void ReplicatedRecoveryBackend::prepare_pull(PullOp& pull_op,
  pull_info_t& pull_info,
  const hobject_t& soid,
  eversion_t need) {
  logger().debug("{}: {}, {}", __func__, soid, need);

  pg_missing_tracker_t local_missing = pg.get_local_missing();
  const auto missing_iter = local_missing.get_items().find(soid);
  auto m = pg.get_missing_loc_shards();
  pg_shard_t fromshard = *(m[soid].begin());

  //TODO: skipped snap objects case for now
  pull_op.recovery_info.copy_subset.insert(0, (uint64_t) -1);
  pull_op.recovery_info.copy_subset.intersection_of(
    missing_iter->second.clean_regions.get_dirty_regions());
  pull_op.recovery_info.size = ((uint64_t) -1);
  pull_op.recovery_info.object_exist =
    missing_iter->second.clean_regions.object_is_exist();
  pull_op.recovery_info.soid = soid;
  pull_op.soid = soid;
  pull_op.recovery_progress.data_complete = false;
  pull_op.recovery_progress.omap_complete =
    !missing_iter->second.clean_regions.omap_is_dirty();
  pull_op.recovery_progress.data_recovered_to = 0;
  pull_op.recovery_progress.first = true;

  pull_info.from = fromshard;
  pull_info.soid = soid;
  pull_info.recovery_info = pull_op.recovery_info;
  pull_info.recovery_progress = pull_op.recovery_progress;
}
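
// Assemble one PushOp chunk for the object: read metadata (on the first
// round), then omap entries, then object data, all within the
// osd_recovery_max_chunk byte budget.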
RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::build_push_op(
  const ObjectRecoveryInfo& recovery_info,
  const ObjectRecoveryProgress& progress,
  object_stat_sum_t* stat)
{
  logger().debug("{} {} @{}",
                 __func__, recovery_info.soid, recovery_info.version);
  return seastar::do_with(ObjectRecoveryProgress(progress),
                          uint64_t(crimson::common::local_conf()
                                   ->osd_recovery_max_chunk),
                          recovery_info.version,
                          PushOp(),
    [this, &recovery_info, &progress, stat]
    (auto& new_progress, auto& available, auto& v, auto& push_op) {
    return read_metadata_for_push_op(recovery_info.soid,
                                     progress, new_progress,
                                     v, &push_op
    ).then_interruptible([&](eversion_t local_ver) mutable {
      // If requestor didn't know the version, use ours
      if (v == eversion_t()) {
        v = local_ver;
      } else if (v != local_ver) {
        logger().error("build_push_op: {} push {} v{} failed because local copy is {}",
                       pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver);
        // TODO: bail out
      }
      return read_omap_for_push_op(recovery_info.soid,
                                   progress,
                                   new_progress,
                                   available, &push_op);
    }).then_interruptible([this, &recovery_info, &progress,
                           &available, &push_op]() mutable {
      logger().debug("build_push_op: available: {}, copy_subset: {}",
                     available, recovery_info.copy_subset);
      return read_object_for_push_op(recovery_info.soid,
                                     recovery_info.copy_subset,
                                     progress.data_recovered_to,
                                     available, &push_op);
    }).then_interruptible([&recovery_info, &v, &progress,
                           &new_progress, stat, &push_op]
      (uint64_t recovered_to) mutable {
      new_progress.data_recovered_to = recovered_to;
      if (new_progress.is_complete(recovery_info)) {
        new_progress.data_complete = true;
        if (stat)
          stat->num_objects_recovered++;
      } else if (progress.first && progress.omap_complete) {
        // Even if omap is unchanged, we still need to recover omap when the
        // recovery cannot be completed in a single round
        new_progress.omap_complete = false;
      }
      if (stat) {
        stat->num_keys_recovered += push_op.omap_entries.size();
        stat->num_bytes_recovered += push_op.data.length();
      }
      push_op.version = v;
      push_op.soid = recovery_info.soid;
      push_op.recovery_info = recovery_info;
      push_op.after_progress = new_progress;
      push_op.before_progress = progress;
      logger().debug("build_push_op: push_op version:"
                     " {}, push_op data length: {}",
                     push_op.version, push_op.data.length());
      return seastar::make_ready_future<PushOp>(std::move(push_op));
    });
  });
}
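
// On the first round, read the omap header and xattrs into the PushOp and
// return the object version decoded from OI_ATTR; on later rounds, just
// echo back the version we were given.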
RecoveryBackend::interruptible_future<eversion_t>
ReplicatedRecoveryBackend::read_metadata_for_push_op(
  const hobject_t& oid,
  const ObjectRecoveryProgress& progress,
  ObjectRecoveryProgress& new_progress,
  eversion_t ver,
  PushOp* push_op)
{
  logger().debug("{}, {}", __func__, oid);
  if (!progress.first) {
    return seastar::make_ready_future<eversion_t>(ver);
  }
  return interruptor::make_interruptible(interruptor::when_all_succeed(
    backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible<false>(
      crimson::os::FuturizedStore::Shard::read_errorator::all_same_way(
        [oid] (const std::error_code& e) {
        logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
        return seastar::make_ready_future<bufferlist>();
      })),
    interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid)))
    .handle_error_interruptible<false>(
      crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
        [oid] (const std::error_code& e) {
        logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
        return seastar::make_ready_future<crimson::os::FuturizedStore::Shard::attrs_t>();
      }))
  )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
    if (bl.length() == 0) {
      logger().warn("read_metadata_for_push_op: failed to read omap header");
    } else if (attrs.empty()) {
      logger().error("read_metadata_for_push_op: failed to read attrs");
      return eversion_t{};
    }
    push_op->omap_header.claim_append(std::move(bl));
    for (auto&& [key, val] : attrs) {
      push_op->attrset.emplace(std::move(key), std::move(val));
    }
    logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
    object_info_t oi;
    oi.decode_no_oid(push_op->attrset[OI_ATTR]);
    new_progress.first = false;
    return oi.version;
  });
}
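
// Read up to max_len bytes of object data starting at `offset`, restricted
// to the extents in copy_subset, and return the offset recovery has
// progressed to.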
RecoveryBackend::interruptible_future<uint64_t>
ReplicatedRecoveryBackend::read_object_for_push_op(
  const hobject_t& oid,
  const interval_set<uint64_t>& copy_subset,
  uint64_t offset,
  uint64_t max_len,
  PushOp* push_op)
{
  if (max_len == 0 || copy_subset.empty()) {
    push_op->data_included.clear();
    return seastar::make_ready_future<uint64_t>(offset);
  }
  // 1. get the extents in the interested range
  return interruptor::make_interruptible(backend->fiemap(coll, ghobject_t{oid},
    0, copy_subset.range_end())).safe_then_interruptible(
    [=, this](auto&& fiemap_included) mutable {
    interval_set<uint64_t> extents;
    try {
      extents.intersection_of(copy_subset, std::move(fiemap_included));
    } catch (std::exception &) {
      // if fiemap() fails, we will read nothing, as the intersection of
      // copy_subset and an empty interval_set would be empty anyway
      extents.clear();
    }
    // 2. we can read up to "max_len" bytes from "offset", so truncate the
    // extents down to this quota. no need to return the number of consumed
    // bytes, as this is the last consumer of this quota
    push_op->data_included.span_of(extents, offset, max_len);
    // 3. read the truncated extents
    // TODO: check if the returned extents are pruned
    return interruptor::make_interruptible(store->readv(coll, ghobject_t{oid},
      push_op->data_included, 0));
  }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
    push_op->data.claim_append(std::move(bl));
    uint64_t recovered_to = 0;
    if (push_op->data_included.empty()) {
      // zero filled section, skip to end!
      recovered_to = range_end;
    } else {
      // note down the progress, we will start from there next time
      recovered_to = push_op->data_included.range_end();
    }
    return seastar::make_ready_future<uint64_t>(recovered_to);
  }, PGBackend::read_errorator::all_same_way([](auto e) {
    logger().debug("read_object_for_push_op: read exception");
    return seastar::make_exception_future<uint64_t>(e);
  }));
}

static std::optional<std::string> nullopt_if_empty(const std::string& s)
{
  return s.empty() ? std::nullopt : std::make_optional(s);
}

static bool is_too_many_entries_per_chunk(const PushOp* push_op)
{
  const uint64_t entries_per_chunk =
    crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk;
  if (!entries_per_chunk) {
    // the limit is disabled
    return false;
  }
  return push_op->omap_entries.size() >= entries_per_chunk;
}
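
// Copy omap entries into the PushOp, resuming after omap_recovered_to and
// stopping when the byte budget (max_len) or the per-chunk entry limit is
// reached.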
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::read_omap_for_push_op(
  const hobject_t& oid,
  const ObjectRecoveryProgress& progress,
  ObjectRecoveryProgress& new_progress,
  uint64_t& max_len,
  PushOp* push_op)
{
  if (progress.omap_complete) {
    return seastar::make_ready_future<>();
  }
  return seastar::repeat([&new_progress, &max_len, push_op, &oid, this] {
    return shard_services.get_store().omap_get_values(
      coll, ghobject_t{oid}, nullopt_if_empty(new_progress.omap_recovered_to)
    ).safe_then([&new_progress, &max_len, push_op](const auto& ret) {
      const auto& [done, kvs] = ret;
      bool stop = done;
      // assuming "values.empty() only if done" holds here!
      for (const auto& [key, value] : kvs) {
        if (is_too_many_entries_per_chunk(push_op)) {
          stop = true;
          break;
        }
        if (const uint64_t entry_size = key.size() + value.length();
            entry_size > max_len) {
          stop = true;
          break;
        } else {
          max_len -= std::min(max_len, entry_size);
        }
        push_op->omap_entries.emplace(key, value);
      }
      if (!push_op->omap_entries.empty()) {
        // we iterate in order
        new_progress.omap_recovered_to = std::rbegin(push_op->omap_entries)->first;
      }
      if (done) {
        new_progress.omap_complete = true;
      }
      return seastar::make_ready_future<seastar::stop_iteration>(
        stop ? seastar::stop_iteration::yes : seastar::stop_iteration::no
      );
    }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all{});
  });
}

std::vector<pg_shard_t>
ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const
{
  std::vector<pg_shard_t> shards;
  assert(pg.get_acting_recovery_backfill().size() > 0);
  for (const auto& peer : pg.get_acting_recovery_backfill()) {
    if (peer == pg.get_pg_whoami())
      continue;
    auto shard_missing =
      pg.get_shard_missing().find(peer);
    assert(shard_missing != pg.get_shard_missing().end());
    if (shard_missing->second.is_missing(soid)) {
      shards.push_back(shard_missing->first);
    }
  }
  return shards;
}
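
// Replica-side handler for MOSDPGPull: build the requested PushOp(s) and
// send them back to the primary in an MOSDPGPush.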
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
  logger().debug("{}: {}", __func__, *m);
  if (pg.can_discard_replica_op(*m)) {
    logger().debug("{}: discarding {}", __func__, *m);
    return seastar::now();
  }
  return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
    return interruptor::parallel_for_each(pulls,
      [this, from](auto& pull_op) {
      const hobject_t& soid = pull_op.soid;
      logger().debug("handle_pull: {}", soid);
      return backend->stat(coll, ghobject_t(soid)).then_interruptible(
        [this, &pull_op](auto st) {
        ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
        ObjectRecoveryProgress &progress = pull_op.recovery_progress;
        if (progress.first && recovery_info.size == ((uint64_t) -1)) {
          // Adjust size and copy_subset
          recovery_info.size = st.st_size;
          if (st.st_size) {
            interval_set<uint64_t> object_range;
            object_range.insert(0, st.st_size);
            recovery_info.copy_subset.intersection_of(object_range);
          } else {
            recovery_info.copy_subset.clear();
          }
          assert(recovery_info.clone_subset.empty());
        }
        return build_push_op(recovery_info, progress, nullptr);
      }).then_interruptible([this, from](auto push_op) {
        auto msg = crimson::make_message<MOSDPGPush>();
        msg->from = pg.get_pg_whoami();
        msg->pgid = pg.get_pgid();
        msg->map_epoch = pg.get_osdmap_epoch();
        msg->min_epoch = pg.get_last_peering_reset();
        msg->set_priority(pg.get_recovery_op_priority());
        msg->pushes.push_back(std::move(push_op));
        return shard_services.send_to_osd(from.osd, std::move(msg),
                                          pg.get_osdmap_epoch());
      });
    });
  });
}
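
// Apply one PushOp received in response to our pull: trim the data to the
// wanted extents, persist it, and either finish the pull or fill in
// `response` to request the next chunk.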
RecoveryBackend::interruptible_future<bool>
ReplicatedRecoveryBackend::_handle_pull_response(
  pg_shard_t from,
  PushOp& push_op,
  PullOp* response,
  ceph::os::Transaction* t)
{
  logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
                 push_op.recovery_info, push_op.after_progress,
                 push_op.data.length(), push_op.data_included);

  const hobject_t &hoid = push_op.soid;
  auto& recovery_waiter = get_recovering(hoid);
  auto& pull_info = *recovery_waiter.pull_info;
  if (pull_info.recovery_info.size == (uint64_t(-1))) {
    pull_info.recovery_info.size = push_op.recovery_info.size;
    pull_info.recovery_info.copy_subset.intersection_of(
      push_op.recovery_info.copy_subset);
  }

  // If primary doesn't have object info and didn't know version
  if (pull_info.recovery_info.version == eversion_t())
    pull_info.recovery_info.version = push_op.version;

  auto prepare_waiter = interruptor::make_interruptible(
    seastar::make_ready_future<>());
  if (pull_info.recovery_progress.first) {
    prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
      pull_info.recovery_info.soid,
      [&pull_info, &recovery_waiter, &push_op](auto obc) {
        pull_info.obc = obc;
        recovery_waiter.obc = obc;
        obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), push_op.soid);
        pull_info.recovery_info.oi = obc->obs.oi;
        return crimson::osd::PG::load_obc_ertr::now();
      }).handle_error_interruptible(crimson::ct_error::assert_all{});
  }
  return prepare_waiter.then_interruptible(
    [this, &pull_info, &push_op, t, response]() mutable {
    const bool first = pull_info.recovery_progress.first;
    pull_info.recovery_progress = push_op.after_progress;
    logger().debug("new recovery_info {}, new progress {}",
                   pull_info.recovery_info, pull_info.recovery_progress);
    interval_set<uint64_t> data_zeros;
    {
      uint64_t offset = push_op.before_progress.data_recovered_to;
      uint64_t length = (push_op.after_progress.data_recovered_to -
                         push_op.before_progress.data_recovered_to);
      if (length) {
        data_zeros.insert(offset, length);
      }
    }
    auto [usable_intervals, data] =
      trim_pushed_data(pull_info.recovery_info.copy_subset,
                       push_op.data_included, push_op.data);
    bool complete = pull_info.is_complete();
    bool clear_omap = !push_op.before_progress.omap_complete;
    return submit_push_data(pull_info.recovery_info,
                            first, complete, clear_omap,
                            std::move(data_zeros), std::move(usable_intervals),
                            std::move(data), std::move(push_op.omap_header),
                            push_op.attrset, std::move(push_op.omap_entries), t)
    .then_interruptible(
      [this, response, &pull_info, &push_op, complete,
       t, bytes_recovered=data.length()] {
      pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
      pull_info.stat.num_bytes_recovered += bytes_recovered;

      if (complete) {
        pull_info.stat.num_objects_recovered++;
        pg.get_recovery_handler()->on_local_recover(
          push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
          false, *t);
        return true;
      } else {
        response->soid = push_op.soid;
        response->recovery_info = pull_info.recovery_info;
        response->recovery_progress = pull_info.recovery_progress;
        return false;
      }
    });
  });
}
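
// Primary-side handler for MOSDPGPush replies to our pull: apply the chunk
// in a transaction and either mark the pull complete or request more.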
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_pull_response(
  Ref<MOSDPGPush> m)
{
  if (pg.can_discard_replica_op(*m)) {
    logger().debug("{}: discarding {}", __func__, *m);
    return seastar::now();
  }
  const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
  if (push_op.version == eversion_t()) {
    // replica doesn't have it!
    pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid,
      get_recovering(push_op.soid).pull_info->recovery_info.version);
    return seastar::make_exception_future<>(
      std::runtime_error(fmt::format(
        "Error on pushing side {} when pulling obj {}",
        m->from, push_op.soid)));
  }

  logger().debug("{}: {}", __func__, *m);
  return seastar::do_with(PullOp(), [this, m](auto& response) {
    return seastar::do_with(ceph::os::Transaction(), m.get(),
      [this, &response](auto& t, auto& m) {
      pg_shard_t from = m->from;
      PushOp& push_op = m->pushes[0]; // only one push per message for now
      return _handle_pull_response(from, push_op, &response, &t
      ).then_interruptible(
        [this, &t](bool complete) {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
        logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
        return shard_services.get_store().do_transaction(coll, std::move(t))
          .then([this, epoch_frozen, complete,
                 last_complete = pg.get_info().last_complete] {
          pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
          return seastar::make_ready_future<bool>(complete);
        });
      });
    }).then_interruptible([this, m, &response](bool complete) {
      if (complete) {
        auto& push_op = m->pushes[0];
        get_recovering(push_op.soid).set_pulled();
        return seastar::make_ready_future<>();
      } else {
        auto reply = crimson::make_message<MOSDPGPull>();
        reply->from = pg.get_pg_whoami();
        reply->set_priority(m->get_priority());
        reply->pgid = pg.get_info().pgid;
        reply->map_epoch = m->map_epoch;
        reply->min_epoch = m->min_epoch;
        reply->set_pulls({std::move(response)});
        return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
      }
    });
  });
}
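
// Replica-side application of a single PushOp: persist the pushed data and
// fill in the PushReplyOp.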
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::_handle_push(
  pg_shard_t from,
  PushOp &push_op,
  PushReplyOp *response,
  ceph::os::Transaction *t)
{
  logger().debug("{}", __func__);

  bool first = push_op.before_progress.first;
  interval_set<uint64_t> data_zeros;
  {
    uint64_t offset = push_op.before_progress.data_recovered_to;
    uint64_t length = (push_op.after_progress.data_recovered_to -
                       push_op.before_progress.data_recovered_to);
    if (length) {
      data_zeros.insert(offset, length);
    }
  }
  bool complete = (push_op.after_progress.data_complete &&
                   push_op.after_progress.omap_complete);
  bool clear_omap = !push_op.before_progress.omap_complete;
  response->soid = push_op.recovery_info.soid;

  return submit_push_data(push_op.recovery_info, first, complete, clear_omap,
                          std::move(data_zeros),
                          std::move(push_op.data_included),
                          std::move(push_op.data),
                          std::move(push_op.omap_header),
                          push_op.attrset,
                          std::move(push_op.omap_entries), t)
  .then_interruptible(
    [this, complete, &push_op, t] {
    if (complete) {
      pg.get_recovery_handler()->on_local_recover(
        push_op.recovery_info.soid, push_op.recovery_info,
        false, *t);
    }
  });
}
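
// Handler for MOSDPGPush; on the primary this is actually a pull response
// and is redirected to handle_pull_response().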
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_push(
  Ref<MOSDPGPush> m)
{
  if (pg.can_discard_replica_op(*m)) {
    logger().debug("{}: discarding {}", __func__, *m);
    return seastar::now();
  }
  if (pg.is_primary()) {
    return handle_pull_response(m);
  }

  logger().debug("{}: {}", __func__, *m);
  return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
    PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now
    return seastar::do_with(ceph::os::Transaction(),
      [this, m, &push_op, &response](auto& t) {
      return _handle_push(m->from, push_op, &response, &t).then_interruptible(
        [this, &t] {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
        logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
        return interruptor::make_interruptible(
            shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
          [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
          //TODO: this should be grouped with pg.on_local_recover somehow.
          pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
        });
      });
    }).then_interruptible([this, m, &response]() mutable {
      auto reply = crimson::make_message<MOSDPGPushReply>();
      reply->from = pg.get_pg_whoami();
      reply->set_priority(m->get_priority());
      reply->pgid = pg.get_info().pgid;
      reply->map_epoch = m->map_epoch;
      reply->min_epoch = m->min_epoch;
      std::vector<PushReplyOp> replies = { std::move(response) };
      reply->replies.swap(replies);
      return shard_services.send_to_osd(m->from.osd,
                                        std::move(reply), pg.get_osdmap_epoch());
    });
  });
}

RecoveryBackend::interruptible_future<std::optional<PushOp>>
ReplicatedRecoveryBackend::_handle_push_reply(
  pg_shard_t peer,
  const PushReplyOp &op)
{
  const hobject_t& soid = op.soid;
  logger().debug("{}, soid {}, from {}", __func__, soid, peer);
  auto recovering_iter = recovering.find(soid);
  if (recovering_iter == recovering.end()
    || !recovering_iter->second->pushing.count(peer)) {
    logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
    return seastar::make_ready_future<std::optional<PushOp>>();
  } else {
    auto& push_info = recovering_iter->second->pushing[peer];
    bool error = push_info.recovery_progress.error;
    if (!push_info.recovery_progress.data_complete && !error) {
      return build_push_op(push_info.recovery_info, push_info.recovery_progress,
                           &push_info.stat
      ).then_interruptible([&push_info] (auto push_op) {
        push_info.recovery_progress = push_op.after_progress;
        return seastar::make_ready_future<std::optional<PushOp>>(
          std::move(push_op));
      }).handle_exception_interruptible(
        [recovering_iter, &push_info, peer] (auto e) {
        push_info.recovery_progress.error = true;
        recovering_iter->second->set_push_failed(peer, e);
        return seastar::make_ready_future<std::optional<PushOp>>();
      });
    }
    if (!error) {
      pg.get_recovery_handler()->on_peer_recover(peer,
                                                 soid,
                                                 push_info.recovery_info);
    }
    recovering_iter->second->set_pushed(peer);
    return seastar::make_ready_future<std::optional<PushOp>>();
  }
}

RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_push_reply(
  Ref<MOSDPGPushReply> m)
{
  logger().debug("{}: {}", __func__, *m);
  auto from = m->from;
  auto& push_reply = m->replies[0]; //TODO: only one reply per message

  return _handle_push_reply(from, push_reply).then_interruptible(
    [this, from](std::optional<PushOp> push_op) {
    if (push_op) {
      auto msg = crimson::make_message<MOSDPGPush>();
      msg->from = pg.get_pg_whoami();
      msg->pgid = pg.get_pgid();
      msg->map_epoch = pg.get_osdmap_epoch();
      msg->min_epoch = pg.get_last_peering_reset();
      msg->set_priority(pg.get_recovery_op_priority());
      msg->pushes.push_back(std::move(*push_op));
      return shard_services.send_to_osd(from.osd,
                                        std::move(msg),
                                        pg.get_osdmap_epoch());
    } else {
      return seastar::make_ready_future<>();
    }
  });
}
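
// Keep only the pushed bytes that fall inside copy_subset; returns the
// usable intervals together with the matching sub-buffer.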
std::pair<interval_set<uint64_t>,
          bufferlist>
ReplicatedRecoveryBackend::trim_pushed_data(
  const interval_set<uint64_t> &copy_subset,
  const interval_set<uint64_t> &intervals_received,
  ceph::bufferlist data_received)
{
  logger().debug("{}", __func__);
  // what i have is only a subset of what i want
  if (intervals_received.subset_of(copy_subset)) {
    return {intervals_received, data_received};
  }
  // only collect the extents included by copy_subset and intervals_received
  interval_set<uint64_t> intervals_usable;
  bufferlist data_usable;
  intervals_usable.intersection_of(copy_subset, intervals_received);
  uint64_t have_off = 0;
  for (auto [have_start, have_len] : intervals_received) {
    interval_set<uint64_t> want;
    want.insert(have_start, have_len);
    want.intersection_of(copy_subset);
    for (auto [want_start, want_len] : want) {
      bufferlist sub;
      uint64_t data_off = have_off + (want_start - have_start);
      sub.substr_of(data_received, data_off, want_len);
      data_usable.claim_append(sub);
    }
    have_off += have_len;
  }
  return {intervals_usable, data_usable};
}
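
// Pick the object the pushed data should be written to: the final object
// once the push is complete, or a temp recovery object otherwise.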
RecoveryBackend::interruptible_future<hobject_t>
ReplicatedRecoveryBackend::prep_push_target(
  const ObjectRecoveryInfo& recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  ObjectStore::Transaction* t,
  const map<string, bufferlist, less<>>& attrs,
  bufferlist&& omap_header)
{
  if (!first) {
    return seastar::make_ready_future<hobject_t>(
      get_temp_recovery_object(recovery_info.soid,
                               recovery_info.version));
  }

  ghobject_t target_oid;
  if (complete) {
    // overwrite the original object
    target_oid = ghobject_t(recovery_info.soid);
  } else {
    target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid,
                                                     recovery_info.version));
    logger().debug("{}: Adding oid {} in the temp collection",
                   __func__, target_oid);
    add_temp_obj(target_oid.hobj);
  }
  // create a new object
  if (!complete || !recovery_info.object_exist) {
    t->remove(coll->get_cid(), target_oid);
    t->touch(coll->get_cid(), target_oid);
    object_info_t oi;
    oi.decode_no_oid(attrs.at(OI_ATTR));
    t->set_alloc_hint(coll->get_cid(), target_oid,
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
  }
  if (complete) {
    // remove xattr and update later if overwrite on original object
    t->rmattrs(coll->get_cid(), target_oid);
    // if need update omap, clear the previous content first
    if (clear_omap) {
      t->omap_clear(coll->get_cid(), target_oid);
    }
  }
  t->truncate(coll->get_cid(), target_oid, recovery_info.size);
  if (omap_header.length()) {
    t->omap_setheader(coll->get_cid(), target_oid, omap_header);
  }
  if (complete || !recovery_info.object_exist) {
    return seastar::make_ready_future<hobject_t>(target_oid.hobj);
  }
  // clone overlap content in local object if using a new object
  return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
  .then_interruptible(
    [this, &recovery_info, t, target_oid] (auto st) {
    // TODO: pg num bytes counting
    uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
    interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
    if (local_size) {
      local_intervals_included.insert(0, local_size);
      local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
      local_intervals_included.subtract(local_intervals_excluded);
    }
    for (auto [off, len] : local_intervals_included) {
      logger().debug(" clone_range {} {}~{}",
                     recovery_info.soid, off, len);
      t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
                     target_oid, off, len, off);
    }
    return seastar::make_ready_future<hobject_t>(target_oid.hobj);
  });
}
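
// Write one pushed chunk into the target object: punch zeros where no data
// was sent for dirty regions, write the received extents, set omap/xattrs,
// and move the temp object into place once the push is complete.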
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  interval_set<uint64_t>&& data_zeros,
  interval_set<uint64_t>&& intervals_included,
  bufferlist&& data_included,
  bufferlist&& omap_header,
  const map<string, bufferlist, less<>> &attrs,
  map<string, bufferlist>&& omap_entries,
  ObjectStore::Transaction *t)
{
  logger().debug("{}", __func__);
  return prep_push_target(recovery_info, first, complete,
                          clear_omap, t, attrs,
                          std::move(omap_header)).then_interruptible(
    [this,
     &recovery_info, t,
     first, complete,
     data_zeros=std::move(data_zeros),
     intervals_included=std::move(intervals_included),
     data_included=std::move(data_included),
     omap_entries=std::move(omap_entries),
     &attrs](auto target_oid) mutable {

    uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
    // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
    if (!data_zeros.empty()) {
      data_zeros.intersection_of(recovery_info.copy_subset);
      assert(intervals_included.subset_of(data_zeros));
      data_zeros.subtract(intervals_included);

      logger().debug("submit_push_data recovering object {} copy_subset: {} "
                     "intervals_included: {} data_zeros: {}",
                     recovery_info.soid, recovery_info.copy_subset,
                     intervals_included, data_zeros);

      for (auto [start, len] : data_zeros) {
        t->zero(coll->get_cid(), ghobject_t(target_oid), start, len);
      }
    }
    uint64_t off = 0;
    for (auto [start, len] : intervals_included) {
      bufferlist bit;
      bit.substr_of(data_included, off, len);
      t->write(coll->get_cid(), ghobject_t(target_oid),
               start, len, bit, fadvise_flags);
      off += len;
    }

    if (!omap_entries.empty())
      t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries);
    if (!attrs.empty())
      t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs);

    if (complete) {
      if (!first) {
        logger().debug("submit_push_data: Removing oid {} from the temp collection",
                       target_oid);
        clear_temp_obj(target_oid);
        t->remove(coll->get_cid(), ghobject_t(recovery_info.soid));
        t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid),
                                  coll->get_cid(), ghobject_t(recovery_info.soid));
      }
      submit_push_complete(recovery_info, t);
    }
    logger().debug("submit_push_data: done");
    return seastar::make_ready_future<>();
  });
}

void ReplicatedRecoveryBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (const auto& [oid, extents] : recovery_info.clone_subset) {
    for (const auto& [off, len] : extents) {
      logger().debug(" clone_range {} {}~{}", oid, off, len);
      t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid),
                     off, len, off);
    }
  }
}

RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_recovery_delete_reply(
  Ref<MOSDPGRecoveryDeleteReply> m)
{
  auto& p = m->objects.front();
  hobject_t soid = p.first;
  ObjectRecoveryInfo recovery_info;
  recovery_info.version = p.second;
  pg.get_recovery_handler()->on_peer_recover(m->from, soid, recovery_info);
  get_recovering(soid).set_pushed(m->from);
  return seastar::now();
}
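
// Dispatch incoming recovery messages to the matching handler by type.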
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_recovery_op(
  Ref<MOSDFastDispatchOp> m,
  crimson::net::ConnectionRef conn)
{
  switch (m->get_header().type) {
  case MSG_OSD_PG_PULL:
    return handle_pull(boost::static_pointer_cast<MOSDPGPull>(m));
  case MSG_OSD_PG_PUSH:
    return handle_push(boost::static_pointer_cast<MOSDPGPush>(m));
  case MSG_OSD_PG_PUSH_REPLY:
    return handle_push_reply(
      boost::static_pointer_cast<MOSDPGPushReply>(m));
  case MSG_OSD_PG_RECOVERY_DELETE:
    return handle_recovery_delete(
      boost::static_pointer_cast<MOSDPGRecoveryDelete>(m));
  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    return handle_recovery_delete_reply(
      boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m));
  default:
    // delegate to parent class for handling backend-agnostic recovery ops.
    return RecoveryBackend::handle_recovery_op(std::move(m), conn);
  }
}