]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/ReplicatedBackend.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / osd / ReplicatedBackend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "common/errno.h"
15#include "ReplicatedBackend.h"
16#include "messages/MOSDOp.h"
7c673cae 17#include "messages/MOSDRepOp.h"
7c673cae
FG
18#include "messages/MOSDRepOpReply.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPull.h"
21#include "messages/MOSDPGPushReply.h"
22#include "common/EventTrace.h"
11fdf7f2 23#include "include/random.h"
81eedcae 24#include "include/util.h"
11fdf7f2 25#include "OSD.h"
7c673cae
FG
26
27#define dout_context cct
28#define dout_subsys ceph_subsys_osd
29#define DOUT_PREFIX_ARGS this
30#undef dout_prefix
31#define dout_prefix _prefix(_dout, this)
32static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
11fdf7f2 33 return pgb->get_parent()->gen_dbg_prefix(*_dout);
7c673cae
FG
34}
35
f67539c2
TL
36using std::list;
37using std::make_pair;
38using std::map;
39using std::ostringstream;
40using std::set;
41using std::pair;
42using std::string;
43using std::unique_ptr;
44using std::vector;
45
46using ceph::bufferhash;
47using ceph::bufferlist;
48using ceph::decode;
49using ceph::encode;
50
7c673cae
FG
51namespace {
52class PG_SendMessageOnConn: public Context {
53 PGBackend::Listener *pg;
54 Message *reply;
55 ConnectionRef conn;
56 public:
57 PG_SendMessageOnConn(
58 PGBackend::Listener *pg,
59 Message *reply,
60 ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
61 void finish(int) override {
f67539c2 62 pg->send_message_osd_cluster(MessageRef(reply, false), conn.get());
7c673cae
FG
63 }
64};
65
66class PG_RecoveryQueueAsync : public Context {
67 PGBackend::Listener *pg;
68 unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
69 public:
70 PG_RecoveryQueueAsync(
71 PGBackend::Listener *pg,
72 GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
73 void finish(int) override {
74 pg->schedule_recovery_work(c.release());
75 }
76};
77}
78
7c673cae
FG
79struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
80 ReplicatedBackend *pg;
81 RepModifyRef rm;
82 C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
83 : pg(pg), rm(r) {}
84 void finish(int r) override {
85 pg->repop_commit(rm);
86 }
87};
88
89static void log_subop_stats(
90 PerfCounters *logger,
91 OpRequestRef op, int subop)
92{
f67539c2 93 utime_t latency = ceph_clock_now();
7c673cae
FG
94 latency -= op->get_req()->get_recv_stamp();
95
96
97 logger->inc(l_osd_sop);
98 logger->tinc(l_osd_sop_lat, latency);
99 logger->inc(subop);
100
101 if (subop != l_osd_sop_pull) {
102 uint64_t inb = op->get_req()->get_data().length();
103 logger->inc(l_osd_sop_inb, inb);
104 if (subop == l_osd_sop_w) {
105 logger->inc(l_osd_sop_w_inb, inb);
106 logger->tinc(l_osd_sop_w_lat, latency);
107 } else if (subop == l_osd_sop_push) {
108 logger->inc(l_osd_sop_push_inb, inb);
109 logger->tinc(l_osd_sop_push_lat, latency);
110 } else
11fdf7f2 111 ceph_abort_msg("no support subop");
7c673cae
FG
112 } else {
113 logger->tinc(l_osd_sop_pull_lat, latency);
114 }
115}
116
117ReplicatedBackend::ReplicatedBackend(
118 PGBackend::Listener *pg,
11fdf7f2 119 const coll_t &coll,
7c673cae
FG
120 ObjectStore::CollectionHandle &c,
121 ObjectStore *store,
122 CephContext *cct) :
123 PGBackend(cct, pg, store, coll, c) {}
124
125void ReplicatedBackend::run_recovery_op(
126 PGBackend::RecoveryHandle *_h,
127 int priority)
128{
129 RPGHandle *h = static_cast<RPGHandle *>(_h);
130 send_pushes(priority, h->pushes);
131 send_pulls(priority, h->pulls);
c07f9fc5 132 send_recovery_deletes(priority, h->deletes);
7c673cae
FG
133 delete h;
134}
135
224ce89b 136int ReplicatedBackend::recover_object(
7c673cae
FG
137 const hobject_t &hoid,
138 eversion_t v,
139 ObjectContextRef head,
140 ObjectContextRef obc,
141 RecoveryHandle *_h
142 )
143{
144 dout(10) << __func__ << ": " << hoid << dendl;
145 RPGHandle *h = static_cast<RPGHandle *>(_h);
146 if (get_parent()->get_local_missing().is_missing(hoid)) {
11fdf7f2 147 ceph_assert(!obc);
7c673cae
FG
148 // pull
149 prepare_pull(
150 v,
151 hoid,
152 head,
153 h);
7c673cae 154 } else {
11fdf7f2 155 ceph_assert(obc);
7c673cae
FG
156 int started = start_pushes(
157 hoid,
158 obc,
159 h);
224ce89b
WB
160 if (started < 0) {
161 pushing[hoid].clear();
162 return started;
163 }
7c673cae 164 }
224ce89b 165 return 0;
7c673cae
FG
166}
167
168void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
169{
170 for(map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
171 i != pull_from_peer.end();
172 ) {
173 if (osdmap->is_down(i->first.osd)) {
174 dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
175 << ", osdmap has it marked down" << dendl;
176 for (set<hobject_t>::iterator j = i->second.begin();
177 j != i->second.end();
178 ++j) {
179 get_parent()->cancel_pull(*j);
180 clear_pull(pulling.find(*j), false);
181 }
182 pull_from_peer.erase(i++);
183 } else {
184 ++i;
185 }
186 }
187}
188
189bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
190{
191 dout(10) << __func__ << ": " << op << dendl;
192 switch (op->get_req()->get_type()) {
193 case MSG_OSD_PG_PULL:
194 return true;
195 default:
196 return false;
197 }
198}
199
c07f9fc5 200bool ReplicatedBackend::_handle_message(
7c673cae
FG
201 OpRequestRef op
202 )
203{
204 dout(10) << __func__ << ": " << op << dendl;
205 switch (op->get_req()->get_type()) {
206 case MSG_OSD_PG_PUSH:
207 do_push(op);
208 return true;
209
210 case MSG_OSD_PG_PULL:
211 do_pull(op);
212 return true;
213
214 case MSG_OSD_PG_PUSH_REPLY:
215 do_push_reply(op);
216 return true;
217
7c673cae
FG
218 case MSG_OSD_REPOP: {
219 do_repop(op);
220 return true;
221 }
222
223 case MSG_OSD_REPOPREPLY: {
224 do_repop_reply(op);
225 return true;
226 }
227
228 default:
229 break;
230 }
231 return false;
232}
233
234void ReplicatedBackend::clear_recovery_state()
235{
236 // clear pushing/pulling maps
237 for (auto &&i: pushing) {
238 for (auto &&j: i.second) {
239 get_parent()->release_locks(j.second.lock_manager);
240 }
241 }
242 pushing.clear();
243
244 for (auto &&i: pulling) {
245 get_parent()->release_locks(i.second.lock_manager);
246 }
247 pulling.clear();
248 pull_from_peer.clear();
249}
250
251void ReplicatedBackend::on_change()
252{
253 dout(10) << __func__ << dendl;
11fdf7f2
TL
254 for (auto& op : in_progress_ops) {
255 delete op.second->on_commit;
256 op.second->on_commit = nullptr;
7c673cae 257 }
11fdf7f2 258 in_progress_ops.clear();
7c673cae
FG
259 clear_recovery_state();
260}
261
7c673cae
FG
262int ReplicatedBackend::objects_read_sync(
263 const hobject_t &hoid,
264 uint64_t off,
265 uint64_t len,
266 uint32_t op_flags,
267 bufferlist *bl)
268{
269 return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
270}
271
9f95a23c
TL
272int ReplicatedBackend::objects_readv_sync(
273 const hobject_t &hoid,
274 map<uint64_t, uint64_t>&& m,
275 uint32_t op_flags,
276 bufferlist *bl)
277{
278 interval_set<uint64_t> im(std::move(m));
279 auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
280 if (r >= 0) {
281 m = std::move(im).detach();
282 }
283 return r;
284}
285
7c673cae
FG
286void ReplicatedBackend::objects_read_async(
287 const hobject_t &hoid,
288 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
289 pair<bufferlist*, Context*> > > &to_read,
290 Context *on_complete,
291 bool fast_read)
292{
11fdf7f2 293 ceph_abort_msg("async read is not used by replica pool");
7c673cae
FG
294}
295
296class C_OSD_OnOpCommit : public Context {
297 ReplicatedBackend *pg;
9f95a23c 298 ceph::ref_t<ReplicatedBackend::InProgressOp> op;
7c673cae 299public:
9f95a23c
TL
300 C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
301 : pg(pg), op(std::move(op)) {}
7c673cae
FG
302 void finish(int) override {
303 pg->op_commit(op);
304 }
305};
306
7c673cae
FG
307void generate_transaction(
308 PGTransactionUPtr &pgt,
309 const coll_t &coll,
7c673cae
FG
310 vector<pg_log_entry_t> &log_entries,
311 ObjectStore::Transaction *t,
312 set<hobject_t> *added,
9f95a23c
TL
313 set<hobject_t> *removed,
314 const ceph_release_t require_osd_release = ceph_release_t::unknown )
7c673cae 315{
11fdf7f2
TL
316 ceph_assert(t);
317 ceph_assert(added);
318 ceph_assert(removed);
7c673cae
FG
319
320 for (auto &&le: log_entries) {
321 le.mark_unrollbackable();
322 auto oiter = pgt->op_map.find(le.soid);
323 if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
31f18b77 324 bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
11fdf7f2 325 encode(oiter->second.updated_snaps->second, bl);
31f18b77
FG
326 le.snaps.swap(bl);
327 le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
7c673cae
FG
328 }
329 }
330
331 pgt->safe_create_traverse(
332 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
333 const hobject_t &oid = obj_op.first;
334 const ghobject_t goid =
335 ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
336 const PGTransaction::ObjectOperation &op = obj_op.second;
337
338 if (oid.is_temp()) {
339 if (op.is_fresh_object()) {
340 added->insert(oid);
341 } else if (op.is_delete()) {
342 removed->insert(oid);
343 }
344 }
345
346 if (op.delete_first) {
347 t->remove(coll, goid);
348 }
349
350 match(
351 op.init_type,
352 [&](const PGTransaction::ObjectOperation::Init::None &) {
353 },
354 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
9f95a23c
TL
355 if (require_osd_release >= ceph_release_t::octopus) {
356 t->create(coll, goid);
357 } else {
358 t->touch(coll, goid);
359 }
7c673cae
FG
360 },
361 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
362 t->clone(
363 coll,
364 ghobject_t(
365 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
366 goid);
367 },
368 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
11fdf7f2 369 ceph_assert(op.source.is_temp());
7c673cae
FG
370 t->collection_move_rename(
371 coll,
372 ghobject_t(
373 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
374 coll,
375 goid);
376 });
377
378 if (op.truncate) {
379 t->truncate(coll, goid, op.truncate->first);
380 if (op.truncate->first != op.truncate->second)
381 t->truncate(coll, goid, op.truncate->second);
382 }
383
384 if (!op.attr_updates.empty()) {
385 map<string, bufferlist> attrs;
386 for (auto &&p: op.attr_updates) {
387 if (p.second)
388 attrs[p.first] = *(p.second);
389 else
390 t->rmattr(coll, goid, p.first);
391 }
392 t->setattrs(coll, goid, attrs);
393 }
394
395 if (op.clear_omap)
396 t->omap_clear(coll, goid);
397 if (op.omap_header)
398 t->omap_setheader(coll, goid, *(op.omap_header));
399
400 for (auto &&up: op.omap_updates) {
401 using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
402 switch (up.first) {
403 case UpdateType::Remove:
404 t->omap_rmkeys(coll, goid, up.second);
405 break;
406 case UpdateType::Insert:
407 t->omap_setkeys(coll, goid, up.second);
408 break;
9f95a23c
TL
409 case UpdateType::RemoveRange:
410 t->omap_rmkeyrange(coll, goid, up.second);
411 break;
7c673cae
FG
412 }
413 }
414
415 // updated_snaps doesn't matter since we marked unrollbackable
416
417 if (op.alloc_hint) {
418 auto &hint = *(op.alloc_hint);
419 t->set_alloc_hint(
420 coll,
421 goid,
422 hint.expected_object_size,
423 hint.expected_write_size,
424 hint.flags);
425 }
426
427 for (auto &&extent: op.buffer_updates) {
428 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
429 match(
430 extent.get_val(),
431 [&](const BufferUpdate::Write &op) {
432 t->write(
433 coll,
434 goid,
435 extent.get_off(),
436 extent.get_len(),
9f95a23c
TL
437 op.buffer,
438 op.fadvise_flags);
7c673cae
FG
439 },
440 [&](const BufferUpdate::Zero &op) {
441 t->zero(
442 coll,
443 goid,
444 extent.get_off(),
445 extent.get_len());
446 },
447 [&](const BufferUpdate::CloneRange &op) {
11fdf7f2 448 ceph_assert(op.len == extent.get_len());
7c673cae
FG
449 t->clone_range(
450 coll,
451 ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
452 goid,
453 op.offset,
454 extent.get_len(),
455 extent.get_off());
456 });
457 }
458 });
459}
460
461void ReplicatedBackend::submit_transaction(
462 const hobject_t &soid,
463 const object_stat_sum_t &delta_stats,
464 const eversion_t &at_version,
465 PGTransactionUPtr &&_t,
466 const eversion_t &trim_to,
9f95a23c 467 const eversion_t &min_last_complete_ondisk,
f67539c2 468 vector<pg_log_entry_t>&& _log_entries,
9f95a23c 469 std::optional<pg_hit_set_history_t> &hset_history,
7c673cae
FG
470 Context *on_all_commit,
471 ceph_tid_t tid,
472 osd_reqid_t reqid,
473 OpRequestRef orig_op)
474{
475 parent->apply_stats(
476 soid,
477 delta_stats);
478
479 vector<pg_log_entry_t> log_entries(_log_entries);
480 ObjectStore::Transaction op_t;
481 PGTransactionUPtr t(std::move(_t));
482 set<hobject_t> added, removed;
483 generate_transaction(
484 t,
485 coll,
7c673cae
FG
486 log_entries,
487 &op_t,
488 &added,
9f95a23c
TL
489 &removed,
490 get_osdmap()->require_osd_release);
11fdf7f2
TL
491 ceph_assert(added.size() <= 1);
492 ceph_assert(removed.size() <= 1);
7c673cae 493
11fdf7f2 494 auto insert_res = in_progress_ops.insert(
7c673cae
FG
495 make_pair(
496 tid,
9f95a23c 497 ceph::make_ref<InProgressOp>(
11fdf7f2 498 tid, on_all_commit,
7c673cae
FG
499 orig_op, at_version)
500 )
11fdf7f2
TL
501 );
502 ceph_assert(insert_res.second);
503 InProgressOp &op = *insert_res.first->second;
7c673cae 504
f67539c2
TL
505#ifdef HAVE_JAEGER
506 auto rep_sub_trans = jaeger_tracing::child_span("ReplicatedBackend::submit_transaction", orig_op->osd_parent_span);
507#endif
7c673cae 508 op.waiting_for_commit.insert(
11fdf7f2
TL
509 parent->get_acting_recovery_backfill_shards().begin(),
510 parent->get_acting_recovery_backfill_shards().end());
7c673cae
FG
511
512 issue_op(
513 soid,
514 at_version,
515 tid,
516 reqid,
517 trim_to,
9f95a23c 518 min_last_complete_ondisk,
7c673cae
FG
519 added.size() ? *(added.begin()) : hobject_t(),
520 removed.size() ? *(removed.begin()) : hobject_t(),
521 log_entries,
522 hset_history,
523 &op,
524 op_t);
525
526 add_temp_objs(added);
527 clear_temp_objs(removed);
528
529 parent->log_operation(
f67539c2 530 std::move(log_entries),
7c673cae
FG
531 hset_history,
532 trim_to,
533 at_version,
9f95a23c 534 min_last_complete_ondisk,
7c673cae
FG
535 true,
536 op_t);
537
7c673cae
FG
538 op_t.register_on_commit(
539 parent->bless_context(
540 new C_OSD_OnOpCommit(this, &op)));
541
542 vector<ObjectStore::Transaction> tls;
543 tls.push_back(std::move(op_t));
544
545 parent->queue_transactions(tls, op.op);
11fdf7f2
TL
546 if (at_version != eversion_t()) {
547 parent->op_applied(at_version);
7c673cae
FG
548 }
549}
550
9f95a23c 551void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
7c673cae 552{
11fdf7f2
TL
553 if (op->on_commit == nullptr) {
554 // aborted
555 return;
556 }
557
558 FUNCTRACE(cct);
7c673cae
FG
559 OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
560 dout(10) << __func__ << ": " << op->tid << dendl;
561 if (op->op) {
562 op->op->mark_event("op_commit");
563 op->op->pg_trace.event("op commit");
564 }
565
566 op->waiting_for_commit.erase(get_parent()->whoami_shard());
567
568 if (op->waiting_for_commit.empty()) {
569 op->on_commit->complete(0);
570 op->on_commit = 0;
7c673cae
FG
571 in_progress_ops.erase(op->tid);
572 }
573}
574
575void ReplicatedBackend::do_repop_reply(OpRequestRef op)
576{
577 static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
9f95a23c 578 auto r = op->get_req<MOSDRepOpReply>();
11fdf7f2 579 ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);
7c673cae
FG
580
581 op->mark_started();
582
583 // must be replication.
584 ceph_tid_t rep_tid = r->get_tid();
585 pg_shard_t from = r->from;
586
11fdf7f2
TL
587 auto iter = in_progress_ops.find(rep_tid);
588 if (iter != in_progress_ops.end()) {
589 InProgressOp &ip_op = *iter->second;
9f95a23c 590 const MOSDOp *m = nullptr;
7c673cae 591 if (ip_op.op)
9f95a23c 592 m = ip_op.op->get_req<MOSDOp>();
7c673cae
FG
593
594 if (m)
595 dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
596 << " ack_type " << (int)r->ack_type
597 << " from " << from
598 << dendl;
599 else
600 dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
601 << " ack_type " << (int)r->ack_type
602 << " from " << from
603 << dendl;
604
605 // oh, good.
606
607 if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
11fdf7f2 608 ceph_assert(ip_op.waiting_for_commit.count(from));
7c673cae
FG
609 ip_op.waiting_for_commit.erase(from);
610 if (ip_op.op) {
11fdf7f2 611 ip_op.op->mark_event("sub_op_commit_rec");
7c673cae
FG
612 ip_op.op->pg_trace.event("sub_op_commit_rec");
613 }
614 } else {
11fdf7f2 615 // legacy peer; ignore
7c673cae 616 }
7c673cae
FG
617
618 parent->update_peer_last_complete_ondisk(
619 from,
620 r->get_last_complete_ondisk());
621
7c673cae
FG
622 if (ip_op.waiting_for_commit.empty() &&
623 ip_op.on_commit) {
624 ip_op.on_commit->complete(0);
11fdf7f2 625 ip_op.on_commit = 0;
7c673cae
FG
626 in_progress_ops.erase(iter);
627 }
628 }
629}
630
28e407b8 631int ReplicatedBackend::be_deep_scrub(
7c673cae 632 const hobject_t &poid,
28e407b8
AA
633 ScrubMap &map,
634 ScrubMapBuilder &pos,
635 ScrubMap::object &o)
7c673cae 636{
28e407b8 637 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
7c673cae 638 int r;
28e407b8 639 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
91327a77
AA
640 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
641 CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;
28e407b8
AA
642
643 utime_t sleeptime;
644 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
645 if (sleeptime != utime_t()) {
646 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
647 sleeptime.sleep();
648 }
7c673cae 649
11fdf7f2 650 ceph_assert(poid == pos.ls[pos.pos]);
28e407b8
AA
651 if (!pos.data_done()) {
652 if (pos.data_pos == 0) {
653 pos.data_hash = bufferhash(-1);
654 }
7c673cae 655
28e407b8 656 bufferlist bl;
7c673cae 657 r = store->read(
28e407b8
AA
658 ch,
659 ghobject_t(
660 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
661 pos.data_pos,
662 cct->_conf->osd_deep_scrub_stride, bl,
663 fadvise_flags);
664 if (r < 0) {
665 dout(20) << __func__ << " " << poid << " got "
666 << r << " on read, read_error" << dendl;
667 o.read_error = true;
668 return 0;
669 }
670 if (r > 0) {
671 pos.data_hash << bl;
672 }
673 pos.data_pos += r;
674 if (r == cct->_conf->osd_deep_scrub_stride) {
675 dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
676 << std::hex << pos.data_hash.digest() << std::dec << dendl;
677 return -EINPROGRESS;
678 }
679 // done with bytes
680 pos.data_pos = -1;
681 o.digest = pos.data_hash.digest();
682 o.digest_present = true;
683 dout(20) << __func__ << " " << poid << " done with data, digest 0x"
684 << std::hex << o.digest << std::dec << dendl;
7c673cae 685 }
7c673cae 686
28e407b8
AA
687 // omap header
688 if (pos.omap_pos.empty()) {
689 pos.omap_hash = bufferhash(-1);
690
691 bufferlist hdrbl;
692 r = store->omap_get_header(
11fdf7f2 693 ch,
28e407b8
AA
694 ghobject_t(
695 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
696 &hdrbl, true);
697 if (r == -EIO) {
698 dout(20) << __func__ << " " << poid << " got "
699 << r << " on omap header read, read_error" << dendl;
700 o.read_error = true;
701 return 0;
702 }
703 if (r == 0 && hdrbl.length()) {
81eedcae
TL
704 bool encoded = false;
705 dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
28e407b8 706 pos.omap_hash << hdrbl;
7c673cae 707 }
7c673cae
FG
708 }
709
28e407b8 710 // omap
7c673cae 711 ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
11fdf7f2 712 ch,
7c673cae
FG
713 ghobject_t(
714 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
11fdf7f2 715 ceph_assert(iter);
28e407b8
AA
716 if (pos.omap_pos.length()) {
717 iter->lower_bound(pos.omap_pos);
718 } else {
719 iter->seek_to_first();
720 }
11fdf7f2 721 int max = g_conf()->osd_deep_scrub_keys;
28e407b8
AA
722 while (iter->status() == 0 && iter->valid()) {
723 pos.omap_bytes += iter->value().length();
724 ++pos.omap_keys;
725 --max;
726 // fixme: we can do this more efficiently.
727 bufferlist bl;
11fdf7f2
TL
728 encode(iter->key(), bl);
729 encode(iter->value(), bl);
28e407b8
AA
730 pos.omap_hash << bl;
731
732 iter->next();
733
734 if (iter->valid() && max == 0) {
735 pos.omap_pos = iter->key();
736 return -EINPROGRESS;
737 }
738 if (iter->status() < 0) {
739 dout(25) << __func__ << " " << poid
740 << " on omap scan, db status error" << dendl;
741 o.read_error = true;
742 return 0;
743 }
7c673cae
FG
744 }
745
28e407b8
AA
746 if (pos.omap_keys > cct->_conf->
747 osd_deep_scrub_large_omap_object_key_threshold ||
748 pos.omap_bytes > cct->_conf->
749 osd_deep_scrub_large_omap_object_value_sum_threshold) {
750 dout(25) << __func__ << " " << poid
751 << " large omap object detected. Object has " << pos.omap_keys
752 << " keys and size " << pos.omap_bytes << " bytes" << dendl;
753 o.large_omap_object_found = true;
754 o.large_omap_object_key_count = pos.omap_keys;
755 o.large_omap_object_value_size = pos.omap_bytes;
756 map.has_large_omap_object_errors = true;
7c673cae
FG
757 }
758
28e407b8 759 o.omap_digest = pos.omap_hash.digest();
7c673cae 760 o.omap_digest_present = true;
28e407b8 761 dout(20) << __func__ << " done with " << poid << " omap_digest "
7c673cae 762 << std::hex << o.omap_digest << std::dec << dendl;
28e407b8 763
11fdf7f2
TL
764 // Sum up omap usage
765 if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
766 dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
767 << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
768 map.has_omap_keys = true;
769 o.object_omap_bytes = pos.omap_bytes;
770 o.object_omap_keys = pos.omap_keys;
771 }
772
28e407b8
AA
773 // done!
774 return 0;
7c673cae
FG
775}
776
777void ReplicatedBackend::_do_push(OpRequestRef op)
778{
9f95a23c 779 auto m = op->get_req<MOSDPGPush>();
11fdf7f2 780 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
7c673cae
FG
781 pg_shard_t from = m->from;
782
783 op->mark_started();
784
785 vector<PushReplyOp> replies;
786 ObjectStore::Transaction t;
11fdf7f2
TL
787 if (get_parent()->check_failsafe_full()) {
788 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
7c673cae
FG
789 ceph_abort();
790 }
791 for (vector<PushOp>::const_iterator i = m->pushes.begin();
792 i != m->pushes.end();
793 ++i) {
794 replies.push_back(PushReplyOp());
11fdf7f2 795 handle_push(from, *i, &(replies.back()), &t, m->is_repair);
7c673cae
FG
796 }
797
798 MOSDPGPushReply *reply = new MOSDPGPushReply;
799 reply->from = get_parent()->whoami_shard();
800 reply->set_priority(m->get_priority());
801 reply->pgid = get_info().pgid;
802 reply->map_epoch = m->map_epoch;
803 reply->min_epoch = m->min_epoch;
804 reply->replies.swap(replies);
805 reply->compute_cost(cct);
806
807 t.register_on_complete(
808 new PG_SendMessageOnConn(
809 get_parent(), reply, m->get_connection()));
810
811 get_parent()->queue_transaction(std::move(t));
812}
813
814struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
815 ReplicatedBackend *bc;
816 list<ReplicatedBackend::pull_complete_info> to_continue;
817 int priority;
818 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
819 : bc(bc), priority(priority) {}
820
821 void finish(ThreadPool::TPHandle &handle) override {
822 ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
823 for (auto &&i: to_continue) {
824 auto j = bc->pulling.find(i.hoid);
11fdf7f2 825 ceph_assert(j != bc->pulling.end());
7c673cae
FG
826 ObjectContextRef obc = j->second.obc;
827 bc->clear_pull(j, false /* already did it */);
224ce89b
WB
828 int started = bc->start_pushes(i.hoid, obc, h);
829 if (started < 0) {
830 bc->pushing[i.hoid].clear();
9f95a23c
TL
831 bc->get_parent()->on_failed_pull(
832 { bc->get_parent()->whoami_shard() },
833 i.hoid, obc->obs.oi.version);
224ce89b 834 } else if (!started) {
7c673cae 835 bc->get_parent()->on_global_recover(
c07f9fc5 836 i.hoid, i.stat, false);
7c673cae
FG
837 }
838 handle.reset_tp_timeout();
839 }
840 bc->run_recovery_op(h, priority);
841 }
842};
843
844void ReplicatedBackend::_do_pull_response(OpRequestRef op)
845{
9f95a23c 846 auto m = op->get_req<MOSDPGPush>();
11fdf7f2 847 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
7c673cae
FG
848 pg_shard_t from = m->from;
849
850 op->mark_started();
851
852 vector<PullOp> replies(1);
11fdf7f2
TL
853 if (get_parent()->check_failsafe_full()) {
854 dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
7c673cae
FG
855 ceph_abort();
856 }
857
858 ObjectStore::Transaction t;
859 list<pull_complete_info> to_continue;
860 for (vector<PushOp>::const_iterator i = m->pushes.begin();
861 i != m->pushes.end();
862 ++i) {
863 bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
864 if (more)
865 replies.push_back(PullOp());
866 }
867 if (!to_continue.empty()) {
868 C_ReplicatedBackend_OnPullComplete *c =
869 new C_ReplicatedBackend_OnPullComplete(
870 this,
871 m->get_priority());
872 c->to_continue.swap(to_continue);
873 t.register_on_complete(
874 new PG_RecoveryQueueAsync(
875 get_parent(),
11fdf7f2 876 get_parent()->bless_unlocked_gencontext(c)));
7c673cae
FG
877 }
878 replies.erase(replies.end() - 1);
879
880 if (replies.size()) {
881 MOSDPGPull *reply = new MOSDPGPull;
882 reply->from = parent->whoami_shard();
883 reply->set_priority(m->get_priority());
884 reply->pgid = get_info().pgid;
885 reply->map_epoch = m->map_epoch;
886 reply->min_epoch = m->min_epoch;
f67539c2 887 reply->set_pulls(std::move(replies));
7c673cae
FG
888 reply->compute_cost(cct);
889
890 t.register_on_complete(
891 new PG_SendMessageOnConn(
892 get_parent(), reply, m->get_connection()));
893 }
894
895 get_parent()->queue_transaction(std::move(t));
896}
897
898void ReplicatedBackend::do_pull(OpRequestRef op)
899{
900 MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
11fdf7f2 901 ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
7c673cae
FG
902 pg_shard_t from = m->from;
903
904 map<pg_shard_t, vector<PushOp> > replies;
f67539c2 905 for (auto& i : m->take_pulls()) {
7c673cae
FG
906 replies[from].push_back(PushOp());
907 handle_pull(from, i, &(replies[from].back()));
908 }
909 send_pushes(m->get_priority(), replies);
910}
911
912void ReplicatedBackend::do_push_reply(OpRequestRef op)
913{
9f95a23c 914 auto m = op->get_req<MOSDPGPushReply>();
11fdf7f2 915 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
7c673cae
FG
916 pg_shard_t from = m->from;
917
918 vector<PushOp> replies(1);
919 for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
920 i != m->replies.end();
921 ++i) {
922 bool more = handle_push_reply(from, *i, &(replies.back()));
923 if (more)
924 replies.push_back(PushOp());
925 }
926 replies.erase(replies.end() - 1);
927
928 map<pg_shard_t, vector<PushOp> > _replies;
929 _replies[from].swap(replies);
930 send_pushes(m->get_priority(), _replies);
931}
932
933Message * ReplicatedBackend::generate_subop(
934 const hobject_t &soid,
935 const eversion_t &at_version,
936 ceph_tid_t tid,
937 osd_reqid_t reqid,
938 eversion_t pg_trim_to,
9f95a23c 939 eversion_t min_last_complete_ondisk,
7c673cae
FG
940 hobject_t new_temp_oid,
941 hobject_t discard_temp_oid,
11fdf7f2 942 const bufferlist &log_entries,
9f95a23c 943 std::optional<pg_hit_set_history_t> &hset_hist,
7c673cae
FG
944 ObjectStore::Transaction &op_t,
945 pg_shard_t peer,
946 const pg_info_t &pinfo)
947{
948 int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
949 // forward the write/update/whatever
950 MOSDRepOp *wr = new MOSDRepOp(
951 reqid, parent->whoami_shard(),
952 spg_t(get_info().pgid.pgid, peer.shard),
953 soid, acks_wanted,
11fdf7f2 954 get_osdmap_epoch(),
7c673cae
FG
955 parent->get_last_peering_reset_epoch(),
956 tid, at_version);
957
958 // ship resulting transaction, log entries, and pg_stats
959 if (!parent->should_send_op(peer, soid)) {
7c673cae 960 ObjectStore::Transaction t;
11fdf7f2 961 encode(t, wr->get_data());
7c673cae 962 } else {
11fdf7f2 963 encode(op_t, wr->get_data());
7c673cae
FG
964 wr->get_header().data_off = op_t.get_data_alignment();
965 }
966
11fdf7f2 967 wr->logbl = log_entries;
7c673cae
FG
968
969 if (pinfo.is_incomplete())
970 wr->pg_stats = pinfo.stats; // reflects backfill progress
971 else
972 wr->pg_stats = get_info().stats;
973
974 wr->pg_trim_to = pg_trim_to;
9f95a23c
TL
975
976 if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
977 wr->min_last_complete_ondisk = min_last_complete_ondisk;
978 } else {
979 /* Some replicas need this field to be at_version. New replicas
980 * will ignore it */
981 wr->set_rollback_to(at_version);
982 }
7c673cae
FG
983
984 wr->new_temp_oid = new_temp_oid;
985 wr->discard_temp_oid = discard_temp_oid;
986 wr->updated_hit_set_history = hset_hist;
987 return wr;
988}
989
990void ReplicatedBackend::issue_op(
991 const hobject_t &soid,
992 const eversion_t &at_version,
993 ceph_tid_t tid,
994 osd_reqid_t reqid,
995 eversion_t pg_trim_to,
9f95a23c 996 eversion_t min_last_complete_ondisk,
7c673cae
FG
997 hobject_t new_temp_oid,
998 hobject_t discard_temp_oid,
999 const vector<pg_log_entry_t> &log_entries,
9f95a23c 1000 std::optional<pg_hit_set_history_t> &hset_hist,
7c673cae
FG
1001 InProgressOp *op,
1002 ObjectStore::Transaction &op_t)
1003{
11fdf7f2
TL
1004 if (parent->get_acting_recovery_backfill_shards().size() > 1) {
1005 if (op->op) {
1006 op->op->pg_trace.event("issue replication ops");
1007 ostringstream ss;
1008 set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
1009 replicas.erase(parent->whoami_shard());
1010 ss << "waiting for subops from " << replicas;
7c673cae 1011 op->op->mark_sub_op_sent(ss.str());
11fdf7f2 1012 }
7c673cae 1013
11fdf7f2
TL
1014 // avoid doing the same work in generate_subop
1015 bufferlist logs;
1016 encode(log_entries, logs);
1017
1018 for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
1019 if (shard == parent->whoami_shard()) continue;
1020 const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;
1021
1022 Message *wr;
1023 wr = generate_subop(
1024 soid,
1025 at_version,
1026 tid,
1027 reqid,
1028 pg_trim_to,
9f95a23c 1029 min_last_complete_ondisk,
11fdf7f2
TL
1030 new_temp_oid,
1031 discard_temp_oid,
1032 logs,
1033 hset_hist,
1034 op_t,
1035 shard,
1036 pinfo);
1037 if (op->op && op->op->pg_trace)
1038 wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
1039 get_parent()->send_message_osd_cluster(
1040 shard.osd, wr, get_osdmap_epoch());
1041 }
7c673cae
FG
1042 }
1043}
1044
1045// sub op modify
1046void ReplicatedBackend::do_repop(OpRequestRef op)
1047{
1048 static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
9f95a23c 1049 auto m = op->get_req<MOSDRepOp>();
7c673cae 1050 int msg_type = m->get_type();
11fdf7f2 1051 ceph_assert(MSG_OSD_REPOP == msg_type);
7c673cae
FG
1052
1053 const hobject_t& soid = m->poid;
1054
1055 dout(10) << __func__ << " " << soid
1056 << " v " << m->version
1057 << (m->logbl.length() ? " (transaction)" : " (parallel exec")
1058 << " " << m->logbl.length()
1059 << dendl;
1060
f67539c2
TL
1061#ifdef HAVE_JAEGER
1062 auto do_repop_span = jaeger_tracing::child_span(__func__, op->osd_parent_span);
1063#endif
1064
7c673cae 1065 // sanity checks
11fdf7f2 1066 ceph_assert(m->map_epoch >= get_info().history.same_interval_since);
7c673cae 1067
11fdf7f2 1068 dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
28e407b8
AA
1069 parent->maybe_preempt_replica_scrub(soid);
1070
7c673cae
FG
1071 int ackerosd = m->get_source().num();
1072
1073 op->mark_started();
1074
1075 RepModifyRef rm(std::make_shared<RepModify>());
1076 rm->op = op;
1077 rm->ackerosd = ackerosd;
1078 rm->last_complete = get_info().last_complete;
11fdf7f2 1079 rm->epoch_started = get_osdmap_epoch();
7c673cae 1080
11fdf7f2 1081 ceph_assert(m->logbl.length());
7c673cae
FG
1082 // shipped transaction and log entries
1083 vector<pg_log_entry_t> log;
1084
11fdf7f2
TL
1085 auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
1086 decode(rm->opt, p);
7c673cae
FG
1087
1088 if (m->new_temp_oid != hobject_t()) {
1089 dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
1090 add_temp_obj(m->new_temp_oid);
1091 }
1092 if (m->discard_temp_oid != hobject_t()) {
1093 dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
1094 if (rm->opt.empty()) {
1095 dout(10) << __func__ << ": removing object " << m->discard_temp_oid
1096 << " since we won't get the transaction" << dendl;
1097 rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
1098 }
1099 clear_temp_obj(m->discard_temp_oid);
1100 }
1101
1102 p = const_cast<bufferlist&>(m->logbl).begin();
11fdf7f2 1103 decode(log, p);
7c673cae
FG
1104 rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1105
1106 bool update_snaps = false;
1107 if (!rm->opt.empty()) {
1108 // If the opt is non-empty, we infer we are before
1109 // last_backfill (according to the primary, not our
1110 // not-quite-accurate value), and should update the
1111 // collections now. Otherwise, we do it later on push.
1112 update_snaps = true;
1113 }
11fdf7f2
TL
1114
1115 // flag set to true during async recovery
1116 bool async = false;
1117 pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
1118 if (pmissing.is_missing(soid)) {
1119 async = true;
1120 dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
1121 for (auto &&e: log) {
1122 dout(30) << " add_next_event entry " << e << dendl;
1123 get_parent()->add_local_next_event(e);
1124 dout(30) << " entry is_delete " << e.is_delete() << dendl;
1125 }
1126 }
1127
7c673cae
FG
1128 parent->update_stats(m->pg_stats);
1129 parent->log_operation(
f67539c2 1130 std::move(log),
7c673cae
FG
1131 m->updated_hit_set_history,
1132 m->pg_trim_to,
9f95a23c
TL
1133 m->version, /* Replicated PGs don't have rollback info */
1134 m->min_last_complete_ondisk,
7c673cae 1135 update_snaps,
11fdf7f2
TL
1136 rm->localt,
1137 async);
7c673cae
FG
1138
1139 rm->opt.register_on_commit(
1140 parent->bless_context(
1141 new C_OSD_RepModifyCommit(this, rm)));
7c673cae
FG
1142 vector<ObjectStore::Transaction> tls;
1143 tls.reserve(2);
1144 tls.push_back(std::move(rm->localt));
1145 tls.push_back(std::move(rm->opt));
1146 parent->queue_transactions(tls, op);
1147 // op is cleaned up by oncommit/onapply when both are executed
11fdf7f2 1148 dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
7c673cae
FG
1149}
1150
1151void ReplicatedBackend::repop_commit(RepModifyRef rm)
1152{
1153 rm->op->mark_commit_sent();
1154 rm->op->pg_trace.event("sup_op_commit");
1155 rm->committed = true;
1156
1157 // send commit.
9f95a23c 1158 auto m = rm->op->get_req<MOSDRepOp>();
11fdf7f2 1159 ceph_assert(m->get_type() == MSG_OSD_REPOP);
7c673cae
FG
1160 dout(10) << __func__ << " on op " << *m
1161 << ", sending commit to osd." << rm->ackerosd
1162 << dendl;
11fdf7f2 1163 ceph_assert(get_osdmap()->is_up(rm->ackerosd));
7c673cae
FG
1164
1165 get_parent()->update_last_complete_ondisk(rm->last_complete);
1166
1167 MOSDRepOpReply *reply = new MOSDRepOpReply(
1168 m,
1169 get_parent()->whoami_shard(),
11fdf7f2 1170 0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
7c673cae
FG
1171 reply->set_last_complete_ondisk(rm->last_complete);
1172 reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
1173 reply->trace = rm->op->pg_trace;
1174 get_parent()->send_message_osd_cluster(
11fdf7f2 1175 rm->ackerosd, reply, get_osdmap_epoch());
7c673cae
FG
1176
1177 log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
1178}
1179
1180
1181// ===========================================================
1182
// Work out which byte ranges of a head object must actually be pushed
// (data_subset) versus which can be reconstructed on the target by cloning
// from a clone it already has (clone_subsets).  Read locks taken on any
// usable clone are registered in `manager`; on the "too many holes" bailout
// the locks are released and everything is pushed as plain data.
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  // start from the whole object ...
  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS)) {
    // ... and, on octopus+ clusters, narrow it to the regions the missing
    // entry marks dirty (partial recovery)
    const auto it = missing.get_items().find(head);
    assert(it != missing.get_items().end());
    data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
    dout(10) << "calc_head_subsets " << head
	     << " data_subset " << data_subset << dendl;
  }

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // clone contents are unreliable in cache pools; push raw data only
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  hobject_t c = head;
  if (size)
    prev.insert(0, size);

  // walk clones newest-first, accumulating the overlap with the head, and
  // stop at the first clone the peer has, that is within its backfill
  // horizon, and that we can read-lock
  for (int j=snapset.clones.size()-1; j>=0; j--) {
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      cloning = prev;
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // only ranges we were going to push can be satisfied by cloning
  cloning.intersection_of(data_subset);
  if (cloning.empty()) {
    dout(10) << "skipping clone, nothing needs to clone" << dendl;
    return;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    // a very fragmented overlap costs more than it saves; fall back to a
    // plain data push and drop the clone locks we took above
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
    return;
  }

  // what's left for us to push?
  clone_subsets[c] = cloning;
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1260
// Same idea as calc_head_subsets, but for recovering a clone: ranges that
// overlap the adjacent older or newer clone (when present on the target,
// within its backfill horizon, and read-lockable) go into clone_subsets;
// the remainder stays in data_subset to be pushed as data.
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // clone contents are unreliable in cache pools; push raw data only
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  // locate this clone's position in the (oldest-first) clone list
  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    // overlap is only transitive through each intermediate clone's overlap
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    // clone_overlap[k] describes overlap between clone k and its successor
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    // too fragmented to be worth cloning; release locks and push data only
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1349
// Primary-side setup of a pull: pick a peer that has the object, compute
// which ranges to fetch (possibly cloning from local clones), and queue a
// PullOp in `h` while registering the in-flight state in `pulling` /
// `pull_from_peer`.
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
  ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
  eversion_t _v = missing_iter->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    // debug knob: force pulling from a specific OSD if it holds the object
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
	p = it;
	break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because user feed a wrong pullee
    // default: pick a random shard that has the object, to spread load
    p = q->second.begin();
    std::advance(p,
		 ceph::util::generate_random_number<int>(0,
							 q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << q->second
	  << " from osd." << fromshard
	  << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    // the pullee only has an older version; that is only acceptable when
    // the log shows a LOST_REVERT back to exactly that version
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.get_items().find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
		(get_parent()->get_log().get_log().objects.find(soid)->second->op ==
		 pg_log_entry_t::LOST_REVERT) &&
		(get_parent()->get_log().get_log().objects.find(
		  soid)->second->reverting_to ==
		 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    // recovering a clone requires the head (for its SnapSet) locally
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS))
      // partial recovery: restrict the pull to the dirty regions
      recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
    recovery_info.size = ((uint64_t)-1);
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  // omap can be skipped entirely when it is clean and all peers speak octopus
  op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty()
    && HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  // register in-flight pull state; completed/failed pulls clean these up
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}
1467
1468/*
1469 * intelligently push an object to a replica. make use of existing
1470 * clones/heads and dup data ranges where possible.
1471 */
224ce89b 1472int ReplicatedBackend::prep_push_to_replica(
7c673cae
FG
1473 ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
1474 PushOp *pop, bool cache_dont_need)
1475{
1476 const object_info_t& oi = obc->obs.oi;
1477 uint64_t size = obc->obs.oi.size;
1478
1479 dout(10) << __func__ << ": " << soid << " v" << oi.version
1480 << " size " << size << " to osd." << peer << dendl;
1481
1482 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1483 interval_set<uint64_t> data_subset;
1484
1485 ObcLockManager lock_manager;
1486 // are we doing a clone on the replica?
1487 if (soid.snap && soid.snap < CEPH_NOSNAP) {
1488 hobject_t head = soid;
1489 head.snap = CEPH_NOSNAP;
1490
1491 // try to base push off of clones that succeed/preceed poid
1492 // we need the head (and current SnapSet) locally to do that.
1493 if (get_parent()->get_local_missing().is_missing(head)) {
1494 dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
1495 return prep_push(obc, soid, peer, pop, cache_dont_need);
1496 }
7c673cae
FG
1497
1498 SnapSetContext *ssc = obc->ssc;
11fdf7f2 1499 ceph_assert(ssc);
7c673cae
FG
1500 dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
1501 pop->recovery_info.ss = ssc->snapset;
1502 map<pg_shard_t, pg_missing_t>::const_iterator pm =
1503 get_parent()->get_shard_missing().find(peer);
11fdf7f2 1504 ceph_assert(pm != get_parent()->get_shard_missing().end());
7c673cae
FG
1505 map<pg_shard_t, pg_info_t>::const_iterator pi =
1506 get_parent()->get_shard_info().find(peer);
11fdf7f2 1507 ceph_assert(pi != get_parent()->get_shard_info().end());
7c673cae
FG
1508 calc_clone_subsets(
1509 ssc->snapset, soid,
1510 pm->second,
1511 pi->second.last_backfill,
1512 data_subset, clone_subsets,
1513 lock_manager);
1514 } else if (soid.snap == CEPH_NOSNAP) {
1515 // pushing head or unversioned object.
1516 // base this on partially on replica's clones?
1517 SnapSetContext *ssc = obc->ssc;
11fdf7f2 1518 ceph_assert(ssc);
7c673cae
FG
1519 dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
1520 calc_head_subsets(
1521 obc,
1522 ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
1523 get_parent()->get_shard_info().find(peer)->second.last_backfill,
1524 data_subset, clone_subsets,
1525 lock_manager);
1526 }
1527
224ce89b 1528 return prep_push(
7c673cae
FG
1529 obc,
1530 soid,
1531 peer,
1532 oi.version,
1533 data_subset,
1534 clone_subsets,
1535 pop,
1536 cache_dont_need,
1537 std::move(lock_manager));
1538}
1539
224ce89b 1540int ReplicatedBackend::prep_push(ObjectContextRef obc,
7c673cae
FG
1541 const hobject_t& soid, pg_shard_t peer,
1542 PushOp *pop, bool cache_dont_need)
1543{
1544 interval_set<uint64_t> data_subset;
1545 if (obc->obs.oi.size)
1546 data_subset.insert(0, obc->obs.oi.size);
1547 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1548
224ce89b 1549 return prep_push(obc, soid, peer,
7c673cae
FG
1550 obc->obs.oi.version, data_subset, clone_subsets,
1551 pop, cache_dont_need, ObcLockManager());
1552}
1553
224ce89b 1554int ReplicatedBackend::prep_push(
7c673cae
FG
1555 ObjectContextRef obc,
1556 const hobject_t& soid, pg_shard_t peer,
1557 eversion_t version,
1558 interval_set<uint64_t> &data_subset,
1559 map<hobject_t, interval_set<uint64_t>>& clone_subsets,
1560 PushOp *pop,
1561 bool cache_dont_need,
1562 ObcLockManager &&lock_manager)
1563{
1564 get_parent()->begin_peer_recover(peer, soid);
9f95a23c
TL
1565 const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
1566 const auto missing_iter = pmissing_iter->second.get_items().find(soid);
1567 assert(missing_iter != pmissing_iter->second.get_items().end());
7c673cae
FG
1568 // take note.
1569 PushInfo &pi = pushing[soid][peer];
1570 pi.obc = obc;
1571 pi.recovery_info.size = obc->obs.oi.size;
1572 pi.recovery_info.copy_subset = data_subset;
1573 pi.recovery_info.clone_subset = clone_subsets;
1574 pi.recovery_info.soid = soid;
1575 pi.recovery_info.oi = obc->obs.oi;
1576 pi.recovery_info.ss = pop->recovery_info.ss;
1577 pi.recovery_info.version = version;
9f95a23c
TL
1578 pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
1579 pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() &&
1580 HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
7c673cae
FG
1581 pi.lock_manager = std::move(lock_manager);
1582
1583 ObjectRecoveryProgress new_progress;
1584 int r = build_push_op(pi.recovery_info,
1585 pi.recovery_progress,
1586 &new_progress,
1587 pop,
1588 &(pi.stat), cache_dont_need);
224ce89b
WB
1589 if (r < 0)
1590 return r;
7c673cae 1591 pi.recovery_progress = new_progress;
224ce89b 1592 return 0;
7c673cae
FG
1593}
1594
// Append the ObjectStore operations that apply one received push chunk.
// Single-chunk pushes (first && complete) write the target object directly;
// multi-chunk pushes stage into a temp object that is renamed into place on
// the final chunk.  `data_zeros` punches holes where the sender's scan found
// no data but the region was marked dirty; `clear_omap` wipes pre-existing
// omap before re-applying it.
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  bool cache_dont_need,
  interval_set<uint64_t> &data_zeros,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  // choose where the ops land: the real object, or a temp staging object
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
							recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
	       << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    if (!complete) {
      // staging via temp object: always start from scratch
      t->remove(coll, ghobject_t(target_oid));
      t->touch(coll, ghobject_t(target_oid));
      bufferlist bv = attrs.at(OI_ATTR);
      object_info_t oi(bv);
      t->set_alloc_hint(coll, ghobject_t(target_oid),
			oi.expected_object_size,
			oi.expected_write_size,
			oi.alloc_hint_flags);
    } else {
      // writing the real object in one shot: only recreate it if it does
      // not already exist (partial recovery overwrites in place)
      if (!recovery_info.object_exist) {
	t->remove(coll, ghobject_t(target_oid));
	t->touch(coll, ghobject_t(target_oid));
	bufferlist bv = attrs.at(OI_ATTR);
	object_info_t oi(bv);
	t->set_alloc_hint(coll, ghobject_t(target_oid),
			  oi.expected_object_size,
			  oi.expected_write_size,
			  oi.alloc_hint_flags);
      }
      //remove xattr and update later if overwrite on original object
      t->rmattrs(coll, ghobject_t(target_oid));
      //if need update omap, clear the previous content first
      if (clear_omap)
	t->omap_clear(coll, ghobject_t(target_oid));
    }

    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    struct stat st;
    int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
    if (get_parent()->pg_is_remote_backfilling()) {
      // keep backfill byte accounting in sync with the incoming object size
      uint64_t size = 0;
      if (r == 0)
	size = st.st_size;
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
	get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	dout(10) << __func__ << " " << recovery_info.soid
		 << " backfill size " << recovery_info.oi.size
		 << " previous size " << size
		 << " net size " << recovery_info.oi.size - size
		 << dendl;
      }
    }
    if (!complete) {
      //clone overlap content in local object
      // seed the temp object with the parts of the existing local object
      // that are NOT being re-sent (outside copy_subset)
      if (recovery_info.object_exist) {
	assert(r == 0);
	uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
	interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
	if (local_size) {
	  local_intervals_included.insert(0, local_size);
	  local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
	  local_intervals_included.subtract(local_intervals_excluded);
	}
	for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
	     q != local_intervals_included.end();
	     ++q) {
	  dout(15) << " clone_range " << recovery_info.soid << " "
		   << q.get_start() << "~" << q.get_len() << dendl;
	  t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
			 q.get_start(), q.get_len(), q.get_start());
	}
      }
    }
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
  if (data_zeros.size() > 0) {
    data_zeros.intersection_of(recovery_info.copy_subset);
    assert(intervals_included.subset_of(data_zeros));
    data_zeros.subtract(intervals_included);

    dout(20) << __func__ <<" recovering object " << recovery_info.soid
	     << " copy_subset: " << recovery_info.copy_subset
	     << " intervals_included: " << intervals_included
	     << " data_zeros: " << data_zeros << dendl;

    for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
      t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
  }
  // write the shipped data: data_included carries concatenated extents in
  // the same order as intervals_included
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
	     p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      // multi-chunk push finished: move the staged temp object into place
      dout(10) << __func__ << ": Removing oid "
	       << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
				coll, ghobject_t(recovery_info.soid));
    }

    // apply clone_subset ranges from local clones into the final object
    submit_push_complete(recovery_info, t);

  }
}
1740
1741void ReplicatedBackend::submit_push_complete(
1742 const ObjectRecoveryInfo &recovery_info,
1743 ObjectStore::Transaction *t)
1744{
1745 for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
1746 recovery_info.clone_subset.begin();
1747 p != recovery_info.clone_subset.end();
1748 ++p) {
1749 for (interval_set<uint64_t>::const_iterator q = p->second.begin();
1750 q != p->second.end();
1751 ++q) {
1752 dout(15) << " clone_range " << p->first << " "
1753 << q.get_start() << "~" << q.get_len() << dendl;
1754 t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
1755 q.get_start(), q.get_len(), q.get_start());
1756 }
1757 }
1758}
1759
1760ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
1761 const ObjectRecoveryInfo& recovery_info,
1762 SnapSetContext *ssc,
1763 ObcLockManager &manager)
1764{
1765 if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
1766 return recovery_info;
1767 ObjectRecoveryInfo new_info = recovery_info;
1768 new_info.copy_subset.clear();
1769 new_info.clone_subset.clear();
11fdf7f2 1770 ceph_assert(ssc);
7c673cae
FG
1771 get_parent()->release_locks(manager); // might already have locks
1772 calc_clone_subsets(
1773 ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
1774 get_info().last_backfill,
1775 new_info.copy_subset, new_info.clone_subset,
1776 manager);
1777 return new_info;
1778}
1779
// Handle one PushOp chunk arriving in response to our pull.  Applies the
// chunk via submit_push_data and updates PullInfo.  Returns true if
// `response` was filled with a follow-up PullOp for the next chunk, false
// when the pull is finished (or was aborted/stale).
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  // data and its interval map must agree on emptiness
  ceph_assert((data_included.empty() && data.length() == 0) ||
	      (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    // pull was cancelled or superseded; drop the stale chunk
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    // we did not know the object size when the pull was prepared; adopt
    // the sender's size and narrow our copy range accordingly
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // attrs only reference the origin bufferlist (decode from
    // MOSDPGPush message) whose size is much greater than attrs in
    // recovery. If obc cache it (get_obc maybe cache the attr), this
    // causes the whole origin bufferlist would not be free until obc
    // is evicted from obc cache. So rebuild the bufferlists before
    // cache it.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    if (attrset.find(SS_ATTR) != attrset.end()) {
      // sanity: shipped SnapSet must match any cached one
      bufferlist ssbv = attrset.at(SS_ATTR);
      SnapSet ss(ssbv);
      assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
    }
    pi.recovery_info.oi = pi.obc->obs.oi;
    // now that we have the authoritative SnapSet, redo the subset math
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }


  // drop any shipped bytes that fall outside our (possibly recalculated)
  // copy_subset
  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data = std::move(usable_data);


  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;
  // region this chunk advanced over but carried no data for: zero it
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if (z_length)
    data_zeros.insert(z_offset, z_length);
  bool complete = pi.is_complete();
  bool clear_omap = !pop.before_progress.omap_complete;

  submit_push_data(pi.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   pi.cache_dont_need,
		   data_zeros,
		   data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    // ask the pullee for the next chunk
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
1906
// Replica-side handler for one push chunk from the primary: apply it via
// submit_push_data and, on the final chunk, run local on-recover handling.
// `response` is filled with the soid to acknowledge back to the pusher.
void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t, bool is_repair)
{
  dout(10) << "handle_push "
	   << pop.recovery_info
	   << pop.after_progress
	   << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;
  bool clear_omap = !pop.before_progress.omap_complete;
  // region this chunk advanced over but carried no data for: zero it
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if (z_length)
    data_zeros.insert(z_offset, z_length);
  response->soid = pop.recovery_info.soid;

  submit_push_data(pop.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   true, // must be replicate
		   data_zeros,
		   pop.data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  if (complete) {
    if (is_repair) {
      get_parent()->inc_osd_stat_repaired();
      dout(20) << __func__ << " repair complete" << dendl;
    }
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
  }
}
1954
1955void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1956{
1957 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1958 i != pushes.end();
1959 ++i) {
1960 ConnectionRef con = get_parent()->get_con_osd_cluster(
1961 i->first.osd,
11fdf7f2 1962 get_osdmap_epoch());
7c673cae
FG
1963 if (!con)
1964 continue;
1965 vector<PushOp>::iterator j = i->second.begin();
1966 while (j != i->second.end()) {
1967 uint64_t cost = 0;
1968 uint64_t pushes = 0;
1969 MOSDPGPush *msg = new MOSDPGPush();
1970 msg->from = get_parent()->whoami_shard();
1971 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 1972 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1973 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1974 msg->set_priority(prio);
11fdf7f2 1975 msg->is_repair = get_parent()->pg_is_repair();
7c673cae
FG
1976 for (;
1977 (j != i->second.end() &&
1978 cost < cct->_conf->osd_max_push_cost &&
1979 pushes < cct->_conf->osd_max_push_objects) ;
1980 ++j) {
1981 dout(20) << __func__ << ": sending push " << *j
1982 << " to osd." << i->first << dendl;
1983 cost += j->cost(cct);
1984 pushes += 1;
1985 msg->pushes.push_back(*j);
1986 }
1987 msg->set_cost(cost);
1988 get_parent()->send_message_osd_cluster(msg, con);
1989 }
1990 }
1991}
1992
1993void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1994{
1995 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1996 i != pulls.end();
1997 ++i) {
1998 ConnectionRef con = get_parent()->get_con_osd_cluster(
1999 i->first.osd,
11fdf7f2 2000 get_osdmap_epoch());
7c673cae
FG
2001 if (!con)
2002 continue;
2003 dout(20) << __func__ << ": sending pulls " << i->second
2004 << " to osd." << i->first << dendl;
2005 MOSDPGPull *msg = new MOSDPGPull();
2006 msg->from = parent->whoami_shard();
2007 msg->set_priority(prio);
2008 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 2009 msg->map_epoch = get_osdmap_epoch();
7c673cae 2010 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
f67539c2 2011 msg->set_pulls(std::move(i->second));
7c673cae
FG
2012 msg->compute_cost(cct);
2013 get_parent()->send_message_osd_cluster(msg, con);
2014 }
2015}
2016
// Build the next PushOp chunk for one object being recovered.
//
// Reads up to osd_recovery_max_chunk bytes' worth of omap entries and data
// extents from the local copy, starting where 'progress' left off, and
// records where the next chunk should resume in *out_progress.
//
// @param recovery_info   what/which version to push; copy_subset bounds data
// @param progress        cursor state from the previous chunk (or first)
// @param out_progress    out: cursor after this chunk (may be null)
// @param out_op          out: the chunk to send
// @param stat            optional recovery stats to accumulate into
// @param cache_dont_need pass FADVISE_DONTNEED on the data read
// @return 0 on success; negative errno (store error, -EINVAL on version
//         mismatch / bad object_info, -EIO on digest mismatch or injected
//         read error)
int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
				     const ObjectRecoveryProgress &progress,
				     ObjectRecoveryProgress *out_progress,
				     PushOp *out_op,
				     object_stat_sum_t *stat,
				     bool cache_dont_need)
{
  ObjectRecoveryProgress _new_progress;
  if (!out_progress)
    out_progress = &_new_progress;
  ObjectRecoveryProgress &new_progress = *out_progress;
  new_progress = progress;

  dout(7) << __func__ << " " << recovery_info.soid
	  << " v " << recovery_info.version
	  << " size " << recovery_info.size
	  << " recovery_info: " << recovery_info
          << dendl;

  eversion_t v = recovery_info.version;
  object_info_t oi;
  if (progress.first) {
    // First chunk carries the omap header and all xattrs.
    int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
    if (r < 0) {
      dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
      return r;
    }
    r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
    if (r < 0) {
      dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
      return r;
    }

    // Debug: decode object_info_t so we can sanity-check the version below
    // (and the full-object data digest at the end).
    bufferlist bv = out_op->attrset[OI_ATTR];
    try {
      auto bliter = bv.cbegin();
      decode(oi, bliter);
    } catch (...) {
      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
      return -EINVAL;
    }

    // If requestor didn't know the version, use ours
    if (v == eversion_t()) {
      v = oi.version;
    } else if (oi.version != v) {
      // Requestor asked for a version we don't have locally: refuse.
      get_parent()->clog_error() << get_info().pgid << " push "
				 << recovery_info.soid << " v "
				 << recovery_info.version
				 << " failed because local copy is "
				 << oi.version;
      return -EINVAL;
    }

    new_progress.first = false;
  }
  // Once we provide the version subsequent requests will have it, so
  // at this point it must be known.
  ceph_assert(v != eversion_t());

  // 'available' is the remaining per-chunk byte budget; omap entries are
  // consumed from it first, then data extents.
  uint64_t available = cct->_conf->osd_recovery_max_chunk;
  if (!progress.omap_complete) {
    ObjectMap::ObjectMapIterator iter =
      store->get_omap_iterator(ch,
			       ghobject_t(recovery_info.soid));
    ceph_assert(iter);
    for (iter->lower_bound(progress.omap_recovered_to);
	 iter->valid();
	 iter->next()) {
      // Stop when the next entry would exceed either the per-chunk entry
      // limit or the byte budget — but always take at least one entry so
      // progress is guaranteed.
      if (!out_op->omap_entries.empty() &&
	  ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
	    out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
	   available <= iter->key().size() + iter->value().length()))
	break;
      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));

      if ((iter->key().size() + iter->value().length()) <= available)
	available -= (iter->key().size() + iter->value().length());
      else
	available = 0;
    }
    if (!iter->valid())
      new_progress.omap_complete = true;
    else
      // Resume omap from this key on the next chunk.
      new_progress.omap_recovered_to = iter->key();
  }

  if (available > 0) {
    if (!recovery_info.copy_subset.empty()) {
      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
      map<uint64_t, uint64_t> m;
      // fiemap tells us which extents are actually allocated, so holes
      // don't consume budget or wire bytes.
      int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
                            copy_subset.range_end(), m);
      if (r >= 0)  {
        interval_set<uint64_t> fiemap_included(std::move(m));
        copy_subset.intersection_of(fiemap_included);
      } else {
        // intersection of copy_subset and empty interval_set would be empty anyway
        copy_subset.clear();
      }

      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
                                    available);
      // zero filled section, skip to end!
      if (out_op->data_included.empty() ||
          out_op->data_included.range_end() == copy_subset.range_end())
        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
      else
        new_progress.data_recovered_to = out_op->data_included.range_end();
    }
  } else {
    // Budget exhausted by omap; send no data this chunk.
    out_op->data_included.clear();
  }

  auto origin_size = out_op->data_included.size();
  bufferlist bit;
  int r = store->readv(ch, ghobject_t(recovery_info.soid),
		       out_op->data_included, bit,
                       cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
  // Fault injection hook for testing read-error handling during push.
  if (cct->_conf->osd_debug_random_push_read_error &&
        (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
    dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
    r = -EIO;
  }
  if (r < 0) {
    return r;
  }
  // readv may prune extents that vanished (e.g. truncated) since fiemap;
  // if so, treat the data as complete rather than chasing a moving target.
  if (out_op->data_included.size() != origin_size) {
    dout(10) << __func__ << " some extents get pruned "
             << out_op->data_included.size() << "/" << origin_size
             << dendl;
    new_progress.data_complete = true;
  }
  out_op->data.claim_append(bit);
  // If the very first chunk captured the whole object, verify the data
  // digest recorded in object_info before shipping it.
  if (progress.first && !out_op->data_included.empty() &&
      out_op->data_included.begin().get_start() == 0 &&
      out_op->data.length() == oi.size && oi.is_data_digest()) {
    uint32_t crc = out_op->data.crc32c(-1);
    if (oi.data_digest != crc) {
      dout(0) << __func__ << " " << coll << std::hex
	      << " full-object read crc 0x" << crc
	      << " != expected 0x" << oi.data_digest
	      << std::dec << " on " << recovery_info.soid << dendl;
      return -EIO;
    }
  }

  if (new_progress.is_complete(recovery_info)) {
    new_progress.data_complete = true;
    if (stat) {
      stat->num_objects_recovered++;
      // XXX: This could overcount if regular recovery is needed right
      // after a repair (see matching note in handle_pull_response).
      if (get_parent()->pg_is_repair())
        stat->num_objects_repaired++;
    }
  } else if (progress.first && progress.omap_complete) {
    // If omap is not changed, we need recovery omap when recovery cannot be completed once
    new_progress.omap_complete = false;
  }

  if (stat) {
    stat->num_keys_recovered += out_op->omap_entries.size();
    stat->num_bytes_recovered += out_op->data.length();
    get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
  }

  get_parent()->get_logger()->inc(l_osd_push);
  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());

  // send
  out_op->version = v;
  out_op->soid = recovery_info.soid;
  out_op->recovery_info = recovery_info;
  out_op->after_progress = new_progress;
  out_op->before_progress = progress;
  return 0;
}
2194
2195void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2196{
2197 op->recovery_info.version = eversion_t();
2198 op->version = eversion_t();
2199 op->soid = soid;
2200}
2201
// Handle a push acknowledgment from a replica.
//
// If the object still has data outstanding for this peer (and no error has
// been recorded), builds the next chunk into *reply and returns true so the
// caller sends it. Otherwise finalizes this peer's push: releases locks,
// and when the last peer acks, declares global recovery (or failure) for
// the object. Returns false when there is nothing more to send.
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    // Stale/duplicate ack: we have no push state for this object at all.
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << ", or anybody else"
	     << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    // Stale/duplicate ack from a peer we already finished with.
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    // A prior read error for this object is remembered on the first
    // remaining entry (see the note near 'done:' below).
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      dout(10) << " pushing more from, "
	       << pi->recovery_progress.data_recovered_to
	       << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
	pi->recovery_info,
	pi->recovery_progress, &new_progress, reply,
	&(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
        dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;

	error = true;
	goto done; // fall into the completion path with error set
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
done:
      if (!error)
	get_parent()->on_peer_recover( peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      // Copy out what we need before erasing this peer's entry invalidates pi.
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
	if (!error)
	  get_parent()->on_global_recover(soid, stat, false);
	else
	  // We (the primary) failed to read the object, hence whoami_shard().
	  get_parent()->on_failed_pull(
	    std::set<pg_shard_t>{ get_parent()->whoami_shard() },
	    soid,
	    v);
	pushing.erase(soid);
      } else {
	// This looks weird, but we erased the current peer and need to remember
	// the error on any other one, while getting more acks.
	if (error)
	  pushing[soid].begin()->second.recovery_progress.error = true;
	dout(10) << "pushed " << soid << ", still waiting for push ack from "
		 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}
2271
2272void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2273{
2274 const hobject_t &soid = op.soid;
2275 struct stat st;
2276 int r = store->stat(ch, ghobject_t(soid), &st);
2277 if (r != 0) {
2278 get_parent()->clog_error() << get_info().pgid << " "
2279 << peer << " tried to pull " << soid
2280 << " but got " << cpp_strerror(-r);
2281 prep_push_op_blank(soid, reply);
2282 } else {
2283 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2284 ObjectRecoveryProgress &progress = op.recovery_progress;
2285 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2286 // Adjust size and copy_subset
2287 recovery_info.size = st.st_size;
9f95a23c
TL
2288 if (st.st_size) {
2289 interval_set<uint64_t> object_range;
2290 object_range.insert(0, st.st_size);
2291 recovery_info.copy_subset.intersection_of(object_range);
2292 } else {
2293 recovery_info.copy_subset.clear();
2294 }
2295 assert(recovery_info.clone_subset.empty());
7c673cae
FG
2296 }
2297
2298 r = build_push_op(recovery_info, progress, 0, reply);
2299 if (r < 0)
2300 prep_push_op_blank(soid, reply);
2301 }
2302}
2303
2304/**
2305 * trim received data to remove what we don't want
2306 *
2307 * @param copy_subset intervals we want
2308 * @param data_included intervals we got
2309 * @param data_recieved data we got
2310 * @param intervals_usable intervals we want to keep
2311 * @param data_usable matching data we want to keep
2312 */
2313void ReplicatedBackend::trim_pushed_data(
2314 const interval_set<uint64_t> &copy_subset,
2315 const interval_set<uint64_t> &intervals_received,
2316 bufferlist data_received,
2317 interval_set<uint64_t> *intervals_usable,
2318 bufferlist *data_usable)
2319{
2320 if (intervals_received.subset_of(copy_subset)) {
2321 *intervals_usable = intervals_received;
2322 *data_usable = data_received;
2323 return;
2324 }
2325
2326 intervals_usable->intersection_of(copy_subset,
2327 intervals_received);
2328
2329 uint64_t off = 0;
2330 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2331 p != intervals_received.end();
2332 ++p) {
2333 interval_set<uint64_t> x;
2334 x.insert(p.get_start(), p.get_len());
2335 x.intersection_of(copy_subset);
2336 for (interval_set<uint64_t>::const_iterator q = x.begin();
2337 q != x.end();
2338 ++q) {
2339 bufferlist sub;
2340 uint64_t data_off = off + (q.get_start() - p.get_start());
2341 sub.substr_of(data_received, data_off, q.get_len());
2342 data_usable->claim_append(sub);
2343 }
2344 off += p.get_len();
2345 }
2346}
2347
224ce89b 2348void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
7c673cae 2349{
224ce89b 2350 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
81eedcae
TL
2351 auto it = pulling.find(soid);
2352 assert(it != pulling.end());
9f95a23c
TL
2353 get_parent()->on_failed_pull(
2354 { from },
2355 soid,
2356 it->second.recovery_info.version);
7c673cae 2357
81eedcae 2358 clear_pull(it);
7c673cae
FG
2359}
2360
2361void ReplicatedBackend::clear_pull_from(
2362 map<hobject_t, PullInfo>::iterator piter)
2363{
2364 auto from = piter->second.from;
2365 pull_from_peer[from].erase(piter->second.soid);
2366 if (pull_from_peer[from].empty())
2367 pull_from_peer.erase(from);
2368}
2369
2370void ReplicatedBackend::clear_pull(
2371 map<hobject_t, PullInfo>::iterator piter,
2372 bool clear_pull_from_peer)
2373{
2374 if (clear_pull_from_peer) {
2375 clear_pull_from(piter);
2376 }
2377 get_parent()->release_locks(piter->second.lock_manager);
2378 pulling.erase(piter);
2379}
2380
2381int ReplicatedBackend::start_pushes(
2382 const hobject_t &soid,
2383 ObjectContextRef obc,
2384 RPGHandle *h)
2385{
224ce89b
WB
2386 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2387
2388 dout(20) << __func__ << " soid " << soid << dendl;
7c673cae 2389 // who needs it?
11fdf7f2 2390 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
7c673cae 2391 for (set<pg_shard_t>::iterator i =
11fdf7f2
TL
2392 get_parent()->get_acting_recovery_backfill_shards().begin();
2393 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2394 ++i) {
2395 if (*i == get_parent()->whoami_shard()) continue;
2396 pg_shard_t peer = *i;
2397 map<pg_shard_t, pg_missing_t>::const_iterator j =
2398 get_parent()->get_shard_missing().find(peer);
11fdf7f2 2399 ceph_assert(j != get_parent()->get_shard_missing().end());
7c673cae 2400 if (j->second.is_missing(soid)) {
224ce89b
WB
2401 shards.push_back(j);
2402 }
2403 }
2404
2405 // If more than 1 read will occur ignore possible request to not cache
2406 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2407
2408 for (auto j : shards) {
2409 pg_shard_t peer = j->first;
2410 h->pushes[peer].push_back(PushOp());
2411 int r = prep_push_to_replica(obc, soid, peer,
2412 &(h->pushes[peer].back()), cache);
2413 if (r < 0) {
2414 // Back out all failed reads
2415 for (auto k : shards) {
2416 pg_shard_t p = k->first;
2417 dout(10) << __func__ << " clean up peer " << p << dendl;
2418 h->pushes[p].pop_back();
2419 if (p == peer) break;
2420 }
2421 return r;
7c673cae
FG
2422 }
2423 }
224ce89b 2424 return shards.size();
7c673cae 2425}