]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/ReplicatedBackend.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / osd / ReplicatedBackend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "common/errno.h"
15#include "ReplicatedBackend.h"
16#include "messages/MOSDOp.h"
7c673cae 17#include "messages/MOSDRepOp.h"
7c673cae
FG
18#include "messages/MOSDRepOpReply.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPull.h"
21#include "messages/MOSDPGPushReply.h"
22#include "common/EventTrace.h"
11fdf7f2 23#include "include/random.h"
81eedcae 24#include "include/util.h"
11fdf7f2 25#include "OSD.h"
7c673cae
FG
26
27#define dout_context cct
28#define dout_subsys ceph_subsys_osd
29#define DOUT_PREFIX_ARGS this
30#undef dout_prefix
31#define dout_prefix _prefix(_dout, this)
32static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
11fdf7f2 33 return pgb->get_parent()->gen_dbg_prefix(*_dout);
7c673cae
FG
34}
35
36namespace {
37class PG_SendMessageOnConn: public Context {
38 PGBackend::Listener *pg;
39 Message *reply;
40 ConnectionRef conn;
41 public:
42 PG_SendMessageOnConn(
43 PGBackend::Listener *pg,
44 Message *reply,
45 ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
46 void finish(int) override {
47 pg->send_message_osd_cluster(reply, conn.get());
48 }
49};
50
51class PG_RecoveryQueueAsync : public Context {
52 PGBackend::Listener *pg;
53 unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
54 public:
55 PG_RecoveryQueueAsync(
56 PGBackend::Listener *pg,
57 GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
58 void finish(int) override {
59 pg->schedule_recovery_work(c.release());
60 }
61};
62}
63
7c673cae
FG
64struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
65 ReplicatedBackend *pg;
66 RepModifyRef rm;
67 C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
68 : pg(pg), rm(r) {}
69 void finish(int r) override {
70 pg->repop_commit(rm);
71 }
72};
73
74static void log_subop_stats(
75 PerfCounters *logger,
76 OpRequestRef op, int subop)
77{
78 utime_t now = ceph_clock_now();
79 utime_t latency = now;
80 latency -= op->get_req()->get_recv_stamp();
81
82
83 logger->inc(l_osd_sop);
84 logger->tinc(l_osd_sop_lat, latency);
85 logger->inc(subop);
86
87 if (subop != l_osd_sop_pull) {
88 uint64_t inb = op->get_req()->get_data().length();
89 logger->inc(l_osd_sop_inb, inb);
90 if (subop == l_osd_sop_w) {
91 logger->inc(l_osd_sop_w_inb, inb);
92 logger->tinc(l_osd_sop_w_lat, latency);
93 } else if (subop == l_osd_sop_push) {
94 logger->inc(l_osd_sop_push_inb, inb);
95 logger->tinc(l_osd_sop_push_lat, latency);
96 } else
11fdf7f2 97 ceph_abort_msg("no support subop");
7c673cae
FG
98 } else {
99 logger->tinc(l_osd_sop_pull_lat, latency);
100 }
101}
102
103ReplicatedBackend::ReplicatedBackend(
104 PGBackend::Listener *pg,
11fdf7f2 105 const coll_t &coll,
7c673cae
FG
106 ObjectStore::CollectionHandle &c,
107 ObjectStore *store,
108 CephContext *cct) :
109 PGBackend(cct, pg, store, coll, c) {}
110
111void ReplicatedBackend::run_recovery_op(
112 PGBackend::RecoveryHandle *_h,
113 int priority)
114{
115 RPGHandle *h = static_cast<RPGHandle *>(_h);
116 send_pushes(priority, h->pushes);
117 send_pulls(priority, h->pulls);
c07f9fc5 118 send_recovery_deletes(priority, h->deletes);
7c673cae
FG
119 delete h;
120}
121
224ce89b 122int ReplicatedBackend::recover_object(
7c673cae
FG
123 const hobject_t &hoid,
124 eversion_t v,
125 ObjectContextRef head,
126 ObjectContextRef obc,
127 RecoveryHandle *_h
128 )
129{
130 dout(10) << __func__ << ": " << hoid << dendl;
131 RPGHandle *h = static_cast<RPGHandle *>(_h);
132 if (get_parent()->get_local_missing().is_missing(hoid)) {
11fdf7f2 133 ceph_assert(!obc);
7c673cae
FG
134 // pull
135 prepare_pull(
136 v,
137 hoid,
138 head,
139 h);
7c673cae 140 } else {
11fdf7f2 141 ceph_assert(obc);
7c673cae
FG
142 int started = start_pushes(
143 hoid,
144 obc,
145 h);
224ce89b
WB
146 if (started < 0) {
147 pushing[hoid].clear();
148 return started;
149 }
7c673cae 150 }
224ce89b 151 return 0;
7c673cae
FG
152}
153
154void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
155{
156 for(map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
157 i != pull_from_peer.end();
158 ) {
159 if (osdmap->is_down(i->first.osd)) {
160 dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
161 << ", osdmap has it marked down" << dendl;
162 for (set<hobject_t>::iterator j = i->second.begin();
163 j != i->second.end();
164 ++j) {
165 get_parent()->cancel_pull(*j);
166 clear_pull(pulling.find(*j), false);
167 }
168 pull_from_peer.erase(i++);
169 } else {
170 ++i;
171 }
172 }
173}
174
175bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
176{
177 dout(10) << __func__ << ": " << op << dendl;
178 switch (op->get_req()->get_type()) {
179 case MSG_OSD_PG_PULL:
180 return true;
181 default:
182 return false;
183 }
184}
185
c07f9fc5 186bool ReplicatedBackend::_handle_message(
7c673cae
FG
187 OpRequestRef op
188 )
189{
190 dout(10) << __func__ << ": " << op << dendl;
191 switch (op->get_req()->get_type()) {
192 case MSG_OSD_PG_PUSH:
193 do_push(op);
194 return true;
195
196 case MSG_OSD_PG_PULL:
197 do_pull(op);
198 return true;
199
200 case MSG_OSD_PG_PUSH_REPLY:
201 do_push_reply(op);
202 return true;
203
7c673cae
FG
204 case MSG_OSD_REPOP: {
205 do_repop(op);
206 return true;
207 }
208
209 case MSG_OSD_REPOPREPLY: {
210 do_repop_reply(op);
211 return true;
212 }
213
214 default:
215 break;
216 }
217 return false;
218}
219
220void ReplicatedBackend::clear_recovery_state()
221{
222 // clear pushing/pulling maps
223 for (auto &&i: pushing) {
224 for (auto &&j: i.second) {
225 get_parent()->release_locks(j.second.lock_manager);
226 }
227 }
228 pushing.clear();
229
230 for (auto &&i: pulling) {
231 get_parent()->release_locks(i.second.lock_manager);
232 }
233 pulling.clear();
234 pull_from_peer.clear();
235}
236
237void ReplicatedBackend::on_change()
238{
239 dout(10) << __func__ << dendl;
11fdf7f2
TL
240 for (auto& op : in_progress_ops) {
241 delete op.second->on_commit;
242 op.second->on_commit = nullptr;
7c673cae 243 }
11fdf7f2 244 in_progress_ops.clear();
7c673cae
FG
245 clear_recovery_state();
246}
247
7c673cae
FG
248int ReplicatedBackend::objects_read_sync(
249 const hobject_t &hoid,
250 uint64_t off,
251 uint64_t len,
252 uint32_t op_flags,
253 bufferlist *bl)
254{
255 return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
256}
257
9f95a23c
TL
258int ReplicatedBackend::objects_readv_sync(
259 const hobject_t &hoid,
260 map<uint64_t, uint64_t>&& m,
261 uint32_t op_flags,
262 bufferlist *bl)
263{
264 interval_set<uint64_t> im(std::move(m));
265 auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
266 if (r >= 0) {
267 m = std::move(im).detach();
268 }
269 return r;
270}
271
7c673cae
FG
272void ReplicatedBackend::objects_read_async(
273 const hobject_t &hoid,
274 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
275 pair<bufferlist*, Context*> > > &to_read,
276 Context *on_complete,
277 bool fast_read)
278{
11fdf7f2 279 ceph_abort_msg("async read is not used by replica pool");
7c673cae
FG
280}
281
282class C_OSD_OnOpCommit : public Context {
283 ReplicatedBackend *pg;
9f95a23c 284 ceph::ref_t<ReplicatedBackend::InProgressOp> op;
7c673cae 285public:
9f95a23c
TL
286 C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
287 : pg(pg), op(std::move(op)) {}
7c673cae
FG
288 void finish(int) override {
289 pg->op_commit(op);
290 }
291};
292
7c673cae
FG
/*
 * Flatten a PGTransaction into a raw ObjectStore::Transaction.
 *
 * pgt:         logical per-object operations to translate (consumed read-only)
 * coll:        collection all ops are applied to
 * log_entries: log entries for this update; each is marked unrollbackable
 *              and, when the op updated snaps, gets the encoded snap set
 *              stashed in le.snaps
 * t:           output transaction (required)
 * added:       out: temp objects newly created by this transaction
 * removed:     out: temp objects deleted by this transaction
 * require_osd_release: cluster-wide minimum release; selects create()
 *              vs touch() for new objects (create() requires octopus+)
 */
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed,
  const ceph_release_t require_osd_release = ceph_release_t::unknown )
{
  ceph_assert(t);
  ceph_assert(added);
  ceph_assert(removed);

  // Replicated pools cannot roll back; pre-encode updated snaps into
  // each log entry so replicas get them with the log.
  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      // size hint only: ~8 bytes per snap plus envelope
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  // Visit objects in an order that is safe w.r.t. clone/rename
  // dependencies (safe_create_traverse's contract).
  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
	ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      // track temp-object lifecycle for the caller
      if (oid.is_temp()) {
	if (op.is_fresh_object()) {
	  added->insert(oid);
	} else if (op.is_delete()) {
	  removed->insert(oid);
	}
      }

      if (op.delete_first) {
	t->remove(coll, goid);
      }

      // how the object comes into existence (if at all)
      match(
	op.init_type,
	[&](const PGTransaction::ObjectOperation::Init::None &) {
	},
	[&](const PGTransaction::ObjectOperation::Init::Create &op) {
	  // create() (octopus+) errors if the object exists; touch()
	  // is the legacy, idempotent form
	  if (require_osd_release >= ceph_release_t::octopus) {
	    t->create(coll, goid);
	  } else {
	    t->touch(coll, goid);
	  }
	},
	[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
	  t->clone(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    goid);
	},
	[&](const PGTransaction::ObjectOperation::Init::Rename &op) {
	  // renames only ever promote a temp object into place
	  ceph_assert(op.source.is_temp());
	  t->collection_move_rename(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    coll,
	    goid);
	});

      // double truncate expresses truncate-then-extend (or vice versa)
      if (op.truncate) {
	t->truncate(coll, goid, op.truncate->first);
	if (op.truncate->first != op.truncate->second)
	  t->truncate(coll, goid, op.truncate->second);
      }

      // xattrs: null optional means remove, value means set
      if (!op.attr_updates.empty()) {
	map<string, bufferlist> attrs;
	for (auto &&p: op.attr_updates) {
	  if (p.second)
	    attrs[p.first] = *(p.second);
	  else
	    t->rmattr(coll, goid, p.first);
	}
	t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
	t->omap_clear(coll, goid);
      if (op.omap_header)
	t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
	using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
	switch (up.first) {
	case UpdateType::Remove:
	  t->omap_rmkeys(coll, goid, up.second);
	  break;
	case UpdateType::Insert:
	  t->omap_setkeys(coll, goid, up.second);
	  break;
	case UpdateType::RemoveRange:
	  t->omap_rmkeyrange(coll, goid, up.second);
	  break;
	}
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
	auto &hint = *(op.alloc_hint);
	t->set_alloc_hint(
	  coll,
	  goid,
	  hint.expected_object_size,
	  hint.expected_write_size,
	  hint.flags);
      }

      // data mutations: plain writes, zero ranges, and clone_range
      // from another object
      for (auto &&extent: op.buffer_updates) {
	using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
	match(
	  extent.get_val(),
	  [&](const BufferUpdate::Write &op) {
	    t->write(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len(),
	      op.buffer,
	      op.fadvise_flags);
	  },
	  [&](const BufferUpdate::Zero &op) {
	    t->zero(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len());
	  },
	  [&](const BufferUpdate::CloneRange &op) {
	    ceph_assert(op.len == extent.get_len());
	    t->clone_range(
	      coll,
	      ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	      goid,
	      op.offset,
	      extent.get_len(),
	      extent.get_off());
	  });
      }
    });
}
446
/*
 * Primary-side entry point for a replicated write.
 *
 * Converts the PGTransaction into a store transaction, registers an
 * InProgressOp keyed by tid that waits for a commit from every
 * acting/recovery/backfill shard (including ourselves), replicates the
 * op to the other shards (issue_op), records the log entries locally,
 * and queues the local transaction.  on_all_commit fires once every
 * shard — local and remote — has committed.
 *
 * NOTE(review): ordering here is deliberate — issue_op before
 * log_operation/queue_transactions; do not reorder.
 */
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  const vector<pg_log_entry_t> &_log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  // local copy: generate_transaction mutates the entries (snaps, etc.)
  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed,
    get_osdmap()->require_osd_release);
  // a single op touches at most one temp object in each direction
  ceph_assert(added.size() <= 1);
  ceph_assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      ceph::make_ref<InProgressOp>(
	tid, on_all_commit,
	orig_op, at_version)
      )
    );
  ceph_assert(insert_res.second);
  InProgressOp &op = *insert_res.first->second;

  // wait for a commit from every shard, ourselves included; op_commit()
  // erases our own shard when the local transaction commits
  op.waiting_for_commit.insert(
    parent->get_acting_recovery_backfill_shards().begin(),
    parent->get_acting_recovery_backfill_shards().end());

  // replicate to the other shards before touching local state
  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    min_last_complete_ondisk,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    min_last_complete_ondisk,
    true,
    op_t);

  // local commit notification -> op_commit()
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
  if (at_version != eversion_t()) {
    parent->op_applied(at_version);
  }
}
533
9f95a23c 534void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
7c673cae 535{
11fdf7f2
TL
536 if (op->on_commit == nullptr) {
537 // aborted
538 return;
539 }
540
541 FUNCTRACE(cct);
7c673cae
FG
542 OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
543 dout(10) << __func__ << ": " << op->tid << dendl;
544 if (op->op) {
545 op->op->mark_event("op_commit");
546 op->op->pg_trace.event("op commit");
547 }
548
549 op->waiting_for_commit.erase(get_parent()->whoami_shard());
550
551 if (op->waiting_for_commit.empty()) {
552 op->on_commit->complete(0);
553 op->on_commit = 0;
7c673cae
FG
554 in_progress_ops.erase(op->tid);
555 }
556}
557
/*
 * Handle a replica's MOSDRepOpReply for an op we replicated.
 *
 * An ONDISK ack removes the replying shard from the op's commit wait
 * set; replies for tids we no longer track (e.g. after an interval
 * change) are silently ignored.  The peer's last_complete_ondisk is
 * recorded either way.  When the wait set empties and the op is still
 * live, on_commit fires and the op is retired.
 */
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  auto r = op->get_req<MOSDRepOpReply>();
  ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  auto iter = in_progress_ops.find(rep_tid);
  if (iter != in_progress_ops.end()) {
    InProgressOp &ip_op = *iter->second;
    const MOSDOp *m = nullptr;
    if (ip_op.op)
      m = ip_op.op->get_req<MOSDOp>();

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      ceph_assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
	ip_op.op->mark_event("sub_op_commit_rec");
	ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      // legacy peer; ignore
    }

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    // on_commit may already be null if on_change() aborted the op
    if (ip_op.waiting_for_commit.empty() &&
	ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
      in_progress_ops.erase(iter);
    }
  }
}
613
/*
 * Deep-scrub one object, resumably.
 *
 * Called repeatedly for the same object; `pos` (ScrubMapBuilder)
 * carries the resume state between calls: data byte offset/hash, then
 * omap key position/hash.  Returns -EINPROGRESS when there is more
 * work to do for this object (caller re-invokes later), 0 when the
 * object is done (including the error cases, which set o.read_error
 * and stop scrubbing this object).
 *
 * Results are accumulated into `o` (digests, omap stats, error flags)
 * and `map` (large-omap / has_omap_keys flags).
 */
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  // scrub reads are sequential, should not pollute caches
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			   CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
			   CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  // debug knob: optionally throttle deep scrub
  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  ceph_assert(poid == pos.ls[pos.pos]);

  // ---- phase 1: object data, one stride per call ----
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
	       << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    // full stride read implies there may be more data; resume later
    if (r == cct->_conf->osd_deep_scrub_stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
	       << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
	     << std::hex << o.digest << std::dec << dendl;
  }

  // ---- phase 2: omap header (only on first omap pass) ----
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
	       << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      bool encoded = false;
      dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // ---- phase 3: omap keys, up to osd_deep_scrub_keys per call ----
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  ceph_assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);   // resume where we left off
  } else {
    iter->seek_to_first();
  }
  int max = g_conf()->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    encode(iter->key(), bl);
    encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();      // checkpoint; continue next call
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
	       << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  // flag objects whose omap exceeds the configured warning thresholds
  if (pos.omap_keys > cct->_conf->
	osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
	osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
	     << " large omap object detected. Object has " << pos.omap_keys
	     << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
	   << std::hex << o.omap_digest << std::dec << dendl;

  // Sum up omap usage
  if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
    dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
	     << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
    map.has_omap_keys = true;
    o.object_omap_bytes = pos.omap_bytes;
    o.object_omap_keys = pos.omap_keys;
  }

  // done!
  return 0;
}
759
760void ReplicatedBackend::_do_push(OpRequestRef op)
761{
9f95a23c 762 auto m = op->get_req<MOSDPGPush>();
11fdf7f2 763 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
7c673cae
FG
764 pg_shard_t from = m->from;
765
766 op->mark_started();
767
768 vector<PushReplyOp> replies;
769 ObjectStore::Transaction t;
11fdf7f2
TL
770 if (get_parent()->check_failsafe_full()) {
771 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
7c673cae
FG
772 ceph_abort();
773 }
774 for (vector<PushOp>::const_iterator i = m->pushes.begin();
775 i != m->pushes.end();
776 ++i) {
777 replies.push_back(PushReplyOp());
11fdf7f2 778 handle_push(from, *i, &(replies.back()), &t, m->is_repair);
7c673cae
FG
779 }
780
781 MOSDPGPushReply *reply = new MOSDPGPushReply;
782 reply->from = get_parent()->whoami_shard();
783 reply->set_priority(m->get_priority());
784 reply->pgid = get_info().pgid;
785 reply->map_epoch = m->map_epoch;
786 reply->min_epoch = m->min_epoch;
787 reply->replies.swap(replies);
788 reply->compute_cost(cct);
789
790 t.register_on_complete(
791 new PG_SendMessageOnConn(
792 get_parent(), reply, m->get_connection()));
793
794 get_parent()->queue_transaction(std::move(t));
795}
796
/*
 * Continuation run (off the transaction completion path, via the
 * recovery work queue) once a batch of pulls has been locally applied.
 * For each completed object it clears the pull state and, as primary
 * duties require, starts pushes to any other shards still missing the
 * object; if nothing needed pushing the object is globally recovered.
 * A start_pushes failure tears down the push state and reports the
 * failed pull for this (primary) shard.
 */
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      ceph_assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
	bc->pushing[i.hoid].clear();
	bc->get_parent()->on_failed_pull(
	  { bc->get_parent()->whoami_shard() },
	  i.hoid, obc->obs.oi.version);
      } else if (!started) {
	// no peer needed a push: object is fully recovered everywhere
	bc->get_parent()->on_global_recover(
	  i.hoid, i.stat, false);
      }
      // keep the thread pool heartbeat alive across a long batch
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};
826
/*
 * Handle the push payloads a peer sent in response to our pulls.
 *
 * Each PushOp is applied via handle_pull_response into one local
 * transaction.  `replies` is kept one slot ahead: handle_pull_response
 * fills replies.back() and returns true when another pull round is
 * needed for that object, so a fresh slot is appended; the unused
 * trailing slot is erased before sending.  Objects whose pulls
 * completed are continued (push to other peers / global recovery) via
 * C_ReplicatedBackend_OnPullComplete, queued after the transaction
 * completes.  Aborts if the OSD is failsafe-full.
 */
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPush>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
	this,
	m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
	get_parent(),
	get_parent()->bless_unlocked_gencontext(c)));
  }
  // drop the unused trailing slot
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    // follow-up pull request goes out only after local apply completes
    t.register_on_complete(
      new PG_SendMessageOnConn(
	get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}
880
881void ReplicatedBackend::do_pull(OpRequestRef op)
882{
883 MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
11fdf7f2 884 ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
7c673cae
FG
885 pg_shard_t from = m->from;
886
887 map<pg_shard_t, vector<PushOp> > replies;
888 vector<PullOp> pulls;
889 m->take_pulls(&pulls);
890 for (auto& i : pulls) {
891 replies[from].push_back(PushOp());
892 handle_pull(from, i, &(replies[from].back()));
893 }
894 send_pushes(m->get_priority(), replies);
895}
896
897void ReplicatedBackend::do_push_reply(OpRequestRef op)
898{
9f95a23c 899 auto m = op->get_req<MOSDPGPushReply>();
11fdf7f2 900 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
7c673cae
FG
901 pg_shard_t from = m->from;
902
903 vector<PushOp> replies(1);
904 for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
905 i != m->replies.end();
906 ++i) {
907 bool more = handle_push_reply(from, *i, &(replies.back()));
908 if (more)
909 replies.push_back(PushOp());
910 }
911 replies.erase(replies.end() - 1);
912
913 map<pg_shard_t, vector<PushOp> > _replies;
914 _replies[from].swap(replies);
915 send_pushes(m->get_priority(), _replies);
916}
917
/*
 * Build the MOSDRepOp that replicates one update to a single peer.
 *
 * The transaction is shipped encoded in the message data — unless
 * should_send_op() says the peer doesn't need it (e.g. object beyond
 * its backfill boundary), in which case an empty transaction is sent
 * so the peer still records the log entries.  `log_entries` arrives
 * pre-encoded by the caller so it is encoded once per op, not once
 * per peer.  Returns a newly allocated message owned by the caller.
 */
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const bufferlist &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    // peer doesn't need the data; send an empty transaction instead
    ObjectStore::Transaction t;
    encode(t, wr->get_data());
  } else {
    encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  wr->logbl = log_entries;

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;

  // wire compatibility: only peers with OSD_REPOP_MLCOD understand
  // min_last_complete_ondisk in its own field
  if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
    wr->min_last_complete_ondisk = min_last_complete_ondisk;
  } else {
    /* Some replicas need this field to be at_version.  New replicas
     * will ignore it */
    wr->set_rollback_to(at_version);
  }

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}
974
/*
 * Send the replication sub-op for this update to every other shard in
 * the acting/recovery/backfill set.  No-op when we are the only shard.
 * Log entries are encoded once here and shared across all peers; each
 * peer gets its own MOSDRepOp from generate_subop().
 */
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (parent->get_acting_recovery_backfill_shards().size() > 1) {
    if (op->op) {
      op->op->pg_trace.event("issue replication ops");
      ostringstream ss;
      set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
      replicas.erase(parent->whoami_shard());
      ss << "waiting for subops from " << replicas;
      op->op->mark_sub_op_sent(ss.str());
    }

    // avoid doing the same work in generate_subop
    bufferlist logs;
    encode(log_entries, logs);

    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
      if (shard == parent->whoami_shard()) continue;
      const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;

      Message *wr;
      wr = generate_subop(
	soid,
	at_version,
	tid,
	reqid,
	pg_trim_to,
	min_last_complete_ondisk,
	new_temp_oid,
	discard_temp_oid,
	logs,
	hset_hist,
	op_t,
	shard,
	pinfo);
      // propagate the client op's trace into the replicated op
      if (op->op && op->op->pg_trace)
	wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
      get_parent()->send_message_osd_cluster(
	shard.osd, wr, get_osdmap_epoch());
    }
  }
}
1029
// sub op modify
//
// Replica-side handler for a MOSDRepOp sent by the primary: decode the
// shipped ObjectStore transaction and log entries, record them locally,
// and queue the transactions.  The commit ack back to the primary is sent
// later from repop_commit() via the registered on_commit context.
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  // finish_decode() completes the lazy payload decode of the message.
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  auto m = op->get_req<MOSDRepOp>();
  int msg_type = m->get_type();
  ceph_assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  // sanity checks
  ceph_assert(m->map_epoch >= get_info().history.same_interval_since);

  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  // a write to this object may invalidate an in-progress scrub chunk
  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  // RepModify carries everything needed to ack once the txn commits.
  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap_epoch();

  ceph_assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
  decode(rm->opt, p);

  // Temp-object tracking: the primary tells us when a recovery/backfill
  // temp object starts or stops being referenced by this op stream.
  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now. Otherwise, we do it later on push.
    update_snaps = true;
  }

  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(soid)) {
    // object is still being recovered locally; defer the log events so
    // they are applied once the object arrives
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
    for (auto &&e: log) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }

  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->version, /* Replicated PGs don't have rollback info */
    m->min_last_complete_ondisk,
    update_snaps,
    rm->localt,
    async);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
  dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
}
1131
1132void ReplicatedBackend::repop_commit(RepModifyRef rm)
1133{
1134 rm->op->mark_commit_sent();
1135 rm->op->pg_trace.event("sup_op_commit");
1136 rm->committed = true;
1137
1138 // send commit.
9f95a23c 1139 auto m = rm->op->get_req<MOSDRepOp>();
11fdf7f2 1140 ceph_assert(m->get_type() == MSG_OSD_REPOP);
7c673cae
FG
1141 dout(10) << __func__ << " on op " << *m
1142 << ", sending commit to osd." << rm->ackerosd
1143 << dendl;
11fdf7f2 1144 ceph_assert(get_osdmap()->is_up(rm->ackerosd));
7c673cae
FG
1145
1146 get_parent()->update_last_complete_ondisk(rm->last_complete);
1147
1148 MOSDRepOpReply *reply = new MOSDRepOpReply(
1149 m,
1150 get_parent()->whoami_shard(),
11fdf7f2 1151 0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
7c673cae
FG
1152 reply->set_last_complete_ondisk(rm->last_complete);
1153 reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
1154 reply->trace = rm->op->pg_trace;
1155 get_parent()->send_message_osd_cluster(
11fdf7f2 1156 rm->ackerosd, reply, get_osdmap_epoch());
7c673cae
FG
1157
1158 log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
1159}
1160
1161
1162// ===========================================================
1163
// Decide how to recover a head object on a peer: which extents the peer
// can reconstruct locally by clone_range'ing from a clone it already has
// (clone_subsets), and which extents we must actually push (data_subset).
// Read locks on the chosen clone source are taken via 'manager'.
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS)) {
    // With octopus-capable peers we track per-object dirty regions, so
    // only the regions the peer is actually missing need consideration.
    const auto it = missing.get_items().find(head);
    assert(it != missing.get_items().end());
    data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
    dout(10) << "calc_head_subsets " << head
	     << " data_subset " << data_subset << dendl;
  }

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // cache pools may have incomplete clones; clone overlap is unusable
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  hobject_t c = head;
  if (size)
    prev.insert(0, size);

  // Walk clones newest-to-oldest, narrowing 'prev' by each clone_overlap,
  // until we find a clone the peer already has (and that we can
  // read-lock); that clone becomes the local copy source.
  for (int j=snapset.clones.size()-1; j>=0; j--) {
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      cloning = prev;
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  cloning.intersection_of(data_subset);
  if (cloning.empty()) {
    dout(10) << "skipping clone, nothing needs to clone" << dendl;
    return;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    // Too fragmented: cloning many tiny extents costs more than pushing
    // the data, so push everything and drop the lock we took above.
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
    return;
  }

  // what's left for us to push?
  clone_subsets[c] = cloning;
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1241
// Like calc_head_subsets, but for recovering a clone object 'soid': look
// for overlap with both the next-older and next-newer clones already
// present on the target so those extents can be clone_range'd locally
// instead of pushed.  Extents still needing a push remain in data_subset.
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // cache pools may have incomplete clones; clone overlap is unusable
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  // locate soid's position in the clone list
  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    // overlap between clone j-1 and j describes what j shares with us
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    // Too fragmented to be worth cloning; fall back to pushing all data.
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1330
// Primary-side setup for pulling a missing object 'soid' at version 'v':
// choose a source shard, compute which extents/clone overlaps to request,
// and queue a PullOp on 'h' plus local PullInfo bookkeeping in 'pulling'.
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
  ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
  eversion_t _v = missing_iter->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  // osd_debug_feed_pullee (testing only) can force a specific source;
  // otherwise pick a random shard that has the object.
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
	p = it;
	break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because user feed a wrong pullee
    p = q->second.begin();
    std::advance(p,
		 util::generate_random_number<int>(0,
						   q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << q->second
	  << " from osd." << fromshard
	  << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    // The source doesn't have exactly v; this is only legal for a
    // LOST_REVERT where we pull the version being reverted to instead.
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.get_items().find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
		(get_parent()->get_log().get_log().objects.find(soid)->second->op ==
		 pg_log_entry_t::LOST_REVERT) &&
		(get_parent()->get_log().get_log().objects.find(
		  soid)->second->reverting_to ==
		 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    // pulling a clone: the head (and its SnapSet) must already be local
    // so we can compute clone overlap
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS))
      recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
    recovery_info.size = ((uint64_t)-1);
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  // omap can be skipped entirely if it isn't dirty (octopus peers only)
  op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty()
    && HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  // record local pull state; lock_manager keeps clone read locks alive
  // for the duration of the pull
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}
1448
/*
 * intelligently push an object to a replica. make use of existing
 * clones/heads and dup data ranges where possible.
 */
// Returns 0 on success or a negative errno from build_push_op (via
// prep_push).  Fills *pop with the first push operation for 'peer'.
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
	   << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/preceed poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    // compute overlap with clones the peer already has
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this on partially on replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}
1520
224ce89b 1521int ReplicatedBackend::prep_push(ObjectContextRef obc,
7c673cae
FG
1522 const hobject_t& soid, pg_shard_t peer,
1523 PushOp *pop, bool cache_dont_need)
1524{
1525 interval_set<uint64_t> data_subset;
1526 if (obc->obs.oi.size)
1527 data_subset.insert(0, obc->obs.oi.size);
1528 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1529
224ce89b 1530 return prep_push(obc, soid, peer,
7c673cae
FG
1531 obc->obs.oi.version, data_subset, clone_subsets,
1532 pop, cache_dont_need, ObcLockManager());
1533}
1534
// Record push state for (soid, peer) in 'pushing' and build the first
// PushOp chunk.  Returns 0 on success or the negative errno from
// build_push_op; lock_manager ownership moves into the PushInfo so clone
// read locks persist until the push completes.
int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
  assert(missing_iter != pmissing_iter->second.get_items().end());
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  // omap push can be skipped when it isn't dirty (octopus peers only)
  pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() &&
    HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
			pi.recovery_progress,
			&new_progress,
			pop,
			&(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}
1575
// Translate one received push chunk into ObjectStore transaction ops.
//
// Writes go to the final object when the push is a single complete chunk
// (first && complete); otherwise to a temp recovery object that is
// renamed into place on the final chunk.  Handles first-chunk setup
// (recreate/alloc-hint/truncate, omap header, xattr/omap clearing, local
// clone_range reuse), zero-punching for dirty-but-absent regions, data
// writes, omap/attr sets, and final rename + submit_push_complete.
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  bool cache_dont_need,
  interval_set<uint64_t> &data_zeros,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    // multi-chunk push: stage into a temp object until complete
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
							recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
	       << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    if (!complete) {
      // fresh temp object: recreate and seed allocation hints from the
      // pushed object_info
      t->remove(coll, ghobject_t(target_oid));
      t->touch(coll, ghobject_t(target_oid));
      bufferlist bv = attrs.at(OI_ATTR);
      object_info_t oi(bv);
      t->set_alloc_hint(coll, ghobject_t(target_oid),
			oi.expected_object_size,
			oi.expected_write_size,
			oi.alloc_hint_flags);
    } else {
      if (!recovery_info.object_exist) {
	t->remove(coll, ghobject_t(target_oid));
	t->touch(coll, ghobject_t(target_oid));
	bufferlist bv = attrs.at(OI_ATTR);
	object_info_t oi(bv);
	t->set_alloc_hint(coll, ghobject_t(target_oid),
			  oi.expected_object_size,
			  oi.expected_write_size,
			  oi.alloc_hint_flags);
      }
      //remove xattr and update later if overwrite on original object
      t->rmattrs(coll, ghobject_t(target_oid));
      //if need update omap, clear the previous content first
      if (clear_omap)
	t->omap_clear(coll, ghobject_t(target_oid));
    }

    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    struct stat st;
    int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
    if (get_parent()->pg_is_remote_backfilling()) {
      // keep backfill byte accounting in sync with the size delta
      uint64_t size = 0;
      if (r == 0)
	size = st.st_size;
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
	get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	dout(10) << __func__ << " " << recovery_info.soid
		 << " backfill size " << recovery_info.oi.size
		 << " previous size " << size
		 << " net size " << recovery_info.oi.size - size
		 << dendl;
      }
    }
    if (!complete) {
      //clone overlap content in local object
      // regions outside copy_subset can be reused from the local copy of
      // the object rather than transferred
      if (recovery_info.object_exist) {
	assert(r == 0);
	uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
	interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
	if (local_size) {
	  local_intervals_included.insert(0, local_size);
	  local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
	  local_intervals_included.subtract(local_intervals_excluded);
	}
	for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
	     q != local_intervals_included.end();
	     ++q) {
	  dout(15) << " clone_range " << recovery_info.soid << " "
		   << q.get_start() << "~" << q.get_len() << dendl;
	  t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
			 q.get_start(), q.get_len(), q.get_start());
	}
      }
    }
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
  if (data_zeros.size() > 0) {
    data_zeros.intersection_of(recovery_info.copy_subset);
    assert(intervals_included.subset_of(data_zeros));
    data_zeros.subtract(intervals_included);

    dout(20) << __func__ <<" recovering object " << recovery_info.soid
	     << " copy_subset: " << recovery_info.copy_subset
	     << " intervals_included: " << intervals_included
	     << " data_zeros: " << data_zeros << dendl;

    for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
      t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
  }
  // write the pushed data extents; data_included is packed back-to-back
  // in 'data_included' (the bufferlist), offsets tracked via 'off'
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
	     p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      // final chunk of a multi-chunk push: move temp object into place
      dout(10) << __func__ << ": Removing oid "
	       << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
				coll, ghobject_t(recovery_info.soid));
    }

    // fill in any remaining extents from local clone overlap
    submit_push_complete(recovery_info, t);

  }
}
1721
1722void ReplicatedBackend::submit_push_complete(
1723 const ObjectRecoveryInfo &recovery_info,
1724 ObjectStore::Transaction *t)
1725{
1726 for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
1727 recovery_info.clone_subset.begin();
1728 p != recovery_info.clone_subset.end();
1729 ++p) {
1730 for (interval_set<uint64_t>::const_iterator q = p->second.begin();
1731 q != p->second.end();
1732 ++q) {
1733 dout(15) << " clone_range " << p->first << " "
1734 << q.get_start() << "~" << q.get_len() << dendl;
1735 t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
1736 q.get_start(), q.get_len(), q.get_start());
1737 }
1738 }
1739}
1740
1741ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
1742 const ObjectRecoveryInfo& recovery_info,
1743 SnapSetContext *ssc,
1744 ObcLockManager &manager)
1745{
1746 if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
1747 return recovery_info;
1748 ObjectRecoveryInfo new_info = recovery_info;
1749 new_info.copy_subset.clear();
1750 new_info.clone_subset.clear();
11fdf7f2 1751 ceph_assert(ssc);
7c673cae
FG
1752 get_parent()->release_locks(manager); // might already have locks
1753 calc_clone_subsets(
1754 ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
1755 get_info().last_backfill,
1756 new_info.copy_subset, new_info.clone_subset,
1757 manager);
1758 return new_info;
1759}
1760
// Primary-side handler for a pull response chunk from 'from'.
// Applies the chunk to disk via submit_push_data, updates PullInfo state
// and recovery stats.  Returns true if another PullOp (filled into
// *response) is needed to continue the pull; false when the pull is done
// (object appended to *to_continue) or has failed/been cancelled.
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  // data and its interval map must be consistently empty or non-empty
  ceph_assert((data_included.empty() && data.length() == 0) ||
	      (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    // pull was cancelled (e.g. by a new interval); drop the response
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    // size was unknown until the source told us; clamp copy_subset
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // attrs only reference the origin bufferlist (decode from
    // MOSDPGPush message) whose size is much greater than attrs in
    // recovery. If obc cache it (get_obc maybe cache the attr), this
    // causes the whole origin bufferlist would not be free until obc
    // is evicted from obc cache. So rebuild the bufferlists before
    // cache it.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    if (attrset.find(SS_ATTR) != attrset.end()) {
      // sanity: pushed SnapSet must agree with any cached one
      bufferlist ssbv = attrset.at(SS_ATTR);
      SnapSet ss(ssbv);
      assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
    }
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }


  // discard any pushed extents outside our copy_subset
  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);


  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;
  // regions the chunk advanced over but sent no data for must be zeroed
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if(z_length)
    data_zeros.insert(z_offset, z_length);
  bool complete = pi.is_complete();
  bool clear_omap = !pop.before_progress.omap_complete;

  submit_push_data(pi.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   pi.cache_dont_need,
		   data_zeros,
		   data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
1887
// Replica-side handler for one push chunk from the primary: apply the
// chunk via submit_push_data and, on the final chunk, record local
// recovery completion (and repair accounting when is_repair).
// *response is filled with the soid for the PushReply back to the pusher.
void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t, bool is_repair)
{
  dout(10) << "handle_push "
	   << pop.recovery_info
	   << pop.after_progress
	   << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;
  bool clear_omap = !pop.before_progress.omap_complete;
  // regions the chunk advanced over but sent no data for must be zeroed
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if(z_length)
    data_zeros.insert(z_offset, z_length);
  response->soid = pop.recovery_info.soid;

  submit_push_data(pop.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   true, // must be replicate
		   data_zeros,
		   pop.data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  if (complete) {
    if (is_repair) {
      get_parent()->inc_osd_stat_repaired();
      dout(20) << __func__ << " repair complete" << dendl;
    }
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
  }
}
1935
1936void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1937{
1938 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1939 i != pushes.end();
1940 ++i) {
1941 ConnectionRef con = get_parent()->get_con_osd_cluster(
1942 i->first.osd,
11fdf7f2 1943 get_osdmap_epoch());
7c673cae
FG
1944 if (!con)
1945 continue;
1946 vector<PushOp>::iterator j = i->second.begin();
1947 while (j != i->second.end()) {
1948 uint64_t cost = 0;
1949 uint64_t pushes = 0;
1950 MOSDPGPush *msg = new MOSDPGPush();
1951 msg->from = get_parent()->whoami_shard();
1952 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 1953 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1954 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1955 msg->set_priority(prio);
11fdf7f2 1956 msg->is_repair = get_parent()->pg_is_repair();
7c673cae
FG
1957 for (;
1958 (j != i->second.end() &&
1959 cost < cct->_conf->osd_max_push_cost &&
1960 pushes < cct->_conf->osd_max_push_objects) ;
1961 ++j) {
1962 dout(20) << __func__ << ": sending push " << *j
1963 << " to osd." << i->first << dendl;
1964 cost += j->cost(cct);
1965 pushes += 1;
1966 msg->pushes.push_back(*j);
1967 }
1968 msg->set_cost(cost);
1969 get_parent()->send_message_osd_cluster(msg, con);
1970 }
1971 }
1972}
1973
1974void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1975{
1976 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1977 i != pulls.end();
1978 ++i) {
1979 ConnectionRef con = get_parent()->get_con_osd_cluster(
1980 i->first.osd,
11fdf7f2 1981 get_osdmap_epoch());
7c673cae
FG
1982 if (!con)
1983 continue;
1984 dout(20) << __func__ << ": sending pulls " << i->second
1985 << " to osd." << i->first << dendl;
1986 MOSDPGPull *msg = new MOSDPGPull();
1987 msg->from = parent->whoami_shard();
1988 msg->set_priority(prio);
1989 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 1990 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1991 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1992 msg->set_pulls(&i->second);
1993 msg->compute_cost(cct);
1994 get_parent()->send_message_osd_cluster(msg, con);
1995 }
1996}
1997
/**
 * Build the next PushOp chunk for an object being recovered.
 *
 * Reads omap header/attrs (first chunk only), then up to
 * osd_recovery_max_chunk worth of omap entries and object data starting
 * at the positions recorded in @p progress, and records the new
 * positions in *out_progress.
 *
 * @param recovery_info what to recover (soid, version, copy_subset, ...)
 * @param progress where the previous chunk left off
 * @param out_progress [out] progress after this chunk; may be null,
 *        in which case a local dummy is used
 * @param out_op [out] the push op to populate
 * @param stat [out, optional] recovery stats to bump when the object
 *        completes with this chunk
 * @param cache_dont_need pass FADVISE_DONTNEED on the data read
 * @return 0 on success, negative errno on read/decode failure
 */
int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
				     const ObjectRecoveryProgress &progress,
				     ObjectRecoveryProgress *out_progress,
				     PushOp *out_op,
				     object_stat_sum_t *stat,
				     bool cache_dont_need)
{
  ObjectRecoveryProgress _new_progress;
  if (!out_progress)
    out_progress = &_new_progress;
  ObjectRecoveryProgress &new_progress = *out_progress;
  new_progress = progress;

  dout(7) << __func__ << " " << recovery_info.soid
	  << " v " << recovery_info.version
	  << " size " << recovery_info.size
	  << " recovery_info: " << recovery_info
          << dendl;

  eversion_t v = recovery_info.version;
  object_info_t oi;
  if (progress.first) {
    // First chunk carries the omap header and the full xattr set.
    int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
    if(r < 0) {
      dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl; 
      return r;
    }
    r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
    if(r < 0) {
      dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
      return r;
    }

    // Debug
    // Decode the object_info_t so we can cross-check the version (and,
    // below, the data digest on a full-object read).
    bufferlist bv = out_op->attrset[OI_ATTR];
    try {
      auto bliter = bv.cbegin();
      decode(oi, bliter);
    } catch (...) {
      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
      return -EINVAL;
    }

    // If requestor didn't know the version, use ours
    if (v == eversion_t()) {
      v = oi.version;
    } else if (oi.version != v) {
      // Local copy diverged from what the requestor expects; refuse.
      get_parent()->clog_error() << get_info().pgid << " push "
				 << recovery_info.soid << " v "
				 << recovery_info.version
				 << " failed because local copy is "
				 << oi.version;
      return -EINVAL;
    }

    new_progress.first = false;
  }
  // Once we provide the version subsequent requests will have it, so
  // at this point it must be known.
  ceph_assert(v != eversion_t());

  // 'available' is the remaining budget (bytes) for this chunk, shared
  // between omap entries and object data.
  uint64_t available = cct->_conf->osd_recovery_max_chunk;
  if (!progress.omap_complete) {
    ObjectMap::ObjectMapIterator iter =
      store->get_omap_iterator(ch,
			       ghobject_t(recovery_info.soid));
    ceph_assert(iter);
    for (iter->lower_bound(progress.omap_recovered_to);
	 iter->valid();
	 iter->next()) {
      // Stop when either the per-chunk entry cap or the byte budget
      // would be exceeded -- but always take at least one entry so
      // progress is guaranteed.
      if (!out_op->omap_entries.empty() &&
	  ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
	    out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
	   available <= iter->key().size() + iter->value().length()))
	break;
      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));

      if ((iter->key().size() + iter->value().length()) <= available)
	available -= (iter->key().size() + iter->value().length());
      else
	available = 0;
    }
    if (!iter->valid())
      new_progress.omap_complete = true;
    else
      new_progress.omap_recovered_to = iter->key();
  }

  if (available > 0) {
    if (!recovery_info.copy_subset.empty()) {
      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
      map<uint64_t, uint64_t> m;
      // Restrict to extents that are actually allocated on disk; holes
      // don't need to be shipped.
      int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
                            copy_subset.range_end(), m);
      if (r >= 0)  {
        interval_set<uint64_t> fiemap_included(std::move(m));
        copy_subset.intersection_of(fiemap_included);
      } else {
        // intersection of copy_subset and empty interval_set would be empty anyway
        copy_subset.clear();
      }

      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
                                    available);
      if (out_op->data_included.empty()) // zero filled section, skip to end!
        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
      else
        new_progress.data_recovered_to = out_op->data_included.range_end();
    }
  } else {
    out_op->data_included.clear();
  }

  auto origin_size = out_op->data_included.size();
  bufferlist bit;
  int r = store->readv(ch, ghobject_t(recovery_info.soid),
		       out_op->data_included, bit,
                       cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
  // Test hook: randomly fail the read to exercise the error path.
  if (cct->_conf->osd_debug_random_push_read_error &&
      (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
    dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
    r = -EIO;
  }
  if (r < 0) {
    return r;
  }
  // readv may prune extents (e.g. past EOF); if so, this is the end of
  // the data.
  if (out_op->data_included.size() != origin_size) {
    dout(10) << __func__ << " some extents get pruned "
             << out_op->data_included.size() << "/" << origin_size
             << dendl;
    new_progress.data_complete = true;
  }
  out_op->data.claim_append(bit);
  // Full-object read in a single first chunk: verify the stored data
  // digest before shipping (oi was decoded above because progress.first).
  if (progress.first && !out_op->data_included.empty() &&
      out_op->data_included.begin().get_start() == 0 &&
      out_op->data.length() == oi.size && oi.is_data_digest()) {
    uint32_t crc = out_op->data.crc32c(-1);
    if (oi.data_digest != crc) {
      dout(0) << __func__ << " " << coll << std::hex
	      << " full-object read crc 0x" << crc
	      << " != expected 0x" << oi.data_digest
	      << std::dec << " on " << recovery_info.soid << dendl;
      return -EIO;
    }
  }

  if (new_progress.is_complete(recovery_info)) {
    new_progress.data_complete = true;
    if (stat) {
      stat->num_objects_recovered++;
      if (get_parent()->pg_is_repair())
	stat->num_objects_repaired++;
    }
  } else if (progress.first && progress.omap_complete) {
    // If omap is not changed, we need recovery omap when recovery cannot be completed once
    new_progress.omap_complete = false;
  }

  if (stat) {
    stat->num_keys_recovered += out_op->omap_entries.size();
    stat->num_bytes_recovered += out_op->data.length();
    get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
  }

  get_parent()->get_logger()->inc(l_osd_push);
  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());

  // send
  out_op->version = v;
  out_op->soid = recovery_info.soid;
  out_op->recovery_info = recovery_info;
  out_op->after_progress = new_progress;
  out_op->before_progress = progress;
  return 0;
}
2173
2174void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2175{
2176 op->recovery_info.version = eversion_t();
2177 op->version = eversion_t();
2178 op->soid = soid;
2179}
2180
/**
 * Handle a push ack from a replica.
 *
 * If more chunks remain, builds the next PushOp into *reply and returns
 * true.  If the push to this peer is finished (or has errored), tears
 * down the per-peer state, and once all peers have acked, declares the
 * object globally recovered (or failed).
 *
 * NOTE(review): an error is remembered by setting .error on the first
 * entry of pushing[soid], since the erroring peer's own entry is erased
 * below -- any remaining entry serves as the carrier.
 *
 * @param peer replica that sent the ack
 * @param op the ack (identifies the object)
 * @param reply [out] next push op, valid only when returning true
 * @return true if *reply should be sent to continue this push
 */
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    // Stale ack: e.g. state was torn down on a peering change.
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << ", or anybody else"
	     << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    // See NOTE above: the first entry carries any previously seen error.
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      // More chunks to send to this peer.
      dout(10) << " pushing more from, "
	       << pi->recovery_progress.data_recovered_to
	       << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
	pi->recovery_info,
	pi->recovery_progress, &new_progress, reply,
	&(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
        dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;

	error = true;
	goto done;
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
done:
      if (!error)
	get_parent()->on_peer_recover( peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      // Copy out what we need before erasing pi's owning map entry.
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
	// Last outstanding ack for this object.
	if (!error)
	  get_parent()->on_global_recover(soid, stat, false);
	else
	  get_parent()->on_failed_pull(
	    std::set<pg_shard_t>{ get_parent()->whoami_shard() },
	    soid,
	    v);
	pushing.erase(soid);
      } else {
	// This looks weird, but we erased the current peer and need to remember
	// the error on any other one, while getting more acks.
	if (error)
	  pushing[soid].begin()->second.recovery_progress.error = true;
	dout(10) << "pushed " << soid << ", still waiting for push ack from "
		 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}
2250
2251void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2252{
2253 const hobject_t &soid = op.soid;
2254 struct stat st;
2255 int r = store->stat(ch, ghobject_t(soid), &st);
2256 if (r != 0) {
2257 get_parent()->clog_error() << get_info().pgid << " "
2258 << peer << " tried to pull " << soid
2259 << " but got " << cpp_strerror(-r);
2260 prep_push_op_blank(soid, reply);
2261 } else {
2262 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2263 ObjectRecoveryProgress &progress = op.recovery_progress;
2264 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2265 // Adjust size and copy_subset
2266 recovery_info.size = st.st_size;
9f95a23c
TL
2267 if (st.st_size) {
2268 interval_set<uint64_t> object_range;
2269 object_range.insert(0, st.st_size);
2270 recovery_info.copy_subset.intersection_of(object_range);
2271 } else {
2272 recovery_info.copy_subset.clear();
2273 }
2274 assert(recovery_info.clone_subset.empty());
7c673cae
FG
2275 }
2276
2277 r = build_push_op(recovery_info, progress, 0, reply);
2278 if (r < 0)
2279 prep_push_op_blank(soid, reply);
2280 }
2281}
2282
/**
 * trim received data to remove what we don't want
 *
 * @param copy_subset intervals we want
 * @param intervals_received intervals we got
 * @param data_received data we got
 * @param intervals_usable intervals we want to keep
 * @param data_usable matching data we want to keep
 */
2292void ReplicatedBackend::trim_pushed_data(
2293 const interval_set<uint64_t> &copy_subset,
2294 const interval_set<uint64_t> &intervals_received,
2295 bufferlist data_received,
2296 interval_set<uint64_t> *intervals_usable,
2297 bufferlist *data_usable)
2298{
2299 if (intervals_received.subset_of(copy_subset)) {
2300 *intervals_usable = intervals_received;
2301 *data_usable = data_received;
2302 return;
2303 }
2304
2305 intervals_usable->intersection_of(copy_subset,
2306 intervals_received);
2307
2308 uint64_t off = 0;
2309 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2310 p != intervals_received.end();
2311 ++p) {
2312 interval_set<uint64_t> x;
2313 x.insert(p.get_start(), p.get_len());
2314 x.intersection_of(copy_subset);
2315 for (interval_set<uint64_t>::const_iterator q = x.begin();
2316 q != x.end();
2317 ++q) {
2318 bufferlist sub;
2319 uint64_t data_off = off + (q.get_start() - p.get_start());
2320 sub.substr_of(data_received, data_off, q.get_len());
2321 data_usable->claim_append(sub);
2322 }
2323 off += p.get_len();
2324 }
2325}
2326
224ce89b 2327void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
7c673cae 2328{
224ce89b 2329 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
81eedcae
TL
2330 auto it = pulling.find(soid);
2331 assert(it != pulling.end());
9f95a23c
TL
2332 get_parent()->on_failed_pull(
2333 { from },
2334 soid,
2335 it->second.recovery_info.version);
7c673cae 2336
81eedcae 2337 clear_pull(it);
7c673cae
FG
2338}
2339
2340void ReplicatedBackend::clear_pull_from(
2341 map<hobject_t, PullInfo>::iterator piter)
2342{
2343 auto from = piter->second.from;
2344 pull_from_peer[from].erase(piter->second.soid);
2345 if (pull_from_peer[from].empty())
2346 pull_from_peer.erase(from);
2347}
2348
2349void ReplicatedBackend::clear_pull(
2350 map<hobject_t, PullInfo>::iterator piter,
2351 bool clear_pull_from_peer)
2352{
2353 if (clear_pull_from_peer) {
2354 clear_pull_from(piter);
2355 }
2356 get_parent()->release_locks(piter->second.lock_manager);
2357 pulling.erase(piter);
2358}
2359
2360int ReplicatedBackend::start_pushes(
2361 const hobject_t &soid,
2362 ObjectContextRef obc,
2363 RPGHandle *h)
2364{
224ce89b
WB
2365 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2366
2367 dout(20) << __func__ << " soid " << soid << dendl;
7c673cae 2368 // who needs it?
11fdf7f2 2369 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
7c673cae 2370 for (set<pg_shard_t>::iterator i =
11fdf7f2
TL
2371 get_parent()->get_acting_recovery_backfill_shards().begin();
2372 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2373 ++i) {
2374 if (*i == get_parent()->whoami_shard()) continue;
2375 pg_shard_t peer = *i;
2376 map<pg_shard_t, pg_missing_t>::const_iterator j =
2377 get_parent()->get_shard_missing().find(peer);
11fdf7f2 2378 ceph_assert(j != get_parent()->get_shard_missing().end());
7c673cae 2379 if (j->second.is_missing(soid)) {
224ce89b
WB
2380 shards.push_back(j);
2381 }
2382 }
2383
2384 // If more than 1 read will occur ignore possible request to not cache
2385 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2386
2387 for (auto j : shards) {
2388 pg_shard_t peer = j->first;
2389 h->pushes[peer].push_back(PushOp());
2390 int r = prep_push_to_replica(obc, soid, peer,
2391 &(h->pushes[peer].back()), cache);
2392 if (r < 0) {
2393 // Back out all failed reads
2394 for (auto k : shards) {
2395 pg_shard_t p = k->first;
2396 dout(10) << __func__ << " clean up peer " << p << dendl;
2397 h->pushes[p].pop_back();
2398 if (p == peer) break;
2399 }
2400 return r;
7c673cae
FG
2401 }
2402 }
224ce89b 2403 return shards.size();
7c673cae 2404}